mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
Merge branch 'crabman/server_query' into 'master'
Query server See merge request veloren/veloren!4440
This commit is contained in:
commit
809695880e
@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added Abyssal rings
|
||||
- Sprite models can now change depending on sprite attributes.
|
||||
- Dwarven-Mine update and activation.
|
||||
- Protocol to query game server information (player count, version, etc.) and make ping tests.
|
||||
|
||||
### Changed
|
||||
|
||||
|
55
Cargo.lock
generated
55
Cargo.lock
generated
@ -2025,6 +2025,16 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "error-chain"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "error-code"
|
||||
version = "2.3.1"
|
||||
@ -2679,9 +2689,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.25"
|
||||
version = "0.3.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb"
|
||||
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@ -4795,6 +4805,29 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protocol"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13cfa9ba37e0183f87fb14b82f23fc76494c458c72469d95b8a8eec75ad5f191"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"error-chain",
|
||||
"num-traits",
|
||||
"protocol-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protocol-derive"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28647f30298898ead966b51e9aee5c74e4ac709ce5ca554378fde187fd3f7e47"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.79",
|
||||
"quote 1.0.35",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "psm"
|
||||
version = "0.1.21"
|
||||
@ -5371,9 +5404,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.21.10"
|
||||
version = "0.21.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
|
||||
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.8",
|
||||
@ -6618,7 +6651,7 @@ version = "1.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"cfg-if 0.1.10",
|
||||
"rand 0.8.5",
|
||||
"static_assertions",
|
||||
]
|
||||
@ -7070,6 +7103,17 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "veloren-query-server"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"protocol",
|
||||
"rand 0.8.5",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "veloren-rtsim"
|
||||
version = "0.10.0"
|
||||
@ -7138,6 +7182,7 @@ dependencies = [
|
||||
"veloren-common-state",
|
||||
"veloren-common-systems",
|
||||
"veloren-network",
|
||||
"veloren-query-server",
|
||||
"veloren-rtsim",
|
||||
"veloren-server-agent",
|
||||
"veloren-world",
|
||||
|
@ -13,6 +13,7 @@ members = [
|
||||
"common/state",
|
||||
"common/systems",
|
||||
"common/frontend",
|
||||
"common/query_server",
|
||||
"client",
|
||||
"client/i18n",
|
||||
"rtsim",
|
||||
|
@ -75,6 +75,7 @@ where
|
||||
"refinery_core::traits::divergent=off",
|
||||
"veloren_server::persistence::character=info",
|
||||
"veloren_server::settings=info",
|
||||
"veloren_query_server=info",
|
||||
];
|
||||
|
||||
for s in default_directives {
|
||||
|
24
common/query_server/Cargo.toml
Normal file
24
common/query_server/Cargo.toml
Normal file
@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "veloren-query-server"
|
||||
version = "0.1.0"
|
||||
authors = ["crabman <vlrncrabman+veloren@gmail.com>", "XVar <atomyc@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[features]
|
||||
server = ["dep:rand"]
|
||||
client = ["tokio/time"]
|
||||
example = ["tokio/macros", "tokio/rt-multi-thread", "dep:tracing-subscriber", "client", "server"]
|
||||
default = []
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true, features = ["net", "sync"] }
|
||||
protocol = { version = "3.4.0", default-features = false, features = ["derive"] }
|
||||
tracing-subscriber = { version = "0.3.7", optional = true }
|
||||
tracing = { workspace = true }
|
||||
rand = { workspace = true, optional = true }
|
||||
|
||||
[[example]]
|
||||
name = "demo"
|
||||
required-features = ["example"]
|
51
common/query_server/examples/demo.rs
Normal file
51
common/query_server/examples/demo.rs
Normal file
@ -0,0 +1,51 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
sync::{Arc, Mutex},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use tokio::sync::watch;
|
||||
use tracing::error;
|
||||
use veloren_query_server::{
|
||||
client::QueryClient,
|
||||
proto::{ServerBattleMode, ServerInfo},
|
||||
server::{Metrics, QueryServer},
|
||||
};
|
||||
|
||||
const DEFAULT_SERVER_INFO: ServerInfo = ServerInfo {
|
||||
git_hash: 0,
|
||||
git_timestamp: 0,
|
||||
players_count: 100,
|
||||
player_cap: 300,
|
||||
battlemode: ServerBattleMode::GlobalPvE,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 14006);
|
||||
let (_sender, receiver) = watch::channel(DEFAULT_SERVER_INFO);
|
||||
let mut server = QueryServer::new(addr, receiver, 10002);
|
||||
let metrics = Arc::new(Mutex::new(Metrics::default()));
|
||||
let metrics2 = Arc::clone(&metrics);
|
||||
|
||||
tokio::task::spawn(async move { server.run(metrics2).await.unwrap() });
|
||||
|
||||
let mut client = QueryClient::new(addr);
|
||||
let (info, ping) = client.server_info().await.unwrap();
|
||||
|
||||
println!("Ping = {}ms", ping.as_millis());
|
||||
println!("Server info: {info:?}");
|
||||
assert_eq!(info, DEFAULT_SERVER_INFO);
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for _i in 0..10000 {
|
||||
if let Err(error) = client.server_info().await {
|
||||
error!(?error, "Server info request error");
|
||||
}
|
||||
}
|
||||
|
||||
println!("Metrics = {:#?}", metrics.lock().unwrap());
|
||||
dbg!(start.elapsed());
|
||||
}
|
116
common/query_server/src/client.rs
Normal file
116
common/query_server/src/client.rs
Normal file
@ -0,0 +1,116 @@
|
||||
use std::{
|
||||
io,
|
||||
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use protocol::Parcel;
|
||||
use tokio::{net::UdpSocket, time::timeout};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::proto::{
|
||||
QueryServerRequest, QueryServerResponse, RawQueryServerRequest, RawQueryServerResponse,
|
||||
ServerInfo, MAX_RESPONSE_SIZE,
|
||||
};
|
||||
|
||||
// This must be at least 2 for the client to get a value for the `p` field.
|
||||
const MAX_REQUEST_RETRIES: usize = 5;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryClientError {
|
||||
Io(tokio::io::Error),
|
||||
Protocol(protocol::Error),
|
||||
InvalidResponse,
|
||||
Timeout,
|
||||
ChallengeFailed,
|
||||
}
|
||||
|
||||
struct ClientInitData {
|
||||
p: u64,
|
||||
#[allow(dead_code)]
|
||||
server_max_version: u16,
|
||||
}
|
||||
|
||||
/// The `p` field has to be requested from the server each time this client is
|
||||
/// constructed, if possible reuse this!
|
||||
pub struct QueryClient {
|
||||
pub addr: SocketAddr,
|
||||
init: Option<ClientInitData>,
|
||||
}
|
||||
|
||||
impl QueryClient {
|
||||
pub fn new(addr: SocketAddr) -> Self { Self { addr, init: None } }
|
||||
|
||||
pub async fn server_info(&mut self) -> Result<(ServerInfo, Duration), QueryClientError> {
|
||||
self.send_query(QueryServerRequest::ServerInfo)
|
||||
.await
|
||||
.and_then(|(response, duration)| {
|
||||
#[allow(irrefutable_let_patterns)]
|
||||
if let QueryServerResponse::ServerInfo(info) = response {
|
||||
Ok((info, duration))
|
||||
} else {
|
||||
Err(QueryClientError::InvalidResponse)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_query(
|
||||
&mut self,
|
||||
request: QueryServerRequest,
|
||||
) -> Result<(QueryServerResponse, Duration), QueryClientError> {
|
||||
let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).await?;
|
||||
|
||||
for _ in 0..MAX_REQUEST_RETRIES {
|
||||
let request = if let Some(init) = &self.init {
|
||||
// TODO: Use the maximum version supported by both the client and server once
|
||||
// new protocol versions are added
|
||||
RawQueryServerRequest { p: init.p, request }
|
||||
} else {
|
||||
// TODO: Use the legacy version here once new protocol versions are added
|
||||
RawQueryServerRequest {
|
||||
p: 0,
|
||||
request: QueryServerRequest::Init,
|
||||
}
|
||||
};
|
||||
let buf = request.serialize()?;
|
||||
let query_sent = Instant::now();
|
||||
socket.send_to(&buf, self.addr).await?;
|
||||
let mut buf = vec![0; MAX_RESPONSE_SIZE];
|
||||
let (buf_len, _) = timeout(Duration::from_secs(2), socket.recv_from(&mut buf))
|
||||
.await
|
||||
.map_err(|_| QueryClientError::Timeout)??;
|
||||
|
||||
if buf_len <= 2 {
|
||||
Err(QueryClientError::InvalidResponse)?
|
||||
}
|
||||
|
||||
let packet = <RawQueryServerResponse as Parcel>::read(
|
||||
&mut io::Cursor::new(&buf[..buf_len]),
|
||||
&Default::default(),
|
||||
)?;
|
||||
|
||||
match packet {
|
||||
RawQueryServerResponse::Response(response) => {
|
||||
return Ok((response, query_sent.elapsed()));
|
||||
},
|
||||
RawQueryServerResponse::Init(init) => {
|
||||
trace!(?init, "Resetting p");
|
||||
self.init = Some(ClientInitData {
|
||||
p: init.p,
|
||||
server_max_version: init.max_supported_version,
|
||||
});
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Err(QueryClientError::ChallengeFailed)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::io::Error> for QueryClientError {
|
||||
fn from(value: tokio::io::Error) -> Self { Self::Io(value) }
|
||||
}
|
||||
|
||||
impl From<protocol::Error> for QueryClientError {
|
||||
fn from(value: protocol::Error) -> Self { Self::Protocol(value) }
|
||||
}
|
4
common/query_server/src/lib.rs
Normal file
4
common/query_server/src/lib.rs
Normal file
@ -0,0 +1,4 @@
|
||||
#[cfg(feature = "client")] pub mod client;
|
||||
pub mod proto;
|
||||
#[cfg(feature = "server")] mod ratelimit;
|
||||
#[cfg(feature = "server")] pub mod server;
|
132
common/query_server/src/proto.rs
Normal file
132
common/query_server/src/proto.rs
Normal file
@ -0,0 +1,132 @@
|
||||
use protocol::Protocol;
|
||||
|
||||
pub(crate) const VERSION: u16 = 0;
|
||||
pub(crate) const VELOREN_HEADER: [u8; 7] = [b'v', b'e', b'l', b'o', b'r', b'e', b'n'];
|
||||
pub(crate) const MAX_REQUEST_CONTENT_SIZE: usize = 300;
|
||||
// NOTE: The actual maximum size must never exceed 1200 or we risk getting near
|
||||
// MTU limits for some networks.
|
||||
pub(crate) const MAX_REQUEST_SIZE: usize = MAX_REQUEST_CONTENT_SIZE + VELOREN_HEADER.len() + 2;
|
||||
pub(crate) const MAX_RESPONSE_SIZE: usize = 256;
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy)]
|
||||
pub(crate) struct RawQueryServerRequest {
|
||||
/// See comment on [`Init::p`]
|
||||
pub p: u64,
|
||||
pub request: QueryServerRequest,
|
||||
}
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy)]
|
||||
#[protocol(discriminant = "integer")]
|
||||
#[protocol(discriminator(u8))]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum QueryServerRequest {
|
||||
/// This requests exists mostly for backwards-compatibilty reasons. As the
|
||||
/// first message sent to the server should always be in the V0 version
|
||||
/// of the protocol, if future versions of the protocol have more
|
||||
/// requests than server info it may be confusing to request `P` and the max
|
||||
/// version with a `QueryServerRequest::ServerInfo` request (the request
|
||||
/// will still be dropped as the supplied `P` value is invalid).
|
||||
Init,
|
||||
ServerInfo,
|
||||
// New requests should be added at the end to prevent breakage.
|
||||
// NOTE: Any new (sub-)variants must be added to the `check_request_sizes` test at the end of
|
||||
// this file
|
||||
}
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy)]
|
||||
pub(crate) struct Init {
|
||||
/// This is used as a challenge to prevent IP address spoofing by verifying
|
||||
/// that the client can receive from the source address.
|
||||
///
|
||||
/// Any request to the server must include this value to be processed,
|
||||
/// otherwise this response will be returned (giving clients a value to pass
|
||||
/// for later requests).
|
||||
pub p: u64,
|
||||
/// The maximum supported protocol version by the server. The first request
|
||||
/// to a server must always be done in the V0 protocol to query this value.
|
||||
/// Following requests (when the version is known), can be done in the
|
||||
/// maximum version or below, responses will be sent in the same version as
|
||||
/// the requests.
|
||||
pub max_supported_version: u16,
|
||||
}
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy)]
|
||||
#[protocol(discriminant = "integer")]
|
||||
#[protocol(discriminator(u8))]
|
||||
pub(crate) enum RawQueryServerResponse {
|
||||
Response(QueryServerResponse),
|
||||
Init(Init),
|
||||
}
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy)]
|
||||
#[protocol(discriminant = "integer")]
|
||||
#[protocol(discriminator(u8))]
|
||||
pub enum QueryServerResponse {
|
||||
ServerInfo(ServerInfo),
|
||||
// New responses should be added at the end to prevent breakage
|
||||
}
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct ServerInfo {
|
||||
pub git_hash: u32,
|
||||
pub git_timestamp: i64,
|
||||
pub players_count: u16,
|
||||
pub player_cap: u16,
|
||||
pub battlemode: ServerBattleMode,
|
||||
}
|
||||
|
||||
#[derive(Protocol, Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[protocol(discriminant = "integer")]
|
||||
#[protocol(discriminator(u8))]
|
||||
#[repr(u8)]
|
||||
pub enum ServerBattleMode {
|
||||
GlobalPvP,
|
||||
GlobalPvE,
|
||||
PerPlayer,
|
||||
}
|
||||
|
||||
impl RawQueryServerRequest {
|
||||
#[cfg(any(feature = "client", test))]
|
||||
pub fn serialize(&self) -> Result<Vec<u8>, protocol::Error> {
|
||||
use protocol::Parcel;
|
||||
|
||||
let mut buf = Vec::with_capacity(MAX_REQUEST_SIZE);
|
||||
|
||||
// 2 extra bytes for version information, currently unused
|
||||
buf.extend(VERSION.to_le_bytes());
|
||||
buf.extend({
|
||||
let request_data =
|
||||
<RawQueryServerRequest as Parcel>::raw_bytes(self, &Default::default())?;
|
||||
if request_data.len() > MAX_REQUEST_CONTENT_SIZE {
|
||||
panic!(
|
||||
"Attempted to send request larger than the max size (size: {}, max size: \
|
||||
{MAX_REQUEST_CONTENT_SIZE}, request: {self:?})",
|
||||
request_data.len()
|
||||
);
|
||||
}
|
||||
request_data
|
||||
});
|
||||
const _: () = assert!(MAX_RESPONSE_SIZE + VELOREN_HEADER.len() <= MAX_REQUEST_SIZE);
|
||||
buf.resize(MAX_RESPONSE_SIZE.max(buf.len()), 0);
|
||||
buf.extend(VELOREN_HEADER);
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{QueryServerRequest, RawQueryServerRequest};
|
||||
|
||||
#[test]
|
||||
fn check_request_sizes() {
|
||||
const ALL_REQUESTS: &[QueryServerRequest] =
|
||||
&[QueryServerRequest::ServerInfo, QueryServerRequest::Init];
|
||||
for request in ALL_REQUESTS {
|
||||
let request = RawQueryServerRequest {
|
||||
p: 0,
|
||||
request: *request,
|
||||
};
|
||||
request.serialize().unwrap(); // This will panic if the size is above MAX_REQUEST_SIZE
|
||||
}
|
||||
}
|
||||
}
|
85
common/query_server/src/ratelimit.rs
Normal file
85
common/query_server/src/ratelimit.rs
Normal file
@ -0,0 +1,85 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::IpAddr,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
const SHIFT_EVERY: Duration = Duration::from_secs(15);
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum ReducedIpAddr {
|
||||
V4(u32),
|
||||
V6(u64),
|
||||
}
|
||||
|
||||
/// Per-IP state, divided into 4 segments of [`SHIFT_EVERY`] each (one minute at
|
||||
/// the time of writing).
|
||||
pub struct IpState([u16; 4]);
|
||||
|
||||
pub struct RateLimiter {
|
||||
states: HashMap<ReducedIpAddr, IpState>,
|
||||
last_shift: Instant,
|
||||
/// Maximum amount requests that can be done in `4 * SHIFT_EVERY`
|
||||
limit: u16,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
pub fn new(limit: u16) -> Self {
|
||||
Self {
|
||||
states: Default::default(),
|
||||
last_shift: Instant::now(),
|
||||
limit,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maintain(&mut self, now: Instant) {
|
||||
if now.duration_since(self.last_shift) > SHIFT_EVERY {
|
||||
// Remove empty states
|
||||
self.states.retain(|_, state| {
|
||||
state.shift();
|
||||
!state.is_empty()
|
||||
});
|
||||
self.last_shift = now;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn can_request(&mut self, ip: ReducedIpAddr) -> bool {
|
||||
if let Some(state) = self.states.get_mut(&ip) {
|
||||
state.0[0] = state.0[0].saturating_add(1);
|
||||
|
||||
state.total() < self.limit
|
||||
} else {
|
||||
self.states.insert(ip, IpState::default());
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IpState {
|
||||
fn shift(&mut self) {
|
||||
self.0.rotate_right(1);
|
||||
self.0[0] = 0;
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool { self.0.iter().all(|&freq| freq == 0) }
|
||||
|
||||
fn total(&self) -> u16 { self.0.iter().fold(0, |total, &v| total.saturating_add(v)) }
|
||||
}
|
||||
|
||||
impl Default for IpState {
|
||||
fn default() -> Self { Self([1, 0, 0, 0]) }
|
||||
}
|
||||
|
||||
impl From<IpAddr> for ReducedIpAddr {
|
||||
fn from(value: IpAddr) -> Self {
|
||||
match value {
|
||||
IpAddr::V4(v4) => Self::V4(u32::from_be_bytes(v4.octets())),
|
||||
IpAddr::V6(v6) => {
|
||||
let bytes = v6.octets();
|
||||
Self::V6(u64::from_be_bytes([
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
|
||||
]))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
314
common/query_server/src/server.rs
Normal file
314
common/query_server/src/server.rs
Normal file
@ -0,0 +1,314 @@
|
||||
#[allow(deprecated)] use std::hash::SipHasher;
|
||||
use std::{
|
||||
hash::{Hash, Hasher},
|
||||
io::{self, ErrorKind},
|
||||
net::SocketAddr,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use protocol::Parcel;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::{net::UdpSocket, sync::watch};
|
||||
use tracing::{debug, error, trace};
|
||||
|
||||
use crate::{
|
||||
proto::{
|
||||
Init, QueryServerRequest, QueryServerResponse, RawQueryServerRequest,
|
||||
RawQueryServerResponse, ServerInfo, MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE, VELOREN_HEADER,
|
||||
VERSION,
|
||||
},
|
||||
ratelimit::{RateLimiter, ReducedIpAddr},
|
||||
};
|
||||
|
||||
const SECRET_REGEN_INTERNVAL: Duration = Duration::from_secs(300);
|
||||
|
||||
pub struct QueryServer {
|
||||
addr: SocketAddr,
|
||||
server_info: watch::Receiver<ServerInfo>,
|
||||
settings: protocol::Settings,
|
||||
ratelimit: RateLimiter,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Copy, Debug)]
|
||||
pub struct Metrics {
|
||||
pub received_packets: u32,
|
||||
pub dropped_packets: u32,
|
||||
pub invalid_packets: u32,
|
||||
pub proccessing_errors: u32,
|
||||
pub info_requests: u32,
|
||||
pub init_requests: u32,
|
||||
pub sent_responses: u32,
|
||||
pub failed_responses: u32,
|
||||
pub timed_out_responses: u32,
|
||||
pub ratelimited: u32,
|
||||
}
|
||||
|
||||
impl QueryServer {
|
||||
pub fn new(addr: SocketAddr, server_info: watch::Receiver<ServerInfo>, ratelimit: u16) -> Self {
|
||||
Self {
|
||||
addr,
|
||||
server_info,
|
||||
ratelimit: RateLimiter::new(ratelimit),
|
||||
settings: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// This produces TRACE level logs for any packet received on the assigned
|
||||
/// port. To prevent potentially unfettered log spam, disable the TRACE
|
||||
/// level for this crate (when outside of debugging contexts).
|
||||
///
|
||||
/// NOTE: TRACE and DEBUG levels are disabled by default for this crate when
|
||||
/// using `veloren-common-frontend`.
|
||||
pub async fn run(&mut self, metrics: Arc<Mutex<Metrics>>) -> Result<(), tokio::io::Error> {
|
||||
let mut socket = UdpSocket::bind(self.addr).await?;
|
||||
|
||||
let gen_secret = || {
|
||||
let mut rng = thread_rng();
|
||||
(rng.gen::<u64>(), rng.gen::<u64>())
|
||||
};
|
||||
let mut secrets = gen_secret();
|
||||
let mut last_secret_refresh = Instant::now();
|
||||
|
||||
let mut buf = Box::new([0; MAX_REQUEST_SIZE]);
|
||||
loop {
|
||||
let (len, remote_addr) = match socket.recv_from(&mut *buf).await {
|
||||
Ok(v) => v,
|
||||
Err(e) if e.kind() == ErrorKind::NotConnected => {
|
||||
error!(
|
||||
?e,
|
||||
"Query server connection was closed, re-binding to socket..."
|
||||
);
|
||||
socket = UdpSocket::bind(self.addr).await?;
|
||||
continue;
|
||||
},
|
||||
err => {
|
||||
debug!(?err, "Error while receiving from query server socket");
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
let mut new_metrics = Metrics {
|
||||
received_packets: 1,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let raw_msg_buf = &buf[..len];
|
||||
let msg_buf = if Self::validate_datagram(raw_msg_buf) {
|
||||
// Require 2 extra bytes for version (currently unused)
|
||||
&raw_msg_buf[2..(raw_msg_buf.len() - VELOREN_HEADER.len())]
|
||||
} else {
|
||||
new_metrics.dropped_packets += 1;
|
||||
continue;
|
||||
};
|
||||
|
||||
self.process_datagram(msg_buf, remote_addr, secrets, &mut new_metrics, &socket)
|
||||
.await;
|
||||
|
||||
// Update metrics at the end of eath packet
|
||||
if let Ok(mut metrics) = metrics.lock() {
|
||||
*metrics += new_metrics;
|
||||
}
|
||||
|
||||
{
|
||||
let now = Instant::now();
|
||||
if now.duration_since(last_secret_refresh) > SECRET_REGEN_INTERNVAL {
|
||||
last_secret_refresh = now;
|
||||
secrets = gen_secret();
|
||||
}
|
||||
|
||||
self.ratelimit.maintain(now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Header must be discarded after this validation passes
|
||||
fn validate_datagram(data: &[u8]) -> bool {
|
||||
let len = data.len();
|
||||
// Require 2 extra bytes for version (currently unused)
|
||||
if len < MAX_RESPONSE_SIZE.max(VELOREN_HEADER.len() + 2) {
|
||||
trace!(?len, "Datagram too short");
|
||||
false
|
||||
} else if len > MAX_REQUEST_SIZE {
|
||||
trace!(?len, "Datagram too large");
|
||||
false
|
||||
} else if data[(len - VELOREN_HEADER.len())..] != VELOREN_HEADER {
|
||||
trace!(?len, "Datagram header invalid");
|
||||
false
|
||||
// TODO: Allow lower versions once proper versioning is added.
|
||||
} else if u16::from_ne_bytes(data[..2].try_into().unwrap()) != VERSION {
|
||||
trace!(
|
||||
"Datagram has invalid version {:?}, current {VERSION:?}",
|
||||
&data[..2]
|
||||
);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_datagram(
|
||||
&mut self,
|
||||
datagram: &[u8],
|
||||
remote: SocketAddr,
|
||||
secrets: (u64, u64),
|
||||
metrics: &mut Metrics,
|
||||
socket: &UdpSocket,
|
||||
) {
|
||||
let Ok(RawQueryServerRequest {
|
||||
p: client_p,
|
||||
request,
|
||||
}) =
|
||||
<RawQueryServerRequest as Parcel>::read(&mut io::Cursor::new(datagram), &self.settings)
|
||||
else {
|
||||
metrics.invalid_packets += 1;
|
||||
return;
|
||||
};
|
||||
|
||||
trace!(?request, "Received packet");
|
||||
|
||||
#[allow(deprecated)]
|
||||
let real_p = {
|
||||
// Use SipHash-2-4 to compute the `p` value from a server specific
|
||||
// secret and the client's address.
|
||||
//
|
||||
// This is used to verify that packets are from an entity that can
|
||||
// receive packets at the given address.
|
||||
//
|
||||
// Only use the first 64 bits from Ipv6 addresses since the latter
|
||||
// 64 bits can change very frequently (as much as for every
|
||||
// request).
|
||||
let mut hasher = SipHasher::new_with_keys(secrets.0, secrets.1);
|
||||
ReducedIpAddr::from(remote.ip()).hash(&mut hasher);
|
||||
hasher.finish()
|
||||
};
|
||||
|
||||
if real_p != client_p {
|
||||
Self::send_response(
|
||||
RawQueryServerResponse::Init(Init {
|
||||
p: real_p,
|
||||
max_supported_version: VERSION,
|
||||
}),
|
||||
remote,
|
||||
socket,
|
||||
metrics,
|
||||
)
|
||||
.await;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if !self.ratelimit.can_request(remote.ip().into()) {
|
||||
trace!("Ratelimited request");
|
||||
metrics.ratelimited += 1;
|
||||
return;
|
||||
}
|
||||
|
||||
match request {
|
||||
QueryServerRequest::Init => {
|
||||
metrics.init_requests += 1;
|
||||
Self::send_response(
|
||||
RawQueryServerResponse::Init(Init {
|
||||
p: real_p,
|
||||
max_supported_version: VERSION,
|
||||
}),
|
||||
remote,
|
||||
socket,
|
||||
metrics,
|
||||
)
|
||||
.await;
|
||||
},
|
||||
QueryServerRequest::ServerInfo => {
|
||||
metrics.info_requests += 1;
|
||||
let server_info = *self.server_info.borrow();
|
||||
Self::send_response(
|
||||
RawQueryServerResponse::Response(QueryServerResponse::ServerInfo(server_info)),
|
||||
remote,
|
||||
socket,
|
||||
metrics,
|
||||
)
|
||||
.await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_response(
|
||||
response: RawQueryServerResponse,
|
||||
addr: SocketAddr,
|
||||
socket: &UdpSocket,
|
||||
metrics: &mut Metrics,
|
||||
) {
|
||||
// TODO: Once more versions are added, send the packet in the same version as
|
||||
// the request here.
|
||||
match <RawQueryServerResponse as Parcel>::raw_bytes(&response, &Default::default()) {
|
||||
Ok(data) => {
|
||||
if data.len() > MAX_RESPONSE_SIZE {
|
||||
error!(
|
||||
?MAX_RESPONSE_SIZE,
|
||||
"Attempted to send a response larger than the maximum allowed size (size: \
|
||||
{}, response: {response:?})",
|
||||
data.len()
|
||||
);
|
||||
#[cfg(debug_assertions)]
|
||||
panic!(
|
||||
"Attempted to send a response larger than the maximum allowed size (size: \
|
||||
{}, max: {}, response: {response:?})",
|
||||
data.len(),
|
||||
MAX_RESPONSE_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
match socket.send_to(&data, addr).await {
|
||||
Ok(_) => {
|
||||
metrics.sent_responses += 1;
|
||||
},
|
||||
Err(err) => {
|
||||
metrics.failed_responses += 1;
|
||||
debug!(?err, "Failed to send query server response");
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
trace!(?error, "Failed to serialize response");
|
||||
#[cfg(debug_assertions)]
|
||||
panic!("Serializing response failed: {error:?} ({response:?})");
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::AddAssign for Metrics {
|
||||
fn add_assign(
|
||||
&mut self,
|
||||
Self {
|
||||
received_packets,
|
||||
dropped_packets,
|
||||
invalid_packets,
|
||||
proccessing_errors,
|
||||
info_requests,
|
||||
init_requests,
|
||||
sent_responses,
|
||||
failed_responses,
|
||||
timed_out_responses,
|
||||
ratelimited,
|
||||
}: Self,
|
||||
) {
|
||||
self.received_packets += received_packets;
|
||||
self.dropped_packets += dropped_packets;
|
||||
self.invalid_packets += invalid_packets;
|
||||
self.proccessing_errors += proccessing_errors;
|
||||
self.info_requests += info_requests;
|
||||
self.init_requests += init_requests;
|
||||
self.sent_responses += sent_responses;
|
||||
self.failed_responses += failed_responses;
|
||||
self.timed_out_responses += timed_out_responses;
|
||||
self.ratelimited += ratelimited;
|
||||
}
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
/// Resets all metrics to 0 and returns previous ones
|
||||
///
|
||||
/// Used by the consumer of the metrics.
|
||||
pub fn reset(&mut self) -> Self { std::mem::take(self) }
|
||||
}
|
@ -22,6 +22,10 @@ lazy_static::lazy_static! {
|
||||
static ref GIT_DATETIME: &'static str = GIT_VERSION.split('/').nth(1).expect("failed to retrieve git_datetime!");
|
||||
pub static ref GIT_DATE: String = GIT_DATETIME.split('-').take(3).collect::<Vec<&str>>().join("-");
|
||||
pub static ref GIT_TIME: &'static str = GIT_DATETIME.split('-').nth(3).expect("failed to retrieve git_time!");
|
||||
pub static ref GIT_DATE_TIMESTAMP: i64 =
|
||||
NaiveDateTime::parse_from_str(*GIT_DATETIME, "%Y-%m-%d-%H:%M")
|
||||
.expect("Invalid date")
|
||||
.and_utc().timestamp();
|
||||
pub static ref DISPLAY_VERSION: String = if GIT_TAG.is_empty() {
|
||||
format!("{}-{}", VELOREN_VERSION_STAGE, *GIT_DATE)
|
||||
} else {
|
||||
@ -34,6 +38,7 @@ lazy_static::lazy_static! {
|
||||
};
|
||||
}
|
||||
|
||||
use chrono::NaiveDateTime;
|
||||
pub use color::*;
|
||||
pub use dir::*;
|
||||
pub use grid_hasher::GridHasher;
|
||||
|
@ -9,6 +9,7 @@ services:
|
||||
ports:
|
||||
- "14004:14004"
|
||||
- "14005:14005"
|
||||
- "14006:14006/udp"
|
||||
restart: on-failure:0
|
||||
volumes:
|
||||
- "./userdata:/opt/userdata"
|
||||
|
@ -18,6 +18,7 @@ default = ["worldgen", "plugins", "persistent_world", "simd"]
|
||||
[dependencies]
|
||||
common = { package = "veloren-common", path = "../common" }
|
||||
common-base = { package = "veloren-common-base", path = "../common/base" }
|
||||
veloren-query-server = { package = "veloren-query-server", path = "../common/query_server", default-features = false, features = ["server"] }
|
||||
common-ecs = { package = "veloren-common-ecs", path = "../common/ecs" }
|
||||
common-state = { package = "veloren-common-state", path = "../common/state" }
|
||||
common-systems = { package = "veloren-common-systems", path = "../common/systems" }
|
||||
|
@ -93,6 +93,7 @@ use common::{
|
||||
shared_server_config::ServerConstants,
|
||||
slowjob::SlowJobPool,
|
||||
terrain::TerrainChunk,
|
||||
util::GIT_DATE_TIMESTAMP,
|
||||
vol::RectRasterableVol,
|
||||
};
|
||||
use common_base::prof_span;
|
||||
@ -116,7 +117,7 @@ use specs::{
|
||||
use std::{
|
||||
i32,
|
||||
ops::{Deref, DerefMut},
|
||||
sync::Arc,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
#[cfg(not(feature = "worldgen"))]
|
||||
@ -124,6 +125,7 @@ use test_world::{IndexOwned, World};
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use vek::*;
|
||||
use veloren_query_server::server::QueryServer;
|
||||
pub use world::{civ::WorldCivStage, sim::WorldSimStage, WorldGenerateStage};
|
||||
|
||||
use crate::{
|
||||
@ -274,6 +276,7 @@ impl Server {
|
||||
let tick_metrics = TickMetrics::new(®istry).unwrap();
|
||||
let physics_metrics = PhysicsMetrics::new(®istry).unwrap();
|
||||
let server_event_metrics = metrics::ServerEventMetrics::new(®istry).unwrap();
|
||||
let query_server_metrics = metrics::QueryServerMetrics::new(®istry).unwrap();
|
||||
|
||||
let battlemode_buffer = BattleModeBuffer::default();
|
||||
|
||||
@ -376,6 +379,7 @@ impl Server {
|
||||
state.ecs_mut().insert(tick_metrics);
|
||||
state.ecs_mut().insert(physics_metrics);
|
||||
state.ecs_mut().insert(server_event_metrics);
|
||||
state.ecs_mut().insert(query_server_metrics);
|
||||
if settings.experimental_terrain_persistence {
|
||||
#[cfg(feature = "persistent_world")]
|
||||
{
|
||||
@ -597,6 +601,32 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(addr) = settings.query_address {
|
||||
use veloren_query_server::proto::ServerInfo;
|
||||
|
||||
const QUERY_SERVER_RATELIMIT: u16 = 120;
|
||||
|
||||
let (query_server_info_tx, query_server_info_rx) =
|
||||
tokio::sync::watch::channel(ServerInfo {
|
||||
git_hash: *sys::server_info::GIT_HASH,
|
||||
git_timestamp: *GIT_DATE_TIMESTAMP,
|
||||
players_count: 0,
|
||||
player_cap: settings.max_players,
|
||||
battlemode: settings.gameplay.battle_mode.into(),
|
||||
});
|
||||
let mut query_server =
|
||||
QueryServer::new(addr, query_server_info_rx, QUERY_SERVER_RATELIMIT);
|
||||
let query_server_metrics =
|
||||
Arc::new(Mutex::new(veloren_query_server::server::Metrics::default()));
|
||||
let query_server_metrics2 = Arc::clone(&query_server_metrics);
|
||||
runtime.spawn(async move {
|
||||
let err = query_server.run(query_server_metrics2).await.err();
|
||||
error!(?err, "Query server stopped unexpectedly");
|
||||
});
|
||||
state.ecs_mut().insert(query_server_info_tx);
|
||||
state.ecs_mut().insert(query_server_metrics);
|
||||
}
|
||||
|
||||
runtime.block_on(network.listen(ListenAddr::Mpsc(14004)))?;
|
||||
|
||||
let connection_handler = ConnectionHandler::new(network, &runtime);
|
||||
|
@ -70,6 +70,19 @@ pub struct ServerEventMetrics {
|
||||
pub event_count: IntCounterVec,
|
||||
}
|
||||
|
||||
pub struct QueryServerMetrics {
|
||||
pub received_packets: IntCounter,
|
||||
pub dropped_packets: IntCounter,
|
||||
pub invalid_packets: IntCounter,
|
||||
pub proccessing_errors: IntCounter,
|
||||
pub info_requests: IntCounter,
|
||||
pub init_requests: IntCounter,
|
||||
pub sent_responses: IntCounter,
|
||||
pub failed_responses: IntCounter,
|
||||
pub timed_out_responses: IntCounter,
|
||||
pub ratelimited: IntCounter,
|
||||
}
|
||||
|
||||
impl PhysicsMetrics {
|
||||
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
|
||||
let entity_entity_collision_checks_count = IntCounter::with_opts(Opts::new(
|
||||
@ -415,7 +428,7 @@ impl TickMetrics {
|
||||
}
|
||||
|
||||
impl ServerEventMetrics {
|
||||
pub fn new(registry: &Registry) -> Result<Self, Box<dyn Error>> {
|
||||
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
|
||||
let event_count = IntCounterVec::new(
|
||||
Opts::new("event_count", "number of ServerEvents handled"),
|
||||
&["event"],
|
||||
@ -425,3 +438,99 @@ impl ServerEventMetrics {
|
||||
Ok(Self { event_count })
|
||||
}
|
||||
}
|
||||
|
||||
impl QueryServerMetrics {
|
||||
pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
|
||||
let received_packets = IntCounter::with_opts(Opts::new(
|
||||
"query_server::received_packets",
|
||||
"Total amount of received packets by the query server",
|
||||
))?;
|
||||
let dropped_packets = IntCounter::with_opts(Opts::new(
|
||||
"query_server::dropped_packets",
|
||||
"Amount of dropped packets received by the query server (too short or invalid header)",
|
||||
))?;
|
||||
let invalid_packets = IntCounter::with_opts(Opts::new(
|
||||
"query_server::invalid_packets",
|
||||
"Amount of unparseable packets received by the query server",
|
||||
))?;
|
||||
let proccessing_errors = IntCounter::with_opts(Opts::new(
|
||||
"query_server::proccessing_errors",
|
||||
"Amount of errors that occured while processing a query server request",
|
||||
))?;
|
||||
let info_requests = IntCounter::with_opts(Opts::new(
|
||||
"query_server::info_requests",
|
||||
"Amount of server info requests received by the query server",
|
||||
))?;
|
||||
let init_requests = IntCounter::with_opts(Opts::new(
|
||||
"query_server::ping_requests",
|
||||
"Amount of init requests received by the query server",
|
||||
))?;
|
||||
let sent_responses = IntCounter::with_opts(Opts::new(
|
||||
"query_server::sent_responses",
|
||||
"Amount of responses sent by the query server",
|
||||
))?;
|
||||
let failed_responses = IntCounter::with_opts(Opts::new(
|
||||
"query_server::failed_responses",
|
||||
"Amount of responses which failed to be sent by the query server",
|
||||
))?;
|
||||
let timed_out_responses = IntCounter::with_opts(Opts::new(
|
||||
"query_server::timed_out_responses",
|
||||
"Amount of responses which timed out",
|
||||
))?;
|
||||
let ratelimited = IntCounter::with_opts(Opts::new(
|
||||
"query_server::ratelimited",
|
||||
"Ratelimited requests to the query server",
|
||||
))?;
|
||||
|
||||
registry.register(Box::new(received_packets.clone()))?;
|
||||
registry.register(Box::new(dropped_packets.clone()))?;
|
||||
registry.register(Box::new(invalid_packets.clone()))?;
|
||||
registry.register(Box::new(proccessing_errors.clone()))?;
|
||||
registry.register(Box::new(info_requests.clone()))?;
|
||||
registry.register(Box::new(init_requests.clone()))?;
|
||||
registry.register(Box::new(sent_responses.clone()))?;
|
||||
registry.register(Box::new(failed_responses.clone()))?;
|
||||
registry.register(Box::new(timed_out_responses.clone()))?;
|
||||
registry.register(Box::new(ratelimited.clone()))?;
|
||||
|
||||
Ok(Self {
|
||||
received_packets,
|
||||
dropped_packets,
|
||||
invalid_packets,
|
||||
proccessing_errors,
|
||||
info_requests,
|
||||
init_requests,
|
||||
sent_responses,
|
||||
failed_responses,
|
||||
timed_out_responses,
|
||||
ratelimited,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn apply(
|
||||
&self,
|
||||
veloren_query_server::server::Metrics {
|
||||
received_packets,
|
||||
dropped_packets,
|
||||
invalid_packets,
|
||||
proccessing_errors,
|
||||
info_requests,
|
||||
init_requests,
|
||||
sent_responses,
|
||||
failed_responses,
|
||||
timed_out_responses,
|
||||
ratelimited,
|
||||
}: veloren_query_server::server::Metrics,
|
||||
) {
|
||||
self.received_packets.inc_by(received_packets as u64);
|
||||
self.dropped_packets.inc_by(dropped_packets as u64);
|
||||
self.invalid_packets.inc_by(invalid_packets as u64);
|
||||
self.proccessing_errors.inc_by(proccessing_errors as u64);
|
||||
self.info_requests.inc_by(info_requests as u64);
|
||||
self.init_requests.inc_by(init_requests as u64);
|
||||
self.sent_responses.inc_by(sent_responses as u64);
|
||||
self.failed_responses.inc_by(failed_responses as u64);
|
||||
self.timed_out_responses.inc_by(timed_out_responses as u64);
|
||||
self.ratelimited.inc_by(ratelimited as u64);
|
||||
}
|
||||
}
|
||||
|
@ -67,6 +67,20 @@ impl ServerBattleMode {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ServerBattleMode> for veloren_query_server::proto::ServerBattleMode {
|
||||
fn from(value: ServerBattleMode) -> Self {
|
||||
use veloren_query_server::proto::ServerBattleMode as QueryBattleMode;
|
||||
|
||||
match value {
|
||||
ServerBattleMode::Global(mode) => match mode {
|
||||
BattleMode::PvP => QueryBattleMode::GlobalPvP,
|
||||
BattleMode::PvE => QueryBattleMode::GlobalPvE,
|
||||
},
|
||||
ServerBattleMode::PerPlayer { .. } => QueryBattleMode::PerPlayer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum Protocol {
|
||||
Quic {
|
||||
@ -163,6 +177,7 @@ impl CalendarMode {
|
||||
pub struct Settings {
|
||||
pub gameserver_protocols: Vec<Protocol>,
|
||||
pub auth_server_address: Option<String>,
|
||||
pub query_address: Option<SocketAddr>,
|
||||
pub max_players: u16,
|
||||
pub world_seed: u32,
|
||||
pub server_name: String,
|
||||
@ -204,6 +219,7 @@ impl Default for Settings {
|
||||
},
|
||||
],
|
||||
auth_server_address: Some("https://auth.veloren.net".into()),
|
||||
query_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 14006))),
|
||||
world_seed: DEFAULT_WORLD_SEED,
|
||||
server_name: "Veloren Server".into(),
|
||||
max_players: 100,
|
||||
|
@ -1,12 +1,16 @@
|
||||
use crate::{
|
||||
chunk_generator::ChunkGenerator,
|
||||
metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics},
|
||||
metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, QueryServerMetrics, TickMetrics},
|
||||
HwStats, Tick, TickStart,
|
||||
};
|
||||
use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid};
|
||||
use common_ecs::{Job, Origin, Phase, SysMetrics, System};
|
||||
use specs::{Entities, Join, Read, ReadExpect};
|
||||
use std::time::Instant;
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Instant,
|
||||
};
|
||||
use veloren_query_server::server::Metrics as RawQueryServerMetrics;
|
||||
|
||||
/// This system exports metrics
|
||||
#[derive(Default)]
|
||||
@ -27,6 +31,8 @@ impl<'a> System<'a> for Sys {
|
||||
ReadExpect<'a, TickMetrics>,
|
||||
ReadExpect<'a, PhysicsMetrics>,
|
||||
ReadExpect<'a, JobMetrics>,
|
||||
Option<Read<'a, Arc<Mutex<RawQueryServerMetrics>>>>,
|
||||
ReadExpect<'a, QueryServerMetrics>,
|
||||
);
|
||||
|
||||
const NAME: &'static str = "metrics";
|
||||
@ -50,6 +56,8 @@ impl<'a> System<'a> for Sys {
|
||||
export_tick,
|
||||
export_physics,
|
||||
export_jobs,
|
||||
raw_query_server,
|
||||
export_query_server,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64;
|
||||
@ -165,5 +173,13 @@ impl<'a> System<'a> for Sys {
|
||||
.system_length_hist
|
||||
.with_label_values(&["metrics"])
|
||||
.observe(len as f64 / NANOSEC_PER_SEC);
|
||||
|
||||
if let Some(Ok(metrics)) = raw_query_server
|
||||
.as_ref()
|
||||
// Hold the lock for the shortest time possible
|
||||
.map(|m| m.lock().map(|mut metrics| metrics.reset()))
|
||||
{
|
||||
export_query_server.apply(metrics);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ pub mod object;
|
||||
pub mod persistence;
|
||||
pub mod pets;
|
||||
pub mod sentinel;
|
||||
pub mod server_info;
|
||||
pub mod subscription;
|
||||
pub mod teleporter;
|
||||
pub mod terrain;
|
||||
|
49
server/src/sys/server_info.rs
Normal file
49
server/src/sys/server_info.rs
Normal file
@ -0,0 +1,49 @@
|
||||
use common::{comp::Player, util::GIT_DATE_TIMESTAMP};
|
||||
use common_ecs::{Origin, Phase, System};
|
||||
use lazy_static::lazy_static;
|
||||
use specs::{Read, ReadStorage};
|
||||
use tracing::error;
|
||||
use veloren_query_server::proto::ServerInfo;
|
||||
|
||||
use crate::{Settings, Tick};
|
||||
|
||||
// Update the server stats every 60 ticks
|
||||
const INFO_SEND_INTERVAL: u64 = 60;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref GIT_HASH: u32 =
|
||||
u32::from_str_radix(&common::util::GIT_HASH[..8], 16).expect("Invalid git hash");
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Sys;
|
||||
|
||||
impl<'a> System<'a> for Sys {
|
||||
type SystemData = (
|
||||
Read<'a, Tick>,
|
||||
Read<'a, Settings>,
|
||||
Option<Read<'a, tokio::sync::watch::Sender<ServerInfo>>>,
|
||||
ReadStorage<'a, Player>,
|
||||
);
|
||||
|
||||
const NAME: &'static str = "server_info";
|
||||
const ORIGIN: Origin = Origin::Server;
|
||||
const PHASE: Phase = Phase::Create;
|
||||
|
||||
fn run(_job: &mut common_ecs::Job<Self>, (tick, settings, sender, players): Self::SystemData) {
|
||||
if let Some(sender) = sender.as_ref()
|
||||
&& tick.0 % INFO_SEND_INTERVAL == 0
|
||||
{
|
||||
let count = players.count().try_into().unwrap_or(u16::MAX);
|
||||
if let Err(error) = sender.send(ServerInfo {
|
||||
git_hash: *GIT_HASH,
|
||||
git_timestamp: *GIT_DATE_TIMESTAMP,
|
||||
players_count: count,
|
||||
player_cap: settings.max_players,
|
||||
battlemode: settings.gameplay.battle_mode.into(),
|
||||
}) {
|
||||
error!(?error, "Failed to send server info to the query server");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user