diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index d88dadee9f..ff41d8a0ec 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -5,16 +5,33 @@ use std::sync::atomic::{Ordering, AtomicU64}; use core::any::Any; use crossbeam_channel::{Receiver, Sender}; +type LimitFn = dyn Fn(u64) -> u64; + +/// a slow job is a CPU heavy task, that is not I/O blocking. +/// It usually takes longer than a tick to compute, so it's outsourced +/// Internally the rayon threadpool is used to calculate t +pub struct SlowJobGroup { + name: String, + next_id: Arc, + queue: Arc>>, + local_running_jobs: Arc, + global_running_jobs: Arc, + receiver: Arc>, + sender: Arc>, +} + /// a slow job is a CPU heavy task, that is not I/O blocking. /// It usually takes longer than a tick to compute, so it's outsourced /// Internally the rayon threadpool is used to calculate t pub struct SlowJobPool { next_id: AtomicU64, + groups: RwLock>>>>, queue: RwLock>>, finished: RwLock>>>, running_jobs: RwLock>>, receiver: Receiver<(String, Box)>, sender: Sender<(String, Box)>, + global_limit: Box, } pub struct SlowJob { @@ -27,44 +44,28 @@ struct Queue { running_cnt: Arc, } - -impl SlowJobPool { - pub fn new() -> Self { - let (sender,receiver) = crossbeam_channel::unbounded(); - Self { - next_id: AtomicU64::new(0), - queue: RwLock::new(HashMap::new()), - finished: RwLock::new(HashMap::new()), - receiver, - sender - } - } - +impl SlowJobGroup where + D: Any + Send + 'static +{ /// spawn a new slow job - pub fn spawn(&self, name: &str, f: F) -> SlowJob where - F: FnOnce() -> D + Send + 'static, - D: Any + 'static, + pub fn spawn(&self, name: &str, f: F) -> SlowJob where + F: FnOnce() -> D + 'static, { let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let running_cnt = { - let mut lock = self.running_jobs.write().unwrap(); - lock.entry(name.to_string()).or_default().clone() - }; - let running_cnt_clone = Arc::clone(&running_cnt); + let local_running_jobs_clone = Arc::clone(&self.local_running_jobs); + let global_running_jobs_clone = Arc::clone(&self.global_running_jobs); let sender = self.sender.clone(); let name_clone = name.to_string(); let queue = Queue { task: Box::new(move || { let result = f(); - let _ = sender.send((name_clone, Box::new(result))); - running_cnt_clone.fetch_sub(1, Ordering::Relaxed); + let _ = sender.send((name_clone, result)); + local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); + global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); }), - running_cnt, + running_cnt: Arc::clone(&self.local_running_jobs), }; - { - let mut lock = self.queue.write().unwrap(); - lock.entry(name.to_string()).or_default().insert(id, queue); - } + self.queue.write().unwrap().insert(id, queue); SlowJob { name: name.to_string(), id, @@ -72,19 +73,41 @@ impl SlowJobPool { } pub fn cancel(&self, job: SlowJob) { - let mut lock = self.queue.write().unwrap(); - if let Some(map) = lock.get_mut(&job.name) { - map.remove(&job.id); - } + self.queue.write().unwrap().remove(&job.id); } /// collect all slow jobs finished - pub fn collect(&self, name: &str) -> Vec> { - let mut lock = self.finished.write().unwrap(); + pub fn collect(&self, name: &str) -> Vec { + let mut result = vec!(); for (name, data) in self.receiver.try_iter() { - lock.entry(name).or_default().push(data); + result.push(data); } - lock.remove(name).unwrap_or_default() + result + } +} + + +impl SlowJobPool { + pub fn new() -> Self { + let (sender,receiver) = crossbeam_channel::unbounded(); + Self { + next_id: AtomicU64::new(0), + groups: RwLock::new(HashMap::new()), + queue: RwLock::new(HashMap::new()), + finished: RwLock::new(HashMap::new()), + running_jobs: RwLock::new(HashMap::new()), + receiver, + sender, + global_limit: Box::new(|n| n/2 + n/4), + } + } + + pub fn get(&self, name: &str) -> Arc> where D: Sized + Send + 'static { + let lock = self.groups.write().unwrap(); + if let Some(group) = lock.get(name) { + if group.type_id() == Arc> + }; + panic!("Unconfigured group name!"); } fn maintain(&self) {