Lower rather than raise priorities.

This commit is contained in:
Joshua Yanovski 2022-09-16 18:04:23 -07:00
parent 256a58ccd3
commit 9c6dd39054
5 changed files with 63 additions and 10 deletions

2
Cargo.lock generated
View File

@ -1977,7 +1977,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "executors"
version = "0.9.0"
source = "git+https://github.com/pythonesque/rust-executors.git?rev=1d08f96cf9c52ed85eda8e0f68b5d43072d39b14#1d08f96cf9c52ed85eda8e0f68b5d43072d39b14"
source = "git+https://github.com/pythonesque/rust-executors.git?rev=db84d362eca5575e982a580f6dd11207a090fc8e#db84d362eca5575e982a580f6dd11207a090fc8e"
dependencies = [
"arr_macro",
"async-task",

View File

@ -124,7 +124,7 @@ wgpu = { git = "https://github.com/pythonesque/wgpu.git", rev = "c78c3fe3f05e20d
# wgpu-core = { path = "../wgpu/wgpu-core" }
# wgpu-types = { path = "../wgpu/wgpu-types" }
executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" }
executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "db84d362eca5575e982a580f6dd11207a090fc8e" }
# executors = { path = "../rust-executors/executors" }
# # use the latest fixes in naga (remove when updates trickle down to wgpu-rs)

View File

@ -13,7 +13,7 @@ use executors::{
parker::{LargeThreadData, StaticParker},
Executor,
};
pub use executors::parker::large;
pub use executors::{builder::ThreadPoolBuilder, parker::large};
use hashbrown::{hash_map::Entry, HashMap};
use pin_project_lite::pin_project;
// use rayon::ThreadPool;

View File

@ -173,7 +173,7 @@ impl State {
.num_threads(rayon_threads/*.saturating_sub(rayon_offset)*/)
// .thread_name(move |i| format!("rayon-{}", i))
.thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
.spawn_handler(move |thread| {
/* .spawn_handler(move |thread| {
let mut b = ThreadBuilder::default();
if let Some(name) = thread.name() {
b = b.name(name.to_owned());
@ -188,10 +188,37 @@ impl State {
}
b.spawn_careless(|| thread.run())?;
Ok(())
})
}) */
.start_handler(move |i| {
let name = format!("rayon-{}-{}", thread_name_infix, i);
common_base::set_thread_name!(&name);
// We use a different i for assigning cores...
let i = if i == 0 {
// We always want our first core to be the one reserved for Rayon, if
// possible. This turns out to be the *last* core assigned to Rayon,
// in cases where it's actually pinned. For example, on a server with 8
// cores...
//
// * [] will be reserved for the main thread
// * [6,7] will be reserved for tokio (num_cpu / 4)
// * [0,1,2,3,4] will have pinned server slowjobs *and* pinned rayon tasks
// (of low priority/normal priority, respectively).
// * [5] will be reserved just for rayon.
//
// Since indices have (rayon_offset + tokio_count) subtracted from them to
// find the core, this works out to just putting the first thread "last."
rayon_threads - 1
} else {
// The first few tasks spawned are often join/split operations, which tend
// to not be computationally intensive but which we don't want preempted
// with no way to get them back. So we try to put these on the *unpinned*
// rayon threads--i.e. the rayon_offset reserved for the main thread(s) and
// tokio_count threads reserved for tokio. These are at the *beginning*,
// of the rayon indices, so we use those in order. Additionally, we
// continue to use them in order after that, since the pinned rayon ids
// follow immediately afterwards.
i - 1
};
if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) {
core_affinity::set_for_current(core_id);
}
@ -280,11 +307,11 @@ impl State {
map_size_lg: MapSizeLg,
default_chunk: Arc<TerrainChunk>,
) -> Self {
/* let thread_name_infix = match game_mode {
let thread_name_infix = match ecs_role {
GameMode::Server => "s",
GameMode::Client => "c",
GameMode::Singleplayer => "sp",
}; */
};
let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/;
let game_mode = pools.game_mode;
@ -313,7 +340,29 @@ impl State {
let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().skip(start).step_by(step).take(total).collect::<Vec<_>>();
let floating = total.saturating_sub(cores.len());
// TODO: NUMA utils
let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large());
let slow_pool = slowjob::ThreadPoolBuilder::new()
.thread_name(move |i| format!("slowjob-{}-{}", thread_name_infix, i))
.spawn_handler(move |thread| {
let mut b = ThreadBuilder::default();
if let Some(name) = thread.name() {
b = b.name(name.to_owned());
}
if let Some(stack_size) = thread.stack_size() {
b = b.stack_size(stack_size);
}
// pinned slowjob threads run with low priority
let index = thread.index();
if index < floating {
b = b.priority(ThreadPriority::Min);
}
b.spawn_careless(|| thread.run())?;
Ok(())
})
.start_handler(move |i| {
let name = format!("slowjob-{}-{}", thread_name_infix, i);
common_base::set_thread_name!(&name);
})
.build_with_affinity(&cores, floating, slowjob::large());
/* let slow_pool = if cores.is_empty() {
// We need *some* workers, so just start on all cores.
slowjob::large_pool(1)/*slow_limit.min(64)/* as usize */)*/

View File

@ -14,6 +14,7 @@ use common::{
terrain::TerrainGrid,
vol::ReadVol,
};
use common_base::prof_span;
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, PresenceKind, ServerGeneral};
use common_state::{BlockChange, BuildAreas};
@ -396,8 +397,11 @@ impl<'a> System<'a> for Sys {
// NOTE: Required because Specs has very poor work splitting for sparse joins.
.par_bridge()
.map_init(
|| server_event_bus.emitter(),
|server_emitter, (
|| {
prof_span!(guard, "in_game rayon job");
(server_event_bus.emitter(), guard)
},
|(server_emitter, _guard), (
entity,
client,
mut maybe_presence,