From 9c6dd390544b7b303c2d32cc38703fb4139e6a5b Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Fri, 16 Sep 2022 18:04:23 -0700 Subject: [PATCH] Lower rather than raise priorities. --- Cargo.lock | 2 +- Cargo.toml | 2 +- common/src/slowjob.rs | 2 +- common/state/src/state.rs | 59 ++++++++++++++++++++++++++++++++--- server/src/sys/msg/in_game.rs | 8 +++-- 5 files changed, 63 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4748122aec..806e501cbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1977,7 +1977,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "executors" version = "0.9.0" -source = "git+https://github.com/pythonesque/rust-executors.git?rev=1d08f96cf9c52ed85eda8e0f68b5d43072d39b14#1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" +source = "git+https://github.com/pythonesque/rust-executors.git?rev=db84d362eca5575e982a580f6dd11207a090fc8e#db84d362eca5575e982a580f6dd11207a090fc8e" dependencies = [ "arr_macro", "async-task", diff --git a/Cargo.toml b/Cargo.toml index 4cb9a58d77..614d9fc6a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ wgpu = { git = "https://github.com/pythonesque/wgpu.git", rev = "c78c3fe3f05e20d # wgpu-core = { path = "../wgpu/wgpu-core" } # wgpu-types = { path = "../wgpu/wgpu-types" } -executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" } +executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "db84d362eca5575e982a580f6dd11207a090fc8e" } # executors = { path = "../rust-executors/executors" } # # use the latest fixes in naga (remove when updates trickle down to wgpu-rs) diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 7c278d8a13..f0178fd2a4 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -13,7 +13,7 @@ use executors::{ parker::{LargeThreadData, StaticParker}, Executor, }; -pub use executors::parker::large; +pub use executors::{builder::ThreadPoolBuilder, parker::large}; use hashbrown::{hash_map::Entry, HashMap}; use pin_project_lite::pin_project; // use rayon::ThreadPool; diff --git a/common/state/src/state.rs b/common/state/src/state.rs index c0e70b46f2..77ddd85991 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -173,7 +173,7 @@ impl State { .num_threads(rayon_threads/*.saturating_sub(rayon_offset)*/) // .thread_name(move |i| format!("rayon-{}", i)) .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i)) - .spawn_handler(move |thread| { + /* .spawn_handler(move |thread| { let mut b = ThreadBuilder::default(); if let Some(name) = thread.name() { b = b.name(name.to_owned()); @@ -188,10 +188,37 @@ impl State { } b.spawn_careless(|| thread.run())?; Ok(()) - }) + }) */ .start_handler(move |i| { let name = format!("rayon-{}-{}", thread_name_infix, i); common_base::set_thread_name!(&name); + // We use a different i for assigning cores... + let i = if i == 0 { + // We always want our first core to be the one reserved for Rayon, if + // possible. This turns out to be the *last* core assigned to Rayon, + // in cases where it's actually pinned. For example, on a server with 8 + // cores... + // + // * [] will be reserved for the main thread + // * [6,7] will be reserved for tokio (num_cpu / 4) + // * [0,1,2,3,4] will have pinned server slowjobs *and* pinned rayon tasks + // (of low priority/normal priority, respectively). + // * [5] will be reserved just for rayon. + // + // Since indices have (rayon_offset + tokio_count) subtracted from them to + // find the core, this works out to just putting the first thread "last." + rayon_threads - 1 + } else { + // The first few tasks spawned are often join/split operations, which tend + // to not be computationally intensive but which we don't want preempted + // with no way to get them back. So we try to put these on the *unpinned* + // rayon threads--i.e. the rayon_offset reserved for the main thread(s) and + // tokio_count threads reserved for tokio. These are at the *beginning*, + // of the rayon indices, so we use those in order. Additionally, we + // continue to use them in order after that, since the pinned rayon ids + // follow immediately afterwards. + i - 1 + }; if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) { core_affinity::set_for_current(core_id); } @@ -280,11 +307,11 @@ impl State { map_size_lg: MapSizeLg, default_chunk: Arc, ) -> Self { - /* let thread_name_infix = match game_mode { + let thread_name_infix = match ecs_role { GameMode::Server => "s", GameMode::Client => "c", GameMode::Singleplayer => "sp", - }; */ + }; let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/; let game_mode = pools.game_mode; @@ -313,7 +340,29 @@ impl State { let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().skip(start).step_by(step).take(total).collect::>(); let floating = total.saturating_sub(cores.len()); // TODO: NUMA utils - let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large()); + let slow_pool = slowjob::ThreadPoolBuilder::new() + .thread_name(move |i| format!("slowjob-{}-{}", thread_name_infix, i)) + .spawn_handler(move |thread| { + let mut b = ThreadBuilder::default(); + if let Some(name) = thread.name() { + b = b.name(name.to_owned()); + } + if let Some(stack_size) = thread.stack_size() { + b = b.stack_size(stack_size); + } + // pinned slowjob threads run with low priority + let index = thread.index(); + if index < floating { + b = b.priority(ThreadPriority::Min); + } + b.spawn_careless(|| thread.run())?; + Ok(()) + }) + .start_handler(move |i| { + let name = format!("slowjob-{}-{}", thread_name_infix, i); + common_base::set_thread_name!(&name); + }) + .build_with_affinity(&cores, floating, slowjob::large()); /* let slow_pool = if cores.is_empty() { // We need *some* workers, so just start on all cores. slowjob::large_pool(1)/*slow_limit.min(64)/* as usize */)*/ diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs index fc0d24e0f0..128eceb992 100644 --- a/server/src/sys/msg/in_game.rs +++ b/server/src/sys/msg/in_game.rs @@ -14,6 +14,7 @@ use common::{ terrain::TerrainGrid, vol::ReadVol, }; +use common_base::prof_span; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{ClientGeneral, PresenceKind, ServerGeneral}; use common_state::{BlockChange, BuildAreas}; @@ -396,8 +397,11 @@ impl<'a> System<'a> for Sys { // NOTE: Required because Specs has very poor work splitting for sparse joins. .par_bridge() .map_init( - || server_event_bus.emitter(), - |server_emitter, ( + || { + prof_span!(guard, "in_game rayon job"); + (server_event_bus.emitter(), guard) + }, + |(server_emitter, _guard), ( entity, client, mut maybe_presence,