From 38f4b8b64476039429839b628396ca10304178a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sun, 15 Oct 2023 16:57:07 +0200 Subject: [PATCH 1/5] There is no way to moderate ChatMessages not send in global chat. For this way we might investigate in a different approach. The Gameserver returning the latest chat messages via a REST api (protected by a password in settings). A central service can then scrape this endpoint and make it accessable to moderators. We need to make sure to log which moderator sees which messages, especially when whispered. to be sure we might also limit the holding period to a week --- Cargo.lock | 134 ++++++++++++++++++++++++++++++++++++- server/Cargo.toml | 5 +- server/src/lib.rs | 10 ++- server/src/state_ext.rs | 10 +++ server/src/web/chat.rs | 144 ++++++++++++++++++++++++++++++++++++++++ server/src/web/mod.rs | 60 +++++++++++++++++ 6 files changed, 358 insertions(+), 5 deletions(-) create mode 100644 server/src/web/chat.rs create mode 100644 server/src/web/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 15686fcdd5..597643f79e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,6 +412,55 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -3408,6 +3457,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -3497,6 +3552,12 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minifb" version = "0.24.0" @@ -4433,6 +4494,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2 1.0.66", + "quote 1.0.33", + "syn 2.0.29", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -5566,6 +5647,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.16" @@ -5586,6 +5677,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.6.1" @@ -6106,6 +6209,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tap" version = "1.0.1" @@ -6428,6 +6537,28 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -7086,6 +7217,7 @@ version = "0.15.0" dependencies = [ "atomicwrites", "authc", + "axum", "bincode", "censor", "chrono", @@ -7096,6 +7228,7 @@ dependencies = [ "futures-util", "hashbrown 0.13.2", "humantime", + "hyper", "itertools", "lazy_static", "noise", @@ -7103,7 +7236,6 @@ dependencies = [ "parking_lot 0.12.1", "portpicker", "prometheus", - "prometheus-hyper", "quinn", "rand 0.8.5", "rayon", diff --git a/server/Cargo.toml b/server/Cargo.toml index 0d673d61c7..079faeb39a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -39,7 +39,6 @@ tracing = { workspace = true } vek = { workspace = true } futures-util = { workspace = true } tokio = { workspace = true } -prometheus-hyper = { workspace = true } quinn = "0.10" rustls = { version = "0.21", default-features = false } rustls-pemfile = { version = "1", default-features = false } @@ -65,6 +64,10 @@ enum-map = { workspace = true } noise = { version = "0.7", default-features = false } censor = "0.3" +#HTTP +axum = { version = "0.6.20" } +hyper = "0.14.26" + rusqlite = { version = "0.28.0", features = ["array", "vtab", "bundled", "trace"] } refinery = { version = "0.8.8", features = ["rusqlite"] } diff --git a/server/src/lib.rs b/server/src/lib.rs index 985959cacc..961e869514 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -31,6 +31,7 @@ pub mod sys; #[cfg(feature = "persistent_world")] pub mod terrain_persistence; #[cfg(not(feature = "worldgen"))] mod test_world; +mod web; mod weather; @@ -94,7 +95,6 @@ use persistence::{ character_updater::CharacterUpdater, }; use prometheus::Registry; -use prometheus_hyper::Server as PrometheusServer; use specs::{Builder, Entity as EcsEntity, Entity, Join, LendJoin, WorldExt}; use std::{ i32, @@ -124,7 +124,7 @@ use { common_state::plugin::{memory_manager::EcsWorld, PluginMgr}, }; -use crate::persistence::character_loader::CharacterScreenResponseKind; +use crate::{persistence::character_loader::CharacterScreenResponseKind, web::ChatCache}; use common::comp::Anchor; #[cfg(feature = "worldgen")] pub use world::{ @@ -486,9 +486,13 @@ impl Server { let metrics_shutdown = Arc::new(Notify::new()); let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); let addr = settings.metrics_address; + let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60)); + state.ecs_mut().insert(chat_tracker); runtime.spawn(async move { - PrometheusServer::run( + web::run( Arc::clone(®istry), + chat_cache, + "secretpassword".to_string(), addr, metrics_shutdown_clone.notified(), ) diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 5b146beff4..1330744a2a 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -8,6 +8,7 @@ use crate::{ rtsim::RtSim, settings::Settings, sys::sentinel::DeletedEntities, + web::ChatExporter, wiring, BattleModeBuffer, SpawnPoint, }; use common::{ @@ -900,6 +901,7 @@ impl StateExt for State { |target, a: &comp::Pos, b: &comp::Pos| a.0.distance_squared(b.0) < target * target; let group_manager = ecs.read_resource::(); + let chat_exporter = ecs.read_resource::(); let group_info = msg.get_group().and_then(|g| group_manager.group_info(*g)); @@ -925,6 +927,7 @@ impl StateExt for State { | comp::ChatType::CommandError | comp::ChatType::Meta | comp::ChatType::World(_) => { + chat_exporter.send(resolved_msg.clone()); self.notify_players(ServerGeneral::ChatMsg(resolved_msg)) }, comp::ChatType::Online(u) => { @@ -935,6 +938,7 @@ impl StateExt for State { client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } + chat_exporter.send(resolved_msg); }, comp::ChatType::Tell(from, to) => { for (client, uid) in @@ -944,10 +948,12 @@ impl StateExt for State { client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } + chat_exporter.send(resolved_msg); }, comp::ChatType::Kill(kill_source, uid) => { let clients = ecs.read_storage::(); let clients_count = clients.count(); + chat_exporter.send(resolved_msg.clone()); // Avoid chat spam, send kill message only to group or nearby players if a // certain amount of clients are online if clients_count @@ -1002,6 +1008,7 @@ impl StateExt for State { } } } + chat_exporter.send(resolved_msg); }, comp::ChatType::Region(uid) => { let entity_opt = entity_from_uid(*uid); @@ -1014,6 +1021,7 @@ impl StateExt for State { } } } + chat_exporter.send(resolved_msg); }, comp::ChatType::Npc(uid) => { let entity_opt = entity_from_uid(*uid); @@ -1059,6 +1067,7 @@ impl StateExt for State { client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } + chat_exporter.send(resolved_msg); }, comp::ChatType::Group(from, g) => { if group_info.is_none() { @@ -1080,6 +1089,7 @@ impl StateExt for State { } else { send_to_group(g, ecs, &resolved_msg); } + chat_exporter.send(resolved_msg); }, comp::ChatType::GroupMeta(g) => { send_to_group(g, ecs, &resolved_msg); diff --git a/server/src/web/chat.rs b/server/src/web/chat.rs new file mode 100644 index 0000000000..f935770a58 --- /dev/null +++ b/server/src/web/chat.rs @@ -0,0 +1,144 @@ +use axum::{ + extract::{Query, State}, + middleware::Next, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use chrono::{DateTime, Utc}; +use common::comp::ChatMsg; +use hyper::{Request, StatusCode}; +use serde::{Deserialize, Deserializer, Serialize}; +use std::{ + borrow::Cow, collections::VecDeque, mem::size_of, ops::Sub, str::FromStr, sync::Arc, + time::Duration, +}; +use tokio::sync::Mutex; + +/// The chat cache gets it data from the gameserver and will keep it for some +/// time It will be made available for its consumers, the REST Api +#[derive(Clone)] +pub struct ChatCache { + messages: Arc>>, +} + +/// Keep Size small, so we dont have to Clone much for each request. +#[derive(Clone)] +struct ChatToken { + secret_token: Cow<'static, str>, +} + +pub struct ChatExporter { + messages: Arc>>, + keep_duration: chrono::Duration, +} + +impl ChatExporter { + pub fn send(&self, msg: ChatMsg) { + let time = Utc::now(); + let drop_older_than = time.sub(self.keep_duration); + let mut messages = self.messages.blocking_lock(); + while let Some(msg) = messages.front() && msg.time < drop_older_than { + messages.pop_front(); + } + messages.push_back(ChatMessage { time, msg }); + const MAX_CACHE_BYTES: usize = 10_000_000; // approx. because HashMap allocates on Heap + if messages.len() * size_of::() > MAX_CACHE_BYTES { + let msg_count = messages.len(); + tracing::debug!(?msg_count, "shrinking cache"); + messages.shrink_to_fit(); + } + } +} + +impl ChatCache { + pub fn new(keep_duration: Duration) -> (Self, ChatExporter) { + let messages: Arc>> = Default::default(); + let messages_clone = Arc::clone(&messages); + let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); + + (Self { messages }, ChatExporter { + messages: messages_clone, + keep_duration, + }) + } +} + +async fn validate_secret( + State(token): State, + req: Request, + next: Next, +) -> Result { + pub const X_SECRET_TOKEN: &str = "X-Secret-Token"; + let session_cookie = req + .headers() + .get(X_SECRET_TOKEN) + .ok_or(StatusCode::UNAUTHORIZED)?; + + if session_cookie.as_bytes() != token.secret_token.as_bytes() { + return Err(StatusCode::UNAUTHORIZED); + } + + Ok(next.run(req).await) +} + +pub fn router(cache: ChatCache, secret_token: String) -> Router { + let token = ChatToken { + secret_token: Cow::Owned(secret_token), + }; + Router::new() + .route("/history", get(history)) + .layer(axum::middleware::from_fn_with_state(token, validate_secret)) + .with_state(cache) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ChatMessage { + time: DateTime, + msg: ChatMsg, +} + +#[derive(Debug, Deserialize)] +struct Params { + #[serde(default, deserialize_with = "empty_string_as_none")] + /// To be used to get all messages without duplicates nor losing messages + from_time_exclusive_rfc3339: Option, +} + +fn empty_string_as_none<'de, D, T>(de: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: FromStr, + T::Err: core::fmt::Display, +{ + let opt = Option::::deserialize(de)?; + match opt.as_deref() { + None | Some("") => Ok(None), + Some(s) => FromStr::from_str(s) + .map_err(serde::de::Error::custom) + .map(Some), + } +} + +async fn history( + State(cache): State, + Query(params): Query, +) -> Result { + // first validate parameters before we take lock + let from_time_exclusive = if let Some(rfc3339) = params.from_time_exclusive_rfc3339 { + Some(DateTime::parse_from_rfc3339(&rfc3339).map_err(|_| StatusCode::BAD_REQUEST)?) + } else { + None + }; + + let messages = cache.messages.lock().await; + let filtered: Vec<_> = match from_time_exclusive { + Some(from_time_exclusive) => messages + .iter() + .filter(|msg| msg.time > from_time_exclusive) + .cloned() + .collect(), + None => messages.iter().cloned().collect(), + }; + Ok(Json(filtered)) +} diff --git a/server/src/web/mod.rs b/server/src/web/mod.rs new file mode 100644 index 0000000000..bc3471c3ad --- /dev/null +++ b/server/src/web/mod.rs @@ -0,0 +1,60 @@ +use axum::{extract::State, response::IntoResponse, routing::get, Router}; +use core::{future::Future, ops::Deref}; +use hyper::{header, http, Body, StatusCode}; +use prometheus::{Registry, TextEncoder}; +use std::net::SocketAddr; + +mod chat; +pub use chat::{ChatCache, ChatExporter}; + +pub async fn run( + registry: R, + cache: ChatCache, + chat_token: String, + addr: S, + shutdown: F, +) -> Result<(), hyper::Error> +where + S: Into, + F: Future, + R: Deref + Clone + Send + Sync + 'static, +{ + let metrics = Router::new() + .route("/", get(metrics)) + .with_state(registry.deref().clone()); + + let app = Router::new() + .nest("/chat/v1", chat::router(cache, chat_token)) + .nest("/metrics", metrics) + .route("/health", get(|| async {})); + + // run it + let addr = addr.into(); + let server = axum::Server::bind(&addr).serve(app.into_make_service()); + let server = server.with_graceful_shutdown(shutdown); + tracing::info!("listening on {}", addr); + match server.await { + Ok(_) => tracing::debug!("webserver shutdown successful"), + Err(e) => tracing::error!(?e, "webserver shutdown error"), + } + + Ok(()) +} + +async fn metrics(State(registry): State) -> Result { + use prometheus::Encoder; + let mf = registry.gather(); + let mut buffer = Vec::with_capacity(1024); + + let encoder = TextEncoder::new(); + encoder + .encode(&mf, &mut buffer) + .expect("write to vec cannot fail"); + + let resp = http::Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/plain; charset=utf-8") + .body(Body::from(buffer)) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(resp) +} From adeab7387612112257ed47e028fe0b642560c64c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sun, 15 Oct 2023 20:17:45 +0200 Subject: [PATCH 2/5] move whole webserver to server-cli --- Cargo.lock | 6 +- server-cli/Cargo.toml | 6 + server-cli/src/main.rs | 26 ++- server-cli/src/settings.rs | 12 +- {server => server-cli}/src/web/chat.rs | 72 ++------ {server => server-cli}/src/web/mod.rs | 6 +- server/Cargo.toml | 4 - server/src/chat.rs | 228 +++++++++++++++++++++++++ server/src/lib.rs | 33 ++-- server/src/settings.rs | 6 - server/src/state_ext.rs | 14 +- 11 files changed, 304 insertions(+), 109 deletions(-) rename {server => server-cli}/src/web/chat.rs (52%) rename {server => server-cli}/src/web/mod.rs (93%) create mode 100644 server/src/chat.rs diff --git a/Cargo.lock b/Cargo.lock index 597643f79e..d89a3ddff2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7217,7 +7217,6 @@ version = "0.15.0" dependencies = [ "atomicwrites", "authc", - "axum", "bincode", "censor", "chrono", @@ -7228,7 +7227,6 @@ dependencies = [ "futures-util", "hashbrown 0.13.2", "humantime", - "hyper", "itertools", "lazy_static", "noise", @@ -7283,12 +7281,16 @@ dependencies = [ name = "veloren-server-cli" version = "0.15.0" dependencies = [ + "axum", "cansi", + "chrono", "clap", "crossterm 0.26.1", + "hyper", "lazy_static", "mimalloc", "num_cpus", + "prometheus", "ron 0.8.1", "serde", "shell-words", diff --git a/server-cli/Cargo.toml b/server-cli/Cargo.toml index 5485670b61..c42ba33618 100644 --- a/server-cli/Cargo.toml +++ b/server-cli/Cargo.toml @@ -47,6 +47,12 @@ tracing = { workspace = true } ron = { workspace = true } serde = { workspace = true, features = [ "rc", "derive" ]} +#HTTP +axum = { version = "0.6.20" } +hyper = "0.14.26" +prometheus = { workspace = true } +chrono = { workspace = true } + [target.'cfg(windows)'.dependencies] mimalloc = "0.1.29" diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index c269adbb77..13f47e1e99 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -16,6 +16,7 @@ mod settings; mod shutdown_coordinator; mod tui_runner; mod tuilog; +mod web; use crate::{ cli::{Admin, ArgvApp, ArgvCommand, Message, SharedCommand, Shutdown}, shutdown_coordinator::ShutdownCoordinator, @@ -31,6 +32,7 @@ use std::{ sync::{atomic::AtomicBool, mpsc, Arc}, time::{Duration, Instant}, }; +use tokio::sync::Notify; use tracing::{info, trace}; lazy_static::lazy_static! { @@ -185,7 +187,7 @@ fn main() -> io::Result<()> { info!("Starting server..."); let protocols_and_addresses = server_settings.gameserver_protocols.clone(); - let metrics_port = &server_settings.metrics_address.port(); + let web_port = &settings.web_address.port(); // Create server let mut server = Server::new( server_settings, @@ -193,10 +195,27 @@ fn main() -> io::Result<()> { database_settings, &server_data_dir, &|_| {}, - runtime, + Arc::clone(&runtime), ) .expect("Failed to create server instance!"); + let registry = Arc::clone(server.metrics_registry()); + let chat = server.chat_cache().clone(); + let metrics_shutdown = Arc::new(Notify::new()); + let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); + let web_chat_secret = settings.web_chat_secret.clone(); + + runtime.spawn(async move { + web::run( + registry, + chat, + web_chat_secret, + settings.web_address, + metrics_shutdown_clone.notified(), + ) + .await + }); + // Collect addresses that the server is listening to log. let gameserver_addresses = protocols_and_addresses .into_iter() @@ -210,7 +229,7 @@ fn main() -> io::Result<()> { }); info!( - ?metrics_port, + ?web_port, ?gameserver_addresses, "Server is ready to accept connections." ); @@ -243,6 +262,7 @@ fn main() -> io::Result<()> { tick_no += 1; // Terminate the server if instructed to do so by the shutdown coordinator if shutdown_coordinator.check(&mut server, &settings) { + metrics_shutdown.notify_one(); break; } diff --git a/server-cli/src/settings.rs b/server-cli/src/settings.rs index b771a49299..00e30f5354 100644 --- a/server-cli/src/settings.rs +++ b/server-cli/src/settings.rs @@ -1,5 +1,9 @@ use serde::{Deserialize, Serialize}; -use std::{fs, path::PathBuf}; +use std::{ + fs, + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, +}; use tracing::warn; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -7,6 +11,10 @@ use tracing::warn; pub struct Settings { pub update_shutdown_grace_period_secs: u32, pub update_shutdown_message: String, + pub web_address: SocketAddr, + /// SECRET API HEADER used to access the chat api, if disabled the API is + /// unreachable + pub web_chat_secret: Option, } impl Default for Settings { @@ -14,6 +22,8 @@ impl Default for Settings { Self { update_shutdown_grace_period_secs: 120, update_shutdown_message: "The server is restarting for an update".to_owned(), + web_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)), + web_chat_secret: None, } } } diff --git a/server/src/web/chat.rs b/server-cli/src/web/chat.rs similarity index 52% rename from server/src/web/chat.rs rename to server-cli/src/web/chat.rs index f935770a58..81efa3a5f7 100644 --- a/server/src/web/chat.rs +++ b/server-cli/src/web/chat.rs @@ -5,63 +5,16 @@ use axum::{ routing::get, Json, Router, }; -use chrono::{DateTime, Utc}; -use common::comp::ChatMsg; +use chrono::DateTime; use hyper::{Request, StatusCode}; -use serde::{Deserialize, Deserializer, Serialize}; -use std::{ - borrow::Cow, collections::VecDeque, mem::size_of, ops::Sub, str::FromStr, sync::Arc, - time::Duration, -}; -use tokio::sync::Mutex; - -/// The chat cache gets it data from the gameserver and will keep it for some -/// time It will be made available for its consumers, the REST Api -#[derive(Clone)] -pub struct ChatCache { - messages: Arc>>, -} +use serde::{Deserialize, Deserializer}; +use server::chat::ChatCache; +use std::{borrow::Cow, str::FromStr}; /// Keep Size small, so we dont have to Clone much for each request. #[derive(Clone)] struct ChatToken { - secret_token: Cow<'static, str>, -} - -pub struct ChatExporter { - messages: Arc>>, - keep_duration: chrono::Duration, -} - -impl ChatExporter { - pub fn send(&self, msg: ChatMsg) { - let time = Utc::now(); - let drop_older_than = time.sub(self.keep_duration); - let mut messages = self.messages.blocking_lock(); - while let Some(msg) = messages.front() && msg.time < drop_older_than { - messages.pop_front(); - } - messages.push_back(ChatMessage { time, msg }); - const MAX_CACHE_BYTES: usize = 10_000_000; // approx. because HashMap allocates on Heap - if messages.len() * size_of::() > MAX_CACHE_BYTES { - let msg_count = messages.len(); - tracing::debug!(?msg_count, "shrinking cache"); - messages.shrink_to_fit(); - } - } -} - -impl ChatCache { - pub fn new(keep_duration: Duration) -> (Self, ChatExporter) { - let messages: Arc>> = Default::default(); - let messages_clone = Arc::clone(&messages); - let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); - - (Self { messages }, ChatExporter { - messages: messages_clone, - keep_duration, - }) - } + secret_token: Option>, } async fn validate_secret( @@ -69,22 +22,25 @@ async fn validate_secret( req: Request, next: Next, ) -> Result { + // check if this endpoint is disabled + let secret_token = token.secret_token.ok_or(StatusCode::METHOD_NOT_ALLOWED)?; + pub const X_SECRET_TOKEN: &str = "X-Secret-Token"; let session_cookie = req .headers() .get(X_SECRET_TOKEN) .ok_or(StatusCode::UNAUTHORIZED)?; - if session_cookie.as_bytes() != token.secret_token.as_bytes() { + if session_cookie.as_bytes() != secret_token.as_bytes() { return Err(StatusCode::UNAUTHORIZED); } Ok(next.run(req).await) } -pub fn router(cache: ChatCache, secret_token: String) -> Router { +pub fn router(cache: ChatCache, secret_token: Option) -> Router { let token = ChatToken { - secret_token: Cow::Owned(secret_token), + secret_token: secret_token.map(Cow::Owned), }; Router::new() .route("/history", get(history)) @@ -92,12 +48,6 @@ pub fn router(cache: ChatCache, secret_token: String) -> Router { .with_state(cache) } -#[derive(Debug, Clone, Serialize, Deserialize)] -struct ChatMessage { - time: DateTime, - msg: ChatMsg, -} - #[derive(Debug, Deserialize)] struct Params { #[serde(default, deserialize_with = "empty_string_as_none")] diff --git a/server/src/web/mod.rs b/server-cli/src/web/mod.rs similarity index 93% rename from server/src/web/mod.rs rename to server-cli/src/web/mod.rs index bc3471c3ad..d5f5bfabd3 100644 --- a/server/src/web/mod.rs +++ b/server-cli/src/web/mod.rs @@ -2,15 +2,15 @@ use axum::{extract::State, response::IntoResponse, routing::get, Router}; use core::{future::Future, ops::Deref}; use hyper::{header, http, Body, StatusCode}; use prometheus::{Registry, TextEncoder}; +use server::chat::ChatCache; use std::net::SocketAddr; mod chat; -pub use chat::{ChatCache, ChatExporter}; pub async fn run( registry: R, cache: ChatCache, - chat_token: String, + chat_secret: Option, addr: S, shutdown: F, ) -> Result<(), hyper::Error> @@ -24,7 +24,7 @@ where .with_state(registry.deref().clone()); let app = Router::new() - .nest("/chat/v1", chat::router(cache, chat_token)) + .nest("/chat/v1", chat::router(cache, chat_secret)) .nest("/metrics", metrics) .route("/health", get(|| async {})); diff --git a/server/Cargo.toml b/server/Cargo.toml index 079faeb39a..ff50ae522d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -64,10 +64,6 @@ enum-map = { workspace = true } noise = { version = "0.7", default-features = false } censor = "0.3" -#HTTP -axum = { version = "0.6.20" } -hyper = "0.14.26" - rusqlite = { version = "0.28.0", features = ["array", "vtab", "bundled", "trace"] } refinery = { version = "0.8.8", features = ["rusqlite"] } diff --git a/server/src/chat.rs b/server/src/chat.rs new file mode 100644 index 0000000000..16fecfbdc4 --- /dev/null +++ b/server/src/chat.rs @@ -0,0 +1,228 @@ +use chrono::{DateTime, Utc}; +use common::{ + comp, + comp::{chat::KillType, ChatType, Content, Group, Player, UnresolvedChatMsg}, + uid::IdMaps, + uuid::Uuid, +}; +use serde::{Deserialize, Serialize}; +use specs::{Join, World, WorldExt}; +use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration}; +use tokio::sync::Mutex; + +#[derive(Clone, Serialize, Deserialize)] +pub struct PlayerInfo { + uuid: Uuid, + alias: String, +} + +#[derive(Clone, Serialize, Deserialize)] +pub enum KillSource { + Player(PlayerInfo, KillType), + NonPlayer(String, KillType), + NonExistent(KillType), + Environment(String), + FallDamage, + Suicide, + Other, +} + +#[derive(Clone, Serialize, Deserialize)] +/// partially mapped to common::comp::ChatMsg +pub enum ChatParties { + Online(PlayerInfo), + Offline(PlayerInfo), + CommandInfo(PlayerInfo), + CommandError(PlayerInfo), + Kill(KillSource, PlayerInfo), + GroupMeta(Vec), + Group(PlayerInfo, Vec), + Tell(PlayerInfo, PlayerInfo), + Say(PlayerInfo), + FactionMeta(String), + Faction(PlayerInfo, String), + Region(PlayerInfo), + World(PlayerInfo), +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct ChatMessage { + pub time: DateTime, + pub parties: ChatParties, + pub content: Content, +} + +type MessagesStore = Arc>>; + +/// The chat cache gets it data from the gameserver and will keep it for some +/// time It will be made available for its consumers, the REST Api +#[derive(Clone)] +pub struct ChatCache { + pub messages: MessagesStore, +} + +pub struct ChatExporter { + messages: MessagesStore, + keep_duration: chrono::Duration, +} + +impl ChatMessage { + fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self { + ChatMessage { + time: Utc::now(), + content: chatmsg.content().clone(), + parties, + } + } +} + +impl ChatExporter { + pub fn generate(chatmsg: &UnresolvedChatMsg, ecs: &World) -> Option { + let id_maps = ecs.read_resource::(); + let players = ecs.read_storage::(); + let player_info_from_uid = |uid| { + id_maps + .uid_entity(uid) + .and_then(|entry| players.get(entry)) + .map(|player| PlayerInfo { + alias: player.alias.clone(), + uuid: player.uuid(), + }) + }; + let group_members_from_group = |g| -> Vec<_> { + let groups = ecs.read_storage::(); + (&players, &groups) + .join() + .filter_map(|(player, group)| { + if g == group { + Some(PlayerInfo { + alias: player.alias.clone(), + uuid: player.uuid(), + }) + } else { + None + } + }) + .collect() + }; + + match &chatmsg.chat_type { + ChatType::Offline(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Offline(player_info))); + } + }, + ChatType::Online(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Online(player_info))); + } + }, + ChatType::Region(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Region(player_info))); + } + }, + ChatType::World(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::World(player_info))); + } + }, + ChatType::Say(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Say(player_info))); + } + }, + ChatType::Tell(from, to) => { + if let (Some(from_player_info), Some(to_player_info)) = + (player_info_from_uid(*from), player_info_from_uid(*to)) + { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Tell(from_player_info, to_player_info), + )); + } + }, + ChatType::Kill(kill_source, from) => { + let kill_source = match kill_source.clone() { + comp::chat::KillSource::Player(uid, t) => { + if let Some(player_info) = player_info_from_uid(uid) { + KillSource::Player(player_info, t) + } else { + return None; + } + }, + comp::chat::KillSource::NonPlayer(str, t) => KillSource::NonPlayer(str, t), + comp::chat::KillSource::NonExistent(t) => KillSource::NonExistent(t), + comp::chat::KillSource::Environment(str) => KillSource::Environment(str), + comp::chat::KillSource::FallDamage => KillSource::FallDamage, + comp::chat::KillSource::Suicide => KillSource::Suicide, + comp::chat::KillSource::Other => KillSource::Other, + }; + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Kill(kill_source, player_info), + )); + } + }, + ChatType::FactionMeta(s) => { + return Some(ChatMessage::new( + chatmsg, + ChatParties::FactionMeta(s.clone()), + )); + }, + ChatType::Faction(from, s) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Faction(player_info, s.clone()), + )); + } + }, + ChatType::GroupMeta(g) => { + let members = group_members_from_group(g); + return Some(ChatMessage::new(chatmsg, ChatParties::GroupMeta(members))); + }, + ChatType::Group(from, g) => { + let members = group_members_from_group(g); + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Group(player_info, members), + )); + } + }, + _ => (), + }; + + None + } + + pub fn send(&self, msg: ChatMessage) { + let drop_older_than = msg.time.sub(self.keep_duration); + let mut messages = self.messages.blocking_lock(); + while let Some(msg) = messages.front() && msg.time < drop_older_than { + messages.pop_front(); + } + messages.push_back(msg); + const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever + if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES { + let msg_count = messages.len(); + tracing::debug!(?msg_count, "shrinking cache"); + messages.shrink_to_fit(); + } + } +} + +impl ChatCache { + pub fn new(keep_duration: Duration) -> (Self, ChatExporter) { + let messages: Arc>> = Default::default(); + let messages_clone = Arc::clone(&messages); + let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); + + (Self { messages }, ChatExporter { + messages: messages_clone, + keep_duration, + }) + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 961e869514..051abe6e08 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ pub mod automod; mod character_creator; +pub mod chat; pub mod chunk_generator; mod chunk_serialize; pub mod client; @@ -31,7 +32,6 @@ pub mod sys; #[cfg(feature = "persistent_world")] pub mod terrain_persistence; #[cfg(not(feature = "worldgen"))] mod test_world; -mod web; mod weather; @@ -104,7 +104,7 @@ use std::{ }; #[cfg(not(feature = "worldgen"))] use test_world::{IndexOwned, World}; -use tokio::{runtime::Runtime, sync::Notify}; +use tokio::runtime::Runtime; use tracing::{debug, error, info, trace, warn}; use vek::*; pub use world::{civ::WorldCivStage, sim::WorldSimStage, WorldGenerateStage}; @@ -124,7 +124,7 @@ use { common_state::plugin::{memory_manager::EcsWorld, PluginMgr}, }; -use crate::{persistence::character_loader::CharacterScreenResponseKind, web::ChatCache}; +use crate::{chat::ChatCache, persistence::character_loader::CharacterScreenResponseKind}; use common::comp::Anchor; #[cfg(feature = "worldgen")] pub use world::{ @@ -211,7 +211,8 @@ pub struct Server { runtime: Arc, - metrics_shutdown: Arc, + metrics_registry: Arc, + chat_cache: ChatCache, database_settings: Arc>, disconnect_all_clients_requested: bool, @@ -483,21 +484,8 @@ impl Server { state.ecs_mut().insert(DeletedEntities::default()); let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); - let metrics_shutdown = Arc::new(Notify::new()); - let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); - let addr = settings.metrics_address; let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60)); state.ecs_mut().insert(chat_tracker); - runtime.spawn(async move { - web::run( - Arc::clone(®istry), - chat_cache, - "secretpassword".to_string(), - addr, - metrics_shutdown_clone.notified(), - ) - .await - }); let mut printed_quic_warning = false; for protocol in &settings.gameserver_protocols { @@ -603,7 +591,8 @@ impl Server { connection_handler, runtime, - metrics_shutdown, + metrics_registry: registry, + chat_cache, database_settings, disconnect_all_clients_requested: false, @@ -668,6 +657,12 @@ impl Server { /// Get a reference to the server's world. pub fn world(&self) -> &World { &self.world } + /// Get a reference to the Metrics Registry + pub fn metrics_registry(&self) -> &Arc { &self.metrics_registry } + + /// Get a reference to the Chat Cache + pub fn chat_cache(&self) -> &ChatCache { &self.chat_cache } + fn parse_locations(&self, character_list_data: &mut [CharacterItem]) { character_list_data.iter_mut().for_each(|c| { let name = c @@ -1497,8 +1492,6 @@ impl Server { impl Drop for Server { fn drop(&mut self) { - self.metrics_shutdown.notify_one(); - self.state .notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown)); diff --git a/server/src/settings.rs b/server/src/settings.rs index caf7a968d9..4756011a66 100644 --- a/server/src/settings.rs +++ b/server/src/settings.rs @@ -160,7 +160,6 @@ impl CalendarMode { #[serde(default)] pub struct Settings { pub gameserver_protocols: Vec, - pub metrics_address: SocketAddr, pub auth_server_address: Option, pub max_players: u16, pub world_seed: u32, @@ -202,7 +201,6 @@ impl Default for Settings { address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 14004)), }, ], - metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)), auth_server_address: Some("https://auth.veloren.net".into()), world_seed: DEFAULT_WORLD_SEED, server_name: "Veloren Server".into(), @@ -285,10 +283,6 @@ impl Settings { pick_unused_port().expect("Failed to find unused port!"), )), }], - metrics_address: SocketAddr::from(( - Ipv4Addr::LOCALHOST, - pick_unused_port().expect("Failed to find unused port!"), - )), auth_server_address: None, // If loading the default map file, make sure the seed is also default. world_seed: if load.map_file.is_some() { diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 1330744a2a..716912fac1 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,5 +1,6 @@ use crate::{ automod::AutoMod, + chat::ChatExporter, client::Client, events::{self, update_map_markers}, persistence::PersistedComponents, @@ -8,7 +9,6 @@ use crate::{ rtsim::RtSim, settings::Settings, sys::sentinel::DeletedEntities, - web::ChatExporter, wiring, BattleModeBuffer, SpawnPoint, }; use common::{ @@ -905,6 +905,10 @@ impl StateExt for State { let group_info = msg.get_group().and_then(|g| group_manager.group_info(*g)); + if let Some(exported_message) = ChatExporter::generate(&msg, ecs) { + chat_exporter.send(exported_message); + } + let resolved_msg = msg .clone() .map_group(|_| group_info.map_or_else(|| "???".to_string(), |i| i.name.clone())); @@ -927,7 +931,6 @@ impl StateExt for State { | comp::ChatType::CommandError | comp::ChatType::Meta | comp::ChatType::World(_) => { - chat_exporter.send(resolved_msg.clone()); self.notify_players(ServerGeneral::ChatMsg(resolved_msg)) }, comp::ChatType::Online(u) => { @@ -938,7 +941,6 @@ impl StateExt for State { client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } - chat_exporter.send(resolved_msg); }, comp::ChatType::Tell(from, to) => { for (client, uid) in @@ -948,12 +950,10 @@ impl StateExt for State { client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } - chat_exporter.send(resolved_msg); }, comp::ChatType::Kill(kill_source, uid) => { let clients = ecs.read_storage::(); let clients_count = clients.count(); - chat_exporter.send(resolved_msg.clone()); // Avoid chat spam, send kill message only to group or nearby players if a // certain amount of clients are online if clients_count @@ -1008,7 +1008,6 @@ impl StateExt for State { } } } - chat_exporter.send(resolved_msg); }, comp::ChatType::Region(uid) => { let entity_opt = entity_from_uid(*uid); @@ -1021,7 +1020,6 @@ impl StateExt for State { } } } - chat_exporter.send(resolved_msg); }, comp::ChatType::Npc(uid) => { let entity_opt = entity_from_uid(*uid); @@ -1067,7 +1065,6 @@ impl StateExt for State { client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } - chat_exporter.send(resolved_msg); }, comp::ChatType::Group(from, g) => { if group_info.is_none() { @@ -1089,7 +1086,6 @@ impl StateExt for State { } else { send_to_group(g, ecs, &resolved_msg); } - chat_exporter.send(resolved_msg); }, comp::ChatType::GroupMeta(g) => { send_to_group(g, ecs, &resolved_msg); From 360cc4a4b708f501b3f95e7d6a6e75f569bd9356 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 18 Oct 2023 10:05:15 +0200 Subject: [PATCH 3/5] Move most of the locking work to a seperate tokio spawn thread --- server/src/chat.rs | 57 ++++++++++++++++++++++++++++++++++------------ server/src/lib.rs | 2 +- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/server/src/chat.rs b/server/src/chat.rs index 16fecfbdc4..aa7d8d4be8 100644 --- a/server/src/chat.rs +++ b/server/src/chat.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use specs::{Join, World, WorldExt}; use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration}; use tokio::sync::Mutex; +use tracing::{info_span, Instrument}; #[derive(Clone, Serialize, Deserialize)] pub struct PlayerInfo { @@ -61,11 +62,17 @@ pub struct ChatCache { pub messages: MessagesStore, } -pub struct ChatExporter { +/// Will internally run on tokio and take stress from main loop +struct ChatForwarder { + chat_r: tokio::sync::mpsc::Receiver, messages: MessagesStore, keep_duration: chrono::Duration, } +pub struct ChatExporter { + chat_s: tokio::sync::mpsc::Sender, +} + impl ChatMessage { fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self { ChatMessage { @@ -199,30 +206,50 @@ impl ChatExporter { } pub fn send(&self, msg: ChatMessage) { - let drop_older_than = msg.time.sub(self.keep_duration); - let mut messages = self.messages.blocking_lock(); - while let Some(msg) = messages.front() && msg.time < drop_older_than { - messages.pop_front(); + if let Err(e) = self.chat_s.blocking_send(msg) { + tracing::warn!( + ?e, + "could not export chat message. the tokio sender seems to be broken" + ); } - messages.push_back(msg); - const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever - if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES { - let msg_count = messages.len(); - tracing::debug!(?msg_count, "shrinking cache"); - messages.shrink_to_fit(); + } +} + +impl ChatForwarder { + async fn run(mut self) { + while let Some(msg) = self.chat_r.recv().await { + let drop_older_than = msg.time.sub(self.keep_duration); + let mut messages = self.messages.lock().await; + while let Some(msg) = messages.front() && msg.time < drop_older_than { + messages.pop_front(); + } + messages.push_back(msg); + const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever + if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES { + let msg_count = messages.len(); + tracing::debug!(?msg_count, "shrinking cache"); + messages.shrink_to_fit(); + } } } } impl ChatCache { - pub fn new(keep_duration: Duration) -> (Self, ChatExporter) { + pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) { + const BUFFER_SIZE: usize = 1_000; + let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE); let messages: Arc>> = Default::default(); let messages_clone = Arc::clone(&messages); let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); - (Self { messages }, ChatExporter { - messages: messages_clone, + let worker = ChatForwarder { keep_duration, - }) + chat_r, + messages: messages_clone, + }; + + runtime.spawn(worker.run().instrument(info_span!("chat_forwarder"))); + + (Self { messages }, ChatExporter { chat_s }) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 051abe6e08..df46ee9b78 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -484,7 +484,7 @@ impl Server { state.ecs_mut().insert(DeletedEntities::default()); let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); - let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60)); + let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60), &runtime); state.ecs_mut().insert(chat_tracker); let mut printed_quic_warning = false; From 0baab58928845c76ed323f790f5d7747a78a3cf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 18 Oct 2023 10:16:44 +0200 Subject: [PATCH 4/5] imbris though the Cow would get Cloned anyway, so we go directly for String for now. OnceCell might be a thing in the future, but not now. Also logging errors in case metrics cannot be build --- server-cli/src/main.rs | 2 +- server-cli/src/web/chat.rs | 8 +++----- server-cli/src/web/mod.rs | 11 ++++++++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index 13f47e1e99..68f462eb31 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -262,7 +262,6 @@ fn main() -> io::Result<()> { tick_no += 1; // Terminate the server if instructed to do so by the shutdown coordinator if shutdown_coordinator.check(&mut server, &settings) { - metrics_shutdown.notify_one(); break; } @@ -337,6 +336,7 @@ fn main() -> io::Result<()> { #[cfg(feature = "tracy")] common_base::tracy_client::frame_mark(); } + metrics_shutdown.notify_one(); Ok(()) } diff --git a/server-cli/src/web/chat.rs b/server-cli/src/web/chat.rs index 81efa3a5f7..1d7f71f6f0 100644 --- a/server-cli/src/web/chat.rs +++ b/server-cli/src/web/chat.rs @@ -9,12 +9,12 @@ use chrono::DateTime; use hyper::{Request, StatusCode}; use serde::{Deserialize, Deserializer}; use server::chat::ChatCache; -use std::{borrow::Cow, str::FromStr}; +use std::str::FromStr; /// Keep Size small, so we dont have to Clone much for each request. #[derive(Clone)] struct ChatToken { - secret_token: Option>, + secret_token: Option, } async fn validate_secret( @@ -39,9 +39,7 @@ async fn validate_secret( } pub fn router(cache: ChatCache, secret_token: Option) -> Router { - let token = ChatToken { - secret_token: secret_token.map(Cow::Owned), - }; + let token = ChatToken { secret_token }; Router::new() .route("/history", get(history)) .layer(axum::middleware::from_fn_with_state(token, validate_secret)) diff --git a/server-cli/src/web/mod.rs b/server-cli/src/web/mod.rs index d5f5bfabd3..edf9f2bbe9 100644 --- a/server-cli/src/web/mod.rs +++ b/server-cli/src/web/mod.rs @@ -51,10 +51,15 @@ async fn metrics(State(registry): State) -> Result { + tracing::warn!(?e, "could not export metrics to HTTP format"); + Err(StatusCode::INTERNAL_SERVER_ERROR) + }, + Ok(r) => Ok(r), + } } From 9e5bd1e463bb21fc3f4490e47d4942e71d5d37c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 18 Oct 2023 13:05:22 +0200 Subject: [PATCH 5/5] add a log for each person accessing the chat api --- common/frontend/src/lib.rs | 2 +- server-cli/src/web/chat.rs | 34 ++++++++++++++++++++++++++++++++-- server-cli/src/web/mod.rs | 3 ++- 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/common/frontend/src/lib.rs b/common/frontend/src/lib.rs index f46e754c53..97120a75d4 100644 --- a/common/frontend/src/lib.rs +++ b/common/frontend/src/lib.rs @@ -60,7 +60,7 @@ where "veloren_server::events::entity_manipulation=info", "hyper=info", "prometheus_hyper=info", - "mio::pool=info", + "mio::poll=info", "mio::sys::windows=info", "h2=info", "tokio_util=info", diff --git a/server-cli/src/web/chat.rs b/server-cli/src/web/chat.rs index 1d7f71f6f0..3396bc94cc 100644 --- a/server-cli/src/web/chat.rs +++ b/server-cli/src/web/chat.rs @@ -1,5 +1,5 @@ use axum::{ - extract::{Query, State}, + extract::{ConnectInfo, Query, State}, middleware::Next, response::{IntoResponse, Response}, routing::get, @@ -9,7 +9,13 @@ use chrono::DateTime; use hyper::{Request, StatusCode}; use serde::{Deserialize, Deserializer}; use server::chat::ChatCache; -use std::str::FromStr; +use std::{ + collections::HashSet, + net::{IpAddr, SocketAddr}, + str::FromStr, + sync::Arc, +}; +use tokio::sync::Mutex; /// Keep Size small, so we dont have to Clone much for each request. #[derive(Clone)] @@ -17,6 +23,11 @@ struct ChatToken { secret_token: Option, } +#[derive(Clone, Default)] +struct IpAddresses { + users: Arc>>, +} + async fn validate_secret( State(token): State, req: Request, @@ -38,10 +49,29 @@ async fn validate_secret( Ok(next.run(req).await) } +/// Logs each new IP address that accesses this API authenticated +async fn log_users( + State(ip_addresses): State, + ConnectInfo(addr): ConnectInfo, + req: Request, + next: Next, +) -> Result { + let mut ip_addresses = ip_addresses.users.lock().await; + let ip_addr = addr.ip(); + if !ip_addresses.contains(&ip_addr) { + ip_addresses.insert(ip_addr); + let users_so_far = ip_addresses.len(); + tracing::info!(?ip_addr, ?users_so_far, "Is accessing the /chat endpoint"); + } + Ok(next.run(req).await) +} + pub fn router(cache: ChatCache, secret_token: Option) -> Router { let token = ChatToken { secret_token }; + let ip_addrs = IpAddresses::default(); Router::new() .route("/history", get(history)) + .layer(axum::middleware::from_fn_with_state(ip_addrs, log_users)) .layer(axum::middleware::from_fn_with_state(token, validate_secret)) .with_state(cache) } diff --git a/server-cli/src/web/mod.rs b/server-cli/src/web/mod.rs index edf9f2bbe9..a73f95b456 100644 --- a/server-cli/src/web/mod.rs +++ b/server-cli/src/web/mod.rs @@ -30,7 +30,8 @@ where // run it let addr = addr.into(); - let server = axum::Server::bind(&addr).serve(app.into_make_service()); + let server = + axum::Server::bind(&addr).serve(app.into_make_service_with_connect_info::()); let server = server.with_graceful_shutdown(shutdown); tracing::info!("listening on {}", addr); match server.await {