All tests show there is currently no way to keep rayon from work-stealing across scopes, which leads to really weird interactions between the ECS threadpool and the slowjob threadpool. So even though I don't like having multiple threadpools, I see no workaround other than creating a second threadpool for background tasks.
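
For context, a minimal, self-contained sketch of the idea behind this change: give long-running background jobs their own rayon pool so they can never be work-stolen by tasks on the shared ECS pool. This is illustrative only and not part of the commit; the function name, pool size, and the example closures are made up.

use std::sync::Arc;

use rayon::ThreadPool;

// Illustrative sketch: a dedicated pool for long-running background jobs,
// separate from rayon's global/ECS pool, so the two cannot steal work from
// each other.
fn build_background_pool(num_threads: usize) -> Arc<ThreadPool> {
    Arc::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .thread_name(|i| format!("slowjob-{}", i))
            .build()
            .expect("failed to build background threadpool"),
    )
}

fn main() {
    let background = build_background_pool(2);

    // Long-running work goes onto the dedicated pool...
    background.spawn(|| {
        // e.g. chunk generation, pathfinding, ... (hypothetical placeholders)
    });

    // ...while short, per-tick work stays on rayon's global pool.
    rayon::spawn(|| {
        // latency-sensitive system work
    });

    // Give the spawned closures a moment to run before the sketch exits.
    std::thread::sleep(std::time::Duration::from_millis(100));
}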

Marcel Märtens 2022-08-19 09:47:54 +02:00
parent 8ca458188d
commit 3eac68000b
2 changed files with 14 additions and 27 deletions

@@ -60,13 +60,13 @@ type JobType = Box<dyn FnOnce() + Send + Sync + 'static>;
 struct InternalSlowJobPool {
     next_id: u64,
     queue: HashMap<String, VecDeque<Queue>>,
-    dispatch_sender: std::sync::mpsc::Sender<JobType>,
     configs: HashMap<String, Config>,
     last_spawned_configs: Vec<String>,
     global_spawned_and_running: u64,
     global_limit: u64,
     jobs_metrics_cnt: usize,
     jobs_metrics: HashMap<String, Vec<JobMetrics>>,
+    threadpool: Arc<ThreadPool>,
     internal: Option<Arc<Mutex<Self>>>,
 }
@@ -124,20 +124,27 @@ impl InternalSlowJobPool {
     pub fn new(
         global_limit: u64,
         jobs_metrics_cnt: usize,
-        threadpool: Arc<ThreadPool>,
+        _threadpool: Arc<ThreadPool>,
     ) -> Arc<Mutex<Self>> {
-        let (dispatch_sender, dispatch_receiver) = std::sync::mpsc::channel();
-        Self::dispatcher(threadpool, dispatch_receiver);
+        // rayon has a bug where an ECS task could work-steal a slowjob if we use
+        // the same threadpool, which would cause lag spikes we don't want!
+        let threadpool = Arc::new(
+            rayon::ThreadPoolBuilder::new()
+                .num_threads(global_limit as usize)
+                .thread_name(move |i| format!("slowjob-{}", i))
+                .build()
+                .unwrap(),
+        );
         let link = Arc::new(Mutex::new(Self {
             next_id: 0,
             queue: HashMap::new(),
-            dispatch_sender,
             configs: HashMap::new(),
             last_spawned_configs: Vec::new(),
             global_spawned_and_running: 0,
             global_limit: global_limit.max(1),
             jobs_metrics_cnt,
             jobs_metrics: HashMap::new(),
+            threadpool,
             internal: None,
         }));
@@ -148,24 +155,6 @@ impl InternalSlowJobPool {
         link
     }

-    pub fn dispatcher(
-        threadpool: Arc<ThreadPool>,
-        dispatch_receiver: std::sync::mpsc::Receiver<JobType>,
-    ) {
-        let dispatch_receiver = Mutex::new(dispatch_receiver);
-        let threadpool2 = Arc::clone(&threadpool);
-        threadpool.spawn(move || {
-            threadpool2.in_place_scope(|s| {
-                s.spawn(|s| {
-                    let dispatch_receiver = dispatch_receiver.lock().unwrap();
-                    for task in dispatch_receiver.iter() {
-                        s.spawn(|_| (task)());
-                    }
-                });
-            });
-        });
-    }

     /// returns order of configuration which are queued next
     fn calc_queued_order(
         &self,
@@ -329,9 +318,7 @@ impl InternalSlowJobPool {
                     .position(|e| e == &queue.name)
                     .map(|i| self.last_spawned_configs.remove(i));
                 self.last_spawned_configs.push(queue.name.to_owned());
-                if let Err(e) = self.dispatch_sender.send(queue.task) {
-                    error!(?e, "dispatcher thread seems to have crashed");
-                };
+                self.threadpool.spawn(queue.task);
             },
             None => error!(
                 "internal calculation is wrong, we extected a schedulable job to be \

@@ -110,7 +110,7 @@ impl State {
         let thread_pool = Arc::new(
             ThreadPoolBuilder::new()
-                .num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS) + 1 /*workaround for a rayon schedule bug, see MR 3546*/)
+                .num_threads(num_cpus::get().max(common::consts::MIN_RECOMMENDED_RAYON_THREADS))
                 .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
                 .build()
                 .unwrap(),