diff --git a/common/frontend/src/lib.rs b/common/frontend/src/lib.rs index 97120a75d4..f2471fcb51 100644 --- a/common/frontend/src/lib.rs +++ b/common/frontend/src/lib.rs @@ -75,6 +75,7 @@ where "refinery_core::traits::divergent=off", "veloren_server::persistence::character=info", "veloren_server::settings=info", + "veloren_query_server=info", ]; for s in default_directives { diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index b8de9b01e9..69aa701518 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -7,13 +7,13 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -server = ["dep:tokio", "dep:rand"] -client = ["dep:tokio", "tokio/time"] +server = ["dep:rand"] +client = ["tokio/time"] example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] default = ["server", "client"] [dependencies] -tokio = { workspace = true, optional = true, features = ["net", "sync"] } +tokio = { workspace = true, features = ["net", "sync"] } protocol = { version = "3.4.0", default-features = false, features = ["derive"] } tracing-subscriber = { version = "0.3.7", optional = true } tracing = { workspace = true } diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index 0263cba6dc..a649ed683d 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -1,10 +1,10 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, + sync::{Arc, Mutex}, time::Instant, }; -use tokio::sync::{watch, RwLock}; +use tokio::sync::watch; use tracing::error; use veloren_query_server::{ client::QueryClient, @@ -13,7 +13,8 @@ use veloren_query_server::{ }; const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { - git_hash: ['\0'; 8], + git_hash: 0, + git_version: 0, players_count: 100, player_cap: 300, battlemode: ServerBattleMode::GlobalPvE, @@ -24,8 +25,8 @@ async fn main() { tracing_subscriber::fmt::init(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006); let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO); - let mut server = QueryServer::new(addr, receiver); - let metrics = Arc::new(RwLock::new(Metrics::default())); + let mut server = QueryServer::new(addr, receiver, 10002); + let metrics = Arc::new(Mutex::new(Metrics::default())); let metrics2 = Arc::clone(&metrics); tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); @@ -39,12 +40,12 @@ async fn main() { let start = Instant::now(); - for _i in 0..32 { - if let Err(error) = client.server_info().await { + for _i in 0..10000 { + if let Err(error) = client.ping().await { error!(?error, "Server info request error"); } } - println!("Metrics = {:#?}", metrics.read().await); + println!("Metrics = {:#?}", metrics.lock().unwrap()); dbg!(start.elapsed()); } diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs index 1391f32eb6..c5ebf3306e 100644 --- a/common/query_server/src/client.rs +++ b/common/query_server/src/client.rs @@ -6,13 +6,14 @@ use std::{ use protocol::Parcel; use tokio::{net::UdpSocket, time::timeout}; -use tracing::trace; +use tracing::{trace, warn}; use crate::proto::{ QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, + ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, }; +// This must be at least 2 for the client to get a value for the `p` field. const MAX_REQUEST_RETRIES: usize = 5; #[derive(Debug)] @@ -20,10 +21,14 @@ pub enum QueryClientError { Io(tokio::io::Error), Protocol(protocol::Error), InvalidResponse, + InvalidVersion, Timeout, ChallengeFailed, + RequestTooLarge, } +/// The `p` field has to be requested from the server each time this client is +/// constructed, if possible reuse this! pub struct QueryClient { pub addr: SocketAddr, p: u64, @@ -33,10 +38,9 @@ impl QueryClient { pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } } pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> { - self.send_query(QueryServerRequest::ServerInfo(Default::default())) + self.send_query(QueryServerRequest::ServerInfo) .await .and_then(|(response, duration)| { - #[allow(irrefutable_let_patterns)] // TODO: remove when more variants are added if let QueryServerResponse::ServerInfo(info) = response { Ok((info, duration)) } else { @@ -45,36 +49,68 @@ impl QueryClient { }) } + pub async fn ping(&mut self) -> Result { + self.send_query(QueryServerRequest::Ping) + .await + .and_then(|(response, duration)| { + if let QueryServerResponse::Pong = response { + Ok(duration) + } else { + Err(QueryClientError::InvalidResponse) + } + }) + } + async fn send_query( &mut self, request: QueryServerRequest, ) -> Result<(QueryServerResponse, Duration), QueryClientError> { let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; - let mut tries = 0; - while tries < MAX_REQUEST_RETRIES { - tries += 1; + for _ in 0..MAX_REQUEST_RETRIES { let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); - buf.extend(VELOREN_HEADER); // 2 extra bytes for version information, currently unused - buf.extend([0; 2]); - buf.extend(::raw_bytes( - &RawQueryServerRequest { p: self.p, request }, - &Default::default(), - )?); + buf.extend(VERSION.to_le_bytes()); + buf.extend({ + let request_data = ::raw_bytes( + &RawQueryServerRequest { p: self.p, request }, + &Default::default(), + )?; + if request_data.len() > MAX_REQUEST_SIZE { + warn!( + ?request, + ?MAX_REQUEST_SIZE, + "Attempted to send request larger than the max size ({})", + request_data.len() + ); + Err(QueryClientError::RequestTooLarge)? + } + request_data + }); + buf.resize(2 + MAX_RESPONSE_SIZE, 0); + buf.extend(VELOREN_HEADER); let query_sent = Instant::now(); socket.send_to(&buf, self.addr).await?; let mut buf = vec![0; MAX_RESPONSE_SIZE]; - let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) + let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) .await - .map_err(|_| QueryClientError::Timeout)? - .map_err(|_| QueryClientError::Timeout)?; + .map_err(|_| QueryClientError::Timeout)??; + + if buf_len <= 2 { + Err(QueryClientError::InvalidResponse)? + } + + // FIXME: Allow lower versions once proper versioning is added. + if u16::from_le_bytes(buf[..2].try_into().unwrap()) != VERSION { + Err(QueryClientError::InvalidVersion)? + } let packet = ::read( - &mut io::Cursor::new(buf), + // TODO: Remove this padding once version information is added to packets + &mut io::Cursor::new(&buf[2..buf_len]), &Default::default(), )?; diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs index 83c8bff730..ea6d00b98e 100644 --- a/common/query_server/src/proto.rs +++ b/common/query_server/src/proto.rs @@ -1,13 +1,17 @@ use protocol::Protocol; -pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; -pub const MAX_REQUEST_SIZE: usize = 300; -pub const MAX_RESPONSE_SIZE: usize = 256; +pub(crate) const VERSION: u16 = 0; +pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; +// The actual maximum size of packets will be `MAX_REQUEST_SIZE + +// VELOREN_HEADER.len() + 2` (2 added for currently unused version). +// NOTE: The actual maximum size must never exceed 1200 or we risk getting near +// MTU limits for some networks. +pub(crate) const MAX_REQUEST_SIZE: usize = 300; +pub(crate) const MAX_RESPONSE_SIZE: usize = 256; #[derive(Protocol, Debug, Clone, Copy)] -#[protocol(discriminant = "integer")] -#[protocol(discriminator(u8))] -pub struct RawQueryServerRequest { +pub(crate) struct RawQueryServerRequest { + /// See comment on [`RawQueryServerResponse::P`] pub p: u64, pub request: QueryServerRequest, } @@ -17,15 +21,22 @@ pub struct RawQueryServerRequest { #[protocol(discriminator(u8))] #[allow(clippy::large_enum_variant)] pub enum QueryServerRequest { - ServerInfo(ServerInfoRequest), + ServerInfo, + Ping, // New requests should be added at the end to prevent breakage } #[derive(Protocol, Debug, Clone, Copy)] #[protocol(discriminant = "integer")] #[protocol(discriminator(u8))] -pub enum RawQueryServerResponse { +pub(crate) enum RawQueryServerResponse { Response(QueryServerResponse), + /// This is used as a challenge to prevent IP address spoofing by verifying + /// that the client can receive from the source address. + /// + /// Any request to the server must include this value to be processed, + /// otherwise this response will be returned (giving clients a value to pass + /// for later requests). P(u64), } @@ -34,18 +45,14 @@ pub enum RawQueryServerResponse { #[protocol(discriminator(u8))] pub enum QueryServerResponse { ServerInfo(ServerInfo), + Pong, // New responses should be added at the end to prevent breakage } -#[derive(Protocol, Debug, Clone, Copy)] -pub struct ServerInfoRequest { - // Padding to prevent amplification attacks - pub _padding: [u8; 256], -} - #[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] pub struct ServerInfo { - pub git_hash: [char; 8], + pub git_hash: u32, + pub git_version: i64, pub players_count: u16, pub player_cap: u16, pub battlemode: ServerBattleMode, @@ -60,11 +67,3 @@ pub enum ServerBattleMode { GlobalPvE, PerPlayer, } - -impl Default for ServerInfoRequest { - fn default() -> Self { ServerInfoRequest { _padding: [0; 256] } } -} - -impl ServerInfo { - pub fn git_hash(&self) -> String { String::from_iter(&self.git_hash) } -} diff --git a/common/query_server/src/ratelimit.rs b/common/query_server/src/ratelimit.rs index 70f72136b7..01b2a76f62 100644 --- a/common/query_server/src/ratelimit.rs +++ b/common/query_server/src/ratelimit.rs @@ -34,24 +34,20 @@ impl RateLimiter { pub fn maintain(&mut self, now: Instant) { if now.duration_since(self.last_shift) > SHIFT_EVERY { - for (_, state) in self.states.iter_mut() { - state.shift(); - } - // Remove empty states - self.states.retain(|_, state| !state.is_empty()); + self.states.retain(|_, state| { + state.shift(); + !state.is_empty() + }); + self.last_shift = now; } } pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool { if let Some(state) = self.states.get_mut(&ip) { - if state.total() >= self.limit { - state.0[0] = state.0[0].saturating_add(1); - false - } else { - state.0[1] += 1; - true - } + state.0[0] = state.0[0].saturating_add(1); + + state.total() < self.limit } else { self.states.insert(ip, IpState::default()); true diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 52088836d9..6f6f13b9bb 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -1,35 +1,29 @@ #[allow(deprecated)] use std::hash::SipHasher; use std::{ - future::Future, hash::{Hash, Hasher}, - io, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - sync::Arc, + io::{self, ErrorKind}, + net::SocketAddr, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; use protocol::Parcel; use rand::{thread_rng, Rng}; -use tokio::{ - net::UdpSocket, - sync::{watch, RwLock}, - time::timeout, -}; -use tracing::{debug, trace}; +use tokio::{net::UdpSocket, sync::watch}; +use tracing::{debug, error, trace}; use crate::{ proto::{ QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, - ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, + ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, }, ratelimit::{RateLimiter, ReducedIpAddr}, }; -const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); -const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(60); +const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300); pub struct QueryServer { - pub addr: SocketAddr, + addr: SocketAddr, server_info: watch::Receiver, settings: protocol::Settings, ratelimit: RateLimiter, @@ -42,6 +36,7 @@ pub struct Metrics { pub invalid_packets: u32, pub proccessing_errors: u32, pub info_requests: u32, + pub ping_requests: u32, pub sent_responses: u32, pub failed_responses: u32, pub timed_out_responses: u32, @@ -49,17 +44,23 @@ pub struct Metrics { } impl QueryServer { - pub fn new(addr: SocketAddr, server_info: watch::Receiver) -> Self { + pub fn new(addr: SocketAddr, server_info: watch::Receiver, ratelimit: u16) -> Self { Self { addr, server_info, - ratelimit: RateLimiter::new(30), + ratelimit: RateLimiter::new(ratelimit), settings: Default::default(), } } - pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { - let socket = UdpSocket::bind(self.addr).await?; + /// This produces TRACE level logs for any packet received on the assigned + /// port. To prevent potentially unfettered log spam, disable the TRACE + /// level for this crate (when outside of debugging contexts). + /// + /// NOTE: TRACE and DEBUG levels are disabled by default for this crate when + /// using `veloren-common-frontend`. + pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { + let mut socket = UdpSocket::bind(self.addr).await?; let gen_secret = || { let mut rng = thread_rng(); @@ -70,12 +71,20 @@ impl QueryServer { let mut buf = Box::new([0; MAX_REQUEST_SIZE]); loop { - *buf = [0; MAX_REQUEST_SIZE]; - - let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| { - debug!("Error while receiving from query server socket: {err:?}") - }) else { - continue; + let (len, remote_addr) = match socket.recv_from(&mut *buf).await { + Ok(v) => v, + Err(e) if e.kind() == ErrorKind::NotConnected => { + error!( + ?e, + "Query server connection was closed, re-binding to socket..." + ); + socket = UdpSocket::bind(self.addr).await?; + continue; + }, + err => { + debug!(?err, "Error while receiving from query server socket"); + continue; + }, }; let mut new_metrics = Metrics { @@ -86,26 +95,19 @@ impl QueryServer { let raw_msg_buf = &buf[..len]; let msg_buf = if Self::validate_datagram(raw_msg_buf) { // Require 2 extra bytes for version (currently unused) - &raw_msg_buf[(VELOREN_HEADER.len() + 2)..] + &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())] } else { new_metrics.dropped_packets += 1; continue; }; - if let Err(error) = self - .process_datagram( - msg_buf, - remote_addr, - secrets, - (&mut new_metrics, Arc::clone(&metrics)), - ) - .await - { - debug!(?error, "Error while processing datagram"); - } + self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket) + .await; // Update metrics at the end of eath packet - *metrics.write().await += new_metrics; + if let Ok(mut metrics) = metrics.lock() { + *metrics += new_metrics; + } { let now = Instant::now(); @@ -123,12 +125,22 @@ impl QueryServer { fn validate_datagram(data: &[u8]) -> bool { let len = data.len(); // Require 2 extra bytes for version (currently unused) - if len < VELOREN_HEADER.len() + 3 { + if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) { trace!(?len, "Datagram too short"); false - } else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER { + } else if len > (MAX_REQUEST_SIZE + VELOREN_HEADER.len() + 2) { + trace!(?len, "Datagram too large"); + false + } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER { trace!(?len, "Datagram header invalid"); false + // FIXME: Allow lower versions once proper versioning is added. + } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION { + trace!( + "Datagram has invalid version {:?}, current {VERSION:?}", + &data[..2] + ); + false } else { true } @@ -139,106 +151,117 @@ impl QueryServer { datagram: &[u8], remote: SocketAddr, secrets: (u64, u64), - (new_metrics, metrics): (&mut Metrics, Arc>), - ) -> Result<(), tokio::io::Error> { + metrics: &mut Metrics, + socket: &UdpSocket, + ) { let Ok(RawQueryServerRequest { p: client_p, request, }) = ::read(&mut io::Cursor::new(datagram), &self.settings) else { - new_metrics.invalid_packets += 1; - return Ok(()); + metrics.invalid_packets += 1; + return; }; trace!(?request, "Received packet"); #[allow(deprecated)] let real_p = { + // Use SipHash-2-4 to compute the `p` value from a server specific + // secret and the client's address. + // + // This is used to verify that packets are from an entity that can + // receive packets at the given address. + // + // Only use the first 64 bits from Ipv6 addresses since the latter + // 64 bits can change very frequently (as much as for every + // request). let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1); ReducedIpAddr::from(remote.ip()).hash(&mut hasher); hasher.finish() }; - async fn timed<'a, F: Future + 'a, O>( - fut: F, - metrics: &'a Arc>, - ) -> Option { - if let Ok(res) = timeout(RESPONSE_SEND_TIMEOUT, fut).await { - Some(res) - } else { - metrics.write().await.timed_out_responses += 1; - None - } - } - if real_p != client_p { - tokio::task::spawn(async move { - timed( - Self::send_response(RawQueryServerResponse::P(real_p), remote, &metrics), - &metrics, - ) - .await; - }); + Self::send_response(RawQueryServerResponse::P(real_p), remote, socket, metrics).await; - return Ok(()); + return; } if !self.ratelimit.can_request(remote.ip().into()) { trace!("Ratelimited request"); - new_metrics.ratelimited += 1; - return Ok(()); + metrics.ratelimited += 1; + return; } match request { - QueryServerRequest::ServerInfo(_) => { - new_metrics.info_requests += 1; + QueryServerRequest::ServerInfo => { + metrics.info_requests += 1; let server_info = *self.server_info.borrow(); - tokio::task::spawn(async move { - timed( - Self::send_response( - RawQueryServerResponse::Response(QueryServerResponse::ServerInfo( - server_info, - )), - remote, - &metrics, - ), - &metrics, - ) - .await; - }); + Self::send_response( + RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)), + remote, + socket, + metrics, + ) + .await; + }, + QueryServerRequest::Ping => { + metrics.ping_requests += 1; + Self::send_response( + RawQueryServerResponse::Response(QueryServerResponse::Pong), + remote, + socket, + metrics, + ) + .await; }, } - - Ok(()) } async fn send_response( response: RawQueryServerResponse, addr: SocketAddr, - metrics: &Arc>, + socket: &UdpSocket, + metrics: &mut Metrics, ) { - let Ok(socket) = - UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await - else { - debug!("Failed to create response socket"); - return; - }; + // TODO: Remove this extra padding once we add version information to requests + let mut buf = Vec::from(VERSION.to_ne_bytes()); - let buf = if let Ok(data) = - ::raw_bytes(&response, &Default::default()) - { - data - } else { - Vec::new() - }; - match socket.send_to(&buf, addr).await { - Ok(_) => { - metrics.write().await.sent_responses += 1; + match ::raw_bytes(&response, &Default::default()) { + Ok(data) => { + buf.extend(data); + + if buf.len() > MAX_RESPONSE_SIZE { + error!( + ?MAX_RESPONSE_SIZE, + "Attempted to send a response larger than the maximum allowed size (size: \ + {}, response: {response:?})", + buf.len() + ); + #[cfg(debug_assertions)] + panic!( + "Attempted to send a response larger than the maximum allowed size (size: \ + {}, max: {}, response: {response:?})", + buf.len(), + MAX_RESPONSE_SIZE + ); + } + + match socket.send_to(&buf, addr).await { + Ok(_) => { + metrics.sent_responses += 1; + }, + Err(err) => { + metrics.failed_responses += 1; + debug!(?err, "Failed to send query server response"); + }, + } }, - Err(err) => { - metrics.write().await.failed_responses += 1; - debug!(?err, "Failed to send query server response"); + Err(error) => { + trace!(?error, "Failed to serialize response"); + #[cfg(debug_assertions)] + panic!("Serializing response failed: {error:?} ({response:?})"); }, } } @@ -253,6 +276,7 @@ impl std::ops::AddAssign for Metrics { invalid_packets, proccessing_errors, info_requests, + ping_requests, sent_responses, failed_responses, timed_out_responses, @@ -264,6 +288,7 @@ impl std::ops::AddAssign for Metrics { self.invalid_packets += invalid_packets; self.proccessing_errors += proccessing_errors; self.info_requests += info_requests; + self.ping_requests += ping_requests; self.sent_responses += sent_responses; self.failed_responses += failed_responses; self.timed_out_responses += timed_out_responses; @@ -273,5 +298,7 @@ impl std::ops::AddAssign for Metrics { impl Metrics { /// Resets all metrics to 0 and returns previous ones + /// + /// Used by the consumer of the metrics. pub fn reset(&mut self) -> Self { std::mem::take(self) } } diff --git a/common/src/util/mod.rs b/common/src/util/mod.rs index 2fcdebf0ff..8bded12a1e 100644 --- a/common/src/util/mod.rs +++ b/common/src/util/mod.rs @@ -22,6 +22,10 @@ lazy_static::lazy_static! { static ref GIT_DATETIME: &'static str = GIT_VERSION.split('/').nth(1).expect("failed to retrieve git_datetime!"); pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::>().join("-"); pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!"); + pub static ref GIT_DATE_TIMESTAMP: i64 = + NaiveDateTime::parse_from_str(dbg!(&*GIT_DATETIME), "%Y-%m-%d-%H:%M") + .expect("Invalid date") + .and_utc().timestamp(); pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() { format!("{}-{}", VELOREN_VERSION_STAGE, *GIT_DATE) } else { @@ -34,6 +38,7 @@ lazy_static::lazy_static! { }; } +use chrono::NaiveDateTime; pub use color::*; pub use dir::*; pub use grid_hasher::GridHasher; diff --git a/server/src/lib.rs b/server/src/lib.rs index 7c35384b7c..86bdafb1d0 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -93,6 +93,7 @@ use common::{ shared_server_config::ServerConstants, slowjob::SlowJobPool, terrain::TerrainChunk, + util::GIT_DATE_TIMESTAMP, vol::RectRasterableVol, }; use common_base::prof_span; @@ -116,7 +117,7 @@ use specs::{ use std::{ i32, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; #[cfg(not(feature = "worldgen"))] @@ -603,20 +604,24 @@ impl Server { if let Some(addr) = settings.query_address { use veloren_query_server::proto::ServerInfo; + const QUERY_SERVER_RATELIMIT: u16 = 120; + let (query_server_info_tx, query_server_info_rx) = tokio::sync::watch::channel(ServerInfo { git_hash: *sys::server_info::GIT_HASH, + git_version: *GIT_DATE_TIMESTAMP, players_count: 0, player_cap: settings.max_players, battlemode: settings.gameplay.battle_mode.into(), }); - let mut query_server = QueryServer::new(addr, query_server_info_rx); - let query_server_metrics = Arc::new(tokio::sync::RwLock::new( - veloren_query_server::server::Metrics::default(), - )); + let mut query_server = + QueryServer::new(addr, query_server_info_rx, QUERY_SERVER_RATELIMIT); + let query_server_metrics = + Arc::new(Mutex::new(veloren_query_server::server::Metrics::default())); let query_server_metrics2 = Arc::clone(&query_server_metrics); runtime.spawn(async move { - _ = query_server.run(query_server_metrics2).await; + let err = query_server.run(query_server_metrics2).await.err(); + error!(?err, "Query server stopped unexpectedly"); }); state.ecs_mut().insert(query_server_info_tx); state.ecs_mut().insert(query_server_metrics); diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 2e4cc478db..41b5da54b0 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -71,15 +71,16 @@ pub struct ServerEventMetrics { } pub struct QueryServerMetrics { - pub received_packets: IntGauge, - pub dropped_packets: IntGauge, - pub invalid_packets: IntGauge, - pub proccessing_errors: IntGauge, - pub info_requests: IntGauge, - pub sent_responses: IntGauge, - pub failed_responses: IntGauge, - pub timed_out_responses: IntGauge, - pub ratelimited: IntGauge, + pub received_packets: IntCounter, + pub dropped_packets: IntCounter, + pub invalid_packets: IntCounter, + pub proccessing_errors: IntCounter, + pub info_requests: IntCounter, + pub ping_requests: IntCounter, + pub sent_responses: IntCounter, + pub failed_responses: IntCounter, + pub timed_out_responses: IntCounter, + pub ratelimited: IntCounter, } impl PhysicsMetrics { @@ -440,39 +441,43 @@ impl ServerEventMetrics { impl QueryServerMetrics { pub fn new(registry: &Registry) -> Result { - let received_packets = IntGauge::with_opts(Opts::new( + let received_packets = IntCounter::with_opts(Opts::new( "query_server::received_packets", "Total amount of received packets by the query server", ))?; - let dropped_packets = IntGauge::with_opts(Opts::new( + let dropped_packets = IntCounter::with_opts(Opts::new( "query_server::dropped_packets", "Amount of dropped packets received by the query server (too short or invalid header)", ))?; - let invalid_packets = IntGauge::with_opts(Opts::new( + let invalid_packets = IntCounter::with_opts(Opts::new( "query_server::invalid_packets", "Amount of unparseable packets received by the query server", ))?; - let proccessing_errors = IntGauge::with_opts(Opts::new( + let proccessing_errors = IntCounter::with_opts(Opts::new( "query_server::proccessing_errors", "Amount of errors that occured while processing a query server request", ))?; - let info_requests = IntGauge::with_opts(Opts::new( + let info_requests = IntCounter::with_opts(Opts::new( "query_server::info_requests", "Amount of server info requests received by the query server", ))?; - let sent_responses = IntGauge::with_opts(Opts::new( + let ping_requests = IntCounter::with_opts(Opts::new( + "query_server::ping_requests", + "Amount of server ping requests received by the query server", + ))?; + let sent_responses = IntCounter::with_opts(Opts::new( "query_server::sent_responses", "Amount of responses sent by the query server", ))?; - let failed_responses = IntGauge::with_opts(Opts::new( + let failed_responses = IntCounter::with_opts(Opts::new( "query_server::failed_responses", "Amount of responses which failed to be sent by the query server", ))?; - let timed_out_responses = IntGauge::with_opts(Opts::new( + let timed_out_responses = IntCounter::with_opts(Opts::new( "query_server::timed_out_responses", "Amount of responses which timed out", ))?; - let ratelimited = IntGauge::with_opts(Opts::new( + let ratelimited = IntCounter::with_opts(Opts::new( "query_server::ratelimited", "Ratelimited requests to the query server", ))?; @@ -482,6 +487,7 @@ impl QueryServerMetrics { registry.register(Box::new(invalid_packets.clone()))?; registry.register(Box::new(proccessing_errors.clone()))?; registry.register(Box::new(info_requests.clone()))?; + registry.register(Box::new(ping_requests.clone()))?; registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?; registry.register(Box::new(timed_out_responses.clone()))?; @@ -493,6 +499,7 @@ impl QueryServerMetrics { invalid_packets, proccessing_errors, info_requests, + ping_requests, sent_responses, failed_responses, timed_out_responses, @@ -508,20 +515,22 @@ impl QueryServerMetrics { invalid_packets, proccessing_errors, info_requests, + ping_requests, sent_responses, failed_responses, timed_out_responses, ratelimited, }: veloren_query_server::server::Metrics, ) { - self.received_packets.set(received_packets as i64); - self.dropped_packets.set(dropped_packets as i64); - self.invalid_packets.set(invalid_packets as i64); - self.proccessing_errors.set(proccessing_errors as i64); - self.info_requests.set(info_requests as i64); - self.sent_responses.set(sent_responses as i64); - self.failed_responses.set(failed_responses as i64); - self.timed_out_responses.set(timed_out_responses as i64); - self.ratelimited.set(ratelimited as i64); + self.received_packets.inc_by(received_packets as u64); + self.dropped_packets.inc_by(dropped_packets as u64); + self.invalid_packets.inc_by(invalid_packets as u64); + self.proccessing_errors.inc_by(proccessing_errors as u64); + self.info_requests.inc_by(info_requests as u64); + self.ping_requests.inc_by(ping_requests as u64); + self.sent_responses.inc_by(sent_responses as u64); + self.failed_responses.inc_by(failed_responses as u64); + self.timed_out_responses.inc_by(timed_out_responses as u64); + self.ratelimited.inc_by(ratelimited as u64); } } diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index 7fb44ad041..a848dbe76d 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -6,8 +6,10 @@ use crate::{ use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, SysMetrics, System}; use specs::{Entities, Join, Read, ReadExpect}; -use std::{sync::Arc, time::Instant}; -use tokio::sync::RwLock; +use std::{ + sync::{Arc, Mutex}, + time::Instant, +}; use veloren_query_server::server::Metrics as RawQueryServerMetrics; /// This system exports metrics @@ -29,8 +31,8 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, JobMetrics>, - Option>>>, - Option>, + Option>>>, + ReadExpect<'a, QueryServerMetrics>, ); const NAME: &'static str = "metrics"; @@ -172,10 +174,12 @@ impl<'a> System<'a> for Sys { .with_label_values(&["metrics"]) .observe(len as f64 / NANOSEC_PER_SEC); - if let (Some(query_server_metrics), Some(export_query_server)) = - (raw_query_server, export_query_server) + if let Some(Ok(metrics)) = raw_query_server + .as_ref() + // Hold the lock for the shortest time possible + .map(|m| m.lock().map(|mut metrics| metrics.reset())) { - export_query_server.apply(query_server_metrics.blocking_write().reset()); + export_query_server.apply(metrics); } } } diff --git a/server/src/sys/server_info.rs b/server/src/sys/server_info.rs index 43eedc9848..92ccb00937 100644 --- a/server/src/sys/server_info.rs +++ b/server/src/sys/server_info.rs @@ -1,19 +1,18 @@ +use common::{comp::Player, util::GIT_DATE_TIMESTAMP}; use common_ecs::{Origin, Phase, System}; use lazy_static::lazy_static; use specs::{Read, ReadStorage}; +use tracing::error; use veloren_query_server::proto::ServerInfo; -use crate::{client::Client, Settings, Tick}; +use crate::{Settings, Tick}; // Update the server stats every 60 ticks const INFO_SEND_INTERVAL: u64 = 60; lazy_static! { - pub static ref GIT_HASH: [char; 8] = common::util::GIT_HASH[..8] - .chars() - .collect::>() - .try_into() - .unwrap_or_default(); + pub static ref GIT_HASH: u32 = + u32::from_str_radix(&common::util::GIT_HASH[..8], 16).expect("Invalid git hash"); } #[derive(Default)] @@ -24,24 +23,27 @@ impl<'a> System<'a> for Sys { Read<'a, Tick>, Read<'a, Settings>, Option>>, - ReadStorage<'a, Client>, + ReadStorage<'a, Player>, ); const NAME: &'static str = "server_info"; const ORIGIN: Origin = Origin::Server; const PHASE: Phase = Phase::Create; - fn run(_job: &mut common_ecs::Job, (tick, settings, sender, clients): Self::SystemData) { + fn run(_job: &mut common_ecs::Job, (tick, settings, sender, players): Self::SystemData) { if let Some(sender) = sender.as_ref() && tick.0 % INFO_SEND_INTERVAL == 0 { - let count = clients.count().try_into().unwrap_or(u16::MAX); - _ = sender.send(ServerInfo { + let count = players.count().try_into().unwrap_or(u16::MAX); + if let Err(error) = sender.send(ServerInfo { git_hash: *GIT_HASH, + git_version: *GIT_DATE_TIMESTAMP, players_count: count, player_cap: settings.max_players, battlemode: settings.gameplay.battle_mode.into(), - }); + }) { + error!(?error, "Failed to send server info to the query server"); + } } } }