export metrics

This commit is contained in:
crabman 2024-04-25 12:47:51 +00:00
parent 83d4c775e2
commit 800480b082
No known key found for this signature in database
6 changed files with 159 additions and 20 deletions

View File

@ -17,3 +17,7 @@ tokio = { workspace = true, optional = true, features = ["net", "sync"] }
protocol = { version = "3.4.0", default-features = false, features = ["derive"] }
tracing-subscriber = { version = "0.3.7", optional = true }
tracing = { workspace = true }
[[example]]
name = "demo"
required-features = ["example"]

View File

@ -5,7 +5,7 @@ use std::{
};
use tokio::sync::{watch, RwLock};
use veloren_server_query::{
use veloren_query_server::{
client::QueryClient,
proto::{ServerBattleMode, ServerInfo},
server::{Metrics, QueryServer},

View File

@ -1,4 +1,5 @@
use std::{
future::Future,
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
@ -72,7 +73,11 @@ impl QueryServer {
};
if let Err(error) = self
.process_datagram(msg_buf, remote_addr, &mut new_metrics)
.process_datagram(
msg_buf,
remote_addr,
(&mut new_metrics, Arc::clone(&metrics)),
)
.await
{
debug!(?error, "Error while processing datagram");
@ -104,35 +109,50 @@ impl QueryServer {
&mut self,
datagram: &[u8],
remote: SocketAddr,
metrics: &mut Metrics,
(new_metrics, metrics): (&mut Metrics, Arc<RwLock<Metrics>>),
) -> Result<(), tokio::io::Error> {
let Ok(packet): Result<QueryServerRequest, _> =
self.pipeline.receive_from(&mut io::Cursor::new(datagram))
else {
metrics.invalid_packets += 1;
new_metrics.invalid_packets += 1;
return Ok(());
};
trace!(?packet, "Received packet");
async fn timed<'a, F: Future<Output = O> + 'a, O>(
fut: F,
metrics: &'a Arc<RwLock<Metrics>>,
) -> Option<O> {
if let Ok(res) = timeout(RESPONSE_SEND_TIMEOUT, fut).await {
Some(res)
} else {
metrics.write().await.timed_out_responses += 1;
None
}
}
match packet {
QueryServerRequest::Ping(Ping) => {
metrics.ping_requests += 1;
new_metrics.ping_requests += 1;
tokio::task::spawn(async move {
_ = timeout(
RESPONSE_SEND_TIMEOUT,
Self::send_response(QueryServerResponse::Pong(Pong), remote),
timed(
Self::send_response(QueryServerResponse::Pong(Pong), remote, &metrics),
&metrics,
)
.await;
});
},
QueryServerRequest::ServerInfo(_) => {
metrics.info_requests += 1;
new_metrics.info_requests += 1;
let server_info = *self.server_info.borrow();
tokio::task::spawn(async move {
_ = timeout(
RESPONSE_SEND_TIMEOUT,
Self::send_response(QueryServerResponse::ServerInfo(server_info), remote),
timed(
Self::send_response(
QueryServerResponse::ServerInfo(server_info),
remote,
&metrics,
),
&metrics,
)
.await;
});
@ -142,7 +162,11 @@ impl QueryServer {
Ok(())
}
async fn send_response(response: QueryServerResponse, addr: SocketAddr) {
async fn send_response(
response: QueryServerResponse,
addr: SocketAddr,
metrics: &Arc<RwLock<Metrics>>,
) {
let Ok(socket) =
UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await
else {
@ -157,10 +181,10 @@ impl QueryServer {
_ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response);
match socket.send_to(&buf, addr).await {
Ok(_) => {
// TODO: Sent responses metric
metrics.write().await.sent_responses += 1;
},
Err(err) => {
// TODO: Failed response metric
metrics.write().await.failed_responses += 1;
debug!(?err, "Failed to send query server response");
},
}
@ -195,6 +219,6 @@ impl std::ops::AddAssign for Metrics {
}
impl Metrics {
/// Resets all metrics to 0
pub fn reset(&mut self) { *self = Self::default(); }
/// Resets all metrics to 0 and returns previous ones
pub fn reset(&mut self) -> Self { std::mem::take(self) }
}

View File

@ -275,6 +275,7 @@ impl Server {
let tick_metrics = TickMetrics::new(&registry).unwrap();
let physics_metrics = PhysicsMetrics::new(&registry).unwrap();
let server_event_metrics = metrics::ServerEventMetrics::new(&registry).unwrap();
let query_server_metrics = metrics::QueryServerMetrics::new(&registry).unwrap();
let battlemode_buffer = BattleModeBuffer::default();
@ -377,6 +378,7 @@ impl Server {
state.ecs_mut().insert(tick_metrics);
state.ecs_mut().insert(physics_metrics);
state.ecs_mut().insert(server_event_metrics);
state.ecs_mut().insert(query_server_metrics);
if settings.experimental_terrain_persistence {
#[cfg(feature = "persistent_world")]
{

View File

@ -70,6 +70,18 @@ pub struct ServerEventMetrics {
pub event_count: IntCounterVec,
}
pub struct QueryServerMetrics {
pub received_packets: IntGauge,
pub dropped_packets: IntGauge,
pub invalid_packets: IntGauge,
pub proccessing_errors: IntGauge,
pub ping_requests: IntGauge,
pub info_requests: IntGauge,
pub sent_responses: IntGauge,
pub failed_responses: IntGauge,
pub timed_out_responses: IntGauge,
}
impl PhysicsMetrics {
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
let entity_entity_collision_checks_count = IntCounter::with_opts(Opts::new(
@ -415,7 +427,7 @@ impl TickMetrics {
}
impl ServerEventMetrics {
pub fn new(registry: &Registry) -> Result<Self, Box<dyn Error>> {
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
let event_count = IntCounterVec::new(
Opts::new("event_count", "number of ServerEvents handled"),
&["event"],
@ -425,3 +437,91 @@ impl ServerEventMetrics {
Ok(Self { event_count })
}
}
impl QueryServerMetrics {
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
let received_packets = IntGauge::with_opts(Opts::new(
"query_server::received_packets",
"Total amount of received packets by the query server",
))?;
let dropped_packets = IntGauge::with_opts(Opts::new(
"query_server::dropped_packets",
"Amount of dropped packets received by the query server (too short or invalid header)",
))?;
let invalid_packets = IntGauge::with_opts(Opts::new(
"query_server::invalid_packets",
"Amount of unparseable packets received by the query server",
))?;
let proccessing_errors = IntGauge::with_opts(Opts::new(
"query_server::proccessing_errors",
"Amount of errors that occured while processing a query server request",
))?;
let ping_requests = IntGauge::with_opts(Opts::new(
"query_server::ping_requests",
"Amount of ping requests received by the query server",
))?;
let info_requests = IntGauge::with_opts(Opts::new(
"query_server::info_requests",
"Amount of server info requests received by the query server",
))?;
let sent_responses = IntGauge::with_opts(Opts::new(
"query_server::sent_responses",
"Amount of responses sent by the query server",
))?;
let failed_responses = IntGauge::with_opts(Opts::new(
"query_server::failed_responses",
"Amount of responses which failed to be sent by the query server",
))?;
let timed_out_responses = IntGauge::with_opts(Opts::new(
"query_server::timed_out_responses",
"Amount of responses which timed out",
))?;
registry.register(Box::new(received_packets.clone()))?;
registry.register(Box::new(dropped_packets.clone()))?;
registry.register(Box::new(invalid_packets.clone()))?;
registry.register(Box::new(proccessing_errors.clone()))?;
registry.register(Box::new(ping_requests.clone()))?;
registry.register(Box::new(info_requests.clone()))?;
registry.register(Box::new(sent_responses.clone()))?;
registry.register(Box::new(failed_responses.clone()))?;
registry.register(Box::new(timed_out_responses.clone()))?;
Ok(Self {
received_packets,
dropped_packets,
invalid_packets,
proccessing_errors,
ping_requests,
info_requests,
sent_responses,
failed_responses,
timed_out_responses,
})
}
pub fn apply(
&self,
veloren_query_server::server::Metrics {
received_packets,
dropped_packets,
invalid_packets,
proccessing_errors,
ping_requests,
info_requests,
sent_responses,
failed_responses,
timed_out_responses,
}: veloren_query_server::server::Metrics,
) {
self.received_packets.set(received_packets as i64);
self.dropped_packets.set(dropped_packets as i64);
self.invalid_packets.set(invalid_packets as i64);
self.proccessing_errors.set(proccessing_errors as i64);
self.ping_requests.set(ping_requests as i64);
self.info_requests.set(info_requests as i64);
self.sent_responses.set(sent_responses as i64);
self.failed_responses.set(failed_responses as i64);
self.timed_out_responses.set(timed_out_responses as i64);
}
}

View File

@ -1,12 +1,13 @@
use crate::{
chunk_generator::ChunkGenerator,
metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics},
metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, QueryServerMetrics, TickMetrics},
HwStats, Tick, TickStart,
};
use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, SysMetrics, System};
use specs::{Entities, Join, Read, ReadExpect};
use specs::{Entities, Join, Read, ReadExpect, Write};
use std::time::Instant;
use veloren_query_server::server::Metrics as RawQueryServerMetrics;
/// This system exports metrics
#[derive(Default)]
@ -27,6 +28,8 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, TickMetrics>,
ReadExpect<'a, PhysicsMetrics>,
ReadExpect<'a, JobMetrics>,
Write<'a, Option<RawQueryServerMetrics>>,
ReadExpect<'a, QueryServerMetrics>,
);
const NAME: &'static str = "metrics";
@ -50,6 +53,8 @@ impl<'a> System<'a> for Sys {
export_tick,
export_physics,
export_jobs,
mut raw_query_server,
export_query_server,
): Self::SystemData,
) {
const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64;
@ -165,5 +170,9 @@ impl<'a> System<'a> for Sys {
.system_length_hist
.with_label_values(&["metrics"])
.observe(len as f64 / NANOSEC_PER_SEC);
if let Some(query_server_metrics) = raw_query_server.as_mut() {
export_query_server.apply(query_server_metrics.reset());
}
}
}