address review comments

This commit is contained in:
crabman 2024-04-29 07:20:07 +00:00
parent 4c0cadabcf
commit 4455c777ce
No known key found for this signature in database
12 changed files with 301 additions and 216 deletions

View File

@ -75,6 +75,7 @@ where
"refinery_core::traits::divergent=off", "refinery_core::traits::divergent=off",
"veloren_server::persistence::character=info", "veloren_server::persistence::character=info",
"veloren_server::settings=info", "veloren_server::settings=info",
"veloren_query_server=info",
]; ];
for s in default_directives { for s in default_directives {

View File

@ -7,13 +7,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
server = ["dep:tokio", "dep:rand"] server = ["dep:rand"]
client = ["dep:tokio", "tokio/time"] client = ["tokio/time"]
example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"]
default = ["server", "client"] default = ["server", "client"]
[dependencies] [dependencies]
tokio = { workspace = true, optional = true, features = ["net", "sync"] } tokio = { workspace = true, features = ["net", "sync"] }
protocol = { version = "3.4.0", default-features = false, features = ["derive"] } protocol = { version = "3.4.0", default-features = false, features = ["derive"] }
tracing-subscriber = { version = "0.3.7", optional = true } tracing-subscriber = { version = "0.3.7", optional = true }
tracing = { workspace = true } tracing = { workspace = true }

View File

@ -1,10 +1,10 @@
use std::{ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc, sync::{Arc, Mutex},
time::Instant, time::Instant,
}; };
use tokio::sync::{watch, RwLock}; use tokio::sync::watch;
use tracing::error; use tracing::error;
use veloren_query_server::{ use veloren_query_server::{
client::QueryClient, client::QueryClient,
@ -13,7 +13,8 @@ use veloren_query_server::{
}; };
const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo {
git_hash: ['\0'; 8], git_hash: 0,
git_version: 0,
players_count: 100, players_count: 100,
player_cap: 300, player_cap: 300,
battlemode: ServerBattleMode::GlobalPvE, battlemode: ServerBattleMode::GlobalPvE,
@ -24,8 +25,8 @@ async fn main() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006);
let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO); let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO);
let mut server = QueryServer::new(addr, receiver); let mut server = QueryServer::new(addr, receiver, 10002);
let metrics = Arc::new(RwLock::new(Metrics::default())); let metrics = Arc::new(Mutex::new(Metrics::default()));
let metrics2 = Arc::clone(&metrics); let metrics2 = Arc::clone(&metrics);
tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); tokio::task::spawn(async move { server.run(metrics2).await.unwrap() });
@ -39,12 +40,12 @@ async fn main() {
let start = Instant::now(); let start = Instant::now();
for _i in 0..32 { for _i in 0..10000 {
if let Err(error) = client.server_info().await { if let Err(error) = client.ping().await {
error!(?error, "Server info request error"); error!(?error, "Server info request error");
} }
} }
println!("Metrics = {:#?}", metrics.read().await); println!("Metrics = {:#?}", metrics.lock().unwrap());
dbg!(start.elapsed()); dbg!(start.elapsed());
} }

View File

@ -6,13 +6,14 @@ use std::{
use protocol::Parcel; use protocol::Parcel;
use tokio::{net::UdpSocket, time::timeout}; use tokio::{net::UdpSocket, time::timeout};
use tracing::trace; use tracing::{trace, warn};
use crate::proto::{ use crate::proto::{
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION,
}; };
// This must be at least 2 for the client to get a value for the `p` field.
const MAX_REQUEST_RETRIES: usize = 5; const MAX_REQUEST_RETRIES: usize = 5;
#[derive(Debug)] #[derive(Debug)]
@ -20,10 +21,14 @@ pub enum QueryClientError {
Io(tokio::io::Error), Io(tokio::io::Error),
Protocol(protocol::Error), Protocol(protocol::Error),
InvalidResponse, InvalidResponse,
InvalidVersion,
Timeout, Timeout,
ChallengeFailed, ChallengeFailed,
RequestTooLarge,
} }
/// The `p` field has to be requested from the server each time this client is
/// constructed, if possible reuse this!
pub struct QueryClient { pub struct QueryClient {
pub addr: SocketAddr, pub addr: SocketAddr,
p: u64, p: u64,
@ -33,10 +38,9 @@ impl QueryClient {
pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } } pub fn new(addr: SocketAddr) -> Self { Self { addr, p: 0 } }
pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> { pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> {
self.send_query(QueryServerRequest::ServerInfo(Default::default())) self.send_query(QueryServerRequest::ServerInfo)
.await .await
.and_then(|(response, duration)| { .and_then(|(response, duration)| {
#[allow(irrefutable_let_patterns)] // TODO: remove when more variants are added
if let QueryServerResponse::ServerInfo(info) = response { if let QueryServerResponse::ServerInfo(info) = response {
Ok((info, duration)) Ok((info, duration))
} else { } else {
@ -45,36 +49,68 @@ impl QueryClient {
}) })
} }
pub async fn ping(&mut self) -> Result<Duration, QueryClientError> {
self.send_query(QueryServerRequest::Ping)
.await
.and_then(|(response, duration)| {
if let QueryServerResponse::Pong = response {
Ok(duration)
} else {
Err(QueryClientError::InvalidResponse)
}
})
}
async fn send_query( async fn send_query(
&mut self, &mut self,
request: QueryServerRequest, request: QueryServerRequest,
) -> Result<(QueryServerResponse, Duration), QueryClientError> { ) -> Result<(QueryServerResponse, Duration), QueryClientError> {
let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
let mut tries = 0; for _ in 0..MAX_REQUEST_RETRIES {
while tries < MAX_REQUEST_RETRIES {
tries += 1;
let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE); let mut buf = Vec::with_capacity(VELOREN_HEADER.len() + 2 + MAX_REQUEST_SIZE);
buf.extend(VELOREN_HEADER);
// 2 extra bytes for version information, currently unused // 2 extra bytes for version information, currently unused
buf.extend([0; 2]); buf.extend(VERSION.to_le_bytes());
buf.extend(<RawQueryServerRequest as Parcel>::raw_bytes( buf.extend({
&RawQueryServerRequest { p: self.p, request }, let request_data = <RawQueryServerRequest as Parcel>::raw_bytes(
&Default::default(), &RawQueryServerRequest { p: self.p, request },
)?); &Default::default(),
)?;
if request_data.len() > MAX_REQUEST_SIZE {
warn!(
?request,
?MAX_REQUEST_SIZE,
"Attempted to send request larger than the max size ({})",
request_data.len()
);
Err(QueryClientError::RequestTooLarge)?
}
request_data
});
buf.resize(2 + MAX_RESPONSE_SIZE, 0);
buf.extend(VELOREN_HEADER);
let query_sent = Instant::now(); let query_sent = Instant::now();
socket.send_to(&buf, self.addr).await?; socket.send_to(&buf, self.addr).await?;
let mut buf = vec![0; MAX_RESPONSE_SIZE]; let mut buf = vec![0; MAX_RESPONSE_SIZE];
let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
.await .await
.map_err(|_| QueryClientError::Timeout)? .map_err(|_| QueryClientError::Timeout)??;
.map_err(|_| QueryClientError::Timeout)?;
if buf_len <= 2 {
Err(QueryClientError::InvalidResponse)?
}
// FIXME: Allow lower versions once proper versioning is added.
if u16::from_le_bytes(buf[..2].try_into().unwrap()) != VERSION {
Err(QueryClientError::InvalidVersion)?
}
let packet = <RawQueryServerResponse as Parcel>::read( let packet = <RawQueryServerResponse as Parcel>::read(
&mut io::Cursor::new(buf), // TODO: Remove this padding once version information is added to packets
&mut io::Cursor::new(&buf[2..buf_len]),
&Default::default(), &Default::default(),
)?; )?;

View File

@ -1,13 +1,17 @@
use protocol::Protocol; use protocol::Protocol;
pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; pub(crate) const VERSION: u16 = 0;
pub const MAX_REQUEST_SIZE: usize = 300; pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n'];
pub const MAX_RESPONSE_SIZE: usize = 256; // The actual maximum size of packets will be `MAX_REQUEST_SIZE +
// VELOREN_HEADER.len() + 2` (2 added for currently unused version).
// NOTE: The actual maximum size must never exceed 1200 or we risk getting near
// MTU limits for some networks.
pub(crate) const MAX_REQUEST_SIZE: usize = 300;
pub(crate) const MAX_RESPONSE_SIZE: usize = 256;
#[derive(Protocol, Debug, Clone, Copy)] #[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")] pub(crate) struct RawQueryServerRequest {
#[protocol(discriminator(u8))] /// See comment on [`RawQueryServerResponse::P`]
pub struct RawQueryServerRequest {
pub p: u64, pub p: u64,
pub request: QueryServerRequest, pub request: QueryServerRequest,
} }
@ -17,15 +21,22 @@ pub struct RawQueryServerRequest {
#[protocol(discriminator(u8))] #[protocol(discriminator(u8))]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum QueryServerRequest { pub enum QueryServerRequest {
ServerInfo(ServerInfoRequest), ServerInfo,
Ping,
// New requests should be added at the end to prevent breakage // New requests should be added at the end to prevent breakage
} }
#[derive(Protocol, Debug, Clone, Copy)] #[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")] #[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))] #[protocol(discriminator(u8))]
pub enum RawQueryServerResponse { pub(crate) enum RawQueryServerResponse {
Response(QueryServerResponse), Response(QueryServerResponse),
/// This is used as a challenge to prevent IP address spoofing by verifying
/// that the client can receive from the source address.
///
/// Any request to the server must include this value to be processed,
/// otherwise this response will be returned (giving clients a value to pass
/// for later requests).
P(u64), P(u64),
} }
@ -34,18 +45,14 @@ pub enum RawQueryServerResponse {
#[protocol(discriminator(u8))] #[protocol(discriminator(u8))]
pub enum QueryServerResponse { pub enum QueryServerResponse {
ServerInfo(ServerInfo), ServerInfo(ServerInfo),
Pong,
// New responses should be added at the end to prevent breakage // New responses should be added at the end to prevent breakage
} }
#[derive(Protocol, Debug, Clone, Copy)]
pub struct ServerInfoRequest {
// Padding to prevent amplification attacks
pub _padding: [u8; 256],
}
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] #[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerInfo { pub struct ServerInfo {
pub git_hash: [char; 8], pub git_hash: u32,
pub git_version: i64,
pub players_count: u16, pub players_count: u16,
pub player_cap: u16, pub player_cap: u16,
pub battlemode: ServerBattleMode, pub battlemode: ServerBattleMode,
@ -60,11 +67,3 @@ pub enum ServerBattleMode {
GlobalPvE, GlobalPvE,
PerPlayer, PerPlayer,
} }
impl Default for ServerInfoRequest {
fn default() -> Self { ServerInfoRequest { _padding: [0; 256] } }
}
impl ServerInfo {
pub fn git_hash(&self) -> String { String::from_iter(&self.git_hash) }
}

View File

@ -34,24 +34,20 @@ impl RateLimiter {
pub fn maintain(&mut self, now: Instant) { pub fn maintain(&mut self, now: Instant) {
if now.duration_since(self.last_shift) > SHIFT_EVERY { if now.duration_since(self.last_shift) > SHIFT_EVERY {
for (_, state) in self.states.iter_mut() {
state.shift();
}
// Remove empty states // Remove empty states
self.states.retain(|_, state| !state.is_empty()); self.states.retain(|_, state| {
state.shift();
!state.is_empty()
});
self.last_shift = now;
} }
} }
pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool { pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool {
if let Some(state) = self.states.get_mut(&ip) { if let Some(state) = self.states.get_mut(&ip) {
if state.total() >= self.limit { state.0[0] = state.0[0].saturating_add(1);
state.0[0] = state.0[0].saturating_add(1);
false state.total() < self.limit
} else {
state.0[1] += 1;
true
}
} else { } else {
self.states.insert(ip, IpState::default()); self.states.insert(ip, IpState::default());
true true

View File

@ -1,35 +1,29 @@
#[allow(deprecated)] use std::hash::SipHasher; #[allow(deprecated)] use std::hash::SipHasher;
use std::{ use std::{
future::Future,
hash::{Hash, Hasher}, hash::{Hash, Hasher},
io, io::{self, ErrorKind},
net::{Ipv4Addr, SocketAddr, SocketAddrV4}, net::SocketAddr,
sync::Arc, sync::{Arc, Mutex},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use protocol::Parcel; use protocol::Parcel;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use tokio::{ use tokio::{net::UdpSocket, sync::watch};
net::UdpSocket, use tracing::{debug, error, trace};
sync::{watch, RwLock},
time::timeout,
};
use tracing::{debug, trace};
use crate::{ use crate::{
proto::{ proto::{
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse, QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
ServerInfo, MAX_REQUEST_SIZE, VELOREN_HEADER, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER, VERSION,
}, },
ratelimit::{RateLimiter, ReducedIpAddr}, ratelimit::{RateLimiter, ReducedIpAddr},
}; };
const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300);
const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(60);
pub struct QueryServer { pub struct QueryServer {
pub addr: SocketAddr, addr: SocketAddr,
server_info: watch::Receiver<ServerInfo>, server_info: watch::Receiver<ServerInfo>,
settings: protocol::Settings, settings: protocol::Settings,
ratelimit: RateLimiter, ratelimit: RateLimiter,
@ -42,6 +36,7 @@ pub struct Metrics {
pub invalid_packets: u32, pub invalid_packets: u32,
pub proccessing_errors: u32, pub proccessing_errors: u32,
pub info_requests: u32, pub info_requests: u32,
pub ping_requests: u32,
pub sent_responses: u32, pub sent_responses: u32,
pub failed_responses: u32, pub failed_responses: u32,
pub timed_out_responses: u32, pub timed_out_responses: u32,
@ -49,17 +44,23 @@ pub struct Metrics {
} }
impl QueryServer { impl QueryServer {
pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>) -> Self { pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>, ratelimit: u16) -> Self {
Self { Self {
addr, addr,
server_info, server_info,
ratelimit: RateLimiter::new(30), ratelimit: RateLimiter::new(ratelimit),
settings: Default::default(), settings: Default::default(),
} }
} }
pub async fn run(&mut self, metrics: Arc<RwLock<Metrics>>) -> Result<(), tokio::io::Error> { /// This produces TRACE level logs for any packet received on the assigned
let socket = UdpSocket::bind(self.addr).await?; /// port. To prevent potentially unfettered log spam, disable the TRACE
/// level for this crate (when outside of debugging contexts).
///
/// NOTE: TRACE and DEBUG levels are disabled by default for this crate when
/// using `veloren-common-frontend`.
pub async fn run(&mut self, metrics: Arc<Mutex<Metrics>>) -> Result<(), tokio::io::Error> {
let mut socket = UdpSocket::bind(self.addr).await?;
let gen_secret = || { let gen_secret = || {
let mut rng = thread_rng(); let mut rng = thread_rng();
@ -70,12 +71,20 @@ impl QueryServer {
let mut buf = Box::new([0; MAX_REQUEST_SIZE]); let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
loop { loop {
*buf = [0; MAX_REQUEST_SIZE]; let (len, remote_addr) = match socket.recv_from(&mut *buf).await {
Ok(v) => v,
let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| { Err(e) if e.kind() == ErrorKind::NotConnected => {
debug!("Error while receiving from query server socket: {err:?}") error!(
}) else { ?e,
continue; "Query server connection was closed, re-binding to socket..."
);
socket = UdpSocket::bind(self.addr).await?;
continue;
},
err => {
debug!(?err, "Error while receiving from query server socket");
continue;
},
}; };
let mut new_metrics = Metrics { let mut new_metrics = Metrics {
@ -86,26 +95,19 @@ impl QueryServer {
let raw_msg_buf = &buf[..len]; let raw_msg_buf = &buf[..len];
let msg_buf = if Self::validate_datagram(raw_msg_buf) { let msg_buf = if Self::validate_datagram(raw_msg_buf) {
// Require 2 extra bytes for version (currently unused) // Require 2 extra bytes for version (currently unused)
&raw_msg_buf[(VELOREN_HEADER.len() + 2)..] &raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())]
} else { } else {
new_metrics.dropped_packets += 1; new_metrics.dropped_packets += 1;
continue; continue;
}; };
if let Err(error) = self self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket)
.process_datagram( .await;
msg_buf,
remote_addr,
secrets,
(&mut new_metrics, Arc::clone(&metrics)),
)
.await
{
debug!(?error, "Error while processing datagram");
}
// Update metrics at the end of eath packet // Update metrics at the end of eath packet
*metrics.write().await += new_metrics; if let Ok(mut metrics) = metrics.lock() {
*metrics += new_metrics;
}
{ {
let now = Instant::now(); let now = Instant::now();
@ -123,12 +125,22 @@ impl QueryServer {
fn validate_datagram(data: &[u8]) -> bool { fn validate_datagram(data: &[u8]) -> bool {
let len = data.len(); let len = data.len();
// Require 2 extra bytes for version (currently unused) // Require 2 extra bytes for version (currently unused)
if len < VELOREN_HEADER.len() + 3 { if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
trace!(?len, "Datagram too short"); trace!(?len, "Datagram too short");
false false
} else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER { } else if len > (MAX_REQUEST_SIZE + VELOREN_HEADER.len() + 2) {
trace!(?len, "Datagram too large");
false
} else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
trace!(?len, "Datagram header invalid"); trace!(?len, "Datagram header invalid");
false false
// FIXME: Allow lower versions once proper versioning is added.
} else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
trace!(
"Datagram has invalid version {:?}, current {VERSION:?}",
&data[..2]
);
false
} else { } else {
true true
} }
@ -139,106 +151,117 @@ impl QueryServer {
datagram: &[u8], datagram: &[u8],
remote: SocketAddr, remote: SocketAddr,
secrets: (u64, u64), secrets: (u64, u64),
(new_metrics, metrics): (&mut Metrics, Arc<RwLock<Metrics>>), metrics: &mut Metrics,
) -> Result<(), tokio::io::Error> { socket: &UdpSocket,
) {
let Ok(RawQueryServerRequest { let Ok(RawQueryServerRequest {
p: client_p, p: client_p,
request, request,
}) = }) =
<RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings) <RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
else { else {
new_metrics.invalid_packets += 1; metrics.invalid_packets += 1;
return Ok(()); return;
}; };
trace!(?request, "Received packet"); trace!(?request, "Received packet");
#[allow(deprecated)] #[allow(deprecated)]
let real_p = { let real_p = {
// Use SipHash-2-4 to compute the `p` value from a server specific
// secret and the client's address.
//
// This is used to verify that packets are from an entity that can
// receive packets at the given address.
//
// Only use the first 64 bits from Ipv6 addresses since the latter
// 64 bits can change very frequently (as much as for every
// request).
let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1); let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
ReducedIpAddr::from(remote.ip()).hash(&mut hasher); ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
hasher.finish() hasher.finish()
}; };
async fn timed<'a, F: Future<Output = O> + 'a, O>(
fut: F,
metrics: &'a Arc<RwLock<Metrics>>,
) -> Option<O> {
if let Ok(res) = timeout(RESPONSE_SEND_TIMEOUT, fut).await {
Some(res)
} else {
metrics.write().await.timed_out_responses += 1;
None
}
}
if real_p != client_p { if real_p != client_p {
tokio::task::spawn(async move { Self::send_response(RawQueryServerResponse::P(real_p), remote, socket, metrics).await;
timed(
Self::send_response(RawQueryServerResponse::P(real_p), remote, &metrics),
&metrics,
)
.await;
});
return Ok(()); return;
} }
if !self.ratelimit.can_request(remote.ip().into()) { if !self.ratelimit.can_request(remote.ip().into()) {
trace!("Ratelimited request"); trace!("Ratelimited request");
new_metrics.ratelimited += 1; metrics.ratelimited += 1;
return Ok(()); return;
} }
match request { match request {
QueryServerRequest::ServerInfo(_) => { QueryServerRequest::ServerInfo => {
new_metrics.info_requests += 1; metrics.info_requests += 1;
let server_info = *self.server_info.borrow(); let server_info = *self.server_info.borrow();
tokio::task::spawn(async move { Self::send_response(
timed( RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
Self::send_response( remote,
RawQueryServerResponse::Response(QueryServerResponse::ServerInfo( socket,
server_info, metrics,
)), )
remote, .await;
&metrics, },
), QueryServerRequest::Ping => {
&metrics, metrics.ping_requests += 1;
) Self::send_response(
.await; RawQueryServerResponse::Response(QueryServerResponse::Pong),
}); remote,
socket,
metrics,
)
.await;
}, },
} }
Ok(())
} }
async fn send_response( async fn send_response(
response: RawQueryServerResponse, response: RawQueryServerResponse,
addr: SocketAddr, addr: SocketAddr,
metrics: &Arc<RwLock<Metrics>>, socket: &UdpSocket,
metrics: &mut Metrics,
) { ) {
let Ok(socket) = // TODO: Remove this extra padding once we add version information to requests
UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await let mut buf = Vec::from(VERSION.to_ne_bytes());
else {
debug!("Failed to create response socket");
return;
};
let buf = if let Ok(data) = match <RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) {
<RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) Ok(data) => {
{ buf.extend(data);
data
} else { if buf.len() > MAX_RESPONSE_SIZE {
Vec::new() error!(
}; ?MAX_RESPONSE_SIZE,
match socket.send_to(&buf, addr).await { "Attempted to send a response larger than the maximum allowed size (size: \
Ok(_) => { {}, response: {response:?})",
metrics.write().await.sent_responses += 1; buf.len()
);
#[cfg(debug_assertions)]
panic!(
"Attempted to send a response larger than the maximum allowed size (size: \
{}, max: {}, response: {response:?})",
buf.len(),
MAX_RESPONSE_SIZE
);
}
match socket.send_to(&buf, addr).await {
Ok(_) => {
metrics.sent_responses += 1;
},
Err(err) => {
metrics.failed_responses += 1;
debug!(?err, "Failed to send query server response");
},
}
}, },
Err(err) => { Err(error) => {
metrics.write().await.failed_responses += 1; trace!(?error, "Failed to serialize response");
debug!(?err, "Failed to send query server response"); #[cfg(debug_assertions)]
panic!("Serializing response failed: {error:?} ({response:?})");
}, },
} }
} }
@ -253,6 +276,7 @@ impl std::ops::AddAssign for Metrics {
invalid_packets, invalid_packets,
proccessing_errors, proccessing_errors,
info_requests, info_requests,
ping_requests,
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
@ -264,6 +288,7 @@ impl std::ops::AddAssign for Metrics {
self.invalid_packets += invalid_packets; self.invalid_packets += invalid_packets;
self.proccessing_errors += proccessing_errors; self.proccessing_errors += proccessing_errors;
self.info_requests += info_requests; self.info_requests += info_requests;
self.ping_requests += ping_requests;
self.sent_responses += sent_responses; self.sent_responses += sent_responses;
self.failed_responses += failed_responses; self.failed_responses += failed_responses;
self.timed_out_responses += timed_out_responses; self.timed_out_responses += timed_out_responses;
@ -273,5 +298,7 @@ impl std::ops::AddAssign for Metrics {
impl Metrics { impl Metrics {
/// Resets all metrics to 0 and returns previous ones /// Resets all metrics to 0 and returns previous ones
///
/// Used by the consumer of the metrics.
pub fn reset(&mut self) -> Self { std::mem::take(self) } pub fn reset(&mut self) -> Self { std::mem::take(self) }
} }

View File

@ -22,6 +22,10 @@ lazy_static::lazy_static! {
static ref GIT_DATETIME: &'static str = GIT_VERSION.split('/').nth(1).expect("failed to retrieve git_datetime!"); static ref GIT_DATETIME: &'static str = GIT_VERSION.split('/').nth(1).expect("failed to retrieve git_datetime!");
pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::<Vec<&str>>().join("-"); pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::<Vec<&str>>().join("-");
pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!"); pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!");
pub static ref GIT_DATE_TIMESTAMP: i64 =
NaiveDateTime::parse_from_str(dbg!(&*GIT_DATETIME), "%Y-%m-%d-%H:%M")
.expect("Invalid date")
.and_utc().timestamp();
pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() { pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() {
format!("{}-{}", VELOREN_VERSION_STAGE, *GIT_DATE) format!("{}-{}", VELOREN_VERSION_STAGE, *GIT_DATE)
} else { } else {
@ -34,6 +38,7 @@ lazy_static::lazy_static! {
}; };
} }
use chrono::NaiveDateTime;
pub use color::*; pub use color::*;
pub use dir::*; pub use dir::*;
pub use grid_hasher::GridHasher; pub use grid_hasher::GridHasher;

View File

@ -93,6 +93,7 @@ use common::{
shared_server_config::ServerConstants, shared_server_config::ServerConstants,
slowjob::SlowJobPool, slowjob::SlowJobPool,
terrain::TerrainChunk, terrain::TerrainChunk,
util::GIT_DATE_TIMESTAMP,
vol::RectRasterableVol, vol::RectRasterableVol,
}; };
use common_base::prof_span; use common_base::prof_span;
@ -116,7 +117,7 @@ use specs::{
use std::{ use std::{
i32, i32,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::{Arc, Mutex},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
#[cfg(not(feature = "worldgen"))] #[cfg(not(feature = "worldgen"))]
@ -603,20 +604,24 @@ impl Server {
if let Some(addr) = settings.query_address { if let Some(addr) = settings.query_address {
use veloren_query_server::proto::ServerInfo; use veloren_query_server::proto::ServerInfo;
const QUERY_SERVER_RATELIMIT: u16 = 120;
let (query_server_info_tx, query_server_info_rx) = let (query_server_info_tx, query_server_info_rx) =
tokio::sync::watch::channel(ServerInfo { tokio::sync::watch::channel(ServerInfo {
git_hash: *sys::server_info::GIT_HASH, git_hash: *sys::server_info::GIT_HASH,
git_version: *GIT_DATE_TIMESTAMP,
players_count: 0, players_count: 0,
player_cap: settings.max_players, player_cap: settings.max_players,
battlemode: settings.gameplay.battle_mode.into(), battlemode: settings.gameplay.battle_mode.into(),
}); });
let mut query_server = QueryServer::new(addr, query_server_info_rx); let mut query_server =
let query_server_metrics = Arc::new(tokio::sync::RwLock::new( QueryServer::new(addr, query_server_info_rx, QUERY_SERVER_RATELIMIT);
veloren_query_server::server::Metrics::default(), let query_server_metrics =
)); Arc::new(Mutex::new(veloren_query_server::server::Metrics::default()));
let query_server_metrics2 = Arc::clone(&query_server_metrics); let query_server_metrics2 = Arc::clone(&query_server_metrics);
runtime.spawn(async move { runtime.spawn(async move {
_ = query_server.run(query_server_metrics2).await; let err = query_server.run(query_server_metrics2).await.err();
error!(?err, "Query server stopped unexpectedly");
}); });
state.ecs_mut().insert(query_server_info_tx); state.ecs_mut().insert(query_server_info_tx);
state.ecs_mut().insert(query_server_metrics); state.ecs_mut().insert(query_server_metrics);

View File

@ -71,15 +71,16 @@ pub struct ServerEventMetrics {
} }
pub struct QueryServerMetrics { pub struct QueryServerMetrics {
pub received_packets: IntGauge, pub received_packets: IntCounter,
pub dropped_packets: IntGauge, pub dropped_packets: IntCounter,
pub invalid_packets: IntGauge, pub invalid_packets: IntCounter,
pub proccessing_errors: IntGauge, pub proccessing_errors: IntCounter,
pub info_requests: IntGauge, pub info_requests: IntCounter,
pub sent_responses: IntGauge, pub ping_requests: IntCounter,
pub failed_responses: IntGauge, pub sent_responses: IntCounter,
pub timed_out_responses: IntGauge, pub failed_responses: IntCounter,
pub ratelimited: IntGauge, pub timed_out_responses: IntCounter,
pub ratelimited: IntCounter,
} }
impl PhysicsMetrics { impl PhysicsMetrics {
@ -440,39 +441,43 @@ impl ServerEventMetrics {
impl QueryServerMetrics { impl QueryServerMetrics {
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> { pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
let received_packets = IntGauge::with_opts(Opts::new( let received_packets = IntCounter::with_opts(Opts::new(
"query_server::received_packets", "query_server::received_packets",
"Total amount of received packets by the query server", "Total amount of received packets by the query server",
))?; ))?;
let dropped_packets = IntGauge::with_opts(Opts::new( let dropped_packets = IntCounter::with_opts(Opts::new(
"query_server::dropped_packets", "query_server::dropped_packets",
"Amount of dropped packets received by the query server (too short or invalid header)", "Amount of dropped packets received by the query server (too short or invalid header)",
))?; ))?;
let invalid_packets = IntGauge::with_opts(Opts::new( let invalid_packets = IntCounter::with_opts(Opts::new(
"query_server::invalid_packets", "query_server::invalid_packets",
"Amount of unparseable packets received by the query server", "Amount of unparseable packets received by the query server",
))?; ))?;
let proccessing_errors = IntGauge::with_opts(Opts::new( let proccessing_errors = IntCounter::with_opts(Opts::new(
"query_server::proccessing_errors", "query_server::proccessing_errors",
"Amount of errors that occured while processing a query server request", "Amount of errors that occured while processing a query server request",
))?; ))?;
let info_requests = IntGauge::with_opts(Opts::new( let info_requests = IntCounter::with_opts(Opts::new(
"query_server::info_requests", "query_server::info_requests",
"Amount of server info requests received by the query server", "Amount of server info requests received by the query server",
))?; ))?;
let sent_responses = IntGauge::with_opts(Opts::new( let ping_requests = IntCounter::with_opts(Opts::new(
"query_server::ping_requests",
"Amount of server ping requests received by the query server",
))?;
let sent_responses = IntCounter::with_opts(Opts::new(
"query_server::sent_responses", "query_server::sent_responses",
"Amount of responses sent by the query server", "Amount of responses sent by the query server",
))?; ))?;
let failed_responses = IntGauge::with_opts(Opts::new( let failed_responses = IntCounter::with_opts(Opts::new(
"query_server::failed_responses", "query_server::failed_responses",
"Amount of responses which failed to be sent by the query server", "Amount of responses which failed to be sent by the query server",
))?; ))?;
let timed_out_responses = IntGauge::with_opts(Opts::new( let timed_out_responses = IntCounter::with_opts(Opts::new(
"query_server::timed_out_responses", "query_server::timed_out_responses",
"Amount of responses which timed out", "Amount of responses which timed out",
))?; ))?;
let ratelimited = IntGauge::with_opts(Opts::new( let ratelimited = IntCounter::with_opts(Opts::new(
"query_server::ratelimited", "query_server::ratelimited",
"Ratelimited requests to the query server", "Ratelimited requests to the query server",
))?; ))?;
@ -482,6 +487,7 @@ impl QueryServerMetrics {
registry.register(Box::new(invalid_packets.clone()))?; registry.register(Box::new(invalid_packets.clone()))?;
registry.register(Box::new(proccessing_errors.clone()))?; registry.register(Box::new(proccessing_errors.clone()))?;
registry.register(Box::new(info_requests.clone()))?; registry.register(Box::new(info_requests.clone()))?;
registry.register(Box::new(ping_requests.clone()))?;
registry.register(Box::new(sent_responses.clone()))?; registry.register(Box::new(sent_responses.clone()))?;
registry.register(Box::new(failed_responses.clone()))?; registry.register(Box::new(failed_responses.clone()))?;
registry.register(Box::new(timed_out_responses.clone()))?; registry.register(Box::new(timed_out_responses.clone()))?;
@ -493,6 +499,7 @@ impl QueryServerMetrics {
invalid_packets, invalid_packets,
proccessing_errors, proccessing_errors,
info_requests, info_requests,
ping_requests,
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
@ -508,20 +515,22 @@ impl QueryServerMetrics {
invalid_packets, invalid_packets,
proccessing_errors, proccessing_errors,
info_requests, info_requests,
ping_requests,
sent_responses, sent_responses,
failed_responses, failed_responses,
timed_out_responses, timed_out_responses,
ratelimited, ratelimited,
}: veloren_query_server::server::Metrics, }: veloren_query_server::server::Metrics,
) { ) {
self.received_packets.set(received_packets as i64); self.received_packets.inc_by(received_packets as u64);
self.dropped_packets.set(dropped_packets as i64); self.dropped_packets.inc_by(dropped_packets as u64);
self.invalid_packets.set(invalid_packets as i64); self.invalid_packets.inc_by(invalid_packets as u64);
self.proccessing_errors.set(proccessing_errors as i64); self.proccessing_errors.inc_by(proccessing_errors as u64);
self.info_requests.set(info_requests as i64); self.info_requests.inc_by(info_requests as u64);
self.sent_responses.set(sent_responses as i64); self.ping_requests.inc_by(ping_requests as u64);
self.failed_responses.set(failed_responses as i64); self.sent_responses.inc_by(sent_responses as u64);
self.timed_out_responses.set(timed_out_responses as i64); self.failed_responses.inc_by(failed_responses as u64);
self.ratelimited.set(ratelimited as i64); self.timed_out_responses.inc_by(timed_out_responses as u64);
self.ratelimited.inc_by(ratelimited as u64);
} }
} }

View File

@ -6,8 +6,10 @@ use crate::{
use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, SysMetrics, System}; use common_ecs::{Job, Origin, Phase, SysMetrics, System};
use specs::{Entities, Join, Read, ReadExpect}; use specs::{Entities, Join, Read, ReadExpect};
use std::{sync::Arc, time::Instant}; use std::{
use tokio::sync::RwLock; sync::{Arc, Mutex},
time::Instant,
};
use veloren_query_server::server::Metrics as RawQueryServerMetrics; use veloren_query_server::server::Metrics as RawQueryServerMetrics;
/// This system exports metrics /// This system exports metrics
@ -29,8 +31,8 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, TickMetrics>, ReadExpect<'a, TickMetrics>,
ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, PhysicsMetrics>,
ReadExpect<'a, JobMetrics>, ReadExpect<'a, JobMetrics>,
Option<Read<'a, Arc<RwLock<RawQueryServerMetrics>>>>, Option<Read<'a, Arc<Mutex<RawQueryServerMetrics>>>>,
Option<ReadExpect<'a, QueryServerMetrics>>, ReadExpect<'a, QueryServerMetrics>,
); );
const NAME: &'static str = "metrics"; const NAME: &'static str = "metrics";
@ -172,10 +174,12 @@ impl<'a> System<'a> for Sys {
.with_label_values(&["metrics"]) .with_label_values(&["metrics"])
.observe(len as f64 / NANOSEC_PER_SEC); .observe(len as f64 / NANOSEC_PER_SEC);
if let (Some(query_server_metrics), Some(export_query_server)) = if let Some(Ok(metrics)) = raw_query_server
(raw_query_server, export_query_server) .as_ref()
// Hold the lock for the shortest time possible
.map(|m| m.lock().map(|mut metrics| metrics.reset()))
{ {
export_query_server.apply(query_server_metrics.blocking_write().reset()); export_query_server.apply(metrics);
} }
} }
} }

View File

@ -1,19 +1,18 @@
use common::{comp::Player, util::GIT_DATE_TIMESTAMP};
use common_ecs::{Origin, Phase, System}; use common_ecs::{Origin, Phase, System};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use specs::{Read, ReadStorage}; use specs::{Read, ReadStorage};
use tracing::error;
use veloren_query_server::proto::ServerInfo; use veloren_query_server::proto::ServerInfo;
use crate::{client::Client, Settings, Tick}; use crate::{Settings, Tick};
// Update the server stats every 60 ticks // Update the server stats every 60 ticks
const INFO_SEND_INTERVAL: u64 = 60; const INFO_SEND_INTERVAL: u64 = 60;
lazy_static! { lazy_static! {
pub static ref GIT_HASH: [char; 8] = common::util::GIT_HASH[..8] pub static ref GIT_HASH: u32 =
.chars() u32::from_str_radix(&common::util::GIT_HASH[..8], 16).expect("Invalid git hash");
.collect::<Vec<_>>()
.try_into()
.unwrap_or_default();
} }
#[derive(Default)] #[derive(Default)]
@ -24,24 +23,27 @@ impl<'a> System<'a> for Sys {
Read<'a, Tick>, Read<'a, Tick>,
Read<'a, Settings>, Read<'a, Settings>,
Option<Read<'a, tokio::sync::watch::Sender<ServerInfo>>>, Option<Read<'a, tokio::sync::watch::Sender<ServerInfo>>>,
ReadStorage<'a, Client>, ReadStorage<'a, Player>,
); );
const NAME: &'static str = "server_info"; const NAME: &'static str = "server_info";
const ORIGIN: Origin = Origin::Server; const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create; const PHASE: Phase = Phase::Create;
fn run(_job: &mut common_ecs::Job<Self>, (tick, settings, sender, clients): Self::SystemData) { fn run(_job: &mut common_ecs::Job<Self>, (tick, settings, sender, players): Self::SystemData) {
if let Some(sender) = sender.as_ref() if let Some(sender) = sender.as_ref()
&& tick.0 % INFO_SEND_INTERVAL == 0 && tick.0 % INFO_SEND_INTERVAL == 0
{ {
let count = clients.count().try_into().unwrap_or(u16::MAX); let count = players.count().try_into().unwrap_or(u16::MAX);
_ = sender.send(ServerInfo { if let Err(error) = sender.send(ServerInfo {
git_hash: *GIT_HASH, git_hash: *GIT_HASH,
git_version: *GIT_DATE_TIMESTAMP,
players_count: count, players_count: count,
player_cap: settings.max_players, player_cap: settings.max_players,
battlemode: settings.gameplay.battle_mode.into(), battlemode: settings.gameplay.battle_mode.into(),
}); }) {
error!(?error, "Failed to send server info to the query server");
}
} }
} }
} }