From 93ad288193bad0f324fc7e1e3f1ab150d72d20f2 Mon Sep 17 00:00:00 2001 From: crabman Date: Thu, 25 Apr 2024 12:01:54 +0000 Subject: [PATCH 01/12] query server crate --- Cargo.lock | 43 ++++++ Cargo.toml | 1 + common/query_server/Cargo.toml | 19 +++ common/query_server/examples/demo.rs | 48 +++++++ common/query_server/src/client.rs | 79 +++++++++++ common/query_server/src/lib.rs | 15 ++ common/query_server/src/proto.rs | 60 ++++++++ common/query_server/src/server.rs | 200 +++++++++++++++++++++++++++ 8 files changed, 465 insertions(+) create mode 100644 common/query_server/Cargo.toml create mode 100644 common/query_server/examples/demo.rs create mode 100644 common/query_server/src/client.rs create mode 100644 common/query_server/src/lib.rs create mode 100644 common/query_server/src/proto.rs create mode 100644 common/query_server/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index a092516c31..296b1a7f49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2025,6 +2025,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "backtrace", + "version_check", +] + [[package]] name = "error-code" version = "2.3.1" @@ -4795,6 +4805,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "protocol" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13cfa9ba37e0183f87fb14b82f23fc76494c458c72469d95b8a8eec75ad5f191" +dependencies = [ + "byteorder", + "error-chain", + "num-traits", + "protocol-derive", +] + +[[package]] +name = "protocol-derive" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28647f30298898ead966b51e9aee5c74e4ac709ce5ca554378fde187fd3f7e47" +dependencies = [ + "proc-macro2 1.0.79", + "quote 1.0.35", + "syn 1.0.109", +] + [[package]] name = "psm" version = 
"0.1.21" @@ -7191,6 +7224,16 @@ dependencies = [ "veloren-world", ] +[[package]] +name = "veloren-server-query" +version = "0.1.0" +dependencies = [ + "protocol", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "veloren-voxygen" version = "0.16.0" diff --git a/Cargo.toml b/Cargo.toml index c8d02902e1..89e31e25fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "common/state", "common/systems", "common/frontend", + "common/query_server", "client", "client/i18n", "rtsim", diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml new file mode 100644 index 0000000000..271cbe7f9d --- /dev/null +++ b/common/query_server/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "veloren-server-query" +version = "0.1.0" +authors = ["crabman ", "XVar "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +server = ["dep:tokio"] +client = ["dep:tokio", "tokio/time"] +example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] +default = ["server", "client"] + +[dependencies] +tokio = { workspace = true, optional = true, features = ["net", "sync"] } +protocol = { version = "3.4.0", default-features = false, features = ["derive"] } +tracing-subscriber = { version = "0.3.7", optional = true } +tracing = { workspace = true } diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs new file mode 100644 index 0000000000..6caeb0a6b0 --- /dev/null +++ b/common/query_server/examples/demo.rs @@ -0,0 +1,48 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + time::Instant, +}; + +use tokio::sync::{watch, RwLock}; +use veloren_server_query::{ + client::QueryClient, + proto::{ServerBattleMode, ServerInfo}, + server::{Metrics, QueryServer}, +}; + +const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { + git_hash: ['\0'; 10], + players_count: 100, + player_cap: 300, + battlemode: 
ServerBattleMode::GlobalPvE, +}; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006); + let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO); + let mut server = QueryServer::new(addr, receiver); + let metrics = Arc::new(RwLock::new(Metrics::default())); + let metrics2 = Arc::clone(&metrics); + + tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); + + let client = QueryClient { addr }; + let ping = client.ping().await.unwrap(); + let info = client.server_info().await.unwrap(); + + println!("Ping = {}ms", ping.as_millis()); + println!("Server info: {info:?}"); + println!("Metrics = {:#?}", metrics.read().await); + assert_eq!(info, DEFAULT_SERVER_INFO); + + let start = Instant::now(); + + for _i in 0..10000 { + client.ping().await.unwrap(); + } + + dbg!(start.elapsed()); +} diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs new file mode 100644 index 0000000000..17d3d993f7 --- /dev/null +++ b/common/query_server/src/client.rs @@ -0,0 +1,79 @@ +use std::{ + io, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::{Duration, Instant}, +}; + +use tokio::{net::UdpSocket, time::timeout}; + +use crate::proto::{Ping, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER}; + +#[derive(Debug)] +pub enum QueryClientError { + Io(tokio::io::Error), + Protocol(protocol::Error), + InvalidResponse, + Timeout, +} + +pub struct QueryClient { + pub addr: SocketAddr, +} + +impl QueryClient { + pub async fn server_info(&self) -> Result { + self.send_query(QueryServerRequest::ServerInfo(Default::default())) + .await + .and_then(|(response, _)| { + if let QueryServerResponse::ServerInfo(info) = response { + Ok(info) + } else { + Err(QueryClientError::InvalidResponse) + } + }) + } + + pub async fn ping(&self) -> Result { + self.send_query(QueryServerRequest::Ping(Ping)) + .await + .map(|(_, elapsed)| elapsed) + } + + 
async fn send_query( + &self, + request: QueryServerRequest, + ) -> Result<(QueryServerResponse, Duration), QueryClientError> { + let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; + + let mut pipeline = crate::create_pipeline(); + + let mut buf = VELOREN_HEADER.to_vec(); + + let mut cursor = io::Cursor::new(&mut buf); + cursor.set_position(VELOREN_HEADER.len() as u64); + pipeline.send_to(&mut cursor, &request)?; + + let query_sent = Instant::now(); + socket.send_to(buf.as_slice(), self.addr).await?; + + let mut buf = vec![0; 1500]; + let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) + .await + .map_err(|_| QueryClientError::Timeout)? + .map_err(|_| QueryClientError::Timeout)?; + + let mut pipeline = crate::create_pipeline(); + + let packet: QueryServerResponse = pipeline.receive_from(&mut io::Cursor::new(&mut buf))?; + + Ok((packet, query_sent.elapsed())) + } +} + +impl From for QueryClientError { + fn from(value: tokio::io::Error) -> Self { Self::Io(value) } +} + +impl From for QueryClientError { + fn from(value: protocol::Error) -> Self { Self::Protocol(value) } +} diff --git a/common/query_server/src/lib.rs b/common/query_server/src/lib.rs new file mode 100644 index 0000000000..06a64aa014 --- /dev/null +++ b/common/query_server/src/lib.rs @@ -0,0 +1,15 @@ +use protocol::{ + wire::{self, dgram}, + Parcel, +}; + +#[cfg(feature = "client")] pub mod client; +pub mod proto; +#[cfg(feature = "server")] pub mod server; + +fn create_pipeline() -> dgram::Pipeline { + dgram::Pipeline::new( + wire::middleware::pipeline::default(), + protocol::Settings::default(), + ) +} diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs new file mode 100644 index 0000000000..e12850409f --- /dev/null +++ b/common/query_server/src/proto.rs @@ -0,0 +1,60 @@ +use protocol::Protocol; + +pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; + +#[derive(Protocol, Debug, Clone, Copy)] +pub 
struct Ping; + +#[derive(Protocol, Debug, Clone, Copy)] +pub struct Pong; + +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +#[allow(clippy::large_enum_variant)] +pub enum QueryServerRequest { + Ping(Ping), + ServerInfo(ServerInfoRequest), + // New requests should be added at the end to prevent breakage +} + +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub enum QueryServerResponse { + Pong(Pong), + ServerInfo(ServerInfo), + // New responses should be added at the end to prevent breakage +} + +#[derive(Protocol, Debug, Clone, Copy)] +pub struct ServerInfoRequest { + // Padding to prevent amplification attacks + pub _padding: [u8; 256], +} + +#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] +pub struct ServerInfo { + pub git_hash: [char; 10], + pub players_count: u16, + pub player_cap: u16, + pub battlemode: ServerBattleMode, +} + +#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +#[repr(u8)] +pub enum ServerBattleMode { + GlobalPvP, + GlobalPvE, + PerPlayer, +} + +impl Default for ServerInfoRequest { + fn default() -> Self { ServerInfoRequest { _padding: [0; 256] } } +} + +impl ServerInfo { + pub fn git_hash(&self) -> String { String::from_iter(&self.git_hash) } +} diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs new file mode 100644 index 0000000000..9901eb9da6 --- /dev/null +++ b/common/query_server/src/server.rs @@ -0,0 +1,200 @@ +use std::{ + io, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, + time::Duration, +}; + +use protocol::wire::{self, dgram}; +use tokio::{ + net::UdpSocket, + sync::{watch, RwLock}, + time::timeout, +}; +use tracing::{debug, trace}; + +use crate::proto::{ + Ping, Pong, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER, +}; + +const RESPONSE_SEND_TIMEOUT: Duration = 
Duration::from_secs(2); + +pub struct QueryServer { + pub addr: SocketAddr, + server_info: watch::Receiver, + pipeline: dgram::Pipeline, +} + +#[derive(Default, Clone, Copy, Debug)] +pub struct Metrics { + pub received_packets: u32, + pub dropped_packets: u32, + pub invalid_packets: u32, + pub proccessing_errors: u32, + pub ping_requests: u32, + pub info_requests: u32, + pub sent_responses: u32, + pub failed_responses: u32, + pub timed_out_responses: u32, +} + +impl QueryServer { + pub fn new(addr: SocketAddr, server_info: watch::Receiver) -> Self { + Self { + addr, + server_info, + pipeline: crate::create_pipeline(), + } + } + + pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { + let socket = UdpSocket::bind(self.addr).await?; + + let mut buf = Box::new([0; 1024]); + loop { + let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| { + debug!("Error while receiving from query server socket: {err:?}") + }) else { + continue; + }; + + let mut new_metrics = Metrics { + received_packets: 1, + ..Default::default() + }; + + let raw_msg_buf = &buf[..len]; + let msg_buf = if Self::validate_datagram(raw_msg_buf) { + &raw_msg_buf[VELOREN_HEADER.len()..] 
+ } else { + new_metrics.dropped_packets += 1; + continue; + }; + + if let Err(error) = self + .process_datagram(msg_buf, remote_addr, &mut new_metrics) + .await + { + debug!(?error, "Error while processing datagram"); + } + + *buf = [0; 1024]; + + // Update metrics at the end of eath packet + let mut metrics = metrics.write().await; + *metrics += new_metrics; + } + } + + // Header must be discarded after this validation passes + fn validate_datagram(data: &[u8]) -> bool { + let len = data.len(); + if len < VELOREN_HEADER.len() + 1 { + trace!(?len, "Datagram too short"); + false + } else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER { + trace!(?len, "Datagram header invalid"); + false + } else { + true + } + } + + async fn process_datagram( + &mut self, + datagram: &[u8], + remote: SocketAddr, + metrics: &mut Metrics, + ) -> Result<(), tokio::io::Error> { + let Ok(packet): Result = + self.pipeline.receive_from(&mut io::Cursor::new(datagram)) + else { + metrics.invalid_packets += 1; + return Ok(()); + }; + + trace!(?packet, "Received packet"); + + match packet { + QueryServerRequest::Ping(Ping) => { + metrics.ping_requests += 1; + tokio::task::spawn(async move { + _ = timeout( + RESPONSE_SEND_TIMEOUT, + Self::send_response(QueryServerResponse::Pong(Pong), remote), + ) + .await; + }); + }, + QueryServerRequest::ServerInfo(_) => { + metrics.info_requests += 1; + let server_info = *self.server_info.borrow(); + tokio::task::spawn(async move { + _ = timeout( + RESPONSE_SEND_TIMEOUT, + Self::send_response(QueryServerResponse::ServerInfo(server_info), remote), + ) + .await; + }); + }, + } + + Ok(()) + } + + async fn send_response(response: QueryServerResponse, addr: SocketAddr) { + let Ok(socket) = + UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await + else { + debug!("Failed to create response socket"); + return; + }; + + let mut buf = Vec::new(); + + let mut pipeline = crate::create_pipeline(); + + _ = pipeline.send_to(&mut 
io::Cursor::new(&mut buf), &response); + match socket.send_to(&buf, addr).await { + Ok(_) => { + // TODO: Sent responses metric + }, + Err(err) => { + // TODO: Failed response metric + debug!(?err, "Failed to send query server response"); + }, + } + } +} + +impl std::ops::AddAssign for Metrics { + fn add_assign( + &mut self, + Self { + received_packets, + dropped_packets, + invalid_packets, + proccessing_errors, + ping_requests, + info_requests, + sent_responses, + failed_responses, + timed_out_responses, + }: Self, + ) { + self.received_packets += received_packets; + self.dropped_packets += dropped_packets; + self.invalid_packets += invalid_packets; + self.proccessing_errors += proccessing_errors; + self.ping_requests += ping_requests; + self.info_requests += info_requests; + self.sent_responses += sent_responses; + self.failed_responses += failed_responses; + self.timed_out_responses += timed_out_responses; + } +} + +impl Metrics { + /// Resets all metrics to 0 + pub fn reset(&mut self) { *self = Self::default(); } +} From 83d4c775e235d1afe5cb08386c9f487cb58e1477 Mon Sep 17 00:00:00 2001 From: crabman Date: Thu, 25 Apr 2024 12:47:23 +0000 Subject: [PATCH 02/12] connect query server to gameserver --- Cargo.lock | 21 +++++++-------- common/query_server/Cargo.toml | 2 +- server/Cargo.toml | 1 + server/src/lib.rs | 23 +++++++++++++++++ server/src/settings.rs | 16 ++++++++++++ server/src/sys/mod.rs | 1 + server/src/sys/server_info.rs | 47 ++++++++++++++++++++++++++++++++++ 7 files changed, 100 insertions(+), 11 deletions(-) create mode 100644 server/src/sys/server_info.rs diff --git a/Cargo.lock b/Cargo.lock index 296b1a7f49..4ba2c9fb92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7103,6 +7103,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "veloren-query-server" +version = "0.1.0" +dependencies = [ + "protocol", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "veloren-rtsim" version = "0.10.0" @@ -7171,6 +7181,7 @@ dependencies = [ 
"veloren-common-state", "veloren-common-systems", "veloren-network", + "veloren-query-server", "veloren-rtsim", "veloren-server-agent", "veloren-world", @@ -7224,16 +7235,6 @@ dependencies = [ "veloren-world", ] -[[package]] -name = "veloren-server-query" -version = "0.1.0" -dependencies = [ - "protocol", - "tokio", - "tracing", - "tracing-subscriber", -] - [[package]] name = "veloren-voxygen" version = "0.16.0" diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index 271cbe7f9d..097a4a7b33 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "veloren-server-query" +name = "veloren-query-server" version = "0.1.0" authors = ["crabman ", "XVar "] edition = "2021" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5b180d2f9a..23ba5bd05c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -18,6 +18,7 @@ default = ["worldgen", "plugins", "persistent_world", "simd"] [dependencies] common = { package = "veloren-common", path = "../common" } common-base = { package = "veloren-common-base", path = "../common/base" } +veloren-query-server = { package = "veloren-query-server", path = "../common/query_server", features = ["server"] } common-ecs = { package = "veloren-common-ecs", path = "../common/ecs" } common-state = { package = "veloren-common-state", path = "../common/state" } common-systems = { package = "veloren-common-systems", path = "../common/systems" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 332b2275d6..58a4959883 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -124,6 +124,7 @@ use test_world::{IndexOwned, World}; use tokio::runtime::Runtime; use tracing::{debug, error, info, trace, warn}; use vek::*; +use veloren_query_server::server::QueryServer; pub use world::{civ::WorldCivStage, sim::WorldSimStage, WorldGenerateStage}; use crate::{ @@ -597,6 +598,28 @@ impl Server { } } + if let Some(addr) = settings.query_address { + use 
veloren_query_server::proto::ServerInfo; + + let (query_server_info_tx, query_server_info_rx) = + tokio::sync::watch::channel(ServerInfo { + git_hash: *sys::server_info::GIT_HASH, + players_count: 0, + player_cap: settings.max_players, + battlemode: settings.gameplay.battle_mode.into(), + }); + let mut query_server = QueryServer::new(addr, query_server_info_rx); + let query_server_metrics = Arc::new(tokio::sync::RwLock::new( + veloren_query_server::server::Metrics::default(), + )); + let query_server_metrics2 = Arc::clone(&query_server_metrics); + runtime.spawn(async move { + _ = query_server.run(query_server_metrics2).await; + }); + state.ecs_mut().insert(query_server_info_tx); + state.ecs_mut().insert(query_server_metrics); + } + runtime.block_on(network.listen(ListenAddr::Mpsc(14004)))?; let connection_handler = ConnectionHandler::new(network, &runtime); diff --git a/server/src/settings.rs b/server/src/settings.rs index 880df37062..efb2013dfb 100644 --- a/server/src/settings.rs +++ b/server/src/settings.rs @@ -67,6 +67,20 @@ impl ServerBattleMode { } } +impl From for veloren_query_server::proto::ServerBattleMode { + fn from(value: ServerBattleMode) -> Self { + use veloren_query_server::proto::ServerBattleMode as QueryBattleMode; + + match value { + ServerBattleMode::Global(mode) => match mode { + BattleMode::PvP => QueryBattleMode::GlobalPvP, + BattleMode::PvE => QueryBattleMode::GlobalPvE, + }, + ServerBattleMode::PerPlayer { .. 
} => QueryBattleMode::PerPlayer, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Protocol { Quic { @@ -163,6 +177,7 @@ impl CalendarMode { pub struct Settings { pub gameserver_protocols: Vec, pub auth_server_address: Option, + pub query_address: Option, pub max_players: u16, pub world_seed: u32, pub server_name: String, @@ -204,6 +219,7 @@ impl Default for Settings { }, ], auth_server_address: Some("https://auth.veloren.net".into()), + query_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 14006))), world_seed: DEFAULT_WORLD_SEED, server_name: "Veloren Server".into(), max_players: 100, diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index f53c62fa71..cf9276fae3 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -11,6 +11,7 @@ pub mod object; pub mod persistence; pub mod pets; pub mod sentinel; +pub mod server_info; pub mod subscription; pub mod teleporter; pub mod terrain; diff --git a/server/src/sys/server_info.rs b/server/src/sys/server_info.rs new file mode 100644 index 0000000000..fd01ec8a70 --- /dev/null +++ b/server/src/sys/server_info.rs @@ -0,0 +1,47 @@ +use common_ecs::{Origin, Phase, System}; +use lazy_static::lazy_static; +use specs::{Read, ReadStorage}; +use veloren_query_server::proto::ServerInfo; + +use crate::{client::Client, Settings, Tick}; + +// Update the server stats every 60 ticks +const INFO_SEND_INTERVAL: u64 = 60; + +lazy_static! 
{ + pub static ref GIT_HASH: [char; 10] = common::util::GIT_HASH[..10] + .chars() + .collect::>() + .try_into() + .unwrap_or_default(); +} + +#[derive(Default)] +pub struct Sys; + +impl<'a> System<'a> for Sys { + type SystemData = ( + Read<'a, Tick>, + Read<'a, Settings>, + Read<'a, Option>>, + ReadStorage<'a, Client>, + ); + + const NAME: &'static str = "server_info"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run(_job: &mut common_ecs::Job, (tick, settings, sender, clients): Self::SystemData) { + if let Some(sender) = sender.as_ref() + && tick.0 % INFO_SEND_INTERVAL == 0 + { + let count = clients.count().try_into().unwrap_or(u16::MAX); + _ = sender.send(ServerInfo { + git_hash: *GIT_HASH, + players_count: count, + player_cap: settings.max_players, + battlemode: settings.gameplay.battle_mode.into(), + }); + } + } +} From 800480b082fddf0ce53f9be98dd5bd3bcdeeb392 Mon Sep 17 00:00:00 2001 From: crabman Date: Thu, 25 Apr 2024 12:47:51 +0000 Subject: [PATCH 03/12] export metrics --- common/query_server/Cargo.toml | 4 ++ common/query_server/examples/demo.rs | 2 +- common/query_server/src/server.rs | 56 ++++++++++----- server/src/lib.rs | 2 + server/src/metrics.rs | 102 ++++++++++++++++++++++++++- server/src/sys/metrics.rs | 13 +++- 6 files changed, 159 insertions(+), 20 deletions(-) diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index 097a4a7b33..6db272cdcf 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -17,3 +17,7 @@ tokio = { workspace = true, optional = true, features = ["net", "sync"] } protocol = { version = "3.4.0", default-features = false, features = ["derive"] } tracing-subscriber = { version = "0.3.7", optional = true } tracing = { workspace = true } + +[[example]] +name = "demo" +required-features = ["example"] diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index 6caeb0a6b0..1bb5d09dd7 100644 --- 
a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -5,7 +5,7 @@ use std::{ }; use tokio::sync::{watch, RwLock}; -use veloren_server_query::{ +use veloren_query_server::{ client::QueryClient, proto::{ServerBattleMode, ServerInfo}, server::{Metrics, QueryServer}, diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 9901eb9da6..09dae7333a 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -1,4 +1,5 @@ use std::{ + future::Future, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, @@ -72,7 +73,11 @@ impl QueryServer { }; if let Err(error) = self - .process_datagram(msg_buf, remote_addr, &mut new_metrics) + .process_datagram( + msg_buf, + remote_addr, + (&mut new_metrics, Arc::clone(&metrics)), + ) .await { debug!(?error, "Error while processing datagram"); @@ -104,35 +109,50 @@ impl QueryServer { &mut self, datagram: &[u8], remote: SocketAddr, - metrics: &mut Metrics, + (new_metrics, metrics): (&mut Metrics, Arc>), ) -> Result<(), tokio::io::Error> { let Ok(packet): Result = self.pipeline.receive_from(&mut io::Cursor::new(datagram)) else { - metrics.invalid_packets += 1; + new_metrics.invalid_packets += 1; return Ok(()); }; trace!(?packet, "Received packet"); + async fn timed<'a, F: Future + 'a, O>( + fut: F, + metrics: &'a Arc>, + ) -> Option { + if let Ok(res) = timeout(RESPONSE_SEND_TIMEOUT, fut).await { + Some(res) + } else { + metrics.write().await.timed_out_responses += 1; + None + } + } match packet { QueryServerRequest::Ping(Ping) => { - metrics.ping_requests += 1; + new_metrics.ping_requests += 1; tokio::task::spawn(async move { - _ = timeout( - RESPONSE_SEND_TIMEOUT, - Self::send_response(QueryServerResponse::Pong(Pong), remote), + timed( + Self::send_response(QueryServerResponse::Pong(Pong), remote, &metrics), + &metrics, ) .await; }); }, QueryServerRequest::ServerInfo(_) => { - metrics.info_requests += 1; + new_metrics.info_requests += 1; let 
server_info = *self.server_info.borrow(); tokio::task::spawn(async move { - _ = timeout( - RESPONSE_SEND_TIMEOUT, - Self::send_response(QueryServerResponse::ServerInfo(server_info), remote), + timed( + Self::send_response( + QueryServerResponse::ServerInfo(server_info), + remote, + &metrics, + ), + &metrics, ) .await; }); @@ -142,7 +162,11 @@ impl QueryServer { Ok(()) } - async fn send_response(response: QueryServerResponse, addr: SocketAddr) { + async fn send_response( + response: QueryServerResponse, + addr: SocketAddr, + metrics: &Arc<RwLock<Metrics>>, + ) { let Ok(socket) = UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await else { @@ -157,10 +181,10 @@ impl QueryServer { _ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response); match socket.send_to(&buf, addr).await { Ok(_) => { - // TODO: Sent responses metric + metrics.write().await.sent_responses += 1; }, Err(err) => { - // TODO: Failed response metric + metrics.write().await.failed_responses += 1; debug!(?err, "Failed to send query server response"); }, } @@ -195,6 +219,6 @@ impl std::ops::AddAssign for Metrics { } impl Metrics { - /// Resets all metrics to 0 - pub fn reset(&mut self) { *self = Self::default(); } + /// Resets all metrics to 0 and returns previous ones + pub fn reset(&mut self) -> Self { std::mem::take(self) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 58a4959883..7c35384b7c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -275,6 +275,7 @@ impl Server { let tick_metrics = TickMetrics::new(&registry).unwrap(); let physics_metrics = PhysicsMetrics::new(&registry).unwrap(); let server_event_metrics = metrics::ServerEventMetrics::new(&registry).unwrap(); + let query_server_metrics = metrics::QueryServerMetrics::new(&registry).unwrap(); let battlemode_buffer = BattleModeBuffer::default(); @@ -377,6 +378,7 @@ impl Server { state.ecs_mut().insert(tick_metrics); state.ecs_mut().insert(physics_metrics); state.ecs_mut().insert(server_event_metrics); + 
state.ecs_mut().insert(query_server_metrics); if settings.experimental_terrain_persistence { #[cfg(feature = "persistent_world")] { diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 1d146ee431..7be921c1c7 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -70,6 +70,18 @@ pub struct ServerEventMetrics { pub event_count: IntCounterVec, } +pub struct QueryServerMetrics { + pub received_packets: IntGauge, + pub dropped_packets: IntGauge, + pub invalid_packets: IntGauge, + pub proccessing_errors: IntGauge, + pub ping_requests: IntGauge, + pub info_requests: IntGauge, + pub sent_responses: IntGauge, + pub failed_responses: IntGauge, + pub timed_out_responses: IntGauge, +} + impl PhysicsMetrics { pub fn new(registry: &Registry) -> Result { let entity_entity_collision_checks_count = IntCounter::with_opts(Opts::new( @@ -415,7 +427,7 @@ impl TickMetrics { } impl ServerEventMetrics { - pub fn new(registry: &Registry) -> Result> { + pub fn new(registry: &Registry) -> Result { let event_count = IntCounterVec::new( Opts::new("event_count", "number of ServerEvents handled"), &["event"], @@ -425,3 +437,91 @@ impl ServerEventMetrics { Ok(Self { event_count }) } } + +impl QueryServerMetrics { + pub fn new(registry: &Registry) -> Result { + let received_packets = IntGauge::with_opts(Opts::new( + "query_server::received_packets", + "Total amount of received packets by the query server", + ))?; + let dropped_packets = IntGauge::with_opts(Opts::new( + "query_server::dropped_packets", + "Amount of dropped packets received by the query server (too short or invalid header)", + ))?; + let invalid_packets = IntGauge::with_opts(Opts::new( + "query_server::invalid_packets", + "Amount of unparseable packets received by the query server", + ))?; + let proccessing_errors = IntGauge::with_opts(Opts::new( + "query_server::proccessing_errors", + "Amount of errors that occured while processing a query server request", + ))?; + let ping_requests = 
IntGauge::with_opts(Opts::new( + "query_server::ping_requests", + "Amount of ping requests received by the query server", + ))?; + let info_requests = IntGauge::with_opts(Opts::new( + "query_server::info_requests", + "Amount of server info requests received by the query server", + ))?; + let sent_responses = IntGauge::with_opts(Opts::new( + "query_server::sent_responses", + "Amount of responses sent by the query server", + ))?; + let failed_responses = IntGauge::with_opts(Opts::new( + "query_server::failed_responses", + "Amount of responses which failed to be sent by the query server", + ))?; + let timed_out_responses = IntGauge::with_opts(Opts::new( + "query_server::timed_out_responses", + "Amount of responses which timed out", + ))?; + + registry.register(Box::new(received_packets.clone()))?; + registry.register(Box::new(dropped_packets.clone()))?; + registry.register(Box::new(invalid_packets.clone()))?; + registry.register(Box::new(proccessing_errors.clone()))?; + registry.register(Box::new(ping_requests.clone()))?; + registry.register(Box::new(info_requests.clone()))?; + registry.register(Box::new(sent_responses.clone()))?; + registry.register(Box::new(failed_responses.clone()))?; + registry.register(Box::new(timed_out_responses.clone()))?; + + Ok(Self { + received_packets, + dropped_packets, + invalid_packets, + proccessing_errors, + ping_requests, + info_requests, + sent_responses, + failed_responses, + timed_out_responses, + }) + } + + pub fn apply( + &self, + veloren_query_server::server::Metrics { + received_packets, + dropped_packets, + invalid_packets, + proccessing_errors, + ping_requests, + info_requests, + sent_responses, + failed_responses, + timed_out_responses, + }: veloren_query_server::server::Metrics, + ) { + self.received_packets.set(received_packets as i64); + self.dropped_packets.set(dropped_packets as i64); + self.invalid_packets.set(invalid_packets as i64); + self.proccessing_errors.set(proccessing_errors as i64); + 
self.ping_requests.set(ping_requests as i64); + self.info_requests.set(info_requests as i64); + self.sent_responses.set(sent_responses as i64); + self.failed_responses.set(failed_responses as i64); + self.timed_out_responses.set(timed_out_responses as i64); + } +} diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index d8633805cc..749f793c6a 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -1,12 +1,13 @@ use crate::{ chunk_generator::ChunkGenerator, - metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics}, + metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, QueryServerMetrics, TickMetrics}, HwStats, Tick, TickStart, }; use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, SysMetrics, System}; -use specs::{Entities, Join, Read, ReadExpect}; +use specs::{Entities, Join, Read, ReadExpect, Write}; use std::time::Instant; +use veloren_query_server::server::Metrics as RawQueryServerMetrics; /// This system exports metrics #[derive(Default)] @@ -27,6 +28,8 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, JobMetrics>, + Write<'a, Option>, + ReadExpect<'a, QueryServerMetrics>, ); const NAME: &'static str = "metrics"; @@ -50,6 +53,8 @@ impl<'a> System<'a> for Sys { export_tick, export_physics, export_jobs, + mut raw_query_server, + export_query_server, ): Self::SystemData, ) { const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64; @@ -165,5 +170,9 @@ impl<'a> System<'a> for Sys { .system_length_hist .with_label_values(&["metrics"]) .observe(len as f64 / NANOSEC_PER_SEC); + + if let Some(query_server_metrics) = raw_query_server.as_mut() { + export_query_server.apply(query_server_metrics.reset()); + } } } From 419ee88cc2b436bf9dc882722e57bff7a3a96f83 Mon Sep 17 00:00:00 2001 From: crabman Date: Fri, 26 Apr 2024 08:06:40 +0000 Subject: [PATCH 04/12] add extra padding 
for future version information and remove pings --- common/query_server/examples/demo.rs | 9 +++-- common/query_server/src/client.rs | 43 ++++++++++++------------ common/query_server/src/lib.rs | 12 ------- common/query_server/src/proto.rs | 12 ++----- common/query_server/src/server.rs | 50 +++++++++++----------------- server/src/metrics.rs | 9 ----- server/src/sys/metrics.rs | 17 ++++++---- server/src/sys/server_info.rs | 4 +-- 8 files changed, 60 insertions(+), 96 deletions(-) diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index 1bb5d09dd7..a2a11b2f11 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -12,7 +12,7 @@ use veloren_query_server::{ }; const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { - git_hash: ['\0'; 10], + git_hash: ['\0'; 8], players_count: 100, player_cap: 300, battlemode: ServerBattleMode::GlobalPvE, @@ -30,19 +30,18 @@ async fn main() { tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); let client = QueryClient { addr }; - let ping = client.ping().await.unwrap(); - let info = client.server_info().await.unwrap(); + let (info, ping) = client.server_info().await.unwrap(); println!("Ping = {}ms", ping.as_millis()); println!("Server info: {info:?}"); - println!("Metrics = {:#?}", metrics.read().await); assert_eq!(info, DEFAULT_SERVER_INFO); let start = Instant::now(); for _i in 0..10000 { - client.ping().await.unwrap(); + client.server_info().await.unwrap(); } + println!("Metrics = {:#?}", metrics.read().await); dbg!(start.elapsed()); } diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs index 17d3d993f7..da4a159c07 100644 --- a/common/query_server/src/client.rs +++ b/common/query_server/src/client.rs @@ -4,9 +4,13 @@ use std::{ time::{Duration, Instant}, }; +use protocol::Parcel; use tokio::{net::UdpSocket, time::timeout}; -use crate::proto::{Ping, QueryServerRequest, QueryServerResponse, ServerInfo, 
VELOREN_HEADER}; +use crate::proto::{ + QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, + VELOREN_HEADER, +}; #[derive(Debug)] pub enum QueryClientError { @@ -21,50 +25,45 @@ pub struct QueryClient { } impl QueryClient { - pub async fn server_info(&self) -> Result { + pub async fn server_info(&self) -> Result<(ServerInfo, Duration), QueryClientError> { self.send_query(QueryServerRequest::ServerInfo(Default::default())) .await - .and_then(|(response, _)| { + .and_then(|(response, duration)| { + #[allow(irrefutable_let_patterns)] // TODO: remove when more variants are added if let QueryServerResponse::ServerInfo(info) = response { - Ok(info) + Ok((info, duration)) } else { Err(QueryClientError::InvalidResponse) } }) } - pub async fn ping(&self) -> Result { - self.send_query(QueryServerRequest::Ping(Ping)) - .await - .map(|(_, elapsed)| elapsed) - } - async fn send_query( &self, request: QueryServerRequest, ) -> Result<(QueryServerResponse, Duration), QueryClientError> { let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; - let mut pipeline = crate::create_pipeline(); - - let mut buf = VELOREN_HEADER.to_vec(); - - let mut cursor = io::Cursor::new(&mut buf); - cursor.set_position(VELOREN_HEADER.len() as u64); - pipeline.send_to(&mut cursor, &request)?; + let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); + buf.extend(VELOREN_HEADER); + // 2 extra bytes for version information, currently unused + buf.extend([0; 2]); + buf.extend(::raw_bytes( + &request, + &Default::default(), + )?); let query_sent = Instant::now(); - socket.send_to(buf.as_slice(), self.addr).await?; + socket.send_to(&buf, self.addr).await?; - let mut buf = vec![0; 1500]; + let mut buf = vec![0; MAX_RESPONSE_SIZE]; let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) .await .map_err(|_| QueryClientError::Timeout)? 
.map_err(|_| QueryClientError::Timeout)?; - let mut pipeline = crate::create_pipeline(); - - let packet: QueryServerResponse = pipeline.receive_from(&mut io::Cursor::new(&mut buf))?; + let packet = + ::read(&mut io::Cursor::new(buf), &Default::default())?; Ok((packet, query_sent.elapsed())) } diff --git a/common/query_server/src/lib.rs b/common/query_server/src/lib.rs index 06a64aa014..bca951ce34 100644 --- a/common/query_server/src/lib.rs +++ b/common/query_server/src/lib.rs @@ -1,15 +1,3 @@ -use protocol::{ - wire::{self, dgram}, - Parcel, -}; - #[cfg(feature = "client")] pub mod client; pub mod proto; #[cfg(feature = "server")] pub mod server; - -fn create_pipeline() -> dgram::Pipeline { - dgram::Pipeline::new( - wire::middleware::pipeline::default(), - protocol::Settings::default(), - ) -} diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs index e12850409f..79b3be6d56 100644 --- a/common/query_server/src/proto.rs +++ b/common/query_server/src/proto.rs @@ -1,19 +1,14 @@ use protocol::Protocol; pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; - -#[derive(Protocol, Debug, Clone, Copy)] -pub struct Ping; - -#[derive(Protocol, Debug, Clone, Copy)] -pub struct Pong; +pub const MAX_REQUEST_SIZE: usize = 300; +pub const MAX_RESPONSE_SIZE: usize = 256; #[derive(Protocol, Debug, Clone, Copy)] #[protocol(discriminant = "integer")] #[protocol(discriminator(u8))] #[allow(clippy::large_enum_variant)] pub enum QueryServerRequest { - Ping(Ping), ServerInfo(ServerInfoRequest), // New requests should be added at the end to prevent breakage } @@ -22,7 +17,6 @@ pub enum QueryServerRequest { #[protocol(discriminant = "integer")] #[protocol(discriminator(u8))] pub enum QueryServerResponse { - Pong(Pong), ServerInfo(ServerInfo), // New responses should be added at the end to prevent breakage } @@ -35,7 +29,7 @@ pub struct ServerInfoRequest { #[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] pub struct ServerInfo { - 
pub git_hash: [char; 10], + pub git_hash: [char; 8], pub players_count: u16, pub player_cap: u16, pub battlemode: ServerBattleMode, diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 09dae7333a..2993e9bf47 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -6,7 +6,7 @@ use std::{ time::Duration, }; -use protocol::wire::{self, dgram}; +use protocol::Parcel; use tokio::{ net::UdpSocket, sync::{watch, RwLock}, @@ -15,7 +15,7 @@ use tokio::{ use tracing::{debug, trace}; use crate::proto::{ - Ping, Pong, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER, + QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, }; const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); @@ -23,7 +23,7 @@ const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); pub struct QueryServer { pub addr: SocketAddr, server_info: watch::Receiver, - pipeline: dgram::Pipeline, + settings: protocol::Settings, } #[derive(Default, Clone, Copy, Debug)] @@ -32,7 +32,6 @@ pub struct Metrics { pub dropped_packets: u32, pub invalid_packets: u32, pub proccessing_errors: u32, - pub ping_requests: u32, pub info_requests: u32, pub sent_responses: u32, pub failed_responses: u32, @@ -44,15 +43,17 @@ impl QueryServer { Self { addr, server_info, - pipeline: crate::create_pipeline(), + settings: Default::default(), } } pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { let socket = UdpSocket::bind(self.addr).await?; - let mut buf = Box::new([0; 1024]); + let mut buf = Box::new([0; MAX_REQUEST_SIZE]); loop { + *buf = [0; MAX_REQUEST_SIZE]; + let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| { debug!("Error while receiving from query server socket: {err:?}") }) else { @@ -66,7 +67,8 @@ impl QueryServer { let raw_msg_buf = &buf[..len]; let msg_buf = if Self::validate_datagram(raw_msg_buf) { - 
&raw_msg_buf[VELOREN_HEADER.len()..] + // Require 2 extra bytes for version (currently unused) + &raw_msg_buf[(VELOREN_HEADER.len() + 2)..] } else { new_metrics.dropped_packets += 1; continue; @@ -83,18 +85,16 @@ impl QueryServer { debug!(?error, "Error while processing datagram"); } - *buf = [0; 1024]; - // Update metrics at the end of eath packet - let mut metrics = metrics.write().await; - *metrics += new_metrics; + *metrics.write().await += new_metrics; } } // Header must be discarded after this validation passes fn validate_datagram(data: &[u8]) -> bool { let len = data.len(); - if len < VELOREN_HEADER.len() + 1 { + // Require 2 extra bytes for version (currently unused) + if len < VELOREN_HEADER.len() + 3 { trace!(?len, "Datagram too short"); false } else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER { @@ -112,7 +112,7 @@ impl QueryServer { (new_metrics, metrics): (&mut Metrics, Arc>), ) -> Result<(), tokio::io::Error> { let Ok(packet): Result = - self.pipeline.receive_from(&mut io::Cursor::new(datagram)) + ::read(&mut io::Cursor::new(datagram), &self.settings) else { new_metrics.invalid_packets += 1; return Ok(()); @@ -132,16 +132,6 @@ impl QueryServer { } } match packet { - QueryServerRequest::Ping(Ping) => { - new_metrics.ping_requests += 1; - tokio::task::spawn(async move { - timed( - Self::send_response(QueryServerResponse::Pong(Pong), remote, &metrics), - &metrics, - ) - .await; - }); - }, QueryServerRequest::ServerInfo(_) => { new_metrics.info_requests += 1; let server_info = *self.server_info.borrow(); @@ -174,11 +164,13 @@ impl QueryServer { return; }; - let mut buf = Vec::new(); - - let mut pipeline = crate::create_pipeline(); - - _ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response); + let buf = if let Ok(data) = + ::raw_bytes(&response, &Default::default()) + { + data + } else { + Vec::new() + }; match socket.send_to(&buf, addr).await { Ok(_) => { metrics.write().await.sent_responses += 1; @@ -199,7 +191,6 @@ impl 
std::ops::AddAssign for Metrics { dropped_packets, invalid_packets, proccessing_errors, - ping_requests, info_requests, sent_responses, failed_responses, @@ -210,7 +201,6 @@ impl std::ops::AddAssign for Metrics { self.dropped_packets += dropped_packets; self.invalid_packets += invalid_packets; self.proccessing_errors += proccessing_errors; - self.ping_requests += ping_requests; self.info_requests += info_requests; self.sent_responses += sent_responses; self.failed_responses += failed_responses; diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 7be921c1c7..720c925e2d 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -75,7 +75,6 @@ pub struct QueryServerMetrics { pub dropped_packets: IntGauge, pub invalid_packets: IntGauge, pub proccessing_errors: IntGauge, - pub ping_requests: IntGauge, pub info_requests: IntGauge, pub sent_responses: IntGauge, pub failed_responses: IntGauge, @@ -456,10 +455,6 @@ impl QueryServerMetrics { "query_server::proccessing_errors", "Amount of errors that occured while processing a query server request", ))?; - let ping_requests = IntGauge::with_opts(Opts::new( - "query_server::ping_requests", - "Amount of ping requests received by the query server", - ))?; let info_requests = IntGauge::with_opts(Opts::new( "query_server::info_requests", "Amount of server info requests received by the query server", @@ -481,7 +476,6 @@ impl QueryServerMetrics { registry.register(Box::new(dropped_packets.clone()))?; registry.register(Box::new(invalid_packets.clone()))?; registry.register(Box::new(proccessing_errors.clone()))?; - registry.register(Box::new(ping_requests.clone()))?; registry.register(Box::new(info_requests.clone()))?; registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?; @@ -492,7 +486,6 @@ impl QueryServerMetrics { dropped_packets, invalid_packets, proccessing_errors, - ping_requests, info_requests, sent_responses, failed_responses, @@ -507,7 +500,6 @@ impl 
QueryServerMetrics { dropped_packets, invalid_packets, proccessing_errors, - ping_requests, info_requests, sent_responses, failed_responses, @@ -518,7 +510,6 @@ impl QueryServerMetrics { self.dropped_packets.set(dropped_packets as i64); self.invalid_packets.set(invalid_packets as i64); self.proccessing_errors.set(proccessing_errors as i64); - self.ping_requests.set(ping_requests as i64); self.info_requests.set(info_requests as i64); self.sent_responses.set(sent_responses as i64); self.failed_responses.set(failed_responses as i64); diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index 749f793c6a..7fb44ad041 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -5,8 +5,9 @@ use crate::{ }; use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, SysMetrics, System}; -use specs::{Entities, Join, Read, ReadExpect, Write}; -use std::time::Instant; +use specs::{Entities, Join, Read, ReadExpect}; +use std::{sync::Arc, time::Instant}; +use tokio::sync::RwLock; use veloren_query_server::server::Metrics as RawQueryServerMetrics; /// This system exports metrics @@ -28,8 +29,8 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, JobMetrics>, - Write<'a, Option>, - ReadExpect<'a, QueryServerMetrics>, + Option>>>, + Option>, ); const NAME: &'static str = "metrics"; @@ -53,7 +54,7 @@ impl<'a> System<'a> for Sys { export_tick, export_physics, export_jobs, - mut raw_query_server, + raw_query_server, export_query_server, ): Self::SystemData, ) { @@ -171,8 +172,10 @@ impl<'a> System<'a> for Sys { .with_label_values(&["metrics"]) .observe(len as f64 / NANOSEC_PER_SEC); - if let Some(query_server_metrics) = raw_query_server.as_mut() { - export_query_server.apply(query_server_metrics.reset()); + if let (Some(query_server_metrics), Some(export_query_server)) = + (raw_query_server, export_query_server) + { + 
export_query_server.apply(query_server_metrics.blocking_write().reset()); } } } diff --git a/server/src/sys/server_info.rs b/server/src/sys/server_info.rs index fd01ec8a70..43eedc9848 100644 --- a/server/src/sys/server_info.rs +++ b/server/src/sys/server_info.rs @@ -9,7 +9,7 @@ use crate::{client::Client, Settings, Tick}; const INFO_SEND_INTERVAL: u64 = 60; lazy_static! { - pub static ref GIT_HASH: [char; 10] = common::util::GIT_HASH[..10] + pub static ref GIT_HASH: [char; 8] = common::util::GIT_HASH[..8] .chars() .collect::>() .try_into() @@ -23,7 +23,7 @@ impl<'a> System<'a> for Sys { type SystemData = ( Read<'a, Tick>, Read<'a, Settings>, - Read<'a, Option>>, + Option>>, ReadStorage<'a, Client>, ); From 8efe53ab4559971fd0c5be492396486d0d7b816c Mon Sep 17 00:00:00 2001 From: crabman Date: Sun, 28 Apr 2024 12:58:23 +0000 Subject: [PATCH 05/12] protect against spoofed addressess --- Cargo.lock | 1 + common/query_server/Cargo.toml | 3 +- common/query_server/examples/demo.rs | 2 +- common/query_server/src/client.rs | 68 +++++++++++++++++++--------- common/query_server/src/proto.rs | 16 +++++++ common/query_server/src/server.rs | 65 ++++++++++++++++++++++---- 6 files changed, 122 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ba2c9fb92..46768b4797 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7108,6 +7108,7 @@ name = "veloren-query-server" version = "0.1.0" dependencies = [ "protocol", + "rand 0.8.5", "tokio", "tracing", "tracing-subscriber", diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index 6db272cdcf..b8de9b01e9 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -server = ["dep:tokio"] +server = ["dep:tokio", "dep:rand"] client = ["dep:tokio", "tokio/time"] example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] 
default = ["server", "client"] @@ -17,6 +17,7 @@ tokio = { workspace = true, optional = true, features = ["net", "sync"] } protocol = { version = "3.4.0", default-features = false, features = ["derive"] } tracing-subscriber = { version = "0.3.7", optional = true } tracing = { workspace = true } +rand = { workspace = true, optional = true } [[example]] name = "demo" diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index a2a11b2f11..36a0a98087 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -29,7 +29,7 @@ async fn main() { tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); - let client = QueryClient { addr }; + let mut client = QueryClient::new(addr); let (info, ping) = client.server_info().await.unwrap(); println!("Ping = {}ms", ping.as_millis()); diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs index da4a159c07..1391f32eb6 100644 --- a/common/query_server/src/client.rs +++ b/common/query_server/src/client.rs @@ -6,26 +6,33 @@ use std::{ use protocol::Parcel; use tokio::{net::UdpSocket, time::timeout}; +use tracing::trace; use crate::proto::{ - QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, - VELOREN_HEADER, + QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, + ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, }; +const MAX_REQUEST_RETRIES: usize = 5; + #[derive(Debug)] pub enum QueryClientError { Io(tokio::io::Error), Protocol(protocol::Error), InvalidResponse, Timeout, + ChallengeFailed, } pub struct QueryClient { pub addr: SocketAddr, + p: u64, } impl QueryClient { - pub async fn server_info(&self) -> Result<(ServerInfo, Duration), QueryClientError> { + pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } } + + pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> { 
self.send_query(QueryServerRequest::ServerInfo(Default::default())) .await .and_then(|(response, duration)| { @@ -39,33 +46,50 @@ impl QueryClient { } async fn send_query( - &self, + &mut self, request: QueryServerRequest, ) -> Result<(QueryServerResponse, Duration), QueryClientError> { let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; - let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); - buf.extend(VELOREN_HEADER); - // 2 extra bytes for version information, currently unused - buf.extend([0; 2]); - buf.extend(::raw_bytes( - &request, - &Default::default(), - )?); + let mut tries = 0; + while tries < MAX_REQUEST_RETRIES { + tries += 1; + let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); - let query_sent = Instant::now(); - socket.send_to(&buf, self.addr).await?; + buf.extend(VELOREN_HEADER); + // 2 extra bytes for version information, currently unused + buf.extend([0; 2]); + buf.extend(::raw_bytes( + &RawQueryServerRequest { p: self.p, request }, + &Default::default(), + )?); - let mut buf = vec![0; MAX_RESPONSE_SIZE]; - let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) - .await - .map_err(|_| QueryClientError::Timeout)? - .map_err(|_| QueryClientError::Timeout)?; + let query_sent = Instant::now(); + socket.send_to(&buf, self.addr).await?; - let packet = - ::read(&mut io::Cursor::new(buf), &Default::default())?; + let mut buf = vec![0; MAX_RESPONSE_SIZE]; + let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) + .await + .map_err(|_| QueryClientError::Timeout)? 
+ .map_err(|_| QueryClientError::Timeout)?; - Ok((packet, query_sent.elapsed())) + let packet = ::read( + &mut io::Cursor::new(buf), + &Default::default(), + )?; + + match packet { + RawQueryServerResponse::Response(response) => { + return Ok((response, query_sent.elapsed())); + }, + RawQueryServerResponse::P(p) => { + trace!(?p, "Resetting p"); + self.p = p + }, + } + } + + Err(QueryClientError::ChallengeFailed) } } diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs index 79b3be6d56..83c8bff730 100644 --- a/common/query_server/src/proto.rs +++ b/common/query_server/src/proto.rs @@ -4,6 +4,14 @@ pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; pub const MAX_REQUEST_SIZE: usize = 300; pub const MAX_RESPONSE_SIZE: usize = 256; +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub struct RawQueryServerRequest { + pub p: u64, + pub request: QueryServerRequest, +} + #[derive(Protocol, Debug, Clone, Copy)] #[protocol(discriminant = "integer")] #[protocol(discriminator(u8))] @@ -13,6 +21,14 @@ pub enum QueryServerRequest { // New requests should be added at the end to prevent breakage } +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub enum RawQueryServerResponse { + Response(QueryServerResponse), + P(u64), +} + #[derive(Protocol, Debug, Clone, Copy)] #[protocol(discriminant = "integer")] #[protocol(discriminator(u8))] diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 2993e9bf47..396c57c866 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -1,12 +1,15 @@ +#[allow(deprecated)] use std::hash::SipHasher; use std::{ future::Future, + hash::{Hash, Hasher}, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use protocol::Parcel; +use rand::{thread_rng, 
Rng}; use tokio::{ net::UdpSocket, sync::{watch, RwLock}, @@ -15,10 +18,12 @@ use tokio::{ use tracing::{debug, trace}; use crate::proto::{ - QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, + QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, + ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, }; const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); +const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(60); pub struct QueryServer { pub addr: SocketAddr, @@ -50,6 +55,13 @@ impl QueryServer { pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { let socket = UdpSocket::bind(self.addr).await?; + let gen_secret = || { + let mut rng = thread_rng(); + (rng.gen::(), rng.gen::()) + }; + let mut secrets = gen_secret(); + let mut last_secret_refresh = Instant::now(); + let mut buf = Box::new([0; MAX_REQUEST_SIZE]); loop { *buf = [0; MAX_REQUEST_SIZE]; @@ -78,6 +90,7 @@ impl QueryServer { .process_datagram( msg_buf, remote_addr, + secrets, (&mut new_metrics, Arc::clone(&metrics)), ) .await @@ -87,6 +100,14 @@ impl QueryServer { // Update metrics at the end of eath packet *metrics.write().await += new_metrics; + + { + let now = Instant::now(); + if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL { + last_secret_refresh = now; + secrets = gen_secret(); + } + } } } @@ -109,16 +130,27 @@ impl QueryServer { &mut self, datagram: &[u8], remote: SocketAddr, + secrets: (u64, u64), (new_metrics, metrics): (&mut Metrics, Arc>), ) -> Result<(), tokio::io::Error> { - let Ok(packet): Result = - ::read(&mut io::Cursor::new(datagram), &self.settings) + let Ok(RawQueryServerRequest { + p: client_p, + request, + }) = + ::read(&mut io::Cursor::new(datagram), &self.settings) else { new_metrics.invalid_packets += 1; return Ok(()); }; - trace!(?packet, "Received packet"); + trace!(?request, "Received packet"); + + #[allow(deprecated)] + let real_p = { + let mut hasher = 
SipHasher::new_with_keys(secrets.0, secrets.1); + remote.ip().hash(&mut hasher); + hasher.finish() + }; async fn timed<'a, F: Future + 'a, O>( fut: F, @@ -131,14 +163,29 @@ impl QueryServer { None } } - match packet { + + if real_p != client_p { + tokio::task::spawn(async move { + timed( + Self::send_response(RawQueryServerResponse::P(real_p), remote, &metrics), + &metrics, + ) + .await; + }); + + return Ok(()); + } + + match request { QueryServerRequest::ServerInfo(_) => { new_metrics.info_requests += 1; let server_info = *self.server_info.borrow(); tokio::task::spawn(async move { timed( Self::send_response( - QueryServerResponse::ServerInfo(server_info), + RawQueryServerResponse::Response(QueryServerResponse::ServerInfo( + server_info, + )), remote, &metrics, ), @@ -153,7 +200,7 @@ impl QueryServer { } async fn send_response( - response: QueryServerResponse, + response: RawQueryServerResponse, addr: SocketAddr, metrics: &Arc>, ) { @@ -165,7 +212,7 @@ impl QueryServer { }; let buf = if let Ok(data) = - ::raw_bytes(&response, &Default::default()) + ::raw_bytes(&response, &Default::default()) { data } else { From 4c0cadabcf5b39b11a5f7cb081a3f97c1e3d0f87 Mon Sep 17 00:00:00 2001 From: crabman Date: Sun, 28 Apr 2024 13:49:37 +0000 Subject: [PATCH 06/12] query server ratelimiting --- common/query_server/examples/demo.rs | 7 ++- common/query_server/src/lib.rs | 1 + common/query_server/src/ratelimit.rs | 89 ++++++++++++++++++++++++++++ common/query_server/src/server.rs | 24 ++++++-- server/src/metrics.rs | 9 +++ 5 files changed, 124 insertions(+), 6 deletions(-) create mode 100644 common/query_server/src/ratelimit.rs diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index 36a0a98087..0263cba6dc 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -5,6 +5,7 @@ use std::{ }; use tokio::sync::{watch, RwLock}; +use tracing::error; use veloren_query_server::{ client::QueryClient, 
proto::{ServerBattleMode, ServerInfo}, @@ -38,8 +39,10 @@ async fn main() { let start = Instant::now(); - for _i in 0..10000 { - client.server_info().await.unwrap(); + for _i in 0..32 { + if let Err(error) = client.server_info().await { + error!(?error, "Server info request error"); + } } println!("Metrics = {:#?}", metrics.read().await); diff --git a/common/query_server/src/lib.rs b/common/query_server/src/lib.rs index bca951ce34..6df1b6ff37 100644 --- a/common/query_server/src/lib.rs +++ b/common/query_server/src/lib.rs @@ -1,3 +1,4 @@ #[cfg(feature = "client")] pub mod client; pub mod proto; +mod ratelimit; #[cfg(feature = "server")] pub mod server; diff --git a/common/query_server/src/ratelimit.rs b/common/query_server/src/ratelimit.rs new file mode 100644 index 0000000000..70f72136b7 --- /dev/null +++ b/common/query_server/src/ratelimit.rs @@ -0,0 +1,89 @@ +use std::{ + collections::HashMap, + net::IpAddr, + time::{Duration, Instant}, +}; + +const SHIFT_EVERY: Duration = Duration::from_secs(15); + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub enum ReducedIpAddr { + V4(u32), + V6(u64), +} + +/// Per-IP state, divided into 4 segments of [`SHIFT_EVERY`] each (one minute at +/// the time of writing). 
+pub struct IpState([u16; 4]); + +pub struct RateLimiter { + states: HashMap, + last_shift: Instant, + /// Maximum amount requests that can be done in `4 * SHIFT_EVERY` + limit: u16, +} + +impl RateLimiter { + pub fn new(limit: u16) -> Self { + Self { + states: Default::default(), + last_shift: Instant::now(), + limit, + } + } + + pub fn maintain(&mut self, now: Instant) { + if now.duration_since(self.last_shift) > SHIFT_EVERY { + for (_, state) in self.states.iter_mut() { + state.shift(); + } + + // Remove empty states + self.states.retain(|_, state| !state.is_empty()); + } + } + + pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool { + if let Some(state) = self.states.get_mut(&ip) { + if state.total() >= self.limit { + state.0[0] = state.0[0].saturating_add(1); + false + } else { + state.0[1] += 1; + true + } + } else { + self.states.insert(ip, IpState::default()); + true + } + } +} + +impl IpState { + fn shift(&mut self) { + self.0.rotate_right(1); + self.0[0] = 0; + } + + fn is_empty(&self) -> bool { self.0.iter().all(|&freq| freq == 0) } + + fn total(&self) -> u16 { self.0.iter().fold(0, |total, &v| total.saturating_add(v)) } +} + +impl Default for IpState { + fn default() -> Self { Self([1, 0, 0, 0]) } +} + +impl From for ReducedIpAddr { + fn from(value: IpAddr) -> Self { + match value { + IpAddr::V4(v4) => Self::V4(u32::from_be_bytes(v4.octets())), + IpAddr::V6(v6) => { + let bytes = v6.octets(); + Self::V6(u64::from_be_bytes([ + bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], + ])) + }, + } + } +} diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 396c57c866..52088836d9 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -17,9 +17,12 @@ use tokio::{ }; use tracing::{debug, trace}; -use crate::proto::{ - QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, +use 
crate::{ + proto::{ + QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, + ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, + }, + ratelimit::{RateLimiter, ReducedIpAddr}, }; const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); @@ -29,6 +32,7 @@ pub struct QueryServer { pub addr: SocketAddr, server_info: watch::Receiver, settings: protocol::Settings, + ratelimit: RateLimiter, } #[derive(Default, Clone, Copy, Debug)] @@ -41,6 +45,7 @@ pub struct Metrics { pub sent_responses: u32, pub failed_responses: u32, pub timed_out_responses: u32, + pub ratelimited: u32, } impl QueryServer { @@ -48,6 +53,7 @@ impl QueryServer { Self { addr, server_info, + ratelimit: RateLimiter::new(30), settings: Default::default(), } } @@ -107,6 +113,8 @@ impl QueryServer { last_secret_refresh = now; secrets = gen_secret(); } + + self.ratelimit.maintain(now); } } } @@ -148,7 +156,7 @@ impl QueryServer { #[allow(deprecated)] let real_p = { let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1); - remote.ip().hash(&mut hasher); + ReducedIpAddr::from(remote.ip()).hash(&mut hasher); hasher.finish() }; @@ -176,6 +184,12 @@ impl QueryServer { return Ok(()); } + if !self.ratelimit.can_request(remote.ip().into()) { + trace!("Ratelimited request"); + new_metrics.ratelimited += 1; + return Ok(()); + } + match request { QueryServerRequest::ServerInfo(_) => { new_metrics.info_requests += 1; @@ -242,6 +256,7 @@ impl std::ops::AddAssign for Metrics { sent_responses, failed_responses, timed_out_responses, + ratelimited, }: Self, ) { self.received_packets += received_packets; @@ -252,6 +267,7 @@ impl std::ops::AddAssign for Metrics { self.sent_responses += sent_responses; self.failed_responses += failed_responses; self.timed_out_responses += timed_out_responses; + self.ratelimited += ratelimited; } } diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 720c925e2d..2e4cc478db 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ 
-79,6 +79,7 @@ pub struct QueryServerMetrics { pub sent_responses: IntGauge, pub failed_responses: IntGauge, pub timed_out_responses: IntGauge, + pub ratelimited: IntGauge, } impl PhysicsMetrics { @@ -471,6 +472,10 @@ impl QueryServerMetrics { "query_server::timed_out_responses", "Amount of responses which timed out", ))?; + let ratelimited = IntGauge::with_opts(Opts::new( + "query_server::ratelimited", + "Ratelimited requests to the query server", + ))?; registry.register(Box::new(received_packets.clone()))?; registry.register(Box::new(dropped_packets.clone()))?; @@ -480,6 +485,7 @@ impl QueryServerMetrics { registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?; registry.register(Box::new(timed_out_responses.clone()))?; + registry.register(Box::new(ratelimited.clone()))?; Ok(Self { received_packets, @@ -490,6 +496,7 @@ impl QueryServerMetrics { sent_responses, failed_responses, timed_out_responses, + ratelimited, }) } @@ -504,6 +511,7 @@ impl QueryServerMetrics { sent_responses, failed_responses, timed_out_responses, + ratelimited, }: veloren_query_server::server::Metrics, ) { self.received_packets.set(received_packets as i64); @@ -514,5 +522,6 @@ impl QueryServerMetrics { self.sent_responses.set(sent_responses as i64); self.failed_responses.set(failed_responses as i64); self.timed_out_responses.set(timed_out_responses as i64); + self.ratelimited.set(ratelimited as i64); } } From 4455c777ce04c184e006e054e993578b753ba7c7 Mon Sep 17 00:00:00 2001 From: crabman Date: Mon, 29 Apr 2024 07:20:07 +0000 Subject: [PATCH 07/12] address review comments --- common/frontend/src/lib.rs | 1 + common/query_server/Cargo.toml | 6 +- common/query_server/examples/demo.rs | 17 +- common/query_server/src/client.rs | 70 ++++++-- common/query_server/src/proto.rs | 45 +++--- common/query_server/src/ratelimit.rs | 20 +-- common/query_server/src/server.rs | 231 +++++++++++++++------------ common/src/util/mod.rs | 5 + server/src/lib.rs | 
17 +- server/src/metrics.rs | 63 ++++---- server/src/sys/metrics.rs | 18 ++- server/src/sys/server_info.rs | 24 +-- 12 files changed, 301 insertions(+), 216 deletions(-) diff --git a/common/frontend/src/lib.rs b/common/frontend/src/lib.rs index 97120a75d4..f2471fcb51 100644 --- a/common/frontend/src/lib.rs +++ b/common/frontend/src/lib.rs @@ -75,6 +75,7 @@ where "refinery_core::traits::divergent=off", "veloren_server::persistence::character=info", "veloren_server::settings=info", + "veloren_query_server=info", ]; for s in default_directives { diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index b8de9b01e9..69aa701518 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -7,13 +7,13 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -server = ["dep:tokio", "dep:rand"] -client = ["dep:tokio", "tokio/time"] +server = ["dep:rand"] +client = ["tokio/time"] example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] default = ["server", "client"] [dependencies] -tokio = { workspace = true, optional = true, features = ["net", "sync"] } +tokio = { workspace = true, features = ["net", "sync"] } protocol = { version = "3.4.0", default-features = false, features = ["derive"] } tracing-subscriber = { version = "0.3.7", optional = true } tracing = { workspace = true } diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index 0263cba6dc..a649ed683d 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -1,10 +1,10 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, + sync::{Arc, Mutex}, time::Instant, }; -use tokio::sync::{watch, RwLock}; +use tokio::sync::watch; use tracing::error; use veloren_query_server::{ client::QueryClient, @@ -13,7 +13,8 @@ use veloren_query_server::{ }; const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { - 
git_hash: ['\0'; 8], + git_hash: 0, + git_version: 0, players_count: 100, player_cap: 300, battlemode: ServerBattleMode::GlobalPvE, @@ -24,8 +25,8 @@ async fn main() { tracing_subscriber::fmt::init(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006); let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO); - let mut server = QueryServer::new(addr, receiver); - let metrics = Arc::new(RwLock::new(Metrics::default())); + let mut server = QueryServer::new(addr, receiver, 10002); + let metrics = Arc::new(Mutex::new(Metrics::default())); let metrics2 = Arc::clone(&metrics); tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); @@ -39,12 +40,12 @@ async fn main() { let start = Instant::now(); - for _i in 0..32 { - if let Err(error) = client.server_info().await { + for _i in 0..10000 { + if let Err(error) = client.ping().await { error!(?error, "Server info request error"); } } - println!("Metrics = {:#?}", metrics.read().await); + println!("Metrics = {:#?}", metrics.lock().unwrap()); dbg!(start.elapsed()); } diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs index 1391f32eb6..c5ebf3306e 100644 --- a/common/query_server/src/client.rs +++ b/common/query_server/src/client.rs @@ -6,13 +6,14 @@ use std::{ use protocol::Parcel; use tokio::{net::UdpSocket, time::timeout}; -use tracing::trace; +use tracing::{trace, warn}; use crate::proto::{ QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, + ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, }; +// This must be at least 2 for the client to get a value for the `p` field. 
const MAX_REQUEST_RETRIES: usize = 5; #[derive(Debug)] @@ -20,10 +21,14 @@ pub enum QueryClientError { Io(tokio::io::Error), Protocol(protocol::Error), InvalidResponse, + InvalidVersion, Timeout, ChallengeFailed, + RequestTooLarge, } +/// The `p` field has to be requested from the server each time this client is +/// constructed, if possible reuse this! pub struct QueryClient { pub addr: SocketAddr, p: u64, @@ -33,10 +38,9 @@ impl QueryClient { pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } } pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> { - self.send_query(QueryServerRequest::ServerInfo(Default::default())) + self.send_query(QueryServerRequest::ServerInfo) .await .and_then(|(response, duration)| { - #[allow(irrefutable_let_patterns)] // TODO: remove when more variants are added if let QueryServerResponse::ServerInfo(info) = response { Ok((info, duration)) } else { @@ -45,36 +49,68 @@ impl QueryClient { }) } + pub async fn ping(&mut self) -> Result { + self.send_query(QueryServerRequest::Ping) + .await + .and_then(|(response, duration)| { + if let QueryServerResponse::Pong = response { + Ok(duration) + } else { + Err(QueryClientError::InvalidResponse) + } + }) + } + async fn send_query( &mut self, request: QueryServerRequest, ) -> Result<(QueryServerResponse, Duration), QueryClientError> { let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; - let mut tries = 0; - while tries < MAX_REQUEST_RETRIES { - tries += 1; + for _ in 0..MAX_REQUEST_RETRIES { let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); - buf.extend(VELOREN_HEADER); // 2 extra bytes for version information, currently unused - buf.extend([0; 2]); - buf.extend(::raw_bytes( - &RawQueryServerRequest { p: self.p, request }, - &Default::default(), - )?); + buf.extend(VERSION.to_le_bytes()); + buf.extend({ + let request_data = ::raw_bytes( + &RawQueryServerRequest { p: self.p, request }, + 
&Default::default(), + )?; + if request_data.len() > MAX_REQUEST_SIZE { + warn!( + ?request, + ?MAX_REQUEST_SIZE, + "Attempted to send request larger than the max size ({})", + request_data.len() + ); + Err(QueryClientError::RequestTooLarge)? + } + request_data + }); + buf.resize(2 + MAX_RESPONSE_SIZE, 0); + buf.extend(VELOREN_HEADER); let query_sent = Instant::now(); socket.send_to(&buf, self.addr).await?; let mut buf = vec![0; MAX_RESPONSE_SIZE]; - let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) + let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) .await - .map_err(|_| QueryClientError::Timeout)? - .map_err(|_| QueryClientError::Timeout)?; + .map_err(|_| QueryClientError::Timeout)??; + + if buf_len <= 2 { + Err(QueryClientError::InvalidResponse)? + } + + // FIXME: Allow lower versions once proper versioning is added. + if u16::from_le_bytes(buf[..2].try_into().unwrap()) != VERSION { + Err(QueryClientError::InvalidVersion)? + } let packet = ::read( - &mut io::Cursor::new(buf), + // TODO: Remove this padding once version information is added to packets + &mut io::Cursor::new(&buf[2..buf_len]), &Default::default(), )?; diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs index 83c8bff730..ea6d00b98e 100644 --- a/common/query_server/src/proto.rs +++ b/common/query_server/src/proto.rs @@ -1,13 +1,17 @@ use protocol::Protocol; -pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; -pub const MAX_REQUEST_SIZE: usize = 300; -pub const MAX_RESPONSE_SIZE: usize = 256; +pub(crate) const VERSION: u16 = 0; +pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; +// The actual maximum size of packets will be `MAX_REQUEST_SIZE + +// VELOREN_HEADER.len() + 2` (2 added for currently unused version). +// NOTE: The actual maximum size must never exceed 1200 or we risk getting near +// MTU limits for some networks. 
+pub(crate) const MAX_REQUEST_SIZE: usize = 300; +pub(crate) const MAX_RESPONSE_SIZE: usize = 256; #[derive(Protocol, Debug, Clone, Copy)] -#[protocol(discriminant = "integer")] -#[protocol(discriminator(u8))] -pub struct RawQueryServerRequest { +pub(crate) struct RawQueryServerRequest { + /// See comment on [`RawQueryServerResponse::P`] pub p: u64, pub request: QueryServerRequest, } @@ -17,15 +21,22 @@ pub struct RawQueryServerRequest { #[protocol(discriminator(u8))] #[allow(clippy::large_enum_variant)] pub enum QueryServerRequest { - ServerInfo(ServerInfoRequest), + ServerInfo, + Ping, // New requests should be added at the end to prevent breakage } #[derive(Protocol, Debug, Clone, Copy)] #[protocol(discriminant = "integer")] #[protocol(discriminator(u8))] -pub enum RawQueryServerResponse { +pub(crate) enum RawQueryServerResponse { Response(QueryServerResponse), + /// This is used as a challenge to prevent IP address spoofing by verifying + /// that the client can receive from the source address. + /// + /// Any request to the server must include this value to be processed, + /// otherwise this response will be returned (giving clients a value to pass + /// for later requests). 
P(u64), } @@ -34,18 +45,14 @@ pub enum RawQueryServerResponse { #[protocol(discriminator(u8))] pub enum QueryServerResponse { ServerInfo(ServerInfo), + Pong, // New responses should be added at the end to prevent breakage } -#[derive(Protocol, Debug, Clone, Copy)] -pub struct ServerInfoRequest { - // Padding to prevent amplification attacks - pub _padding: [u8; 256], -} - #[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] pub struct ServerInfo { - pub git_hash: [char; 8], + pub git_hash: u32, + pub git_version: i64, pub players_count: u16, pub player_cap: u16, pub battlemode: ServerBattleMode, @@ -60,11 +67,3 @@ pub enum ServerBattleMode { GlobalPvE, PerPlayer, } - -impl Default for ServerInfoRequest { - fn default() -> Self { ServerInfoRequest { _padding: [0; 256] } } -} - -impl ServerInfo { - pub fn git_hash(&self) -> String { String::from_iter(&self.git_hash) } -} diff --git a/common/query_server/src/ratelimit.rs b/common/query_server/src/ratelimit.rs index 70f72136b7..01b2a76f62 100644 --- a/common/query_server/src/ratelimit.rs +++ b/common/query_server/src/ratelimit.rs @@ -34,24 +34,20 @@ impl RateLimiter { pub fn maintain(&mut self, now: Instant) { if now.duration_since(self.last_shift) > SHIFT_EVERY { - for (_, state) in self.states.iter_mut() { - state.shift(); - } - // Remove empty states - self.states.retain(|_, state| !state.is_empty()); + self.states.retain(|_, state| { + state.shift(); + !state.is_empty() + }); + self.last_shift = now; } } pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool { if let Some(state) = self.states.get_mut(&ip) { - if state.total() >= self.limit { - state.0[0] = state.0[0].saturating_add(1); - false - } else { - state.0[1] += 1; - true - } + state.0[0] = state.0[0].saturating_add(1); + + state.total() < self.limit } else { self.states.insert(ip, IpState::default()); true diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 52088836d9..6f6f13b9bb 100644 --- 
a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -1,35 +1,29 @@ #[allow(deprecated)] use std::hash::SipHasher; use std::{ - future::Future, hash::{Hash, Hasher}, - io, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - sync::Arc, + io::{self, ErrorKind}, + net::SocketAddr, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; use protocol::Parcel; use rand::{thread_rng, Rng}; -use tokio::{ - net::UdpSocket, - sync::{watch, RwLock}, - time::timeout, -}; -use tracing::{debug, trace}; +use tokio::{net::UdpSocket, sync::watch}; +use tracing::{debug, error, trace}; use crate::{ proto::{ QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, + ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, }, ratelimit::{RateLimiter, ReducedIpAddr}, }; -const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); -const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(60); +const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300); pub struct QueryServer { - pub addr: SocketAddr, + addr: SocketAddr, server_info: watch::Receiver, settings: protocol::Settings, ratelimit: RateLimiter, @@ -42,6 +36,7 @@ pub struct Metrics { pub invalid_packets: u32, pub proccessing_errors: u32, pub info_requests: u32, + pub ping_requests: u32, pub sent_responses: u32, pub failed_responses: u32, pub timed_out_responses: u32, @@ -49,17 +44,23 @@ pub struct Metrics { } impl QueryServer { - pub fn new(addr: SocketAddr, server_info: watch::Receiver) -> Self { + pub fn new(addr: SocketAddr, server_info: watch::Receiver, ratelimit: u16) -> Self { Self { addr, server_info, - ratelimit: RateLimiter::new(30), + ratelimit: RateLimiter::new(ratelimit), settings: Default::default(), } } - pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { - let socket = UdpSocket::bind(self.addr).await?; + /// This produces TRACE level logs for any packet received 
on the assigned + /// port. To prevent potentially unfettered log spam, disable the TRACE + /// level for this crate (when outside of debugging contexts). + /// + /// NOTE: TRACE and DEBUG levels are disabled by default for this crate when + /// using `veloren-common-frontend`. + pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { + let mut socket = UdpSocket::bind(self.addr).await?; let gen_secret = || { let mut rng = thread_rng(); @@ -70,12 +71,20 @@ impl QueryServer { let mut buf = Box::new([0; MAX_REQUEST_SIZE]); loop { - *buf = [0; MAX_REQUEST_SIZE]; - - let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| { - debug!("Error while receiving from query server socket: {err:?}") - }) else { - continue; + let (len, remote_addr) = match socket.recv_from(&mut *buf).await { + Ok(v) => v, + Err(e) if e.kind() == ErrorKind::NotConnected => { + error!( + ?e, + "Query server connection was closed, re-binding to socket..." + ); + socket = UdpSocket::bind(self.addr).await?; + continue; + }, + err => { + debug!(?err, "Error while receiving from query server socket"); + continue; + }, }; let mut new_metrics = Metrics { @@ -86,26 +95,19 @@ impl QueryServer { let raw_msg_buf = &buf[..len]; let msg_buf = if Self::validate_datagram(raw_msg_buf) { // Require 2 extra bytes for version (currently unused) - &raw_msg_buf[(VELOREN_HEADER.len() + 2)..] 
+ &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())] } else { new_metrics.dropped_packets += 1; continue; }; - if let Err(error) = self - .process_datagram( - msg_buf, - remote_addr, - secrets, - (&mut new_metrics, Arc::clone(&metrics)), - ) - .await - { - debug!(?error, "Error while processing datagram"); - } + self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket) + .await; // Update metrics at the end of eath packet - *metrics.write().await += new_metrics; + if let Ok(mut metrics) = metrics.lock() { + *metrics += new_metrics; + } { let now = Instant::now(); @@ -123,12 +125,22 @@ impl QueryServer { fn validate_datagram(data: &[u8]) -> bool { let len = data.len(); // Require 2 extra bytes for version (currently unused) - if len < VELOREN_HEADER.len() + 3 { + if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) { trace!(?len, "Datagram too short"); false - } else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER { + } else if len > (MAX_REQUEST_SIZE + VELOREN_HEADER.len() + 2) { + trace!(?len, "Datagram too large"); + false + } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER { trace!(?len, "Datagram header invalid"); false + // FIXME: Allow lower versions once proper versioning is added. 
+ } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION { + trace!( + "Datagram has invalid version {:?}, current {VERSION:?}", + &data[..2] + ); + false } else { true } @@ -139,106 +151,117 @@ impl QueryServer { datagram: &[u8], remote: SocketAddr, secrets: (u64, u64), - (new_metrics, metrics): (&mut Metrics, Arc>), - ) -> Result<(), tokio::io::Error> { + metrics: &mut Metrics, + socket: &UdpSocket, + ) { let Ok(RawQueryServerRequest { p: client_p, request, }) = ::read(&mut io::Cursor::new(datagram), &self.settings) else { - new_metrics.invalid_packets += 1; - return Ok(()); + metrics.invalid_packets += 1; + return; }; trace!(?request, "Received packet"); #[allow(deprecated)] let real_p = { + // Use SipHash-2-4 to compute the `p` value from a server specific + // secret and the client's address. + // + // This is used to verify that packets are from an entity that can + // receive packets at the given address. + // + // Only use the first 64 bits from Ipv6 addresses since the latter + // 64 bits can change very frequently (as much as for every + // request). 
let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1); ReducedIpAddr::from(remote.ip()).hash(&mut hasher); hasher.finish() }; - async fn timed<'a, F: Future + 'a, O>( - fut: F, - metrics: &'a Arc>, - ) -> Option { - if let Ok(res) = timeout(RESPONSE_SEND_TIMEOUT, fut).await { - Some(res) - } else { - metrics.write().await.timed_out_responses += 1; - None - } - } - if real_p != client_p { - tokio::task::spawn(async move { - timed( - Self::send_response(RawQueryServerResponse::P(real_p), remote, &metrics), - &metrics, - ) - .await; - }); + Self::send_response(RawQueryServerResponse::P(real_p), remote, socket, metrics).await; - return Ok(()); + return; } if !self.ratelimit.can_request(remote.ip().into()) { trace!("Ratelimited request"); - new_metrics.ratelimited += 1; - return Ok(()); + metrics.ratelimited += 1; + return; } match request { - QueryServerRequest::ServerInfo(_) => { - new_metrics.info_requests += 1; + QueryServerRequest::ServerInfo => { + metrics.info_requests += 1; let server_info = *self.server_info.borrow(); - tokio::task::spawn(async move { - timed( - Self::send_response( - RawQueryServerResponse::Response(QueryServerResponse::ServerInfo( - server_info, - )), - remote, - &metrics, - ), - &metrics, - ) - .await; - }); + Self::send_response( + RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)), + remote, + socket, + metrics, + ) + .await; + }, + QueryServerRequest::Ping => { + metrics.ping_requests += 1; + Self::send_response( + RawQueryServerResponse::Response(QueryServerResponse::Pong), + remote, + socket, + metrics, + ) + .await; }, } - - Ok(()) } async fn send_response( response: RawQueryServerResponse, addr: SocketAddr, - metrics: &Arc>, + socket: &UdpSocket, + metrics: &mut Metrics, ) { - let Ok(socket) = - UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await - else { - debug!("Failed to create response socket"); - return; - }; + // TODO: Remove this extra padding once we add 
version information to requests + let mut buf = Vec::from(VERSION.to_ne_bytes()); - let buf = if let Ok(data) = - ::raw_bytes(&response, &Default::default()) - { - data - } else { - Vec::new() - }; - match socket.send_to(&buf, addr).await { - Ok(_) => { - metrics.write().await.sent_responses += 1; + match ::raw_bytes(&response, &Default::default()) { + Ok(data) => { + buf.extend(data); + + if buf.len() > MAX_RESPONSE_SIZE { + error!( + ?MAX_RESPONSE_SIZE, + "Attempted to send a response larger than the maximum allowed size (size: \ + {}, response: {response:?})", + buf.len() + ); + #[cfg(debug_assertions)] + panic!( + "Attempted to send a response larger than the maximum allowed size (size: \ + {}, max: {}, response: {response:?})", + buf.len(), + MAX_RESPONSE_SIZE + ); + } + + match socket.send_to(&buf, addr).await { + Ok(_) => { + metrics.sent_responses += 1; + }, + Err(err) => { + metrics.failed_responses += 1; + debug!(?err, "Failed to send query server response"); + }, + } }, - Err(err) => { - metrics.write().await.failed_responses += 1; - debug!(?err, "Failed to send query server response"); + Err(error) => { + trace!(?error, "Failed to serialize response"); + #[cfg(debug_assertions)] + panic!("Serializing response failed: {error:?} ({response:?})"); }, } } @@ -253,6 +276,7 @@ impl std::ops::AddAssign for Metrics { invalid_packets, proccessing_errors, info_requests, + ping_requests, sent_responses, failed_responses, timed_out_responses, @@ -264,6 +288,7 @@ impl std::ops::AddAssign for Metrics { self.invalid_packets += invalid_packets; self.proccessing_errors += proccessing_errors; self.info_requests += info_requests; + self.ping_requests += ping_requests; self.sent_responses += sent_responses; self.failed_responses += failed_responses; self.timed_out_responses += timed_out_responses; @@ -273,5 +298,7 @@ impl std::ops::AddAssign for Metrics { impl Metrics { /// Resets all metrics to 0 and returns previous ones + /// + /// Used by the consumer of the metrics. 
pub fn reset(&mut self) -> Self { std::mem::take(self) } } diff --git a/common/src/util/mod.rs b/common/src/util/mod.rs index 2fcdebf0ff..8bded12a1e 100644 --- a/common/src/util/mod.rs +++ b/common/src/util/mod.rs @@ -22,6 +22,10 @@ lazy_static::lazy_static! { static ref GIT_DATETIME: &'static str = GIT_VERSION.split('/').nth(1).expect("failed to retrieve git_datetime!"); pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::>().join("-"); pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!"); + pub static ref GIT_DATE_TIMESTAMP: i64 = + NaiveDateTime::parse_from_str(dbg!(&*GIT_DATETIME), "%Y-%m-%d-%H:%M") + .expect("Invalid date") + .and_utc().timestamp(); pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() { format!("{}-{}", VELOREN_VERSION_STAGE, *GIT_DATE) } else { @@ -34,6 +38,7 @@ lazy_static::lazy_static! { }; } +use chrono::NaiveDateTime; pub use color::*; pub use dir::*; pub use grid_hasher::GridHasher; diff --git a/server/src/lib.rs b/server/src/lib.rs index 7c35384b7c..86bdafb1d0 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -93,6 +93,7 @@ use common::{ shared_server_config::ServerConstants, slowjob::SlowJobPool, terrain::TerrainChunk, + util::GIT_DATE_TIMESTAMP, vol::RectRasterableVol, }; use common_base::prof_span; @@ -116,7 +117,7 @@ use specs::{ use std::{ i32, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; #[cfg(not(feature = "worldgen"))] @@ -603,20 +604,24 @@ impl Server { if let Some(addr) = settings.query_address { use veloren_query_server::proto::ServerInfo; + const QUERY_SERVER_RATELIMIT: u16 = 120; + let (query_server_info_tx, query_server_info_rx) = tokio::sync::watch::channel(ServerInfo { git_hash: *sys::server_info::GIT_HASH, + git_version: *GIT_DATE_TIMESTAMP, players_count: 0, player_cap: settings.max_players, battlemode: settings.gameplay.battle_mode.into(), }); - let mut query_server = 
QueryServer::new(addr, query_server_info_rx); - let query_server_metrics = Arc::new(tokio::sync::RwLock::new( - veloren_query_server::server::Metrics::default(), - )); + let mut query_server = + QueryServer::new(addr, query_server_info_rx, QUERY_SERVER_RATELIMIT); + let query_server_metrics = + Arc::new(Mutex::new(veloren_query_server::server::Metrics::default())); let query_server_metrics2 = Arc::clone(&query_server_metrics); runtime.spawn(async move { - _ = query_server.run(query_server_metrics2).await; + let err = query_server.run(query_server_metrics2).await.err(); + error!(?err, "Query server stopped unexpectedly"); }); state.ecs_mut().insert(query_server_info_tx); state.ecs_mut().insert(query_server_metrics); diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 2e4cc478db..41b5da54b0 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -71,15 +71,16 @@ pub struct ServerEventMetrics { } pub struct QueryServerMetrics { - pub received_packets: IntGauge, - pub dropped_packets: IntGauge, - pub invalid_packets: IntGauge, - pub proccessing_errors: IntGauge, - pub info_requests: IntGauge, - pub sent_responses: IntGauge, - pub failed_responses: IntGauge, - pub timed_out_responses: IntGauge, - pub ratelimited: IntGauge, + pub received_packets: IntCounter, + pub dropped_packets: IntCounter, + pub invalid_packets: IntCounter, + pub proccessing_errors: IntCounter, + pub info_requests: IntCounter, + pub ping_requests: IntCounter, + pub sent_responses: IntCounter, + pub failed_responses: IntCounter, + pub timed_out_responses: IntCounter, + pub ratelimited: IntCounter, } impl PhysicsMetrics { @@ -440,39 +441,43 @@ impl ServerEventMetrics { impl QueryServerMetrics { pub fn new(registry: &Registry) -> Result { - let received_packets = IntGauge::with_opts(Opts::new( + let received_packets = IntCounter::with_opts(Opts::new( "query_server::received_packets", "Total amount of received packets by the query server", ))?; - let dropped_packets = 
IntGauge::with_opts(Opts::new( + let dropped_packets = IntCounter::with_opts(Opts::new( "query_server::dropped_packets", "Amount of dropped packets received by the query server (too short or invalid header)", ))?; - let invalid_packets = IntGauge::with_opts(Opts::new( + let invalid_packets = IntCounter::with_opts(Opts::new( "query_server::invalid_packets", "Amount of unparseable packets received by the query server", ))?; - let proccessing_errors = IntGauge::with_opts(Opts::new( + let proccessing_errors = IntCounter::with_opts(Opts::new( "query_server::proccessing_errors", "Amount of errors that occured while processing a query server request", ))?; - let info_requests = IntGauge::with_opts(Opts::new( + let info_requests = IntCounter::with_opts(Opts::new( "query_server::info_requests", "Amount of server info requests received by the query server", ))?; - let sent_responses = IntGauge::with_opts(Opts::new( + let ping_requests = IntCounter::with_opts(Opts::new( + "query_server::ping_requests", + "Amount of server ping requests received by the query server", + ))?; + let sent_responses = IntCounter::with_opts(Opts::new( "query_server::sent_responses", "Amount of responses sent by the query server", ))?; - let failed_responses = IntGauge::with_opts(Opts::new( + let failed_responses = IntCounter::with_opts(Opts::new( "query_server::failed_responses", "Amount of responses which failed to be sent by the query server", ))?; - let timed_out_responses = IntGauge::with_opts(Opts::new( + let timed_out_responses = IntCounter::with_opts(Opts::new( "query_server::timed_out_responses", "Amount of responses which timed out", ))?; - let ratelimited = IntGauge::with_opts(Opts::new( + let ratelimited = IntCounter::with_opts(Opts::new( "query_server::ratelimited", "Ratelimited requests to the query server", ))?; @@ -482,6 +487,7 @@ impl QueryServerMetrics { registry.register(Box::new(invalid_packets.clone()))?; registry.register(Box::new(proccessing_errors.clone()))?; 
registry.register(Box::new(info_requests.clone()))?; + registry.register(Box::new(ping_requests.clone()))?; registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?; registry.register(Box::new(timed_out_responses.clone()))?; @@ -493,6 +499,7 @@ impl QueryServerMetrics { invalid_packets, proccessing_errors, info_requests, + ping_requests, sent_responses, failed_responses, timed_out_responses, @@ -508,20 +515,22 @@ impl QueryServerMetrics { invalid_packets, proccessing_errors, info_requests, + ping_requests, sent_responses, failed_responses, timed_out_responses, ratelimited, }: veloren_query_server::server::Metrics, ) { - self.received_packets.set(received_packets as i64); - self.dropped_packets.set(dropped_packets as i64); - self.invalid_packets.set(invalid_packets as i64); - self.proccessing_errors.set(proccessing_errors as i64); - self.info_requests.set(info_requests as i64); - self.sent_responses.set(sent_responses as i64); - self.failed_responses.set(failed_responses as i64); - self.timed_out_responses.set(timed_out_responses as i64); - self.ratelimited.set(ratelimited as i64); + self.received_packets.inc_by(received_packets as u64); + self.dropped_packets.inc_by(dropped_packets as u64); + self.invalid_packets.inc_by(invalid_packets as u64); + self.proccessing_errors.inc_by(proccessing_errors as u64); + self.info_requests.inc_by(info_requests as u64); + self.ping_requests.inc_by(ping_requests as u64); + self.sent_responses.inc_by(sent_responses as u64); + self.failed_responses.inc_by(failed_responses as u64); + self.timed_out_responses.inc_by(timed_out_responses as u64); + self.ratelimited.inc_by(ratelimited as u64); } } diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index 7fb44ad041..a848dbe76d 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -6,8 +6,10 @@ use crate::{ use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use 
common_ecs::{Job, Origin, Phase, SysMetrics, System}; use specs::{Entities, Join, Read, ReadExpect}; -use std::{sync::Arc, time::Instant}; -use tokio::sync::RwLock; +use std::{ + sync::{Arc, Mutex}, + time::Instant, +}; use veloren_query_server::server::Metrics as RawQueryServerMetrics; /// This system exports metrics @@ -29,8 +31,8 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, JobMetrics>, - Option>>>, - Option>, + Option>>>, + ReadExpect<'a, QueryServerMetrics>, ); const NAME: &'static str = "metrics"; @@ -172,10 +174,12 @@ impl<'a> System<'a> for Sys { .with_label_values(&["metrics"]) .observe(len as f64 / NANOSEC_PER_SEC); - if let (Some(query_server_metrics), Some(export_query_server)) = - (raw_query_server, export_query_server) + if let Some(Ok(metrics)) = raw_query_server + .as_ref() + // Hold the lock for the shortest time possible + .map(|m| m.lock().map(|mut metrics| metrics.reset())) { - export_query_server.apply(query_server_metrics.blocking_write().reset()); + export_query_server.apply(metrics); } } } diff --git a/server/src/sys/server_info.rs b/server/src/sys/server_info.rs index 43eedc9848..92ccb00937 100644 --- a/server/src/sys/server_info.rs +++ b/server/src/sys/server_info.rs @@ -1,19 +1,18 @@ +use common::{comp::Player, util::GIT_DATE_TIMESTAMP}; use common_ecs::{Origin, Phase, System}; use lazy_static::lazy_static; use specs::{Read, ReadStorage}; +use tracing::error; use veloren_query_server::proto::ServerInfo; -use crate::{client::Client, Settings, Tick}; +use crate::{Settings, Tick}; // Update the server stats every 60 ticks const INFO_SEND_INTERVAL: u64 = 60; lazy_static! 
{ - pub static ref GIT_HASH: [char; 8] = common::util::GIT_HASH[..8] - .chars() - .collect::>() - .try_into() - .unwrap_or_default(); + pub static ref GIT_HASH: u32 = + u32::from_str_radix(&common::util::GIT_HASH[..8], 16).expect("Invalid git hash"); } #[derive(Default)] @@ -24,24 +23,27 @@ impl<'a> System<'a> for Sys { Read<'a, Tick>, Read<'a, Settings>, Option>>, - ReadStorage<'a, Client>, + ReadStorage<'a, Player>, ); const NAME: &'static str = "server_info"; const ORIGIN: Origin = Origin::Server; const PHASE: Phase = Phase::Create; - fn run(_job: &mut common_ecs::Job, (tick, settings, sender, clients): Self::SystemData) { + fn run(_job: &mut common_ecs::Job, (tick, settings, sender, players): Self::SystemData) { if let Some(sender) = sender.as_ref() && tick.0 % INFO_SEND_INTERVAL == 0 { - let count = clients.count().try_into().unwrap_or(u16::MAX); - _ = sender.send(ServerInfo { + let count = players.count().try_into().unwrap_or(u16::MAX); + if let Err(error) = sender.send(ServerInfo { git_hash: *GIT_HASH, + git_version: *GIT_DATE_TIMESTAMP, players_count: count, player_cap: settings.max_players, battlemode: settings.gameplay.battle_mode.into(), - }); + }) { + error!(?error, "Failed to send server info to the query server"); + } } } } From e07b9b4c3aafb2aa8df8f315f4962b2773f05545 Mon Sep 17 00:00:00 2001 From: crabman Date: Tue, 30 Apr 2024 12:21:40 +0000 Subject: [PATCH 08/12] Bump h2 and rustls patch version Fixes RUSTSEC-2024-0336 and RUSTSEC-2024-0332 --- Cargo.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46768b4797..3d7d31585e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2689,9 +2689,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" 
dependencies = [ "bytes", "fnv", @@ -5404,9 +5404,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring 0.17.8", @@ -6651,7 +6651,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "rand 0.8.5", "static_assertions", ] From 78fd92fae559e3e92007d2d4db9ed8eceda63fd6 Mon Sep 17 00:00:00 2001 From: crabman Date: Tue, 30 Apr 2024 15:48:19 +0000 Subject: [PATCH 09/12] better protocol versioning --- common/query_server/Cargo.toml | 4 +-- common/query_server/src/client.rs | 45 ++++++++++++++++++++----------- common/query_server/src/proto.rs | 23 +++++++++++----- common/query_server/src/server.rs | 31 ++++++++++++--------- 4 files changed, 67 insertions(+), 36 deletions(-) diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index 69aa701518..c7132d69af 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -9,8 +9,8 @@ edition = "2021" [features] server = ["dep:rand"] client = ["tokio/time"] -example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] -default = ["server", "client"] +example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber", "client", "server"] +default = [] [dependencies] tokio = { workspace = true, features = ["net", "sync"] } diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs index c5ebf3306e..4c83a1cbbe 100644 --- a/common/query_server/src/client.rs +++ b/common/query_server/src/client.rs @@ -21,21 +21,26 @@ pub enum QueryClientError { Io(tokio::io::Error), 
Protocol(protocol::Error), InvalidResponse, - InvalidVersion, Timeout, ChallengeFailed, RequestTooLarge, } +struct ClientInitData { + p: u64, + #[allow(dead_code)] + server_max_version: u16, +} + /// The `p` field has to be requested from the server each time this client is /// constructed, if possible reuse this! pub struct QueryClient { pub addr: SocketAddr, - p: u64, + init: Option, } impl QueryClient { - pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } } + pub fn new(addr: SocketAddr) -> Self { Self { addr, init: None } } pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> { self.send_query(QueryServerRequest::ServerInfo) @@ -73,10 +78,20 @@ impl QueryClient { // 2 extra bytes for version information, currently unused buf.extend(VERSION.to_le_bytes()); buf.extend({ - let request_data = ::raw_bytes( - &RawQueryServerRequest { p: self.p, request }, - &Default::default(), - )?; + let request_data = if let Some(init) = &self.init { + // TODO: Use the maximum version supported by both the client and server once + // new protocol versions are added + ::raw_bytes( + &RawQueryServerRequest { p: init.p, request }, + &Default::default(), + )? + } else { + // TODO: Use the legacy version here once new protocol versions are added + ::raw_bytes( + &RawQueryServerRequest { p: 0, request }, + &Default::default(), + )? + }; if request_data.len() > MAX_REQUEST_SIZE { warn!( ?request, @@ -103,14 +118,9 @@ impl QueryClient { Err(QueryClientError::InvalidResponse)? } - // FIXME: Allow lower versions once proper versioning is added. - if u16::from_le_bytes(buf[..2].try_into().unwrap()) != VERSION { - Err(QueryClientError::InvalidVersion)? 
- } - let packet = ::read( // TODO: Remove this padding once version information is added to packets - &mut io::Cursor::new(&buf[2..buf_len]), + &mut io::Cursor::new(&buf[..buf_len]), &Default::default(), )?; @@ -118,9 +128,12 @@ impl QueryClient { RawQueryServerResponse::Response(response) => { return Ok((response, query_sent.elapsed())); }, - RawQueryServerResponse::P(p) => { - trace!(?p, "Resetting p"); - self.p = p + RawQueryServerResponse::Init(init) => { + trace!(?init, "Resetting p"); + self.init = Some(ClientInitData { + p: init.p, + server_max_version: init.max_supported_version, + }); }, } } diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs index ea6d00b98e..159d842437 100644 --- a/common/query_server/src/proto.rs +++ b/common/query_server/src/proto.rs @@ -11,7 +11,7 @@ pub(crate) const MAX_RESPONSE_SIZE: usize = 256; #[derive(Protocol, Debug, Clone, Copy)] pub(crate) struct RawQueryServerRequest { - /// See comment on [`RawQueryServerResponse::P`] + /// See comment on [`Init::p`] pub p: u64, pub request: QueryServerRequest, } @@ -27,17 +27,28 @@ pub enum QueryServerRequest { } #[derive(Protocol, Debug, Clone, Copy)] -#[protocol(discriminant = "integer")] -#[protocol(discriminator(u8))] -pub(crate) enum RawQueryServerResponse { - Response(QueryServerResponse), +pub(crate) struct Init { /// This is used as a challenge to prevent IP address spoofing by verifying /// that the client can receive from the source address. /// /// Any request to the server must include this value to be processed, /// otherwise this response will be returned (giving clients a value to pass /// for later requests). - P(u64), + pub p: u64, + /// The maximum supported protocol version by the server. The first request + /// to a server must always be done in the V0 protocol to query this value. 
+ /// Following requests (when the version is known), can be done in the + /// maximum version or below, responses will be sent in the same version as + /// the requests. + pub max_supported_version: u16, +} + +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub(crate) enum RawQueryServerResponse { + Response(QueryServerResponse), + Init(Init), } #[derive(Protocol, Debug, Clone, Copy)] diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 6f6f13b9bb..a89cd7ad48 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -14,8 +14,9 @@ use tracing::{debug, error, trace}; use crate::{ proto::{ - QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, + Init, QueryServerRequest, QueryServerResponse, RawQueryServerRequest, + RawQueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, + VERSION, }, ratelimit::{RateLimiter, ReducedIpAddr}, }; @@ -183,7 +184,16 @@ impl QueryServer { }; if real_p != client_p { - Self::send_response(RawQueryServerResponse::P(real_p), remote, socket, metrics).await; + Self::send_response( + RawQueryServerResponse::Init(Init { + p: real_p, + max_supported_version: VERSION, + }), + remote, + socket, + metrics, + ) + .await; return; } @@ -225,30 +235,27 @@ impl QueryServer { socket: &UdpSocket, metrics: &mut Metrics, ) { - // TODO: Remove this extra padding once we add version information to requests - let mut buf = Vec::from(VERSION.to_ne_bytes()); - + // TODO: Once more versions are added, send the packet in the same version as + // the request here. 
match ::raw_bytes(&response, &Default::default()) { Ok(data) => { - buf.extend(data); - - if buf.len() > MAX_RESPONSE_SIZE { + if data.len() > MAX_RESPONSE_SIZE { error!( ?MAX_RESPONSE_SIZE, "Attempted to send a response larger than the maximum allowed size (size: \ {}, response: {response:?})", - buf.len() + data.len() ); #[cfg(debug_assertions)] panic!( "Attempted to send a response larger than the maximum allowed size (size: \ {}, max: {}, response: {response:?})", - buf.len(), + data.len(), MAX_RESPONSE_SIZE ); } - match socket.send_to(&buf, addr).await { + match socket.send_to(&data, addr).await { Ok(_) => { metrics.sent_responses += 1; }, From 64300915129288d6284536ea8876d81e0ee86896 Mon Sep 17 00:00:00 2001 From: crabman Date: Thu, 2 May 2024 07:45:53 +0000 Subject: [PATCH 10/12] Remove ping and add request size test --- common/query_server/examples/demo.rs | 4 +- common/query_server/src/client.rs | 63 ++++++-------------------- common/query_server/src/lib.rs | 2 +- common/query_server/src/proto.rs | 66 +++++++++++++++++++++++++--- common/query_server/src/server.rs | 27 +++++++----- common/src/util/mod.rs | 2 +- server/Cargo.toml | 2 +- server/src/lib.rs | 2 +- server/src/metrics.rs | 14 +++--- server/src/sys/server_info.rs | 2 +- 10 files changed, 102 insertions(+), 82 deletions(-) diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index a649ed683d..7f36e8be30 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -14,7 +14,7 @@ use veloren_query_server::{ const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { git_hash: 0, - git_version: 0, + git_timestamp: 0, players_count: 100, player_cap: 300, battlemode: ServerBattleMode::GlobalPvE, @@ -41,7 +41,7 @@ async fn main() { let start = Instant::now(); for _i in 0..10000 { - if let Err(error) = client.ping().await { + if let Err(error) = client.server_info().await { error!(?error, "Server info request error"); } } diff --git 
a/common/query_server/src/client.rs b/common/query_server/src/client.rs index 4c83a1cbbe..13587c93db 100644 --- a/common/query_server/src/client.rs +++ b/common/query_server/src/client.rs @@ -6,11 +6,11 @@ use std::{ use protocol::Parcel; use tokio::{net::UdpSocket, time::timeout}; -use tracing::{trace, warn}; +use tracing::trace; use crate::proto::{ QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, + ServerInfo, MAX_RESPONSE_SIZE, }; // This must be at least 2 for the client to get a value for the `p` field. @@ -23,7 +23,6 @@ pub enum QueryClientError { InvalidResponse, Timeout, ChallengeFailed, - RequestTooLarge, } struct ClientInitData { @@ -46,6 +45,7 @@ impl QueryClient { self.send_query(QueryServerRequest::ServerInfo) .await .and_then(|(response, duration)| { + #[allow(irrefutable_let_patterns)] if let QueryServerResponse::ServerInfo(info) = response { Ok((info, duration)) } else { @@ -54,18 +54,6 @@ impl QueryClient { }) } - pub async fn ping(&mut self) -> Result { - self.send_query(QueryServerRequest::Ping) - .await - .and_then(|(response, duration)| { - if let QueryServerResponse::Pong = response { - Ok(duration) - } else { - Err(QueryClientError::InvalidResponse) - } - }) - } - async fn send_query( &mut self, request: QueryServerRequest, @@ -73,42 +61,20 @@ impl QueryClient { let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; for _ in 0..MAX_REQUEST_RETRIES { - let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); - - // 2 extra bytes for version information, currently unused - buf.extend(VERSION.to_le_bytes()); - buf.extend({ - let request_data = if let Some(init) = &self.init { - // TODO: Use the maximum version supported by both the client and server once - // new protocol versions are added - ::raw_bytes( - &RawQueryServerRequest { p: init.p, request }, - &Default::default(), - )? 
- } else { - // TODO: Use the legacy version here once new protocol versions are added - ::raw_bytes( - &RawQueryServerRequest { p: 0, request }, - &Default::default(), - )? - }; - if request_data.len() > MAX_REQUEST_SIZE { - warn!( - ?request, - ?MAX_REQUEST_SIZE, - "Attempted to send request larger than the max size ({})", - request_data.len() - ); - Err(QueryClientError::RequestTooLarge)? + let request = if let Some(init) = &self.init { + // TODO: Use the maximum version supported by both the client and server once + // new protocol versions are added + RawQueryServerRequest { p: init.p, request } + } else { + // TODO: Use the legacy version here once new protocol versions are added + RawQueryServerRequest { + p: 0, + request: QueryServerRequest::Init, } - request_data - }); - buf.resize(2 + MAX_RESPONSE_SIZE, 0); - buf.extend(VELOREN_HEADER); - + }; + let buf = request.serialize()?; let query_sent = Instant::now(); socket.send_to(&buf, self.addr).await?; - let mut buf = vec![0; MAX_RESPONSE_SIZE]; let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) .await @@ -119,7 +85,6 @@ impl QueryClient { } let packet = ::read( - // TODO: Remove this padding once version information is added to packets &mut io::Cursor::new(&buf[..buf_len]), &Default::default(), )?; diff --git a/common/query_server/src/lib.rs b/common/query_server/src/lib.rs index 6df1b6ff37..8f57862ab8 100644 --- a/common/query_server/src/lib.rs +++ b/common/query_server/src/lib.rs @@ -1,4 +1,4 @@ #[cfg(feature = "client")] pub mod client; pub mod proto; -mod ratelimit; +#[cfg(feature = "server")] mod ratelimit; #[cfg(feature = "server")] pub mod server; diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs index 159d842437..b65d6f67fa 100644 --- a/common/query_server/src/proto.rs +++ b/common/query_server/src/proto.rs @@ -2,11 +2,10 @@ use protocol::Protocol; pub(crate) const VERSION: u16 = 0; pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', 
b'l', b'o', b'r', b'e', b'n']; -// The actual maximum size of packets will be `MAX_REQUEST_SIZE + -// VELOREN_HEADER.len() + 2` (2 added for currently unused version). +pub(crate) const MAX_REQUEST_CONTENT_SIZE: usize = 300; // NOTE: The actual maximum size must never exceed 1200 or we risk getting near // MTU limits for some networks. -pub(crate) const MAX_REQUEST_SIZE: usize = 300; +pub(crate) const MAX_REQUEST_SIZE: usize = MAX_REQUEST_CONTENT_SIZE + VELOREN_HEADER.len() + 2; pub(crate) const MAX_RESPONSE_SIZE: usize = 256; #[derive(Protocol, Debug, Clone, Copy)] @@ -21,9 +20,17 @@ pub(crate) struct RawQueryServerRequest { #[protocol(discriminator(u8))] #[allow(clippy::large_enum_variant)] pub enum QueryServerRequest { + /// This request exists mostly for backwards-compatibility reasons. As the + /// first message sent to the server should always be in the V0 version + /// of the protocol, if future versions of the protocol have more + /// requests than server info it may be confusing to request `P` and the max + /// version with a `QueryServerRequest::ServerInfo` request (the request + /// will still be dropped as the supplied `P` value is invalid). + Init, ServerInfo, - Ping, - // New requests should be added at the end to prevent breakage + // New requests should be added at the end to prevent breakage. 
+ // NOTE: Any new (sub-)variants must be added to the `check_request_sizes` test at the end of + // this file } #[derive(Protocol, Debug, Clone, Copy)] @@ -56,14 +63,13 @@ pub(crate) enum RawQueryServerResponse { #[protocol(discriminator(u8))] pub enum QueryServerResponse { ServerInfo(ServerInfo), - Pong, // New responses should be added at the end to prevent breakage } #[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] pub struct ServerInfo { pub git_hash: u32, - pub git_version: i64, + pub git_timestamp: i64, pub players_count: u16, pub player_cap: u16, pub battlemode: ServerBattleMode, @@ -78,3 +84,49 @@ pub enum ServerBattleMode { GlobalPvE, PerPlayer, } + +impl RawQueryServerRequest { + #[cfg(any(feature = "client", test))] + pub fn serialize(&self) -> Result, protocol::Error> { + use protocol::Parcel; + + let mut buf = Vec::with_capacity(MAX_REQUEST_SIZE); + + // 2 extra bytes for version information, currently unused + buf.extend(VERSION.to_le_bytes()); + buf.extend({ + let request_data = + ::raw_bytes(self, &Default::default())?; + if request_data.len() > MAX_REQUEST_CONTENT_SIZE { + panic!( + "Attempted to send request larger than the max size (size: {}, max size: \ + {MAX_REQUEST_CONTENT_SIZE}, request: {self:?})", + request_data.len() + ); + } + request_data + }); + const _: () = assert!(MAX_RESPONSE_SIZE + VELOREN_HEADER.len() <= MAX_REQUEST_SIZE); + buf.resize(MAX_RESPONSE_SIZE.max(buf.len()), 0); + buf.extend(VELOREN_HEADER); + Ok(buf) + } +} + +#[cfg(test)] +mod tests { + use super::{QueryServerRequest, RawQueryServerRequest}; + + #[test] + fn check_request_sizes() { + const ALL_REQUESTS: &[QueryServerRequest] = + &[QueryServerRequest::ServerInfo, QueryServerRequest::Init]; + for request in ALL_REQUESTS { + let request = RawQueryServerRequest { + p: 0, + request: *request, + }; + request.serialize().unwrap(); // This will panic if the size is above MAX_REQUEST_SIZE + } + } +} diff --git a/common/query_server/src/server.rs 
b/common/query_server/src/server.rs index a89cd7ad48..194a722a5a 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -37,7 +37,7 @@ pub struct Metrics { pub invalid_packets: u32, pub proccessing_errors: u32, pub info_requests: u32, - pub ping_requests: u32, + pub init_requests: u32, pub sent_responses: u32, pub failed_responses: u32, pub timed_out_responses: u32, @@ -129,13 +129,13 @@ impl QueryServer { if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) { trace!(?len, "Datagram too short"); false - } else if len > (MAX_REQUEST_SIZE + VELOREN_HEADER.len() + 2) { + } else if len > MAX_REQUEST_SIZE { trace!(?len, "Datagram too large"); false } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER { trace!(?len, "Datagram header invalid"); false - // FIXME: Allow lower versions once proper versioning is added. + // TODO: Allow lower versions once proper versioning is added. } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION { trace!( "Datagram has invalid version {:?}, current {VERSION:?}", @@ -205,21 +205,24 @@ impl QueryServer { } match request { - QueryServerRequest::ServerInfo => { - metrics.info_requests += 1; - let server_info = *self.server_info.borrow(); + QueryServerRequest::Init => { + metrics.init_requests += 1; Self::send_response( - RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)), + RawQueryServerResponse::Init(Init { + p: real_p, + max_supported_version: VERSION, + }), remote, socket, metrics, ) .await; }, - QueryServerRequest::Ping => { - metrics.ping_requests += 1; + QueryServerRequest::ServerInfo => { + metrics.info_requests += 1; + let server_info = *self.server_info.borrow(); Self::send_response( - RawQueryServerResponse::Response(QueryServerResponse::Pong), + RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)), remote, socket, metrics, @@ -283,7 +286,7 @@ impl std::ops::AddAssign for Metrics { invalid_packets, 
proccessing_errors, info_requests, - ping_requests, + init_requests, sent_responses, failed_responses, timed_out_responses, @@ -295,7 +298,7 @@ impl std::ops::AddAssign for Metrics { self.invalid_packets += invalid_packets; self.proccessing_errors += proccessing_errors; self.info_requests += info_requests; - self.ping_requests += ping_requests; + self.init_requests += init_requests; self.sent_responses += sent_responses; self.failed_responses += failed_responses; self.timed_out_responses += timed_out_responses; diff --git a/common/src/util/mod.rs b/common/src/util/mod.rs index 8bded12a1e..3a282c5952 100644 --- a/common/src/util/mod.rs +++ b/common/src/util/mod.rs @@ -23,7 +23,7 @@ lazy_static::lazy_static! { pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::>().join("-"); pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!"); pub static ref GIT_DATE_TIMESTAMP: i64 = - NaiveDateTime::parse_from_str(dbg!(&*GIT_DATETIME), "%Y-%m-%d-%H:%M") + NaiveDateTime::parse_from_str(*GIT_DATETIME, "%Y-%m-%d-%H:%M") .expect("Invalid date") .and_utc().timestamp(); pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() { diff --git a/server/Cargo.toml b/server/Cargo.toml index 23ba5bd05c..df5978e7e5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -18,7 +18,7 @@ default = ["worldgen", "plugins", "persistent_world", "simd"] [dependencies] common = { package = "veloren-common", path = "../common" } common-base = { package = "veloren-common-base", path = "../common/base" } -veloren-query-server = { package = "veloren-query-server", path = "../common/query_server", features = ["server"] } +veloren-query-server = { package = "veloren-query-server", path = "../common/query_server", default-features = false, features = ["server"] } common-ecs = { package = "veloren-common-ecs", path = "../common/ecs" } common-state = { package = "veloren-common-state", path = "../common/state" } common-systems 
= { package = "veloren-common-systems", path = "../common/systems" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 86bdafb1d0..824256ff0a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -609,7 +609,7 @@ impl Server { let (query_server_info_tx, query_server_info_rx) = tokio::sync::watch::channel(ServerInfo { git_hash: *sys::server_info::GIT_HASH, - git_version: *GIT_DATE_TIMESTAMP, + git_timestamp: *GIT_DATE_TIMESTAMP, players_count: 0, player_cap: settings.max_players, battlemode: settings.gameplay.battle_mode.into(), diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 41b5da54b0..64a5e25774 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -76,7 +76,7 @@ pub struct QueryServerMetrics { pub invalid_packets: IntCounter, pub proccessing_errors: IntCounter, pub info_requests: IntCounter, - pub ping_requests: IntCounter, + pub init_requests: IntCounter, pub sent_responses: IntCounter, pub failed_responses: IntCounter, pub timed_out_responses: IntCounter, @@ -461,9 +461,9 @@ impl QueryServerMetrics { "query_server::info_requests", "Amount of server info requests received by the query server", ))?; - let ping_requests = IntCounter::with_opts(Opts::new( + let init_requests = IntCounter::with_opts(Opts::new( "query_server::ping_requests", - "Amount of server ping requests received by the query server", + "Amount of init requests received by the query server", ))?; let sent_responses = IntCounter::with_opts(Opts::new( "query_server::sent_responses", @@ -487,7 +487,7 @@ impl QueryServerMetrics { registry.register(Box::new(invalid_packets.clone()))?; registry.register(Box::new(proccessing_errors.clone()))?; registry.register(Box::new(info_requests.clone()))?; - registry.register(Box::new(ping_requests.clone()))?; + registry.register(Box::new(init_requests.clone()))?; registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?; 
registry.register(Box::new(timed_out_responses.clone()))?; @@ -499,7 +499,7 @@ impl QueryServerMetrics { invalid_packets, proccessing_errors, info_requests, - ping_requests, + init_requests, sent_responses, failed_responses, timed_out_responses, @@ -515,7 +515,7 @@ impl QueryServerMetrics { invalid_packets, proccessing_errors, info_requests, - ping_requests, + init_requests, sent_responses, failed_responses, timed_out_responses, @@ -527,7 +527,7 @@ impl QueryServerMetrics { self.invalid_packets.inc_by(invalid_packets as u64); self.proccessing_errors.inc_by(proccessing_errors as u64); self.info_requests.inc_by(info_requests as u64); - self.ping_requests.inc_by(ping_requests as u64); + self.init_requests.inc_by(init_requests as u64); self.sent_responses.inc_by(sent_responses as u64); self.failed_responses.inc_by(failed_responses as u64); self.timed_out_responses.inc_by(timed_out_responses as u64); diff --git a/server/src/sys/server_info.rs b/server/src/sys/server_info.rs index 92ccb00937..e89d67153e 100644 --- a/server/src/sys/server_info.rs +++ b/server/src/sys/server_info.rs @@ -37,7 +37,7 @@ impl<'a> System<'a> for Sys { let count = players.count().try_into().unwrap_or(u16::MAX); if let Err(error) = sender.send(ServerInfo { git_hash: *GIT_HASH, - git_version: *GIT_DATE_TIMESTAMP, + git_timestamp: *GIT_DATE_TIMESTAMP, players_count: count, player_cap: settings.max_players, battlemode: settings.gameplay.battle_mode.into(), From fea309c4ad80f1ade71f674988fc667c67a8c608 Mon Sep 17 00:00:00 2001 From: crabman Date: Tue, 30 Apr 2024 16:40:25 +0000 Subject: [PATCH 11/12] Add port 14006 to docker-compose (server info queries) --- server-cli/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/server-cli/docker-compose.yml b/server-cli/docker-compose.yml index f748eac667..a93c92b76b 100644 --- a/server-cli/docker-compose.yml +++ b/server-cli/docker-compose.yml @@ -9,6 +9,7 @@ services: ports: - "14004:14004" - "14005:14005" + - "14006:14006/udp" restart: 
on-failure:0 volumes: - "./userdata:/opt/userdata" From 24e0600e58a6d8b1e1730c2ca367867e5d3c3675 Mon Sep 17 00:00:00 2001 From: crabman Date: Thu, 2 May 2024 17:44:56 +0000 Subject: [PATCH 12/12] changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d702ed1d8..825c3cc9aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added Abyssal rings - Sprite models can now change depending on sprite attributes. - Dwarven-Mine update and activation. +- Protocol to query game server information (player count, version, etc.) and make ping tests. ### Changed