From e9bba9999b05eb54157ebd71b00a5689e2a1cc61 Mon Sep 17 00:00:00 2001 From: Ben Wallis Date: Wed, 5 Oct 2022 21:57:59 +0100 Subject: [PATCH] Added UDP Query Server to Veloren Server --- Cargo.lock | 40 +++++- common/frontend/src/lib.rs | 1 + server/Cargo.toml | 1 + server/src/lib.rs | 9 ++ server/src/query_server.rs | 226 ++++++++++++++++++++++++++++++++++ server/src/settings.rs | 4 +- server/src/sys/mod.rs | 2 + server/src/sys/server_info.rs | 40 ++++++ 8 files changed, 320 insertions(+), 3 deletions(-) create mode 100644 server/src/query_server.rs create mode 100644 server/src/sys/server_info.rs diff --git a/Cargo.lock b/Cargo.lock index ad98e1bdda..92b85e2626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1907,6 +1907,16 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "backtrace", + "version_check 0.9.4", +] + [[package]] name = "error-code" version = "2.3.1" @@ -4662,6 +4672,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "protocol" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13cfa9ba37e0183f87fb14b82f23fc76494c458c72469d95b8a8eec75ad5f191" +dependencies = [ + "byteorder", + "error-chain", + "flate2", + "num-traits", + "protocol-derive", + "uuid 0.8.2", +] + +[[package]] +name = "protocol-derive" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28647f30298898ead966b51e9aee5c74e4ac709ce5ca554378fde187fd3f7e47" +dependencies = [ + "proc-macro2 1.0.43", + "quote 1.0.21", + "syn 1.0.100", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -6468,7 +6503,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand 0.8.5", "static_assertions", ] @@ -6881,7 +6916,7 @@ dependencies = [ "crossbeam-channel", "futures-core", "futures-util", - "hashbrown 0.9.1", + "hashbrown 0.12.3", "lazy_static", "lz-fear", "prometheus", @@ -6968,6 +7003,7 @@ dependencies = [ "portpicker", "prometheus", "prometheus-hyper", + "protocol", "quinn", "rand 0.8.5", "rand_distr", diff --git a/common/frontend/src/lib.rs b/common/frontend/src/lib.rs index 48a2860f2a..63ff78bafd 100644 --- a/common/frontend/src/lib.rs +++ b/common/frontend/src/lib.rs @@ -74,6 +74,7 @@ where "quinn_proto::connection=info", "veloren_server::persistence::character=info", "veloren_server::settings=info", + "veloren_server::query_server=info", ]; for s in default_directives { diff --git a/server/Cargo.toml b/server/Cargo.toml index 330e97568d..e3def86940 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -64,6 +64,7 @@ rand_distr = "0.4.0" enumset = "1.0.8" noise = { version = "0.7", default-features = false } censor = "0.2" +protocol = { version = "3.4", features = ["derive"] } rusqlite = { version = "0.24.2", features = ["array", "vtab", "bundled", "trace"] } refinery = { git = "https://gitlab.com/veloren/refinery.git", rev = "8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e", features = ["rusqlite"] } diff --git a/server/src/lib.rs b/server/src/lib.rs index e40fb7778a..b9cff08250 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -38,6 +38,7 @@ pub mod terrain_persistence; mod weather; +mod query_server; pub mod wiring; // Reexports @@ -125,6 +126,7 @@ use { common_state::plugin::{memory_manager::EcsWorld, PluginMgr}, }; +use crate::{query_server::QueryServer, sys::server_info::ServerInfoRequest}; use common::comp::Anchor; #[cfg(feature = "worldgen")] use world::{ @@ -309,6 +311,7 @@ impl Server { state.ecs_mut().insert(tick_metrics); state.ecs_mut().insert(physics_metrics); state.ecs_mut().insert(server_event_metrics); + if settings.experimental_terrain_persistence { #[cfg(feature = "persistent_world")] { @@ -478,6 +481,12 @@ impl Server { .await }); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + state.ecs_mut().insert(receiver); + let mut query_server = QueryServer::new(settings.query_address, sender); + // Run the query server in its own long-running future + runtime.spawn(async move { query_server.run().await }); + let mut printed_quic_warning = false; for protocol in &settings.gameserver_protocols { match protocol { diff --git a/server/src/query_server.rs b/server/src/query_server.rs new file mode 100644 index 0000000000..cd798bc55d --- /dev/null +++ b/server/src/query_server.rs @@ -0,0 +1,226 @@ +use crate::{settings::ServerBattleMode, ServerInfoRequest}; +use common::resources::BattleMode; +use protocol::{wire::dgram, Protocol}; +use std::{ + io, + io::Cursor, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::Duration, +}; +use tokio::{net::UdpSocket, sync::mpsc::UnboundedSender, time::timeout}; +use tracing::{debug, info, trace}; + +// NOTE: Debug logging is disabled by default for this module - to enable it add +// veloren_server::query_server=trace to RUST_LOG + +pub struct QueryServer { + pub bind_addr: SocketAddr, + pipeline: dgram::Pipeline, + server_info_request_sender: UnboundedSender, +} + +impl QueryServer { + pub fn new( + bind_addr: SocketAddr, + server_info_request_sender: UnboundedSender, + ) -> Self { + Self { + bind_addr, + pipeline: protocol::wire::dgram::Pipeline::new( + protocol::wire::middleware::pipeline::default(), + protocol::Settings::default(), + ), + server_info_request_sender, + } + } + + pub async fn run(&mut self) -> Result<(), io::Error> { + let socket = UdpSocket::bind(self.bind_addr).await?; + + info!("Query Server running at {}", self.bind_addr); + + loop { + let mut buf = vec![0; 1500]; + let (len, remote_addr) = socket.recv_from(&mut buf).await?; + + if !QueryServer::validate_datagram(len, &mut buf) { + continue; + } + + if let Err(e) = self.process_datagram(buf, remote_addr).await { + debug!(?e, "Failed to process incoming datagram") + } + } + } + + fn validate_datagram(len: usize, data: &mut Vec) -> bool { + const VELOREN_HEADER: [u8; 7] = [0x56, 0x45, 0x4C, 0x4F, 0x52, 0x45, 0x4E]; + + if len < 8 { + trace!("Ignoring packet - too short"); + false + } else if data[0..7] != VELOREN_HEADER { + trace!("Ignoring packet - missing header"); + false + } else { + trace!("Validated packet, data: {:?}", data); + + // Discard the header after successful validation + *data = data.split_off(7); + true + } + } + + async fn process_datagram( + &mut self, + datagram: Vec, + remote_addr: SocketAddr, + ) -> Result<(), QueryError> { + let packet: QueryServerPacketKind = + self.pipeline.receive_from(&mut Cursor::new(datagram))?; + + debug!(?packet, ?remote_addr, "Query Server received packet"); + + match packet { + QueryServerPacketKind::Ping(_) => { + QueryServer::send_response(remote_addr, &QueryServerPacketKind::Pong(Pong {})) + .await? + }, + QueryServerPacketKind::ServerInfoQuery(ref _query) => { + let (sender, receiver) = tokio::sync::oneshot::channel::(); + let req = ServerInfoRequest { + response_sender: sender, + }; + self.server_info_request_sender + .send(req) + .map_err(|e| QueryError::ChannelError(format!("{}", e)))?; + + tokio::spawn(async move { + match timeout(Duration::from_secs(2), async move { + match receiver.await { + Ok(response) => { + trace!(?response, "Sending ServerInfoResponse"); + QueryServer::send_response( + remote_addr, + &QueryServerPacketKind::ServerInfoResponse(response), + ) + .await + .expect("Failed to send response"); // TODO remove expect + }, + Err(_) => { + // Oneshot receive error + }, + } + }) + .await + { + Ok(_) => {}, + Err(elapsed) => { + debug!( + ?elapsed, + "Timeout expired while waiting for ServerInfoResponse" + ); + }, + } + }); + }, + QueryServerPacketKind::Pong(_) | QueryServerPacketKind::ServerInfoResponse(_) => { + // Ignore any incoming packets + debug!(?packet, "Dropping received response packet"); + }, + } + + Ok(()) + } + + async fn send_response( + dest: SocketAddr, + packet: &QueryServerPacketKind, + ) -> Result<(), QueryError> { + let socket = + UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await?; + + let mut buf = Vec::::new(); + + let mut pipeline = protocol::wire::dgram::Pipeline::new( + protocol::wire::middleware::pipeline::default(), + protocol::Settings::default(), + ); + + pipeline.send_to(&mut Cursor::new(&mut buf), packet)?; + socket.send_to(buf.as_slice(), dest).await?; + + Ok(()) + } +} + +#[derive(Debug)] +enum QueryError { + NetworkError(io::Error), + ProtocolError(protocol::Error), + ChannelError(String), +} + +impl From for QueryError { + fn from(e: protocol::Error) -> Self { QueryError::ProtocolError(e) } +} + +impl From for QueryError { + fn from(e: io::Error) -> Self { QueryError::NetworkError(e) } +} + +#[derive(protocol::Protocol, Clone, Debug, PartialEq)] +pub struct Ping; + +#[derive(protocol::Protocol, Clone, Debug, PartialEq)] +pub struct Pong; + +#[derive(protocol::Protocol, Clone, Debug, PartialEq)] +pub struct ServerInfoQuery; + +#[derive(protocol::Protocol, Clone, Debug, PartialEq)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub enum QueryServerPacketKind { + #[protocol(discriminator(0x00))] + Ping(Ping), + #[protocol(discriminator(0x01))] + Pong(Pong), + #[protocol(discriminator(0xA0))] + ServerInfoQuery(ServerInfoQuery), + #[protocol(discriminator(0xA1))] + ServerInfoResponse(ServerInfoResponse), +} + +#[derive(Protocol, Debug, Clone, PartialEq)] +pub struct ServerInfoResponse { + pub git_hash: String, /* TODO: use u8 array instead? String includes 8 bytes for capacity + * and length that we don't need */ + pub players_current: u16, + pub players_max: u16, + pub battle_mode: QueryBattleMode, // TODO: use a custom enum to avoid accidental breakage +} + +#[derive(Protocol, Debug, Clone, PartialEq)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub enum QueryBattleMode { + #[protocol(discriminator(0x00))] + GlobalPvP, + #[protocol(discriminator(0x01))] + GlobalPvE, + #[protocol(discriminator(0x02))] + PerPlayer, +} + +impl From for QueryBattleMode { + fn from(battle_mode: ServerBattleMode) -> Self { + match battle_mode { + ServerBattleMode::Global(x) => match x { + BattleMode::PvP => QueryBattleMode::GlobalPvP, + BattleMode::PvE => QueryBattleMode::GlobalPvE, + }, + ServerBattleMode::PerPlayer { .. } => QueryBattleMode::PerPlayer, + } + } +} diff --git a/server/src/settings.rs b/server/src/settings.rs index 588e6d4dc8..2f0cfc5381 100644 --- a/server/src/settings.rs +++ b/server/src/settings.rs @@ -37,7 +37,7 @@ const BANLIST_FILENAME: &str = "banlist.ron"; const SERVER_DESCRIPTION_FILENAME: &str = "description.ron"; const ADMINS_FILENAME: &str = "admins.ron"; -#[derive(Copy, Clone, Debug, Deserialize, Serialize)] +#[derive(Copy, Clone, Debug, PartialEq, Deserialize, Serialize)] pub enum ServerBattleMode { Global(BattleMode), PerPlayer { default: BattleMode }, @@ -161,6 +161,7 @@ impl CalendarMode { pub struct Settings { pub gameserver_protocols: Vec, pub metrics_address: SocketAddr, + pub query_address: SocketAddr, pub auth_server_address: Option, pub max_players: u16, pub world_seed: u32, @@ -199,6 +200,7 @@ impl Default for Settings { }, ], metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)), + query_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14006)), auth_server_address: Some("https://auth.veloren.net".into()), world_seed: DEFAULT_WORLD_SEED, server_name: "Veloren Server".into(), diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index 59e2b446f6..28cfe8dbc0 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -10,6 +10,7 @@ pub mod object; pub mod persistence; pub mod pets; pub mod sentinel; +pub mod server_info; pub mod subscription; pub mod terrain; pub mod terrain_sync; @@ -40,6 +41,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { dispatch::(dispatch_builder, &[]); // don't depend on chunk_serialize, as we assume everything is done in a SlowJow dispatch::(dispatch_builder, &[]); + dispatch::(dispatch_builder, &[]); } pub fn run_sync_systems(ecs: &mut specs::World) { diff --git a/server/src/sys/server_info.rs b/server/src/sys/server_info.rs new file mode 100644 index 0000000000..177b3edb18 --- /dev/null +++ b/server/src/sys/server_info.rs @@ -0,0 +1,40 @@ +use crate::{client::Client, query_server::ServerInfoResponse, Settings}; +use common_ecs::{Job, Origin, Phase, System}; +use specs::{Read, ReadStorage, WriteExpect}; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::error; + +/// TODO: description +#[derive(Default)] +pub struct Sys; +impl<'a> System<'a> for Sys { + type SystemData = ( + ReadStorage<'a, Client>, + Read<'a, Settings>, + WriteExpect<'a, UnboundedReceiver>, + ); + + const NAME: &'static str = "server_info"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run(_job: &mut Job, (clients, settings, mut receiver): Self::SystemData) { + let players_current = (&clients).count() as u16; + let server_info = ServerInfoResponse { + players_current, + players_max: settings.max_players, + git_hash: common::util::GIT_HASH.to_owned(), + battle_mode: settings.gameplay.battle_mode.into(), + }; + + while let Ok(request) = receiver.try_recv() { + if let Err(e) = request.response_sender.send(server_info.clone()) { + error!(?e, "Failed to process System Info request!"); + } + } + } +} + +pub struct ServerInfoRequest { + pub response_sender: tokio::sync::oneshot::Sender, +}