query server crate

This commit is contained in:
crabman 2024-04-25 12:01:54 +00:00
parent 0b9c2621f7
commit 93ad288193
No known key found for this signature in database
8 changed files with 465 additions and 0 deletions

43
Cargo.lock generated
View File

@ -2025,6 +2025,16 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"backtrace",
"version_check",
]
[[package]] [[package]]
name = "error-code" name = "error-code"
version = "2.3.1" version = "2.3.1"
@ -4795,6 +4805,29 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "protocol"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13cfa9ba37e0183f87fb14b82f23fc76494c458c72469d95b8a8eec75ad5f191"
dependencies = [
"byteorder",
"error-chain",
"num-traits",
"protocol-derive",
]
[[package]]
name = "protocol-derive"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28647f30298898ead966b51e9aee5c74e4ac709ce5ca554378fde187fd3f7e47"
dependencies = [
"proc-macro2 1.0.79",
"quote 1.0.35",
"syn 1.0.109",
]
[[package]] [[package]]
name = "psm" name = "psm"
version = "0.1.21" version = "0.1.21"
@ -7191,6 +7224,16 @@ dependencies = [
"veloren-world", "veloren-world",
] ]
[[package]]
name = "veloren-server-query"
version = "0.1.0"
dependencies = [
"protocol",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]] [[package]]
name = "veloren-voxygen" name = "veloren-voxygen"
version = "0.16.0" version = "0.16.0"

View File

@ -13,6 +13,7 @@ members = [
"common/state", "common/state",
"common/systems", "common/systems",
"common/frontend", "common/frontend",
"common/query_server",
"client", "client",
"client/i18n", "client/i18n",
"rtsim", "rtsim",

View File

@ -0,0 +1,19 @@
[package]
name = "veloren-server-query"
version = "0.1.0"
authors = ["crabman <vlrncrabman+veloren@gmail.com>", "XVar <atomyc@gmail.com>"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
server = ["dep:tokio"]
client = ["dep:tokio", "tokio/time"]
example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber"]
default = ["server", "client"]
[dependencies]
tokio = { workspace = true, optional = true, features = ["net", "sync"] }
protocol = { version = "3.4.0", default-features = false, features = ["derive"] }
tracing-subscriber = { version = "0.3.7", optional = true }
tracing = { workspace = true }

View File

@ -0,0 +1,48 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Instant,
};
use tokio::sync::{watch, RwLock};
use veloren_server_query::{
client::QueryClient,
proto::{ServerBattleMode, ServerInfo},
server::{Metrics, QueryServer},
};
const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo {
git_hash: ['\0'; 10],
players_count: 100,
player_cap: 300,
battlemode: ServerBattleMode::GlobalPvE,
};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006);
let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO);
let mut server = QueryServer::new(addr, receiver);
let metrics = Arc::new(RwLock::new(Metrics::default()));
let metrics2 = Arc::clone(&metrics);
tokio::task::spawn(async move { server.run(metrics2).await.unwrap() });
let client = QueryClient { addr };
let ping = client.ping().await.unwrap();
let info = client.server_info().await.unwrap();
println!("Ping = {}ms", ping.as_millis());
println!("Server info: {info:?}");
println!("Metrics = {:#?}", metrics.read().await);
assert_eq!(info, DEFAULT_SERVER_INFO);
let start = Instant::now();
for _i in 0..10000 {
client.ping().await.unwrap();
}
dbg!(start.elapsed());
}

View File

@ -0,0 +1,79 @@
use std::{
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::{Duration, Instant},
};
use tokio::{net::UdpSocket, time::timeout};
use crate::proto::{Ping, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER};
#[derive(Debug)]
pub enum QueryClientError {
Io(tokio::io::Error),
Protocol(protocol::Error),
InvalidResponse,
Timeout,
}
pub struct QueryClient {
pub addr: SocketAddr,
}
impl QueryClient {
pub async fn server_info(&self) -> Result<ServerInfo, QueryClientError> {
self.send_query(QueryServerRequest::ServerInfo(Default::default()))
.await
.and_then(|(response, _)| {
if let QueryServerResponse::ServerInfo(info) = response {
Ok(info)
} else {
Err(QueryClientError::InvalidResponse)
}
})
}
pub async fn ping(&self) -> Result<Duration, QueryClientError> {
self.send_query(QueryServerRequest::Ping(Ping))
.await
.map(|(_, elapsed)| elapsed)
}
async fn send_query(
&self,
request: QueryServerRequest,
) -> Result<(QueryServerResponse, Duration), QueryClientError> {
let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
let mut pipeline = crate::create_pipeline();
let mut buf = VELOREN_HEADER.to_vec();
let mut cursor = io::Cursor::new(&mut buf);
cursor.set_position(VELOREN_HEADER.len() as u64);
pipeline.send_to(&mut cursor, &request)?;
let query_sent = Instant::now();
socket.send_to(buf.as_slice(), self.addr).await?;
let mut buf = vec![0; 1500];
let _ = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
.await
.map_err(|_| QueryClientError::Timeout)?
.map_err(|_| QueryClientError::Timeout)?;
let mut pipeline = crate::create_pipeline();
let packet: QueryServerResponse = pipeline.receive_from(&mut io::Cursor::new(&mut buf))?;
Ok((packet, query_sent.elapsed()))
}
}
impl From<tokio::io::Error> for QueryClientError {
fn from(value: tokio::io::Error) -> Self { Self::Io(value) }
}
impl From<protocol::Error> for QueryClientError {
fn from(value: protocol::Error) -> Self { Self::Protocol(value) }
}

View File

@ -0,0 +1,15 @@
use protocol::{
wire::{self, dgram},
Parcel,
};
#[cfg(feature = "client")] pub mod client;
pub mod proto;
#[cfg(feature = "server")] pub mod server;
fn create_pipeline<T: Parcel>() -> dgram::Pipeline<T, wire::middleware::pipeline::Default> {
dgram::Pipeline::new(
wire::middleware::pipeline::default(),
protocol::Settings::default(),
)
}

View File

@ -0,0 +1,60 @@
use protocol::Protocol;
pub const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n'];
#[derive(Protocol, Debug, Clone, Copy)]
pub struct Ping;
#[derive(Protocol, Debug, Clone, Copy)]
pub struct Pong;
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
#[allow(clippy::large_enum_variant)]
pub enum QueryServerRequest {
Ping(Ping),
ServerInfo(ServerInfoRequest),
// New requests should be added at the end to prevent breakage
}
#[derive(Protocol, Debug, Clone, Copy)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
pub enum QueryServerResponse {
Pong(Pong),
ServerInfo(ServerInfo),
// New responses should be added at the end to prevent breakage
}
#[derive(Protocol, Debug, Clone, Copy)]
pub struct ServerInfoRequest {
// Padding to prevent amplification attacks
pub _padding: [u8; 256],
}
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerInfo {
pub git_hash: [char; 10],
pub players_count: u16,
pub player_cap: u16,
pub battlemode: ServerBattleMode,
}
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
#[repr(u8)]
pub enum ServerBattleMode {
GlobalPvP,
GlobalPvE,
PerPlayer,
}
impl Default for ServerInfoRequest {
fn default() -> Self { ServerInfoRequest { _padding: [0; 256] } }
}
impl ServerInfo {
pub fn git_hash(&self) -> String { String::from_iter(&self.git_hash) }
}

View File

@ -0,0 +1,200 @@
use std::{
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
time::Duration,
};
use protocol::wire::{self, dgram};
use tokio::{
net::UdpSocket,
sync::{watch, RwLock},
time::timeout,
};
use tracing::{debug, trace};
use crate::proto::{
Ping, Pong, QueryServerRequest, QueryServerResponse, ServerInfo, VELOREN_HEADER,
};
const RESPONSE_SEND_TIMEOUT: Duration = Duration::from_secs(2);
pub struct QueryServer {
pub addr: SocketAddr,
server_info: watch::Receiver<ServerInfo>,
pipeline: dgram::Pipeline<QueryServerRequest, wire::middleware::pipeline::Default>,
}
#[derive(Default, Clone, Copy, Debug)]
pub struct Metrics {
pub received_packets: u32,
pub dropped_packets: u32,
pub invalid_packets: u32,
pub proccessing_errors: u32,
pub ping_requests: u32,
pub info_requests: u32,
pub sent_responses: u32,
pub failed_responses: u32,
pub timed_out_responses: u32,
}
impl QueryServer {
pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>) -> Self {
Self {
addr,
server_info,
pipeline: crate::create_pipeline(),
}
}
pub async fn run(&mut self, metrics: Arc<RwLock<Metrics>>) -> Result<(), tokio::io::Error> {
let socket = UdpSocket::bind(self.addr).await?;
let mut buf = Box::new([0; 1024]);
loop {
let Ok((len, remote_addr)) = socket.recv_from(&mut *buf).await.inspect_err(|err| {
debug!("Error while receiving from query server socket: {err:?}")
}) else {
continue;
};
let mut new_metrics = Metrics {
received_packets: 1,
..Default::default()
};
let raw_msg_buf = &buf[..len];
let msg_buf = if Self::validate_datagram(raw_msg_buf) {
&raw_msg_buf[VELOREN_HEADER.len()..]
} else {
new_metrics.dropped_packets += 1;
continue;
};
if let Err(error) = self
.process_datagram(msg_buf, remote_addr, &mut new_metrics)
.await
{
debug!(?error, "Error while processing datagram");
}
*buf = [0; 1024];
// Update metrics at the end of eath packet
let mut metrics = metrics.write().await;
*metrics += new_metrics;
}
}
// Header must be discarded after this validation passes
fn validate_datagram(data: &[u8]) -> bool {
let len = data.len();
if len < VELOREN_HEADER.len() + 1 {
trace!(?len, "Datagram too short");
false
} else if data[0..VELOREN_HEADER.len()] != VELOREN_HEADER {
trace!(?len, "Datagram header invalid");
false
} else {
true
}
}
async fn process_datagram(
&mut self,
datagram: &[u8],
remote: SocketAddr,
metrics: &mut Metrics,
) -> Result<(), tokio::io::Error> {
let Ok(packet): Result<QueryServerRequest, _> =
self.pipeline.receive_from(&mut io::Cursor::new(datagram))
else {
metrics.invalid_packets += 1;
return Ok(());
};
trace!(?packet, "Received packet");
match packet {
QueryServerRequest::Ping(Ping) => {
metrics.ping_requests += 1;
tokio::task::spawn(async move {
_ = timeout(
RESPONSE_SEND_TIMEOUT,
Self::send_response(QueryServerResponse::Pong(Pong), remote),
)
.await;
});
},
QueryServerRequest::ServerInfo(_) => {
metrics.info_requests += 1;
let server_info = *self.server_info.borrow();
tokio::task::spawn(async move {
_ = timeout(
RESPONSE_SEND_TIMEOUT,
Self::send_response(QueryServerResponse::ServerInfo(server_info), remote),
)
.await;
});
},
}
Ok(())
}
async fn send_response(response: QueryServerResponse, addr: SocketAddr) {
let Ok(socket) =
UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await
else {
debug!("Failed to create response socket");
return;
};
let mut buf = Vec::new();
let mut pipeline = crate::create_pipeline();
_ = pipeline.send_to(&mut io::Cursor::new(&mut buf), &response);
match socket.send_to(&buf, addr).await {
Ok(_) => {
// TODO: Sent responses metric
},
Err(err) => {
// TODO: Failed response metric
debug!(?err, "Failed to send query server response");
},
}
}
}
impl std::ops::AddAssign for Metrics {
fn add_assign(
&mut self,
Self {
received_packets,
dropped_packets,
invalid_packets,
proccessing_errors,
ping_requests,
info_requests,
sent_responses,
failed_responses,
timed_out_responses,
}: Self,
) {
self.received_packets += received_packets;
self.dropped_packets += dropped_packets;
self.invalid_packets += invalid_packets;
self.proccessing_errors += proccessing_errors;
self.ping_requests += ping_requests;
self.info_requests += info_requests;
self.sent_responses += sent_responses;
self.failed_responses += failed_responses;
self.timed_out_responses += timed_out_responses;
}
}
impl Metrics {
/// Resets all metrics to 0
pub fn reset(&mut self) { *self = Self::default(); }
}