Move most of the locking work to a seperate tokio spawn thread

This commit is contained in:
Marcel Märtens 2023-10-18 10:05:15 +02:00
parent adeab73876
commit 360cc4a4b7
2 changed files with 43 additions and 16 deletions

View File

@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use specs::{Join, World, WorldExt}; use specs::{Join, World, WorldExt};
use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration}; use std::{collections::VecDeque, ops::Sub, sync::Arc, time::Duration};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{info_span, Instrument};
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct PlayerInfo { pub struct PlayerInfo {
@ -61,11 +62,17 @@ pub struct ChatCache {
pub messages: MessagesStore, pub messages: MessagesStore,
} }
pub struct ChatExporter { /// Will internally run on tokio and take stress from main loop
struct ChatForwarder {
chat_r: tokio::sync::mpsc::Receiver<ChatMessage>,
messages: MessagesStore, messages: MessagesStore,
keep_duration: chrono::Duration, keep_duration: chrono::Duration,
} }
pub struct ChatExporter {
chat_s: tokio::sync::mpsc::Sender<ChatMessage>,
}
impl ChatMessage { impl ChatMessage {
fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self { fn new(chatmsg: &UnresolvedChatMsg, parties: ChatParties) -> Self {
ChatMessage { ChatMessage {
@ -199,30 +206,50 @@ impl ChatExporter {
} }
pub fn send(&self, msg: ChatMessage) { pub fn send(&self, msg: ChatMessage) {
let drop_older_than = msg.time.sub(self.keep_duration); if let Err(e) = self.chat_s.blocking_send(msg) {
let mut messages = self.messages.blocking_lock(); tracing::warn!(
while let Some(msg) = messages.front() && msg.time < drop_older_than { ?e,
messages.pop_front(); "could not export chat message. the tokio sender seems to be broken"
);
} }
messages.push_back(msg); }
const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever }
if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES {
let msg_count = messages.len(); impl ChatForwarder {
tracing::debug!(?msg_count, "shrinking cache"); async fn run(mut self) {
messages.shrink_to_fit(); while let Some(msg) = self.chat_r.recv().await {
let drop_older_than = msg.time.sub(self.keep_duration);
let mut messages = self.messages.lock().await;
while let Some(msg) = messages.front() && msg.time < drop_older_than {
messages.pop_front();
}
messages.push_back(msg);
const MAX_CACHE_MESSAGES: usize = 10_000; // in case we have a short spam of many many messages, we dont want to keep the capacity forever
if messages.capacity() > messages.len() + MAX_CACHE_MESSAGES {
let msg_count = messages.len();
tracing::debug!(?msg_count, "shrinking cache");
messages.shrink_to_fit();
}
} }
} }
} }
impl ChatCache { impl ChatCache {
pub fn new(keep_duration: Duration) -> (Self, ChatExporter) { pub fn new(keep_duration: Duration, runtime: &tokio::runtime::Runtime) -> (Self, ChatExporter) {
const BUFFER_SIZE: usize = 1_000;
let (chat_s, chat_r) = tokio::sync::mpsc::channel(BUFFER_SIZE);
let messages: Arc<Mutex<VecDeque<ChatMessage>>> = Default::default(); let messages: Arc<Mutex<VecDeque<ChatMessage>>> = Default::default();
let messages_clone = Arc::clone(&messages); let messages_clone = Arc::clone(&messages);
let keep_duration = chrono::Duration::from_std(keep_duration).unwrap(); let keep_duration = chrono::Duration::from_std(keep_duration).unwrap();
(Self { messages }, ChatExporter { let worker = ChatForwarder {
messages: messages_clone,
keep_duration, keep_duration,
}) chat_r,
messages: messages_clone,
};
runtime.spawn(worker.run().instrument(info_span!("chat_forwarder")));
(Self { messages }, ChatExporter { chat_s })
} }
} }

View File

@ -484,7 +484,7 @@ impl Server {
state.ecs_mut().insert(DeletedEntities::default()); state.ecs_mut().insert(DeletedEntities::default());
let network = Network::new_with_registry(Pid::new(), &runtime, &registry); let network = Network::new_with_registry(Pid::new(), &runtime, &registry);
let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60)); let (chat_cache, chat_tracker) = ChatCache::new(Duration::from_secs(60), &runtime);
state.ecs_mut().insert(chat_tracker); state.ecs_mut().insert(chat_tracker);
let mut printed_quic_warning = false; let mut printed_quic_warning = false;