Merge branch 'xMAC94x/slowjob_freeze_reproduction' into 'master'

Initial reproduction of the slowjob freezen with par_iter and slowjob,

See merge request veloren/veloren!3546
This commit is contained in:
Marcel 2022-08-20 20:44:07 +00:00
commit 3b87bf1e4a
2 changed files with 79 additions and 14 deletions

View File

@ -106,7 +106,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Modular weapons now have a selling price
- Closing a subwindow now only regrabs the cursor if no other subwindow requires it.
- Fixed npc not handling interactions while fighting (especially merchants in trade)
- Fixed bug where you would still be burning after dying in lava.
- Fixed bug where you would still be burning after dying in lava.
- Workaround for rayon bug that caused lag spikes in slowjobs
## [0.12.0] - 2022-02-19

View File

@ -55,6 +55,8 @@ pub struct SlowJob {
id: u64,
}
type JobType = Box<dyn FnOnce() + Send + Sync + 'static>;
struct InternalSlowJobPool {
next_id: u64,
queue: HashMap<String, VecDeque<Queue>>,
@ -77,7 +79,7 @@ struct Config {
struct Queue {
id: u64,
name: String,
task: Box<dyn FnOnce() + Send + Sync + 'static>,
task: JobType,
}
pub struct JobMetrics {
@ -122,8 +124,17 @@ impl InternalSlowJobPool {
pub fn new(
global_limit: u64,
jobs_metrics_cnt: usize,
threadpool: Arc<ThreadPool>,
_threadpool: Arc<ThreadPool>,
) -> Arc<Mutex<Self>> {
// rayon is having a bug where a ECS task could work-steal a slowjob if we use
// the same threadpool, which would cause lagspikes we dont want!
let threadpool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(global_limit as usize)
.thread_name(move |i| format!("slowjob-{}", i))
.build()
.unwrap(),
);
let link = Arc::new(Mutex::new(Self {
next_id: 0,
queue: HashMap::new(),
@ -400,6 +411,13 @@ impl SlowJobPool {
#[cfg(test)]
mod tests {
use super::*;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Barrier,
},
time::Duration,
};
fn mock_pool(
pool_threads: usize,
@ -652,10 +670,9 @@ mod tests {
#[test]
fn try_run_exhausted() {
use std::{thread::sleep, time::Duration};
let pool = mock_pool(8, 8, 0, 4, 2, 0);
let func = || loop {
sleep(Duration::from_secs(1))
std::thread::sleep(Duration::from_secs(1))
};
pool.try_run("FOO", func).unwrap();
pool.try_run("BAR", func).unwrap();
@ -673,7 +690,7 @@ mod tests {
#[test]
fn actually_runs_1() {
let pool = mock_pool(4, 4, 0, 0, 0, 1);
let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = Arc::clone(&barrier);
pool.try_run("BAZ", move || {
barrier_clone.wait();
@ -685,7 +702,7 @@ mod tests {
#[test]
fn actually_runs_2() {
let pool = mock_pool(4, 4, 0, 0, 0, 1);
let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = Arc::clone(&barrier);
pool.spawn("BAZ", move || {
barrier_clone.wait();
@ -695,10 +712,6 @@ mod tests {
#[test]
fn actually_waits() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Barrier,
};
let pool = mock_pool(4, 4, 0, 4, 0, 1);
let ops_i_ran = Arc::new(AtomicBool::new(false));
let ops_i_ran_clone = Arc::clone(&ops_i_ran);
@ -715,7 +728,7 @@ mod tests {
barrier2_clone.wait();
});
// in this case we have to sleep
std::thread::sleep(std::time::Duration::from_secs(1));
std::thread::sleep(Duration::from_secs(1));
assert!(!ops_i_ran.load(Ordering::SeqCst));
// now finish the first job
barrier.wait();
@ -725,7 +738,6 @@ mod tests {
#[test]
fn verify_metrics() {
use std::sync::Barrier;
let pool = mock_pool(4, 4, 2, 1, 0, 4);
let barrier = Arc::new(Barrier::new(5));
for name in &["FOO", "BAZ", "FOO", "FOO"] {
@ -737,7 +749,7 @@ mod tests {
// now finish all jobs
barrier.wait();
// in this case we have to sleep to give it some time to store all the metrics
std::thread::sleep(std::time::Duration::from_secs(2));
std::thread::sleep(Duration::from_secs(2));
let metrics = pool.take_metrics();
let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics");
//its limited to 2, even though we had 3 jobs
@ -746,4 +758,56 @@ mod tests {
let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics");
assert_eq!(baz.len(), 1);
}
fn work_barrier(counter: &Arc<AtomicU64>, ms: u64) -> impl std::ops::FnOnce() -> () {
let counter = Arc::clone(counter);
println!("Create work_barrier");
move || {
println!(".{}..", ms);
std::thread::sleep(Duration::from_millis(ms));
println!(".{}..Done", ms);
counter.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn verify_that_spawn_doesnt_block_par_iter() {
let threadpool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(20)
.build()
.unwrap(),
);
let pool = SlowJobPool::new(2, 100, Arc::<rayon::ThreadPool>::clone(&threadpool));
pool.configure("BAZ", |_| 2);
let counter = Arc::new(AtomicU64::new(0));
let start = Instant::now();
threadpool.install(|| {
use rayon::prelude::*;
(0..100)
.into_par_iter()
.map(|i| {
std::thread::sleep(Duration::from_millis(10));
if i == 50 {
pool.spawn("BAZ", work_barrier(&counter, 2000));
}
if i == 99 {
println!("The first ITER end, at {}ms", start.elapsed().as_millis());
}
})
.collect::<Vec<_>>();
let elapsed = start.elapsed().as_millis();
println!("The first ITER finished, at {}ms", elapsed);
assert!(
elapsed < 1900,
"It seems like the par_iter waited on the 2s sleep task to finish"
);
});
while counter.load(Ordering::SeqCst) == 0 {
println!("waiting for BAZ task to finish");
std::thread::sleep(Duration::from_secs(1));
}
}
}