From 93ad288193bad0f324fc7e1e3f1ab150d72d20f2 Mon Sep 17 00:00:00 2001 From: crabman Date: Thu, 25 Apr 2024 12:01:54 +0000 Subject: [PATCH] query server crate --- Cargo.lock | 43 ++++++ Cargo.toml | 1 + common/query_server/Cargo.toml | 19 +++ common/query_server/examples/demo.rs | 48 +++++++ common/query_server/src/client.rs | 79 +++++++++++ common/query_server/src/lib.rs | 15 ++ common/query_server/src/proto.rs | 60 ++++++++ common/query_server/src/server.rs | 200 +++++++++++++++++++++++++++ 8 files changed, 465 insertions(+) create mode 100644 common/query_server/Cargo.toml create mode 100644 common/query_server/examples/demo.rs create mode 100644 common/query_server/src/client.rs create mode 100644 common/query_server/src/lib.rs create mode 100644 common/query_server/src/proto.rs create mode 100644 common/query_server/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index a092516c31..296b1a7f49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2025,6 +2025,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "backtrace", + "version_check", +] + [[package]] name = "error-code" version = "2.3.1" @@ -4795,6 +4805,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "protocol" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13cfa9ba37e0183f87fb14b82f23fc76494c458c72469d95b8a8eec75ad5f191" +dependencies = [ + "byteorder", + "error-chain", + "num-traits", + "protocol-derive", +] + +[[package]] +name = "protocol-derive" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28647f30298898ead966b51e9aee5c74e4ac709ce5ca554378fde187fd3f7e47" +dependencies = [ + "proc-macro2 1.0.79", + "quote 1.0.35", + "syn 1.0.109", +] + [[package]] name = "psm" version = "0.1.21" @@ -7191,6 +7224,16 @@ dependencies = [ "veloren-world", ] +[[package]] +name = "veloren-server-query" +version = "0.1.0" +dependencies = [ + "protocol", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "veloren-voxygen" version = "0.16.0" diff --git a/Cargo.toml b/Cargo.toml index c8d02902e1..89e31e25fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "common/state", "common/systems", "common/frontend", + "common/query_server", "client", "client/i18n", "rtsim", diff --git a/common/query_server/Cargo.toml b/common/query_server/Cargo.toml new file mode 100644 index 0000000000..271cbe7f9d --- /dev/null +++ b/common/query_server/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "veloren-server-query" +version = "0.1.0" +authors = ["crabman ", "XVar "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +server = ["dep:tokio"] +client = ["dep:tokio", "tokio/time"] +example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"] +default = ["server", "client"] + +[dependencies] +tokio = { workspace = true, optional = true, features = ["net", "sync"] } +protocol = { version = "3.4.0", default-features = false, features = ["derive"] } +tracing-subscriber = { version = "0.3.7", optional = true } +tracing = { workspace = true } diff --git a/common/query_server/examples/demo.rs b/common/query_server/examples/demo.rs new file mode 100644 index 0000000000..6caeb0a6b0 --- /dev/null +++ b/common/query_server/examples/demo.rs @@ -0,0 +1,48 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + time::Instant, +}; + +use tokio::sync::{watch, RwLock}; +use veloren_server_query::{ + client::QueryClient, + proto::{ServerBattleMode, ServerInfo}, + server::{Metrics, QueryServer}, +}; + +const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo { + git_hash: ['\0'; 10], + players_count: 100, + player_cap: 300, + battlemode: ServerBattleMode::GlobalPvE, +}; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006); + let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO); + let mut server = QueryServer::new(addr, receiver); + let metrics = Arc::new(RwLock::new(Metrics::default())); + let metrics2 = Arc::clone(&metrics); + + tokio::task::spawn(async move { server.run(metrics2).await.unwrap() }); + + let client = QueryClient { addr }; + let ping = client.ping().await.unwrap(); + let info = client.server_info().await.unwrap(); + + println!("Ping = {}ms", ping.as_millis()); + println!("Server info: {info:?}"); + println!("Metrics = {:#?}", metrics.read().await); + assert_eq!(info, DEFAULT_SERVER_INFO); + + let start = Instant::now(); + + for _i in 0..10000 { + client.ping().await.unwrap(); + } + + dbg!(start.elapsed()); +} diff --git a/common/query_server/src/client.rs b/common/query_server/src/client.rs new file mode 100644 index 0000000000..17d3d993f7 --- /dev/null +++ b/common/query_server/src/client.rs @@ -0,0 +1,79 @@ +use std::{ + io, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::{Duration, Instant}, +}; + +use tokio::{net::UdpSocket, time::timeout}; + +use crate::proto::{Ping, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER}; + +#[derive(Debug)] +pub enum QueryClientError { + Io(tokio::io::Error), + Protocol(protocol::Error), + InvalidResponse, + Timeout, +} + +pub struct QueryClient { + pub addr: SocketAddr, +} + +impl QueryClient { + pub async fn server_info(&self) -> Result { + self.send_query(QueryServerRequest::ServerInfo(Default::default())) + .await + .and_then(|(response, _)| { + if let QueryServerResponse::ServerInfo(info) = response { + Ok(info) + } else { + Err(QueryClientError::InvalidResponse) + } + }) + } + + pub async fn ping(&self) -> Result { + self.send_query(QueryServerRequest::Ping(Ping)) + .await + .map(|(_, elapsed)| elapsed) + } + + async fn send_query( + &self, + request: QueryServerRequest, + ) -> Result<(QueryServerResponse, Duration), QueryClientError> { + let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?; + + let mut pipeline = crate::create_pipeline(); + + let mut buf = VELOREN_HEADER.to_vec(); + + let mut cursor = io::Cursor::new(&mut buf); + cursor.set_position(VELOREN_HEADER.len() as u64); + pipeline.send_to(&mut cursor, &request)?; + + let query_sent = Instant::now(); + socket.send_to(buf.as_slice(), self.addr).await?; + + let mut buf = vec![0; 1500]; + let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf)) + .await + .map_err(|_| QueryClientError::Timeout)? + .map_err(|_| QueryClientError::Timeout)?; + + let mut pipeline = crate::create_pipeline(); + + let packet: QueryServerResponse = pipeline.receive_from(&mut io::Cursor::new(&mut buf))?; + + Ok((packet, query_sent.elapsed())) + } +} + +impl From for QueryClientError { + fn from(value: tokio::io::Error) -> Self { Self::Io(value) } +} + +impl From for QueryClientError { + fn from(value: protocol::Error) -> Self { Self::Protocol(value) } +} diff --git a/common/query_server/src/lib.rs b/common/query_server/src/lib.rs new file mode 100644 index 0000000000..06a64aa014 --- /dev/null +++ b/common/query_server/src/lib.rs @@ -0,0 +1,15 @@ +use protocol::{ + wire::{self, dgram}, + Parcel, +}; + +#[cfg(feature = "client")] pub mod client; +pub mod proto; +#[cfg(feature = "server")] pub mod server; + +fn create_pipeline() -> dgram::Pipeline { + dgram::Pipeline::new( + wire::middleware::pipeline::default(), + protocol::Settings::default(), + ) +} diff --git a/common/query_server/src/proto.rs b/common/query_server/src/proto.rs new file mode 100644 index 0000000000..e12850409f --- /dev/null +++ b/common/query_server/src/proto.rs @@ -0,0 +1,60 @@ +use protocol::Protocol; + +pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n']; + +#[derive(Protocol, Debug, Clone, Copy)] +pub struct Ping; + +#[derive(Protocol, Debug, Clone, Copy)] +pub struct Pong; + +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +#[allow(clippy::large_enum_variant)] +pub enum QueryServerRequest { + Ping(Ping), + ServerInfo(ServerInfoRequest), + // New requests should be added at the end to prevent breakage +} + +#[derive(Protocol, Debug, Clone, Copy)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +pub enum QueryServerResponse { + Pong(Pong), + ServerInfo(ServerInfo), + // New responses should be added at the end to prevent breakage +} + +#[derive(Protocol, Debug, Clone, Copy)] +pub struct ServerInfoRequest { + // Padding to prevent amplification attacks + pub _padding: [u8; 256], +} + +#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] +pub struct ServerInfo { + pub git_hash: [char; 10], + pub players_count: u16, + pub player_cap: u16, + pub battlemode: ServerBattleMode, +} + +#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)] +#[protocol(discriminant = "integer")] +#[protocol(discriminator(u8))] +#[repr(u8)] +pub enum ServerBattleMode { + GlobalPvP, + GlobalPvE, + PerPlayer, +} + +impl Default for ServerInfoRequest { + fn default() -> Self { ServerInfoRequest { _padding: [0; 256] } } +} + +impl ServerInfo { + pub fn git_hash(&self) -> String { String::from_iter(&self.git_hash) } +} diff --git a/common/query_server/src/server.rs b/common/query_server/src/server.rs new file mode 100644 index 0000000000..9901eb9da6 --- /dev/null +++ b/common/query_server/src/server.rs @@ -0,0 +1,200 @@ +use std::{ + io, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, + time::Duration, +}; + +use protocol::wire::{self, dgram}; +use tokio::{ + net::UdpSocket, + sync::{watch, RwLock}, + time::timeout, +}; +use tracing::{debug, trace}; + +use crate::proto::{ + Ping, Pong, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER, +}; + +const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2); + +pub struct QueryServer { + pub addr: SocketAddr, + server_info: watch::Receiver, + pipeline: dgram::Pipeline, +} + +#[derive(Default, Clone, Copy, Debug)] +pub struct Metrics { + pub received_packets: u32, + pub dropped_packets: u32, + pub invalid_packets: u32, + pub proccessing_errors: u32, + pub ping_requests: u32, + pub info_requests: u32, + pub sent_responses: u32, + pub failed_responses: u32, + pub timed_out_responses: u32, +} + +impl QueryServer { + pub fn new(addr: SocketAddr, server_info: watch::Receiver) -> Self { + Self { + addr, + server_info, + pipeline: crate::create_pipeline(), + } + } + + pub async fn run(&mut self, metrics: Arc>) -> Result<(), tokio::io::Error> { + let socket = UdpSocket::bind(self.addr).await?; + + let mut buf = Box::new([0; 1024]); + loop { + let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| { + debug!("Error while receiving from query server socket: {err:?}") + }) else { + continue; + }; + + let mut new_metrics = Metrics { + received_packets: 1, + ..Default::default() + }; + + let raw_msg_buf = &buf[..len]; + let msg_buf = if Self::validate_datagram(raw_msg_buf) { + &raw_msg_buf[VELOREN_HEADER.len()..] + } else { + new_metrics.dropped_packets += 1; + continue; + }; + + if let Err(error) = self + .process_datagram(msg_buf, remote_addr, &mut new_metrics) + .await + { + debug!(?error, "Error while processing datagram"); + } + + *buf = [0; 1024]; + + // Update metrics at the end of eath packet + let mut metrics = metrics.write().await; + *metrics += new_metrics; + } + } + + // Header must be discarded after this validation passes + fn validate_datagram(data: &[u8]) -> bool { + let len = data.len(); + if len < VELOREN_HEADER.len() + 1 { + trace!(?len, "Datagram too short"); + false + } else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER { + trace!(?len, "Datagram header invalid"); + false + } else { + true + } + } + + async fn process_datagram( + &mut self, + datagram: &[u8], + remote: SocketAddr, + metrics: &mut Metrics, + ) -> Result<(), tokio::io::Error> { + let Ok(packet): Result = + self.pipeline.receive_from(&mut io::Cursor::new(datagram)) + else { + metrics.invalid_packets += 1; + return Ok(()); + }; + + trace!(?packet, "Received packet"); + + match packet { + QueryServerRequest::Ping(Ping) => { + metrics.ping_requests += 1; + tokio::task::spawn(async move { + _ = timeout( + RESPONSE_SEND_TIMEOUT, + Self::send_response(QueryServerResponse::Pong(Pong), remote), + ) + .await; + }); + }, + QueryServerRequest::ServerInfo(_) => { + metrics.info_requests += 1; + let server_info = *self.server_info.borrow(); + tokio::task::spawn(async move { + _ = timeout( + RESPONSE_SEND_TIMEOUT, + Self::send_response(QueryServerResponse::ServerInfo(server_info), remote), + ) + .await; + }); + }, + } + + Ok(()) + } + + async fn send_response(response: QueryServerResponse, addr: SocketAddr) { + let Ok(socket) = + UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await + else { + debug!("Failed to create response socket"); + return; + }; + + let mut buf = Vec::new(); + + let mut pipeline = crate::create_pipeline(); + + _ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response); + match socket.send_to(&buf, addr).await { + Ok(_) => { + // TODO: Sent responses metric + }, + Err(err) => { + // TODO: Failed response metric + debug!(?err, "Failed to send query server response"); + }, + } + } +} + +impl std::ops::AddAssign for Metrics { + fn add_assign( + &mut self, + Self { + received_packets, + dropped_packets, + invalid_packets, + proccessing_errors, + ping_requests, + info_requests, + sent_responses, + failed_responses, + timed_out_responses, + }: Self, + ) { + self.received_packets += received_packets; + self.dropped_packets += dropped_packets; + self.invalid_packets += invalid_packets; + self.proccessing_errors += proccessing_errors; + self.ping_requests += ping_requests; + self.info_requests += info_requests; + self.sent_responses += sent_responses; + self.failed_responses += failed_responses; + self.timed_out_responses += timed_out_responses; + } +} + +impl Metrics { + /// Resets all metrics to 0 + pub fn reset(&mut self) { *self = Self::default(); } +}