Merge branch 'xMAC94x/flakytests' into 'master'

redo slowjobs, fixed #1056

Closes #1056

See merge request veloren/veloren!2063
This commit is contained in:
Marcel 2021-06-13 16:10:09 +00:00
commit 53378929e5
2 changed files with 587 additions and 288 deletions
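For orientation before the diff: a minimal sketch of the reworked public API (configure, spawn, try_run, cancel), pieced together from the changes below. The thread counts and the job name are invented, rayon and veloren_common are assumed as dependencies, and this is not code from the commit itself.

use std::sync::Arc;
use veloren_common::slowjob::SlowJobPool;

fn main() {
    let threadpool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    // At most 4 slow jobs run at once; "CHUNK_GENERATOR" may use half of them.
    let pool = SlowJobPool::new(4, Arc::new(threadpool));
    pool.configure("CHUNK_GENERATOR", |n| n / 2);
    // spawn queues the job and fires it as soon as the limits allow ...
    let job = pool.spawn("CHUNK_GENERATOR", || println!("generating a chunk"));
    // ... while try_run only succeeds if the job can start immediately.
    let _maybe = pool.try_run("CHUNK_GENERATOR", || println!("ran instantly"));
    // cancel now hands the job back on failure (e.g. if it already started).
    let _ = pool.cancel(job);
}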

View File

@@ -1,9 +1,10 @@
 use hashbrown::HashMap;
 use rayon::ThreadPool;
-use std::sync::{
-    atomic::{AtomicI64, AtomicU64, Ordering},
-    Arc, RwLock,
+use std::{
+    collections::VecDeque,
+    sync::{Arc, Mutex},
 };
+use tracing::{error, warn};
 
 /// Provides a Wrapper around rayon threadpool to execute slow-jobs.
 /// slow means, the job doesn't need to not complete within the same tick.
@@ -11,11 +12,26 @@ use std::sync::{
 /// Jobs run here, will reduce the ammount of threads rayon can use during the
 /// main tick.
 ///
+/// ## Configuration
 /// This Pool allows you to configure certain names of jobs and assign them a
 /// maximum number of threads # Example
 /// Your system has 16 cores, you assign 12 cores for slow-jobs.
 /// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on
-/// max 50% (6 = cores) ```rust
+/// max 50% (6 = cores)
+///
+/// ## Spawn Order
+/// - At least 1 job of a configuration is allowed to run if global limit isn't
+///   hit.
+/// - remaining capacities are spread in relation to their limit. e.g. a
+///   configuration with double the limit will be sheduled to spawn double the
+///   tasks, starting by a round robin.
+///
+/// ## States
+/// - queued
+/// - spawned
+/// - started
+/// - finished
+/// ```
 /// # use veloren_common::slowjob::SlowJobPool;
 /// # use std::sync::Arc;
 ///
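The "spread in relation to their limit" rule above is easiest to follow with numbers. Below is a toy re-derivation of that proportional step, not the crate's code: the names "A" and "B", their remaining capacities 2 and 4, and the 6 free slots are invented for illustration (the round-robin step that precedes it is left out).

fn main() {
    // Remaining per-name capacity after the round-robin step: "A" may still
    // start 2 jobs, "B" 4, and 6 global slots are free.
    let mut rates: Vec<(&str, f32)> = vec![("A", 2.0), ("B", 4.0)];
    let total: f32 = rates.iter().map(|(_, l)| *l).sum();
    let mut slots = 6u32;
    // Scale each share so the shares sum to the free slots, capped at the
    // name's own capacity (here they already fit exactly).
    for (_, r) in rates.iter_mut() {
        *r = (*r * slots as f32 / total).min(*r);
    }
    let mut order = vec![];
    while slots > 0 {
        // Always schedule the name with the largest remaining share next.
        rates.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap());
        match rates.first_mut() {
            Some((n, r)) if *r > f32::EPSILON => {
                order.push(*n);
                *r -= 1.0;
                slots -= 1;
            },
            _ => break,
        }
    }
    let a = order.iter().filter(|&s| s == &"A").count();
    let b = order.iter().filter(|&s| s == &"B").count();
    // "B" (double the limit) is scheduled for double the tasks: A: 2, B: 4.
    println!("A: {}, B: {}", a, b);
}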
@@ -25,126 +41,264 @@ use std::sync::{
 ///     .unwrap();
 /// let pool = SlowJobPool::new(3, Arc::new(threadpool));
 /// pool.configure("CHUNK_GENERATOR", |n| n / 2);
-/// pool.spawn("CHUNK_GENERATOR", move || println("this is a job"));
+/// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job"));
 /// ```
 #[derive(Clone)]
 pub struct SlowJobPool {
-    internal: Arc<InternalSlowJobPool>,
+    internal: Arc<Mutex<InternalSlowJobPool>>,
 }
 
+#[derive(Debug)]
 pub struct SlowJob {
     name: String,
     id: u64,
 }
 
 struct InternalSlowJobPool {
-    next_id: Arc<AtomicU64>,
-    queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
-    running_jobs: RwLock<HashMap<String, Arc<AtomicI64>>>,
-    configs: RwLock<HashMap<String, Config>>,
-    global_running_jobs: Arc<AtomicI64>,
+    next_id: u64,
+    queue: HashMap<String, VecDeque<Queue>>,
+    configs: HashMap<String, Config>,
+    last_spawned_configs: Vec<String>,
+    global_spawned_and_running: u64,
     global_limit: u64,
     threadpool: Arc<ThreadPool>,
+    internal: Option<Arc<Mutex<Self>>>,
 }
 
+#[derive(Debug)]
 struct Config {
-    max_local: u64,
-    spawned_total: Arc<AtomicU64>,
+    local_limit: u64,
+    local_spawned_and_running: u64,
 }
 
 struct Queue {
+    id: u64,
+    name: String,
     task: Box<dyn FnOnce() + Send + Sync + 'static>,
-    spawned_total: Arc<AtomicU64>,
-    local_running_jobs: Arc<AtomicI64>,
+}
+
+impl Queue {
+    fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self
+    where
+        F: FnOnce() + Send + Sync + 'static,
+    {
+        let internal = Arc::clone(&internal);
+        let name_cloned = name.to_owned();
+        Self {
+            id,
+            name: name.to_owned(),
+            task: Box::new(move || {
+                common_base::prof_span!(_guard, &name_cloned);
+                f();
+                // directly maintain the next task afterwards
+                {
+                    let mut lock = internal.lock().expect("slowjob lock poisoned");
+                    lock.finish(&name_cloned);
+                    lock.spawn_queued();
+                }
+            }),
+        }
+    }
 }
 
 impl InternalSlowJobPool {
-    pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
-        Self {
-            next_id: Arc::new(AtomicU64::new(0)),
-            queue: RwLock::new(HashMap::new()),
-            running_jobs: RwLock::new(HashMap::new()),
-            configs: RwLock::new(HashMap::new()),
-            global_running_jobs: Arc::new(AtomicI64::new(0)),
+    pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Arc<Mutex<Self>> {
+        let link = Arc::new(Mutex::new(Self {
+            next_id: 0,
+            queue: HashMap::new(),
+            configs: HashMap::new(),
+            last_spawned_configs: Vec::new(),
+            global_spawned_and_running: 0,
             global_limit: global_limit.max(1),
             threadpool,
-        }
+            internal: None,
+        }));
+        let link_clone = Arc::clone(&link);
+        link.lock()
+            .expect("poisoned on InternalSlowJobPool::new")
+            .internal = Some(link_clone);
+        link
     }
 
-    fn maintain(&self) {
-        let jobs_available =
-            self.global_limit as i64 - self.global_running_jobs.load(Ordering::Relaxed);
-        if jobs_available < 0 {
-            tracing::warn!(?jobs_available, "Some math is wrong in slowjob code");
-        }
-        if jobs_available <= 0 {
-            // we run at limit, can't spawn
-            return;
-        }
-        let possible = {
-            let lock = self.queue.read().unwrap();
-            lock.iter()
-                .map(|(name, queues)| {
-                    if !queues.is_empty() {
-                        Some(name.clone())
-                    } else {
-                        None
-                    }
-                })
-                .flatten()
-                .collect::<Vec<_>>()
-        };
-
-        let mut possible_total = {
-            let mut possible = possible;
-            let lock = self.configs.read().unwrap();
-            possible
-                .drain(..)
-                .map(|name| {
-                    let c = lock.get(&name).unwrap();
-                    (
-                        name,
-                        c.spawned_total.load(Ordering::Relaxed) / c.max_local,
-                        c.max_local,
-                    )
-                })
-                .collect::<Vec<_>>()
-        };
-        possible_total.sort_by_key(|(_, i, _)| *i);
-
-        let mut lock = self.queue.write().unwrap();
-        for i in 0..jobs_available as usize {
-            if let Some((name, _, max)) = possible_total.get(i) {
-                if let Some(map) = lock.get_mut(name) {
-                    let firstkey = match map.keys().next() {
-                        Some(k) => *k,
-                        None => continue,
-                    };
-                    if let Some(queue) = map.remove(&firstkey) {
-                        if queue.local_running_jobs.load(Ordering::Relaxed) < *max as i64 {
-                            self.fire(queue);
-                        } else {
-                            map.insert(firstkey, queue);
-                        }
-                    }
-                }
-            }
-        }
-    }
+    /// returns order of configuration which are queued next
+    fn calc_queued_order(
+        &self,
+        mut queued: HashMap<&String, u64>,
+        mut limit: usize,
+    ) -> Vec<String> {
+        let mut roundrobin = self.last_spawned_configs.clone();
+        let mut result = vec![];
+        let spawned = self
+            .configs
+            .iter()
+            .map(|(n, c)| (n, c.local_spawned_and_running))
+            .collect::<HashMap<_, u64>>();
+        let mut queried_capped = self
+            .configs
+            .iter()
+            .map(|(n, c)| {
+                (
+                    n,
+                    queued
+                        .get(&n)
+                        .cloned()
+                        .unwrap_or(0)
+                        .min(c.local_limit - c.local_spawned_and_running),
+                )
+            })
+            .collect::<HashMap<_, _>>();
+        // grab all configs that are queued and not running. in roundrobin order
+        for n in roundrobin.clone().into_iter() {
+            if let Some(c) = queued.get_mut(&n) {
+                if *c > 0 && spawned.get(&n).cloned().unwrap_or(0) == 0 {
+                    result.push(n.clone());
+                    *c -= 1;
+                    limit -= 1;
+                    queried_capped.get_mut(&n).map(|v| *v -= 1);
+                    roundrobin
+                        .iter()
+                        .position(|e| e == &n)
+                        .map(|i| roundrobin.remove(i));
+                    roundrobin.push(n);
+                    if limit == 0 {
+                        return result;
+                    }
+                }
+            }
+        }
+        //schedule rest based on their possible limites, don't use round robin here
+        let total_limit = queried_capped.values().sum::<u64>() as f32;
+        if total_limit < f32::EPSILON {
+            return result;
+        }
+        let mut spawn_rates = queried_capped
+            .iter()
+            .map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
+            .collect::<Vec<_>>();
+        while limit > 0 {
+            spawn_rates.sort_by(|(_, a), (_, b)| {
+                if b < a {
+                    core::cmp::Ordering::Less
+                } else if (b - a).abs() < f32::EPSILON {
+                    core::cmp::Ordering::Equal
+                } else {
+                    core::cmp::Ordering::Greater
+                }
+            });
+            match spawn_rates.first_mut() {
+                Some((n, r)) => {
+                    if *r > f32::EPSILON {
+                        result.push(n.clone());
+                        limit -= 1;
+                        *r -= 1.0;
+                    } else {
+                        break;
+                    }
+                },
+                None => break,
+            }
+        }
+        result
+    }
 
-    fn fire(&self, queue: Queue) {
-        queue.spawned_total.fetch_add(1, Ordering::Relaxed);
-        queue.local_running_jobs.fetch_add(1, Ordering::Relaxed);
-        self.global_running_jobs.fetch_add(1, Ordering::Relaxed);
-        self.threadpool.spawn(queue.task);
-    }
+    fn can_spawn(&self, name: &str) -> bool {
+        let queued = self
+            .queue
+            .iter()
+            .map(|(n, m)| (n, m.len() as u64))
+            .collect::<HashMap<_, u64>>();
+        let mut to_be_queued = queued.clone();
+        let name = name.to_owned();
+        *to_be_queued.entry(&name).or_default() += 1;
+        let limit = (self.global_limit - self.global_spawned_and_running) as usize;
+        // calculate to_be_queued first
+        let to_be_queued_order = self.calc_queued_order(to_be_queued, limit);
+        let queued_order = self.calc_queued_order(queued, limit);
+        // if its queued one time more then its okay to spawn
+        let to_be_queued_cnt = to_be_queued_order
+            .into_iter()
+            .filter(|n| n == &name)
+            .count();
+        let queued_cnt = queued_order.into_iter().filter(|n| n == &name).count();
+        to_be_queued_cnt > queued_cnt
+    }
+
+    pub fn spawn<F>(&mut self, name: &str, f: F) -> SlowJob
+    where
+        F: FnOnce() + Send + Sync + 'static,
+    {
+        let id = self.next_id;
+        self.next_id += 1;
+        let queue = Queue::new(name, id, self.internal.as_ref().expect("internal empty"), f);
+        self.queue
+            .entry(name.to_string())
+            .or_default()
+            .push_back(queue);
+        debug_assert!(
+            self.configs.contains_key(name),
+            "Can't spawn unconfigured task!"
+        );
+        //spawn already queued
+        self.spawn_queued();
+        SlowJob {
+            name: name.to_string(),
+            id,
+        }
+    }
+
+    fn finish(&mut self, name: &str) {
+        self.global_spawned_and_running -= 1;
+        if let Some(c) = self.configs.get_mut(name) {
+            c.local_spawned_and_running -= 1;
+        } else {
+            warn!(?name, "sync_maintain on a no longer existing config");
+        }
+    }
+
+    fn spawn_queued(&mut self) {
+        let queued = self
+            .queue
+            .iter()
+            .map(|(n, m)| (n, m.len() as u64))
+            .collect::<HashMap<_, u64>>();
+        let limit = self.global_limit as usize;
+        let queued_order = self.calc_queued_order(queued, limit);
+        for name in queued_order.into_iter() {
+            match self.queue.get_mut(&name) {
+                Some(deque) => match deque.pop_front() {
+                    Some(queue) => {
+                        //fire
+                        self.global_spawned_and_running += 1;
+                        self.configs
+                            .get_mut(&queue.name)
+                            .expect("cannot fire a unconfigured job")
+                            .local_spawned_and_running += 1;
+                        self.last_spawned_configs
+                            .iter()
+                            .position(|e| e == &queue.name)
+                            .map(|i| self.last_spawned_configs.remove(i));
+                        self.last_spawned_configs.push(queue.name.to_owned());
+                        self.threadpool.spawn(queue.task);
+                    },
+                    None => error!(
+                        "internal calculation is wrong, we extected a schedulable job to be \
+                         present in the queue"
+                    ),
+                },
+                None => error!(
+                    "internal calculation is wrong, we marked a queue as schedulable which \
+                     doesn't exist"
+                ),
+            }
+        }
+    }
 }
 
 impl SlowJobPool {
     pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
         Self {
-            internal: Arc::new(InternalSlowJobPool::new(global_limit, threadpool)),
+            internal: InternalSlowJobPool::new(global_limit, threadpool),
         }
     }
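Worth noting in Queue::new above: every job is wrapped so that the worker thread that just ran it locks the pool, books it out via finish, and immediately fires the next queued task via spawn_queued, so the pool drives itself without any maintenance tick. A reduced sketch of that wrapper pattern, with an invented Scheduler standing in for InternalSlowJobPool:

use std::sync::{Arc, Mutex};

// Invented stand-in for InternalSlowJobPool, just to show the shape.
struct Scheduler {
    running: u64,
}

impl Scheduler {
    fn finish(&mut self) {
        self.running -= 1;
    }
    fn spawn_queued(&mut self) {
        // here the real pool picks and fires the next queued task
    }
}

// The wrapper: after the user closure returns, the same worker thread does
// the book-keeping and schedules whatever is queued next.
fn wrap_task<F: FnOnce() + Send + 'static>(
    scheduler: Arc<Mutex<Scheduler>>,
    f: F,
) -> Box<dyn FnOnce() + Send + 'static> {
    Box::new(move || {
        f();
        let mut lock = scheduler.lock().expect("slowjob lock poisoned");
        lock.finish();
        lock.spawn_queued();
    })
}

fn main() {
    let s = Arc::new(Mutex::new(Scheduler { running: 1 }));
    wrap_task(Arc::clone(&s), || println!("job body"))();
    assert_eq!(s.lock().unwrap().running, 0);
}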
@@ -154,237 +308,379 @@ impl SlowJobPool {
     where
         F: Fn(u64) -> u64,
     {
+        let mut lock = self.internal.lock().expect("lock poisoned while configure");
         let cnf = Config {
-            max_local: f(self.internal.global_limit).max(1),
-            spawned_total: Arc::new(AtomicU64::new(0)),
+            local_limit: f(lock.global_limit).max(1),
+            local_spawned_and_running: 0,
         };
-        let mut lock = self.internal.configs.write().unwrap();
-        lock.insert(name.to_string(), cnf);
+        lock.configs.insert(name.to_owned(), cnf);
+        lock.last_spawned_configs.push(name.to_owned());
     }
 
+    /// spawn a new slow job on a certain NAME IF it can run immediately
+    #[allow(clippy::result_unit_err)]
+    pub fn try_run<F>(&self, name: &str, f: F) -> Result<SlowJob, ()>
+    where
+        F: FnOnce() + Send + Sync + 'static,
+    {
+        let mut lock = self.internal.lock().expect("lock poisoned while try_run");
+        //spawn already queued
+        lock.spawn_queued();
+        if lock.can_spawn(name) {
+            Ok(lock.spawn(name, f))
+        } else {
+            Err(())
+        }
+    }
+
+    /// spawn a new slow job on a certain NAME
     pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob
     where
         F: FnOnce() + Send + Sync + 'static,
     {
-        let id = self.internal.next_id.fetch_add(1, Ordering::Relaxed);
         self.internal
-            .queue
-            .write()
-            .unwrap()
-            .entry(name.to_string())
-            .or_default()
-            .insert(id, self.queue(name, f));
-        self.maintain();
-        SlowJob {
-            name: name.to_string(),
-            id,
-        }
+            .lock()
+            .expect("lock poisoned while spawn")
+            .spawn(name, f)
     }
 
-    fn queue<F>(&self, name: &str, f: F) -> Queue
-    where
-        F: FnOnce() + Send + Sync + 'static,
-    {
-        let internal = Arc::clone(&self.internal);
-        let spawned_total = Arc::clone(
-            &self
-                .internal
-                .configs
-                .read()
-                .unwrap()
-                .get(name)
-                .expect("can't spawn a non-configued slowjob")
-                .spawned_total,
-        );
-        let local_running_jobs_clone = {
-            let mut lock = self.internal.running_jobs.write().unwrap();
-            Arc::clone(&lock.entry(name.to_string()).or_default())
-        };
-        let local_running_jobs = Arc::clone(&local_running_jobs_clone);
-        let global_running_jobs_clone = Arc::clone(&self.internal.global_running_jobs);
-        let _name_clones = name.to_string();
-        Queue {
-            task: Box::new(move || {
-                common_base::prof_span!(_guard, &_name_clones);
-                f();
-                local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed);
-                global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed);
-                // directly maintain the next task afterwards
-                internal.maintain();
-            }),
-            spawned_total,
-            local_running_jobs,
-        }
-    }
-
-    pub fn cancel(&self, job: SlowJob) {
-        let mut lock = self.internal.queue.write().unwrap();
-        if let Some(map) = lock.get_mut(&job.name) {
-            map.remove(&job.id);
-        }
-    }
-
-    fn maintain(&self) { self.internal.maintain() }
+    pub fn cancel(&self, job: SlowJob) -> Result<(), SlowJob> {
+        let mut lock = self.internal.lock().expect("lock poisoned while cancel");
+        if let Some(m) = lock.queue.get_mut(&job.name) {
+            let p = match m.iter().position(|p| p.id == job.id) {
+                Some(p) => p,
+                None => return Err(job),
+            };
+            if m.remove(p).is_some() {
+                return Ok(());
+            }
+        }
+        Err(job)
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::{
-        sync::Mutex,
-        time::{Duration, Instant},
-    };
 
-    fn mock_fn(
-        name: &str,
-        start_time: &Arc<Mutex<Option<Instant>>>,
-        done: &Arc<AtomicI64>,
-    ) -> impl FnOnce() {
-        let name = name.to_string();
-        let start_time = Arc::clone(start_time);
-        let done = Arc::clone(done);
-        move || {
-            println!("Start {}", name);
-            *start_time.lock().unwrap() = Some(Instant::now());
-            std::thread::sleep(Duration::from_millis(500));
-            done.fetch_add(1, Ordering::Relaxed);
-            println!("Finished {}", name);
-        }
-    }
+    #[allow(clippy::blacklisted_name)]
+    fn mock_pool(
+        pool_threads: usize,
+        global_threads: u64,
+        foo: u64,
+        bar: u64,
+        baz: u64,
+    ) -> SlowJobPool {
+        let threadpool = rayon::ThreadPoolBuilder::new()
+            .num_threads(pool_threads)
+            .build()
+            .unwrap();
+        let pool = SlowJobPool::new(global_threads, Arc::new(threadpool));
+        if foo != 0 {
+            pool.configure("FOO", |x| x / foo);
+        }
+        if bar != 0 {
+            pool.configure("BAR", |x| x / bar);
+        }
+        if baz != 0 {
+            pool.configure("BAZ", |x| x / baz);
+        }
+        pool
+    }
 
     #[test]
-    fn global_limit() {
-        let threadpool = rayon::ThreadPoolBuilder::new()
-            .num_threads(4)
-            .build()
-            .unwrap();
-        let pool = SlowJobPool::new(3, Arc::new(threadpool));
-        pool.configure("FOO", |_| 1000);
-        let start = Instant::now();
-        let f1 = Arc::new(Mutex::new(None));
-        let f2 = Arc::new(Mutex::new(None));
-        let f3 = Arc::new(Mutex::new(None));
-        let f4 = Arc::new(Mutex::new(None));
-        let f5 = Arc::new(Mutex::new(None));
-        let f6 = Arc::new(Mutex::new(None));
-        let f7 = Arc::new(Mutex::new(None));
-        let done = Arc::new(AtomicI64::new(0));
-        pool.spawn("FOO", mock_fn("foo1", &f1, &done));
-        pool.spawn("FOO", mock_fn("foo2", &f2, &done));
-        pool.spawn("FOO", mock_fn("foo3", &f3, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo4", &f4, &done));
-        pool.spawn("FOO", mock_fn("foo5", &f5, &done));
-        pool.spawn("FOO", mock_fn("foo6", &f6, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo7", &f7, &done));
-        std::thread::sleep(Duration::from_secs(1));
-        let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
-            a.lock().unwrap().unwrap().duration_since(s).as_millis()
-        };
-        let f1 = measure(f1, start);
-        let f2 = measure(f2, start);
-        let f3 = measure(f3, start);
-        let f4 = measure(f4, start);
-        let f5 = measure(f5, start);
-        let f6 = measure(f6, start);
-        let f7 = measure(f7, start);
-        assert_eq!(done.load(Ordering::Relaxed), 7);
-        assert!(f1 < 500);
-        assert!(f2 < 500);
-        assert!(f3 < 500);
-        assert!(f4 < 1000);
-        assert!(f5 < 1000);
-        assert!(f6 < 1000);
-        assert!(f7 < 1500);
-    }
-
-    #[test]
-    fn local_limit() {
-        let threadpool = rayon::ThreadPoolBuilder::new()
-            .num_threads(4)
-            .build()
-            .unwrap();
-        let pool = SlowJobPool::new(100, Arc::new(threadpool));
-        pool.configure("FOO", |_| 3);
-        let start = Instant::now();
-        let f1 = Arc::new(Mutex::new(None));
-        let f2 = Arc::new(Mutex::new(None));
-        let f3 = Arc::new(Mutex::new(None));
-        let f4 = Arc::new(Mutex::new(None));
-        let f5 = Arc::new(Mutex::new(None));
-        let f6 = Arc::new(Mutex::new(None));
-        let f7 = Arc::new(Mutex::new(None));
-        let done = Arc::new(AtomicI64::new(0));
-        pool.spawn("FOO", mock_fn("foo1", &f1, &done));
-        pool.spawn("FOO", mock_fn("foo2", &f2, &done));
-        pool.spawn("FOO", mock_fn("foo3", &f3, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo4", &f4, &done));
-        pool.spawn("FOO", mock_fn("foo5", &f5, &done));
-        pool.spawn("FOO", mock_fn("foo6", &f6, &done));
-        std::thread::sleep(Duration::from_millis(300));
-        pool.spawn("FOO", mock_fn("foo7", &f7, &done));
-        std::thread::sleep(Duration::from_secs(1));
-        let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
-            a.lock().unwrap().unwrap().duration_since(s).as_millis()
-        };
-        let f1 = measure(f1, start);
-        let f2 = measure(f2, start);
-        let f3 = measure(f3, start);
-        let f4 = measure(f4, start);
-        let f5 = measure(f5, start);
-        let f6 = measure(f6, start);
-        let f7 = measure(f7, start);
-        assert_eq!(done.load(Ordering::Relaxed), 7);
-        assert!(f1 < 500);
-        assert!(f2 < 500);
-        assert!(f3 < 500);
-        assert!(f4 < 1000);
-        assert!(f5 < 1000);
-        assert!(f6 < 1000);
-        assert!(f7 < 1500);
-    }
-
-    #[test]
-    fn pool() {
-        let threadpool = rayon::ThreadPoolBuilder::new()
-            .num_threads(2)
-            .build()
-            .unwrap();
-        let pool = SlowJobPool::new(2, Arc::new(threadpool));
-        pool.configure("FOO", |n| n);
-        pool.configure("BAR", |n| n / 2);
-        let start = Instant::now();
-        let f1 = Arc::new(Mutex::new(None));
-        let f2 = Arc::new(Mutex::new(None));
-        let b1 = Arc::new(Mutex::new(None));
-        let b2 = Arc::new(Mutex::new(None));
-        let done = Arc::new(AtomicI64::new(0));
-        pool.spawn("FOO", mock_fn("foo1", &f1, &done));
-        pool.spawn("FOO", mock_fn("foo2", &f2, &done));
-        std::thread::sleep(Duration::from_millis(1000));
-        pool.spawn("BAR", mock_fn("bar1", &b1, &done));
-        pool.spawn("BAR", mock_fn("bar2", &b2, &done));
-        std::thread::sleep(Duration::from_secs(2));
-        let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
-            a.lock().unwrap().unwrap().duration_since(s).as_millis()
-        };
-        let f1 = measure(f1, start);
-        let f2 = measure(f2, start);
-        let b1 = measure(b1, start);
-        let b2 = measure(b2, start);
-        // Expect:
-        // [F1, F2]
-        // [B1]
-        // [B2]
-        assert_eq!(done.load(Ordering::Relaxed), 4);
-        assert!(f1 < 500);
-        assert!(f2 < 500);
-        println!("b1 {}", b1);
-        println!("b2 {}", b2);
-        assert!((1000..1500).contains(&b1));
-        assert!((1500..2000).contains(&b2));
-    }
+    fn simple_queue() {
+        let pool = mock_pool(4, 4, 1, 0, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 1u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "FOO");
+    }
+
+    #[test]
+    fn multiple_queue() {
+        let pool = mock_pool(4, 4, 1, 0, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 2u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0], "FOO");
+        assert_eq!(result[1], "FOO");
+    }
+
+    #[test]
+    fn limit_queue() {
+        let pool = mock_pool(5, 5, 1, 0, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 80u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 4);
+        assert_eq!(result[0], "FOO");
+        assert_eq!(result[1], "FOO");
+        assert_eq!(result[2], "FOO");
+        assert_eq!(result[3], "FOO");
+    }
+
+    #[test]
+    fn simple_queue_2() {
+        let pool = mock_pool(4, 4, 1, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 1u64), ("BAR", 1u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 1);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 1);
+    }
+
+    #[test]
+    fn multiple_queue_3() {
+        let pool = mock_pool(4, 4, 1, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 2u64), ("BAR", 2u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 4);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2);
+    }
+
+    #[test]
+    fn multiple_queue_4() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 3u64), ("BAR", 3u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 4);
+        assert_eq!(result.len(), 4);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2);
+    }
+
+    #[test]
+    fn multiple_queue_5() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 5);
+        assert_eq!(result.len(), 5);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 3);
+    }
+
+    #[test]
+    fn multiple_queue_6() {
+        let pool = mock_pool(40, 40, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        let result = internal.calc_queued_order(queued, 11);
+        assert_eq!(result.len(), 10);
+        assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 5);
+        assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 5);
+    }
+
+    #[test]
+    fn roundrobin() {
+        let pool = mock_pool(4, 4, 2, 2, 0);
+        let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
+            .iter()
+            .map(|(n, c)| ((*n).to_owned(), *c))
+            .collect::<Vec<_>>();
+        let queued = queue_data
+            .iter()
+            .map(|(s, c)| (s, *c))
+            .collect::<HashMap<_, _>>();
+        // Spawn a FOO task.
+        pool.internal
+            .lock()
+            .unwrap()
+            .spawn("FOO", || println!("foo"));
+        // a barrier in f doesnt work as we need to wait for the cleanup
+        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
+            std::thread::yield_now();
+        }
+        let result = pool
+            .internal
+            .lock()
+            .unwrap()
+            .calc_queued_order(queued.clone(), 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "BAR");
+        // keep order if no new is spawned
+        let result = pool
+            .internal
+            .lock()
+            .unwrap()
+            .calc_queued_order(queued.clone(), 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "BAR");
+        // spawn a BAR task
+        pool.internal
+            .lock()
+            .unwrap()
+            .spawn("BAR", || println!("bar"));
+        while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
+            std::thread::yield_now();
+        }
+        let result = pool.internal.lock().unwrap().calc_queued_order(queued, 1);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], "FOO");
+    }
+
+    #[test]
+    #[should_panic]
+    fn unconfigured() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let mut internal = pool.internal.lock().unwrap();
+        internal.spawn("UNCONFIGURED", || println!());
+    }
+
+    #[test]
+    fn correct_spawn_doesnt_panic() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let mut internal = pool.internal.lock().unwrap();
+        internal.spawn("FOO", || println!("foo"));
+        internal.spawn("BAR", || println!("bar"));
+    }
+
+    #[test]
+    fn can_spawn() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        let internal = pool.internal.lock().unwrap();
+        assert!(internal.can_spawn("FOO"));
+        assert!(internal.can_spawn("BAR"));
+    }
+
+    #[test]
+    fn try_run_works() {
+        let pool = mock_pool(4, 4, 2, 1, 0);
+        pool.try_run("FOO", || println!("foo")).unwrap();
+        pool.try_run("BAR", || println!("bar")).unwrap();
+    }
+
+    #[test]
+    fn try_run_exhausted() {
+        use std::{thread::sleep, time::Duration};
+        let pool = mock_pool(8, 8, 4, 2, 0);
+        let func = || loop {
+            sleep(Duration::from_secs(1))
+        };
+        pool.try_run("FOO", func).unwrap();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap_err();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap_err();
+        pool.try_run("BAR", func).unwrap();
+        pool.try_run("FOO", func).unwrap_err();
+        pool.try_run("BAR", func).unwrap_err();
+        pool.try_run("FOO", func).unwrap_err();
+    }
+
+    #[test]
+    fn actually_runs_1() {
+        let pool = mock_pool(4, 4, 0, 0, 1);
+        let barrier = Arc::new(std::sync::Barrier::new(2));
+        let barrier_clone = Arc::clone(&barrier);
+        pool.try_run("BAZ", move || {
+            barrier_clone.wait();
+        })
+        .unwrap();
+        barrier.wait();
+    }
+
+    #[test]
+    fn actually_runs_2() {
+        let pool = mock_pool(4, 4, 0, 0, 1);
+        let barrier = Arc::new(std::sync::Barrier::new(2));
+        let barrier_clone = Arc::clone(&barrier);
+        pool.spawn("BAZ", move || {
+            barrier_clone.wait();
+        });
+        barrier.wait();
+    }
+
+    #[test]
+    fn actually_waits() {
+        use std::sync::{
+            atomic::{AtomicBool, Ordering},
+            Barrier,
+        };
+        let pool = mock_pool(4, 4, 4, 0, 1);
+        let ops_i_ran = Arc::new(AtomicBool::new(false));
+        let ops_i_ran_clone = Arc::clone(&ops_i_ran);
+        let barrier = Arc::new(Barrier::new(2));
+        let barrier_clone = Arc::clone(&barrier);
+        let barrier2 = Arc::new(Barrier::new(2));
+        let barrier2_clone = Arc::clone(&barrier2);
+        pool.try_run("FOO", move || {
+            barrier_clone.wait();
+        })
+        .unwrap();
+        pool.spawn("FOO", move || {
+            ops_i_ran_clone.store(true, Ordering::SeqCst);
+            barrier2_clone.wait();
+        });
+        // in this case we have to sleep
+        std::thread::sleep(std::time::Duration::from_secs(1));
+        assert!(!ops_i_ran.load(Ordering::SeqCst));
+        // now finish the first job
+        barrier.wait();
+        // now wait on the second job to be actually finished
+        barrier2.wait();
+    }
 }
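The limits behind try_run_exhausted above are easier to follow written out. The snippet below only restates the arithmetic of mock_pool(8, 8, 4, 2, 0); it is not part of the commit.

fn main() {
    // mock_pool(8, 8, 4, 2, 0): global limit 8, FOO capped at 8 / 4 = 2
    // concurrent jobs, BAR at 8 / 2 = 4.
    let global = 8u64;
    let (foo_limit, bar_limit) = (global / 4, global / 2);
    assert_eq!((foo_limit, bar_limit), (2, 4));
    // The endless jobs never finish, so the 3rd FOO and the 5th BAR are the
    // first try_run calls of each name to fail. Only 2 + 4 = 6 of the 8
    // global slots are ever occupied: it is the local limits, not the global
    // one, that reject the remaining calls.
    assert!(foo_limit + bar_limit < global);
}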

View File

@@ -4,6 +4,7 @@ use std::{
     hash::Hash,
     time::{Duration, Instant},
 };
+use tracing::warn;
 
 enum KeyedJobTask<V> {
     Pending(Instant, Option<SlowJob>),
@@ -60,7 +61,9 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Send + Sync + 'static> Key
                     let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
                     if !fresh {
                         if let Some(job) = job.take() {
-                            pool.cancel(job)
+                            if let Err(e) = pool.cancel(job) {
+                                warn!(?e, "failed to cancel job");
+                            }
                         }
                     }
                     fresh
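Since cancel now returns Result<(), SlowJob> instead of (), a job that already left the queue is handed back to the caller, and KeyedJobs simply logs that case, as the hunk above shows. A hedged caller-side sketch of the same pattern (the helper name is invented; SlowJob derives Debug in this commit, so it can be captured in the log field):

use tracing::warn;
use veloren_common::slowjob::{SlowJob, SlowJobPool};

// Invented helper: try to dequeue the job; on Err it was already started (or
// unknown) and the handle comes back to us.
fn cancel_or_log(pool: &SlowJobPool, job: SlowJob) {
    if let Err(job) = pool.cancel(job) {
        warn!(?job, "could not cancel, the job is no longer queued");
    }
}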