harden slowjob code

This commit is contained in:
Marcel Märtens 2021-03-21 18:44:30 +01:00
parent 1afe3b7de5
commit a77578a25b

View File

@ -1,7 +1,7 @@
use hashbrown::HashMap;
use rayon::ThreadPool;
use std::sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicI64, AtomicU64, Ordering},
Arc, RwLock,
};
@ -40,9 +40,9 @@ pub struct SlowJob {
struct InternalSlowJobPool {
next_id: Arc<AtomicU64>,
queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
running_jobs: RwLock<HashMap<String, Arc<AtomicU64>>>,
running_jobs: RwLock<HashMap<String, Arc<AtomicI64>>>,
configs: RwLock<HashMap<String, Config>>,
global_running_jobs: Arc<AtomicU64>,
global_running_jobs: Arc<AtomicI64>,
global_limit: u64,
threadpool: Arc<ThreadPool>,
}
@ -55,7 +55,7 @@ struct Config {
struct Queue {
task: Box<dyn FnOnce() + Send + Sync + 'static>,
spawned_total: Arc<AtomicU64>,
local_running_jobs: Arc<AtomicU64>,
local_running_jobs: Arc<AtomicI64>,
}
impl InternalSlowJobPool {
@ -65,15 +65,19 @@ impl InternalSlowJobPool {
queue: RwLock::new(HashMap::new()),
running_jobs: RwLock::new(HashMap::new()),
configs: RwLock::new(HashMap::new()),
global_running_jobs: Arc::new(AtomicU64::new(0)),
global_running_jobs: Arc::new(AtomicI64::new(0)),
global_limit: global_limit.max(1),
threadpool,
}
}
fn maintain(&self) {
let jobs_available = self.global_limit - self.global_running_jobs.load(Ordering::Relaxed);
if jobs_available == 0 {
let jobs_available =
self.global_limit as i64 - self.global_running_jobs.load(Ordering::Relaxed);
if jobs_available < 0 {
tracing::warn!(?jobs_available, "Some math is wrong in slowjob code");
}
if jobs_available <= 0 {
// we run at limit, can't spawn
return;
}
@ -118,7 +122,7 @@ impl InternalSlowJobPool {
};
if let Some(queue) = map.remove(&firstkey) {
if queue.local_running_jobs.load(Ordering::Relaxed) < *max {
if queue.local_running_jobs.load(Ordering::Relaxed) < *max as i64 {
self.fire(queue);
} else {
map.insert(firstkey, queue);
@ -235,7 +239,7 @@ mod tests {
fn mock_fn(
name: &str,
start_time: &Arc<Mutex<Option<Instant>>>,
done: &Arc<AtomicU64>,
done: &Arc<AtomicI64>,
) -> impl FnOnce() {
let name = name.to_string();
let start_time = Arc::clone(start_time);
@ -265,7 +269,7 @@ mod tests {
let f5 = Arc::new(Mutex::new(None));
let f6 = Arc::new(Mutex::new(None));
let f7 = Arc::new(Mutex::new(None));
let done = Arc::new(AtomicU64::new(0));
let done = Arc::new(AtomicI64::new(0));
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
@ -312,7 +316,7 @@ mod tests {
let f5 = Arc::new(Mutex::new(None));
let f6 = Arc::new(Mutex::new(None));
let f7 = Arc::new(Mutex::new(None));
let done = Arc::new(AtomicU64::new(0));
let done = Arc::new(AtomicI64::new(0));
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
@ -357,7 +361,7 @@ mod tests {
let f2 = Arc::new(Mutex::new(None));
let b1 = Arc::new(Mutex::new(None));
let b2 = Arc::new(Mutex::new(None));
let done = Arc::new(AtomicU64::new(0));
let done = Arc::new(AtomicI64::new(0));
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
std::thread::sleep(Duration::from_millis(1000));