diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml index 097a4a7b33..6db272cdcf 100644 --- a/common/query_server/Cargo.toml +++ b/common/query_server/Cargo.toml @@ -17,3 +17,7 @@ tokio = { workspace = true, optional = true, features = ["net", "sync"] } protocol = { version = "3.4.0", default-features = false, features = ["derive"] } tracing-subscriber = { version = "0.3.7", optional = true } tracing = { workspace = true } + +[[example]] +name = "demo" +required-features = ["example"] diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs index 6caeb0a6b0..1bb5d09dd7 100644 --- a/common/query_server/examples/demo.rs +++ b/common/query_server/examples/demo.rs @@ -5,7 +5,7 @@ use std::{ }; use tokio::sync::{watch, RwLock}; -use veloren_server_query::{ +use veloren_query_server::{ client::QueryClient, proto::{ServerBattleMode, ServerInfo}, server::{Metrics, QueryServer}, diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs index 9901eb9da6..09dae7333a 100644 --- a/common/query_server/src/server.rs +++ b/common/query_server/src/server.rs @@ -1,4 +1,5 @@ use std::{ + future::Future, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, @@ -72,7 +73,11 @@ impl QueryServer { }; if let Err(error) = self - .process_datagram(msg_buf, remote_addr, &mut new_metrics) + .process_datagram( + msg_buf, + remote_addr, + (&mut new_metrics, Arc::clone(&metrics)), + ) .await { debug!(?error, "Error while processing datagram"); @@ -104,35 +109,50 @@ impl QueryServer { &mut self, datagram: &[u8], remote: SocketAddr, - metrics: &mut Metrics, + (new_metrics, metrics): (&mut Metrics, Arc>), ) -> Result<(), tokio::io::Error> { let Ok(packet): Result = self.pipeline.receive_from(&mut io::Cursor::new(datagram)) else { - metrics.invalid_packets += 1; + new_metrics.invalid_packets += 1; return Ok(()); }; trace!(?packet, "Received packet"); + async fn timed<'a, F: Future + 'a, O>( + fut: F, + metrics: &'a 
Arc>, + ) -> Option { + if let Ok(res) = timeout(RESPONSE_SEND_TIMEOUT, fut).await { + Some(res) + } else { + metrics.write().await.timed_out_responses += 1; + None + } + } match packet { QueryServerRequest::Ping(Ping) => { - metrics.ping_requests += 1; + new_metrics.ping_requests += 1; tokio::task::spawn(async move { - _ = timeout( - RESPONSE_SEND_TIMEOUT, - Self::send_response(QueryServerResponse::Pong(Pong), remote), + timed( + Self::send_response(QueryServerResponse::Pong(Pong), remote, &metrics), + &metrics, ) .await; }); }, QueryServerRequest::ServerInfo(_) => { - metrics.info_requests += 1; + new_metrics.info_requests += 1; let server_info = *self.server_info.borrow(); tokio::task::spawn(async move { - _ = timeout( - RESPONSE_SEND_TIMEOUT, - Self::send_response(QueryServerResponse::ServerInfo(server_info), remote), + timed( + Self::send_response( + QueryServerResponse::ServerInfo(server_info), + remote, + &metrics, + ), + &metrics, ) .await; }); @@ -142,7 +162,11 @@ impl QueryServer { Ok(()) } - async fn send_response(response: QueryServerResponse, addr: SocketAddr) { + async fn send_response( + response: QueryServerResponse, + addr: SocketAddr, + metrics: &Arc>, + ) { let Ok(socket) = UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await else { @@ -157,10 +181,10 @@ impl QueryServer { _ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response); match socket.send_to(&buf, addr).await { Ok(_) => { - // TODO: Sent responses metric + metrics.write().await.sent_responses += 1; }, Err(err) => { - // TODO: Failed response metric + metrics.write().await.failed_responses += 1; debug!(?err, "Failed to send query server response"); }, } @@ -195,6 +219,6 @@ impl std::ops::AddAssign for Metrics { } impl Metrics { - /// Resets all metrics to 0 - pub fn reset(&mut self) { *self = Self::default(); } + /// Resets all metrics to 0 and returns previous ones + pub fn reset(&mut self) -> Self { std::mem::take(self) } } diff --git 
a/server/src/lib.rs b/server/src/lib.rs index 58a4959883..7c35384b7c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -275,6 +275,7 @@ impl Server { let tick_metrics = TickMetrics::new(&registry).unwrap(); let physics_metrics = PhysicsMetrics::new(&registry).unwrap(); let server_event_metrics = metrics::ServerEventMetrics::new(&registry).unwrap(); + let query_server_metrics = metrics::QueryServerMetrics::new(&registry).unwrap(); let battlemode_buffer = BattleModeBuffer::default(); @@ -377,6 +378,7 @@ impl Server { state.ecs_mut().insert(tick_metrics); state.ecs_mut().insert(physics_metrics); state.ecs_mut().insert(server_event_metrics); + state.ecs_mut().insert(query_server_metrics); if settings.experimental_terrain_persistence { #[cfg(feature = "persistent_world")] { diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 1d146ee431..7be921c1c7 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -70,6 +70,18 @@ pub struct ServerEventMetrics { pub event_count: IntCounterVec, } +pub struct QueryServerMetrics { + pub received_packets: IntGauge, + pub dropped_packets: IntGauge, + pub invalid_packets: IntGauge, + pub proccessing_errors: IntGauge, + pub ping_requests: IntGauge, + pub info_requests: IntGauge, + pub sent_responses: IntGauge, + pub failed_responses: IntGauge, + pub timed_out_responses: IntGauge, +} + impl PhysicsMetrics { pub fn new(registry: &Registry) -> Result { let entity_entity_collision_checks_count = IntCounter::with_opts(Opts::new( @@ -415,7 +427,7 @@ impl TickMetrics { } impl ServerEventMetrics { - pub fn new(registry: &Registry) -> Result> { + pub fn new(registry: &Registry) -> Result { let event_count = IntCounterVec::new( Opts::new("event_count", "number of ServerEvents handled"), &["event"], @@ -425,3 +437,91 @@ impl ServerEventMetrics { Ok(Self { event_count }) } } + +impl QueryServerMetrics { + pub fn new(registry: &Registry) -> Result { + let received_packets = IntGauge::with_opts(Opts::new( + 
"query_server::received_packets", + "Total amount of received packets by the query server", + ))?; + let dropped_packets = IntGauge::with_opts(Opts::new( + "query_server::dropped_packets", + "Amount of dropped packets received by the query server (too short or invalid header)", + ))?; + let invalid_packets = IntGauge::with_opts(Opts::new( + "query_server::invalid_packets", + "Amount of unparseable packets received by the query server", + ))?; + let proccessing_errors = IntGauge::with_opts(Opts::new( + "query_server::proccessing_errors", + "Amount of errors that occured while processing a query server request", + ))?; + let ping_requests = IntGauge::with_opts(Opts::new( + "query_server::ping_requests", + "Amount of ping requests received by the query server", + ))?; + let info_requests = IntGauge::with_opts(Opts::new( + "query_server::info_requests", + "Amount of server info requests received by the query server", + ))?; + let sent_responses = IntGauge::with_opts(Opts::new( + "query_server::sent_responses", + "Amount of responses sent by the query server", + ))?; + let failed_responses = IntGauge::with_opts(Opts::new( + "query_server::failed_responses", + "Amount of responses which failed to be sent by the query server", + ))?; + let timed_out_responses = IntGauge::with_opts(Opts::new( + "query_server::timed_out_responses", + "Amount of responses which timed out", + ))?; + + registry.register(Box::new(received_packets.clone()))?; + registry.register(Box::new(dropped_packets.clone()))?; + registry.register(Box::new(invalid_packets.clone()))?; + registry.register(Box::new(proccessing_errors.clone()))?; + registry.register(Box::new(ping_requests.clone()))?; + registry.register(Box::new(info_requests.clone()))?; + registry.register(Box::new(sent_responses.clone()))?; + registry.register(Box::new(failed_responses.clone()))?; + registry.register(Box::new(timed_out_responses.clone()))?; + + Ok(Self { + received_packets, + dropped_packets, + invalid_packets, + 
proccessing_errors, + ping_requests, + info_requests, + sent_responses, + failed_responses, + timed_out_responses, + }) + } + + pub fn apply( + &self, + veloren_query_server::server::Metrics { + received_packets, + dropped_packets, + invalid_packets, + proccessing_errors, + ping_requests, + info_requests, + sent_responses, + failed_responses, + timed_out_responses, + }: veloren_query_server::server::Metrics, + ) { + self.received_packets.set(received_packets as i64); + self.dropped_packets.set(dropped_packets as i64); + self.invalid_packets.set(invalid_packets as i64); + self.proccessing_errors.set(proccessing_errors as i64); + self.ping_requests.set(ping_requests as i64); + self.info_requests.set(info_requests as i64); + self.sent_responses.set(sent_responses as i64); + self.failed_responses.set(failed_responses as i64); + self.timed_out_responses.set(timed_out_responses as i64); + } +} diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index d8633805cc..749f793c6a 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -1,12 +1,13 @@ use crate::{ chunk_generator::ChunkGenerator, - metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics}, + metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, QueryServerMetrics, TickMetrics}, HwStats, Tick, TickStart, }; use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, SysMetrics, System}; -use specs::{Entities, Join, Read, ReadExpect}; +use specs::{Entities, Join, Read, ReadExpect, Write}; use std::time::Instant; +use veloren_query_server::server::Metrics as RawQueryServerMetrics; /// This system exports metrics #[derive(Default)] @@ -27,6 +28,8 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, JobMetrics>, + Write<'a, Option>, + ReadExpect<'a, QueryServerMetrics>, ); const NAME: &'static str = "metrics"; @@ -50,6 +53,8 @@ impl<'a> System<'a> for Sys { 
export_tick, export_physics, export_jobs, + mut raw_query_server, + export_query_server, ): Self::SystemData, ) { const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64; @@ -165,5 +170,9 @@ impl<'a> System<'a> for Sys { .system_length_hist .with_label_values(&["metrics"]) .observe(len as f64 / NANOSEC_PER_SEC); + + if let Some(query_server_metrics) = raw_query_server.as_mut() { + export_query_server.apply(query_server_metrics.reset()); + } } }