use hashbrown::HashMap; use rayon::ThreadPool; use std::{ collections::VecDeque, sync::{Arc, Mutex}, time::Instant, }; use tracing::{error, warn}; /// Provides a Wrapper around rayon threadpool to execute slow-jobs. /// slow means, the job doesn't need to not complete within the same tick. /// DO NOT USE I/O blocking jobs, but only CPU heavy jobs. /// Jobs run here, will reduce the ammount of threads rayon can use during the /// main tick. /// /// ## Configuration /// This Pool allows you to configure certain names of jobs and assign them a /// maximum number of threads # Example /// Your system has 16 cores, you assign 12 cores for slow-jobs. /// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on /// max 50% (6 = cores) /// /// ## Spawn Order /// - At least 1 job of a configuration is allowed to run if global limit isn't /// hit. /// - remaining capacities are spread in relation to their limit. e.g. a /// configuration with double the limit will be sheduled to spawn double the /// tasks, starting by a round robin. /// /// ## States /// - queued /// - spawned /// - started /// - finished /// ``` /// # use veloren_common::slowjob::SlowJobPool; /// # use std::sync::Arc; /// /// let threadpool = rayon::ThreadPoolBuilder::new() /// .num_threads(16) /// .build() /// .unwrap(); /// let pool = SlowJobPool::new(3, 10, Arc::new(threadpool)); /// pool.configure("CHUNK_GENERATOR", |n| n / 2); /// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job")); /// ``` #[derive(Clone)] pub struct SlowJobPool { internal: Arc>, } #[derive(Debug)] pub struct SlowJob { name: String, id: u64, } struct InternalSlowJobPool { next_id: u64, queue: HashMap>, configs: HashMap, last_spawned_configs: Vec, global_spawned_and_running: u64, global_limit: u64, jobs_metrics_cnt: usize, jobs_metrics: HashMap>, threadpool: Arc, internal: Option>>, } #[derive(Debug)] struct Config { local_limit: u64, local_spawned_and_running: u64, } struct Queue { id: u64, name: String, task: Box, } pub struct JobMetrics { pub queue_created: Instant, pub execution_start: Instant, pub execution_end: Instant, } impl Queue { fn new(name: &str, id: u64, internal: &Arc>, f: F) -> Self where F: FnOnce() + Send + Sync + 'static, { let internal = Arc::clone(internal); let name_cloned = name.to_owned(); let queue_created = Instant::now(); Self { id, name: name.to_owned(), task: Box::new(move || { common_base::prof_span_alloc!(_guard, &name_cloned); let execution_start = Instant::now(); f(); let execution_end = Instant::now(); let metrics = JobMetrics { queue_created, execution_start, execution_end, }; // directly maintain the next task afterwards { let mut lock = internal.lock().expect("slowjob lock poisoned"); lock.finish(&name_cloned, metrics); lock.spawn_queued(); } }), } } } impl InternalSlowJobPool { pub fn new( global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc, ) -> Arc> { let link = Arc::new(Mutex::new(Self { next_id: 0, queue: HashMap::new(), configs: HashMap::new(), last_spawned_configs: Vec::new(), global_spawned_and_running: 0, global_limit: global_limit.max(1), jobs_metrics_cnt, jobs_metrics: HashMap::new(), threadpool, internal: None, })); let link_clone = Arc::clone(&link); link.lock() .expect("poisoned on InternalSlowJobPool::new") .internal = Some(link_clone); link } /// returns order of configuration which are queued next fn calc_queued_order( &self, mut queued: HashMap<&String, u64>, mut limit: usize, ) -> Vec { let mut roundrobin = self.last_spawned_configs.clone(); let mut result = vec![]; let spawned = self .configs .iter() .map(|(n, c)| (n, c.local_spawned_and_running)) .collect::>(); let mut queried_capped = self .configs .iter() .map(|(n, c)| { ( n, queued .get(&n) .cloned() .unwrap_or(0) .min(c.local_limit - c.local_spawned_and_running), ) }) .collect::>(); // grab all configs that are queued and not running. in roundrobin order for n in roundrobin.clone().into_iter() { if let Some(c) = queued.get_mut(&n) { if *c > 0 && spawned.get(&n).cloned().unwrap_or(0) == 0 { result.push(n.clone()); *c -= 1; limit -= 1; queried_capped.get_mut(&n).map(|v| *v -= 1); roundrobin .iter() .position(|e| e == &n) .map(|i| roundrobin.remove(i)); roundrobin.push(n); if limit == 0 { return result; } } } } //schedule rest based on their possible limites, don't use round robin here let total_limit = queried_capped.values().sum::() as f32; if total_limit < f32::EPSILON { return result; } let mut spawn_rates = queried_capped .iter() .map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32))) .collect::>(); while limit > 0 { spawn_rates.sort_by(|(_, a), (_, b)| { if b < a { core::cmp::Ordering::Less } else if (b - a).abs() < f32::EPSILON { core::cmp::Ordering::Equal } else { core::cmp::Ordering::Greater } }); match spawn_rates.first_mut() { Some((n, r)) => { if *r > f32::EPSILON { result.push(n.clone()); limit -= 1; *r -= 1.0; } else { break; } }, None => break, } } result } fn can_spawn(&self, name: &str) -> bool { let queued = self .queue .iter() .map(|(n, m)| (n, m.len() as u64)) .collect::>(); let mut to_be_queued = queued.clone(); let name = name.to_owned(); *to_be_queued.entry(&name).or_default() += 1; let limit = (self.global_limit - self.global_spawned_and_running) as usize; // calculate to_be_queued first let to_be_queued_order = self.calc_queued_order(to_be_queued, limit); let queued_order = self.calc_queued_order(queued, limit); // if its queued one time more then its okay to spawn let to_be_queued_cnt = to_be_queued_order .into_iter() .filter(|n| n == &name) .count(); let queued_cnt = queued_order.into_iter().filter(|n| n == &name).count(); to_be_queued_cnt > queued_cnt } pub fn spawn(&mut self, name: &str, f: F) -> SlowJob where F: FnOnce() + Send + Sync + 'static, { let id = self.next_id; self.next_id += 1; let queue = Queue::new(name, id, self.internal.as_ref().expect("internal empty"), f); self.queue .entry(name.to_string()) .or_default() .push_back(queue); debug_assert!( self.configs.contains_key(name), "Can't spawn unconfigured task!" ); //spawn already queued self.spawn_queued(); SlowJob { name: name.to_string(), id, } } fn finish(&mut self, name: &str, metrics: JobMetrics) { let metric = self.jobs_metrics.entry(name.to_string()).or_default(); if metric.len() < self.jobs_metrics_cnt { metric.push(metrics); } self.global_spawned_and_running -= 1; if let Some(c) = self.configs.get_mut(name) { c.local_spawned_and_running -= 1; } else { warn!(?name, "sync_maintain on a no longer existing config"); } } fn spawn_queued(&mut self) { let queued = self .queue .iter() .map(|(n, m)| (n, m.len() as u64)) .collect::>(); let limit = self.global_limit as usize; let queued_order = self.calc_queued_order(queued, limit); for name in queued_order.into_iter() { match self.queue.get_mut(&name) { Some(deque) => match deque.pop_front() { Some(queue) => { //fire self.global_spawned_and_running += 1; self.configs .get_mut(&queue.name) .expect("cannot fire a unconfigured job") .local_spawned_and_running += 1; self.last_spawned_configs .iter() .position(|e| e == &queue.name) .map(|i| self.last_spawned_configs.remove(i)); self.last_spawned_configs.push(queue.name.to_owned()); self.threadpool.spawn(queue.task); }, None => error!( "internal calculation is wrong, we extected a schedulable job to be \ present in the queue" ), }, None => error!( "internal calculation is wrong, we marked a queue as schedulable which \ doesn't exist" ), } } } pub fn take_metrics(&mut self) -> HashMap> { core::mem::take(&mut self.jobs_metrics) } } impl SlowJobPool { pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc) -> Self { Self { internal: InternalSlowJobPool::new(global_limit, jobs_metrics_cnt, threadpool), } } /// configure a NAME to spawn up to f(n) threads, depending on how many /// threads we globally have available pub fn configure(&self, name: &str, f: F) where F: Fn(u64) -> u64, { let mut lock = self.internal.lock().expect("lock poisoned while configure"); let cnf = Config { local_limit: f(lock.global_limit).max(1), local_spawned_and_running: 0, }; lock.configs.insert(name.to_owned(), cnf); lock.last_spawned_configs.push(name.to_owned()); } /// spawn a new slow job on a certain NAME IF it can run immediately #[allow(clippy::result_unit_err)] pub fn try_run(&self, name: &str, f: F) -> Result where F: FnOnce() + Send + Sync + 'static, { let mut lock = self.internal.lock().expect("lock poisoned while try_run"); //spawn already queued lock.spawn_queued(); if lock.can_spawn(name) { Ok(lock.spawn(name, f)) } else { Err(()) } } pub fn spawn(&self, name: &str, f: F) -> SlowJob where F: FnOnce() + Send + Sync + 'static, { self.internal .lock() .expect("lock poisoned while spawn") .spawn(name, f) } pub fn cancel(&self, job: SlowJob) -> Result<(), SlowJob> { let mut lock = self.internal.lock().expect("lock poisoned while cancel"); if let Some(m) = lock.queue.get_mut(&job.name) { let p = match m.iter().position(|p| p.id == job.id) { Some(p) => p, None => return Err(job), }; if m.remove(p).is_some() { return Ok(()); } } Err(job) } pub fn take_metrics(&self) -> HashMap> { self.internal .lock() .expect("lock poisoned while take_metrics") .take_metrics() } } #[cfg(test)] mod tests { use std::sync::Barrier; use std::sync::atomic::AtomicU64; use super::*; fn mock_pool( pool_threads: usize, global_threads: u64, metrics: usize, foo: u64, bar: u64, baz: u64, ) -> SlowJobPool { let threadpool = rayon::ThreadPoolBuilder::new() .num_threads(pool_threads) .build() .unwrap(); let pool = SlowJobPool::new(global_threads, metrics, Arc::new(threadpool)); if foo != 0 { pool.configure("FOO", |x| x / foo); } if bar != 0 { pool.configure("BAR", |x| x / bar); } if baz != 0 { pool.configure("BAZ", |x| x / baz); } pool } #[test] fn simple_queue() { let pool = mock_pool(4, 4, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 1u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 4); assert_eq!(result.len(), 1); assert_eq!(result[0], "FOO"); } #[test] fn multiple_queue() { let pool = mock_pool(4, 4, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 2u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 4); assert_eq!(result.len(), 2); assert_eq!(result[0], "FOO"); assert_eq!(result[1], "FOO"); } #[test] fn limit_queue() { let pool = mock_pool(5, 5, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 80u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 4); assert_eq!(result.len(), 4); assert_eq!(result[0], "FOO"); assert_eq!(result[1], "FOO"); assert_eq!(result[2], "FOO"); assert_eq!(result[3], "FOO"); } #[test] fn simple_queue_2() { let pool = mock_pool(4, 4, 0, 1, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 1u64), ("BAR", 1u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 4); assert_eq!(result.len(), 2); assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 1); assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 1); } #[test] fn multiple_queue_3() { let pool = mock_pool(4, 4, 0, 1, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 2u64), ("BAR", 2u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 4); assert_eq!(result.len(), 4); assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2); assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2); } #[test] fn multiple_queue_4() { let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 3u64), ("BAR", 3u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 4); assert_eq!(result.len(), 4); assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2); assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2); } #[test] fn multiple_queue_5() { let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 5); assert_eq!(result.len(), 5); assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2); assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 3); } #[test] fn multiple_queue_6() { let pool = mock_pool(40, 40, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); let result = internal.calc_queued_order(queued, 11); assert_eq!(result.len(), 10); assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 5); assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 5); } #[test] fn roundrobin() { let pool = mock_pool(4, 4, 0, 2, 2, 0); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) .collect::>(); let queued = queue_data .iter() .map(|(s, c)| (s, *c)) .collect::>(); // Spawn a FOO task. pool.internal .lock() .unwrap() .spawn("FOO", || println!("foo")); // a barrier in f doesnt work as we need to wait for the cleanup while pool.internal.lock().unwrap().global_spawned_and_running != 0 { std::thread::yield_now(); } let result = pool .internal .lock() .unwrap() .calc_queued_order(queued.clone(), 1); assert_eq!(result.len(), 1); assert_eq!(result[0], "BAR"); // keep order if no new is spawned let result = pool .internal .lock() .unwrap() .calc_queued_order(queued.clone(), 1); assert_eq!(result.len(), 1); assert_eq!(result[0], "BAR"); // spawn a BAR task pool.internal .lock() .unwrap() .spawn("BAR", || println!("bar")); while pool.internal.lock().unwrap().global_spawned_and_running != 0 { std::thread::yield_now(); } let result = pool.internal.lock().unwrap().calc_queued_order(queued, 1); assert_eq!(result.len(), 1); assert_eq!(result[0], "FOO"); } #[test] #[should_panic] fn unconfigured() { let pool = mock_pool(4, 4, 0, 2, 1, 0); let mut internal = pool.internal.lock().unwrap(); internal.spawn("UNCONFIGURED", || println!()); } #[test] fn correct_spawn_doesnt_panic() { let pool = mock_pool(4, 4, 0, 2, 1, 0); let mut internal = pool.internal.lock().unwrap(); internal.spawn("FOO", || println!("foo")); internal.spawn("BAR", || println!("bar")); } #[test] fn can_spawn() { let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); assert!(internal.can_spawn("FOO")); assert!(internal.can_spawn("BAR")); } #[test] fn try_run_works() { let pool = mock_pool(4, 4, 0, 2, 1, 0); pool.try_run("FOO", || println!("foo")).unwrap(); pool.try_run("BAR", || println!("bar")).unwrap(); } #[test] fn try_run_exhausted() { use std::{thread::sleep, time::Duration}; let pool = mock_pool(8, 8, 0, 4, 2, 0); let func = || loop { sleep(Duration::from_secs(1)) }; pool.try_run("FOO", func).unwrap(); pool.try_run("BAR", func).unwrap(); pool.try_run("FOO", func).unwrap(); pool.try_run("BAR", func).unwrap(); pool.try_run("FOO", func).unwrap_err(); pool.try_run("BAR", func).unwrap(); pool.try_run("FOO", func).unwrap_err(); pool.try_run("BAR", func).unwrap(); pool.try_run("FOO", func).unwrap_err(); pool.try_run("BAR", func).unwrap_err(); pool.try_run("FOO", func).unwrap_err(); } #[test] fn actually_runs_1() { let pool = mock_pool(4, 4, 0, 0, 0, 1); let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier_clone = Arc::clone(&barrier); pool.try_run("BAZ", move || { barrier_clone.wait(); }) .unwrap(); barrier.wait(); } #[test] fn actually_runs_2() { let pool = mock_pool(4, 4, 0, 0, 0, 1); let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier_clone = Arc::clone(&barrier); pool.spawn("BAZ", move || { barrier_clone.wait(); }); barrier.wait(); } #[test] fn actually_waits() { use std::sync::{ atomic::{AtomicBool, Ordering}, Barrier, }; let pool = mock_pool(4, 4, 0, 4, 0, 1); let ops_i_ran = Arc::new(AtomicBool::new(false)); let ops_i_ran_clone = Arc::clone(&ops_i_ran); let barrier = Arc::new(Barrier::new(2)); let barrier_clone = Arc::clone(&barrier); let barrier2 = Arc::new(Barrier::new(2)); let barrier2_clone = Arc::clone(&barrier2); pool.try_run("FOO", move || { barrier_clone.wait(); }) .unwrap(); pool.spawn("FOO", move || { ops_i_ran_clone.store(true, Ordering::SeqCst); barrier2_clone.wait(); }); // in this case we have to sleep std::thread::sleep(std::time::Duration::from_secs(1)); assert!(!ops_i_ran.load(Ordering::SeqCst)); // now finish the first job barrier.wait(); // now wait on the second job to be actually finished barrier2.wait(); } #[test] fn verify_metrics() { use std::sync::Barrier; let pool = mock_pool(4, 4, 2, 1, 0, 4); let barrier = Arc::new(Barrier::new(5)); for name in &["FOO", "BAZ", "FOO", "FOO"] { let barrier_clone = Arc::clone(&barrier); pool.spawn(name, move || { barrier_clone.wait(); }); } // now finish all jobs barrier.wait(); // in this case we have to sleep to give it some time to store all the metrics std::thread::sleep(std::time::Duration::from_secs(2)); let metrics = pool.take_metrics(); let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics"); //its limited to 2, even though we had 3 jobs assert_eq!(foo.len(), 2); assert!(metrics.get("BAR").is_none()); let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics"); assert_eq!(baz.len(), 1); } fn busy_work(ms: u64) { let x = Instant::now(); while x.elapsed() < std::time::Duration::from_millis(ms) { } } use std::time::Duration; use std::sync::atomic::Ordering; fn work_barrier2(counter: Arc, ms: u64) -> () { println!(".{}..", ms); busy_work(ms); println!(".{}..Done", ms); counter.fetch_add(1, Ordering::SeqCst); } fn work_barrier(counter: &Arc, ms: u64) -> impl std::ops::FnOnce() -> () { let counter = Arc::clone(&counter); println!("Create work_barrier"); move || work_barrier2(counter, ms) } #[test] fn ffffff() { let threadpool = Arc::new(rayon::ThreadPoolBuilder::new() .num_threads(2) .build() .unwrap()); let pool = SlowJobPool::new(2, 100, threadpool.clone()); pool.configure("FOO", |x| x ); pool.configure("BAR", |x| x / 2); pool.configure("BAZ", |_| 1); let pool2 = SlowJobPool::new(2, 100, threadpool.clone()); pool2.configure("FOO", |x| x ); pool2.configure("BAR", |x| x / 2); pool2.configure("BAZ", |_| 1); let counter = Arc::new(AtomicU64::new(0)); let start = Instant::now(); //pool.spawn("BAZ", work_barrier(&counter, 1000)); //for _ in 0..600 { // pool.spawn("FOO", work_barrier(&counter, 10)); //} threadpool.install(|| { use rayon::prelude::*; let mut iter = Vec::new(); for i in 0..1000 { iter.push((i, work_barrier(&counter, 10))); } let mut iter2 = Vec::new(); for i in 0..1000 { iter2.push((i, work_barrier(&counter, 10))); } iter.into_par_iter().map(|(i, task)| { task(); if i == 900 { println!("Spawning task"); pool.spawn("BAZ", work_barrier(&counter, 1000)); pool2.spawn("BAZ", work_barrier(&counter, 10000)); println!("Spawned tasks"); } if i == 999 { println!("The first ITER end"); } }).collect::>(); println!("The first ITER finished"); //pool2.spawn("BAZ", work_barrier(&counter, 1000)); //pool2.spawn("BAZ", work_barrier(&counter, 1000)); //pool2.spawn("BAZ", work_barrier(&counter, 1000)); iter2.into_par_iter().map(|(i, task)| { if i == 0 { println!("The second ITER started"); } task(); }).collect::>(); }); //pool.spawn("FOO", work_barrier(&barrier, 1)); println!("wait for test finish"); const TIMEOUT: Duration = Duration::from_secs(2); let mut last_n_time = (0, start); loop { let now = Instant::now(); let n = counter.load(Ordering::SeqCst); if n != last_n_time.0 { last_n_time = (n, now); } else if now.duration_since(last_n_time.1) > TIMEOUT { break; } } let d = last_n_time.1.duration_since(start); println!("=============="); println!("Time Passed: {}ms", d.as_millis()); println!("Jobs finished: {}", last_n_time.0); } }