There is no way to moderate ChatMessages not send in global chat. For this way we might investigate in a different approach.

The Gameserver returning the latest chat messages via a REST api (protected by a password in settings).
A central service can then scrape this endpoint and make it accessable to moderators.

We need to make sure to log which moderator sees which messages, especially when whispered. to be sure we might also limit the holding period to a week
This commit is contained in:
Marcel Märtens 2023-10-15 16:57:07 +02:00
parent bbed1d85e8
commit 38f4b8b644
6 changed files with 358 additions and 5 deletions

134
Cargo.lock generated
View File

@ -412,6 +412,55 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core",
"bitflags 1.3.2",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "backtrace"
version = "0.3.69"
@ -3408,6 +3457,12 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
@ -3497,6 +3552,12 @@ dependencies = [
"libmimalloc-sys",
]
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minifb"
version = "0.24.0"
@ -4433,6 +4494,26 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2 1.0.66",
"quote 1.0.33",
"syn 2.0.29",
]
[[package]]
name = "pin-project-lite"
version = "0.2.13"
@ -5566,6 +5647,16 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335"
dependencies = [
"itoa",
"serde",
]
[[package]]
name = "serde_repr"
version = "0.1.16"
@ -5586,6 +5677,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "sha1"
version = "0.6.1"
@ -6106,6 +6209,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "tap"
version = "1.0.1"
@ -6428,6 +6537,28 @@ dependencies = [
"winnow",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
version = "0.3.2"
@ -7086,6 +7217,7 @@ version = "0.15.0"
dependencies = [
"atomicwrites",
"authc",
"axum",
"bincode",
"censor",
"chrono",
@ -7096,6 +7228,7 @@ dependencies = [
"futures-util",
"hashbrown 0.13.2",
"humantime",
"hyper",
"itertools",
"lazy_static",
"noise",
@ -7103,7 +7236,6 @@ dependencies = [
"parking_lot 0.12.1",
"portpicker",
"prometheus",
"prometheus-hyper",
"quinn",
"rand 0.8.5",
"rayon",

View File

@ -39,7 +39,6 @@ tracing = { workspace = true }
vek = { workspace = true }
futures-util = { workspace = true }
tokio = { workspace = true }
prometheus-hyper = { workspace = true }
quinn = "0.10"
rustls = { version = "0.21", default-features = false }
rustls-pemfile = { version = "1", default-features = false }
@ -65,6 +64,10 @@ enum-map = { workspace = true }
noise = { version = "0.7", default-features = false }
censor = "0.3"
#HTTP
axum = { version = "0.6.20" }
hyper = "0.14.26"
rusqlite = { version = "0.28.0", features = ["array", "vtab", "bundled", "trace"] }
refinery = { version = "0.8.8", features = ["rusqlite"] }

View File

@ -31,6 +31,7 @@ pub mod sys;
#[cfg(feature = "persistent_world")]
pub mod terrain_persistence;
#[cfg(not(feature = "worldgen"))] mod test_world;
mod web;
mod weather;
@ -94,7 +95,6 @@ use persistence::{
character_updater::CharacterUpdater,
};
use prometheus::Registry;
use prometheus_hyper::Server as PrometheusServer;
use specs::{Builder, Entity as EcsEntity, Entity, Join, LendJoin, WorldExt};
use std::{
i32,
@ -124,7 +124,7 @@ use {
common_state::plugin::{memory_manager::EcsWorld, PluginMgr},
};
use crate::persistence::character_loader::CharacterScreenResponseKind;
use crate::{persistence::character_loader::CharacterScreenResponseKind, web::ChatCache};
use common::comp::Anchor;
#[cfg(feature = "worldgen")]
pub use world::{
@ -486,9 +486,13 @@ impl Server {
let metrics_shutdown = Arc::new(Notify::new());
let metrics_shutdown_clone = Arc::clone(&metrics_shutdown);
let addr = settings.metrics_address;
let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60));
state.ecs_mut().insert(chat_tracker);
runtime.spawn(async move {
PrometheusServer::run(
web::run(
Arc::clone(&registry),
chat_cache,
"secretpassword".to_string(),
addr,
metrics_shutdown_clone.notified(),
)

View File

@ -8,6 +8,7 @@ use crate::{
rtsim::RtSim,
settings::Settings,
sys::sentinel::DeletedEntities,
web::ChatExporter,
wiring, BattleModeBuffer, SpawnPoint,
};
use common::{
@ -900,6 +901,7 @@ impl StateExt for State {
|target, a: &comp::Pos, b: &comp::Pos| a.0.distance_squared(b.0) < target * target;
let group_manager = ecs.read_resource::<comp::group::GroupManager>();
let chat_exporter = ecs.read_resource::<ChatExporter>();
let group_info = msg.get_group().and_then(|g| group_manager.group_info(*g));
@ -925,6 +927,7 @@ impl StateExt for State {
| comp::ChatType::CommandError
| comp::ChatType::Meta
| comp::ChatType::World(_) => {
chat_exporter.send(resolved_msg.clone());
self.notify_players(ServerGeneral::ChatMsg(resolved_msg))
},
comp::ChatType::Online(u) => {
@ -935,6 +938,7 @@ impl StateExt for State {
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
chat_exporter.send(resolved_msg);
},
comp::ChatType::Tell(from, to) => {
for (client, uid) in
@ -944,10 +948,12 @@ impl StateExt for State {
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
chat_exporter.send(resolved_msg);
},
comp::ChatType::Kill(kill_source, uid) => {
let clients = ecs.read_storage::<Client>();
let clients_count = clients.count();
chat_exporter.send(resolved_msg.clone());
// Avoid chat spam, send kill message only to group or nearby players if a
// certain amount of clients are online
if clients_count
@ -1002,6 +1008,7 @@ impl StateExt for State {
}
}
}
chat_exporter.send(resolved_msg);
},
comp::ChatType::Region(uid) => {
let entity_opt = entity_from_uid(*uid);
@ -1014,6 +1021,7 @@ impl StateExt for State {
}
}
}
chat_exporter.send(resolved_msg);
},
comp::ChatType::Npc(uid) => {
let entity_opt = entity_from_uid(*uid);
@ -1059,6 +1067,7 @@ impl StateExt for State {
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
chat_exporter.send(resolved_msg);
},
comp::ChatType::Group(from, g) => {
if group_info.is_none() {
@ -1080,6 +1089,7 @@ impl StateExt for State {
} else {
send_to_group(g, ecs, &resolved_msg);
}
chat_exporter.send(resolved_msg);
},
comp::ChatType::GroupMeta(g) => {
send_to_group(g, ecs, &resolved_msg);

144
server/src/web/chat.rs Normal file
View File

@ -0,0 +1,144 @@
use axum::{
extract::{Query, State},
middleware::Next,
response::{IntoResponse, Response},
routing::get,
Json, Router,
};
use chrono::{DateTime, Utc};
use common::comp::ChatMsg;
use hyper::{Request, StatusCode};
use serde::{Deserialize, Deserializer, Serialize};
use std::{
borrow::Cow, collections::VecDeque, mem::size_of, ops::Sub, str::FromStr, sync::Arc,
time::Duration,
};
use tokio::sync::Mutex;
/// The chat cache gets it data from the gameserver and will keep it for some
/// time It will be made available for its consumers, the REST Api
#[derive(Clone)]
pub struct ChatCache {
messages: Arc<Mutex<VecDeque<ChatMessage>>>,
}
/// Keep Size small, so we dont have to Clone much for each request.
#[derive(Clone)]
struct ChatToken {
secret_token: Cow<'static, str>,
}
pub struct ChatExporter {
messages: Arc<Mutex<VecDeque<ChatMessage>>>,
keep_duration: chrono::Duration,
}
impl ChatExporter {
pub fn send(&self, msg: ChatMsg) {
let time = Utc::now();
let drop_older_than = time.sub(self.keep_duration);
let mut messages = self.messages.blocking_lock();
while let Some(msg) = messages.front() && msg.time < drop_older_than {
messages.pop_front();
}
messages.push_back(ChatMessage { time, msg });
const MAX_CACHE_BYTES: usize = 10_000_000; // approx. because HashMap allocates on Heap
if messages.len() * size_of::<ChatMessage>() > MAX_CACHE_BYTES {
let msg_count = messages.len();
tracing::debug!(?msg_count, "shrinking cache");
messages.shrink_to_fit();
}
}
}
impl ChatCache {
pub fn new(keep_duration: Duration) -> (Self, ChatExporter) {
let messages: Arc<Mutex<VecDeque<ChatMessage>>> = Default::default();
let messages_clone = Arc::clone(&messages);
let keep_duration = chrono::Duration::from_std(keep_duration).unwrap();
(Self { messages }, ChatExporter {
messages: messages_clone,
keep_duration,
})
}
}
async fn validate_secret<B>(
State(token): State<ChatToken>,
req: Request<B>,
next: Next<B>,
) -> Result<Response, StatusCode> {
pub const X_SECRET_TOKEN: &str = "X-Secret-Token";
let session_cookie = req
.headers()
.get(X_SECRET_TOKEN)
.ok_or(StatusCode::UNAUTHORIZED)?;
if session_cookie.as_bytes() != token.secret_token.as_bytes() {
return Err(StatusCode::UNAUTHORIZED);
}
Ok(next.run(req).await)
}
pub fn router(cache: ChatCache, secret_token: String) -> Router {
let token = ChatToken {
secret_token: Cow::Owned(secret_token),
};
Router::new()
.route("/history", get(history))
.layer(axum::middleware::from_fn_with_state(token, validate_secret))
.with_state(cache)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ChatMessage {
time: DateTime<Utc>,
msg: ChatMsg,
}
#[derive(Debug, Deserialize)]
struct Params {
#[serde(default, deserialize_with = "empty_string_as_none")]
/// To be used to get all messages without duplicates nor losing messages
from_time_exclusive_rfc3339: Option<String>,
}
fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
T: FromStr,
T::Err: core::fmt::Display,
{
let opt = Option::<String>::deserialize(de)?;
match opt.as_deref() {
None | Some("") => Ok(None),
Some(s) => FromStr::from_str(s)
.map_err(serde::de::Error::custom)
.map(Some),
}
}
async fn history(
State(cache): State<ChatCache>,
Query(params): Query<Params>,
) -> Result<impl IntoResponse, StatusCode> {
// first validate parameters before we take lock
let from_time_exclusive = if let Some(rfc3339) = params.from_time_exclusive_rfc3339 {
Some(DateTime::parse_from_rfc3339(&rfc3339).map_err(|_| StatusCode::BAD_REQUEST)?)
} else {
None
};
let messages = cache.messages.lock().await;
let filtered: Vec<_> = match from_time_exclusive {
Some(from_time_exclusive) => messages
.iter()
.filter(|msg| msg.time > from_time_exclusive)
.cloned()
.collect(),
None => messages.iter().cloned().collect(),
};
Ok(Json(filtered))
}

60
server/src/web/mod.rs Normal file
View File

@ -0,0 +1,60 @@
use axum::{extract::State, response::IntoResponse, routing::get, Router};
use core::{future::Future, ops::Deref};
use hyper::{header, http, Body, StatusCode};
use prometheus::{Registry, TextEncoder};
use std::net::SocketAddr;
mod chat;
pub use chat::{ChatCache, ChatExporter};
pub async fn run<S, F, R>(
registry: R,
cache: ChatCache,
chat_token: String,
addr: S,
shutdown: F,
) -> Result<(), hyper::Error>
where
S: Into<SocketAddr>,
F: Future<Output = ()>,
R: Deref<Target = Registry> + Clone + Send + Sync + 'static,
{
let metrics = Router::new()
.route("/", get(metrics))
.with_state(registry.deref().clone());
let app = Router::new()
.nest("/chat/v1", chat::router(cache, chat_token))
.nest("/metrics", metrics)
.route("/health", get(|| async {}));
// run it
let addr = addr.into();
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let server = server.with_graceful_shutdown(shutdown);
tracing::info!("listening on {}", addr);
match server.await {
Ok(_) => tracing::debug!("webserver shutdown successful"),
Err(e) => tracing::error!(?e, "webserver shutdown error"),
}
Ok(())
}
async fn metrics(State(registry): State<Registry>) -> Result<impl IntoResponse, StatusCode> {
use prometheus::Encoder;
let mf = registry.gather();
let mut buffer = Vec::with_capacity(1024);
let encoder = TextEncoder::new();
encoder
.encode(&mf, &mut buffer)
.expect("write to vec cannot fail");
let resp = http::Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
.body(Body::from(buffer))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(resp)
}