Synchronise rtsim saves

This commit is contained in:
Joshua Barretto 2023-04-09 15:35:53 +01:00
parent 67757f3d97
commit c6a7d7aa9b

View File

@ -9,6 +9,7 @@ use common::{
terrain::Block, terrain::Block,
}; };
use common_ecs::dispatch; use common_ecs::dispatch;
use crossbeam_channel::{unbounded, Receiver, Sender};
use enum_map::EnumMap; use enum_map::EnumMap;
use rtsim::{ use rtsim::{
data::{npc::SimulationMode, Data}, data::{npc::SimulationMode, Data},
@ -21,6 +22,7 @@ use std::{
fs::{self, File}, fs::{self, File},
io, io,
path::PathBuf, path::PathBuf,
thread::{self, JoinHandle},
time::Instant, time::Instant,
}; };
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -31,6 +33,7 @@ pub struct RtSim {
file_path: PathBuf, file_path: PathBuf,
last_saved: Option<Instant>, last_saved: Option<Instant>,
state: RtState, state: RtState,
save_thread: Option<(Sender<Data>, JoinHandle<()>)>,
} }
impl RtSim { impl RtSim {
@ -114,6 +117,7 @@ impl RtSim {
|_| None, |_| None,
))), ))),
file_path, file_path,
save_thread: None,
}; };
rule::start_rules(&mut this.state); rule::start_rules(&mut this.state);
@ -189,42 +193,32 @@ impl RtSim {
); );
} }
pub fn save(&mut self, /* slowjob_pool: &SlowJobPool, */ wait_until_finished: bool) { pub fn save(&mut self, wait_until_finished: bool) {
debug!("Saving rtsim data..."); debug!("Saving rtsim data...");
let file_path = self.file_path.clone();
let data = self.state.data().clone(); // Create the save thread if it doesn't already exist
trace!("Starting rtsim data save job..."); // TODO: Use the slow job pool eventually
// TODO: Use slow job let (tx, _) = self.save_thread.get_or_insert_with(|| {
// slowjob_pool.spawn("RTSIM_SAVE", move || { trace!("Starting rtsim data save thread...");
let handle = std::thread::spawn(move || { let (tx, rx) = unbounded();
if let Err(e) = file_path let file_path = self.file_path.clone();
.parent() (tx, thread::spawn(move || save_thread(file_path, rx)))
.map(|dir| {
fs::create_dir_all(dir)?;
// We write to a temporary file and then rename to avoid corruption.
Ok(dir.join(&file_path))
})
.unwrap_or(Ok(file_path))
.map(|file_path| AtomicFile::new(file_path, OverwriteBehavior::AllowOverwrite))
.map_err(|e: io::Error| Box::new(e) as Box<dyn Error>)
.and_then(|file| {
debug!("Writing rtsim data to file...");
file.write(move |file| -> Result<(), rtsim::data::WriteError> {
data.write_to(io::BufWriter::new(file))?;
// file.flush()?;
Ok(())
})?;
drop(file);
debug!("Rtsim data saved.");
Ok(())
})
{
error!("Saving rtsim data failed: {}", e);
}
}); });
// Send rtsim data to the save thread
if let Err(err) = tx.send(self.state.data().clone()) {
error!("Failed to perform rtsim save: {}", err);
}
// If we need to wait until the save thread has done its work (due to, for
// example, server shutdown) then do that.
if wait_until_finished { if wait_until_finished {
handle.join().expect("Save thread failed to join"); if let Some((tx, handle)) = self.save_thread.take() {
drop(tx);
info!("Waiting for rtsim save thread to finish...");
handle.join().expect("Save thread failed to join");
info!("Rtsim save thread finished.");
}
} }
self.last_saved = Some(Instant::now()); self.last_saved = Some(Instant::now());
@ -242,6 +236,35 @@ impl RtSim {
} }
} }
fn save_thread(file_path: PathBuf, rx: Receiver<Data>) {
while let Ok(data) = rx.recv() {
if let Err(e) = file_path
.parent()
.map(|dir| {
fs::create_dir_all(dir)?;
// We write to a temporary file and then rename to avoid corruption.
Ok(dir.join(&file_path))
})
.unwrap_or_else(|| Ok(file_path.clone()))
.map(|file_path| AtomicFile::new(file_path, OverwriteBehavior::AllowOverwrite))
.map_err(|e: io::Error| Box::new(e) as Box<dyn Error>)
.and_then(|file| {
debug!("Writing rtsim data to file...");
file.write(move |file| -> Result<(), rtsim::data::WriteError> {
data.write_to(io::BufWriter::new(file))?;
// file.flush()?;
Ok(())
})?;
drop(file);
debug!("Rtsim data saved.");
Ok(())
})
{
error!("Saving rtsim data failed: {}", e);
}
}
}
pub struct ChunkStates(pub Grid<Option<LoadedChunkState>>); pub struct ChunkStates(pub Grid<Option<LoadedChunkState>>);
pub struct LoadedChunkState { pub struct LoadedChunkState {