mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
Merge branch 'xMAC94x/better_moderation' into 'master'
Improve chat moderation See merge request veloren/veloren!4138
This commit is contained in:
commit
c7383c662f
136
Cargo.lock
generated
136
Cargo.lock
generated
@ -412,6 +412,55 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.6.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
"bitflags 1.3.2",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"itoa",
|
||||
"matchit",
|
||||
"memchr",
|
||||
"mime",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_urlencoded",
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"mime",
|
||||
"rustversion",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.69"
|
||||
@ -3408,6 +3457,12 @@ dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
|
||||
|
||||
[[package]]
|
||||
name = "maybe-uninit"
|
||||
version = "2.0.0"
|
||||
@ -3497,6 +3552,12 @@ dependencies = [
|
||||
"libmimalloc-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mime"
|
||||
version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||
|
||||
[[package]]
|
||||
name = "minifb"
|
||||
version = "0.24.0"
|
||||
@ -4433,6 +4494,26 @@ dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.66",
|
||||
"quote 1.0.33",
|
||||
"syn 2.0.29",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.13"
|
||||
@ -5566,6 +5647,16 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_path_to_error"
|
||||
version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_repr"
|
||||
version = "0.1.16"
|
||||
@ -5586,6 +5677,18 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_urlencoded"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.6.1"
|
||||
@ -6106,6 +6209,12 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sync_wrapper"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
|
||||
|
||||
[[package]]
|
||||
name = "tap"
|
||||
version = "1.0.1"
|
||||
@ -6428,6 +6537,28 @@ dependencies = [
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-layer"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.2"
|
||||
@ -7103,7 +7234,6 @@ dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
"portpicker",
|
||||
"prometheus",
|
||||
"prometheus-hyper",
|
||||
"quinn",
|
||||
"rand 0.8.5",
|
||||
"rayon",
|
||||
@ -7151,12 +7281,16 @@ dependencies = [
|
||||
name = "veloren-server-cli"
|
||||
version = "0.15.0"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"cansi",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crossterm 0.26.1",
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"mimalloc",
|
||||
"num_cpus",
|
||||
"prometheus",
|
||||
"ron 0.8.1",
|
||||
"serde",
|
||||
"shell-words",
|
||||
|
@ -60,7 +60,7 @@ where
|
||||
"veloren_server::events::entity_manipulation=info",
|
||||
"hyper=info",
|
||||
"prometheus_hyper=info",
|
||||
"mio::pool=info",
|
||||
"mio::poll=info",
|
||||
"mio::sys::windows=info",
|
||||
"h2=info",
|
||||
"tokio_util=info",
|
||||
|
@ -47,6 +47,12 @@ tracing = { workspace = true }
|
||||
ron = { workspace = true }
|
||||
serde = { workspace = true, features = [ "rc", "derive" ]}
|
||||
|
||||
#HTTP
|
||||
axum = { version = "0.6.20" }
|
||||
hyper = "0.14.26"
|
||||
prometheus = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
mimalloc = "0.1.29"
|
||||
|
||||
|
@ -16,6 +16,7 @@ mod settings;
|
||||
mod shutdown_coordinator;
|
||||
mod tui_runner;
|
||||
mod tuilog;
|
||||
mod web;
|
||||
use crate::{
|
||||
cli::{Admin, ArgvApp, ArgvCommand, Message, SharedCommand, Shutdown},
|
||||
shutdown_coordinator::ShutdownCoordinator,
|
||||
@ -31,6 +32,7 @@ use std::{
|
||||
sync::{atomic::AtomicBool, mpsc, Arc},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::Notify;
|
||||
use tracing::{info, trace};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
@ -185,7 +187,7 @@ fn main() -> io::Result<()> {
|
||||
info!("Starting server...");
|
||||
|
||||
let protocols_and_addresses = server_settings.gameserver_protocols.clone();
|
||||
let metrics_port = &server_settings.metrics_address.port();
|
||||
let web_port = &settings.web_address.port();
|
||||
// Create server
|
||||
let mut server = Server::new(
|
||||
server_settings,
|
||||
@ -193,10 +195,27 @@ fn main() -> io::Result<()> {
|
||||
database_settings,
|
||||
&server_data_dir,
|
||||
&|_| {},
|
||||
runtime,
|
||||
Arc::clone(&runtime),
|
||||
)
|
||||
.expect("Failed to create server instance!");
|
||||
|
||||
let registry = Arc::clone(server.metrics_registry());
|
||||
let chat = server.chat_cache().clone();
|
||||
let metrics_shutdown = Arc::new(Notify::new());
|
||||
let metrics_shutdown_clone = Arc::clone(&metrics_shutdown);
|
||||
let web_chat_secret = settings.web_chat_secret.clone();
|
||||
|
||||
runtime.spawn(async move {
|
||||
web::run(
|
||||
registry,
|
||||
chat,
|
||||
web_chat_secret,
|
||||
settings.web_address,
|
||||
metrics_shutdown_clone.notified(),
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
// Collect addresses that the server is listening to log.
|
||||
let gameserver_addresses = protocols_and_addresses
|
||||
.into_iter()
|
||||
@ -210,7 +229,7 @@ fn main() -> io::Result<()> {
|
||||
});
|
||||
|
||||
info!(
|
||||
?metrics_port,
|
||||
?web_port,
|
||||
?gameserver_addresses,
|
||||
"Server is ready to accept connections."
|
||||
);
|
||||
@ -317,6 +336,7 @@ fn main() -> io::Result<()> {
|
||||
#[cfg(feature = "tracy")]
|
||||
common_base::tracy_client::frame_mark();
|
||||
}
|
||||
metrics_shutdown.notify_one();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1,5 +1,9 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fs, path::PathBuf};
|
||||
use std::{
|
||||
fs,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
path::PathBuf,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
@ -7,6 +11,10 @@ use tracing::warn;
|
||||
pub struct Settings {
|
||||
pub update_shutdown_grace_period_secs: u32,
|
||||
pub update_shutdown_message: String,
|
||||
pub web_address: SocketAddr,
|
||||
/// SECRET API HEADER used to access the chat api, if disabled the API is
|
||||
/// unreachable
|
||||
pub web_chat_secret: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for Settings {
|
||||
@ -14,6 +22,8 @@ impl Default for Settings {
|
||||
Self {
|
||||
update_shutdown_grace_period_secs: 120,
|
||||
update_shutdown_message: "The server is restarting for an update".to_owned(),
|
||||
web_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)),
|
||||
web_chat_secret: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
122
server-cli/src/web/chat.rs
Normal file
122
server-cli/src/web/chat.rs
Normal file
@ -0,0 +1,122 @@
|
||||
use axum::{
|
||||
extract::{ConnectInfo, Query, State},
|
||||
middleware::Next,
|
||||
response::{IntoResponse, Response},
|
||||
routing::get,
|
||||
Json, Router,
|
||||
};
|
||||
use chrono::DateTime;
|
||||
use hyper::{Request, StatusCode};
|
||||
use serde::{Deserialize, Deserializer};
|
||||
use server::chat::ChatCache;
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
net::{IpAddr, SocketAddr},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// Keep Size small, so we dont have to Clone much for each request.
|
||||
#[derive(Clone)]
|
||||
struct ChatToken {
|
||||
secret_token: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct IpAddresses {
|
||||
users: Arc<Mutex<HashSet<IpAddr>>>,
|
||||
}
|
||||
|
||||
async fn validate_secret<B>(
|
||||
State(token): State<ChatToken>,
|
||||
req: Request<B>,
|
||||
next: Next<B>,
|
||||
) -> Result<Response, StatusCode> {
|
||||
// check if this endpoint is disabled
|
||||
let secret_token = token.secret_token.ok_or(StatusCode::METHOD_NOT_ALLOWED)?;
|
||||
|
||||
pub const X_SECRET_TOKEN: &str = "X-Secret-Token";
|
||||
let session_cookie = req
|
||||
.headers()
|
||||
.get(X_SECRET_TOKEN)
|
||||
.ok_or(StatusCode::UNAUTHORIZED)?;
|
||||
|
||||
if session_cookie.as_bytes() != secret_token.as_bytes() {
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
Ok(next.run(req).await)
|
||||
}
|
||||
|
||||
/// Logs each new IP address that accesses this API authenticated
|
||||
async fn log_users<B>(
|
||||
State(ip_addresses): State<IpAddresses>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
req: Request<B>,
|
||||
next: Next<B>,
|
||||
) -> Result<Response, StatusCode> {
|
||||
let mut ip_addresses = ip_addresses.users.lock().await;
|
||||
let ip_addr = addr.ip();
|
||||
if !ip_addresses.contains(&ip_addr) {
|
||||
ip_addresses.insert(ip_addr);
|
||||
let users_so_far = ip_addresses.len();
|
||||
tracing::info!(?ip_addr, ?users_so_far, "Is accessing the /chat endpoint");
|
||||
}
|
||||
Ok(next.run(req).await)
|
||||
}
|
||||
|
||||
pub fn router(cache: ChatCache, secret_token: Option<String>) -> Router {
|
||||
let token = ChatToken { secret_token };
|
||||
let ip_addrs = IpAddresses::default();
|
||||
Router::new()
|
||||
.route("/history", get(history))
|
||||
.layer(axum::middleware::from_fn_with_state(ip_addrs, log_users))
|
||||
.layer(axum::middleware::from_fn_with_state(token, validate_secret))
|
||||
.with_state(cache)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Params {
|
||||
#[serde(default, deserialize_with = "empty_string_as_none")]
|
||||
/// To be used to get all messages without duplicates nor losing messages
|
||||
from_time_exclusive_rfc3339: Option<String>,
|
||||
}
|
||||
|
||||
fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
T: FromStr,
|
||||
T::Err: core::fmt::Display,
|
||||
{
|
||||
let opt = Option::<String>::deserialize(de)?;
|
||||
match opt.as_deref() {
|
||||
None | Some("") => Ok(None),
|
||||
Some(s) => FromStr::from_str(s)
|
||||
.map_err(serde::de::Error::custom)
|
||||
.map(Some),
|
||||
}
|
||||
}
|
||||
|
||||
async fn history(
|
||||
State(cache): State<ChatCache>,
|
||||
Query(params): Query<Params>,
|
||||
) -> Result<impl IntoResponse, StatusCode> {
|
||||
// first validate parameters before we take lock
|
||||
let from_time_exclusive = if let Some(rfc3339) = params.from_time_exclusive_rfc3339 {
|
||||
Some(DateTime::parse_from_rfc3339(&rfc3339).map_err(|_| StatusCode::BAD_REQUEST)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let messages = cache.messages.lock().await;
|
||||
let filtered: Vec<_> = match from_time_exclusive {
|
||||
Some(from_time_exclusive) => messages
|
||||
.iter()
|
||||
.filter(|msg| msg.time > from_time_exclusive)
|
||||
.cloned()
|
||||
.collect(),
|
||||
None => messages.iter().cloned().collect(),
|
||||
};
|
||||
Ok(Json(filtered))
|
||||
}
|
66
server-cli/src/web/mod.rs
Normal file
66
server-cli/src/web/mod.rs
Normal file
@ -0,0 +1,66 @@
|
||||
use axum::{extract::State, response::IntoResponse, routing::get, Router};
|
||||
use core::{future::Future, ops::Deref};
|
||||
use hyper::{header, http, Body, StatusCode};
|
||||
use prometheus::{Registry, TextEncoder};
|
||||
use server::chat::ChatCache;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
mod chat;
|
||||
|
||||
pub async fn run<S, F, R>(
|
||||
registry: R,
|
||||
cache: ChatCache,
|
||||
chat_secret: Option<String>,
|
||||
addr: S,
|
||||
shutdown: F,
|
||||
) -> Result<(), hyper::Error>
|
||||
where
|
||||
S: Into<SocketAddr>,
|
||||
F: Future<Output = ()>,
|
||||
R: Deref<Target = Registry> + Clone + Send + Sync + 'static,
|
||||
{
|
||||
let metrics = Router::new()
|
||||
.route("/", get(metrics))
|
||||
.with_state(registry.deref().clone());
|
||||
|
||||
let app = Router::new()
|
||||
.nest("/chat/v1", chat::router(cache, chat_secret))
|
||||
.nest("/metrics", metrics)
|
||||
.route("/health", get(|| async {}));
|
||||
|
||||
// run it
|
||||
let addr = addr.into();
|
||||
let server =
|
||||
axum::Server::bind(&addr).serve(app.into_make_service_with_connect_info::<SocketAddr>());
|
||||
let server = server.with_graceful_shutdown(shutdown);
|
||||
tracing::info!("listening on {}", addr);
|
||||
match server.await {
|
||||
Ok(_) => tracing::debug!("webserver shutdown successful"),
|
||||
Err(e) => tracing::error!(?e, "webserver shutdown error"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn metrics(State(registry): State<Registry>) -> Result<impl IntoResponse, StatusCode> {
|
||||
use prometheus::Encoder;
|
||||
let mf = registry.gather();
|
||||
let mut buffer = Vec::with_capacity(1024);
|
||||
|
||||
let encoder = TextEncoder::new();
|
||||
encoder
|
||||
.encode(&mf, &mut buffer)
|
||||
.expect("write to vec cannot fail");
|
||||
|
||||
match http::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
|
||||
.body(Body::from(buffer))
|
||||
{
|
||||
Err(e) => {
|
||||
tracing::warn!(?e, "could not export metrics to HTTP format");
|
||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
},
|
||||
Ok(r) => Ok(r),
|
||||
}
|
||||
}
|
@ -39,7 +39,6 @@ tracing = { workspace = true }
|
||||
vek = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
prometheus-hyper = { workspace = true }
|
||||
quinn = "0.10"
|
||||
rustls = { version = "0.21", default-features = false }
|
||||
rustls-pemfile = { version = "1", default-features = false }
|
||||
|
255
server/src/chat.rs
Normal file
255
server/src/chat.rs
Normal file
@ -0,0 +1,255 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use common::{
|
||||
comp,
|
||||
comp::{chat::KillType, ChatType, Content, Group, Player, UnresolvedChatMsg},
|
||||
uid::IdMaps,
|
||||
uuid::Uuid,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use specs::{Join, World, WorldExt};
|
||||
use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{info_span, Instrument};
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct PlayerInfo {
|
||||
uuid: Uuid,
|
||||
alias: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub enum KillSource {
|
||||
Player(PlayerInfo, KillType),
|
||||
NonPlayer(String, KillType),
|
||||
NonExistent(KillType),
|
||||
Environment(String),
|
||||
FallDamage,
|
||||
Suicide,
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
/// partially mapped to common::comp::ChatMsg
|
||||
pub enum ChatParties {
|
||||
Online(PlayerInfo),
|
||||
Offline(PlayerInfo),
|
||||
CommandInfo(PlayerInfo),
|
||||
CommandError(PlayerInfo),
|
||||
Kill(KillSource, PlayerInfo),
|
||||
GroupMeta(Vec<PlayerInfo>),
|
||||
Group(PlayerInfo, Vec<PlayerInfo>),
|
||||
Tell(PlayerInfo, PlayerInfo),
|
||||
Say(PlayerInfo),
|
||||
FactionMeta(String),
|
||||
Faction(PlayerInfo, String),
|
||||
Region(PlayerInfo),
|
||||
World(PlayerInfo),
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct ChatMessage {
|
||||
pub time: DateTime<Utc>,
|
||||
pub parties: ChatParties,
|
||||
pub content: Content,
|
||||
}
|
||||
|
||||
type MessagesStore = Arc<Mutex<VecDeque<ChatMessage>>>;
|
||||
|
||||
/// The chat cache gets it data from the gameserver and will keep it for some
|
||||
/// time It will be made available for its consumers, the REST Api
|
||||
#[derive(Clone)]
|
||||
pub struct ChatCache {
|
||||
pub messages: MessagesStore,
|
||||
}
|
||||
|
||||
/// Will internally run on tokio and take stress from main loop
|
||||
struct ChatForwarder {
|
||||
chat_r: tokio::sync::mpsc::Receiver<ChatMessage>,
|
||||
messages: MessagesStore,
|
||||
keep_duration: chrono::Duration,
|
||||
}
|
||||
|
||||
pub struct ChatExporter {
|
||||
chat_s: tokio::sync::mpsc::Sender<ChatMessage>,
|
||||
}
|
||||
|
||||
impl ChatMessage {
|
||||
fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self {
|
||||
ChatMessage {
|
||||
time: Utc::now(),
|
||||
content: chatmsg.content().clone(),
|
||||
parties,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChatExporter {
|
||||
pub fn generate(chatmsg: &UnresolvedChatMsg, ecs: &World) -> Option<ChatMessage> {
|
||||
let id_maps = ecs.read_resource::<IdMaps>();
|
||||
let players = ecs.read_storage::<Player>();
|
||||
let player_info_from_uid = |uid| {
|
||||
id_maps
|
||||
.uid_entity(uid)
|
||||
.and_then(|entry| players.get(entry))
|
||||
.map(|player| PlayerInfo {
|
||||
alias: player.alias.clone(),
|
||||
uuid: player.uuid(),
|
||||
})
|
||||
};
|
||||
let group_members_from_group = |g| -> Vec<_> {
|
||||
let groups = ecs.read_storage::<Group>();
|
||||
(&players, &groups)
|
||||
.join()
|
||||
.filter_map(|(player, group)| {
|
||||
if g == group {
|
||||
Some(PlayerInfo {
|
||||
alias: player.alias.clone(),
|
||||
uuid: player.uuid(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
match &chatmsg.chat_type {
|
||||
ChatType::Offline(from) => {
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(chatmsg, ChatParties::Offline(player_info)));
|
||||
}
|
||||
},
|
||||
ChatType::Online(from) => {
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(chatmsg, ChatParties::Online(player_info)));
|
||||
}
|
||||
},
|
||||
ChatType::Region(from) => {
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(chatmsg, ChatParties::Region(player_info)));
|
||||
}
|
||||
},
|
||||
ChatType::World(from) => {
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(chatmsg, ChatParties::World(player_info)));
|
||||
}
|
||||
},
|
||||
ChatType::Say(from) => {
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(chatmsg, ChatParties::Say(player_info)));
|
||||
}
|
||||
},
|
||||
ChatType::Tell(from, to) => {
|
||||
if let (Some(from_player_info), Some(to_player_info)) =
|
||||
(player_info_from_uid(*from), player_info_from_uid(*to))
|
||||
{
|
||||
return Some(ChatMessage::new(
|
||||
chatmsg,
|
||||
ChatParties::Tell(from_player_info, to_player_info),
|
||||
));
|
||||
}
|
||||
},
|
||||
ChatType::Kill(kill_source, from) => {
|
||||
let kill_source = match kill_source.clone() {
|
||||
comp::chat::KillSource::Player(uid, t) => {
|
||||
if let Some(player_info) = player_info_from_uid(uid) {
|
||||
KillSource::Player(player_info, t)
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
},
|
||||
comp::chat::KillSource::NonPlayer(str, t) => KillSource::NonPlayer(str, t),
|
||||
comp::chat::KillSource::NonExistent(t) => KillSource::NonExistent(t),
|
||||
comp::chat::KillSource::Environment(str) => KillSource::Environment(str),
|
||||
comp::chat::KillSource::FallDamage => KillSource::FallDamage,
|
||||
comp::chat::KillSource::Suicide => KillSource::Suicide,
|
||||
comp::chat::KillSource::Other => KillSource::Other,
|
||||
};
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(
|
||||
chatmsg,
|
||||
ChatParties::Kill(kill_source, player_info),
|
||||
));
|
||||
}
|
||||
},
|
||||
ChatType::FactionMeta(s) => {
|
||||
return Some(ChatMessage::new(
|
||||
chatmsg,
|
||||
ChatParties::FactionMeta(s.clone()),
|
||||
));
|
||||
},
|
||||
ChatType::Faction(from, s) => {
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(
|
||||
chatmsg,
|
||||
ChatParties::Faction(player_info, s.clone()),
|
||||
));
|
||||
}
|
||||
},
|
||||
ChatType::GroupMeta(g) => {
|
||||
let members = group_members_from_group(g);
|
||||
return Some(ChatMessage::new(chatmsg, ChatParties::GroupMeta(members)));
|
||||
},
|
||||
ChatType::Group(from, g) => {
|
||||
let members = group_members_from_group(g);
|
||||
if let Some(player_info) = player_info_from_uid(*from) {
|
||||
return Some(ChatMessage::new(
|
||||
chatmsg,
|
||||
ChatParties::Group(player_info, members),
|
||||
));
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
};
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: ChatMessage) {
|
||||
if let Err(e) = self.chat_s.blocking_send(msg) {
|
||||
tracing::warn!(
|
||||
?e,
|
||||
"could not export chat message. the tokio sender seems to be broken"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChatForwarder {
|
||||
async fn run(mut self) {
|
||||
while let Some(msg) = self.chat_r.recv().await {
|
||||
let drop_older_than = msg.time.sub(self.keep_duration);
|
||||
let mut messages = self.messages.lock().await;
|
||||
while let Some(msg) = messages.front() && msg.time < drop_older_than {
|
||||
messages.pop_front();
|
||||
}
|
||||
messages.push_back(msg);
|
||||
const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever
|
||||
if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES {
|
||||
let msg_count = messages.len();
|
||||
tracing::debug!(?msg_count, "shrinking cache");
|
||||
messages.shrink_to_fit();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChatCache {
|
||||
pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) {
|
||||
const BUFFER_SIZE: usize = 1_000;
|
||||
let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE);
|
||||
let messages: Arc<Mutex<VecDeque<ChatMessage>>> = Default::default();
|
||||
let messages_clone = Arc::clone(&messages);
|
||||
let keep_duration = chrono::Duration::from_std(keep_duration).unwrap();
|
||||
|
||||
let worker = ChatForwarder {
|
||||
keep_duration,
|
||||
chat_r,
|
||||
messages: messages_clone,
|
||||
};
|
||||
|
||||
runtime.spawn(worker.run().instrument(info_span!("chat_forwarder")));
|
||||
|
||||
(Self { messages }, ChatExporter { chat_s })
|
||||
}
|
||||
}
|
@ -8,6 +8,7 @@
|
||||
|
||||
pub mod automod;
|
||||
mod character_creator;
|
||||
pub mod chat;
|
||||
pub mod chunk_generator;
|
||||
mod chunk_serialize;
|
||||
pub mod client;
|
||||
@ -94,7 +95,6 @@ use persistence::{
|
||||
character_updater::CharacterUpdater,
|
||||
};
|
||||
use prometheus::Registry;
|
||||
use prometheus_hyper::Server as PrometheusServer;
|
||||
use specs::{Builder, Entity as EcsEntity, Entity, Join, LendJoin, WorldExt};
|
||||
use std::{
|
||||
i32,
|
||||
@ -104,7 +104,7 @@ use std::{
|
||||
};
|
||||
#[cfg(not(feature = "worldgen"))]
|
||||
use test_world::{IndexOwned, World};
|
||||
use tokio::{runtime::Runtime, sync::Notify};
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use vek::*;
|
||||
pub use world::{civ::WorldCivStage, sim::WorldSimStage, WorldGenerateStage};
|
||||
@ -124,7 +124,7 @@ use {
|
||||
common_state::plugin::{memory_manager::EcsWorld, PluginMgr},
|
||||
};
|
||||
|
||||
use crate::persistence::character_loader::CharacterScreenResponseKind;
|
||||
use crate::{chat::ChatCache, persistence::character_loader::CharacterScreenResponseKind};
|
||||
use common::comp::Anchor;
|
||||
#[cfg(feature = "worldgen")]
|
||||
pub use world::{
|
||||
@ -211,7 +211,8 @@ pub struct Server {
|
||||
|
||||
runtime: Arc<Runtime>,
|
||||
|
||||
metrics_shutdown: Arc<Notify>,
|
||||
metrics_registry: Arc<Registry>,
|
||||
chat_cache: ChatCache,
|
||||
database_settings: Arc<RwLock<DatabaseSettings>>,
|
||||
disconnect_all_clients_requested: bool,
|
||||
|
||||
@ -483,17 +484,8 @@ impl Server {
|
||||
state.ecs_mut().insert(DeletedEntities::default());
|
||||
|
||||
let network = Network::new_with_registry(Pid::new(), &runtime, ®istry);
|
||||
let metrics_shutdown = Arc::new(Notify::new());
|
||||
let metrics_shutdown_clone = Arc::clone(&metrics_shutdown);
|
||||
let addr = settings.metrics_address;
|
||||
runtime.spawn(async move {
|
||||
PrometheusServer::run(
|
||||
Arc::clone(®istry),
|
||||
addr,
|
||||
metrics_shutdown_clone.notified(),
|
||||
)
|
||||
.await
|
||||
});
|
||||
let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60), &runtime);
|
||||
state.ecs_mut().insert(chat_tracker);
|
||||
|
||||
let mut printed_quic_warning = false;
|
||||
for protocol in &settings.gameserver_protocols {
|
||||
@ -599,7 +591,8 @@ impl Server {
|
||||
connection_handler,
|
||||
runtime,
|
||||
|
||||
metrics_shutdown,
|
||||
metrics_registry: registry,
|
||||
chat_cache,
|
||||
database_settings,
|
||||
disconnect_all_clients_requested: false,
|
||||
|
||||
@ -664,6 +657,12 @@ impl Server {
|
||||
/// Get a reference to the server's world.
|
||||
pub fn world(&self) -> &World { &self.world }
|
||||
|
||||
/// Get a reference to the Metrics Registry
|
||||
pub fn metrics_registry(&self) -> &Arc<Registry> { &self.metrics_registry }
|
||||
|
||||
/// Get a reference to the Chat Cache
|
||||
pub fn chat_cache(&self) -> &ChatCache { &self.chat_cache }
|
||||
|
||||
fn parse_locations(&self, character_list_data: &mut [CharacterItem]) {
|
||||
character_list_data.iter_mut().for_each(|c| {
|
||||
let name = c
|
||||
@ -1493,8 +1492,6 @@ impl Server {
|
||||
|
||||
impl Drop for Server {
|
||||
fn drop(&mut self) {
|
||||
self.metrics_shutdown.notify_one();
|
||||
|
||||
self.state
|
||||
.notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown));
|
||||
|
||||
|
@ -160,7 +160,6 @@ impl CalendarMode {
|
||||
#[serde(default)]
|
||||
pub struct Settings {
|
||||
pub gameserver_protocols: Vec<Protocol>,
|
||||
pub metrics_address: SocketAddr,
|
||||
pub auth_server_address: Option<String>,
|
||||
pub max_players: u16,
|
||||
pub world_seed: u32,
|
||||
@ -202,7 +201,6 @@ impl Default for Settings {
|
||||
address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 14004)),
|
||||
},
|
||||
],
|
||||
metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, 14005)),
|
||||
auth_server_address: Some("https://auth.veloren.net".into()),
|
||||
world_seed: DEFAULT_WORLD_SEED,
|
||||
server_name: "Veloren Server".into(),
|
||||
@ -285,10 +283,6 @@ impl Settings {
|
||||
pick_unused_port().expect("Failed to find unused port!"),
|
||||
)),
|
||||
}],
|
||||
metrics_address: SocketAddr::from((
|
||||
Ipv4Addr::LOCALHOST,
|
||||
pick_unused_port().expect("Failed to find unused port!"),
|
||||
)),
|
||||
auth_server_address: None,
|
||||
// If loading the default map file, make sure the seed is also default.
|
||||
world_seed: if load.map_file.is_some() {
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::{
|
||||
automod::AutoMod,
|
||||
chat::ChatExporter,
|
||||
client::Client,
|
||||
events::{self, update_map_markers},
|
||||
persistence::PersistedComponents,
|
||||
@ -900,9 +901,14 @@ impl StateExt for State {
|
||||
|target, a: &comp::Pos, b: &comp::Pos| a.0.distance_squared(b.0) < target * target;
|
||||
|
||||
let group_manager = ecs.read_resource::<comp::group::GroupManager>();
|
||||
let chat_exporter = ecs.read_resource::<ChatExporter>();
|
||||
|
||||
let group_info = msg.get_group().and_then(|g| group_manager.group_info(*g));
|
||||
|
||||
if let Some(exported_message) = ChatExporter::generate(&msg, ecs) {
|
||||
chat_exporter.send(exported_message);
|
||||
}
|
||||
|
||||
let resolved_msg = msg
|
||||
.clone()
|
||||
.map_group(|_| group_info.map_or_else(|| "???".to_string(), |i| i.name.clone()));
|
||||
|
Loading…
Reference in New Issue
Block a user