diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 192f47e15a..fac0c98846 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -1,7 +1,7 @@ use hashbrown::HashMap; use rayon::ThreadPool; use std::sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicI64, AtomicU64, Ordering}, Arc, RwLock, }; @@ -40,9 +40,9 @@ pub struct SlowJob { struct InternalSlowJobPool { next_id: Arc, queue: RwLock>>, - running_jobs: RwLock>>, + running_jobs: RwLock>>, configs: RwLock>, - global_running_jobs: Arc, + global_running_jobs: Arc, global_limit: u64, threadpool: Arc, } @@ -55,7 +55,7 @@ struct Config { struct Queue { task: Box, spawned_total: Arc, - local_running_jobs: Arc, + local_running_jobs: Arc, } impl InternalSlowJobPool { @@ -65,15 +65,19 @@ impl InternalSlowJobPool { queue: RwLock::new(HashMap::new()), running_jobs: RwLock::new(HashMap::new()), configs: RwLock::new(HashMap::new()), - global_running_jobs: Arc::new(AtomicU64::new(0)), + global_running_jobs: Arc::new(AtomicI64::new(0)), global_limit: global_limit.max(1), threadpool, } } fn maintain(&self) { - let jobs_available = self.global_limit - self.global_running_jobs.load(Ordering::Relaxed); - if jobs_available == 0 { + let jobs_available = + self.global_limit as i64 - self.global_running_jobs.load(Ordering::Relaxed); + if jobs_available < 0 { + tracing::warn!(?jobs_available, "Some math is wrong in slowjob code"); + } + if jobs_available <= 0 { // we run at limit, can't spawn return; } @@ -118,7 +122,7 @@ impl InternalSlowJobPool { }; if let Some(queue) = map.remove(&firstkey) { - if queue.local_running_jobs.load(Ordering::Relaxed) < *max { + if queue.local_running_jobs.load(Ordering::Relaxed) < *max as i64 { self.fire(queue); } else { map.insert(firstkey, queue); @@ -235,7 +239,7 @@ mod tests { fn mock_fn( name: &str, start_time: &Arc>>, - done: &Arc, + done: &Arc, ) -> impl FnOnce() { let name = name.to_string(); let start_time = Arc::clone(start_time); @@ -265,7 +269,7 @@ mod tests { let f5 = Arc::new(Mutex::new(None)); let f6 = Arc::new(Mutex::new(None)); let f7 = Arc::new(Mutex::new(None)); - let done = Arc::new(AtomicU64::new(0)); + let done = Arc::new(AtomicI64::new(0)); pool.spawn("FOO", mock_fn("foo1", &f1, &done)); pool.spawn("FOO", mock_fn("foo2", &f2, &done)); pool.spawn("FOO", mock_fn("foo3", &f3, &done)); @@ -312,7 +316,7 @@ mod tests { let f5 = Arc::new(Mutex::new(None)); let f6 = Arc::new(Mutex::new(None)); let f7 = Arc::new(Mutex::new(None)); - let done = Arc::new(AtomicU64::new(0)); + let done = Arc::new(AtomicI64::new(0)); pool.spawn("FOO", mock_fn("foo1", &f1, &done)); pool.spawn("FOO", mock_fn("foo2", &f2, &done)); pool.spawn("FOO", mock_fn("foo3", &f3, &done)); @@ -357,7 +361,7 @@ mod tests { let f2 = Arc::new(Mutex::new(None)); let b1 = Arc::new(Mutex::new(None)); let b2 = Arc::new(Mutex::new(None)); - let done = Arc::new(AtomicU64::new(0)); + let done = Arc::new(AtomicI64::new(0)); pool.spawn("FOO", mock_fn("foo1", &f1, &done)); pool.spawn("FOO", mock_fn("foo2", &f2, &done)); std::thread::sleep(Duration::from_millis(1000));