add extra padding for future version information and remove pings

This commit is contained in:
crabman 2024-04-26 08:06:40 +00:00
parent 800480b082
commit 419ee88cc2
No known key found for this signature in database
8 changed files with 60 additions and 96 deletions

View File

@ -12,7 +12,7 @@ use veloren_query_server::{
};
const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo {
git_hash: ['\0'; 10],
git_hash: ['\0'; 8],
players_count: 100,
player_cap: 300,
battlemode: ServerBattleMode::GlobalPvE,
@ -30,19 +30,18 @@ async fn main() {
tokio::task::spawn(async move { server.run(metrics2).await.unwrap() });
let client = QueryClient { addr };
let ping = client.ping().await.unwrap();
let info = client.server_info().await.unwrap();
let (info, ping) = client.server_info().await.unwrap();
println!("Ping = {}ms", ping.as_millis());
println!("Server info: {info:?}");
println!("Metrics = {:#?}", metrics.read().await);
assert_eq!(info, DEFAULT_SERVER_INFO);
let start = Instant::now();
for _i in 0..10000 {
client.ping().await.unwrap();
client.server_info().await.unwrap();
}
println!("Metrics = {:#?}", metrics.read().await);
dbg!(start.elapsed());
}

View File

@ -4,9 +4,13 @@ use std::{
time::{Duration, Instant},
};
use protocol::Parcel;
use tokio::{net::UdpSocket, time::timeout};
use crate::proto::{Ping, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER};
use crate::proto::{
QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE,
VELOREN_HEADER,
};
#[derive(Debug)]
pub enum QueryClientError {
@ -21,50 +25,45 @@ pub struct QueryClient {
}
impl QueryClient {
pub async fn server_info(&self) -> Result<ServerInfo, QueryClientError> {
pub async fn server_info(&self) -> Result<(ServerInfo, Duration), QueryClientError> {
self.send_query(QueryServerRequest::ServerInfo(Default::default()))
.await
.and_then(|(response, _)| {
.and_then(|(response, duration)| {
#[allow(irrefutable_let_patterns)] // TODO: remove when more variants are added
if let QueryServerResponse::ServerInfo(info) = response {
Ok(info)
Ok((info, duration))
} else {
Err(QueryClientError::InvalidResponse)
}
})
}
pub async fn ping(&self) -> Result<Duration, QueryClientError> {
self.send_query(QueryServerRequest::Ping(Ping))
.await
.map(|(_, elapsed)| elapsed)
}
async fn send_query(
&self,
request: QueryServerRequest,
) -> Result<(QueryServerResponse, Duration), QueryClientError> {
let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
let mut pipeline = crate::create_pipeline();
let mut buf = VELOREN_HEADER.to_vec();
let mut cursor = io::Cursor::new(&mut buf);
cursor.set_position(VELOREN_HEADER.len() as u64);
pipeline.send_to(&mut cursor, &request)?;
let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE);
buf.extend(VELOREN_HEADER);
// 2 extra bytes for version information, currently unused
buf.extend([0; 2]);
buf.extend(<QueryServerRequest as Parcel>::raw_bytes(
&request,
&Default::default(),
)?);
let query_sent = Instant::now();
socket.send_to(buf.as_slice(), self.addr).await?;
socket.send_to(&buf, self.addr).await?;
let mut buf = vec![0; 1500];
let mut buf = vec![0; MAX_RESPONSE_SIZE];
let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
.await
.map_err(|_| QueryClientError::Timeout)?
.map_err(|_| QueryClientError::Timeout)?;
let mut pipeline = crate::create_pipeline();
let packet: QueryServerResponse = pipeline.receive_from(&mut io::Cursor::new(&mut buf))?;
let packet =
<QueryServerResponse as Parcel>::read(&mut io::Cursor::new(buf), &Default::default())?;
Ok((packet, query_sent.elapsed()))
}

View File

@ -1,15 +1,3 @@
use protocol::{
wire::{self, dgram},
Parcel,
};
#[cfg(feature = "client")] pub mod client;
pub mod proto;
#[cfg(feature = "server")] pub mod server;
fn create_pipeline<T: Parcel>() -> dgram::Pipeline<T, wire::middleware::pipeline::Default> {
dgram::Pipeline::new(
wire::middleware::pipeline::default(),
protocol::Settings::default(),
)
}

View File

@ -1,19 +1,14 @@
use protocol::Protocol;
pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n'];
#[derive(Protocol, Debug, Clone, Copy)]
pub struct Ping;
#[derive(Protocol, Debug, Clone, Copy)]
pub struct Pong;
pub const MAX_REQUEST_SIZE: usize = 300;
pub const MAX_RESPONSE_SIZE: usize = 256;
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
#[allow(clippy::large_enum_variant)]
pub enum QueryServerRequest {
Ping(Ping),
ServerInfo(ServerInfoRequest),
// New requests should be added at the end to prevent breakage
}
@ -22,7 +17,6 @@ pub enum QueryServerRequest {
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
pub enum QueryServerResponse {
Pong(Pong),
ServerInfo(ServerInfo),
// New responses should be added at the end to prevent breakage
}
@ -35,7 +29,7 @@ pub struct ServerInfoRequest {
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerInfo {
pub git_hash: [char; 10],
pub git_hash: [char; 8],
pub players_count: u16,
pub player_cap: u16,
pub battlemode: ServerBattleMode,

View File

@ -6,7 +6,7 @@ use std::{
time::Duration,
};
use protocol::wire::{self, dgram};
use protocol::Parcel;
use tokio::{
net::UdpSocket,
sync::{watch, RwLock},
@ -15,7 +15,7 @@ use tokio::{
use tracing::{debug, trace};
use crate::proto::{
Ping, Pong, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER,
QueryServerRequest, QueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER,
};
const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2);
@ -23,7 +23,7 @@ const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2);
pub struct QueryServer {
pub addr: SocketAddr,
server_info: watch::Receiver<ServerInfo>,
pipeline: dgram::Pipeline<QueryServerRequest, wire::middleware::pipeline::Default>,
settings: protocol::Settings,
}
#[derive(Default, Clone, Copy, Debug)]
@ -32,7 +32,6 @@ pub struct Metrics {
pub dropped_packets: u32,
pub invalid_packets: u32,
pub proccessing_errors: u32,
pub ping_requests: u32,
pub info_requests: u32,
pub sent_responses: u32,
pub failed_responses: u32,
@ -44,15 +43,17 @@ impl QueryServer {
Self {
addr,
server_info,
pipeline: crate::create_pipeline(),
settings: Default::default(),
}
}
pub async fn run(&mut self, metrics: Arc<RwLock<Metrics>>) -> Result<(), tokio::io::Error> {
let socket = UdpSocket::bind(self.addr).await?;
let mut buf = Box::new([0; 1024]);
let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
loop {
*buf = [0; MAX_REQUEST_SIZE];
let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| {
debug!("Error while receiving from query server socket: {err:?}")
}) else {
@ -66,7 +67,8 @@ impl QueryServer {
let raw_msg_buf = &buf[..len];
let msg_buf = if Self::validate_datagram(raw_msg_buf) {
&raw_msg_buf[VELOREN_HEADER.len()..]
// Require 2 extra bytes for version (currently unused)
&raw_msg_buf[(VELOREN_HEADER.len() + 2)..]
} else {
new_metrics.dropped_packets += 1;
continue;
@ -83,18 +85,16 @@ impl QueryServer {
debug!(?error, "Error while processing datagram");
}
*buf = [0; 1024];
// Update metrics at the end of eath packet
let mut metrics = metrics.write().await;
*metrics += new_metrics;
*metrics.write().await += new_metrics;
}
}
// Header must be discarded after this validation passes
fn validate_datagram(data: &[u8]) -> bool {
let len = data.len();
if len < VELOREN_HEADER.len() + 1 {
// Require 2 extra bytes for version (currently unused)
if len < VELOREN_HEADER.len() + 3 {
trace!(?len, "Datagram too short");
false
} else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER {
@ -112,7 +112,7 @@ impl QueryServer {
(new_metrics, metrics): (&mut Metrics, Arc<RwLock<Metrics>>),
) -> Result<(), tokio::io::Error> {
let Ok(packet): Result<QueryServerRequest, _> =
self.pipeline.receive_from(&mut io::Cursor::new(datagram))
<QueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
else {
new_metrics.invalid_packets += 1;
return Ok(());
@ -132,16 +132,6 @@ impl QueryServer {
}
}
match packet {
QueryServerRequest::Ping(Ping) => {
new_metrics.ping_requests += 1;
tokio::task::spawn(async move {
timed(
Self::send_response(QueryServerResponse::Pong(Pong), remote, &metrics),
&metrics,
)
.await;
});
},
QueryServerRequest::ServerInfo(_) => {
new_metrics.info_requests += 1;
let server_info = *self.server_info.borrow();
@ -174,11 +164,13 @@ impl QueryServer {
return;
};
let mut buf = Vec::new();
let mut pipeline = crate::create_pipeline();
_ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response);
let buf = if let Ok(data) =
<QueryServerResponse as Parcel>::raw_bytes(&response, &Default::default())
{
data
} else {
Vec::new()
};
match socket.send_to(&buf, addr).await {
Ok(_) => {
metrics.write().await.sent_responses += 1;
@ -199,7 +191,6 @@ impl std::ops::AddAssign for Metrics {
dropped_packets,
invalid_packets,
proccessing_errors,
ping_requests,
info_requests,
sent_responses,
failed_responses,
@ -210,7 +201,6 @@ impl std::ops::AddAssign for Metrics {
self.dropped_packets += dropped_packets;
self.invalid_packets += invalid_packets;
self.proccessing_errors += proccessing_errors;
self.ping_requests += ping_requests;
self.info_requests += info_requests;
self.sent_responses += sent_responses;
self.failed_responses += failed_responses;

View File

@ -75,7 +75,6 @@ pub struct QueryServerMetrics {
pub dropped_packets: IntGauge,
pub invalid_packets: IntGauge,
pub proccessing_errors: IntGauge,
pub ping_requests: IntGauge,
pub info_requests: IntGauge,
pub sent_responses: IntGauge,
pub failed_responses: IntGauge,
@ -456,10 +455,6 @@ impl QueryServerMetrics {
"query_server::proccessing_errors",
"Amount of errors that occured while processing a query server request",
))?;
let ping_requests = IntGauge::with_opts(Opts::new(
"query_server::ping_requests",
"Amount of ping requests received by the query server",
))?;
let info_requests = IntGauge::with_opts(Opts::new(
"query_server::info_requests",
"Amount of server info requests received by the query server",
@ -481,7 +476,6 @@ impl QueryServerMetrics {
registry.register(Box::new(dropped_packets.clone()))?;
registry.register(Box::new(invalid_packets.clone()))?;
registry.register(Box::new(proccessing_errors.clone()))?;
registry.register(Box::new(ping_requests.clone()))?;
registry.register(Box::new(info_requests.clone()))?;
registry.register(Box::new(sent_responses.clone()))?;
registry.register(Box::new(failed_responses.clone()))?;
@ -492,7 +486,6 @@ impl QueryServerMetrics {
dropped_packets,
invalid_packets,
proccessing_errors,
ping_requests,
info_requests,
sent_responses,
failed_responses,
@ -507,7 +500,6 @@ impl QueryServerMetrics {
dropped_packets,
invalid_packets,
proccessing_errors,
ping_requests,
info_requests,
sent_responses,
failed_responses,
@ -518,7 +510,6 @@ impl QueryServerMetrics {
self.dropped_packets.set(dropped_packets as i64);
self.invalid_packets.set(invalid_packets as i64);
self.proccessing_errors.set(proccessing_errors as i64);
self.ping_requests.set(ping_requests as i64);
self.info_requests.set(info_requests as i64);
self.sent_responses.set(sent_responses as i64);
self.failed_responses.set(failed_responses as i64);

View File

@ -5,8 +5,9 @@ use crate::{
};
use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, SysMetrics, System};
use specs::{Entities, Join, Read, ReadExpect, Write};
use std::time::Instant;
use specs::{Entities, Join, Read, ReadExpect};
use std::{sync::Arc, time::Instant};
use tokio::sync::RwLock;
use veloren_query_server::server::Metrics as RawQueryServerMetrics;
/// This system exports metrics
@ -28,8 +29,8 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, TickMetrics>,
ReadExpect<'a, PhysicsMetrics>,
ReadExpect<'a, JobMetrics>,
Write<'a, Option<RawQueryServerMetrics>>,
ReadExpect<'a, QueryServerMetrics>,
Option<Read<'a, Arc<RwLock<RawQueryServerMetrics>>>>,
Option<ReadExpect<'a, QueryServerMetrics>>,
);
const NAME: &'static str = "metrics";
@ -53,7 +54,7 @@ impl<'a> System<'a> for Sys {
export_tick,
export_physics,
export_jobs,
mut raw_query_server,
raw_query_server,
export_query_server,
): Self::SystemData,
) {
@ -171,8 +172,10 @@ impl<'a> System<'a> for Sys {
.with_label_values(&["metrics"])
.observe(len as f64 / NANOSEC_PER_SEC);
if let Some(query_server_metrics) = raw_query_server.as_mut() {
export_query_server.apply(query_server_metrics.reset());
if let (Some(query_server_metrics), Some(export_query_server)) =
(raw_query_server, export_query_server)
{
export_query_server.apply(query_server_metrics.blocking_write().reset());
}
}
}

View File

@ -9,7 +9,7 @@ use crate::{client::Client, Settings, Tick};
const INFO_SEND_INTERVAL: u64 = 60;
lazy_static! {
pub static ref GIT_HASH: [char; 10] = common::util::GIT_HASH[..10]
pub static ref GIT_HASH: [char; 8] = common::util::GIT_HASH[..8]
.chars()
.collect::<Vec<_>>()
.try_into()
@ -23,7 +23,7 @@ impl<'a> System<'a> for Sys {
type SystemData = (
Read<'a, Tick>,
Read<'a, Settings>,
Read<'a, Option<tokio::sync::watch::Sender<ServerInfo>>>,
Option<Read<'a, tokio::sync::watch::Sender<ServerInfo>>>,
ReadStorage<'a, Client>,
);