mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
Merge branch 'xMAC94x/tracing_slowjob' into 'master'
harden slowjob code See merge request veloren/veloren!1961
This commit is contained in:
commit
e64258afcf
@ -1,7 +1,7 @@
|
|||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
atomic::{AtomicU64, Ordering},
|
atomic::{AtomicI64, AtomicU64, Ordering},
|
||||||
Arc, RwLock,
|
Arc, RwLock,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -40,9 +40,9 @@ pub struct SlowJob {
|
|||||||
struct InternalSlowJobPool {
|
struct InternalSlowJobPool {
|
||||||
next_id: Arc<AtomicU64>,
|
next_id: Arc<AtomicU64>,
|
||||||
queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
|
queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
|
||||||
running_jobs: RwLock<HashMap<String, Arc<AtomicU64>>>,
|
running_jobs: RwLock<HashMap<String, Arc<AtomicI64>>>,
|
||||||
configs: RwLock<HashMap<String, Config>>,
|
configs: RwLock<HashMap<String, Config>>,
|
||||||
global_running_jobs: Arc<AtomicU64>,
|
global_running_jobs: Arc<AtomicI64>,
|
||||||
global_limit: u64,
|
global_limit: u64,
|
||||||
threadpool: Arc<ThreadPool>,
|
threadpool: Arc<ThreadPool>,
|
||||||
}
|
}
|
||||||
@ -55,7 +55,7 @@ struct Config {
|
|||||||
struct Queue {
|
struct Queue {
|
||||||
task: Box<dyn FnOnce() + Send + Sync + 'static>,
|
task: Box<dyn FnOnce() + Send + Sync + 'static>,
|
||||||
spawned_total: Arc<AtomicU64>,
|
spawned_total: Arc<AtomicU64>,
|
||||||
local_running_jobs: Arc<AtomicU64>,
|
local_running_jobs: Arc<AtomicI64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InternalSlowJobPool {
|
impl InternalSlowJobPool {
|
||||||
@ -65,15 +65,19 @@ impl InternalSlowJobPool {
|
|||||||
queue: RwLock::new(HashMap::new()),
|
queue: RwLock::new(HashMap::new()),
|
||||||
running_jobs: RwLock::new(HashMap::new()),
|
running_jobs: RwLock::new(HashMap::new()),
|
||||||
configs: RwLock::new(HashMap::new()),
|
configs: RwLock::new(HashMap::new()),
|
||||||
global_running_jobs: Arc::new(AtomicU64::new(0)),
|
global_running_jobs: Arc::new(AtomicI64::new(0)),
|
||||||
global_limit: global_limit.max(1),
|
global_limit: global_limit.max(1),
|
||||||
threadpool,
|
threadpool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn maintain(&self) {
|
fn maintain(&self) {
|
||||||
let jobs_available = self.global_limit - self.global_running_jobs.load(Ordering::Relaxed);
|
let jobs_available =
|
||||||
if jobs_available == 0 {
|
self.global_limit as i64 - self.global_running_jobs.load(Ordering::Relaxed);
|
||||||
|
if jobs_available < 0 {
|
||||||
|
tracing::warn!(?jobs_available, "Some math is wrong in slowjob code");
|
||||||
|
}
|
||||||
|
if jobs_available <= 0 {
|
||||||
// we run at limit, can't spawn
|
// we run at limit, can't spawn
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -118,7 +122,7 @@ impl InternalSlowJobPool {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Some(queue) = map.remove(&firstkey) {
|
if let Some(queue) = map.remove(&firstkey) {
|
||||||
if queue.local_running_jobs.load(Ordering::Relaxed) < *max {
|
if queue.local_running_jobs.load(Ordering::Relaxed) < *max as i64 {
|
||||||
self.fire(queue);
|
self.fire(queue);
|
||||||
} else {
|
} else {
|
||||||
map.insert(firstkey, queue);
|
map.insert(firstkey, queue);
|
||||||
@ -235,7 +239,7 @@ mod tests {
|
|||||||
fn mock_fn(
|
fn mock_fn(
|
||||||
name: &str,
|
name: &str,
|
||||||
start_time: &Arc<Mutex<Option<Instant>>>,
|
start_time: &Arc<Mutex<Option<Instant>>>,
|
||||||
done: &Arc<AtomicU64>,
|
done: &Arc<AtomicI64>,
|
||||||
) -> impl FnOnce() {
|
) -> impl FnOnce() {
|
||||||
let name = name.to_string();
|
let name = name.to_string();
|
||||||
let start_time = Arc::clone(start_time);
|
let start_time = Arc::clone(start_time);
|
||||||
@ -265,7 +269,7 @@ mod tests {
|
|||||||
let f5 = Arc::new(Mutex::new(None));
|
let f5 = Arc::new(Mutex::new(None));
|
||||||
let f6 = Arc::new(Mutex::new(None));
|
let f6 = Arc::new(Mutex::new(None));
|
||||||
let f7 = Arc::new(Mutex::new(None));
|
let f7 = Arc::new(Mutex::new(None));
|
||||||
let done = Arc::new(AtomicU64::new(0));
|
let done = Arc::new(AtomicI64::new(0));
|
||||||
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
||||||
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
||||||
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
|
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
|
||||||
@ -312,7 +316,7 @@ mod tests {
|
|||||||
let f5 = Arc::new(Mutex::new(None));
|
let f5 = Arc::new(Mutex::new(None));
|
||||||
let f6 = Arc::new(Mutex::new(None));
|
let f6 = Arc::new(Mutex::new(None));
|
||||||
let f7 = Arc::new(Mutex::new(None));
|
let f7 = Arc::new(Mutex::new(None));
|
||||||
let done = Arc::new(AtomicU64::new(0));
|
let done = Arc::new(AtomicI64::new(0));
|
||||||
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
||||||
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
||||||
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
|
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
|
||||||
@ -357,7 +361,7 @@ mod tests {
|
|||||||
let f2 = Arc::new(Mutex::new(None));
|
let f2 = Arc::new(Mutex::new(None));
|
||||||
let b1 = Arc::new(Mutex::new(None));
|
let b1 = Arc::new(Mutex::new(None));
|
||||||
let b2 = Arc::new(Mutex::new(None));
|
let b2 = Arc::new(Mutex::new(None));
|
||||||
let done = Arc::new(AtomicU64::new(0));
|
let done = Arc::new(AtomicI64::new(0));
|
||||||
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
|
||||||
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
|
||||||
std::thread::sleep(Duration::from_millis(1000));
|
std::thread::sleep(Duration::from_millis(1000));
|
||||||
|
Loading…
Reference in New Issue
Block a user