From aaa9255c8e55d34d24090e8a24b11475e7998a5b Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Fri, 16 Sep 2022 18:04:23 -0700 Subject: [PATCH] Lower rather than raise priorities. --- Cargo.lock | 2 +- Cargo.toml | 2 +- common/src/slowjob.rs | 2 +- common/state/src/state.rs | 59 ++++++++++++++++++++++++++++++++--- server/src/sys/msg/in_game.rs | 8 +++-- 5 files changed, 63 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e336c1dd17..e4311d3abf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1837,7 +1837,7 @@ checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" [[package]] name = "executors" version = "0.9.0" -source = "git+https://github.com/pythonesque/rust-executors.git?rev=1d08f96cf9c52ed85eda8e0f68b5d43072d39b14#1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" +source = "git+https://github.com/pythonesque/rust-executors.git?rev=db84d362eca5575e982a580f6dd11207a090fc8e#db84d362eca5575e982a580f6dd11207a090fc8e" dependencies = [ "arr_macro", "async-task", diff --git a/Cargo.toml b/Cargo.toml index fc43b4313b..f73503b2c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,7 +128,7 @@ wgpu = { git = "https://github.com/pythonesque/wgpu.git", rev = "c78c3fe3f05e20d # wgpu-core = { path = "../wgpu/wgpu-core" } # wgpu-types = { path = "../wgpu/wgpu-types" } -executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" } +executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "db84d362eca5575e982a580f6dd11207a090fc8e" } # executors = { path = "../rust-executors/executors" } # # use the latest fixes in naga (remove when updates trickle down to wgpu-rs) diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 7c278d8a13..f0178fd2a4 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -13,7 +13,7 @@ use executors::{ parker::{LargeThreadData, StaticParker}, Executor, }; -pub use executors::parker::large; +pub use executors::{builder::ThreadPoolBuilder, parker::large}; use hashbrown::{hash_map::Entry, HashMap}; use pin_project_lite::pin_project; // use rayon::ThreadPool; diff --git a/common/state/src/state.rs b/common/state/src/state.rs index cb5b05d765..cc34ff2ade 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -145,7 +145,7 @@ impl State { .num_threads(rayon_threads/*.saturating_sub(rayon_offset)*/) // .thread_name(move |i| format!("rayon-{}", i)) .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i)) - .spawn_handler(move |thread| { + /* .spawn_handler(move |thread| { let mut b = ThreadBuilder::default(); if let Some(name) = thread.name() { b = b.name(name.to_owned()); @@ -160,10 +160,37 @@ impl State { } b.spawn_careless(|| thread.run())?; Ok(()) - }) + }) */ .start_handler(move |i| { let name = format!("rayon-{}-{}", thread_name_infix, i); common_base::set_thread_name!(&name); + // We use a different i for assigning cores... + let i = if i == 0 { + // We always want our first core to be the one reserved for Rayon, if + // possible. This turns out to be the *last* core assigned to Rayon, + // in cases where it's actually pinned. For example, on a server with 8 + // cores... + // + // * [] will be reserved for the main thread + // * [6,7] will be reserved for tokio (num_cpu / 4) + // * [0,1,2,3,4] will have pinned server slowjobs *and* pinned rayon tasks + // (of low priority/normal priority, respectively). + // * [5] will be reserved just for rayon. + // + // Since indices have (rayon_offset + tokio_count) subtracted from them to + // find the core, this works out to just putting the first thread "last." + rayon_threads - 1 + } else { + // The first few tasks spawned are often join/split operations, which tend + // to not be computationally intensive but which we don't want preempted + // with no way to get them back. So we try to put these on the *unpinned* + // rayon threads--i.e. the rayon_offset reserved for the main thread(s) and + // tokio_count threads reserved for tokio. These are at the *beginning*, + // of the rayon indices, so we use those in order. Additionally, we + // continue to use them in order after that, since the pinned rayon ids + // follow immediately afterwards. + i - 1 + }; if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) { core_affinity::set_for_current(core_id); } @@ -247,11 +274,11 @@ impl State { } pub fn new(ecs_role: GameMode, pools: Pools, map_size_lg: MapSizeLg, default_chunk: Arc) -> Self { - /* let thread_name_infix = match game_mode { + let thread_name_infix = match ecs_role { GameMode::Server => "s", GameMode::Client => "c", GameMode::Singleplayer => "sp", - }; */ + }; let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/; let game_mode = pools.game_mode; @@ -280,7 +307,29 @@ impl State { let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().skip(start).step_by(step).take(total).collect::>(); let floating = total.saturating_sub(cores.len()); // TODO: NUMA utils - let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large()); + let slow_pool = slowjob::ThreadPoolBuilder::new() + .thread_name(move |i| format!("slowjob-{}-{}", thread_name_infix, i)) + .spawn_handler(move |thread| { + let mut b = ThreadBuilder::default(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + // pinned slowjob threads run with low priority + let index = thread.index(); + if index < floating { + b = b.priority(ThreadPriority::Min); + } + b.spawn_careless(|| thread.run())?; + Ok(()) + }) + .start_handler(move |i| { + let name = format!("slowjob-{}-{}", thread_name_infix, i); + common_base::set_thread_name!(&name); + }) + .build_with_affinity(&cores, floating, slowjob::large()); /* let slow_pool = if cores.is_empty() { // We need *some* workers, so just start on all cores. slowjob::large_pool(1)/*slow_limit.min(64)/* as usize */)*/ diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs index 92ba0cdc6d..fde0931e4d 100644 --- a/server/src/sys/msg/in_game.rs +++ b/server/src/sys/msg/in_game.rs @@ -14,6 +14,7 @@ use common::{ terrain::TerrainGrid, vol::ReadVol, }; +use common_base::prof_span; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; use common_state::{BlockChange, BuildAreas}; @@ -392,8 +393,11 @@ impl<'a> System<'a> for Sys { // NOTE: Required because Specs has very poor work splitting for sparse joins. .par_bridge() .map_init( - || server_event_bus.emitter(), - |server_emitter, ( + || { + prof_span!(guard, "in_game rayon job"); + (server_event_bus.emitter(), guard) + }, + |(server_emitter, _guard), ( entity, client, mut maybe_presence,