From 360cc4a4b708f501b3f95e7d6a6e75f569bd9356 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 18 Oct 2023 10:05:15 +0200 Subject: [PATCH] Move most of the locking work to a seperate tokio spawn thread --- server/src/chat.rs | 57 ++++++++++++++++++++++++++++++++++------------ server/src/lib.rs | 2 +- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/server/src/chat.rs b/server/src/chat.rs index 16fecfbdc4..aa7d8d4be8 100644 --- a/server/src/chat.rs +++ b/server/src/chat.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use specs::{Join, World, WorldExt}; use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration}; use tokio::sync::Mutex; +use tracing::{info_span, Instrument}; #[derive(Clone, Serialize, Deserialize)] pub struct PlayerInfo { @@ -61,11 +62,17 @@ pub struct ChatCache { pub messages: MessagesStore, } -pub struct ChatExporter { +/// Will internally run on tokio and take stress from main loop +struct ChatForwarder { + chat_r: tokio::sync::mpsc::Receiver, messages: MessagesStore, keep_duration: chrono::Duration, } +pub struct ChatExporter { + chat_s: tokio::sync::mpsc::Sender, +} + impl ChatMessage { fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self { ChatMessage { @@ -199,30 +206,50 @@ impl ChatExporter { } pub fn send(&self, msg: ChatMessage) { - let drop_older_than = msg.time.sub(self.keep_duration); - let mut messages = self.messages.blocking_lock(); - while let Some(msg) = messages.front() && msg.time < drop_older_than { - messages.pop_front(); + if let Err(e) = self.chat_s.blocking_send(msg) { + tracing::warn!( + ?e, + "could not export chat message. the tokio sender seems to be broken" + ); } - messages.push_back(msg); - const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever - if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES { - let msg_count = messages.len(); - tracing::debug!(?msg_count, "shrinking cache"); - messages.shrink_to_fit(); + } +} + +impl ChatForwarder { + async fn run(mut self) { + while let Some(msg) = self.chat_r.recv().await { + let drop_older_than = msg.time.sub(self.keep_duration); + let mut messages = self.messages.lock().await; + while let Some(msg) = messages.front() && msg.time < drop_older_than { + messages.pop_front(); + } + messages.push_back(msg); + const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever + if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES { + let msg_count = messages.len(); + tracing::debug!(?msg_count, "shrinking cache"); + messages.shrink_to_fit(); + } } } } impl ChatCache { - pub fn new(keep_duration: Duration) -> (Self, ChatExporter) { + pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) { + const BUFFER_SIZE: usize = 1_000; + let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE); let messages: Arc>> = Default::default(); let messages_clone = Arc::clone(&messages); let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); - (Self { messages }, ChatExporter { - messages: messages_clone, + let worker = ChatForwarder { keep_duration, - }) + chat_r, + messages: messages_clone, + }; + + runtime.spawn(worker.run().instrument(info_span!("chat_forwarder"))); + + (Self { messages }, ChatExporter { chat_s }) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 051abe6e08..df46ee9b78 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -484,7 +484,7 @@ impl Server { state.ecs_mut().insert(DeletedEntities::default()); let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); - let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60)); + let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60), &runtime); state.ecs_mut().insert(chat_tracker); let mut printed_quic_warning = false;