From c968e5c748c09c26bd8c246d9ae2e63d1dad230c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Thu, 18 Aug 2022 12:38:17 +0200
Subject: [PATCH 1/4] Initial reproduction of the slowjob freeze with par_iter
 and slowjob

Note that we spawn 2 jobs in the first loop; the loop does NOT seem to
complete until those jobs have been executed.

The next step is to reproduce this with plain rayon code only.
---
 common/src/slowjob.rs | 106 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 106 insertions(+)

diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs
index 414d4888f8..6e3f267f76 100644
--- a/common/src/slowjob.rs
+++ b/common/src/slowjob.rs
@@ -399,6 +399,8 @@ impl SlowJobPool {
 #[cfg(test)]
 mod tests {
+    use std::sync::Barrier;
+    use std::sync::atomic::AtomicU64;
     use super::*;

     fn mock_pool(
@@ -746,4 +748,108 @@ mod tests {
         let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics");
         assert_eq!(baz.len(), 1);
     }
+
+
+    fn busy_work(ms: u64) {
+        let x = Instant::now();
+        while x.elapsed() < std::time::Duration::from_millis(ms) {
+        }
+    }
+
+    use std::time::Duration;
+    use std::sync::atomic::Ordering;
+
+    fn work_barrier2(counter: Arc<AtomicU64>, ms: u64) -> () {
+        println!(".{}..", ms);
+        busy_work(ms);
+        println!(".{}..Done", ms);
+        counter.fetch_add(1, Ordering::SeqCst);
+    }
+
+    fn work_barrier(counter: &Arc<AtomicU64>, ms: u64) -> impl std::ops::FnOnce() -> () {
+        let counter = Arc::clone(&counter);
+        println!("Create work_barrier");
+        move || work_barrier2(counter, ms)
+    }
+
+    #[test]
+    fn ffffff() {
+        let threadpool = Arc::new(rayon::ThreadPoolBuilder::new()
+            .num_threads(2)
+            .build()
+            .unwrap());
+        let pool = SlowJobPool::new(2, 100, threadpool.clone());
+        pool.configure("FOO", |x| x );
+        pool.configure("BAR", |x| x / 2);
+        pool.configure("BAZ", |_| 1);
+        let pool2 = SlowJobPool::new(2, 100, threadpool.clone());
+        pool2.configure("FOO", |x| x );
+        pool2.configure("BAR", |x| x / 2);
+        pool2.configure("BAZ", |_| 1);
+        let counter = Arc::new(AtomicU64::new(0));
+        let start = Instant::now();
+        //pool.spawn("BAZ", work_barrier(&counter, 1000));
+        //for _ in 0..600 {
+        //    pool.spawn("FOO", work_barrier(&counter, 10));
+        //}
+
+        threadpool.install(|| {
+            use rayon::prelude::*;
+            let mut iter = Vec::new();
+            for i in 0..1000 {
+                iter.push((i, work_barrier(&counter, 10)));
+            }
+            let mut iter2 = Vec::new();
+            for i in 0..1000 {
+                iter2.push((i, work_barrier(&counter, 10)));
+            }
+            iter.into_par_iter().map(|(i, task)| {
+                task();
+                if i == 900 {
+                    println!("Spawning task");
+                    pool.spawn("BAZ", work_barrier(&counter, 1000));
+                    pool2.spawn("BAZ", work_barrier(&counter, 10000));
+                    println!("Spawned tasks");
+                }
+                if i == 999 {
+                    println!("The first ITER end");
+                }
+            }).collect::<Vec<_>>();
+            println!("The first ITER finished");
+            //pool2.spawn("BAZ", work_barrier(&counter, 1000));
+            //pool2.spawn("BAZ", work_barrier(&counter, 1000));
+            //pool2.spawn("BAZ", work_barrier(&counter, 1000));
+            iter2.into_par_iter().map(|(i, task)| {
+                if i == 0 {
+                    println!("The second ITER started");
+                }
+                task();
+            }).collect::<Vec<_>>();
+
+        });
+
+        //pool.spawn("FOO", work_barrier(&barrier, 1));
+
+
+        println!("wait for test finish");
+        const TIMEOUT: Duration = Duration::from_secs(2);
+        let mut last_n_time = (0, start);
+        loop {
+            let now = Instant::now();
+            let n = counter.load(Ordering::SeqCst);
+            if n != last_n_time.0 {
+                last_n_time = (n, now);
+            } else if now.duration_since(last_n_time.1) > TIMEOUT {
+                break;
+            }
+        }
+        let d = last_n_time.1.duration_since(start);
+        println!("==============");
+        println!("Time Passed: {}ms", d.as_millis());
+        println!("Jobs finished: {}", last_n_time.0);
+
+    }
 }
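Series note (not part of any patch): the "plain rayon" follow-up mentioned above is expected to boil down to something like the sketch below. The pool size, iteration counts and the main() wrapper are made up for illustration; only rayon calls already used in the test above (ThreadPoolBuilder, install, into_par_iter, spawn) are assumed. On an affected rayon version the install()ed iterator is suspected to only return once the job spawned from inside it has finished:

    use std::time::{Duration, Instant};
    use rayon::prelude::*;

    fn main() {
        // Deliberately small pool so the spawned job competes with the iterator.
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .build()
            .unwrap();
        let start = Instant::now();
        pool.install(|| {
            (0..100u64).into_par_iter().for_each(|i| {
                std::thread::sleep(Duration::from_millis(10));
                if i == 50 {
                    // ThreadPool::spawn from inside a par_iter that itself runs
                    // inside ThreadPool::install on the same pool.
                    pool.spawn(|| std::thread::sleep(Duration::from_secs(2)));
                }
            });
        });
        // If the suspected scheduling bug is present, the printed time includes the
        // 2s background job instead of only the ~0.5s the iteration itself needs.
        println!("par_iter finished after {}ms", start.elapsed().as_millis());
    }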
{}ms", d.as_millis()); + println!("Jobs finished: {}", last_n_time.0); + + } } From 25d9e5b27e9f9377ddbc7967fe7007e1630b0f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 18 Aug 2022 19:57:30 +0200 Subject: [PATCH 2/4] workaround for https://github.com/rayon-rs/rayon/issues/969 Unfortuatly rayon has a bug that if you Threadpool.spawn from inside a parallel iterator from inside a Threadpool.install, that the parallel iterator will BLOCK till the Threadpool.spawn finished, which causes many many lag spikes. I assume this might be the case for the pictures in the gantt chart where a system took unusual long or had a long pause that was unexplained. I also raise the number of threads by 1, as this rayon thread will prob be useless in all cases and have no real work to do. EDIT: it turns out the tests are sporadicly failing and this soluction doesnt work --- CHANGELOG.md | 3 +- common/src/slowjob.rs | 179 ++++++++++++++++---------------------- common/state/src/state.rs | 2 +- 3 files changed, 76 insertions(+), 108 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c24ad07e6b..52210a5847 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -105,7 +105,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Modular weapons now have a selling price - Closing a subwindow now only regrabs the cursor if no other subwindow requires it. - Fixed npc not handling interactions while fighting (especially merchants in trade) -- Fixed bug where you would still be burning after dying in lava. +- Fixed bug where you would still be burning after dying in lava. +- Workaround for rayon bug that caused lag spikes in slowjobs ## [0.12.0] - 2022-02-19 diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 6e3f267f76..a7cb2d88dc 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -55,16 +55,18 @@ pub struct SlowJob { id: u64, } +type JobType = Box; + struct InternalSlowJobPool { next_id: u64, queue: HashMap>, + dispatch_sender: std::sync::mpsc::Sender, configs: HashMap, last_spawned_configs: Vec, global_spawned_and_running: u64, global_limit: u64, jobs_metrics_cnt: usize, jobs_metrics: HashMap>, - threadpool: Arc, internal: Option>>, } @@ -77,7 +79,7 @@ struct Config { struct Queue { id: u64, name: String, - task: Box, + task: JobType, } pub struct JobMetrics { @@ -124,16 +126,18 @@ impl InternalSlowJobPool { jobs_metrics_cnt: usize, threadpool: Arc, ) -> Arc> { + let (dispatch_sender, dispatch_receiver) = std::sync::mpsc::channel(); + Self::dispatcher(threadpool, dispatch_receiver); let link = Arc::new(Mutex::new(Self { next_id: 0, queue: HashMap::new(), + dispatch_sender, configs: HashMap::new(), last_spawned_configs: Vec::new(), global_spawned_and_running: 0, global_limit: global_limit.max(1), jobs_metrics_cnt, jobs_metrics: HashMap::new(), - threadpool, internal: None, })); @@ -144,6 +148,20 @@ impl InternalSlowJobPool { link } + pub fn dispatcher( + threadpool: Arc, + dispatch_receiver: std::sync::mpsc::Receiver, + ) { + let dispatch_receiver = Mutex::new(dispatch_receiver); + let threadpool2 = Arc::clone(&threadpool); + threadpool.spawn(move || { + let dispatch_receiver = dispatch_receiver.lock().unwrap(); + for task in dispatch_receiver.iter() { + threadpool2.spawn(task) + } + }); + } + /// returns order of configuration which are queued next fn calc_queued_order( &self, @@ -307,7 +325,9 @@ impl InternalSlowJobPool { .position(|e| e == &queue.name) .map(|i| self.last_spawned_configs.remove(i)); 
                 self.last_spawned_configs.push(queue.name.to_owned());
-                self.threadpool.spawn(queue.task);
+                if let Err(e) = self.dispatch_sender.send(queue.task) {
+                    error!(?e, "dispatcher thread seems to have crashed");
+                };
             },
             None => error!(
                 "internal calculation is wrong, we extected a schedulable job to be \
@@ -399,9 +419,14 @@ impl SlowJobPool {
 #[cfg(test)]
 mod tests {
-    use std::sync::Barrier;
-    use std::sync::atomic::AtomicU64;
     use super::*;
+    use std::{
+        sync::{
+            atomic::{AtomicBool, AtomicU64, Ordering},
+            Barrier,
+        },
+        time::Duration,
+    };

     fn mock_pool(
         pool_threads: usize,
@@ -654,10 +679,9 @@ mod tests {
     #[test]
     fn try_run_exhausted() {
-        use std::{thread::sleep, time::Duration};
         let pool = mock_pool(8, 8, 0, 4, 2, 0);
         let func = || loop {
-            sleep(Duration::from_secs(1))
+            std::thread::sleep(Duration::from_secs(1))
         };
         pool.try_run("FOO", func).unwrap();
         pool.try_run("BAR", func).unwrap();
@@ -675,7 +699,7 @@ mod tests {
     #[test]
     fn actually_runs_1() {
         let pool = mock_pool(4, 4, 0, 0, 0, 1);
-        let barrier = Arc::new(std::sync::Barrier::new(2));
+        let barrier = Arc::new(Barrier::new(2));
         let barrier_clone = Arc::clone(&barrier);
         pool.try_run("BAZ", move || {
             barrier_clone.wait();
@@ -687,7 +711,7 @@ mod tests {
     #[test]
     fn actually_runs_2() {
         let pool = mock_pool(4, 4, 0, 0, 0, 1);
-        let barrier = Arc::new(std::sync::Barrier::new(2));
+        let barrier = Arc::new(Barrier::new(2));
         let barrier_clone = Arc::clone(&barrier);
         pool.spawn("BAZ", move || {
             barrier_clone.wait();
@@ -697,10 +721,6 @@ mod tests {
     #[test]
     fn actually_waits() {
-        use std::sync::{
-            atomic::{AtomicBool, Ordering},
-            Barrier,
-        };
         let pool = mock_pool(4, 4, 0, 4, 0, 1);
         let ops_i_ran = Arc::new(AtomicBool::new(false));
         let ops_i_ran_clone = Arc::clone(&ops_i_ran);
@@ -717,7 +737,7 @@ mod tests {
             barrier2_clone.wait();
         });
         // in this case we have to sleep
-        std::thread::sleep(std::time::Duration::from_secs(1));
+        std::thread::sleep(Duration::from_secs(1));
         assert!(!ops_i_ran.load(Ordering::SeqCst));
         // now finish the first job
         barrier.wait();
@@ -727,7 +747,6 @@ mod tests {
     #[test]
     fn verify_metrics() {
-        use std::sync::Barrier;
         let pool = mock_pool(4, 4, 2, 1, 0, 4);
         let barrier = Arc::new(Barrier::new(5));
         for name in &["FOO", "BAZ", "FOO", "FOO"] {
@@ -739,7 +758,7 @@ mod tests {
         // now finish all jobs
         barrier.wait();
         // in this case we have to sleep to give it some time to store all the metrics
-        std::thread::sleep(std::time::Duration::from_secs(2));
+        std::thread::sleep(Duration::from_secs(2));
         let metrics = pool.take_metrics();
         let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics");
         //its limited to 2, even though we had 3 jobs
@@ -749,107 +768,55 @@ mod tests {
         assert_eq!(baz.len(), 1);
     }
-
-    fn busy_work(ms: u64) {
-        let x = Instant::now();
-        while x.elapsed() < std::time::Duration::from_millis(ms) {
-        }
-    }
-
-    use std::time::Duration;
-    use std::sync::atomic::Ordering;
-
-    fn work_barrier2(counter: Arc<AtomicU64>, ms: u64) -> () {
-        println!(".{}..", ms);
-        busy_work(ms);
-        println!(".{}..Done", ms);
-        counter.fetch_add(1, Ordering::SeqCst);
-    }
-
     fn work_barrier(counter: &Arc<AtomicU64>, ms: u64) -> impl std::ops::FnOnce() -> () {
-        let counter = Arc::clone(&counter);
+        let counter = Arc::clone(counter);
         println!("Create work_barrier");
-        move || work_barrier2(counter, ms)
+        move || {
+            println!(".{}..", ms);
+            std::thread::sleep(Duration::from_millis(ms));
+            println!(".{}..Done", ms);
+            counter.fetch_add(1, Ordering::SeqCst);
+        }
     }

     #[test]
-    fn ffffff() {
-        let threadpool = Arc::new(rayon::ThreadPoolBuilder::new()
-            .num_threads(2)
-            .build()
-            .unwrap());
-        let pool = SlowJobPool::new(2, 100, threadpool.clone());
-        pool.configure("FOO", |x| x );
-        pool.configure("BAR", |x| x / 2);
-        pool.configure("BAZ", |_| 1);
-        let pool2 = SlowJobPool::new(2, 100, threadpool.clone());
-        pool2.configure("FOO", |x| x );
-        pool2.configure("BAR", |x| x / 2);
-        pool2.configure("BAZ", |_| 1);
+    fn verify_that_spawn_doesnt_block_par_iter() {
+        let threadpool = Arc::new(
+            rayon::ThreadPoolBuilder::new()
+                .num_threads(20)
+                .build()
+                .unwrap(),
+        );
+        let pool = SlowJobPool::new(2, 100, Arc::<rayon::ThreadPool>::clone(&threadpool));
+        pool.configure("BAZ", |_| 2);
         let counter = Arc::new(AtomicU64::new(0));
         let start = Instant::now();
-        //pool.spawn("BAZ", work_barrier(&counter, 1000));
-        //for _ in 0..600 {
-        //    pool.spawn("FOO", work_barrier(&counter, 10));
-        //}

         threadpool.install(|| {
             use rayon::prelude::*;
-            let mut iter = Vec::new();
-            for i in 0..1000 {
-                iter.push((i, work_barrier(&counter, 10)));
-            }
-            let mut iter2 = Vec::new();
-            for i in 0..1000 {
-                iter2.push((i, work_barrier(&counter, 10)));
-            }
-            iter.into_par_iter().map(|(i, task)| {
-                task();
-                if i == 900 {
-                    println!("Spawning task");
-                    pool.spawn("BAZ", work_barrier(&counter, 1000));
-                    pool2.spawn("BAZ", work_barrier(&counter, 10000));
-                    println!("Spawned tasks");
-                }
-                if i == 999 {
-                    println!("The first ITER end");
-                }
-            }).collect::<Vec<_>>();
-            println!("The first ITER finished");
-            //pool2.spawn("BAZ", work_barrier(&counter, 1000));
-            //pool2.spawn("BAZ", work_barrier(&counter, 1000));
-            //pool2.spawn("BAZ", work_barrier(&counter, 1000));
-            iter2.into_par_iter().map(|(i, task)| {
-                if i == 0 {
-                    println!("The second ITER started");
-                }
-                task();
-            }).collect::<Vec<_>>();
-
+            (0..100)
+                .into_par_iter()
+                .map(|i| {
+                    std::thread::sleep(Duration::from_millis(10));
+                    if i == 50 {
+                        pool.spawn("BAZ", work_barrier(&counter, 2000));
+                    }
+                    if i == 99 {
+                        println!("The first ITER end, at {}ms", start.elapsed().as_millis());
+                    }
+                })
+                .collect::<Vec<_>>();
+            let elapsed = start.elapsed().as_millis();
+            println!("The first ITER finished, at {}ms", elapsed);
+            assert!(
+                elapsed < 1900,
+                "It seems like the par_iter waited on the 2s sleep task to finish"
+            );
         });
-
-        //pool.spawn("FOO", work_barrier(&barrier, 1));
-
-
-
-        println!("wait for test finish");
-        const TIMEOUT: Duration = Duration::from_secs(2);
-        let mut last_n_time = (0, start);
-        loop {
-            let now = Instant::now();
-            let n = counter.load(Ordering::SeqCst);
-            if n != last_n_time.0 {
-                last_n_time = (n, now);
-            } else if now.duration_since(last_n_time.1) > TIMEOUT {
-                break;
-            }
+        while counter.load(Ordering::SeqCst) == 0 {
+            println!("waiting for BAZ task to finish");
+            std::thread::sleep(Duration::from_secs(1));
         }
-        let d = last_n_time.1.duration_since(start);
-        println!("==============");
-        println!("Time Passed: {}ms", d.as_millis());
-        println!("Jobs finished: {}", last_n_time.0);
-
     }
 }
diff --git a/common/state/src/state.rs b/common/state/src/state.rs
index d088bc84d3..6d659c9f07 100644
--- a/common/state/src/state.rs
+++ b/common/state/src/state.rs
@@ -110,7 +110,7 @@ impl State {
         let thread_pool = Arc::new(
             ThreadPoolBuilder::new()
-                .num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS))
+                .num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS) + 1 /*workaround for a rayon schedule bug, see MR 3546*/)
                 .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
                 .build()
                 .unwrap(),
From 8ca458188d524edfe5af411f37392fabfeaaaafd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Fri, 19 Aug 2022 09:30:56 +0200
Subject: [PATCH 3/4] also those dispatcher tricks won't help the test, it
 still fails sporadically

---
 common/src/slowjob.rs | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs
index a7cb2d88dc..9af7b348d9 100644
--- a/common/src/slowjob.rs
+++ b/common/src/slowjob.rs
@@ -155,10 +155,14 @@ impl InternalSlowJobPool {
         let dispatch_receiver = Mutex::new(dispatch_receiver);
         let threadpool2 = Arc::clone(&threadpool);
         threadpool.spawn(move || {
-            let dispatch_receiver = dispatch_receiver.lock().unwrap();
-            for task in dispatch_receiver.iter() {
-                threadpool2.spawn(task)
-            }
+            threadpool2.in_place_scope(|s| {
+                s.spawn(|s| {
+                    let dispatch_receiver = dispatch_receiver.lock().unwrap();
+                    for task in dispatch_receiver.iter() {
+                        s.spawn(|_| (task)());
+                    }
+                });
+            });
         });
     }

From 3eac68000bf81d091f09e94b7d22c0a96165f0b7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Fri, 19 Aug 2022 09:47:54 +0200
Subject: [PATCH 4/4] all tests show there is currently no way to keep rayon
 from work-stealing across scopes

Rayon keeps doing really weird stuff between the ECS threadpool and the
slowjob threadpool. So even though I don't like having multiple threadpools,
I think there is no other workaround than creating a second threadpool for
background tasks.
---
 common/src/slowjob.rs     | 39 +++++++++++++--------------------------
 common/state/src/state.rs |  2 +-
 2 files changed, 14 insertions(+), 27 deletions(-)

diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs
index 9af7b348d9..86c9c1afab 100644
--- a/common/src/slowjob.rs
+++ b/common/src/slowjob.rs
@@ -60,13 +60,13 @@ type JobType = Box<dyn FnOnce() + Send + Sync + 'static>;
 struct InternalSlowJobPool {
     next_id: u64,
     queue: HashMap<String, VecDeque<Queue>>,
-    dispatch_sender: std::sync::mpsc::Sender<JobType>,
     configs: HashMap<String, Config>,
     last_spawned_configs: Vec<String>,
     global_spawned_and_running: u64,
     global_limit: u64,
     jobs_metrics_cnt: usize,
     jobs_metrics: HashMap<String, Vec<JobMetrics>>,
+    threadpool: Arc<rayon::ThreadPool>,
     internal: Option<Arc<Mutex<Self>>>,
 }

@@ -124,20 +124,27 @@ impl InternalSlowJobPool {
     pub fn new(
         global_limit: u64,
         jobs_metrics_cnt: usize,
-        threadpool: Arc<rayon::ThreadPool>,
+        _threadpool: Arc<rayon::ThreadPool>,
     ) -> Arc<Mutex<Self>> {
-        let (dispatch_sender, dispatch_receiver) = std::sync::mpsc::channel();
-        Self::dispatcher(threadpool, dispatch_receiver);
+        // rayon is having a bug where an ECS task could work-steal a slowjob if we use
+        // the same threadpool, which would cause lag spikes we don't want!
+        let threadpool = Arc::new(
+            rayon::ThreadPoolBuilder::new()
+                .num_threads(global_limit as usize)
+                .thread_name(move |i| format!("slowjob-{}", i))
+                .build()
+                .unwrap(),
+        );
         let link = Arc::new(Mutex::new(Self {
             next_id: 0,
             queue: HashMap::new(),
-            dispatch_sender,
             configs: HashMap::new(),
             last_spawned_configs: Vec::new(),
             global_spawned_and_running: 0,
             global_limit: global_limit.max(1),
             jobs_metrics_cnt,
             jobs_metrics: HashMap::new(),
+            threadpool,
             internal: None,
         }));

@@ -148,24 +155,6 @@ impl InternalSlowJobPool {
         link
     }

-    pub fn dispatcher(
-        threadpool: Arc<rayon::ThreadPool>,
-        dispatch_receiver: std::sync::mpsc::Receiver<JobType>,
-    ) {
-        let dispatch_receiver = Mutex::new(dispatch_receiver);
-        let threadpool2 = Arc::clone(&threadpool);
-        threadpool.spawn(move || {
-            threadpool2.in_place_scope(|s| {
-                s.spawn(|s| {
-                    let dispatch_receiver = dispatch_receiver.lock().unwrap();
-                    for task in dispatch_receiver.iter() {
-                        s.spawn(|_| (task)());
-                    }
-                });
-            });
-        });
-    }
-
     /// returns order of configuration which are queued next
     fn calc_queued_order(
         &self,
@@ -329,9 +318,7 @@ impl InternalSlowJobPool {
                     .position(|e| e == &queue.name)
                     .map(|i| self.last_spawned_configs.remove(i));
                 self.last_spawned_configs.push(queue.name.to_owned());
-                if let Err(e) = self.dispatch_sender.send(queue.task) {
-                    error!(?e, "dispatcher thread seems to have crashed");
-                };
+                self.threadpool.spawn(queue.task);
             },
             None => error!(
                 "internal calculation is wrong, we extected a schedulable job to be \
diff --git a/common/state/src/state.rs b/common/state/src/state.rs
index 6d659c9f07..d088bc84d3 100644
--- a/common/state/src/state.rs
+++ b/common/state/src/state.rs
@@ -110,7 +110,7 @@ impl State {
         let thread_pool = Arc::new(
             ThreadPoolBuilder::new()
-                .num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS) + 1 /*workaround for a rayon schedule bug, see MR 3546*/)
+                .num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS))
                 .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
                 .build()
                 .unwrap(),
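Closing note on the series (sketch only, not part of the patches): the caller-facing API stays as used in the tests above — SlowJobPool::new(global_limit, metrics_cnt, threadpool), configure and spawn — but after patch 4 the rayon pool handed to new is only kept for API compatibility and the jobs run on the internal "slowjob-{}" threads. The import path and the fn wrapper below are assumptions for illustration:

    use std::sync::Arc;
    use common::slowjob::SlowJobPool; // assumed import path

    fn example() {
        // The ECS pool is still passed in, exactly like before ...
        let ecs_pool = Arc::new(rayon::ThreadPoolBuilder::new().build().unwrap());
        let slowjob = SlowJobPool::new(2, 100, Arc::clone(&ecs_pool));
        slowjob.configure("BAZ", |n| n / 2);
        // ... but this closure now runs on the dedicated "slowjob-{i}" threads,
        // so ECS work on ecs_pool can no longer work-steal it (the lag-spike
        // scenario described in patch 4).
        slowjob.spawn("BAZ", || {
            // long-running background work
        });
    }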