diff --git a/server/src/rtsim/mod.rs b/server/src/rtsim/mod.rs index 042f921934..416c8f2598 100644 --- a/server/src/rtsim/mod.rs +++ b/server/src/rtsim/mod.rs @@ -9,6 +9,7 @@ use common::{ terrain::Block, }; use common_ecs::dispatch; +use crossbeam_channel::{unbounded, Receiver, Sender}; use enum_map::EnumMap; use rtsim::{ data::{npc::SimulationMode, Data}, @@ -21,6 +22,7 @@ use std::{ fs::{self, File}, io, path::PathBuf, + thread::{self, JoinHandle}, time::Instant, }; use tracing::{debug, error, info, trace, warn}; @@ -31,6 +33,7 @@ pub struct RtSim { file_path: PathBuf, last_saved: Option, state: RtState, + save_thread: Option<(Sender, JoinHandle<()>)>, } impl RtSim { @@ -114,6 +117,7 @@ impl RtSim { |_| None, ))), file_path, + save_thread: None, }; rule::start_rules(&mut this.state); @@ -189,42 +193,32 @@ impl RtSim { ); } - pub fn save(&mut self, /* slowjob_pool: &SlowJobPool, */ wait_until_finished: bool) { + pub fn save(&mut self, wait_until_finished: bool) { debug!("Saving rtsim data..."); - let file_path = self.file_path.clone(); - let data = self.state.data().clone(); - trace!("Starting rtsim data save job..."); - // TODO: Use slow job - // slowjob_pool.spawn("RTSIM_SAVE", move || { - let handle = std::thread::spawn(move || { - if let Err(e) = file_path - .parent() - .map(|dir| { - fs::create_dir_all(dir)?; - // We write to a temporary file and then rename to avoid corruption. - Ok(dir.join(&file_path)) - }) - .unwrap_or(Ok(file_path)) - .map(|file_path| AtomicFile::new(file_path, OverwriteBehavior::AllowOverwrite)) - .map_err(|e: io::Error| Box::new(e) as Box) - .and_then(|file| { - debug!("Writing rtsim data to file..."); - file.write(move |file| -> Result<(), rtsim::data::WriteError> { - data.write_to(io::BufWriter::new(file))?; - // file.flush()?; - Ok(()) - })?; - drop(file); - debug!("Rtsim data saved."); - Ok(()) - }) - { - error!("Saving rtsim data failed: {}", e); - } + + // Create the save thread if it doesn't already exist + // TODO: Use the slow job pool eventually + let (tx, _) = self.save_thread.get_or_insert_with(|| { + trace!("Starting rtsim data save thread..."); + let (tx, rx) = unbounded(); + let file_path = self.file_path.clone(); + (tx, thread::spawn(move || save_thread(file_path, rx))) }); + // Send rtsim data to the save thread + if let Err(err) = tx.send(self.state.data().clone()) { + error!("Failed to perform rtsim save: {}", err); + } + + // If we need to wait until the save thread has done its work (due to, for + // example, server shutdown) then do that. if wait_until_finished { - handle.join().expect("Save thread failed to join"); + if let Some((tx, handle)) = self.save_thread.take() { + drop(tx); + info!("Waiting for rtsim save thread to finish..."); + handle.join().expect("Save thread failed to join"); + info!("Rtsim save thread finished."); + } } self.last_saved = Some(Instant::now()); @@ -242,6 +236,35 @@ impl RtSim { } } +fn save_thread(file_path: PathBuf, rx: Receiver) { + while let Ok(data) = rx.recv() { + if let Err(e) = file_path + .parent() + .map(|dir| { + fs::create_dir_all(dir)?; + // We write to a temporary file and then rename to avoid corruption. + Ok(dir.join(&file_path)) + }) + .unwrap_or_else(|| Ok(file_path.clone())) + .map(|file_path| AtomicFile::new(file_path, OverwriteBehavior::AllowOverwrite)) + .map_err(|e: io::Error| Box::new(e) as Box) + .and_then(|file| { + debug!("Writing rtsim data to file..."); + file.write(move |file| -> Result<(), rtsim::data::WriteError> { + data.write_to(io::BufWriter::new(file))?; + // file.flush()?; + Ok(()) + })?; + drop(file); + debug!("Rtsim data saved."); + Ok(()) + }) + { + error!("Saving rtsim data failed: {}", e); + } + } +} + pub struct ChunkStates(pub Grid>); pub struct LoadedChunkState {