diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs
index 5175e4b069..fa77b03ba6 100644
--- a/common/src/slowjob.rs
+++ b/common/src/slowjob.rs
@@ -3,13 +3,9 @@ use rayon::ThreadPool;
 use std::{
     collections::VecDeque,
     sync::{Arc, Mutex},
-    time::{Duration, Instant}
 };
 use tracing::{error, warn};
 
-const LAST_JOBS_METRICS: usize = 32;
-const FAIR_TIME_INTERVAL: Duration = Duration::from_millis(1000);
-
 /// Provides a Wrapper around rayon threadpool to execute slow-jobs.
 /// slow means, the job doesn't need to not complete within the same tick.
 /// DO NOT USE I/O blocking jobs, but only CPU heavy jobs.
@@ -28,7 +24,7 @@ const FAIR_TIME_INTERVAL: Duration = Duration::from_millis(1000);
 ///   hit.
 /// - remaining capacities are spread in relation to their limit. e.g. a
 ///   configuration with double the limit will be sheduled to spawn double the
-///   tasks.
+///   tasks, starting in round-robin order.
 ///
 /// ## States
 /// - queued
@@ -62,6 +58,7 @@ struct InternalSlowJobPool {
     next_id: u64,
     queue: HashMap<String, VecDeque<Queue>>,
     configs: HashMap<String, Config>,
+    last_spawned_configs: Vec<String>,
     global_spawned_and_running: u64,
     global_limit: u64,
     threadpool: Arc<ThreadPool>,
@@ -72,8 +69,6 @@
 struct Config {
     local_limit: u64,
     local_spawned_and_running: u64,
-    /// hold the start and time of the last LAST_JOBS_METRICS jobs
-    last_jobs: VecDeque<(std::time::Instant, Option<std::time::Duration>)>,
 }
 
 struct Queue {
@@ -112,6 +107,7 @@ impl InternalSlowJobPool {
             next_id: 0,
             queue: HashMap::new(),
             configs: HashMap::new(),
+            last_spawned_configs: Vec::new(),
             global_spawned_and_running: 0,
             global_limit: global_limit.max(1),
             threadpool,
@@ -131,13 +127,14 @@ impl InternalSlowJobPool {
         mut queued: HashMap<&String, u64>,
         mut limit: usize,
     ) -> Vec<String> {
+        let mut roundrobin = self.last_spawned_configs.clone();
         let mut result = vec![];
         let spawned = self
             .configs
             .iter()
             .map(|(n, c)| (n, c.local_spawned_and_running))
             .collect::<HashMap<_, _>>();
-        let mut queried_caped = self
+        let mut queried_capped = self
             .configs
             .iter()
             .map(|(n, c)| {
@@ -151,24 +148,31 @@ impl InternalSlowJobPool {
                 )
             })
             .collect::<HashMap<_, _>>();
-        // grab all configs that are queued and not running
-        for (&n, c) in queued.iter_mut() {
-            if *c > 0 && spawned.get(n).cloned().unwrap_or(0) == 0 {
-                result.push(n.clone());
-                *c -= 1;
-                limit -= 1;
-                queried_caped.get_mut(&n).map(|v| *v -= 1);
-                if limit == 0 {
-                    return result;
+        // grab all configs that are queued and not running, in round-robin order
+        for n in roundrobin.clone().into_iter() {
+            if let Some(c) = queued.get_mut(&n) {
+                if *c > 0 && spawned.get(&n).cloned().unwrap_or(0) == 0 {
+                    result.push(n.clone());
+                    *c -= 1;
+                    limit -= 1;
+                    queried_capped.get_mut(&n).map(|v| *v -= 1);
+                    roundrobin
+                        .iter()
+                        .position(|e| e == &n)
+                        .map(|i| roundrobin.remove(i));
+                    roundrobin.push(n);
+                    if limit == 0 {
+                        return result;
+                    }
                 }
             }
         }
-        //schedule rest
-        let total_limit = queried_caped.values().sum::<u64>() as f32;
+        //schedule rest based on their possible limits, don't use round robin here
+        let total_limit = queried_capped.values().sum::<u64>() as f32;
         if total_limit < f32::EPSILON {
             return result;
         }
-        let mut spawn_rates = queried_caped
+        let mut spawn_rates = queried_capped
             .iter()
             .map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
             .collect::<Vec<_>>();
@@ -231,6 +235,10 @@ impl InternalSlowJobPool {
             .entry(name.to_string())
             .or_default()
             .push_back(queue);
+        debug_assert!(
+            self.configs.contains_key(name),
+            "Can't spawn unconfigured task!"
+        );
         //spawn already queued
         self.spawn_queued();
         SlowJob {
@@ -243,7 +251,6 @@ impl InternalSlowJobPool {
         self.global_spawned_and_running -= 1;
         if let Some(c) = self.configs.get_mut(name) {
            c.local_spawned_and_running -= 1;
-            c.last_jobs.
         } else {
             warn!(?name, "sync_maintain on a no longer existing config");
         }
@@ -267,6 +274,11 @@ impl InternalSlowJobPool {
                         .get_mut(&queue.name)
                         .expect("cannot fire a unconfigured job")
                         .local_spawned_and_running += 1;
+                    self.last_spawned_configs
+                        .iter()
+                        .position(|e| e == &queue.name)
+                        .map(|i| self.last_spawned_configs.remove(i));
+                    self.last_spawned_configs.push(queue.name.to_owned());
                     self.threadpool.spawn(queue.task);
                 },
                 None => error!(
@@ -300,9 +312,9 @@ impl SlowJobPool {
         let cnf = Config {
             local_limit: f(lock.global_limit).max(1),
             local_spawned_and_running: 0,
-            last_jobs: VecDeque::with_capacity(LAST_JOBS_METRICS),
         };
         lock.configs.insert(name.to_owned(), cnf);
+        lock.last_spawned_configs.push(name.to_owned());
     }
 
     /// spawn a new slow job on a certain NAME IF it can run immediately
@@ -520,6 +532,54 @@ mod tests {
         assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 5);
     }
 
+    #[test]
+    fn roundrobin() {
+        let pool = mock_pool(4, 4, 2, 2, 0);
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        // Spawn a FOO task.
+        pool.internal
+            .lock()
+            .unwrap()
+            .spawn("FOO", || println!("foo"));
+        // a barrier in f doesn't work as we need to wait for the cleanup
+        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
+            std::thread::yield_now();
+        }
+        let result = pool
+            .internal
+            .lock()
+            .unwrap()
+            .calc_queued_order(queued.clone(), 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "BAR");
+        // keep the order if no new task is spawned
+        let result = pool
+            .internal
+            .lock()
+            .unwrap()
+            .calc_queued_order(queued.clone(), 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "BAR");
+        // spawn a BAR task
+        pool.internal
+            .lock()
+            .unwrap()
+            .spawn("BAR", || println!("bar"));
+        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
+            std::thread::yield_now();
+        }
+        let result = pool.internal.lock().unwrap().calc_queued_order(queued, 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "FOO");
+    }
+
     #[test]
     #[should_panic]
     fn unconfigured() {
@@ -617,7 +677,7 @@
             });
         // in this case we have to sleep
         std::thread::sleep(std::time::Duration::from_secs(1));
-        assert_eq!(ops_i_ran.load(Ordering::SeqCst), false);
+        assert!(!ops_i_ran.load(Ordering::SeqCst));
         // now finish the first job
         barrier.wait();
         // now wait on the second job to be actually finished
diff --git a/voxygen/src/ui/keyed_jobs.rs b/voxygen/src/ui/keyed_jobs.rs
index 72f61b33b7..cacd5195f3 100644
--- a/voxygen/src/ui/keyed_jobs.rs
+++ b/voxygen/src/ui/keyed_jobs.rs
@@ -4,6 +4,7 @@ use std::{
     hash::Hash,
     time::{Duration, Instant},
 };
+use tracing::warn;
 
 enum KeyedJobTask<V> {
     Pending(Instant, Option<SlowJob>),
@@ -60,7 +61,9 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Send + Sync + 'static> Key
                 let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
                 if !fresh {
                     if let Some(job) = job.take() {
-                        pool.cancel(job)
+                        if let Err(e) = pool.cancel(job) {
+                            warn!(?e, "failed to cancel job");
+                        }
                     }
                 }
                 fresh
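Usage sketch (not part of the patch above): the snippet below illustrates how a consumer of the pool might configure two named job kinds and rely on the round-robin start order this change introduces. The constructor and method signatures (`SlowJobPool::new`, `configure`, `spawn`), the import path, and the job names are assumptions inferred from the calls visible in the diff, not verbatim API.

```rust
// Hypothetical usage sketch; names and exact signatures are assumptions
// inferred from this diff, not taken verbatim from the crate.
use std::sync::Arc;

use common::slowjob::SlowJobPool; // assumed import path (common/src/slowjob.rs)

fn configure_pool(threadpool: Arc<rayon::ThreadPool>) {
    // Assumed constructor: at most 2 slow jobs may run globally at once.
    let pool = SlowJobPool::new(2, threadpool);
    // Each named job kind derives its local limit from the global limit.
    pool.configure("CHUNK_GENERATOR", |global| global / 2);
    pool.configure("CHUNK_SERIALIZER", |global| global / 2);
    // With this change, queued jobs of different names are started in
    // round-robin order once capacity frees up, so one name cannot starve
    // the other while both have work queued.
    let _a = pool.spawn("CHUNK_GENERATOR", || { /* CPU-heavy work */ });
    let _b = pool.spawn("CHUNK_SERIALIZER", || { /* CPU-heavy work */ });
}
```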