diff --git a/common/src/lib.rs b/common/src/lib.rs index 3f7a5f35a9..504ba44005 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -76,6 +76,7 @@ pub mod uid; #[cfg(not(target_arch = "wasm32"))] pub mod vol; #[cfg(not(target_arch = "wasm32"))] pub mod volumes; +pub mod slowjob; pub use combat::DamageSource; #[cfg(not(target_arch = "wasm32"))] diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs new file mode 100644 index 0000000000..d88dadee9f --- /dev/null +++ b/common/src/slowjob.rs @@ -0,0 +1,100 @@ +use std::sync::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::{Ordering, AtomicU64}; +use core::any::Any; +use crossbeam_channel::{Receiver, Sender}; + +/// a slow job is a CPU heavy task, that is not I/O blocking. +/// It usually takes longer than a tick to compute, so it's outsourced +/// Internally the rayon threadpool is used to calculate t +pub struct SlowJobPool { + next_id: AtomicU64, + queue: RwLock>>, + finished: RwLock>>>, + running_jobs: RwLock>>, + receiver: Receiver<(String, Box)>, + sender: Sender<(String, Box)>, +} + +pub struct SlowJob { + name: String, + id: u64, +} + +struct Queue { + task: Box ()>, + running_cnt: Arc, +} + + +impl SlowJobPool { + pub fn new() -> Self { + let (sender,receiver) = crossbeam_channel::unbounded(); + Self { + next_id: AtomicU64::new(0), + queue: RwLock::new(HashMap::new()), + finished: RwLock::new(HashMap::new()), + receiver, + sender + } + } + + /// spawn a new slow job + pub fn spawn(&self, name: &str, f: F) -> SlowJob where + F: FnOnce() -> D + Send + 'static, + D: Any + 'static, + { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let running_cnt = { + let mut lock = self.running_jobs.write().unwrap(); + lock.entry(name.to_string()).or_default().clone() + }; + let running_cnt_clone = Arc::clone(&running_cnt); + let sender = self.sender.clone(); + let name_clone = name.to_string(); + let queue = Queue { + task: Box::new(move || { + let result = f(); + let _ = sender.send((name_clone, Box::new(result))); + running_cnt_clone.fetch_sub(1, Ordering::Relaxed); + }), + running_cnt, + }; + { + let mut lock = self.queue.write().unwrap(); + lock.entry(name.to_string()).or_default().insert(id, queue); + } + SlowJob { + name: name.to_string(), + id, + } + } + + pub fn cancel(&self, job: SlowJob) { + let mut lock = self.queue.write().unwrap(); + if let Some(map) = lock.get_mut(&job.name) { + map.remove(&job.id); + } + } + + /// collect all slow jobs finished + pub fn collect(&self, name: &str) -> Vec> { + let mut lock = self.finished.write().unwrap(); + for (name, data) in self.receiver.try_iter() { + lock.entry(name).or_default().push(data); + } + lock.remove(name).unwrap_or_default() + } + + fn maintain(&self) { + /* + let mut lock = self.queue.write().unwrap(); + if let Some(map) = lock.get_mut(&job.name) { + map.remove(&job.id); + } + */ + + //let d = rayon::spawn(f); + } +} \ No newline at end of file