mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
redo slowjobs in order to have a try_run
fn
This commit is contained in:
parent
5f2b44002e
commit
e2a9128976
@ -1,9 +1,14 @@
|
||||
use hashbrown::HashMap;
|
||||
use rayon::ThreadPool;
|
||||
use std::sync::{
|
||||
atomic::{AtomicI64, AtomicU64, Ordering},
|
||||
Arc, RwLock,
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant}
|
||||
};
|
||||
use tracing::{error, warn};
|
||||
|
||||
const LAST_JOBS_METRICS: usize = 32;
|
||||
const FAIR_TIME_INTERVAL: Duration = Duration::from_millis(1000);
|
||||
|
||||
/// Provides a Wrapper around rayon threadpool to execute slow-jobs.
|
||||
/// slow means, the job doesn't need to not complete within the same tick.
|
||||
@ -11,11 +16,26 @@ use std::sync::{
|
||||
/// Jobs run here, will reduce the ammount of threads rayon can use during the
|
||||
/// main tick.
|
||||
///
|
||||
/// ## Configuration
|
||||
/// This Pool allows you to configure certain names of jobs and assign them a
|
||||
/// maximum number of threads # Example
|
||||
/// Your system has 16 cores, you assign 12 cores for slow-jobs.
|
||||
/// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on
|
||||
/// max 50% (6 = cores) ```rust
|
||||
/// max 50% (6 = cores)
|
||||
///
|
||||
/// ## Spawn Order
|
||||
/// - At least 1 job of a configuration is allowed to run if global limit isn't
|
||||
/// hit.
|
||||
/// - remaining capacities are spread in relation to their limit. e.g. a
|
||||
/// configuration with double the limit will be sheduled to spawn double the
|
||||
/// tasks.
|
||||
///
|
||||
/// ## States
|
||||
/// - queued
|
||||
/// - spawned
|
||||
/// - started
|
||||
/// - finished
|
||||
/// ```
|
||||
/// # use veloren_common::slowjob::SlowJobPool;
|
||||
/// # use std::sync::Arc;
|
||||
///
|
||||
@ -25,126 +45,248 @@ use std::sync::{
|
||||
/// .unwrap();
|
||||
/// let pool = SlowJobPool::new(3, Arc::new(threadpool));
|
||||
/// pool.configure("CHUNK_GENERATOR", |n| n / 2);
|
||||
/// pool.spawn("CHUNK_GENERATOR", move || println("this is a job"));
|
||||
/// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job"));
|
||||
/// ```
|
||||
#[derive(Clone)]
|
||||
pub struct SlowJobPool {
|
||||
internal: Arc<InternalSlowJobPool>,
|
||||
internal: Arc<Mutex<InternalSlowJobPool>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SlowJob {
|
||||
name: String,
|
||||
id: u64,
|
||||
}
|
||||
|
||||
struct InternalSlowJobPool {
|
||||
next_id: Arc<AtomicU64>,
|
||||
queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
|
||||
running_jobs: RwLock<HashMap<String, Arc<AtomicI64>>>,
|
||||
configs: RwLock<HashMap<String, Config>>,
|
||||
global_running_jobs: Arc<AtomicI64>,
|
||||
next_id: u64,
|
||||
queue: HashMap<String, VecDeque<Queue>>,
|
||||
configs: HashMap<String, Config>,
|
||||
global_spawned_and_running: u64,
|
||||
global_limit: u64,
|
||||
threadpool: Arc<ThreadPool>,
|
||||
internal: Option<Arc<Mutex<Self>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Config {
|
||||
max_local: u64,
|
||||
spawned_total: Arc<AtomicU64>,
|
||||
local_limit: u64,
|
||||
local_spawned_and_running: u64,
|
||||
/// hold the start and time of the last LAST_JOBS_METRICS jobs
|
||||
last_jobs: VecDeque<(std::time::Instant, Option<std::time::Duration>)>,
|
||||
}
|
||||
|
||||
struct Queue {
|
||||
id: u64,
|
||||
name: String,
|
||||
task: Box<dyn FnOnce() + Send + Sync + 'static>,
|
||||
spawned_total: Arc<AtomicU64>,
|
||||
local_running_jobs: Arc<AtomicI64>,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self
|
||||
where
|
||||
F: FnOnce() + Send + Sync + 'static,
|
||||
{
|
||||
let internal = Arc::clone(&internal);
|
||||
let name_cloned = name.to_owned();
|
||||
Self {
|
||||
id,
|
||||
name: name.to_owned(),
|
||||
task: Box::new(move || {
|
||||
common_base::prof_span!(_guard, &name_cloned);
|
||||
f();
|
||||
// directly maintain the next task afterwards
|
||||
{
|
||||
let mut lock = internal.lock().expect("slowjob lock poisoned");
|
||||
lock.finish(&name_cloned);
|
||||
lock.spawn_queued();
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InternalSlowJobPool {
|
||||
pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
|
||||
Self {
|
||||
next_id: Arc::new(AtomicU64::new(0)),
|
||||
queue: RwLock::new(HashMap::new()),
|
||||
running_jobs: RwLock::new(HashMap::new()),
|
||||
configs: RwLock::new(HashMap::new()),
|
||||
global_running_jobs: Arc::new(AtomicI64::new(0)),
|
||||
pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Arc<Mutex<Self>> {
|
||||
let link = Arc::new(Mutex::new(Self {
|
||||
next_id: 0,
|
||||
queue: HashMap::new(),
|
||||
configs: HashMap::new(),
|
||||
global_spawned_and_running: 0,
|
||||
global_limit: global_limit.max(1),
|
||||
threadpool,
|
||||
}
|
||||
internal: None,
|
||||
}));
|
||||
|
||||
let link_clone = Arc::clone(&link);
|
||||
link.lock()
|
||||
.expect("poisoned on InternalSlowJobPool::new")
|
||||
.internal = Some(link_clone);
|
||||
link
|
||||
}
|
||||
|
||||
fn maintain(&self) {
|
||||
let jobs_available =
|
||||
self.global_limit as i64 - self.global_running_jobs.load(Ordering::SeqCst);
|
||||
if jobs_available < 0 {
|
||||
tracing::warn!(?jobs_available, "Some math is wrong in slowjob code");
|
||||
}
|
||||
if jobs_available <= 0 {
|
||||
// we run at limit, can't spawn
|
||||
return;
|
||||
}
|
||||
let possible = {
|
||||
let lock = self.queue.read().unwrap();
|
||||
lock.iter()
|
||||
.map(|(name, queues)| {
|
||||
if !queues.is_empty() {
|
||||
Some(name.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flatten()
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let mut possible_total = {
|
||||
let mut possible = possible;
|
||||
let lock = self.configs.read().unwrap();
|
||||
possible
|
||||
.drain(..)
|
||||
.map(|name| {
|
||||
let c = lock.get(&name).unwrap();
|
||||
/// returns order of configuration which are queued next
|
||||
fn calc_queued_order(
|
||||
&self,
|
||||
mut queued: HashMap<&String, u64>,
|
||||
mut limit: usize,
|
||||
) -> Vec<String> {
|
||||
let mut result = vec![];
|
||||
let spawned = self
|
||||
.configs
|
||||
.iter()
|
||||
.map(|(n, c)| (n, c.local_spawned_and_running))
|
||||
.collect::<HashMap<_, u64>>();
|
||||
let mut queried_caped = self
|
||||
.configs
|
||||
.iter()
|
||||
.map(|(n, c)| {
|
||||
(
|
||||
name,
|
||||
c.spawned_total.load(Ordering::SeqCst) / c.max_local,
|
||||
c.max_local,
|
||||
n,
|
||||
queued
|
||||
.get(&n)
|
||||
.cloned()
|
||||
.unwrap_or(0)
|
||||
.min(c.local_limit - c.local_spawned_and_running),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
possible_total.sort_by_key(|(_, i, _)| *i);
|
||||
|
||||
let mut lock = self.queue.write().unwrap();
|
||||
for i in 0..jobs_available as usize {
|
||||
if let Some((name, _, max)) = possible_total.get(i) {
|
||||
if let Some(map) = lock.get_mut(name) {
|
||||
let firstkey = match map.keys().next() {
|
||||
Some(k) => *k,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if let Some(queue) = map.remove(&firstkey) {
|
||||
if queue.local_running_jobs.load(Ordering::SeqCst) < *max as i64 {
|
||||
self.fire(queue);
|
||||
.collect::<HashMap<_, _>>();
|
||||
// grab all configs that are queued and not running
|
||||
for (&n, c) in queued.iter_mut() {
|
||||
if *c > 0 && spawned.get(n).cloned().unwrap_or(0) == 0 {
|
||||
result.push(n.clone());
|
||||
*c -= 1;
|
||||
limit -= 1;
|
||||
queried_caped.get_mut(&n).map(|v| *v -= 1);
|
||||
if limit == 0 {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
//schedule rest
|
||||
let total_limit = queried_caped.values().sum::<u64>() as f32;
|
||||
if total_limit < f32::EPSILON {
|
||||
return result;
|
||||
}
|
||||
let mut spawn_rates = queried_caped
|
||||
.iter()
|
||||
.map(|(&n, l)| (n, ((*l as f32 * limit as f32) / total_limit).min(*l as f32)))
|
||||
.collect::<Vec<_>>();
|
||||
while limit > 0 {
|
||||
spawn_rates.sort_by(|(_, a), (_, b)| {
|
||||
if b < a {
|
||||
core::cmp::Ordering::Less
|
||||
} else if (b - a).abs() < f32::EPSILON {
|
||||
core::cmp::Ordering::Equal
|
||||
} else {
|
||||
map.insert(firstkey, queue);
|
||||
core::cmp::Ordering::Greater
|
||||
}
|
||||
});
|
||||
match spawn_rates.first_mut() {
|
||||
Some((n, r)) => {
|
||||
if *r > f32::EPSILON {
|
||||
result.push(n.clone());
|
||||
limit -= 1;
|
||||
*r -= 1.0;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn can_spawn(&self, name: &str) -> bool {
|
||||
let queued = self
|
||||
.queue
|
||||
.iter()
|
||||
.map(|(n, m)| (n, m.len() as u64))
|
||||
.collect::<HashMap<_, u64>>();
|
||||
let mut to_be_queued = queued.clone();
|
||||
let name = name.to_owned();
|
||||
*to_be_queued.entry(&name).or_default() += 1;
|
||||
let limit = (self.global_limit - self.global_spawned_and_running) as usize;
|
||||
// calculate to_be_queued first
|
||||
let to_be_queued_order = self.calc_queued_order(to_be_queued, limit);
|
||||
let queued_order = self.calc_queued_order(queued, limit);
|
||||
// if its queued one time more then its okay to spawn
|
||||
let to_be_queued_cnt = to_be_queued_order
|
||||
.into_iter()
|
||||
.filter(|n| n == &name)
|
||||
.count();
|
||||
let queued_cnt = queued_order.into_iter().filter(|n| n == &name).count();
|
||||
to_be_queued_cnt > queued_cnt
|
||||
}
|
||||
|
||||
pub fn spawn<F>(&mut self, name: &str, f: F) -> SlowJob
|
||||
where
|
||||
F: FnOnce() + Send + Sync + 'static,
|
||||
{
|
||||
let id = self.next_id;
|
||||
self.next_id += 1;
|
||||
let queue = Queue::new(name, id, self.internal.as_ref().expect("internal empty"), f);
|
||||
self.queue
|
||||
.entry(name.to_string())
|
||||
.or_default()
|
||||
.push_back(queue);
|
||||
//spawn already queued
|
||||
self.spawn_queued();
|
||||
SlowJob {
|
||||
name: name.to_string(),
|
||||
id,
|
||||
}
|
||||
}
|
||||
|
||||
fn fire(&self, queue: Queue) {
|
||||
queue.spawned_total.fetch_add(1, Ordering::SeqCst);
|
||||
queue.local_running_jobs.fetch_add(1, Ordering::SeqCst);
|
||||
self.global_running_jobs.fetch_add(1, Ordering::SeqCst);
|
||||
fn finish(&mut self, name: &str) {
|
||||
self.global_spawned_and_running -= 1;
|
||||
if let Some(c) = self.configs.get_mut(name) {
|
||||
c.local_spawned_and_running -= 1;
|
||||
c.last_jobs.
|
||||
} else {
|
||||
warn!(?name, "sync_maintain on a no longer existing config");
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_queued(&mut self) {
|
||||
let queued = self
|
||||
.queue
|
||||
.iter()
|
||||
.map(|(n, m)| (n, m.len() as u64))
|
||||
.collect::<HashMap<_, u64>>();
|
||||
let limit = self.global_limit as usize;
|
||||
let queued_order = self.calc_queued_order(queued, limit);
|
||||
for name in queued_order.into_iter() {
|
||||
match self.queue.get_mut(&name) {
|
||||
Some(deque) => match deque.pop_front() {
|
||||
Some(queue) => {
|
||||
//fire
|
||||
self.global_spawned_and_running += 1;
|
||||
self.configs
|
||||
.get_mut(&queue.name)
|
||||
.expect("cannot fire a unconfigured job")
|
||||
.local_spawned_and_running += 1;
|
||||
self.threadpool.spawn(queue.task);
|
||||
},
|
||||
None => error!(
|
||||
"internal calculation is wrong, we extected a schedulable job to be \
|
||||
present in the queue"
|
||||
),
|
||||
},
|
||||
None => error!(
|
||||
"internal calculation is wrong, we marked a queue as schedulable which \
|
||||
doesn't exist"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SlowJobPool {
|
||||
pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
|
||||
Self {
|
||||
internal: Arc::new(InternalSlowJobPool::new(global_limit, threadpool)),
|
||||
internal: InternalSlowJobPool::new(global_limit, threadpool),
|
||||
}
|
||||
}
|
||||
|
||||
@ -154,256 +296,331 @@ impl SlowJobPool {
|
||||
where
|
||||
F: Fn(u64) -> u64,
|
||||
{
|
||||
let mut lock = self.internal.lock().expect("lock poisoned while configure");
|
||||
let cnf = Config {
|
||||
max_local: f(self.internal.global_limit).max(1),
|
||||
spawned_total: Arc::new(AtomicU64::new(0)),
|
||||
local_limit: f(lock.global_limit).max(1),
|
||||
local_spawned_and_running: 0,
|
||||
last_jobs: VecDeque::with_capacity(LAST_JOBS_METRICS),
|
||||
};
|
||||
let mut lock = self.internal.configs.write().unwrap();
|
||||
lock.insert(name.to_string(), cnf);
|
||||
lock.configs.insert(name.to_owned(), cnf);
|
||||
}
|
||||
|
||||
/// spawn a new slow job on a certain NAME IF it can run immediately
|
||||
#[allow(clippy::result_unit_err)]
|
||||
pub fn try_run<F>(&self, name: &str, f: F) -> Result<SlowJob, ()>
|
||||
where
|
||||
F: FnOnce() + Send + Sync + 'static,
|
||||
{
|
||||
let mut lock = self.internal.lock().expect("lock poisoned while try_run");
|
||||
//spawn already queued
|
||||
lock.spawn_queued();
|
||||
if lock.can_spawn(name) {
|
||||
Ok(lock.spawn(name, f))
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
/// spawn a new slow job on a certain NAME
|
||||
pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob
|
||||
where
|
||||
F: FnOnce() + Send + Sync + 'static,
|
||||
{
|
||||
let id = self.internal.next_id.fetch_add(1, Ordering::SeqCst);
|
||||
self.internal
|
||||
.queue
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(name.to_string())
|
||||
.or_default()
|
||||
.insert(id, self.queue(name, f));
|
||||
self.maintain();
|
||||
SlowJob {
|
||||
name: name.to_string(),
|
||||
id,
|
||||
}
|
||||
.lock()
|
||||
.expect("lock poisoned while spawn")
|
||||
.spawn(name, f)
|
||||
}
|
||||
|
||||
fn queue<F>(&self, name: &str, f: F) -> Queue
|
||||
where
|
||||
F: FnOnce() + Send + Sync + 'static,
|
||||
{
|
||||
let internal = Arc::clone(&self.internal);
|
||||
let spawned_total = Arc::clone(
|
||||
&self
|
||||
.internal
|
||||
.configs
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(name)
|
||||
.expect("can't spawn a non-configued slowjob")
|
||||
.spawned_total,
|
||||
);
|
||||
let local_running_jobs_clone = {
|
||||
let mut lock = self.internal.running_jobs.write().unwrap();
|
||||
Arc::clone(&lock.entry(name.to_string()).or_default())
|
||||
pub fn cancel(&self, job: SlowJob) -> Result<(), SlowJob> {
|
||||
let mut lock = self.internal.lock().expect("lock poisoned while cancel");
|
||||
if let Some(m) = lock.queue.get_mut(&job.name) {
|
||||
let p = match m.iter().position(|p| p.id == job.id) {
|
||||
Some(p) => p,
|
||||
None => return Err(job),
|
||||
};
|
||||
let local_running_jobs = Arc::clone(&local_running_jobs_clone);
|
||||
let global_running_jobs_clone = Arc::clone(&self.internal.global_running_jobs);
|
||||
let _name_clones = name.to_string();
|
||||
Queue {
|
||||
task: Box::new(move || {
|
||||
common_base::prof_span!(_guard, &_name_clones);
|
||||
f();
|
||||
local_running_jobs_clone.fetch_sub(1, Ordering::SeqCst);
|
||||
global_running_jobs_clone.fetch_sub(1, Ordering::SeqCst);
|
||||
// directly maintain the next task afterwards
|
||||
internal.maintain();
|
||||
}),
|
||||
spawned_total,
|
||||
local_running_jobs,
|
||||
if m.remove(p).is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancel(&self, job: SlowJob) {
|
||||
let mut lock = self.internal.queue.write().unwrap();
|
||||
if let Some(map) = lock.get_mut(&job.name) {
|
||||
map.remove(&job.id);
|
||||
Err(job)
|
||||
}
|
||||
}
|
||||
|
||||
fn maintain(&self) { self.internal.maintain() }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::{
|
||||
sync::Mutex,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
fn mock_fn(
|
||||
name: &str,
|
||||
start_time: &Arc<Mutex<Option<Instant>>>,
|
||||
done: &Arc<AtomicI64>,
|
||||
) -> impl FnOnce() {
|
||||
let name = name.to_string();
|
||||
let start_time = Arc::clone(start_time);
|
||||
let done = Arc::clone(done);
|
||||
move || {
|
||||
println!("Start {}", name);
|
||||
*start_time.lock().unwrap() = Some(Instant::now());
|
||||
std::thread::sleep(Duration::from_millis(500));
|
||||
done.fetch_add(1, Ordering::Relaxed);
|
||||
println!("Finished {}", name);
|
||||
#[allow(clippy::blacklisted_name)]
|
||||
fn mock_pool(
|
||||
pool_threads: usize,
|
||||
global_threads: u64,
|
||||
foo: u64,
|
||||
bar: u64,
|
||||
baz: u64,
|
||||
) -> SlowJobPool {
|
||||
let threadpool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(pool_threads)
|
||||
.build()
|
||||
.unwrap();
|
||||
let pool = SlowJobPool::new(global_threads, Arc::new(threadpool));
|
||||
if foo != 0 {
|
||||
pool.configure("FOO", |x| x / foo);
|
||||
}
|
||||
if bar != 0 {
|
||||
pool.configure("BAR", |x| x / bar);
|
||||
}
|
||||
if baz != 0 {
|
||||
pool.configure("BAZ", |x| x / baz);
|
||||
}
|
||||
pool
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn global_limit() {
|
||||
let threadpool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(4)
|
||||
.build()
|
||||
.unwrap();
|
||||
let pool = SlowJobPool::new(3, Arc::new(threadpool));
|
||||
pool.configure("FOO", |_| 1000);
|
||||
let start = Instant::now();
|
||||
let f1 = Arc::new(Mutex::new(None));
|
||||
let f2 = Arc::new(Mutex::new(None));
|
||||
let f3 = Arc::new(Mutex::new(None));
|
||||
let f4 = Arc::new(Mutex::new(None));
|
||||
let f5 = Arc::new(Mutex::new(None));
|
||||
let f6 = Arc::new(Mutex::new(None));
|
||||
let f7 = Arc::new(Mutex::new(None));
|
||||
let done = Arc::new(AtomicI64::new(0));
|
||||
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
||||
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
||||
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
|
||||
std::thread::sleep(Duration::from_millis(300));
|
||||
pool.spawn("FOO", mock_fn("foo4", &f4, &done));
|
||||
pool.spawn("FOO", mock_fn("foo5", &f5, &done));
|
||||
pool.spawn("FOO", mock_fn("foo6", &f6, &done));
|
||||
std::thread::sleep(Duration::from_millis(300));
|
||||
pool.spawn("FOO", mock_fn("foo7", &f7, &done));
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
|
||||
a.lock().unwrap().unwrap().duration_since(s).as_millis()
|
||||
};
|
||||
let f1 = measure(f1, start);
|
||||
let f2 = measure(f2, start);
|
||||
let f3 = measure(f3, start);
|
||||
let f4 = measure(f4, start);
|
||||
let f5 = measure(f5, start);
|
||||
let f6 = measure(f6, start);
|
||||
let f7 = measure(f7, start);
|
||||
assert_eq!(done.load(Ordering::Relaxed), 7);
|
||||
// Just test relative times, not absolute
|
||||
assert!(f1 < f4);
|
||||
assert!(f1 < f5);
|
||||
assert!(f1 < f6);
|
||||
assert!(f1 < f7);
|
||||
assert!(f2 < f4);
|
||||
assert!(f2 < f5);
|
||||
assert!(f2 < f6);
|
||||
assert!(f2 < f7);
|
||||
assert!(f3 < f4);
|
||||
assert!(f3 < f5);
|
||||
assert!(f3 < f6);
|
||||
assert!(f3 < f7);
|
||||
assert!(f4 < f7);
|
||||
assert!(f5 < f7);
|
||||
assert!(f6 < f7);
|
||||
fn simple_queue() {
|
||||
let pool = mock_pool(4, 4, 1, 0, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 1u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 4);
|
||||
assert_eq!(result.len(), 1);
|
||||
assert_eq!(result[0], "FOO");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_limit() {
|
||||
let threadpool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(4)
|
||||
.build()
|
||||
.unwrap();
|
||||
let pool = SlowJobPool::new(100, Arc::new(threadpool));
|
||||
pool.configure("FOO", |_| 3);
|
||||
let start = Instant::now();
|
||||
let f1 = Arc::new(Mutex::new(None));
|
||||
let f2 = Arc::new(Mutex::new(None));
|
||||
let f3 = Arc::new(Mutex::new(None));
|
||||
let f4 = Arc::new(Mutex::new(None));
|
||||
let f5 = Arc::new(Mutex::new(None));
|
||||
let f6 = Arc::new(Mutex::new(None));
|
||||
let f7 = Arc::new(Mutex::new(None));
|
||||
let done = Arc::new(AtomicI64::new(0));
|
||||
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
||||
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
||||
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
|
||||
std::thread::sleep(Duration::from_millis(300));
|
||||
pool.spawn("FOO", mock_fn("foo4", &f4, &done));
|
||||
pool.spawn("FOO", mock_fn("foo5", &f5, &done));
|
||||
pool.spawn("FOO", mock_fn("foo6", &f6, &done));
|
||||
std::thread::sleep(Duration::from_millis(300));
|
||||
pool.spawn("FOO", mock_fn("foo7", &f7, &done));
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
|
||||
a.lock().unwrap().unwrap().duration_since(s).as_millis()
|
||||
};
|
||||
let f1 = measure(f1, start);
|
||||
let f2 = measure(f2, start);
|
||||
let f3 = measure(f3, start);
|
||||
let f4 = measure(f4, start);
|
||||
let f5 = measure(f5, start);
|
||||
let f6 = measure(f6, start);
|
||||
let f7 = measure(f7, start);
|
||||
assert_eq!(done.load(Ordering::Relaxed), 7);
|
||||
assert!(f1 < f4);
|
||||
assert!(f1 < f5);
|
||||
assert!(f1 < f6);
|
||||
assert!(f1 < f7);
|
||||
assert!(f2 < f4);
|
||||
assert!(f2 < f5);
|
||||
assert!(f2 < f6);
|
||||
assert!(f2 < f7);
|
||||
assert!(f3 < f4);
|
||||
assert!(f3 < f5);
|
||||
assert!(f3 < f6);
|
||||
assert!(f3 < f7);
|
||||
assert!(f4 < f7);
|
||||
assert!(f5 < f7);
|
||||
assert!(f6 < f7);
|
||||
fn multiple_queue() {
|
||||
let pool = mock_pool(4, 4, 1, 0, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 2u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 4);
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(result[0], "FOO");
|
||||
assert_eq!(result[1], "FOO");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pool() {
|
||||
let threadpool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(2)
|
||||
.build()
|
||||
.unwrap();
|
||||
let pool = SlowJobPool::new(2, Arc::new(threadpool));
|
||||
pool.configure("FOO", |n| n);
|
||||
pool.configure("BAR", |n| n / 2);
|
||||
let f1 = Arc::new(Mutex::new(None));
|
||||
let f2 = Arc::new(Mutex::new(None));
|
||||
let b1 = Arc::new(Mutex::new(None));
|
||||
let b2 = Arc::new(Mutex::new(None));
|
||||
let done = Arc::new(AtomicI64::new(0));
|
||||
let start = Instant::now();
|
||||
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
||||
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
||||
std::thread::sleep(Duration::from_millis(1000));
|
||||
pool.spawn("BAR", mock_fn("bar1", &b1, &done));
|
||||
pool.spawn("BAR", mock_fn("bar2", &b2, &done));
|
||||
std::thread::sleep(Duration::from_secs(2));
|
||||
let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
|
||||
a.lock().unwrap().unwrap().duration_since(s).as_millis()
|
||||
fn limit_queue() {
|
||||
let pool = mock_pool(5, 5, 1, 0, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 80u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 4);
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(result[0], "FOO");
|
||||
assert_eq!(result[1], "FOO");
|
||||
assert_eq!(result[2], "FOO");
|
||||
assert_eq!(result[3], "FOO");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn simple_queue_2() {
|
||||
let pool = mock_pool(4, 4, 1, 1, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 1u64), ("BAR", 1u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 4);
|
||||
assert_eq!(result.len(), 2);
|
||||
assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 1);
|
||||
assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_queue_3() {
|
||||
let pool = mock_pool(4, 4, 1, 1, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 2u64), ("BAR", 2u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 4);
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
|
||||
assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_queue_4() {
|
||||
let pool = mock_pool(4, 4, 2, 1, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 3u64), ("BAR", 3u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 4);
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
|
||||
assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_queue_5() {
|
||||
let pool = mock_pool(4, 4, 2, 1, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 5);
|
||||
assert_eq!(result.len(), 5);
|
||||
assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 2);
|
||||
assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_queue_6() {
|
||||
let pool = mock_pool(40, 40, 2, 1, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
|
||||
.iter()
|
||||
.map(|(n, c)| ((*n).to_owned(), *c))
|
||||
.collect::<Vec<_>>();
|
||||
let queued = queue_data
|
||||
.iter()
|
||||
.map(|(s, c)| (s, *c))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let result = internal.calc_queued_order(queued, 11);
|
||||
assert_eq!(result.len(), 10);
|
||||
assert_eq!(result.iter().filter(|&x| x == "FOO").count(), 5);
|
||||
assert_eq!(result.iter().filter(|&x| x == "BAR").count(), 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn unconfigured() {
|
||||
let pool = mock_pool(4, 4, 2, 1, 0);
|
||||
let mut internal = pool.internal.lock().unwrap();
|
||||
internal.spawn("UNCONFIGURED", || println!());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn correct_spawn_doesnt_panic() {
|
||||
let pool = mock_pool(4, 4, 2, 1, 0);
|
||||
let mut internal = pool.internal.lock().unwrap();
|
||||
internal.spawn("FOO", || println!("foo"));
|
||||
internal.spawn("BAR", || println!("bar"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_spawn() {
|
||||
let pool = mock_pool(4, 4, 2, 1, 0);
|
||||
let internal = pool.internal.lock().unwrap();
|
||||
assert!(internal.can_spawn("FOO"));
|
||||
assert!(internal.can_spawn("BAR"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_run_works() {
|
||||
let pool = mock_pool(4, 4, 2, 1, 0);
|
||||
pool.try_run("FOO", || println!("foo")).unwrap();
|
||||
pool.try_run("BAR", || println!("bar")).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_run_exhausted() {
|
||||
use std::{thread::sleep, time::Duration};
|
||||
let pool = mock_pool(8, 8, 4, 2, 0);
|
||||
let func = || loop {
|
||||
sleep(Duration::from_secs(1))
|
||||
};
|
||||
let f1 = measure(f1, start);
|
||||
let f2 = measure(f2, start);
|
||||
let b1 = measure(b1, start);
|
||||
let b2 = measure(b2, start);
|
||||
// Expect:
|
||||
// [F1, F2]
|
||||
// [B1]
|
||||
// [B2]
|
||||
assert_eq!(done.load(Ordering::Relaxed), 4);
|
||||
assert!(f1 < 600);
|
||||
assert!(f2 < 600);
|
||||
println!("b1 {}", b1);
|
||||
println!("b2 {}", b2);
|
||||
// would be to flanky:
|
||||
//assert!((1000..1500).contains(&b1));
|
||||
//assert!((1500..2000).contains(&b2));
|
||||
assert!(b1 < b2);
|
||||
pool.try_run("FOO", func).unwrap();
|
||||
pool.try_run("BAR", func).unwrap();
|
||||
pool.try_run("FOO", func).unwrap();
|
||||
pool.try_run("BAR", func).unwrap();
|
||||
pool.try_run("FOO", func).unwrap_err();
|
||||
pool.try_run("BAR", func).unwrap();
|
||||
pool.try_run("FOO", func).unwrap_err();
|
||||
pool.try_run("BAR", func).unwrap();
|
||||
pool.try_run("FOO", func).unwrap_err();
|
||||
pool.try_run("BAR", func).unwrap_err();
|
||||
pool.try_run("FOO", func).unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn actually_runs_1() {
|
||||
let pool = mock_pool(4, 4, 0, 0, 1);
|
||||
let barrier = Arc::new(std::sync::Barrier::new(2));
|
||||
let barrier_clone = Arc::clone(&barrier);
|
||||
pool.try_run("BAZ", move || {
|
||||
barrier_clone.wait();
|
||||
})
|
||||
.unwrap();
|
||||
barrier.wait();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn actually_runs_2() {
|
||||
let pool = mock_pool(4, 4, 0, 0, 1);
|
||||
let barrier = Arc::new(std::sync::Barrier::new(2));
|
||||
let barrier_clone = Arc::clone(&barrier);
|
||||
pool.spawn("BAZ", move || {
|
||||
barrier_clone.wait();
|
||||
});
|
||||
barrier.wait();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn actually_waits() {
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Barrier,
|
||||
};
|
||||
let pool = mock_pool(4, 4, 4, 0, 1);
|
||||
let ops_i_ran = Arc::new(AtomicBool::new(false));
|
||||
let ops_i_ran_clone = Arc::clone(&ops_i_ran);
|
||||
let barrier = Arc::new(Barrier::new(2));
|
||||
let barrier_clone = Arc::clone(&barrier);
|
||||
let barrier2 = Arc::new(Barrier::new(2));
|
||||
let barrier2_clone = Arc::clone(&barrier2);
|
||||
pool.try_run("FOO", move || {
|
||||
barrier_clone.wait();
|
||||
})
|
||||
.unwrap();
|
||||
pool.spawn("FOO", move || {
|
||||
ops_i_ran_clone.store(true, Ordering::SeqCst);
|
||||
barrier2_clone.wait();
|
||||
});
|
||||
// in this case we have to sleep
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
assert_eq!(ops_i_ran.load(Ordering::SeqCst), false);
|
||||
// now finish the first job
|
||||
barrier.wait();
|
||||
// now wait on the second job to be actually finished
|
||||
barrier2.wait();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user