implement a simple round robin to ensure that if multiple jobs are spawned the older one has priority; fix spelling

This commit is contained in:
Marcel Märtens 2021-05-21 09:51:00 +02:00
parent e2a9128976
commit 34f5ff62d4
2 changed files with 87 additions and 24 deletions
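The round robin added here is tracked in `last_spawned_configs`: whenever a config spawns a job, its name is rotated to the back of that list, so the config that spawned least recently is preferred the next time queued jobs are ordered. A minimal, self-contained sketch of that rotation (the `rotate_to_back` helper and the `main` below are illustrative, not the crate's API):

```rust
// Sketch of the round-robin bookkeeping added in this commit: whenever a
// config spawns a job, its name is rotated to the back of the order, so the
// config that spawned least recently gets priority next.
// `rotate_to_back` is an illustrative helper, not part of the crate.
fn rotate_to_back(order: &mut Vec<String>, name: &str) {
    if let Some(i) = order.iter().position(|e| e.as_str() == name) {
        order.remove(i);
    }
    order.push(name.to_owned());
}

fn main() {
    let mut order = vec!["FOO".to_owned(), "BAR".to_owned()];
    // spawning a FOO job moves "FOO" to the back, so "BAR" gets priority next
    rotate_to_back(&mut order, "FOO");
    assert_eq!(order, ["BAR", "FOO"]);
}
```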


@ -3,13 +3,9 @@ use rayon::ThreadPool;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
time::{Duration, Instant}
};
use tracing::{error, warn};
const LAST_JOBS_METRICS: usize = 32;
const FAIR_TIME_INTERVAL: Duration = Duration::from_millis(1000);
/// Provides a wrapper around the rayon threadpool to execute slow jobs.
/// Slow means the job doesn't need to complete within the same tick.
/// DO NOT USE it for I/O-blocking jobs, only for CPU-heavy jobs.
@ -28,7 +24,7 @@ const FAIR_TIME_INTERVAL: Duration = Duration::from_millis(1000);
/// hit.
/// - remaining capacities are spread in relation to their limit. e.g. a
/// configuration with double the limit will be scheduled to spawn double the
/// tasks.
/// tasks, starting in round-robin order.
///
/// ## States
/// - queued
@ -62,6 +58,7 @@ struct InternalSlowJobPool {
next_id: u64,
queue: HashMap<String, VecDeque<Queue>>,
configs: HashMap<String, Config>,
last_spawned_configs: Vec<String>,
global_spawned_and_running: u64,
global_limit: u64,
threadpool: Arc<ThreadPool>,
@ -72,8 +69,6 @@ struct InternalSlowJobPool {
struct Config {
local_limit: u64,
local_spawned_and_running: u64,
/// hold the start and time of the last LAST_JOBS_METRICS jobs
last_jobs: VecDeque<(std::time::Instant, Option<std::time::Duration>)>,
}
struct Queue {
@ -112,6 +107,7 @@ impl InternalSlowJobPool {
next_id: 0,
queue: HashMap::new(),
configs: HashMap::new(),
last_spawned_configs: Vec::new(),
global_spawned_and_running: 0,
global_limit: global_limit.max(1),
threadpool,
@ -131,13 +127,14 @@ impl InternalSlowJobPool {
mut queued: HashMap<&String, u64>,
mut limit: usize,
) -> Vec<String> {
let mut roundrobin = self.last_spawned_configs.clone();
let mut result = vec![];
let spawned = self
.configs
.iter()
.map(|(n, c)| (n, c.local_spawned_and_running))
.collect::<HashMap<_, u64>>();
let mut queried_caped = self
let mut queried_capped = self
.configs
.iter()
.map(|(n, c)| {
@ -151,24 +148,31 @@ impl InternalSlowJobPool {
)
})
.collect::<HashMap<_, _>>();
// grab all configs that are queued and not running
for (&n, c) in queued.iter_mut() {
if *c > 0 && spawned.get(n).cloned().unwrap_or(0) == 0 {
result.push(n.clone());
*c -= 1;
limit -= 1;
queried_caped.get_mut(&n).map(|v| *v -= 1);
if limit == 0 {
return result;
// grab all configs that are queued and not running, in round-robin order
for n in roundrobin.clone().into_iter() {
if let Some(c) = queued.get_mut(&n) {
if *c > 0 && spawned.get(&n).cloned().unwrap_or(0) == 0 {
result.push(n.clone());
*c -= 1;
limit -= 1;
queried_capped.get_mut(&n).map(|v| *v -= 1);
roundrobin
.iter()
.position(|e| e == &n)
.map(|i| roundrobin.remove(i));
roundrobin.push(n);
if limit == 0 {
return result;
}
}
}
}
//schedule rest
let total_limit = queried_caped.values().sum::<u64>() as f32;
//schedule the rest based on their possible limits; don't use round robin here
let total_limit = queried_capped.values().sum::<u64>() as f32;
if total_limit < f32::EPSILON {
return result;
}
let mut spawn_rates = queried_caped
let mut spawn_rates = queried_capped
.iter()
.map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
.collect::<Vec<_>>();
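The remainder (after the round-robin pass) is split proportionally to each config's remaining capped limit, matching the doc comment's claim that a config with double the limit is scheduled for double the tasks. A rough, self-contained sketch of that formula, assuming a free-standing `spawn_rates` helper and made-up numbers rather than the pool's real API:

```rust
// Illustrative sketch of the proportional split used for the remainder
// (after the round-robin pass): each config receives a share of `limit`
// proportional to its remaining capped capacity, never exceeding it.
// `spawn_rates` is a made-up helper, not part of the crate.
fn spawn_rates(queried_capped: &[(&str, u64)], limit: usize) -> Vec<(String, f32)> {
    let total_limit: f32 = queried_capped.iter().map(|(_, l)| *l as f32).sum();
    if total_limit < f32::EPSILON {
        return Vec::new();
    }
    queried_capped
        .iter()
        .map(|(n, l)| {
            let share = (*l as f32 * limit as f32) / total_limit;
            ((*n).to_owned(), share.min(*l as f32))
        })
        .collect()
}

fn main() {
    // BAR has double the remaining capacity of FOO, so it gets double the share
    let rates = spawn_rates(&[("FOO", 2), ("BAR", 4)], 3);
    assert_eq!(rates, [("FOO".to_owned(), 1.0), ("BAR".to_owned(), 2.0)]);
}
```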
@ -231,6 +235,10 @@ impl InternalSlowJobPool {
.entry(name.to_string())
.or_default()
.push_back(queue);
debug_assert!(
self.configs.contains_key(name),
"Can't spawn unconfigured task!"
);
//spawn already queued
self.spawn_queued();
SlowJob {
@ -243,7 +251,6 @@ impl InternalSlowJobPool {
self.global_spawned_and_running -= 1;
if let Some(c) = self.configs.get_mut(name) {
c.local_spawned_and_running -= 1;
c.last_jobs.
} else {
warn!(?name, "sync_maintain on a no longer existing config");
}
@ -267,6 +274,11 @@ impl InternalSlowJobPool {
.get_mut(&queue.name)
.expect("cannot fire a unconfigured job")
.local_spawned_and_running += 1;
self.last_spawned_configs
.iter()
.position(|e| e == &queue.name)
.map(|i| self.last_spawned_configs.remove(i));
self.last_spawned_configs.push(queue.name.to_owned());
self.threadpool.spawn(queue.task);
},
None => error!(
@ -300,9 +312,9 @@ impl SlowJobPool {
let cnf = Config {
local_limit: f(lock.global_limit).max(1),
local_spawned_and_running: 0,
last_jobs: VecDeque::with_capacity(LAST_JOBS_METRICS),
};
lock.configs.insert(name.to_owned(), cnf);
lock.last_spawned_configs.push(name.to_owned());
}
/// spawn a new slow job on a certain NAME IF it can run immediately
@ -520,6 +532,54 @@ mod tests {
assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 5);
}
#[test]
fn roundrobin() {
let pool = mock_pool(4, 4, 2, 2, 0);
let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
.iter()
.map(|(n, c)| ((*n).to_owned(), *c))
.collect::<Vec<_>>();
let queued = queue_data
.iter()
.map(|(s, c)| (s, *c))
.collect::<HashMap<_, _>>();
// Spawn a FOO task.
pool.internal
.lock()
.unwrap()
.spawn("FOO", || println!("foo"));
// a barrier in f doesn't work as we need to wait for the cleanup
while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
std::thread::yield_now();
}
let result = pool
.internal
.lock()
.unwrap()
.calc_queued_order(queued.clone(), 1);
assert_eq!(result.len(), 1);
assert_eq!(result[0], "BAR");
// keep order if no new task is spawned
let result = pool
.internal
.lock()
.unwrap()
.calc_queued_order(queued.clone(), 1);
assert_eq!(result.len(), 1);
assert_eq!(result[0], "BAR");
// spawn a BAR task
pool.internal
.lock()
.unwrap()
.spawn("BAR", || println!("bar"));
while pool.internal.lock().unwrap().global_spawned_and_running != 0 {
std::thread::yield_now();
}
let result = pool.internal.lock().unwrap().calc_queued_order(queued, 1);
assert_eq!(result.len(), 1);
assert_eq!(result[0], "FOO");
}
#[test]
#[should_panic]
fn unconfigured() {
@ -617,7 +677,7 @@ mod tests {
});
// in this case we have to sleep
std::thread::sleep(std::time::Duration::from_secs(1));
assert_eq!(ops_i_ran.load(Ordering::SeqCst), false);
assert!(!ops_i_ran.load(Ordering::SeqCst));
// now finish the first job
barrier.wait();
// now wait on the second job to be actually finished


@ -4,6 +4,7 @@ use std::{
hash::Hash,
time::{Duration, Instant},
};
use tracing::warn;
enum KeyedJobTask<V> {
Pending(Instant, Option<SlowJob>),
@ -60,7 +61,9 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Send + Sync + 'static> Key
let fresh = now - *at < KEYEDJOBS_GC_INTERVAL;
if !fresh {
if let Some(job) = job.take() {
pool.cancel(job)
if let Err(e) = pool.cancel(job) {
warn!(?e, "failed to cancel job");
}
}
}
fresh
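The second file's change stops discarding the result of `pool.cancel(job)` and logs a warning on failure instead. A tiny stand-alone sketch of that pattern, with `Pool`, `SlowJob`, and the error value as stand-ins rather than the crate's real types:

```rust
// Stand-in types to illustrate the pattern: log a failed cancel instead of
// silently discarding the error. Not the crate's actual API.
struct SlowJob;
struct Pool;

impl Pool {
    fn cancel(&self, _job: SlowJob) -> Result<(), &'static str> {
        // pretend the job already started and can no longer be cancelled
        Err("job already running")
    }
}

fn main() {
    let pool = Pool;
    if let Err(e) = pool.cancel(SlowJob) {
        // the diff uses tracing's warn!(?e, "failed to cancel job") here
        eprintln!("failed to cancel job: {:?}", e);
    }
}
```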