From 0f01b78e4b0c004347654363ae931986cc94815b Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Wed, 24 Aug 2022 21:43:14 -0700 Subject: [PATCH] Threads pinned to cores, initial groundwork for async slowjobs. --- Cargo.lock | 76 ++++++++++++++- assets/world/features.ron | 2 +- client/src/lib.rs | 5 +- common/Cargo.toml | 11 ++- common/src/lib.rs | 1 + common/src/slowjob.rs | 101 +++++++++++++------- common/state/Cargo.toml | 1 + common/state/src/lib.rs | 2 +- common/state/src/state.rs | 133 +++++++++++++++++++++++---- server-cli/Cargo.toml | 1 + server-cli/src/main.rs | 1 + server/src/chunk_generator.rs | 2 +- server/src/lib.rs | 4 +- server/src/sys/chunk_serialize.rs | 2 +- voxygen/src/menu/main/client_init.rs | 5 +- voxygen/src/menu/main/mod.rs | 6 +- voxygen/src/render/renderer.rs | 5 + voxygen/src/scene/figure/cache.rs | 2 +- voxygen/src/scene/terrain.rs | 20 ++-- voxygen/src/singleplayer.rs | 6 +- voxygen/src/ui/keyed_jobs.rs | 2 +- 21 files changed, 304 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4ea3870da..3f59ebf49a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,7 +158,7 @@ dependencies = [ [[package]] name = "arbalest" version = "0.2.1" -source = "git+https://gitlab.com/veloren/arbalest.git?rev=9cb8f67a4f6d8f3cc908dac4eb5eb8aec9fab07b#9cb8f67a4f6d8f3cc908dac4eb5eb8aec9fab07b" +source = "git+https://gitlab.com/veloren/arbalest.git?rev=743a8a262e4a762d6d7656c1193782ea35dd2a1d#743a8a262e4a762d6d7656c1193782ea35dd2a1d" [[package]] name = "arr_macro" @@ -252,6 +252,12 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-task" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" + [[package]] name = "async-trait" version = "0.1.53" @@ -1013,6 +1019,18 @@ dependencies = [ "objc", ] +[[package]] +name = "core_affinity" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" +dependencies = [ + "kernel32-sys", + "libc", + "num_cpus", + "winapi 0.2.8", +] + [[package]] name = "coreaudio-rs" version = "0.10.0" @@ -1183,7 +1201,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-deque 0.8.1", "crossbeam-epoch 0.9.8", - "crossbeam-queue", + "crossbeam-queue 0.3.5", "crossbeam-utils 0.8.8", ] @@ -1248,6 +1266,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" +dependencies = [ + "crossbeam-utils 0.6.6", +] + [[package]] name = "crossbeam-queue" version = "0.3.5" @@ -1258,6 +1285,16 @@ dependencies = [ "crossbeam-utils 0.8.8", ] +[[package]] +name = "crossbeam-utils" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" +dependencies = [ + "cfg-if 0.1.10", + "lazy_static", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -1791,6 +1828,23 @@ version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" +[[package]] +name = "executors" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a94f9d4d0b228598ba8db61d2fe9321df9afcaa10aabdc26aa183cc76b10f7d" +dependencies = [ + "arr_macro", + "async-task", + "core_affinity", + "crossbeam-channel", + "crossbeam-deque 0.8.1", + "crossbeam-utils 0.8.8", + "log", + "rand 0.8.5", + "synchronoise", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -3388,7 +3442,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b47412f3a52115b936ff2a229b803498c7b4d332adeb87c2f1498c9da54c398c" dependencies = [ "crossbeam", - "crossbeam-queue", + "crossbeam-queue 0.3.5", "log", "mio 0.7.14", ] @@ -5667,7 +5721,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ea85dac2880f84d4025ff5ace80cda6d8bc43bc88b6a389b9277fcf894b51e9" dependencies = [ - "crossbeam-queue", + "crossbeam-queue 0.3.5", "hashbrown 0.12.0", "hibitset", "log", @@ -5926,6 +5980,15 @@ dependencies = [ "unicode-xid 0.2.2", ] +[[package]] +name = "synchronoise" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb" +dependencies = [ + "crossbeam-queue 0.1.2", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -6529,6 +6592,8 @@ dependencies = [ "csv", "dot_vox", "enum-iterator", + "executors", + "futures", "fxhash", "hashbrown 0.12.0", "indexmap", @@ -6538,6 +6603,7 @@ dependencies = [ "num-traits", "ordered-float 2.10.0", "petgraph 0.6.0", + "pin-project-lite", "rand 0.8.5", "rayon", "ron 0.7.0", @@ -6634,6 +6700,7 @@ name = "veloren-common-state" version = "0.10.0" dependencies = [ "bincode", + "core_affinity", "hashbrown 0.12.0", "num_cpus", "rayon", @@ -6820,6 +6887,7 @@ dependencies = [ "veloren-common-base", "veloren-common-frontend", "veloren-common-net", + "veloren-common-state", "veloren-server", ] diff --git a/assets/world/features.ron b/assets/world/features.ron index 27aef3b84a..c7d3319968 100644 --- a/assets/world/features.ron +++ b/assets/world/features.ron @@ -3,7 +3,7 @@ ( caverns: false, // TODO: Disabled by default until cave overhaul - caves: false, + caves: true, rocks: true, shrubs: true, trees: true, diff --git a/client/src/lib.rs b/client/src/lib.rs index e8719468ae..d63c27add0 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -289,6 +289,7 @@ impl Client { runtime: Arc, // TODO: refactor to avoid needing to use this out parameter mismatched_server_info: &mut Option, + pools: common_state::Pools, ) -> Result { let network = Network::new(Pid::new(), &runtime); @@ -395,7 +396,7 @@ impl Client { ability_map, } => { // Initialize `State` - let mut state = State::client(); + let mut state = State::client(pools); // Client-only components state.ecs_mut().register::>(); state.ecs_mut().write_resource::() @@ -1842,7 +1843,7 @@ impl Client { for key in chunks_to_remove { let chunk = self.state.remove_chunk(key); // Drop chunk in a background thread. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(chunk); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(chunk); }); } drop(slowjob); diff --git a/common/Cargo.toml b/common/Cargo.toml index 31745e5fc0..27fa1ca484 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -30,6 +30,7 @@ enum-iterator = "0.7" vek = { version = "0.15.8", features = ["serde"] } chrono = "0.4" chrono-tz = "0.6" +pin-project-lite = "0.2" sha2 = "0.10" serde_json = "1.0.50" serde_with = { version = "1.14.0" } @@ -48,6 +49,13 @@ clap = "2.33" crossbeam-utils = "0.8.1" bitflags = "1.2" crossbeam-channel = "0.5" +executors = { version = "0.9", default-features = false, features = [ + "workstealing-exec", + "ws-timed-fairness", + "thread-pinning", + "numa-aware" +] } +futures = "0.3" lazy_static = "1.4.0" num-derive = "0.3" num-traits = "0.2" @@ -79,7 +87,8 @@ petgraph = { version = "0.6", optional = true } kiddo = { version = "0.1", optional = true } # Data structures -arbalest = { git = "https://gitlab.com/veloren/arbalest.git", rev = "9cb8f67a4f6d8f3cc908dac4eb5eb8aec9fab07b", features = ["nightly"] } +arbalest = { git = "https://gitlab.com/veloren/arbalest.git", rev = "743a8a262e4a762d6d7656c1193782ea35dd2a1d", features = ["nightly"] } +# arbalest = { path = "../../arbalest", features = ["nightly"] } hashbrown = { version = "0.12", features = ["rayon", "serde", "nightly"] } slotmap = { version = "1.0", features = ["serde"] } indexmap = { version = "1.3.0", features = ["rayon"] } diff --git a/common/src/lib.rs b/common/src/lib.rs index 1c0223d864..904465567b 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -10,6 +10,7 @@ bool_to_option, coerce_unsized, dispatch_from_dyn, + exclusive_wrapper, fundamental, generic_const_exprs, generic_arg_infer, diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 5b3051e1a8..62c418d6a1 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -1,18 +1,30 @@ use arbalest::sync::{Strong, Frail}; use core::{ fmt, + future::Future, marker::Unsize, ops::{CoerceUnsized, DispatchFromDyn}, - sync::atomic::{AtomicBool, Ordering} + pin::Pin, + sync::{atomic::{AtomicBool, Ordering}, Exclusive}, + task, }; +use executors::{ + crossbeam_workstealing_pool::ThreadPool as ThreadPool_, + parker::{LargeThreadData, StaticParker}, + Executor, +}; +pub use executors::parker::large; use hashbrown::{hash_map::Entry, HashMap}; -use rayon::ThreadPool; +use pin_project_lite::pin_project; +// use rayon::ThreadPool; use std::{ collections::VecDeque, sync::{Arc, Mutex}, time::Instant, }; -use tracing::{error, warn}; +use tracing::{error, warn/* , Instrument */}; + +pub type ThreadPool = ThreadPool_>; /// Provides a Wrapper around rayon threadpool to execute slow-jobs. /// slow means, the job doesn't need to not complete within the same tick. @@ -51,24 +63,30 @@ use tracing::{error, warn}; /// pool.configure("CHUNK_GENERATOR", |n| n / 2); /// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job")); /// ``` -#[derive(Clone)] +// #[derive(Clone)] pub struct SlowJobPool { internal: Arc>, - threadpool: Arc, + threadpool: ThreadPool, +} + +impl Drop for SlowJobPool { + fn drop(&mut self) { + self.threadpool.shutdown_borrowed(); + } } type Name = /*String*/&'static str; #[derive(Debug)] pub struct SlowJob { - task: Frail, + task: Pin>, } // impl + CoerceUnsized, U: ?Sized> CoerceUnsized> for Task {} struct InternalSlowJobPool { cur_slot: usize, - queue: HashMap>>, + queue: HashMap>>>, configs: HashMap, last_spawned_configs: Vec, global_spawned_and_running: u64, @@ -83,20 +101,29 @@ struct Config { local_spawned_and_running: u64, } -#[derive(Debug)] -struct Task { - queue_created: Instant, - /// Has this task been canceled? - is_canceled: AtomicBool, - /// The actual task function. Technically, the Option is unnecessary, since we'll only ever - /// run it once, but the performance improvement doesn't justify unsafe in this case. - task: F, +pin_project! { + struct Task { + queue_created: Instant, + // Has this task been canceled? + is_canceled: AtomicBool, + #[pin] + // The actual task future. + task: Exclusive, + } +} + +impl Future for Task { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + self.project().task.poll(cx) + } } /// NOTE: Should be FnOnce, but can't because there's no easy way to run an FnOnce function on an -/// Arc even if [try_unwrap] would work. We could write unsafe code to do this, but it probably +/// Arc even if [try_unwrap] would work. We could write non-safe code to do this, but it probably /// isn't worth it. -type Queue = Task; +type Queue = Task + Send + 'static>; pub struct JobMetrics { pub queue_created: Instant, @@ -105,20 +132,20 @@ pub struct JobMetrics { } impl Task { - fn new(f: F) -> Task - where F: FnOnce() + Send + Sync + 'static + fn new(f: F) -> Task + Send + 'static> + where F: /*FnOnce()*/Future + Send + 'static { let queue_created = Instant::now(); - let mut f = Some(f); + /* let mut f = Some(f); */ Task { queue_created, is_canceled: AtomicBool::new(false), - task: move || { + task: Exclusive::new(/* move || { // Working around not being able to call FnOnce in an Arc. if let Some(f) = f.take() { f(); } - }, + }*/f), } } } @@ -246,14 +273,15 @@ impl InternalSlowJobPool { fn spawn(&mut self, slowjob: &SlowJobPool, push_back: bool, name: &Name, f: F) -> SlowJob where - F: FnOnce() + Send + Sync + 'static, + F: /*FnOnce()*/Future + Send + 'static, { - let queue: Strong = Strong::new(Task::new(f)); + // let f = f.instrument(tracing::info_span!("{}", name)); + let queue: Pin> = Strong::pin(Task::new(f)); let mut deque = self.queue .entry(name.to_owned()) .or_default(); let job = SlowJob { - task: Strong::downgrade(&queue) + task: Strong::pin_downgrade(&queue) }; if push_back { deque.push_back(queue); @@ -287,7 +315,7 @@ impl InternalSlowJobPool { /// and global task counters, so make sure to actually finish the returned jobs if you consume /// the iterator, or the position in the queue may be off! #[must_use = "Remember to actually use the returned jobs if you consume the iterator."] - fn next_jobs<'a>(&'a mut self) -> impl Iterator)> + 'a { + fn next_jobs<'a>(&'a mut self) -> impl Iterator>)> + 'a { let queued = &mut self.queue; let configs = &mut self.configs; let global_spawned_and_running = &mut self.global_spawned_and_running; @@ -426,7 +454,7 @@ impl SlowJob { // run the task, and we only drop the strong side after the task is complete, this is // a conservative signal that there's no point in cancelling the task, so this has no // false positives. - let task = self.task.try_upgrade().or(Err(self))?; + let task = Frail::try_pin_upgrade(&self.task).or(Err(self))?; // Now that the task is upgraded, any attempt by the strong side to mutably access the // task will fail, so it will assume it's been canceled. This is fine, because we're // about to cancel it anyway. @@ -448,7 +476,7 @@ impl SlowJob { } impl SlowJobPool { - pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc) -> Self { + pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: /*Arc<*/ThreadPool/*>*/) -> Self { Self { internal: Arc::new(Mutex::new(InternalSlowJobPool::new(global_limit, jobs_metrics_cnt))), threadpool, @@ -482,12 +510,12 @@ impl SlowJobPool { /// This runs the task, and then checks at the end to see if there are any more tasks to run /// before returning for good. In cases with lots of tasks, this may help avoid unnecessary /// context switches or extra threads being spawned unintentionally. - fn spawn_in_threadpool(&self, mut name_task: (Name, Strong)) { + fn spawn_in_threadpool(&self, mut name_task: (Name, Pin>)) { let internal = Arc::clone(&self.internal); // NOTE: It's important not to use internal until we're in the spawned thread, since the // lock is probably currently taken! - self.threadpool.spawn(move || { + self.threadpool.execute(move || { // Repeatedly run until exit; we do things this way to avoid recursion, which might blow // our call stack. loop { @@ -499,14 +527,17 @@ impl SlowJobPool { // difference is minor and it makes it easier to assign metrics to canceled tasks // (though maybe we don't want to do that?). let execution_start = Instant::now(); - if let Some(mut task) = Strong::try_borrow_mut(&mut task) + if let Some(mut task) = Strong::try_pin_borrow_mut(&mut task) .ok() .filter(|task| !task.is_canceled.load(Ordering::Relaxed)) { // The task was not canceled. // // Run the task in its own scope so perf works correctly. common_base::prof_span_alloc!(_guard, &name); - (task.task)(); + futures::executor::block_on(task.as_mut()/* .instrument({ + common_base::prof_span!(span, &name); + span + }) */); } let execution_end = Instant::now(); let metrics = JobMetrics { @@ -542,7 +573,7 @@ impl SlowJobPool { #[allow(clippy::result_unit_err)] pub fn try_run(&self, name: &Name, f: F) -> Result where - F: FnOnce() + Send + Sync + 'static, + F: /*FnOnce()*/Future + Send + 'static, { let mut lock = self.internal.lock().expect("lock poisoned while try_run"); let lock = &mut *lock; @@ -557,7 +588,7 @@ impl SlowJobPool { pub fn spawn(&self, name: &Name, f: F) -> SlowJob where - F: FnOnce() + Send + Sync + 'static, + F: /*FnOnce()*/Future + Send + 'static, { self.internal .lock() @@ -568,7 +599,7 @@ impl SlowJobPool { /// Spawn at the front of the queue, which is preferrable in some cases. pub fn spawn_front(&self, name: &Name, f: F) -> SlowJob where - F: FnOnce() + Send + Sync + 'static, + F: /*FnOnce()*/Future + Send + 'static, { self.internal .lock() diff --git a/common/state/Cargo.toml b/common/state/Cargo.toml index b5e9063e0d..5cb9c3d7df 100644 --- a/common/state/Cargo.toml +++ b/common/state/Cargo.toml @@ -16,6 +16,7 @@ common-net = { package = "veloren-common-net", path = "../net" } common-ecs = { package = "veloren-common-ecs", path = "../ecs" } common-base = { package = "veloren-common-base", path = "../base" } +core_affinity = "0.5" rayon = "1.5" num_cpus = "1.0" tracing = { version = "0.1", default-features = false } diff --git a/common/state/src/lib.rs b/common/state/src/lib.rs index d0047fe6de..c9dd94f289 100644 --- a/common/state/src/lib.rs +++ b/common/state/src/lib.rs @@ -6,4 +6,4 @@ mod build_areas; mod state; // TODO: breakup state module and remove glob pub use build_areas::{BuildAreaError, BuildAreas}; -pub use state::{BlockChange, State, TerrainChanges}; +pub use state::{BlockChange, Pools, State, TerrainChanges}; diff --git a/common/state/src/state.rs b/common/state/src/state.rs index 5b84c2e87c..0993912075 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -16,7 +16,7 @@ use common::{ DeltaTime, EntitiesDiedLastTick, GameMode, PlayerEntity, PlayerPhysicsSettings, Time, TimeOfDay, }, - slowjob::SlowJobPool, + slowjob::{self, SlowJobPool}, terrain::{Block, TerrainChunk, TerrainGrid}, time::DayPeriod, trade::Trades, @@ -94,38 +94,125 @@ pub struct State { thread_pool: Arc, } +pub type Pools = (usize, GameMode/*u64*/, Arc/*, slowjob::SlowJobPool*/); + + impl State { - /// Create a new `State` in client mode. - pub fn client() -> Self { Self::new(GameMode::Client) } + pub fn pools(game_mode: GameMode) -> Pools { + let num_cpu = num_cpus::get()/* - 1*/; - /// Create a new `State` in server mode. - pub fn server() -> Self { Self::new(GameMode::Server) } - - pub fn new(game_mode: GameMode) -> Self { let thread_name_infix = match game_mode { GameMode::Server => "s", GameMode::Client => "c", GameMode::Singleplayer => "sp", }; - - let num_cpu = num_cpus::get(); - let thread_pool = Arc::new( + let rayon_threads = match game_mode { + GameMode::Server | GameMode::Client => num_cpu / 2, + GameMode::Singleplayer => num_cpu / 4, + }/*num_cpu*/.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); + let rayon_pool = Arc::new( ThreadPoolBuilder::new() - .num_threads(num_cpu.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS)) + .num_threads(rayon_threads) + // .thread_name(move |i| format!("rayon-{}", i)) .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i)) .build() .unwrap(), ); + + // let num_cpu = num_cpu as u64; + /* let slow_limit = /* match game_mode { + GameMode::Server | GameMode::Client => num_cpu / 2, + GameMode::Singleplayer => num_cpu / 4 + }.max(1); */(2 * num_cpu).max(1); */ + /* let slow_limit = 2 * (num_cpu - 1).max(1); + let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().take(slow_limit).collect::>(); + let floating = slow_limit.saturating_sub(cores.len()); + tracing::trace!(?slow_limit, "Slow Thread limit"); + let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large()); + // let slow_pool = slowjob::large_pool(slow_limit.min(64)/* as usize */); + let slowjob = SlowJobPool::new( + slow_limit as u64, + 10_000, + /*Arc::clone(*/slow_pool/*)*/, + ); */ + + (num_cpu - 1/*slow_limit*//* as u64*/, game_mode, rayon_pool/*, slowjob*/) + } + + /// Create a new `State` in client mode. + pub fn client(pools: Pools) -> Self { Self::new(GameMode::Client, pools) } + + /// Create a new `State` in server mode. + pub fn server(pools: Pools) -> Self { Self::new(GameMode::Server, pools) } + + pub fn new(ecs_role: GameMode, pools: Pools) -> Self { + /* let thread_name_infix = match game_mode { + GameMode::Server => "s", + GameMode::Client => "c", + GameMode::Singleplayer => "sp", + }; */ + + let num_cpu = /*num_cpus::get()*/pools.0/* / 2 + pools.0 / 4*/; + let game_mode = pools.1; + /* let rayon_threads = match game_mode { + GameMode::Server | GameMode::Client => num_cpu/* / 2*/, + GameMode::Singleplayer => num_cpu/* / 4*// 2, + }/*num_cpu*/; + let rayon_threads = rayon_threads.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); + + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .num_threads(rayon_threads) + .thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i)) + .build() + .unwrap(), + ); */ + + // let num_cpu = num_cpu as u64; + let (start, step, total) = match (game_mode, ecs_role) { + (_, GameMode::Singleplayer) => todo!("Singleplayer is not a valid ECS role (yet)"), + (GameMode::Server | GameMode::Client, _) => (0, 1, num_cpu), + (GameMode::Singleplayer, GameMode::Server) => (0, 2, num_cpu / 2/* + num_cpu / 4 */), + (GameMode::Singleplayer, GameMode::Client) => (1, 2, num_cpu - num_cpu / 2/* + num_cpu / 4 */), + }; + let total = total.max(1); + let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().skip(start).step_by(step).take(total).collect::>(); + let floating = total.saturating_sub(cores.len()); + // TODO: NUMA utils + let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large()); + /* let slow_pool = if cores.is_empty() { + // We need *some* workers, so just start on all cores. + slowjob::large_pool(1)/*slow_limit.min(64)/* as usize */)*/ + } else { + let slow_pool = with_affinity( + cores: &[CoreId], + floating: usize, + parker: P + ) -> ThreadPool

+ } */ + /* let slow_limit = match game_mode { + GameMode::Server | GameMode::Client => num_cpu / 2, + GameMode::Singleplayer => num_cpu / 4 + }.max(1);/*(/*2 * */num_cpu / 2 + num_cpu / 4).max(1);*/ */ + tracing::trace!(?game_mode, ?ecs_role, ?num_cpu, ?start, ?step, ?total, "Slow Thread limit"); + // dbg!(game_mode, ecs_role, num_cpu, start, step, total, cores, floating, "Slow Thread limit"); + let slowjob = SlowJobPool::new( + /* slow_limit as u64, */ + total as u64, + 10_000, + /*Arc::clone(*/slow_pool/*)*/, + ); + Self { - ecs: Self::setup_ecs_world(game_mode, num_cpu as u64, &thread_pool), - thread_pool, + ecs: Self::setup_ecs_world(ecs_role, /*num_cpu as u64*//*, &thread_pool, *//*pools.1*/slowjob/*pools.3*/), + thread_pool: pools.2, } } /// Creates ecs world and registers all the common components and resources // TODO: Split up registering into server and client (e.g. move // EventBus to the server) - fn setup_ecs_world(game_mode: GameMode, num_cpu: u64, thread_pool: &Arc) -> specs::World { + fn setup_ecs_world(ecs_role: GameMode, /*num_cpu: u64*//*, thread_pool: &Arc, */slowjob: SlowJobPool) -> specs::World { let mut ecs = specs::World::new(); // Uids for sync ecs.register_sync_marker(); @@ -219,18 +306,24 @@ impl State { ecs.insert(crate::build_areas::BuildAreas::default()); ecs.insert(TerrainChanges::default()); ecs.insert(EventBus::::default()); - ecs.insert(game_mode); + ecs.insert(ecs_role); ecs.insert(EventBus::::default()); ecs.insert(common::CachedSpatialGrid::default()); ecs.insert(EntitiesDiedLastTick::default()); - let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1); - tracing::trace!(?slow_limit, "Slow Thread limit"); + /* let slow_limit = match game_mode { + GameMode::Server | GameMode::Client => num_cpu / 2, + GameMode::Singleplayer => num_cpu / 4 + }.max(1); */ + // let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1); + /* tracing::trace!(?slow_limit, "Slow Thread limit"); + let thread_pool = slowjob::large_pool(slow_limit.min(64) as usize); ecs.insert(SlowJobPool::new( slow_limit, 10_000, - Arc::clone(thread_pool), - )); + /*Arc::clone(*/thread_pool/*)*/, + )); */ + ecs.insert(slowjob); // TODO: only register on the server ecs.insert(EventBus::::default()); @@ -254,7 +347,7 @@ impl State { }; if let Err(e) = plugin_mgr .execute_event(&ecs_world, &plugin_api::event::PluginLoadEvent { - game_mode, + game_mode: ecs_role, }) { tracing::debug!(?e, "Failed to run plugin init"); diff --git a/server-cli/Cargo.toml b/server-cli/Cargo.toml index f3e23b5841..d5bef3cdea 100644 --- a/server-cli/Cargo.toml +++ b/server-cli/Cargo.toml @@ -28,6 +28,7 @@ hot-reloading = ["server/hot-reloading"] server = { package = "veloren-server", path = "../server", default-features = false, features = ["simd"] } common = { package = "veloren-common", path = "../common" } common-base = { package = "veloren-common-base", path = "../common/base" } +common-state = { package = "veloren-common-state", path = "../common/state" } common-net = { package = "veloren-common-net", path = "../common/net" } common-frontend = { package = "veloren-common-frontend", path = "../common/frontend" } diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index e88c6d7572..5c34a0161d 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -170,6 +170,7 @@ fn main() -> io::Result<()> { database_settings, &server_data_dir, runtime, + common_state::State::pools(common::resources::GameMode::Server), ) .expect("Failed to create server instance!"); diff --git a/server/src/chunk_generator.rs b/server/src/chunk_generator.rs index ec4ef06562..5788c798f0 100644 --- a/server/src/chunk_generator.rs +++ b/server/src/chunk_generator.rs @@ -55,7 +55,7 @@ impl ChunkGenerator { v.insert(Arc::clone(&cancel)); let chunk_tx = self.chunk_tx.clone(); self.metrics.chunks_requested.inc(); - slowjob_pool.spawn(&"CHUNK_GENERATOR", move || { + slowjob_pool.spawn(&"CHUNK_GENERATOR", async move { let index = index.as_index_ref(); let payload = world .generate_chunk(index, key, || cancel.load(Ordering::Relaxed), Some(time)) diff --git a/server/src/lib.rs b/server/src/lib.rs index 06ea8e4a27..32fb45195c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -2,7 +2,6 @@ #![allow(clippy::option_map_unit_fn)] #![deny(clippy::clone_on_ref_ptr)] #![feature( - bool_to_option, box_patterns, drain_filter, label_break_value, @@ -218,6 +217,7 @@ impl Server { database_settings: DatabaseSettings, data_dir: &std::path::Path, runtime: Arc, + pools: common_state::Pools, ) -> Result { info!("Server data dir is: {}", data_dir.display()); if settings.auth_server_address.is_none() { @@ -242,7 +242,7 @@ impl Server { let battlemode_buffer = BattleModeBuffer::default(); - let mut state = State::server(); + let mut state = State::server(pools); state.ecs_mut().insert(battlemode_buffer); state.ecs_mut().insert(settings.clone()); state.ecs_mut().insert(editable_settings); diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index d014ee0b5d..6eddab93fb 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -117,7 +117,7 @@ impl<'a> System<'a> for Sys { while chunks_iter.peek().is_some() { let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect(); let chunk_sender = chunk_sender.clone(); - slow_jobs.spawn(&"CHUNK_SERIALIZER", move || { + slow_jobs.spawn(&"CHUNK_SERIALIZER", async move { for (chunk, chunk_key, mut meta) in chunks { let msg = Client::prepare_chunk_update_msg( ServerGeneral::TerrainChunkUpdate { diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 5cc0b5b791..20d7893bd2 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -48,6 +48,7 @@ impl ClientInit { username: String, password: String, runtime: Arc, + pools: common_state::Pools, ) -> Self { let (tx, rx) = unbounded(); let (trust_tx, trust_rx) = unbounded(); @@ -77,6 +78,7 @@ impl ClientInit { connection_args.clone(), Arc::clone(&runtime2), &mut mismatched_server_info, + pools.clone(), ) .await { @@ -89,6 +91,7 @@ impl ClientInit { break 'tries; } let _ = tx.send(Msg::Done(Ok(client))); + drop(pools); tokio::task::block_in_place(move || drop(runtime2)); return; }, @@ -108,12 +111,12 @@ impl ClientInit { } tokio::time::sleep(Duration::from_secs(5)).await; } - // Parsing/host name resolution successful but no connection succeeded // If last_err is None this typically means there was no server up at the input // address and all the attempts timed out. let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::ServerNotFound)))); + drop(pools); // Safe drop runtime tokio::task::block_in_place(move || drop(runtime2)); }); diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index 6bc4860f51..b2d5131a54 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -96,7 +96,7 @@ impl PlayState for MainMenuState { { if let Some(singleplayer) = &global_state.singleplayer { match singleplayer.receiver.try_recv() { - Ok(Ok(())) => { + Ok(Ok(pools)) => { // Attempt login after the server is finished initializing attempt_login( &mut global_state.info_message, @@ -106,6 +106,7 @@ impl PlayState for MainMenuState { &mut self.init, &global_state.tokio_runtime, &global_state.i18n, + Some(pools), ); }, Ok(Err(e)) => { @@ -301,6 +302,7 @@ impl PlayState for MainMenuState { &mut self.init, &global_state.tokio_runtime, &global_state.i18n, + None, ); }, MainMenuEvent::CancelLoginAttempt => { @@ -500,6 +502,7 @@ fn attempt_login( init: &mut InitState, runtime: &Arc, localized_strings: &LocalizationHandle, + pools: Option, ) { let localization = localized_strings.read(); if let Err(err) = comp::Player::alias_validate(&username) { @@ -530,6 +533,7 @@ fn attempt_login( username, password, Arc::clone(runtime), + pools.unwrap_or_else(|| common_state::State::pools(common::resources::GameMode::Client)), )); } } diff --git a/voxygen/src/render/renderer.rs b/voxygen/src/render/renderer.rs index f258687b5c..c4193f2b11 100644 --- a/voxygen/src/render/renderer.rs +++ b/voxygen/src/render/renderer.rs @@ -1013,6 +1013,7 @@ impl Renderer { ); if self.is_minimized { + self.queue.submit(pre_commands.into_iter()); return Ok(None); } @@ -1198,17 +1199,21 @@ impl Renderer { // If lost recreate the swap chain Err(err @ wgpu::SwapChainError::Lost) => { warn!("{}. Recreating swap chain. A frame will be missed", err); + self.queue.submit(pre_commands.into_iter()); self.on_resize(self.resolution); return Ok(None); }, Err(wgpu::SwapChainError::Timeout) => { + println!("Timeout."); // This will probably be resolved on the next frame // NOTE: we don't log this because it happens very frequently with // PresentMode::Fifo and unlimited FPS on certain machines + self.queue.submit(pre_commands.into_iter()); return Ok(None); }, Err(err @ wgpu::SwapChainError::Outdated) => { warn!("{}. Recreating the swapchain", err); + self.queue.submit(pre_commands.into_iter()); self.swap_chain = self.device.create_swap_chain(&self.surface, &self.sc_desc); return Ok(None); }, diff --git a/voxygen/src/scene/figure/cache.rs b/voxygen/src/scene/figure/cache.rs index 5d6b67bfc4..8584d6591d 100644 --- a/voxygen/src/scene/figure/cache.rs +++ b/voxygen/src/scene/figure/cache.rs @@ -336,7 +336,7 @@ where let manifests = self.manifests.clone(); let slot_ = Arc::clone(&slot); - slow_jobs.spawn(&"FIGURE_MESHING", move || { + slow_jobs.spawn(&"FIGURE_MESHING", async move { // First, load all the base vertex data. let meshes = ::bone_meshes(&key, &manifests, extra); diff --git a/voxygen/src/scene/terrain.rs b/voxygen/src/scene/terrain.rs index 2a22f89ab2..f0ad8fcd6b 100644 --- a/voxygen/src/scene/terrain.rs +++ b/voxygen/src/scene/terrain.rs @@ -772,7 +772,7 @@ impl/**/ Terrain { (0..=count).for_each(|_| { let new_atlas_tx = new_atlas_tx.clone(); let texture_fn = renderer.create_texture_raw(); - slowjob.spawn(&"IMAGE_PROCESSING", move || { + slowjob.spawn(&"IMAGE_PROCESSING", async move { // Construct the next atlas on a separate thread. If it doesn't get sent, it means // the original channel was dropped, which implies the terrain scene data no longer // exists, so we can just drop the result in that case. @@ -850,7 +850,7 @@ impl/**/ Terrain { if let Some(old) = chunks.insert(pos, chunk) { Self::remove_chunk_meta(atlas, pos, &old); // Drop the chunk on another thread. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(old); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(old); }); } /* let (zmin, zmax) = chunk.z_bounds; self.z_index_up.insert(Vec3::from(zmin, pos.x, pos.y)); @@ -1316,7 +1316,7 @@ impl/**/ Terrain { /* let create_locals = renderer.create_terrain_bound_locals(); */ let create_texture = renderer./*create_texture_raw*/create_model_lazy_base(wgpu::BufferUsage::COPY_SRC/* | wgpu::BufferUsage::MAP_WRITE */); /* cnt.fetch_add(1, Ordering::Relaxed); */ - let job = move || { + let job = async move { // Since this loads when the task actually *runs*, rather than when it's // queued, it provides us with a good opportunity to check whether the chunk // should be canceled. We might miss updates, but that's okay, since canceling @@ -1395,7 +1395,7 @@ impl/**/ Terrain { // Chunk must have been removed, or it was spawned on an old tick. Drop // the mesh in the background since it's either out of date or no longer // needed. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(response); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); }); continue; } @@ -1500,7 +1500,7 @@ impl/**/ Terrain { }, ); // Drop image on background thread. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(tex); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(tex); }); // Update the memory mapped locals. let locals_buffer_ = @@ -1546,7 +1546,7 @@ impl/**/ Terrain { } else { // Not sure what happened here, but we should drop the result in the // background. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(response); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); }); } if response_started_tick == started_tick { @@ -1557,7 +1557,7 @@ impl/**/ Terrain { }, // Old task, drop the response in the background. None => { - slowjob.spawn(&"TERRAIN_DROP", move || { drop(response); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); }); }, } } @@ -1565,7 +1565,7 @@ impl/**/ Terrain { drop(locals_buffer); renderer.unmap_consts(&mut locals); // Drop buffer on background thread. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(locals); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(locals); }); /* // TODO: Delay submission, don't just submit immediately out of convenience! renderer.queue.submit(std::iter::once(encoder.finish())); */ self.command_buffers.push(encoder.finish()); @@ -1728,7 +1728,7 @@ impl/**/ Terrain { .drain_filter(|(pos, chunk)| chunks.contains_key(pos) || !can_shadow_sun(*pos, chunk)) .for_each(|(pos, chunk)| { // Drop the chunk on another thread. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(chunk); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(chunk); }); }); (visible_light_volume, visible_bounds) @@ -1737,7 +1737,7 @@ impl/**/ Terrain { // shadow chunks around. let chunks = core::mem::replace(&mut self.shadow_chunks, Vec::new()); // Drop the chunks on another thread. - slowjob.spawn(&"TERRAIN_DROP", move || { drop(chunks); }); + slowjob.spawn(&"TERRAIN_DROP", async move { drop(chunks); }); (Vec::new(), math::Aabr { min: math::Vec2::zero(), max: math::Vec2::zero(), diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index 44e0a789c3..daad5c90ee 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -22,7 +22,7 @@ const TPS: u64 = 30; pub struct Singleplayer { _server_thread: JoinHandle<()>, stop_server_s: Sender<()>, - pub receiver: Receiver>, + pub receiver: Receiver>, // Wether the server is stopped or not paused: Arc, // Settings that the server was started with @@ -105,14 +105,16 @@ impl Singleplayer { .spawn(move || { trace!("starting singleplayer server thread"); + let pools = common_state::State::pools(common::resources::GameMode::Singleplayer); let (server, init_result) = match Server::new( settings2, editable_settings, database_settings, &server_data_dir, runtime, + pools.clone(), ) { - Ok(server) => (Some(server), Ok(())), + Ok(server) => (Some(server), Ok(pools)), Err(err) => (None, Err(err)), }; diff --git a/voxygen/src/ui/keyed_jobs.rs b/voxygen/src/ui/keyed_jobs.rs index 0e7a1dee05..566c16070e 100644 --- a/voxygen/src/ui/keyed_jobs.rs +++ b/voxygen/src/ui/keyed_jobs.rs @@ -89,7 +89,7 @@ impl Key // approximating that let tx = self.tx.clone(); let f = f(); - let job = pool.spawn(&self.name, move || { + let job = pool.spawn(&self.name, async move { let v = f(&k); let _ = tx.send((k, v)); });