Remove ping and add request size test

This commit is contained in:
crabman 2024-05-02 07:45:53 +00:00
parent 78fd92fae5
commit 6430091512
No known key found for this signature in database
10 changed files with 102 additions and 82 deletions

View File

@ -14,7 +14,7 @@ use veloren_query_server::{
const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo {
git_hash: 0, git_hash: 0,
git_version: 0, git_timestamp: 0,
players_count: 100, players_count: 100,
player_cap: 300, player_cap: 300,
battlemode: ServerBattleMode::GlobalPvE, battlemode: ServerBattleMode::GlobalPvE,
@ -41,7 +41,7 @@ async fn main() {
let start = Instant::now(); let start = Instant::now();
for _i in 0..10000 { for _i in 0..10000 {
if let Err(error) = client.ping().await { if let Err(error) = client.server_info().await {
error!(?error, "Server info request error"); error!(?error, "Server info request error");
} }
} }

View File

@ -6,11 +6,11 @@ use std::{
use protocol::Parcel; use protocol::Parcel;
use tokio::{net::UdpSocket, time::timeout}; use tokio::{net::UdpSocket, time::timeout};
use tracing::{trace, warn}; use tracing::trace;
use crate::proto::{ use crate::proto::{
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION, ServerInfo, MAX_RESPONSE_SIZE,
}; };
// This must be at least 2 for the client to get a value for the `p` field. // This must be at least 2 for the client to get a value for the `p` field.
@ -23,7 +23,6 @@ pub enum QueryClientError {
InvalidResponse, InvalidResponse,
Timeout, Timeout,
ChallengeFailed, ChallengeFailed,
RequestTooLarge,
} }
struct ClientInitData { struct ClientInitData {
@ -46,6 +45,7 @@ impl QueryClient {
self.send_query(QueryServerRequest::ServerInfo) self.send_query(QueryServerRequest::ServerInfo)
.await .await
.and_then(|(response, duration)| { .and_then(|(response, duration)| {
#[allow(irrefutable_let_patterns)]
if let QueryServerResponse::ServerInfo(info) = response { if let QueryServerResponse::ServerInfo(info) = response {
Ok((info, duration)) Ok((info, duration))
} else { } else {
@ -54,18 +54,6 @@ impl QueryClient {
}) })
} }
pub async fn ping(&mut self) -> Result<Duration, QueryClientError> {
self.send_query(QueryServerRequest::Ping)
.await
.and_then(|(response, duration)| {
if let QueryServerResponse::Pong = response {
Ok(duration)
} else {
Err(QueryClientError::InvalidResponse)
}
})
}
async fn send_query( async fn send_query(
&mut self, &mut self,
request: QueryServerRequest, request: QueryServerRequest,
@ -73,42 +61,20 @@ impl QueryClient {
let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
for _ in 0..MAX_REQUEST_RETRIES { for _ in 0..MAX_REQUEST_RETRIES {
let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); let request = if let Some(init) = &self.init {
// 2 extra bytes for version information, currently unused
buf.extend(VERSION.to_le_bytes());
buf.extend({
let request_data = if let Some(init) = &self.init {
// TODO: Use the maximum version supported by both the client and server once // TODO: Use the maximum version supported by both the client and server once
// new protocol versions are added // new protocol versions are added
<RawQueryServerRequest as Parcel>::raw_bytes( RawQueryServerRequest { p: init.p, request }
&RawQueryServerRequest { p: init.p, request },
&Default::default(),
)?
} else { } else {
// TODO: Use the legacy version here once new protocol versions are added // TODO: Use the legacy version here once new protocol versions are added
<RawQueryServerRequest as Parcel>::raw_bytes( RawQueryServerRequest {
&RawQueryServerRequest { p: 0, request }, p: 0,
&Default::default(), request: QueryServerRequest::Init,
)?
};
if request_data.len() > MAX_REQUEST_SIZE {
warn!(
?request,
?MAX_REQUEST_SIZE,
"Attempted to send request larger than the max size ({})",
request_data.len()
);
Err(QueryClientError::RequestTooLarge)?
} }
request_data };
}); let buf = request.serialize()?;
buf.resize(2 + MAX_RESPONSE_SIZE, 0);
buf.extend(VELOREN_HEADER);
let query_sent = Instant::now(); let query_sent = Instant::now();
socket.send_to(&buf, self.addr).await?; socket.send_to(&buf, self.addr).await?;
let mut buf = vec![0; MAX_RESPONSE_SIZE]; let mut buf = vec![0; MAX_RESPONSE_SIZE];
let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
.await .await
@ -119,7 +85,6 @@ impl QueryClient {
} }
let packet = <RawQueryServerResponse as Parcel>::read( let packet = <RawQueryServerResponse as Parcel>::read(
// TODO: Remove this padding once version information is added to packets
&mut io::Cursor::new(&buf[..buf_len]), &mut io::Cursor::new(&buf[..buf_len]),
&Default::default(), &Default::default(),
)?; )?;

View File

@ -1,4 +1,4 @@
#[cfg(feature = "client")] pub mod client; #[cfg(feature = "client")] pub mod client;
pub mod proto; pub mod proto;
mod ratelimit; #[cfg(feature = "server")] mod ratelimit;
#[cfg(feature = "server")] pub mod server; #[cfg(feature = "server")] pub mod server;

View File

@ -2,11 +2,10 @@ use protocol::Protocol;
pub(crate) const VERSION: u16 = 0; pub(crate) const VERSION: u16 = 0;
pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n'];
// The actual maximum size of packets will be `MAX_REQUEST_SIZE + pub(crate) const MAX_REQUEST_CONTENT_SIZE: usize = 300;
// VELOREN_HEADER.len() + 2` (2 added for currently unused version).
// NOTE: The actual maximum size must never exceed 1200 or we risk getting near // NOTE: The actual maximum size must never exceed 1200 or we risk getting near
// MTU limits for some networks. // MTU limits for some networks.
pub(crate) const MAX_REQUEST_SIZE: usize = 300; pub(crate) const MAX_REQUEST_SIZE: usize = MAX_REQUEST_CONTENT_SIZE + VELOREN_HEADER.len() + 2;
pub(crate) const MAX_RESPONSE_SIZE: usize = 256; pub(crate) const MAX_RESPONSE_SIZE: usize = 256;
#[derive(Protocol, Debug, Clone, Copy)] #[derive(Protocol, Debug, Clone, Copy)]
@ -21,9 +20,17 @@ pub(crate) struct RawQueryServerRequest {
#[protocol(discriminator(u8))] #[protocol(discriminator(u8))]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum QueryServerRequest { pub enum QueryServerRequest {
/// This requests exists mostly for backwards-compatibilty reasons. As the
/// first message sent to the server should always be in the V0 version
/// of the protocol, if future versions of the protocol have more
/// requests than server info it may be confusing to request `P` and the max
/// version with a `QueryServerRequest::ServerInfo` request (the request
/// will still be dropped as the supplied `P` value is invalid).
Init,
ServerInfo, ServerInfo,
Ping, // New requests should be added at the end to prevent breakage.
// New requests should be added at the end to prevent breakage // NOTE: Any new (sub-)variants must be added to the `check_request_sizes` test at the end of
// this file
} }
#[derive(Protocol, Debug, Clone, Copy)] #[derive(Protocol, Debug, Clone, Copy)]
@ -56,14 +63,13 @@ pub(crate) enum RawQueryServerResponse {
#[protocol(discriminator(u8))] #[protocol(discriminator(u8))]
pub enum QueryServerResponse { pub enum QueryServerResponse {
ServerInfo(ServerInfo), ServerInfo(ServerInfo),
Pong,
// New responses should be added at the end to prevent breakage // New responses should be added at the end to prevent breakage
} }
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] #[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerInfo { pub struct ServerInfo {
pub git_hash: u32, pub git_hash: u32,
pub git_version: i64, pub git_timestamp: i64,
pub players_count: u16, pub players_count: u16,
pub player_cap: u16, pub player_cap: u16,
pub battlemode: ServerBattleMode, pub battlemode: ServerBattleMode,
@ -78,3 +84,49 @@ pub enum ServerBattleMode {
GlobalPvE, GlobalPvE,
PerPlayer, PerPlayer,
} }
impl RawQueryServerRequest {
#[cfg(any(feature = "client", test))]
pub fn serialize(&self) -> Result<Vec<u8>, protocol::Error> {
use protocol::Parcel;
let mut buf = Vec::with_capacity(MAX_REQUEST_SIZE);
// 2 extra bytes for version information, currently unused
buf.extend(VERSION.to_le_bytes());
buf.extend({
let request_data =
<RawQueryServerRequest as Parcel>::raw_bytes(self, &Default::default())?;
if request_data.len() > MAX_REQUEST_CONTENT_SIZE {
panic!(
"Attempted to send request larger than the max size (size: {}, max size: \
{MAX_REQUEST_CONTENT_SIZE}, request: {self:?})",
request_data.len()
);
}
request_data
});
const _: () = assert!(MAX_RESPONSE_SIZE + VELOREN_HEADER.len() <= MAX_REQUEST_SIZE);
buf.resize(MAX_RESPONSE_SIZE.max(buf.len()), 0);
buf.extend(VELOREN_HEADER);
Ok(buf)
}
}
#[cfg(test)]
mod tests {
use super::{QueryServerRequest, RawQueryServerRequest};
#[test]
fn check_request_sizes() {
const ALL_REQUESTS: &[QueryServerRequest] =
&[QueryServerRequest::ServerInfo, QueryServerRequest::Init];
for request in ALL_REQUESTS {
let request = RawQueryServerRequest {
p: 0,
request: *request,
};
request.serialize().unwrap(); // This will panic if the size is above MAX_REQUEST_SIZE
}
}
}

View File

@ -37,7 +37,7 @@ pub struct Metrics {
pub invalid_packets: u32, pub invalid_packets: u32,
pub proccessing_errors: u32, pub proccessing_errors: u32,
pub info_requests: u32, pub info_requests: u32,
pub ping_requests: u32, pub init_requests: u32,
pub sent_responses: u32, pub sent_responses: u32,
pub failed_responses: u32, pub failed_responses: u32,
pub timed_out_responses: u32, pub timed_out_responses: u32,
@ -129,13 +129,13 @@ impl QueryServer {
if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) { if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
trace!(?len, "Datagram too short"); trace!(?len, "Datagram too short");
false false
} else if len > (MAX_REQUEST_SIZE + VELOREN_HEADER.len() + 2) { } else if len > MAX_REQUEST_SIZE {
trace!(?len, "Datagram too large"); trace!(?len, "Datagram too large");
false false
} else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER { } else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
trace!(?len, "Datagram header invalid"); trace!(?len, "Datagram header invalid");
false false
// FIXME: Allow lower versions once proper versioning is added. // TODO: Allow lower versions once proper versioning is added.
} else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION { } else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
trace!( trace!(
"Datagram has invalid version {:?}, current {VERSION:?}", "Datagram has invalid version {:?}, current {VERSION:?}",
@ -205,21 +205,24 @@ impl QueryServer {
} }
match request { match request {
QueryServerRequest::ServerInfo => { QueryServerRequest::Init => {
metrics.info_requests += 1; metrics.init_requests += 1;
let server_info = *self.server_info.borrow();
Self::send_response( Self::send_response(
RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)), RawQueryServerResponse::Init(Init {
p: real_p,
max_supported_version: VERSION,
}),
remote, remote,
socket, socket,
metrics, metrics,
) )
.await; .await;
}, },
QueryServerRequest::Ping => { QueryServerRequest::ServerInfo => {
metrics.ping_requests += 1; metrics.info_requests += 1;
let server_info = *self.server_info.borrow();
Self::send_response( Self::send_response(
RawQueryServerResponse::Response(QueryServerResponse::Pong), RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
remote, remote,
socket, socket,
metrics, metrics,
@ -283,7 +286,7 @@ impl std::ops::AddAssign for Metrics {
invalid_packets, invalid_packets,
proccessing_errors, proccessing_errors,
info_requests, info_requests,
ping_requests, init_requests,
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
@ -295,7 +298,7 @@ impl std::ops::AddAssign for Metrics {
self.invalid_packets += invalid_packets; self.invalid_packets += invalid_packets;
self.proccessing_errors += proccessing_errors; self.proccessing_errors += proccessing_errors;
self.info_requests += info_requests; self.info_requests += info_requests;
self.ping_requests += ping_requests; self.init_requests += init_requests;
self.sent_responses += sent_responses; self.sent_responses += sent_responses;
self.failed_responses += failed_responses; self.failed_responses += failed_responses;
self.timed_out_responses += timed_out_responses; self.timed_out_responses += timed_out_responses;

View File

@ -23,7 +23,7 @@ lazy_static::lazy_static! {
pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::<Vec<&str>>().join("-"); pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::<Vec<&str>>().join("-");
pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!"); pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!");
pub static ref GIT_DATE_TIMESTAMP: i64 = pub static ref GIT_DATE_TIMESTAMP: i64 =
NaiveDateTime::parse_from_str(dbg!(&*GIT_DATETIME), "%Y-%m-%d-%H:%M") NaiveDateTime::parse_from_str(*GIT_DATETIME, "%Y-%m-%d-%H:%M")
.expect("Invalid date") .expect("Invalid date")
.and_utc().timestamp(); .and_utc().timestamp();
pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() { pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() {

View File

@ -18,7 +18,7 @@ default = ["worldgen", "plugins", "persistent_world", "simd"]
[dependencies] [dependencies]
common = { package = "veloren-common", path = "../common" } common = { package = "veloren-common", path = "../common" }
common-base = { package = "veloren-common-base", path = "../common/base" } common-base = { package = "veloren-common-base", path = "../common/base" }
veloren-query-server = { package = "veloren-query-server", path = "../common/query_server", features = ["server"] } veloren-query-server = { package = "veloren-query-server", path = "../common/query_server", default-features = false, features = ["server"] }
common-ecs = { package = "veloren-common-ecs", path = "../common/ecs" } common-ecs = { package = "veloren-common-ecs", path = "../common/ecs" }
common-state = { package = "veloren-common-state", path = "../common/state" } common-state = { package = "veloren-common-state", path = "../common/state" }
common-systems = { package = "veloren-common-systems", path = "../common/systems" } common-systems = { package = "veloren-common-systems", path = "../common/systems" }

View File

@ -609,7 +609,7 @@ impl Server {
let (query_server_info_tx, query_server_info_rx) = let (query_server_info_tx, query_server_info_rx) =
tokio::sync::watch::channel(ServerInfo { tokio::sync::watch::channel(ServerInfo {
git_hash: *sys::server_info::GIT_HASH, git_hash: *sys::server_info::GIT_HASH,
git_version: *GIT_DATE_TIMESTAMP, git_timestamp: *GIT_DATE_TIMESTAMP,
players_count: 0, players_count: 0,
player_cap: settings.max_players, player_cap: settings.max_players,
battlemode: settings.gameplay.battle_mode.into(), battlemode: settings.gameplay.battle_mode.into(),

View File

@ -76,7 +76,7 @@ pub struct QueryServerMetrics {
pub invalid_packets: IntCounter, pub invalid_packets: IntCounter,
pub proccessing_errors: IntCounter, pub proccessing_errors: IntCounter,
pub info_requests: IntCounter, pub info_requests: IntCounter,
pub ping_requests: IntCounter, pub init_requests: IntCounter,
pub sent_responses: IntCounter, pub sent_responses: IntCounter,
pub failed_responses: IntCounter, pub failed_responses: IntCounter,
pub timed_out_responses: IntCounter, pub timed_out_responses: IntCounter,
@ -461,9 +461,9 @@ impl QueryServerMetrics {
"query_server::info_requests", "query_server::info_requests",
"Amount of server info requests received by the query server", "Amount of server info requests received by the query server",
))?; ))?;
let ping_requests = IntCounter::with_opts(Opts::new( let init_requests = IntCounter::with_opts(Opts::new(
"query_server::ping_requests", "query_server::ping_requests",
"Amount of server ping requests received by the query server", "Amount of init requests received by the query server",
))?; ))?;
let sent_responses = IntCounter::with_opts(Opts::new( let sent_responses = IntCounter::with_opts(Opts::new(
"query_server::sent_responses", "query_server::sent_responses",
@ -487,7 +487,7 @@ impl QueryServerMetrics {
registry.register(Box::new(invalid_packets.clone()))?; registry.register(Box::new(invalid_packets.clone()))?;
registry.register(Box::new(proccessing_errors.clone()))?; registry.register(Box::new(proccessing_errors.clone()))?;
registry.register(Box::new(info_requests.clone()))?; registry.register(Box::new(info_requests.clone()))?;
registry.register(Box::new(ping_requests.clone()))?; registry.register(Box::new(init_requests.clone()))?;
registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(sent_responses.clone()))?;
registry.register(Box::new(failed_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?;
registry.register(Box::new(timed_out_responses.clone()))?; registry.register(Box::new(timed_out_responses.clone()))?;
@ -499,7 +499,7 @@ impl QueryServerMetrics {
invalid_packets, invalid_packets,
proccessing_errors, proccessing_errors,
info_requests, info_requests,
ping_requests, init_requests,
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
@ -515,7 +515,7 @@ impl QueryServerMetrics {
invalid_packets, invalid_packets,
proccessing_errors, proccessing_errors,
info_requests, info_requests,
ping_requests, init_requests,
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
@ -527,7 +527,7 @@ impl QueryServerMetrics {
self.invalid_packets.inc_by(invalid_packets as u64); self.invalid_packets.inc_by(invalid_packets as u64);
self.proccessing_errors.inc_by(proccessing_errors as u64); self.proccessing_errors.inc_by(proccessing_errors as u64);
self.info_requests.inc_by(info_requests as u64); self.info_requests.inc_by(info_requests as u64);
self.ping_requests.inc_by(ping_requests as u64); self.init_requests.inc_by(init_requests as u64);
self.sent_responses.inc_by(sent_responses as u64); self.sent_responses.inc_by(sent_responses as u64);
self.failed_responses.inc_by(failed_responses as u64); self.failed_responses.inc_by(failed_responses as u64);
self.timed_out_responses.inc_by(timed_out_responses as u64); self.timed_out_responses.inc_by(timed_out_responses as u64);

View File

@ -37,7 +37,7 @@ impl<'a> System<'a> for Sys {
let count = players.count().try_into().unwrap_or(u16::MAX); let count = players.count().try_into().unwrap_or(u16::MAX);
if let Err(error) = sender.send(ServerInfo { if let Err(error) = sender.send(ServerInfo {
git_hash: *GIT_HASH, git_hash: *GIT_HASH,
git_version: *GIT_DATE_TIMESTAMP, git_timestamp: *GIT_DATE_TIMESTAMP,
players_count: count, players_count: count,
player_cap: settings.max_players, player_cap: settings.max_players,
battlemode: settings.gameplay.battle_mode.into(), battlemode: settings.gameplay.battle_mode.into(),