Unfortunately rayon has a bug: if you call ThreadPool::spawn from inside a parallel iterator that itself runs inside a ThreadPool::install, the parallel iterator will BLOCK until the spawned job has finished, which causes many lag spikes.
I assume this is also the cause of the pictures in the Gantt chart where a system took unusually long or had an unexplained long pause.
I also raise the number of threads by 1, as the rayon thread occupied by the dispatcher will probably be useless in all cases and have no real work to do.
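
For illustration only (not part of this commit), a minimal sketch of that pattern, assuming a recent rayon version; the iteration counts and the 2-second job are made up:

use std::time::{Duration, Instant};

use rayon::prelude::*;

fn main() {
    // Illustrative sketch: spawn() is called from inside a par_iter that runs
    // under ThreadPool::install on the same pool.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let start = Instant::now();
    pool.install(|| {
        (0..100u32).into_par_iter().for_each(|i| {
            std::thread::sleep(Duration::from_millis(10)); // simulated per-item work
            if i == 50 {
                // A long background job spawned from inside the parallel iterator;
                // with the behaviour described above, the surrounding par_iter
                // stalls until this job has finished instead of returning early.
                pool.spawn(|| std::thread::sleep(Duration::from_secs(2)));
            }
        });
    });
    println!("par_iter took {}ms", start.elapsed().as_millis());
}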

EDIT: it turns out the tests are sporadically failing and this solution doesn't work.
Marcel Märtens 2022-08-18 19:57:30 +02:00
parent c968e5c748
commit 25d9e5b27e
3 changed files with 76 additions and 108 deletions
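
The workaround in slowjob.rs below pushes every job through an mpsc channel to a single long-lived dispatcher task on the rayon pool, so ThreadPool::spawn is only ever called from that dispatcher thread and never from inside a caller's parallel iterator. A simplified, self-contained sketch of that pattern (the spawn_dispatcher helper and the Job alias are made up for this example, and it skips the Mutex around the receiver that the real code uses):

use std::sync::{mpsc, Arc};

// Sketch only: `Job` and `spawn_dispatcher` are made-up names for this example;
// the real implementation lives in InternalSlowJobPool below.
type Job = Box<dyn FnOnce() + Send + Sync + 'static>;

fn spawn_dispatcher(threadpool: &Arc<rayon::ThreadPool>) -> mpsc::Sender<Job> {
    let (sender, receiver) = mpsc::channel::<Job>();
    let pool = Arc::clone(threadpool);
    // The dispatcher loop blocks on the channel and so permanently occupies one
    // rayon worker, which is presumably why the thread count is raised by 1 in
    // the last file of this commit.
    threadpool.spawn(move || {
        for job in receiver.iter() {
            // Only this thread ever calls spawn, outside of any par_iter.
            pool.spawn(job);
        }
    });
    sender
}

fn main() {
    let threadpool = Arc::new(rayon::ThreadPoolBuilder::new().build().unwrap());
    let dispatch_sender = spawn_dispatcher(&threadpool);
    // Callers (even from inside a par_iter) only do a cheap channel send:
    dispatch_sender.send(Box::new(|| println!("job ran"))).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(100));
}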


@@ -105,7 +105,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Modular weapons now have a selling price
- Closing a subwindow now only regrabs the cursor if no other subwindow requires it.
- Fixed npc not handling interactions while fighting (especially merchants in trade)
- Fixed bug where you would still be burning after dying in lava.
- Workaround for rayon bug that caused lag spikes in slowjobs
## [0.12.0] - 2022-02-19


@@ -55,16 +55,18 @@ pub struct SlowJob {
id: u64,
}
type JobType = Box<dyn FnOnce() + Send + Sync + 'static>;
struct InternalSlowJobPool {
next_id: u64,
queue: HashMap<String, VecDeque<Queue>>,
dispatch_sender: std::sync::mpsc::Sender<JobType>,
configs: HashMap<String, Config>,
last_spawned_configs: Vec<String>,
global_spawned_and_running: u64,
global_limit: u64,
jobs_metrics_cnt: usize,
jobs_metrics: HashMap<String, Vec<JobMetrics>>,
threadpool: Arc<ThreadPool>,
internal: Option<Arc<Mutex<Self>>>,
}
@@ -77,7 +79,7 @@ struct Config {
struct Queue {
id: u64,
name: String,
task: Box<dyn FnOnce() + Send + Sync + 'static>,
task: JobType,
}
pub struct JobMetrics {
@@ -124,16 +126,18 @@ impl InternalSlowJobPool {
jobs_metrics_cnt: usize,
threadpool: Arc<ThreadPool>,
) -> Arc<Mutex<Self>> {
let (dispatch_sender, dispatch_receiver) = std::sync::mpsc::channel();
Self::dispatcher(threadpool, dispatch_receiver);
let link = Arc::new(Mutex::new(Self {
next_id: 0,
queue: HashMap::new(),
dispatch_sender,
configs: HashMap::new(),
last_spawned_configs: Vec::new(),
global_spawned_and_running: 0,
global_limit: global_limit.max(1),
jobs_metrics_cnt,
jobs_metrics: HashMap::new(),
threadpool,
internal: None,
}));
@@ -144,6 +148,20 @@ impl InternalSlowJobPool {
link
}
pub fn dispatcher(
threadpool: Arc<ThreadPool>,
dispatch_receiver: std::sync::mpsc::Receiver<JobType>,
) {
let dispatch_receiver = Mutex::new(dispatch_receiver);
let threadpool2 = Arc::clone(&threadpool);
threadpool.spawn(move || {
let dispatch_receiver = dispatch_receiver.lock().unwrap();
for task in dispatch_receiver.iter() {
threadpool2.spawn(task)
}
});
}
/// returns order of configuration which are queued next
fn calc_queued_order(
&self,
@@ -307,7 +325,9 @@ impl InternalSlowJobPool {
.position(|e| e == &queue.name)
.map(|i| self.last_spawned_configs.remove(i));
self.last_spawned_configs.push(queue.name.to_owned());
self.threadpool.spawn(queue.task);
if let Err(e) = self.dispatch_sender.send(queue.task) {
error!(?e, "dispatcher thread seems to have crashed");
};
},
None => error!(
"internal calculation is wrong, we extected a schedulable job to be \
@@ -399,9 +419,14 @@ impl SlowJobPool {
#[cfg(test)]
mod tests {
use std::sync::Barrier;
use std::sync::atomic::AtomicU64;
use super::*;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Barrier,
},
time::Duration,
};
fn mock_pool(
pool_threads: usize,
@@ -654,10 +679,9 @@ mod tests {
#[test]
fn try_run_exhausted() {
use std::{thread::sleep, time::Duration};
let pool = mock_pool(8, 8, 0, 4, 2, 0);
let func = || loop {
sleep(Duration::from_secs(1))
std::thread::sleep(Duration::from_secs(1))
};
pool.try_run("FOO", func).unwrap();
pool.try_run("BAR", func).unwrap();
@@ -675,7 +699,7 @@ mod tests {
#[test]
fn actually_runs_1() {
let pool = mock_pool(4, 4, 0, 0, 0, 1);
let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = Arc::clone(&barrier);
pool.try_run("BAZ", move || {
barrier_clone.wait();
@@ -687,7 +711,7 @@ mod tests {
#[test]
fn actually_runs_2() {
let pool = mock_pool(4, 4, 0, 0, 0, 1);
let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = Arc::clone(&barrier);
pool.spawn("BAZ", move || {
barrier_clone.wait();
@@ -697,10 +721,6 @@ mod tests {
#[test]
fn actually_waits() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Barrier,
};
let pool = mock_pool(4, 4, 0, 4, 0, 1);
let ops_i_ran = Arc::new(AtomicBool::new(false));
let ops_i_ran_clone = Arc::clone(&ops_i_ran);
@@ -717,7 +737,7 @@ mod tests {
barrier2_clone.wait();
});
// in this case we have to sleep
std::thread::sleep(std::time::Duration::from_secs(1));
std::thread::sleep(Duration::from_secs(1));
assert!(!ops_i_ran.load(Ordering::SeqCst));
// now finish the first job
barrier.wait();
@@ -727,7 +747,6 @@ mod tests {
#[test]
fn verify_metrics() {
use std::sync::Barrier;
let pool = mock_pool(4, 4, 2, 1, 0, 4);
let barrier = Arc::new(Barrier::new(5));
for name in &["FOO", "BAZ", "FOO", "FOO"] {
@@ -739,7 +758,7 @@ mod tests {
// now finish all jobs
barrier.wait();
// in this case we have to sleep to give it some time to store all the metrics
std::thread::sleep(std::time::Duration::from_secs(2));
std::thread::sleep(Duration::from_secs(2));
let metrics = pool.take_metrics();
let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics");
//its limited to 2, even though we had 3 jobs
@@ -749,107 +768,55 @@ mod tests {
assert_eq!(baz.len(), 1);
}
fn busy_work(ms: u64) {
let x = Instant::now();
while x.elapsed() < std::time::Duration::from_millis(ms) {
}
}
use std::time::Duration;
use std::sync::atomic::Ordering;
fn work_barrier2(counter: Arc<AtomicU64>, ms: u64) -> () {
println!(".{}..", ms);
busy_work(ms);
println!(".{}..Done", ms);
counter.fetch_add(1, Ordering::SeqCst);
}
fn work_barrier(counter: &Arc<AtomicU64>, ms: u64) -> impl std::ops::FnOnce() -> () {
let counter = Arc::clone(&counter);
let counter = Arc::clone(counter);
println!("Create work_barrier");
move || work_barrier2(counter, ms)
move || {
println!(".{}..", ms);
std::thread::sleep(Duration::from_millis(ms));
println!(".{}..Done", ms);
counter.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn ffffff() {
let threadpool = Arc::new(rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap());
let pool = SlowJobPool::new(2, 100, threadpool.clone());
pool.configure("FOO", |x| x );
pool.configure("BAR", |x| x / 2);
pool.configure("BAZ", |_| 1);
let pool2 = SlowJobPool::new(2, 100, threadpool.clone());
pool2.configure("FOO", |x| x );
pool2.configure("BAR", |x| x / 2);
pool2.configure("BAZ", |_| 1);
fn verify_that_spawn_doesnt_block_par_iter() {
let threadpool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(20)
.build()
.unwrap(),
);
let pool = SlowJobPool::new(2, 100, Arc::<rayon::ThreadPool>::clone(&threadpool));
pool.configure("BAZ", |_| 2);
let counter = Arc::new(AtomicU64::new(0));
let start = Instant::now();
//pool.spawn("BAZ", work_barrier(&counter, 1000));
//for _ in 0..600 {
// pool.spawn("FOO", work_barrier(&counter, 10));
//}
threadpool.install(|| {
use rayon::prelude::*;
let mut iter = Vec::new();
for i in 0..1000 {
iter.push((i, work_barrier(&counter, 10)));
}
let mut iter2 = Vec::new();
for i in 0..1000 {
iter2.push((i, work_barrier(&counter, 10)));
}
iter.into_par_iter().map(|(i, task)| {
task();
if i == 900 {
println!("Spawning task");
pool.spawn("BAZ", work_barrier(&counter, 1000));
pool2.spawn("BAZ", work_barrier(&counter, 10000));
println!("Spawned tasks");
}
if i == 999 {
println!("The first ITER end");
}
}).collect::<Vec<_>>();
println!("The first ITER finished");
//pool2.spawn("BAZ", work_barrier(&counter, 1000));
//pool2.spawn("BAZ", work_barrier(&counter, 1000));
//pool2.spawn("BAZ", work_barrier(&counter, 1000));
iter2.into_par_iter().map(|(i, task)| {
if i == 0 {
println!("The second ITER started");
}
task();
}).collect::<Vec<_>>();
(0..100)
.into_par_iter()
.map(|i| {
std::thread::sleep(Duration::from_millis(10));
if i == 50 {
pool.spawn("BAZ", work_barrier(&counter, 2000));
}
if i == 99 {
println!("The first ITER end, at {}ms", start.elapsed().as_millis());
}
})
.collect::<Vec<_>>();
let elapsed = start.elapsed().as_millis();
println!("The first ITER finished, at {}ms", elapsed);
assert!(
elapsed < 1900,
"It seems like the par_iter waited on the 2s sleep task to finish"
);
});
//pool.spawn("FOO", work_barrier(&barrier, 1));
println!("wait for test finish");
const TIMEOUT: Duration = Duration::from_secs(2);
let mut last_n_time = (0, start);
loop {
let now = Instant::now();
let n = counter.load(Ordering::SeqCst);
if n != last_n_time.0 {
last_n_time = (n, now);
} else if now.duration_since(last_n_time.1) > TIMEOUT {
break;
}
while counter.load(Ordering::SeqCst) == 0 {
println!("waiting for BAZ task to finish");
std::thread::sleep(Duration::from_secs(1));
}
let d = last_n_time.1.duration_since(start);
println!("==============");
println!("Time Passed: {}ms", d.as_millis());
println!("Jobs finished: {}", last_n_time.0);
}
}


@@ -110,7 +110,7 @@ impl State {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS))
.num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS) + 1 /*workaround for a rayon schedule bug, see MR 3546*/)
.thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
.build()
.unwrap(),