diff --git a/Cargo.lock b/Cargo.lock index 15686fcdd5..d89a3ddff2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,6 +412,55 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -3408,6 +3457,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -3497,6 +3552,12 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minifb" version = "0.24.0" @@ -4433,6 +4494,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2 1.0.66", + "quote 1.0.33", + "syn 2.0.29", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -5566,6 +5647,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.16" @@ -5586,6 +5677,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.6.1" @@ -6106,6 +6209,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tap" version = "1.0.1" @@ -6428,6 +6537,28 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -7103,7 +7234,6 @@ dependencies = [ "parking_lot 0.12.1", "portpicker", "prometheus", - "prometheus-hyper", "quinn", "rand 0.8.5", "rayon", @@ -7151,12 +7281,16 @@ dependencies = [ name = "veloren-server-cli" version = "0.15.0" dependencies = [ + "axum", "cansi", + "chrono", "clap", "crossterm 0.26.1", + "hyper", "lazy_static", "mimalloc", "num_cpus", + "prometheus", "ron 0.8.1", "serde", "shell-words", diff --git a/common/frontend/src/lib.rs b/common/frontend/src/lib.rs index f46e754c53..97120a75d4 100644 --- a/common/frontend/src/lib.rs +++ b/common/frontend/src/lib.rs @@ -60,7 +60,7 @@ where "veloren_server::events::entity_manipulation=info", "hyper=info", "prometheus_hyper=info", - "mio::pool=info", + "mio::poll=info", "mio::sys::windows=info", "h2=info", "tokio_util=info", diff --git a/server-cli/Cargo.toml b/server-cli/Cargo.toml index 5485670b61..c42ba33618 100644 --- a/server-cli/Cargo.toml +++ b/server-cli/Cargo.toml @@ -47,6 +47,12 @@ tracing = { workspace = true } ron = { workspace = true } serde = { workspace = true, features = [ "rc", "derive" ]} +#HTTP +axum = { version = "0.6.20" } +hyper = "0.14.26" +prometheus = { workspace = true } +chrono = { workspace = true } + [target.'cfg(windows)'.dependencies] mimalloc = "0.1.29" diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index c269adbb77..68f462eb31 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -16,6 +16,7 @@ mod settings; mod shutdown_coordinator; mod tui_runner; mod tuilog; +mod web; use crate::{ cli::{Admin, ArgvApp, ArgvCommand, Message, SharedCommand, Shutdown}, shutdown_coordinator::ShutdownCoordinator, @@ -31,6 +32,7 @@ use std::{ sync::{atomic::AtomicBool, mpsc, Arc}, time::{Duration, Instant}, }; +use tokio::sync::Notify; use tracing::{info, trace}; lazy_static::lazy_static! { @@ -185,7 +187,7 @@ fn main() -> io::Result<()> { info!("Starting server..."); let protocols_and_addresses = server_settings.gameserver_protocols.clone(); - let metrics_port = &server_settings.metrics_address.port(); + let web_port = &settings.web_address.port(); // Create server let mut server = Server::new( server_settings, @@ -193,10 +195,27 @@ fn main() -> io::Result<()> { database_settings, &server_data_dir, &|_| {}, - runtime, + Arc::clone(&runtime), ) .expect("Failed to create server instance!"); + let registry = Arc::clone(server.metrics_registry()); + let chat = server.chat_cache().clone(); + let metrics_shutdown = Arc::new(Notify::new()); + let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); + let web_chat_secret = settings.web_chat_secret.clone(); + + runtime.spawn(async move { + web::run( + registry, + chat, + web_chat_secret, + settings.web_address, + metrics_shutdown_clone.notified(), + ) + .await + }); + // Collect addresses that the server is listening to log. let gameserver_addresses = protocols_and_addresses .into_iter() @@ -210,7 +229,7 @@ fn main() -> io::Result<()> { }); info!( - ?metrics_port, + ?web_port, ?gameserver_addresses, "Server is ready to accept connections." ); @@ -317,6 +336,7 @@ fn main() -> io::Result<()> { #[cfg(feature = "tracy")] common_base::tracy_client::frame_mark(); } + metrics_shutdown.notify_one(); Ok(()) } diff --git a/server-cli/src/settings.rs b/server-cli/src/settings.rs index b771a49299..00e30f5354 100644 --- a/server-cli/src/settings.rs +++ b/server-cli/src/settings.rs @@ -1,5 +1,9 @@ use serde::{Deserialize, Serialize}; -use std::{fs, path::PathBuf}; +use std::{ + fs, + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, +}; use tracing::warn; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -7,6 +11,10 @@ use tracing::warn; pub struct Settings { pub update_shutdown_grace_period_secs: u32, pub update_shutdown_message: String, + pub web_address: SocketAddr, + /// SECRET API HEADER used to access the chat api, if disabled the API is + /// unreachable + pub web_chat_secret: Option, } impl Default for Settings { @@ -14,6 +22,8 @@ impl Default for Settings { Self { update_shutdown_grace_period_secs: 120, update_shutdown_message: "The server is restarting for an update".to_owned(), + web_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)), + web_chat_secret: None, } } } diff --git a/server-cli/src/web/chat.rs b/server-cli/src/web/chat.rs new file mode 100644 index 0000000000..3396bc94cc --- /dev/null +++ b/server-cli/src/web/chat.rs @@ -0,0 +1,122 @@ +use axum::{ + extract::{ConnectInfo, Query, State}, + middleware::Next, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use chrono::DateTime; +use hyper::{Request, StatusCode}; +use serde::{Deserialize, Deserializer}; +use server::chat::ChatCache; +use std::{ + collections::HashSet, + net::{IpAddr, SocketAddr}, + str::FromStr, + sync::Arc, +}; +use tokio::sync::Mutex; + +/// Keep Size small, so we dont have to Clone much for each request. +#[derive(Clone)] +struct ChatToken { + secret_token: Option, +} + +#[derive(Clone, Default)] +struct IpAddresses { + users: Arc>>, +} + +async fn validate_secret( + State(token): State, + req: Request, + next: Next, +) -> Result { + // check if this endpoint is disabled + let secret_token = token.secret_token.ok_or(StatusCode::METHOD_NOT_ALLOWED)?; + + pub const X_SECRET_TOKEN: &str = "X-Secret-Token"; + let session_cookie = req + .headers() + .get(X_SECRET_TOKEN) + .ok_or(StatusCode::UNAUTHORIZED)?; + + if session_cookie.as_bytes() != secret_token.as_bytes() { + return Err(StatusCode::UNAUTHORIZED); + } + + Ok(next.run(req).await) +} + +/// Logs each new IP address that accesses this API authenticated +async fn log_users( + State(ip_addresses): State, + ConnectInfo(addr): ConnectInfo, + req: Request, + next: Next, +) -> Result { + let mut ip_addresses = ip_addresses.users.lock().await; + let ip_addr = addr.ip(); + if !ip_addresses.contains(&ip_addr) { + ip_addresses.insert(ip_addr); + let users_so_far = ip_addresses.len(); + tracing::info!(?ip_addr, ?users_so_far, "Is accessing the /chat endpoint"); + } + Ok(next.run(req).await) +} + +pub fn router(cache: ChatCache, secret_token: Option) -> Router { + let token = ChatToken { secret_token }; + let ip_addrs = IpAddresses::default(); + Router::new() + .route("/history", get(history)) + .layer(axum::middleware::from_fn_with_state(ip_addrs, log_users)) + .layer(axum::middleware::from_fn_with_state(token, validate_secret)) + .with_state(cache) +} + +#[derive(Debug, Deserialize)] +struct Params { + #[serde(default, deserialize_with = "empty_string_as_none")] + /// To be used to get all messages without duplicates nor losing messages + from_time_exclusive_rfc3339: Option, +} + +fn empty_string_as_none<'de, D, T>(de: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: FromStr, + T::Err: core::fmt::Display, +{ + let opt = Option::::deserialize(de)?; + match opt.as_deref() { + None | Some("") => Ok(None), + Some(s) => FromStr::from_str(s) + .map_err(serde::de::Error::custom) + .map(Some), + } +} + +async fn history( + State(cache): State, + Query(params): Query, +) -> Result { + // first validate parameters before we take lock + let from_time_exclusive = if let Some(rfc3339) = params.from_time_exclusive_rfc3339 { + Some(DateTime::parse_from_rfc3339(&rfc3339).map_err(|_| StatusCode::BAD_REQUEST)?) + } else { + None + }; + + let messages = cache.messages.lock().await; + let filtered: Vec<_> = match from_time_exclusive { + Some(from_time_exclusive) => messages + .iter() + .filter(|msg| msg.time > from_time_exclusive) + .cloned() + .collect(), + None => messages.iter().cloned().collect(), + }; + Ok(Json(filtered)) +} diff --git a/server-cli/src/web/mod.rs b/server-cli/src/web/mod.rs new file mode 100644 index 0000000000..a73f95b456 --- /dev/null +++ b/server-cli/src/web/mod.rs @@ -0,0 +1,66 @@ +use axum::{extract::State, response::IntoResponse, routing::get, Router}; +use core::{future::Future, ops::Deref}; +use hyper::{header, http, Body, StatusCode}; +use prometheus::{Registry, TextEncoder}; +use server::chat::ChatCache; +use std::net::SocketAddr; + +mod chat; + +pub async fn run( + registry: R, + cache: ChatCache, + chat_secret: Option, + addr: S, + shutdown: F, +) -> Result<(), hyper::Error> +where + S: Into, + F: Future, + R: Deref + Clone + Send + Sync + 'static, +{ + let metrics = Router::new() + .route("/", get(metrics)) + .with_state(registry.deref().clone()); + + let app = Router::new() + .nest("/chat/v1", chat::router(cache, chat_secret)) + .nest("/metrics", metrics) + .route("/health", get(|| async {})); + + // run it + let addr = addr.into(); + let server = + axum::Server::bind(&addr).serve(app.into_make_service_with_connect_info::()); + let server = server.with_graceful_shutdown(shutdown); + tracing::info!("listening on {}", addr); + match server.await { + Ok(_) => tracing::debug!("webserver shutdown successful"), + Err(e) => tracing::error!(?e, "webserver shutdown error"), + } + + Ok(()) +} + +async fn metrics(State(registry): State) -> Result { + use prometheus::Encoder; + let mf = registry.gather(); + let mut buffer = Vec::with_capacity(1024); + + let encoder = TextEncoder::new(); + encoder + .encode(&mf, &mut buffer) + .expect("write to vec cannot fail"); + + match http::Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/plain; charset=utf-8") + .body(Body::from(buffer)) + { + Err(e) => { + tracing::warn!(?e, "could not export metrics to HTTP format"); + Err(StatusCode::INTERNAL_SERVER_ERROR) + }, + Ok(r) => Ok(r), + } +} diff --git a/server/Cargo.toml b/server/Cargo.toml index 0d673d61c7..ff50ae522d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -39,7 +39,6 @@ tracing = { workspace = true } vek = { workspace = true } futures-util = { workspace = true } tokio = { workspace = true } -prometheus-hyper = { workspace = true } quinn = "0.10" rustls = { version = "0.21", default-features = false } rustls-pemfile = { version = "1", default-features = false } diff --git a/server/src/chat.rs b/server/src/chat.rs new file mode 100644 index 0000000000..aa7d8d4be8 --- /dev/null +++ b/server/src/chat.rs @@ -0,0 +1,255 @@ +use chrono::{DateTime, Utc}; +use common::{ + comp, + comp::{chat::KillType, ChatType, Content, Group, Player, UnresolvedChatMsg}, + uid::IdMaps, + uuid::Uuid, +}; +use serde::{Deserialize, Serialize}; +use specs::{Join, World, WorldExt}; +use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tracing::{info_span, Instrument}; + +#[derive(Clone, Serialize, Deserialize)] +pub struct PlayerInfo { + uuid: Uuid, + alias: String, +} + +#[derive(Clone, Serialize, Deserialize)] +pub enum KillSource { + Player(PlayerInfo, KillType), + NonPlayer(String, KillType), + NonExistent(KillType), + Environment(String), + FallDamage, + Suicide, + Other, +} + +#[derive(Clone, Serialize, Deserialize)] +/// partially mapped to common::comp::ChatMsg +pub enum ChatParties { + Online(PlayerInfo), + Offline(PlayerInfo), + CommandInfo(PlayerInfo), + CommandError(PlayerInfo), + Kill(KillSource, PlayerInfo), + GroupMeta(Vec), + Group(PlayerInfo, Vec), + Tell(PlayerInfo, PlayerInfo), + Say(PlayerInfo), + FactionMeta(String), + Faction(PlayerInfo, String), + Region(PlayerInfo), + World(PlayerInfo), +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct ChatMessage { + pub time: DateTime, + pub parties: ChatParties, + pub content: Content, +} + +type MessagesStore = Arc>>; + +/// The chat cache gets it data from the gameserver and will keep it for some +/// time It will be made available for its consumers, the REST Api +#[derive(Clone)] +pub struct ChatCache { + pub messages: MessagesStore, +} + +/// Will internally run on tokio and take stress from main loop +struct ChatForwarder { + chat_r: tokio::sync::mpsc::Receiver, + messages: MessagesStore, + keep_duration: chrono::Duration, +} + +pub struct ChatExporter { + chat_s: tokio::sync::mpsc::Sender, +} + +impl ChatMessage { + fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self { + ChatMessage { + time: Utc::now(), + content: chatmsg.content().clone(), + parties, + } + } +} + +impl ChatExporter { + pub fn generate(chatmsg: &UnresolvedChatMsg, ecs: &World) -> Option { + let id_maps = ecs.read_resource::(); + let players = ecs.read_storage::(); + let player_info_from_uid = |uid| { + id_maps + .uid_entity(uid) + .and_then(|entry| players.get(entry)) + .map(|player| PlayerInfo { + alias: player.alias.clone(), + uuid: player.uuid(), + }) + }; + let group_members_from_group = |g| -> Vec<_> { + let groups = ecs.read_storage::(); + (&players, &groups) + .join() + .filter_map(|(player, group)| { + if g == group { + Some(PlayerInfo { + alias: player.alias.clone(), + uuid: player.uuid(), + }) + } else { + None + } + }) + .collect() + }; + + match &chatmsg.chat_type { + ChatType::Offline(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Offline(player_info))); + } + }, + ChatType::Online(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Online(player_info))); + } + }, + ChatType::Region(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Region(player_info))); + } + }, + ChatType::World(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::World(player_info))); + } + }, + ChatType::Say(from) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new(chatmsg, ChatParties::Say(player_info))); + } + }, + ChatType::Tell(from, to) => { + if let (Some(from_player_info), Some(to_player_info)) = + (player_info_from_uid(*from), player_info_from_uid(*to)) + { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Tell(from_player_info, to_player_info), + )); + } + }, + ChatType::Kill(kill_source, from) => { + let kill_source = match kill_source.clone() { + comp::chat::KillSource::Player(uid, t) => { + if let Some(player_info) = player_info_from_uid(uid) { + KillSource::Player(player_info, t) + } else { + return None; + } + }, + comp::chat::KillSource::NonPlayer(str, t) => KillSource::NonPlayer(str, t), + comp::chat::KillSource::NonExistent(t) => KillSource::NonExistent(t), + comp::chat::KillSource::Environment(str) => KillSource::Environment(str), + comp::chat::KillSource::FallDamage => KillSource::FallDamage, + comp::chat::KillSource::Suicide => KillSource::Suicide, + comp::chat::KillSource::Other => KillSource::Other, + }; + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Kill(kill_source, player_info), + )); + } + }, + ChatType::FactionMeta(s) => { + return Some(ChatMessage::new( + chatmsg, + ChatParties::FactionMeta(s.clone()), + )); + }, + ChatType::Faction(from, s) => { + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Faction(player_info, s.clone()), + )); + } + }, + ChatType::GroupMeta(g) => { + let members = group_members_from_group(g); + return Some(ChatMessage::new(chatmsg, ChatParties::GroupMeta(members))); + }, + ChatType::Group(from, g) => { + let members = group_members_from_group(g); + if let Some(player_info) = player_info_from_uid(*from) { + return Some(ChatMessage::new( + chatmsg, + ChatParties::Group(player_info, members), + )); + } + }, + _ => (), + }; + + None + } + + pub fn send(&self, msg: ChatMessage) { + if let Err(e) = self.chat_s.blocking_send(msg) { + tracing::warn!( + ?e, + "could not export chat message. the tokio sender seems to be broken" + ); + } + } +} + +impl ChatForwarder { + async fn run(mut self) { + while let Some(msg) = self.chat_r.recv().await { + let drop_older_than = msg.time.sub(self.keep_duration); + let mut messages = self.messages.lock().await; + while let Some(msg) = messages.front() && msg.time < drop_older_than { + messages.pop_front(); + } + messages.push_back(msg); + const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever + if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES { + let msg_count = messages.len(); + tracing::debug!(?msg_count, "shrinking cache"); + messages.shrink_to_fit(); + } + } + } +} + +impl ChatCache { + pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) { + const BUFFER_SIZE: usize = 1_000; + let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE); + let messages: Arc>> = Default::default(); + let messages_clone = Arc::clone(&messages); + let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); + + let worker = ChatForwarder { + keep_duration, + chat_r, + messages: messages_clone, + }; + + runtime.spawn(worker.run().instrument(info_span!("chat_forwarder"))); + + (Self { messages }, ChatExporter { chat_s }) + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 985959cacc..df46ee9b78 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ pub mod automod; mod character_creator; +pub mod chat; pub mod chunk_generator; mod chunk_serialize; pub mod client; @@ -94,7 +95,6 @@ use persistence::{ character_updater::CharacterUpdater, }; use prometheus::Registry; -use prometheus_hyper::Server as PrometheusServer; use specs::{Builder, Entity as EcsEntity, Entity, Join, LendJoin, WorldExt}; use std::{ i32, @@ -104,7 +104,7 @@ use std::{ }; #[cfg(not(feature = "worldgen"))] use test_world::{IndexOwned, World}; -use tokio::{runtime::Runtime, sync::Notify}; +use tokio::runtime::Runtime; use tracing::{debug, error, info, trace, warn}; use vek::*; pub use world::{civ::WorldCivStage, sim::WorldSimStage, WorldGenerateStage}; @@ -124,7 +124,7 @@ use { common_state::plugin::{memory_manager::EcsWorld, PluginMgr}, }; -use crate::persistence::character_loader::CharacterScreenResponseKind; +use crate::{chat::ChatCache, persistence::character_loader::CharacterScreenResponseKind}; use common::comp::Anchor; #[cfg(feature = "worldgen")] pub use world::{ @@ -211,7 +211,8 @@ pub struct Server { runtime: Arc, - metrics_shutdown: Arc, + metrics_registry: Arc, + chat_cache: ChatCache, database_settings: Arc>, disconnect_all_clients_requested: bool, @@ -483,17 +484,8 @@ impl Server { state.ecs_mut().insert(DeletedEntities::default()); let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); - let metrics_shutdown = Arc::new(Notify::new()); - let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); - let addr = settings.metrics_address; - runtime.spawn(async move { - PrometheusServer::run( - Arc::clone(®istry), - addr, - metrics_shutdown_clone.notified(), - ) - .await - }); + let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60), &runtime); + state.ecs_mut().insert(chat_tracker); let mut printed_quic_warning = false; for protocol in &settings.gameserver_protocols { @@ -599,7 +591,8 @@ impl Server { connection_handler, runtime, - metrics_shutdown, + metrics_registry: registry, + chat_cache, database_settings, disconnect_all_clients_requested: false, @@ -664,6 +657,12 @@ impl Server { /// Get a reference to the server's world. pub fn world(&self) -> &World { &self.world } + /// Get a reference to the Metrics Registry + pub fn metrics_registry(&self) -> &Arc { &self.metrics_registry } + + /// Get a reference to the Chat Cache + pub fn chat_cache(&self) -> &ChatCache { &self.chat_cache } + fn parse_locations(&self, character_list_data: &mut [CharacterItem]) { character_list_data.iter_mut().for_each(|c| { let name = c @@ -1493,8 +1492,6 @@ impl Server { impl Drop for Server { fn drop(&mut self) { - self.metrics_shutdown.notify_one(); - self.state .notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown)); diff --git a/server/src/settings.rs b/server/src/settings.rs index caf7a968d9..4756011a66 100644 --- a/server/src/settings.rs +++ b/server/src/settings.rs @@ -160,7 +160,6 @@ impl CalendarMode { #[serde(default)] pub struct Settings { pub gameserver_protocols: Vec, - pub metrics_address: SocketAddr, pub auth_server_address: Option, pub max_players: u16, pub world_seed: u32, @@ -202,7 +201,6 @@ impl Default for Settings { address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 14004)), }, ], - metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)), auth_server_address: Some("https://auth.veloren.net".into()), world_seed: DEFAULT_WORLD_SEED, server_name: "Veloren Server".into(), @@ -285,10 +283,6 @@ impl Settings { pick_unused_port().expect("Failed to find unused port!"), )), }], - metrics_address: SocketAddr::from(( - Ipv4Addr::LOCALHOST, - pick_unused_port().expect("Failed to find unused port!"), - )), auth_server_address: None, // If loading the default map file, make sure the seed is also default. world_seed: if load.map_file.is_some() { diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 5b146beff4..716912fac1 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,5 +1,6 @@ use crate::{ automod::AutoMod, + chat::ChatExporter, client::Client, events::{self, update_map_markers}, persistence::PersistedComponents, @@ -900,9 +901,14 @@ impl StateExt for State { |target, a: &comp::Pos, b: &comp::Pos| a.0.distance_squared(b.0) < target * target; let group_manager = ecs.read_resource::(); + let chat_exporter = ecs.read_resource::(); let group_info = msg.get_group().and_then(|g| group_manager.group_info(*g)); + if let Some(exported_message) = ChatExporter::generate(&msg, ecs) { + chat_exporter.send(exported_message); + } + let resolved_msg = msg .clone() .map_group(|_| group_info.map_or_else(|| "???".to_string(), |i| i.name.clone()));