Added UDP Query Server to Veloren Server

This commit is contained in:
Ben Wallis 2022-10-05 21:57:59 +01:00
parent e5dfc6261c
commit e9bba9999b
8 changed files with 320 additions and 3 deletions

40
Cargo.lock generated
View File

@ -1907,6 +1907,16 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"backtrace",
"version_check 0.9.4",
]
[[package]] [[package]]
name = "error-code" name = "error-code"
version = "2.3.1" version = "2.3.1"
@ -4662,6 +4672,31 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "protocol"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13cfa9ba37e0183f87fb14b82f23fc76494c458c72469d95b8a8eec75ad5f191"
dependencies = [
"byteorder",
"error-chain",
"flate2",
"num-traits",
"protocol-derive",
"uuid 0.8.2",
]
[[package]]
name = "protocol-derive"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28647f30298898ead966b51e9aee5c74e4ac709ce5ca554378fde187fd3f7e47"
dependencies = [
"proc-macro2 1.0.43",
"quote 1.0.21",
"syn 1.0.100",
]
[[package]] [[package]]
name = "ptr_meta" name = "ptr_meta"
version = "0.1.4" version = "0.1.4"
@ -6468,7 +6503,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [ dependencies = [
"cfg-if 0.1.10", "cfg-if 1.0.0",
"rand 0.8.5", "rand 0.8.5",
"static_assertions", "static_assertions",
] ]
@ -6881,7 +6916,7 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"futures-core", "futures-core",
"futures-util", "futures-util",
"hashbrown 0.9.1", "hashbrown 0.12.3",
"lazy_static", "lazy_static",
"lz-fear", "lz-fear",
"prometheus", "prometheus",
@ -6968,6 +7003,7 @@ dependencies = [
"portpicker", "portpicker",
"prometheus", "prometheus",
"prometheus-hyper", "prometheus-hyper",
"protocol",
"quinn", "quinn",
"rand 0.8.5", "rand 0.8.5",
"rand_distr", "rand_distr",

View File

@ -74,6 +74,7 @@ where
"quinn_proto::connection=info", "quinn_proto::connection=info",
"veloren_server::persistence::character=info", "veloren_server::persistence::character=info",
"veloren_server::settings=info", "veloren_server::settings=info",
"veloren_server::query_server=info",
]; ];
for s in default_directives { for s in default_directives {

View File

@ -64,6 +64,7 @@ rand_distr = "0.4.0"
enumset = "1.0.8" enumset = "1.0.8"
noise = { version = "0.7", default-features = false } noise = { version = "0.7", default-features = false }
censor = "0.2" censor = "0.2"
protocol = { version = "3.4", features = ["derive"] }
rusqlite = { version = "0.24.2", features = ["array", "vtab", "bundled", "trace"] } rusqlite = { version = "0.24.2", features = ["array", "vtab", "bundled", "trace"] }
refinery = { git = "https://gitlab.com/veloren/refinery.git", rev = "8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e", features = ["rusqlite"] } refinery = { git = "https://gitlab.com/veloren/refinery.git", rev = "8ecf4b4772d791e6c8c0a3f9b66a7530fad1af3e", features = ["rusqlite"] }

View File

@ -38,6 +38,7 @@ pub mod terrain_persistence;
mod weather; mod weather;
mod query_server;
pub mod wiring; pub mod wiring;
// Reexports // Reexports
@ -125,6 +126,7 @@ use {
common_state::plugin::{memory_manager::EcsWorld, PluginMgr}, common_state::plugin::{memory_manager::EcsWorld, PluginMgr},
}; };
use crate::{query_server::QueryServer, sys::server_info::ServerInfoRequest};
use common::comp::Anchor; use common::comp::Anchor;
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
use world::{ use world::{
@ -309,6 +311,7 @@ impl Server {
state.ecs_mut().insert(tick_metrics); state.ecs_mut().insert(tick_metrics);
state.ecs_mut().insert(physics_metrics); state.ecs_mut().insert(physics_metrics);
state.ecs_mut().insert(server_event_metrics); state.ecs_mut().insert(server_event_metrics);
if settings.experimental_terrain_persistence { if settings.experimental_terrain_persistence {
#[cfg(feature = "persistent_world")] #[cfg(feature = "persistent_world")]
{ {
@ -478,6 +481,12 @@ impl Server {
.await .await
}); });
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
state.ecs_mut().insert(receiver);
let mut query_server = QueryServer::new(settings.query_address, sender);
// Run the query server in its own long-running future
runtime.spawn(async move { query_server.run().await });
let mut printed_quic_warning = false; let mut printed_quic_warning = false;
for protocol in &settings.gameserver_protocols { for protocol in &settings.gameserver_protocols {
match protocol { match protocol {

226
server/src/query_server.rs Normal file
View File

@ -0,0 +1,226 @@
use crate::{settings::ServerBattleMode, ServerInfoRequest};
use common::resources::BattleMode;
use protocol::{wire::dgram, Protocol};
use std::{
io,
io::Cursor,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
use tokio::{net::UdpSocket, sync::mpsc::UnboundedSender, time::timeout};
use tracing::{debug, info, trace};
// NOTE: Debug logging is disabled by default for this module - to enable it add
// veloren_server::query_server=trace to RUST_LOG
pub struct QueryServer {
pub bind_addr: SocketAddr,
pipeline: dgram::Pipeline<QueryServerPacketKind, protocol::wire::middleware::pipeline::Default>,
server_info_request_sender: UnboundedSender<ServerInfoRequest>,
}
impl QueryServer {
pub fn new(
bind_addr: SocketAddr,
server_info_request_sender: UnboundedSender<ServerInfoRequest>,
) -> Self {
Self {
bind_addr,
pipeline: protocol::wire::dgram::Pipeline::new(
protocol::wire::middleware::pipeline::default(),
protocol::Settings::default(),
),
server_info_request_sender,
}
}
pub async fn run(&mut self) -> Result<(), io::Error> {
let socket = UdpSocket::bind(self.bind_addr).await?;
info!("Query Server running at {}", self.bind_addr);
loop {
let mut buf = vec![0; 1500];
let (len, remote_addr) = socket.recv_from(&mut buf).await?;
if !QueryServer::validate_datagram(len, &mut buf) {
continue;
}
if let Err(e) = self.process_datagram(buf, remote_addr).await {
debug!(?e, "Failed to process incoming datagram")
}
}
}
fn validate_datagram(len: usize, data: &mut Vec<u8>) -> bool {
const VELOREN_HEADER: [u8; 7] = [0x56, 0x45, 0x4C, 0x4F, 0x52, 0x45, 0x4E];
if len < 8 {
trace!("Ignoring packet - too short");
false
} else if data[0..7] != VELOREN_HEADER {
trace!("Ignoring packet - missing header");
false
} else {
trace!("Validated packet, data: {:?}", data);
// Discard the header after successful validation
*data = data.split_off(7);
true
}
}
async fn process_datagram(
&mut self,
datagram: Vec<u8>,
remote_addr: SocketAddr,
) -> Result<(), QueryError> {
let packet: QueryServerPacketKind =
self.pipeline.receive_from(&mut Cursor::new(datagram))?;
debug!(?packet, ?remote_addr, "Query Server received packet");
match packet {
QueryServerPacketKind::Ping(_) => {
QueryServer::send_response(remote_addr, &QueryServerPacketKind::Pong(Pong {}))
.await?
},
QueryServerPacketKind::ServerInfoQuery(ref _query) => {
let (sender, receiver) = tokio::sync::oneshot::channel::<ServerInfoResponse>();
let req = ServerInfoRequest {
response_sender: sender,
};
self.server_info_request_sender
.send(req)
.map_err(|e| QueryError::ChannelError(format!("{}", e)))?;
tokio::spawn(async move {
match timeout(Duration::from_secs(2), async move {
match receiver.await {
Ok(response) => {
trace!(?response, "Sending ServerInfoResponse");
QueryServer::send_response(
remote_addr,
&QueryServerPacketKind::ServerInfoResponse(response),
)
.await
.expect("Failed to send response"); // TODO remove expect
},
Err(_) => {
// Oneshot receive error
},
}
})
.await
{
Ok(_) => {},
Err(elapsed) => {
debug!(
?elapsed,
"Timeout expired while waiting for ServerInfoResponse"
);
},
}
});
},
QueryServerPacketKind::Pong(_) | QueryServerPacketKind::ServerInfoResponse(_) => {
// Ignore any incoming packets
debug!(?packet, "Dropping received response packet");
},
}
Ok(())
}
async fn send_response(
dest: SocketAddr,
packet: &QueryServerPacketKind,
) -> Result<(), QueryError> {
let socket =
UdpSocket::bind(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).await?;
let mut buf = Vec::<u8>::new();
let mut pipeline = protocol::wire::dgram::Pipeline::new(
protocol::wire::middleware::pipeline::default(),
protocol::Settings::default(),
);
pipeline.send_to(&mut Cursor::new(&mut buf), packet)?;
socket.send_to(buf.as_slice(), dest).await?;
Ok(())
}
}
#[derive(Debug)]
enum QueryError {
NetworkError(io::Error),
ProtocolError(protocol::Error),
ChannelError(String),
}
impl From<protocol::Error> for QueryError {
fn from(e: protocol::Error) -> Self { QueryError::ProtocolError(e) }
}
impl From<io::Error> for QueryError {
fn from(e: io::Error) -> Self { QueryError::NetworkError(e) }
}
#[derive(protocol::Protocol, Clone, Debug, PartialEq)]
pub struct Ping;
#[derive(protocol::Protocol, Clone, Debug, PartialEq)]
pub struct Pong;
#[derive(protocol::Protocol, Clone, Debug, PartialEq)]
pub struct ServerInfoQuery;
#[derive(protocol::Protocol, Clone, Debug, PartialEq)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
pub enum QueryServerPacketKind {
#[protocol(discriminator(0x00))]
Ping(Ping),
#[protocol(discriminator(0x01))]
Pong(Pong),
#[protocol(discriminator(0xA0))]
ServerInfoQuery(ServerInfoQuery),
#[protocol(discriminator(0xA1))]
ServerInfoResponse(ServerInfoResponse),
}
#[derive(Protocol, Debug, Clone, PartialEq)]
pub struct ServerInfoResponse {
pub git_hash: String, /* TODO: use u8 array instead? String includes 8 bytes for capacity
* and length that we don't need */
pub players_current: u16,
pub players_max: u16,
pub battle_mode: QueryBattleMode, // TODO: use a custom enum to avoid accidental breakage
}
#[derive(Protocol, Debug, Clone, PartialEq)]
#[protocol(discriminant = "integer")]
#[protocol(discriminator(u8))]
pub enum QueryBattleMode {
#[protocol(discriminator(0x00))]
GlobalPvP,
#[protocol(discriminator(0x01))]
GlobalPvE,
#[protocol(discriminator(0x02))]
PerPlayer,
}
impl From<ServerBattleMode> for QueryBattleMode {
fn from(battle_mode: ServerBattleMode) -> Self {
match battle_mode {
ServerBattleMode::Global(x) => match x {
BattleMode::PvP => QueryBattleMode::GlobalPvP,
BattleMode::PvE => QueryBattleMode::GlobalPvE,
},
ServerBattleMode::PerPlayer { .. } => QueryBattleMode::PerPlayer,
}
}
}

View File

@ -37,7 +37,7 @@ const BANLIST_FILENAME: &str = "banlist.ron";
const SERVER_DESCRIPTION_FILENAME: &str = "description.ron"; const SERVER_DESCRIPTION_FILENAME: &str = "description.ron";
const ADMINS_FILENAME: &str = "admins.ron"; const ADMINS_FILENAME: &str = "admins.ron";
#[derive(Copy, Clone, Debug, Deserialize, Serialize)] #[derive(Copy, Clone, Debug, PartialEq, Deserialize, Serialize)]
pub enum ServerBattleMode { pub enum ServerBattleMode {
Global(BattleMode), Global(BattleMode),
PerPlayer { default: BattleMode }, PerPlayer { default: BattleMode },
@ -161,6 +161,7 @@ impl CalendarMode {
pub struct Settings { pub struct Settings {
pub gameserver_protocols: Vec<Protocol>, pub gameserver_protocols: Vec<Protocol>,
pub metrics_address: SocketAddr, pub metrics_address: SocketAddr,
pub query_address: SocketAddr,
pub auth_server_address: Option<String>, pub auth_server_address: Option<String>,
pub max_players: u16, pub max_players: u16,
pub world_seed: u32, pub world_seed: u32,
@ -199,6 +200,7 @@ impl Default for Settings {
}, },
], ],
metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)), metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)),
query_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14006)),
auth_server_address: Some("https://auth.veloren.net".into()), auth_server_address: Some("https://auth.veloren.net".into()),
world_seed: DEFAULT_WORLD_SEED, world_seed: DEFAULT_WORLD_SEED,
server_name: "Veloren Server".into(), server_name: "Veloren Server".into(),

View File

@ -10,6 +10,7 @@ pub mod object;
pub mod persistence; pub mod persistence;
pub mod pets; pub mod pets;
pub mod sentinel; pub mod sentinel;
pub mod server_info;
pub mod subscription; pub mod subscription;
pub mod terrain; pub mod terrain;
pub mod terrain_sync; pub mod terrain_sync;
@ -40,6 +41,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
dispatch::<chunk_serialize::Sys>(dispatch_builder, &[]); dispatch::<chunk_serialize::Sys>(dispatch_builder, &[]);
// don't depend on chunk_serialize, as we assume everything is done in a SlowJow // don't depend on chunk_serialize, as we assume everything is done in a SlowJow
dispatch::<chunk_send::Sys>(dispatch_builder, &[]); dispatch::<chunk_send::Sys>(dispatch_builder, &[]);
dispatch::<server_info::Sys>(dispatch_builder, &[]);
} }
pub fn run_sync_systems(ecs: &mut specs::World) { pub fn run_sync_systems(ecs: &mut specs::World) {

View File

@ -0,0 +1,40 @@
use crate::{client::Client, query_server::ServerInfoResponse, Settings};
use common_ecs::{Job, Origin, Phase, System};
use specs::{Read, ReadStorage, WriteExpect};
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::error;
/// TODO: description
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
ReadStorage<'a, Client>,
Read<'a, Settings>,
WriteExpect<'a, UnboundedReceiver<ServerInfoRequest>>,
);
const NAME: &'static str = "server_info";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(_job: &mut Job<Self>, (clients, settings, mut receiver): Self::SystemData) {
let players_current = (&clients).count() as u16;
let server_info = ServerInfoResponse {
players_current,
players_max: settings.max_players,
git_hash: common::util::GIT_HASH.to_owned(),
battle_mode: settings.gameplay.battle_mode.into(),
};
while let Ok(request) = receiver.try_recv() {
if let Err(e) = request.response_sender.send(server_info.clone()) {
error!(?e, "Failed to process System Info request!");
}
}
}
}
pub struct ServerInfoRequest {
pub response_sender: tokio::sync::oneshot::Sender<ServerInfoResponse>,
}