diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs
index fac0c98846..fa77b03ba6 100644
--- a/common/src/slowjob.rs
+++ b/common/src/slowjob.rs
@@ -1,9 +1,10 @@
 use hashbrown::HashMap;
 use rayon::ThreadPool;
-use std::sync::{
-    atomic::{AtomicI64, AtomicU64, Ordering},
-    Arc, RwLock,
+use std::{
+    collections::VecDeque,
+    sync::{Arc, Mutex},
 };
+use tracing::{error, warn};
 
 /// Provides a wrapper around the rayon threadpool to execute slow jobs.
 /// Slow means the job doesn't need to complete within the same tick.
@@ -11,11 +12,26 @@ use std::sync::{
 /// Jobs run here will reduce the amount of threads rayon can use during the
 /// main tick.
 ///
+/// ## Configuration
 /// This pool allows you to configure certain names of jobs and assign them a
 /// maximum number of threads.
 ///
 /// # Example
 /// Your system has 16 cores and you assign 12 cores for slow jobs.
 /// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on
-/// max 50% (6 = cores) ```rust
+/// max 50% (= 6 cores).
+///
+/// ## Spawn Order
+/// - At least 1 job of a configuration is allowed to run if the global limit
+///   isn't hit.
+/// - Remaining capacity is spread in relation to the configured limits, e.g. a
+///   configuration with double the limit will be scheduled to spawn double the
+///   tasks, starting with a round robin.
+///
+/// ## States
+/// - queued
+/// - spawned
+/// - started
+/// - finished
+/// ```
 /// # use veloren_common::slowjob::SlowJobPool;
 /// # use std::sync::Arc;
 ///
@@ -25,126 +41,264 @@ use std::sync::{
 /// .unwrap();
 /// let pool = SlowJobPool::new(3, Arc::new(threadpool));
 /// pool.configure("CHUNK_GENERATOR", |n| n / 2);
-/// pool.spawn("CHUNK_GENERATOR", move || println("this is a job"));
+/// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job"));
 /// ```
 #[derive(Clone)]
 pub struct SlowJobPool {
-    internal: Arc<InternalSlowJobPool>,
+    internal: Arc<Mutex<InternalSlowJobPool>>,
 }
 
+#[derive(Debug)]
 pub struct SlowJob {
     name: String,
    id: u64,
 }
 
 struct InternalSlowJobPool {
-    next_id: Arc<AtomicU64>,
-    queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
-    running_jobs: RwLock<HashMap<String, Arc<AtomicI64>>>,
-    configs: RwLock<HashMap<String, Config>>,
-    global_running_jobs: Arc<AtomicI64>,
+    next_id: u64,
+    queue: HashMap<String, VecDeque<Queue>>,
+    configs: HashMap<String, Config>,
+    last_spawned_configs: Vec<String>,
+    global_spawned_and_running: u64,
     global_limit: u64,
     threadpool: Arc<ThreadPool>,
+    internal: Option<Arc<Mutex<Self>>>,
 }
 
+#[derive(Debug)]
 struct Config {
-    max_local: u64,
-    spawned_total: Arc<AtomicU64>,
+    local_limit: u64,
+    local_spawned_and_running: u64,
 }
 
 struct Queue {
+    id: u64,
+    name: String,
     task: Box<dyn FnOnce() + Send + Sync + 'static>,
-    spawned_total: Arc<AtomicU64>,
-    local_running_jobs: Arc<AtomicI64>,
+}
+
+impl Queue {
+    fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self
+    where
+        F: FnOnce() + Send + Sync + 'static,
+    {
+        let internal = Arc::clone(&internal);
+        let name_cloned = name.to_owned();
+        Self {
+            id,
+            name: name.to_owned(),
+            task: Box::new(move || {
+                common_base::prof_span!(_guard, &name_cloned);
+                f();
+                // directly maintain the next task afterwards
+                {
+                    let mut lock = internal.lock().expect("slowjob lock poisoned");
+                    lock.finish(&name_cloned);
+                    lock.spawn_queued();
+                }
+            }),
+        }
+    }
 }
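The closure built in `Queue::new` is what makes the pool self-maintaining: after the user job runs, it re-locks the pool, books the job out via `finish`, and immediately launches whatever `spawn_queued` selects next. A minimal standalone sketch of that pattern (all names here are hypothetical, not part of the diff):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical mini scheduler illustrating the "finished task re-enters the
// scheduler" trick used by Queue::new above.
struct MiniScheduler {
    running: u64,
    limit: u64,
    queued: Vec<Box<dyn FnOnce() + Send + 'static>>,
}

impl MiniScheduler {
    fn spawn_queued(me: &Arc<Mutex<Self>>) {
        let mut lock = me.lock().expect("poisoned");
        while lock.running < lock.limit {
            let task = match lock.queued.pop() {
                Some(t) => t,
                None => break,
            };
            lock.running += 1;
            let me2 = Arc::clone(me);
            // the real pool hands this to rayon; a plain thread keeps it simple
            std::thread::spawn(move || {
                task();
                // book the job out, then launch whatever is queued next
                me2.lock().expect("poisoned").running -= 1;
                MiniScheduler::spawn_queued(&me2);
            });
        }
    }
}
```

Locking only after the user job returns keeps the job itself outside the pool's critical section.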
 
 impl InternalSlowJobPool {
-    pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
-        Self {
-            next_id: Arc::new(AtomicU64::new(0)),
-            queue: RwLock::new(HashMap::new()),
-            running_jobs: RwLock::new(HashMap::new()),
-            configs: RwLock::new(HashMap::new()),
-            global_running_jobs: Arc::new(AtomicI64::new(0)),
+    pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Arc<Mutex<Self>> {
+        let link = Arc::new(Mutex::new(Self {
+            next_id: 0,
+            queue: HashMap::new(),
+            configs: HashMap::new(),
+            last_spawned_configs: Vec::new(),
+            global_spawned_and_running: 0,
             global_limit: global_limit.max(1),
             threadpool,
-        }
+            internal: None,
+        }));
+
+        let link_clone = Arc::clone(&link);
+        link.lock()
+            .expect("poisoned on InternalSlowJobPool::new")
+            .internal = Some(link_clone);
+        link
     }
 
-    fn maintain(&self) {
-        let jobs_available =
-            self.global_limit as i64 - self.global_running_jobs.load(Ordering::Relaxed);
-        if jobs_available < 0 {
-            tracing::warn!(?jobs_available, "Some math is wrong in slowjob code");
-        }
-        if jobs_available <= 0 {
-            // we run at limit, can't spawn
-            return;
-        }
-        let possible = {
-            let lock = self.queue.read().unwrap();
-            lock.iter()
-                .map(|(name, queues)| {
-                    if !queues.is_empty() {
-                        Some(name.clone())
-                    } else {
-                        None
-                    }
-                })
-                .flatten()
-                .collect::<Vec<_>>()
-        };
-
-        let mut possible_total = {
-            let mut possible = possible;
-            let lock = self.configs.read().unwrap();
-            possible
-                .drain(..)
-                .map(|name| {
-                    let c = lock.get(&name).unwrap();
-                    (
-                        name,
-                        c.spawned_total.load(Ordering::Relaxed) / c.max_local,
-                        c.max_local,
-                    )
-                })
-                .collect::<Vec<_>>()
-        };
-        possible_total.sort_by_key(|(_, i, _)| *i);
-
-        let mut lock = self.queue.write().unwrap();
-        for i in 0..jobs_available as usize {
-            if let Some((name, _, max)) = possible_total.get(i) {
-                if let Some(map) = lock.get_mut(name) {
-                    let firstkey = match map.keys().next() {
-                        Some(k) => *k,
-                        None => continue,
-                    };
-
-                    if let Some(queue) = map.remove(&firstkey) {
-                        if queue.local_running_jobs.load(Ordering::Relaxed) < *max as i64 {
-                            self.fire(queue);
-                        } else {
-                            map.insert(firstkey, queue);
-                        }
-                    }
-                }
-            }
-        }
-    }
+    /// returns the order of configurations which are queued next
+    fn calc_queued_order(
+        &self,
+        mut queued: HashMap<&String, u64>,
+        mut limit: usize,
+    ) -> Vec<String> {
+        let mut roundrobin = self.last_spawned_configs.clone();
+        let mut result = vec![];
+        let spawned = self
+            .configs
+            .iter()
+            .map(|(n, c)| (n, c.local_spawned_and_running))
+            .collect::<HashMap<_, _>>();
+        let mut queried_capped = self
+            .configs
+            .iter()
+            .map(|(n, c)| {
+                (
+                    n,
+                    queued
+                        .get(&n)
+                        .cloned()
+                        .unwrap_or(0)
+                        .min(c.local_limit - c.local_spawned_and_running),
+                )
+            })
+            .collect::<HashMap<_, _>>();
+        // grab all configs that are queued and not running, in round robin order
+        for n in roundrobin.clone().into_iter() {
+            if let Some(c) = queued.get_mut(&n) {
+                if *c > 0 && spawned.get(&n).cloned().unwrap_or(0) == 0 {
+                    result.push(n.clone());
+                    *c -= 1;
+                    limit -= 1;
+                    queried_capped.get_mut(&n).map(|v| *v -= 1);
+                    roundrobin
+                        .iter()
+                        .position(|e| e == &n)
+                        .map(|i| roundrobin.remove(i));
+                    roundrobin.push(n);
+                    if limit == 0 {
+                        return result;
+                    }
+                }
+            }
+        }
+        // schedule the rest based on their possible limits, don't use round robin here
+        let total_limit = queried_capped.values().sum::<u64>() as f32;
+        if total_limit < f32::EPSILON {
+            return result;
+        }
+        let mut spawn_rates = queried_capped
+            .iter()
+            .map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
+            .collect::<Vec<_>>();
+        while limit > 0 {
+            spawn_rates.sort_by(|(_, a), (_, b)| {
+                if b < a {
+                    core::cmp::Ordering::Less
+                } else if (b - a).abs() < f32::EPSILON {
+                    core::cmp::Ordering::Equal
+                } else {
+                    core::cmp::Ordering::Greater
+                }
+            });
+            match spawn_rates.first_mut() {
+                Some((n, r)) => {
+                    if *r > f32::EPSILON {
+                        result.push(n.clone());
+                        limit -= 1;
+                        *r -= 1.0;
+                    } else {
+                        break;
+                    }
+                },
+                None => break,
+            }
+        }
+        result
+    }
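The two phases of `calc_queued_order` are easiest to see with concrete numbers. The sketch below reuses the `mock_pool` helper from the tests at the end of this diff (so it is assumed to live inside that tests module): "FOO" gets a local limit of 2 and "BAR" of 4, five jobs of each are queued, nothing is running, and 6 global slots are free. Phase one (round robin) hands each config one slot; phase two splits the remaining 4 slots 1:3 according to the capped queue lengths.

```rust
let pool = mock_pool(8, 8, 4, 2, 0); // "FOO" limit = 8/4 = 2, "BAR" limit = 8/2 = 4
let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
    .iter()
    .map(|(n, c)| ((*n).to_owned(), *c))
    .collect::<Vec<_>>();
let queued = queue_data
    .iter()
    .map(|(s, c)| (s, *c))
    .collect::<HashMap<_, _>>();
let order = internal.calc_queued_order(queued, 6);
// round robin: 1 "FOO" + 1 "BAR"; proportional rest: 1 "FOO" + 3 "BAR"
assert_eq!(order.iter().filter(|n| *n == "FOO").count(), 2);
assert_eq!(order.iter().filter(|n| *n == "BAR").count(), 4);
```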
 
-    fn fire(&self, queue: Queue) {
-        queue.spawned_total.fetch_add(1, Ordering::Relaxed);
-        queue.local_running_jobs.fetch_add(1, Ordering::Relaxed);
-        self.global_running_jobs.fetch_add(1, Ordering::Relaxed);
-        self.threadpool.spawn(queue.task);
-    }
+    fn can_spawn(&self, name: &str) -> bool {
+        let queued = self
+            .queue
+            .iter()
+            .map(|(n, m)| (n, m.len() as u64))
+            .collect::<HashMap<_, _>>();
+        let mut to_be_queued = queued.clone();
+        let name = name.to_owned();
+        *to_be_queued.entry(&name).or_default() += 1;
+        let limit = (self.global_limit - self.global_spawned_and_running) as usize;
+        // calculate to_be_queued first
+        let to_be_queued_order = self.calc_queued_order(to_be_queued, limit);
+        let queued_order = self.calc_queued_order(queued, limit);
+        // if it would be queued one more time, it's okay to spawn
+        let to_be_queued_cnt = to_be_queued_order
+            .into_iter()
+            .filter(|n| n == &name)
+            .count();
+        let queued_cnt = queued_order.into_iter().filter(|n| n == &name).count();
+        to_be_queued_cnt > queued_cnt
+    }
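`can_spawn` answers "would one more `name` job start right now?" without mutating anything: it runs the scheduling calculation twice, once on the real queue lengths and once on a copy with one extra entry, and the job may run only if the extra entry survives the capping. A test-style sketch (again assuming the `mock_pool` helper from the tests below; the sleeps just keep the jobs alive long enough to observe the state):

```rust
use std::{thread::sleep, time::Duration};
let pool = mock_pool(8, 8, 4, 0, 0); // only "FOO", local limit = 8/4 = 2
pool.try_run("FOO", || sleep(Duration::from_secs(1))).unwrap();
pool.try_run("FOO", || sleep(Duration::from_secs(1))).unwrap();
// Both slots are taken: the hypothetical extra "FOO" is capped away by
// local_limit - local_spawned_and_running == 0, so both computed schedules
// contain the same number of "FOO" entries and can_spawn returns false.
assert!(!pool.internal.lock().unwrap().can_spawn("FOO"));
```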
+
+    pub fn spawn<F>(&mut self, name: &str, f: F) -> SlowJob
+    where
+        F: FnOnce() + Send + Sync + 'static,
+    {
+        let id = self.next_id;
+        self.next_id += 1;
+        let queue = Queue::new(name, id, self.internal.as_ref().expect("internal empty"), f);
+        self.queue
+            .entry(name.to_string())
+            .or_default()
+            .push_back(queue);
+        debug_assert!(
+            self.configs.contains_key(name),
+            "Can't spawn unconfigured task!"
+        );
+        // spawn already queued
+        self.spawn_queued();
+        SlowJob {
+            name: name.to_string(),
+            id,
+        }
+    }
+
+    fn finish(&mut self, name: &str) {
+        self.global_spawned_and_running -= 1;
+        if let Some(c) = self.configs.get_mut(name) {
+            c.local_spawned_and_running -= 1;
+        } else {
+            warn!(?name, "finish on a no longer existing config");
+        }
+    }
+
+    fn spawn_queued(&mut self) {
+        let queued = self
+            .queue
+            .iter()
+            .map(|(n, m)| (n, m.len() as u64))
+            .collect::<HashMap<_, _>>();
+        let limit = self.global_limit as usize;
+        let queued_order = self.calc_queued_order(queued, limit);
+        for name in queued_order.into_iter() {
+            match self.queue.get_mut(&name) {
+                Some(deque) => match deque.pop_front() {
+                    Some(queue) => {
+                        // fire
+                        self.global_spawned_and_running += 1;
+                        self.configs
+                            .get_mut(&queue.name)
+                            .expect("cannot fire an unconfigured job")
+                            .local_spawned_and_running += 1;
+                        self.last_spawned_configs
+                            .iter()
+                            .position(|e| e == &queue.name)
+                            .map(|i| self.last_spawned_configs.remove(i));
+                        self.last_spawned_configs.push(queue.name.to_owned());
+                        self.threadpool.spawn(queue.task);
+                    },
+                    None => error!(
+                        "internal calculation is wrong, we expected a schedulable job to be \
+                         present in the queue"
+                    ),
+                },
+                None => error!(
+                    "internal calculation is wrong, we marked a queue as schedulable which \
+                     doesn't exist"
+                ),
+            }
+        }
+    }
 }
 
 impl SlowJobPool {
     pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
         Self {
-            internal: Arc::new(InternalSlowJobPool::new(global_limit, threadpool)),
+            internal: InternalSlowJobPool::new(global_limit, threadpool),
         }
     }
 
@@ -154,237 +308,379 @@ impl SlowJobPool {
     where
         F: Fn(u64) -> u64,
     {
+        let mut lock = self.internal.lock().expect("lock poisoned while configure");
         let cnf = Config {
-            max_local: f(self.internal.global_limit).max(1),
-            spawned_total: Arc::new(AtomicU64::new(0)),
+            local_limit: f(lock.global_limit).max(1),
+            local_spawned_and_running: 0,
         };
-        let mut lock = self.internal.configs.write().unwrap();
-        lock.insert(name.to_string(), cnf);
+        lock.configs.insert(name.to_owned(), cnf);
+        lock.last_spawned_configs.push(name.to_owned());
+    }
+
+    /// spawn a new slow job on a certain NAME, but only IF it can run immediately
+    #[allow(clippy::result_unit_err)]
+    pub fn try_run<F>(&self, name: &str, f: F) -> Result<SlowJob, ()>
+    where
+        F: FnOnce() + Send + Sync + 'static,
+    {
+        let mut lock = self.internal.lock().expect("lock poisoned while try_run");
+        // spawn already queued
+        lock.spawn_queued();
+        if lock.can_spawn(name) {
+            Ok(lock.spawn(name, f))
+        } else {
+            Err(())
+        }
     }
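At the public API level the difference between the two entry points is simple: `spawn` always accepts the job and queues it until capacity frees up, while `try_run` refuses instead of queueing. A usage sketch in the style of the doc example above (the job bodies are placeholders):

```rust
use std::sync::Arc;
use veloren_common::slowjob::SlowJobPool;

fn main() {
    let threadpool = rayon::ThreadPoolBuilder::new().build().unwrap();
    let pool = SlowJobPool::new(2, Arc::new(threadpool));
    pool.configure("CHUNK_GENERATOR", |n| n / 2);

    // spawn: always accepted; waits in the queue until a slot frees up
    let _job = pool.spawn("CHUNK_GENERATOR", || println!("generated"));

    // try_run: returns Err(()) instead of queueing if nothing can spawn now
    match pool.try_run("CHUNK_GENERATOR", || println!("generated")) {
        Ok(_job) => { /* accepted and already handed to rayon */ },
        Err(()) => { /* at capacity, try again next tick */ },
    }
}
```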
 
-    /// spawn a new slow job on a certain NAME
     pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob
     where
         F: FnOnce() + Send + Sync + 'static,
     {
-        let id = self.internal.next_id.fetch_add(1, Ordering::Relaxed);
         self.internal
-            .queue
-            .write()
-            .unwrap()
-            .entry(name.to_string())
-            .or_default()
-            .insert(id, self.queue(name, f));
-        self.maintain();
-        SlowJob {
-            name: name.to_string(),
-            id,
-        }
+            .lock()
+            .expect("lock poisoned while spawn")
+            .spawn(name, f)
     }
 
-    fn queue<F>(&self, name: &str, f: F) -> Queue
-    where
-        F: FnOnce() + Send + Sync + 'static,
-    {
-        let internal = Arc::clone(&self.internal);
-        let spawned_total = Arc::clone(
-            &self
-                .internal
-                .configs
-                .read()
-                .unwrap()
-                .get(name)
-                .expect("can't spawn a non-configued slowjob")
-                .spawned_total,
-        );
-        let local_running_jobs_clone = {
-            let mut lock = self.internal.running_jobs.write().unwrap();
-            Arc::clone(&lock.entry(name.to_string()).or_default())
-        };
-        let local_running_jobs = Arc::clone(&local_running_jobs_clone);
-        let global_running_jobs_clone = Arc::clone(&self.internal.global_running_jobs);
-        let _name_clones = name.to_string();
-        Queue {
-            task: Box::new(move || {
-                common_base::prof_span!(_guard, &_name_clones);
-                f();
-                local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed);
-                global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed);
-                // directly maintain the next task afterwards
-                internal.maintain();
-            }),
-            spawned_total,
-            local_running_jobs,
-        }
-    }
-
-    pub fn cancel(&self, job: SlowJob) {
-        let mut lock = self.internal.queue.write().unwrap();
-        if let Some(map) = lock.get_mut(&job.name) {
-            map.remove(&job.id);
-        }
-    }
-
-    fn maintain(&self) { self.internal.maintain() }
+    pub fn cancel(&self, job: SlowJob) -> Result<(), SlowJob> {
+        let mut lock = self.internal.lock().expect("lock poisoned while cancel");
+        if let Some(m) = lock.queue.get_mut(&job.name) {
+            let p = match m.iter().position(|p| p.id == job.id) {
+                Some(p) => p,
+                None => return Err(job),
+            };
+            if m.remove(p).is_some() {
+                return Ok(());
+            }
+        }
+        Err(job)
+    }
 }
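`cancel` now reports whether it actually removed anything: `Ok(())` means the job was still queued and never ran, while `Err(job)` hands the token back because the job already left the queue (running or finished) and cannot be stopped anymore. Continuing the sketch above:

```rust
let job = pool.spawn("CHUNK_GENERATOR", || { /* expensive work */ });
// later, if the result is no longer needed:
match pool.cancel(job) {
    Ok(()) => { /* removed from the queue before it ever ran */ },
    Err(job) => {
        // already handed to rayon; it will run to completion anyway
        tracing::warn!(?job, "could not cancel job");
    },
}
```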
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::{
-        sync::Mutex,
-        time::{Duration, Instant},
-    };
 
-    fn mock_fn(
-        name: &str,
-        start_time: &Arc<Mutex<Option<Instant>>>,
-        done: &Arc<AtomicI64>,
-    ) -> impl FnOnce() {
-        let name = name.to_string();
-        let start_time = Arc::clone(start_time);
-        let done = Arc::clone(done);
-        move || {
-            println!("Start {}", name);
-            *start_time.lock().unwrap() = Some(Instant::now());
-            std::thread::sleep(Duration::from_millis(500));
-            done.fetch_add(1, Ordering::Relaxed);
-            println!("Finished {}", name);
-        }
-    }
+    #[allow(clippy::blacklisted_name)]
+    fn mock_pool(
+        pool_threads: usize,
+        global_threads: u64,
+        foo: u64,
+        bar: u64,
+        baz: u64,
+    ) -> SlowJobPool {
+        let threadpool = rayon::ThreadPoolBuilder::new()
+            .num_threads(pool_threads)
+            .build()
+            .unwrap();
+        let pool = SlowJobPool::new(global_threads, Arc::new(threadpool));
+        if foo != 0 {
+            pool.configure("FOO", |x| x / foo);
+        }
+        if bar != 0 {
+            pool.configure("BAR", |x| x / bar);
+        }
+        if baz != 0 {
+            pool.configure("BAZ", |x| x / baz);
+        }
+        pool
+    }
 
     #[test]
-    fn global_limit() {
-        let threadpool = rayon::ThreadPoolBuilder::new()
-            .num_threads(4)
-            .build()
-            .unwrap();
-        let pool = SlowJobPool::new(3, Arc::new(threadpool));
-        pool.configure("FOO", |_| 1000);
-        let start = Instant::now();
-        let f1 = Arc::new(Mutex::new(None));
-        let f2 = Arc::new(Mutex::new(None));
-        let f3 = Arc::new(Mutex::new(None));
-        let f4 = Arc::new(Mutex::new(None));
-        let f5 = Arc::new(Mutex::new(None));
-        let f6 = Arc::new(Mutex::new(None));
-        let f7 = Arc::new(Mutex::new(None));
-        let done = Arc::new(AtomicI64::new(0));
-        pool.spawn("FOO", mock_fn("foo1", &f1, &done));
-        pool.spawn("FOO", mock_fn("foo2", &f2, &done));
-        pool.spawn("FOO", mock_fn("foo3", &f3, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo4", &f4, &done));
-        pool.spawn("FOO", mock_fn("foo5", &f5, &done));
-        pool.spawn("FOO", mock_fn("foo6", &f6, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo7", &f7, &done));
-        std::thread::sleep(Duration::from_secs(1));
-        let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
-            a.lock().unwrap().unwrap().duration_since(s).as_millis()
-        };
-        let f1 = measure(f1, start);
-        let f2 = measure(f2, start);
-        let f3 = measure(f3, start);
-        let f4 = measure(f4, start);
-        let f5 = measure(f5, start);
-        let f6 = measure(f6, start);
-        let f7 = measure(f7, start);
-        assert_eq!(done.load(Ordering::Relaxed), 7);
-        assert!(f1 < 500);
-        assert!(f2 < 500);
-        assert!(f3 < 500);
-        assert!(f4 < 1000);
-        assert!(f5 < 1000);
-        assert!(f6 < 1000);
-        assert!(f7 < 1500);
+    fn simple_queue() {
+        let pool = mock_pool(4, 4, 1, 0, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 1u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "FOO");
     }
 
     #[test]
-    fn local_limit() {
-        let threadpool = rayon::ThreadPoolBuilder::new()
-            .num_threads(4)
-            .build()
-            .unwrap();
-        let pool = SlowJobPool::new(100, Arc::new(threadpool));
-        pool.configure("FOO", |_| 3);
-        let start = Instant::now();
-        let f1 = Arc::new(Mutex::new(None));
-        let f2 = Arc::new(Mutex::new(None));
-        let f3 = Arc::new(Mutex::new(None));
-        let f4 = Arc::new(Mutex::new(None));
-        let f5 = Arc::new(Mutex::new(None));
-        let f6 = Arc::new(Mutex::new(None));
-        let f7 = Arc::new(Mutex::new(None));
-        let done = Arc::new(AtomicI64::new(0));
-        pool.spawn("FOO", mock_fn("foo1", &f1, &done));
-        pool.spawn("FOO", mock_fn("foo2", &f2, &done));
-        pool.spawn("FOO", mock_fn("foo3", &f3, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo4", &f4, &done));
-        pool.spawn("FOO", mock_fn("foo5", &f5, &done));
-        pool.spawn("FOO", mock_fn("foo6", &f6, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo7", &f7, &done));
-        std::thread::sleep(Duration::from_secs(1));
-        let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
-            a.lock().unwrap().unwrap().duration_since(s).as_millis()
-        };
-        let f1 = measure(f1, start);
-        let f2 = measure(f2, start);
-        let f3 = measure(f3, start);
-        let f4 = measure(f4, start);
-        let f5 = measure(f5, start);
-        let f6 = measure(f6, start);
-        let f7 = measure(f7, start);
-        assert_eq!(done.load(Ordering::Relaxed), 7);
-        assert!(f1 < 500);
-        assert!(f2 < 500);
-        assert!(f3 < 500);
-        assert!(f4 < 1000);
-        assert!(f5 < 1000);
-        assert!(f6 < 1000);
-        assert!(f7 < 1500);
+    fn multiple_queue() {
+        let pool = mock_pool(4, 4, 1, 0, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 2u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0], "FOO");
+        assert_eq!(result[1], "FOO");
     }
 
     #[test]
-    fn pool() {
-        let threadpool = rayon::ThreadPoolBuilder::new()
-            .num_threads(2)
-            .build()
-            .unwrap();
-        let pool = SlowJobPool::new(2, Arc::new(threadpool));
-        pool.configure("FOO", |n| n);
-        pool.configure("BAR", |n| n / 2);
-        let start = Instant::now();
-        let f1 = Arc::new(Mutex::new(None));
-        let f2 = Arc::new(Mutex::new(None));
-        let b1 = Arc::new(Mutex::new(None));
-        let b2 = Arc::new(Mutex::new(None));
-        let done = Arc::new(AtomicI64::new(0));
-        pool.spawn("FOO", mock_fn("foo1", &f1, &done));
-        pool.spawn("FOO", mock_fn("foo2", &f2, &done));
-        std::thread::sleep(Duration::from_millis(1000));
-        pool.spawn("BAR", mock_fn("bar1", &b1, &done));
-        pool.spawn("BAR", mock_fn("bar2", &b2, &done));
-        std::thread::sleep(Duration::from_secs(2));
-        let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
-            a.lock().unwrap().unwrap().duration_since(s).as_millis()
-        };
+    fn limit_queue() {
+        let pool = mock_pool(5, 5, 1, 0, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 80u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 4);
+        assert_eq!(result[0], "FOO");
+        assert_eq!(result[1], "FOO");
+        assert_eq!(result[2], "FOO");
+        assert_eq!(result[3], "FOO");
+    }
+
+    #[test]
+    fn simple_queue_2() {
+        let pool = mock_pool(4, 4, 1, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 1u64), ("BAR", 1u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 1);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 1);
+    }
+
+    #[test]
+    fn multiple_queue_3() {
+        let pool = mock_pool(4, 4, 1, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 2u64), ("BAR", 2u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 4);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2);
+    }
+
+    #[test]
+    fn multiple_queue_4() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 3u64), ("BAR", 3u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 4);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2);
+    }
+
+    #[test]
+    fn multiple_queue_5() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 5);
+        assert_eq!(result.len(), 5);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 3);
+    }
+
+    #[test]
+    fn multiple_queue_6() {
+        let pool = mock_pool(40, 40, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 11);
+        assert_eq!(result.len(), 10);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 5);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 5);
+    }
+
+    #[test]
+    fn roundrobin() {
+        let pool = mock_pool(4, 4, 2, 2, 0);
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        // spawn a FOO task
+        pool.internal
+            .lock()
+            .unwrap()
+            .spawn("FOO", || println!("foo"));
+        // a barrier in f doesn't work as we need to wait for the cleanup
+        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
+            std::thread::yield_now();
+        }
+        let result = pool
+            .internal
+            .lock()
+            .unwrap()
+            .calc_queued_order(queued.clone(), 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "BAR");
+        // keep order if no new is spawned
+        let result = pool
+            .internal
+            .lock()
+            .unwrap()
+            .calc_queued_order(queued.clone(), 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "BAR");
+        // spawn a BAR task
+        pool.internal
+            .lock()
+            .unwrap()
+            .spawn("BAR", || println!("bar"));
+        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
+            std::thread::yield_now();
+        }
+        let result = pool.internal.lock().unwrap().calc_queued_order(queued, 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "FOO");
+    }
 
+    #[test]
+    #[should_panic]
+    fn unconfigured() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let mut internal = pool.internal.lock().unwrap();
+        internal.spawn("UNCONFIGURED", || println!());
+    }
+
+    #[test]
+    fn correct_spawn_doesnt_panic() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let mut internal = pool.internal.lock().unwrap();
+        internal.spawn("FOO", || println!("foo"));
+        internal.spawn("BAR", || println!("bar"));
+    }
+
+    #[test]
+    fn can_spawn() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        assert!(internal.can_spawn("FOO"));
+        assert!(internal.can_spawn("BAR"));
+    }
+
+    #[test]
+    fn try_run_works() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        pool.try_run("FOO", || println!("foo")).unwrap();
+        pool.try_run("BAR", || println!("bar")).unwrap();
+    }
+
+    #[test]
+    fn try_run_exhausted() {
+        use std::{thread::sleep, time::Duration};
+        let pool = mock_pool(8, 8, 4, 2, 0);
+        let func = || loop {
+            sleep(Duration::from_secs(1))
+        };
-        let f1 = measure(f1, start);
-        let f2 = measure(f2, start);
-        let b1 = measure(b1, start);
-        let b2 = measure(b2, start);
-        // Expect:
-        // [F1, F2]
-        // [B1]
-        // [B2]
-        assert_eq!(done.load(Ordering::Relaxed), 4);
-        assert!(f1 < 500);
-        assert!(f2 < 500);
-        println!("b1 {}", b1);
-        println!("b2 {}", b2);
-        assert!((1000..1500).contains(&b1));
-        assert!((1500..2000).contains(&b2));
+        pool.try_run("FOO", func).unwrap();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap_err();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap_err();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap_err();
+        pool.try_run("BAR", func).unwrap_err();
+        pool.try_run("FOO", func).unwrap_err();
+    }
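The pass/fail pattern in `try_run_exhausted` follows directly from the limits: with `mock_pool(8, 8, 4, 2, 0)` and jobs that never return, every successful `try_run` permanently occupies a slot.

```rust
// global limit      = 8
// "FOO" local limit = 8 / 4 = 2  -> the 3rd and later FOO attempts fail
// "BAR" local limit = 8 / 2 = 4  -> the 5th and later BAR attempts fail
// 2 + 4 = 6 <= 8, so the global limit never becomes the bottleneck here
```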
 
+    #[test]
+    fn actually_runs_1() {
+        let pool = mock_pool(4, 4, 0, 0, 1);
+        let barrier = Arc::new(std::sync::Barrier::new(2));
+        let barrier_clone = Arc::clone(&barrier);
+        pool.try_run("BAZ", move || {
+            barrier_clone.wait();
+        })
+        .unwrap();
+        barrier.wait();
+    }
+
+    #[test]
+    fn actually_runs_2() {
+        let pool = mock_pool(4, 4, 0, 0, 1);
+        let barrier = Arc::new(std::sync::Barrier::new(2));
+        let barrier_clone = Arc::clone(&barrier);
+        pool.spawn("BAZ", move || {
+            barrier_clone.wait();
+        });
+        barrier.wait();
+    }
+
+    #[test]
+    fn actually_waits() {
+        use std::sync::{
+            atomic::{AtomicBool, Ordering},
+            Barrier,
+        };
+        let pool = mock_pool(4, 4, 4, 0, 1);
+        let ops_i_ran = Arc::new(AtomicBool::new(false));
+        let ops_i_ran_clone = Arc::clone(&ops_i_ran);
+        let barrier = Arc::new(Barrier::new(2));
+        let barrier_clone = Arc::clone(&barrier);
+        let barrier2 = Arc::new(Barrier::new(2));
+        let barrier2_clone = Arc::clone(&barrier2);
+        pool.try_run("FOO", move || {
+            barrier_clone.wait();
+        })
+        .unwrap();
+        pool.spawn("FOO", move || {
+            ops_i_ran_clone.store(true, Ordering::SeqCst);
+            barrier2_clone.wait();
+        });
+        // in this case we have to sleep
+        std::thread::sleep(std::time::Duration::from_secs(1));
+        assert!(!ops_i_ran.load(Ordering::SeqCst));
+        // now finish the first job
+        barrier.wait();
+        // now wait on the second job to be actually finished
+        barrier2.wait();
+    }
 }
diff --git a/voxygen/src/ui/keyed_jobs.rs b/voxygen/src/ui/keyed_jobs.rs
index 72f61b33b7..cacd5195f3 100644
--- a/voxygen/src/ui/keyed_jobs.rs
+++ b/voxygen/src/ui/keyed_jobs.rs
@@ -4,6 +4,7 @@ use std::{
     hash::Hash,
     time::{Duration, Instant},
 };
+use tracing::warn;
 
 enum KeyedJobTask<V> {
     Pending(Instant, Option<SlowJob>),
@@ -60,7 +61,9 @@ impl<K, V> KeyedJobs<K, V>
             let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
             if !fresh {
                 if let Some(job) = job.take() {
-                    pool.cancel(job)
+                    if let Err(e) = pool.cancel(job) {
+                        warn!(?e, "failed to cancel job");
+                    }
                 }
             }
             fresh
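For context, this `cancel` call sits inside the garbage-collection pass of `KeyedJobs`, which drops stale entries. A plausible reconstruction of the surrounding `retain` (assumed from the visible context lines; the `Completed` arm and exact shape may differ in the real code) shows why a failed cancel is only worth a warning: the job has already started, will finish on its own, and its result is simply discarded.

```rust
tasks.retain(|_key, task| match task {
    KeyedJobTask::Pending(at, job) => {
        let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
        if !fresh {
            if let Some(job) = job.take() {
                // Err: the job already left the queue; let it run to completion
                if let Err(e) = pool.cancel(job) {
                    warn!(?e, "failed to cancel job");
                }
            }
        }
        fresh
    },
    KeyedJobTask::Completed(at, _) => now - *at < KEYEDJOBS_GC_INTERVAL,
});
```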