Lower rather than raise priorities.

Joshua Yanovski 2022-09-16 18:04:23 -07:00
parent 7d3b8ea81e
commit aaa9255c8e
5 changed files with 63 additions and 10 deletions

Cargo.lock

@@ -1837,7 +1837,7 @@ checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
 [[package]]
 name = "executors"
 version = "0.9.0"
-source = "git+https://github.com/pythonesque/rust-executors.git?rev=1d08f96cf9c52ed85eda8e0f68b5d43072d39b14#1d08f96cf9c52ed85eda8e0f68b5d43072d39b14"
+source = "git+https://github.com/pythonesque/rust-executors.git?rev=db84d362eca5575e982a580f6dd11207a090fc8e#db84d362eca5575e982a580f6dd11207a090fc8e"
 dependencies = [
  "arr_macro",
  "async-task",


@@ -128,7 +128,7 @@ wgpu = { git = "https://github.com/pythonesque/wgpu.git", rev = "c78c3fe3f05e20d
 # wgpu-core = { path = "../wgpu/wgpu-core" }
 # wgpu-types = { path = "../wgpu/wgpu-types" }
-executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" }
+executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "db84d362eca5575e982a580f6dd11207a090fc8e" }
 # executors = { path = "../rust-executors/executors" }
 # # use the latest fixes in naga (remove when updates trickle down to wgpu-rs)


@@ -13,7 +13,7 @@ use executors::{
     parker::{LargeThreadData, StaticParker},
     Executor,
 };
-pub use executors::parker::large;
+pub use executors::{builder::ThreadPoolBuilder, parker::large};
 use hashbrown::{hash_map::Entry, HashMap};
 use pin_project_lite::pin_project;
 // use rayon::ThreadPool;


@@ -145,7 +145,7 @@ impl State {
             .num_threads(rayon_threads/*.saturating_sub(rayon_offset)*/)
             // .thread_name(move |i| format!("rayon-{}", i))
             .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
-            .spawn_handler(move |thread| {
+            /* .spawn_handler(move |thread| {
                 let mut b = ThreadBuilder::default();
                 if let Some(name) = thread.name() {
                     b = b.name(name.to_owned());
@@ -160,10 +160,37 @@ impl State {
                 }
                 b.spawn_careless(|| thread.run())?;
                 Ok(())
-            })
+            }) */
            .start_handler(move |i| {
                let name = format!("rayon-{}-{}", thread_name_infix, i);
                common_base::set_thread_name!(&name);
+                // We use a different i for assigning cores...
+                let i = if i == 0 {
+                    // We always want our first core to be the one reserved for Rayon, if
+                    // possible. This turns out to be the *last* core assigned to Rayon,
+                    // in cases where it's actually pinned. For example, on a server with 8
+                    // cores...
+                    //
+                    // * [] will be reserved for the main thread
+                    // * [6,7] will be reserved for tokio (num_cpu / 4)
+                    // * [0,1,2,3,4] will have pinned server slowjobs *and* pinned rayon tasks
+                    //   (of low priority/normal priority, respectively).
+                    // * [5] will be reserved just for rayon.
+                    //
+                    // Since indices have (rayon_offset + tokio_count) subtracted from them to
+                    // find the core, this works out to just putting the first thread "last."
+                    rayon_threads - 1
+                } else {
+                    // The first few tasks spawned are often join/split operations, which tend
+                    // to not be computationally intensive but which we don't want preempted
+                    // with no way to get them back. So we try to put these on the *unpinned*
+                    // rayon threads--i.e. the rayon_offset reserved for the main thread(s) and
+                    // tokio_count threads reserved for tokio. These are at the *beginning*
+                    // of the rayon indices, so we use those in order. Additionally, we
+                    // continue to use them in order after that, since the pinned rayon ids
+                    // follow immediately afterwards.
+                    i - 1
+                };
                if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) {
                    core_affinity::set_for_current(core_id);
                }
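
The remapping above is easiest to check with concrete numbers. Below is a standalone sketch (not part of the commit) that reproduces the start_handler's index arithmetic for the 8-core server scenario in the comment, assuming rayon_offset = 0, tokio_count = num_cpu / 4 = 2, rayon_threads = 8, and plain usize values in place of core_affinity's CoreId:

// Sketch only: mirrors the remapping in the start_handler above.
fn pinned_core(
    i: usize,
    rayon_threads: usize,
    rayon_offset: usize,
    tokio_count: usize,
    core_ids: &[usize],
) -> Option<usize> {
    // Thread 0 is sent to the *last* rayon core; every other thread shifts down by one.
    let i = if i == 0 { rayon_threads - 1 } else { i - 1 };
    // Remapped indices below (rayon_offset + tokio_count) stay floating (no pinned core).
    i.checked_sub(rayon_offset + tokio_count)
        .and_then(|i| core_ids.get(i))
        .copied()
}

fn main() {
    let core_ids: Vec<usize> = (0..8).collect();
    for i in 0..8 {
        println!("rayon thread {} -> {:?}", i, pinned_core(i, 8, 0, 2, &core_ids));
    }
}

This prints thread 0 pinned to core 5 (the rayon-only core), threads 1 and 2 left floating, and threads 3 through 7 pinned to cores 0 through 4, matching the layout described in the comment.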
@@ -247,11 +274,11 @@ impl State {
     }
     pub fn new(ecs_role: GameMode, pools: Pools, map_size_lg: MapSizeLg, default_chunk: Arc<TerrainChunk>) -> Self {
-        /* let thread_name_infix = match game_mode {
+        let thread_name_infix = match ecs_role {
             GameMode::Server => "s",
             GameMode::Client => "c",
             GameMode::Singleplayer => "sp",
-        }; */
+        };
         let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/;
         let game_mode = pools.game_mode;
@@ -280,7 +307,29 @@ impl State {
         let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().skip(start).step_by(step).take(total).collect::<Vec<_>>();
         let floating = total.saturating_sub(cores.len());
         // TODO: NUMA utils
-        let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large());
+        let slow_pool = slowjob::ThreadPoolBuilder::new()
+            .thread_name(move |i| format!("slowjob-{}-{}", thread_name_infix, i))
+            .spawn_handler(move |thread| {
+                let mut b = ThreadBuilder::default();
+                if let Some(name) = thread.name() {
+                    b = b.name(name.to_owned());
+                }
+                if let Some(stack_size) = thread.stack_size() {
+                    b = b.stack_size(stack_size);
+                }
+                // pinned slowjob threads run with low priority
+                let index = thread.index();
+                if index < floating {
+                    b = b.priority(ThreadPriority::Min);
+                }
+                b.spawn_careless(|| thread.run())?;
+                Ok(())
+            })
+            .start_handler(move |i| {
+                let name = format!("slowjob-{}-{}", thread_name_infix, i);
+                common_base::set_thread_name!(&name);
+            })
+            .build_with_affinity(&cores, floating, slowjob::large());
         /* let slow_pool = if cores.is_empty() {
             // We need *some* workers, so just start on all cores.
             slowjob::large_pool(1)/*slow_limit.min(64)/* as usize */)*/
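
This spawn_handler is where the commit title applies: rather than raising the priority of latency-sensitive threads (which generally needs elevated privileges), selected slowjob workers are simply started at reduced priority. A minimal sketch of the same idea, assuming the ThreadBuilder/ThreadPriority used above come from the thread-priority crate (as the priority/spawn_careless calls suggest):

// Sketch only: spawn one background worker at minimum priority and leave
// every other thread at the default.
use thread_priority::{ThreadBuilder, ThreadPriority};

fn main() {
    let worker = ThreadBuilder::default()
        .name("background-worker".to_owned())
        .priority(ThreadPriority::Min)
        // spawn_careless ignores whether setting the priority actually
        // succeeded, mirroring the handler above.
        .spawn_careless(|| {
            // ... low-priority background work goes here ...
        })
        .expect("failed to spawn worker");
    worker.join().unwrap();
}

The asymmetry is the point: lowering a thread's own priority is generally permitted for unprivileged processes, while raising it above normal typically is not, so deprioritizing background work is the portable way to keep the main and tokio threads responsive.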


@@ -14,6 +14,7 @@ use common::{
     terrain::TerrainGrid,
     vol::ReadVol,
 };
+use common_base::prof_span;
 use common_ecs::{Job, Origin, Phase, System};
 use common_net::msg::{ClientGeneral, ServerGeneral};
 use common_state::{BlockChange, BuildAreas};
@@ -392,8 +393,11 @@ impl<'a> System<'a> for Sys {
             // NOTE: Required because Specs has very poor work splitting for sparse joins.
             .par_bridge()
             .map_init(
-                || server_event_bus.emitter(),
-                |server_emitter, (
+                || {
+                    prof_span!(guard, "in_game rayon job");
+                    (server_event_bus.emitter(), guard)
+                },
+                |(server_emitter, _guard), (
                     entity,
                     client,
                     mut maybe_presence,
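
The map_init change in the last hunk creates the profiling span in the per-job init closure, so a single prof_span covers everything one rayon job does for this system. A self-contained sketch of the same pattern with plain rayon follows; the diff reaches the parallel iterator through par_bridge() over a Specs join, and a String stands in here for the (emitter, guard) pair, since prof_span! is Veloren's common_base macro and is not reproduced:

// Sketch only: map_init's first closure builds per-job state, the second
// receives it as &mut alongside each item.
use rayon::prelude::*;

fn main() {
    let results: Vec<String> = (0..16)
        .into_par_iter()
        .map_init(
            // Runs per rayon job (a worker's chunk of items), not per item; in the
            // diff this is where the span guard and event emitter are created.
            || format!("job on {:?}", std::thread::current().id()),
            // Mirrors `|(server_emitter, _guard), (...)|` in the diff.
            |job, item| format!("{} handled item {}", job, item),
        )
        .collect();
    for line in results {
        println!("{}", line);
    }
}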