diff --git a/common/src/lib.rs b/common/src/lib.rs index 504ba44005..f5d86241e6 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -61,6 +61,7 @@ pub mod resources; #[cfg(not(target_arch = "wasm32"))] pub mod rtsim; #[cfg(not(target_arch = "wasm32"))] pub mod skillset_builder; +pub mod slowjob; #[cfg(not(target_arch = "wasm32"))] pub mod spiral; #[cfg(not(target_arch = "wasm32"))] @@ -76,7 +77,6 @@ pub mod uid; #[cfg(not(target_arch = "wasm32"))] pub mod vol; #[cfg(not(target_arch = "wasm32"))] pub mod volumes; -pub mod slowjob; pub use combat::DamageSource; #[cfg(not(target_arch = "wasm32"))] diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index ff41d8a0ec..ad1e3d6f95 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -1,37 +1,35 @@ -use std::sync::RwLock; -use std::collections::HashMap; -use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicU64}; -use core::any::Any; -use crossbeam_channel::{Receiver, Sender}; +use hashbrown::HashMap; +use rayon::ThreadPool; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, +}; -type LimitFn = dyn Fn(u64) -> u64; - -/// a slow job is a CPU heavy task, that is not I/O blocking. -/// It usually takes longer than a tick to compute, so it's outsourced -/// Internally the rayon threadpool is used to calculate t -pub struct SlowJobGroup { - name: String, - next_id: Arc, - queue: Arc>>, - local_running_jobs: Arc, - global_running_jobs: Arc, - receiver: Arc>, - sender: Arc>, -} - -/// a slow job is a CPU heavy task, that is not I/O blocking. -/// It usually takes longer than a tick to compute, so it's outsourced -/// Internally the rayon threadpool is used to calculate t +/// Provides a Wrapper around rayon threadpool to execute slow-jobs. +/// slow means, the job doesn't need to not complete within the same tick. +/// DO NOT USE I/O blocking jobs, but only CPU heavy jobs. +/// Jobs run here, will reduce the ammount of threads rayon can use during the +/// main tick. +/// +/// This Pool allows you to configure certain names of jobs and assign them a +/// maximum number of threads # Example +/// Your system has 16 cores, you assign 12 cores for slow-jobs. +/// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on +/// max 50% (6 = cores) ```rust +/// # use veloren_common::slowjob::SlowJobPool; +/// # use std::sync::Arc; +/// +/// let threadpool = rayon::ThreadPoolBuilder::new() +/// .num_threads(16) +/// .build() +/// .unwrap(); +/// let pool = SlowJobPool::new(3, Arc::new(threadpool)); +/// pool.configure("CHUNK_GENERATOR", |n| n / 2); +/// pool.spawn("CHUNK_GENERATOR", move || println("this is a job")); +/// ``` +#[derive(Clone)] pub struct SlowJobPool { - next_id: AtomicU64, - groups: RwLock>>>>, - queue: RwLock>>, - finished: RwLock>>>, - running_jobs: RwLock>>, - receiver: Receiver<(String, Box)>, - sender: Sender<(String, Box)>, - global_limit: Box, + internal: Arc, } pub struct SlowJob { @@ -39,85 +37,350 @@ pub struct SlowJob { id: u64, } -struct Queue { - task: Box ()>, - running_cnt: Arc, +struct InternalSlowJobPool { + next_id: Arc, + queue: RwLock>>, + running_jobs: RwLock>>, + configs: RwLock>, + global_running_jobs: Arc, + global_limit: u64, + threadpool: Arc, } -impl SlowJobGroup where - D: Any + Send + 'static -{ - /// spawn a new slow job - pub fn spawn(&self, name: &str, f: F) -> SlowJob where - F: FnOnce() -> D + 'static, - { - let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let local_running_jobs_clone = Arc::clone(&self.local_running_jobs); - let global_running_jobs_clone = Arc::clone(&self.global_running_jobs); - let sender = self.sender.clone(); - let name_clone = name.to_string(); - let queue = Queue { - task: Box::new(move || { - let result = f(); - let _ = sender.send((name_clone, result)); - local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); - global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); - }), - running_cnt: Arc::clone(&self.local_running_jobs), +struct Config { + max_local: u64, + spawned_total: Arc, +} + +struct Queue { + task: Box, + spawned_total: Arc, + local_running_jobs: Arc, +} + +impl InternalSlowJobPool { + pub fn new(global_limit: u64, threadpool: Arc) -> Self { + Self { + next_id: Arc::new(AtomicU64::new(0)), + queue: RwLock::new(HashMap::new()), + running_jobs: RwLock::new(HashMap::new()), + configs: RwLock::new(HashMap::new()), + global_running_jobs: Arc::new(AtomicU64::new(0)), + global_limit, + threadpool, + } + } + + fn maintain(&self) { + let jobs_available = self.global_limit - self.global_running_jobs.load(Ordering::Relaxed); + if jobs_available == 0 { + // we run at limit, can't spawn + return; + } + let possible = { + let lock = self.queue.read().unwrap(); + lock.iter() + .map(|(name, queues)| { + if !queues.is_empty() { + Some(name.clone()) + } else { + None + } + }) + .flatten() + .collect::>() }; - self.queue.write().unwrap().insert(id, queue); + + let mut possible_total = { + let mut possible = possible; + let lock = self.configs.read().unwrap(); + possible + .drain(..) + .map(|name| { + let c = lock.get(&name).unwrap(); + ( + name, + c.spawned_total.load(Ordering::Relaxed) / c.max_local, + c.max_local, + ) + }) + .collect::>() + }; + possible_total.sort_by_key(|(_, i, _)| *i); + + let mut lock = self.queue.write().unwrap(); + for i in 0..jobs_available as usize { + if let Some((name, _, max)) = possible_total.get(i) { + if let Some(map) = lock.get_mut(name) { + let firstkey = match map.keys().next() { + Some(k) => *k, + None => continue, + }; + + if let Some(queue) = map.remove(&firstkey) { + if queue.local_running_jobs.load(Ordering::Relaxed) < *max { + self.fire(queue); + } else { + map.insert(firstkey, queue); + } + } + } + } + } + } + + fn fire(&self, queue: Queue) { + queue.spawned_total.fetch_add(1, Ordering::Relaxed); + queue.local_running_jobs.fetch_add(1, Ordering::Relaxed); + self.global_running_jobs.fetch_add(1, Ordering::Relaxed); + self.threadpool.spawn(queue.task); + } +} + +impl SlowJobPool { + pub fn new(global_limit: u64, threadpool: Arc) -> Self { + Self { + internal: Arc::new(InternalSlowJobPool::new(global_limit, threadpool)), + } + } + + /// configure a NAME to spawn up to f(n) threads, depending on how many + /// threads we globally have available + pub fn configure(&self, name: &str, f: F) + where + F: Fn(u64) -> u64, + { + let cnf = Config { + max_local: f(self.internal.global_limit), + spawned_total: Arc::new(AtomicU64::new(0)), + }; + let mut lock = self.internal.configs.write().unwrap(); + lock.insert(name.to_string(), cnf); + } + + /// spawn a new slow job on a certain NAME + pub fn spawn(&self, name: &str, f: F) -> SlowJob + where + F: FnOnce() + Send + Sync + 'static, + { + let id = self.internal.next_id.fetch_add(1, Ordering::Relaxed); + self.internal + .queue + .write() + .unwrap() + .entry(name.to_string()) + .or_default() + .insert(id, self.queue(name, f)); + self.maintain(); SlowJob { name: name.to_string(), id, } } - pub fn cancel(&self, job: SlowJob) { - self.queue.write().unwrap().remove(&job.id); - } - - /// collect all slow jobs finished - pub fn collect(&self, name: &str) -> Vec { - let mut result = vec!(); - for (name, data) in self.receiver.try_iter() { - result.push(data); - } - result - } -} - - -impl SlowJobPool { - pub fn new() -> Self { - let (sender,receiver) = crossbeam_channel::unbounded(); - Self { - next_id: AtomicU64::new(0), - groups: RwLock::new(HashMap::new()), - queue: RwLock::new(HashMap::new()), - finished: RwLock::new(HashMap::new()), - running_jobs: RwLock::new(HashMap::new()), - receiver, - sender, - global_limit: Box::new(|n| n/2 + n/4), - } - } - - pub fn get(&self, name: &str) -> Arc> where D: Sized + Send + 'static { - let lock = self.groups.write().unwrap(); - if let Some(group) = lock.get(name) { - if group.type_id() == Arc> + fn queue(&self, name: &str, f: F) -> Queue + where + F: FnOnce() + Send + Sync + 'static, + { + let internal = Arc::clone(&self.internal); + let spawned_total = Arc::clone( + &self + .internal + .configs + .read() + .unwrap() + .get(name) + .expect("can't spawn a non-configued slowjob") + .spawned_total, + ); + let local_running_jobs_clone = { + let mut lock = self.internal.running_jobs.write().unwrap(); + Arc::clone(&lock.entry(name.to_string()).or_default()) }; - panic!("Unconfigured group name!"); + let local_running_jobs = Arc::clone(&local_running_jobs_clone); + let global_running_jobs_clone = Arc::clone(&self.internal.global_running_jobs); + let _name_clones = name.to_string(); + Queue { + task: Box::new(move || { + common_base::prof_span!(_guard, &_name_clones); + f(); + local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); + global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); + // directly maintain the next task afterwards + internal.maintain(); + }), + spawned_total, + local_running_jobs, + } } - fn maintain(&self) { - /* - let mut lock = self.queue.write().unwrap(); + pub fn cancel(&self, job: SlowJob) { + let mut lock = self.internal.queue.write().unwrap(); if let Some(map) = lock.get_mut(&job.name) { map.remove(&job.id); } - */ - - //let d = rayon::spawn(f); } -} \ No newline at end of file + + fn maintain(&self) { self.internal.maintain() } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ + sync::Mutex, + time::{Duration, Instant}, + }; + + fn mock_fn( + name: &str, + start_time: &Arc>>, + done: &Arc, + ) -> impl FnOnce() { + let name = name.to_string(); + let start_time = Arc::clone(start_time); + let done = Arc::clone(done); + move || { + println!("Start {}", name); + *start_time.lock().unwrap() = Some(Instant::now()); + std::thread::sleep(Duration::from_millis(500)); + done.fetch_add(1, Ordering::Relaxed); + println!("Finished {}", name); + } + } + + #[test] + fn global_limit() { + let threadpool = rayon::ThreadPoolBuilder::new() + .num_threads(4) + .build() + .unwrap(); + let pool = SlowJobPool::new(3, Arc::new(threadpool)); + pool.configure("FOO", |_| 1000); + let start = Instant::now(); + let f1 = Arc::new(Mutex::new(None)); + let f2 = Arc::new(Mutex::new(None)); + let f3 = Arc::new(Mutex::new(None)); + let f4 = Arc::new(Mutex::new(None)); + let f5 = Arc::new(Mutex::new(None)); + let f6 = Arc::new(Mutex::new(None)); + let f7 = Arc::new(Mutex::new(None)); + let done = Arc::new(AtomicU64::new(0)); + pool.spawn("FOO", mock_fn("foo1", &f1, &done)); + pool.spawn("FOO", mock_fn("foo2", &f2, &done)); + pool.spawn("FOO", mock_fn("foo3", &f3, &done)); + std::thread::sleep(Duration::from_millis(300)); + pool.spawn("FOO", mock_fn("foo4", &f4, &done)); + pool.spawn("FOO", mock_fn("foo5", &f5, &done)); + pool.spawn("FOO", mock_fn("foo6", &f6, &done)); + std::thread::sleep(Duration::from_millis(300)); + pool.spawn("FOO", mock_fn("foo7", &f7, &done)); + std::thread::sleep(Duration::from_secs(1)); + let measure = |a: Arc>>, s: Instant| { + a.lock().unwrap().unwrap().duration_since(s).as_millis() + }; + let f1 = measure(f1, start); + let f2 = measure(f2, start); + let f3 = measure(f3, start); + let f4 = measure(f4, start); + let f5 = measure(f5, start); + let f6 = measure(f6, start); + let f7 = measure(f7, start); + assert_eq!(done.load(Ordering::Relaxed), 7); + assert!(f1 < 500); + assert!(f2 < 500); + assert!(f3 < 500); + assert!(f4 < 1000); + assert!(f5 < 1000); + assert!(f6 < 1000); + assert!(f7 < 1500); + } + + #[test] + fn local_limit() { + let threadpool = rayon::ThreadPoolBuilder::new() + .num_threads(4) + .build() + .unwrap(); + let pool = SlowJobPool::new(100, Arc::new(threadpool)); + pool.configure("FOO", |_| 3); + let start = Instant::now(); + let f1 = Arc::new(Mutex::new(None)); + let f2 = Arc::new(Mutex::new(None)); + let f3 = Arc::new(Mutex::new(None)); + let f4 = Arc::new(Mutex::new(None)); + let f5 = Arc::new(Mutex::new(None)); + let f6 = Arc::new(Mutex::new(None)); + let f7 = Arc::new(Mutex::new(None)); + let done = Arc::new(AtomicU64::new(0)); + pool.spawn("FOO", mock_fn("foo1", &f1, &done)); + pool.spawn("FOO", mock_fn("foo2", &f2, &done)); + pool.spawn("FOO", mock_fn("foo3", &f3, &done)); + std::thread::sleep(Duration::from_millis(300)); + pool.spawn("FOO", mock_fn("foo4", &f4, &done)); + pool.spawn("FOO", mock_fn("foo5", &f5, &done)); + pool.spawn("FOO", mock_fn("foo6", &f6, &done)); + std::thread::sleep(Duration::from_millis(300)); + pool.spawn("FOO", mock_fn("foo7", &f7, &done)); + std::thread::sleep(Duration::from_secs(1)); + let measure = |a: Arc>>, s: Instant| { + a.lock().unwrap().unwrap().duration_since(s).as_millis() + }; + let f1 = measure(f1, start); + let f2 = measure(f2, start); + let f3 = measure(f3, start); + let f4 = measure(f4, start); + let f5 = measure(f5, start); + let f6 = measure(f6, start); + let f7 = measure(f7, start); + assert_eq!(done.load(Ordering::Relaxed), 7); + assert!(f1 < 500); + assert!(f2 < 500); + assert!(f3 < 500); + assert!(f4 < 1000); + assert!(f5 < 1000); + assert!(f6 < 1000); + assert!(f7 < 1500); + } + + #[test] + fn pool() { + let threadpool = rayon::ThreadPoolBuilder::new() + .num_threads(2) + .build() + .unwrap(); + let pool = SlowJobPool::new(2, Arc::new(threadpool)); + pool.configure("FOO", |n| n); + pool.configure("BAR", |n| n / 2); + let start = Instant::now(); + let f1 = Arc::new(Mutex::new(None)); + let f2 = Arc::new(Mutex::new(None)); + let b1 = Arc::new(Mutex::new(None)); + let b2 = Arc::new(Mutex::new(None)); + let done = Arc::new(AtomicU64::new(0)); + pool.spawn("FOO", mock_fn("foo1", &f1, &done)); + pool.spawn("FOO", mock_fn("foo2", &f2, &done)); + std::thread::sleep(Duration::from_millis(1000)); + pool.spawn("BAR", mock_fn("bar1", &b1, &done)); + pool.spawn("BAR", mock_fn("bar2", &b2, &done)); + std::thread::sleep(Duration::from_secs(2)); + let measure = |a: Arc>>, s: Instant| { + a.lock().unwrap().unwrap().duration_since(s).as_millis() + }; + let f1 = measure(f1, start); + let f2 = measure(f2, start); + let b1 = measure(b1, start); + let b2 = measure(b2, start); + // Expect: + // [F1, F2] + // [B1] + // [B2] + assert_eq!(done.load(Ordering::Relaxed), 4); + assert!(f1 < 500); + assert!(f2 < 500); + println!("b1 {}", b1); + println!("b2 {}", b2); + assert!((1000..1500).contains(&b1)); + assert!((1500..2000).contains(&b2)); + } +} diff --git a/common/sys/src/state.rs b/common/sys/src/state.rs index 13ff7b33ed..09cc242e14 100644 --- a/common/sys/src/state.rs +++ b/common/sys/src/state.rs @@ -7,6 +7,7 @@ use common::{ event::{EventBus, LocalEvent, ServerEvent}, region::RegionMap, resources::{DeltaTime, GameMode, PlayerEntity, Time, TimeOfDay}, + slowjob::SlowJobPool, terrain::{Block, TerrainChunk, TerrainGrid}, time::DayPeriod, trade::Trades, @@ -110,7 +111,7 @@ impl State { .unwrap(), ); Self { - ecs: Self::setup_ecs_world(game_mode), + ecs: Self::setup_ecs_world(game_mode, &thread_pool), thread_pool, } } @@ -118,7 +119,7 @@ impl State { /// Creates ecs world and registers all the common components and resources // TODO: Split up registering into server and client (e.g. move // EventBus to the server) - fn setup_ecs_world(game_mode: GameMode) -> specs::World { + fn setup_ecs_world(game_mode: GameMode, thread_pool: &Arc) -> specs::World { let mut ecs = specs::World::new(); // Uids for sync ecs.register_sync_marker(); @@ -206,6 +207,12 @@ impl State { ecs.insert(EventBus::::default()); ecs.insert(game_mode); ecs.insert(Vec::::new()); + + let slow_limit = thread_pool.current_num_threads().max(2) as u64; + let slow_limit = slow_limit / 2 + slow_limit / 4; + tracing::trace!(?slow_limit, "Slow Thread limit"); + ecs.insert(SlowJobPool::new(slow_limit, Arc::clone(&thread_pool))); + // TODO: only register on the server ecs.insert(EventBus::::default()); ecs.insert(comp::group::GroupManager::default()); @@ -317,6 +324,9 @@ impl State { /// Get a reference to this state's terrain. pub fn terrain(&self) -> Fetch { self.ecs.read_resource() } + /// Get a reference to this state's terrain. + pub fn slow_job_pool(&self) -> Fetch { self.ecs.read_resource() } + /// Get a writable reference to this state's terrain. pub fn terrain_mut(&self) -> FetchMut { self.ecs.write_resource() } diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index 550709a4d7..4fcc89b1ba 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -16,6 +16,7 @@ use crate::{cmd::Message, shutdown_coordinator::ShutdownCoordinator, tui_runner: use clap::{App, Arg, SubCommand}; use common::clock::Clock; use common_base::span; +use core::sync::atomic::{AtomicUsize, Ordering}; use server::{Event, Input, Server}; use std::{ io, @@ -95,6 +96,11 @@ fn main() -> io::Result<()> { let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() .enable_all() + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("tokio-server-{}", id) + }) .build() .unwrap(), ); diff --git a/server/src/chunk_generator.rs b/server/src/chunk_generator.rs index de28f441c3..06f27de086 100644 --- a/server/src/chunk_generator.rs +++ b/server/src/chunk_generator.rs @@ -1,14 +1,13 @@ use crate::metrics::ChunkGenMetrics; #[cfg(not(feature = "worldgen"))] use crate::test_world::{IndexOwned, World}; -use common::{generation::ChunkSupplement, terrain::TerrainChunk}; +use common::{generation::ChunkSupplement, slowjob::SlowJobPool, terrain::TerrainChunk}; use hashbrown::{hash_map::Entry, HashMap}; use specs::Entity as EcsEntity; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use tokio::runtime::Runtime; use vek::*; #[cfg(feature = "worldgen")] use world::{IndexOwned, World}; @@ -40,7 +39,7 @@ impl ChunkGenerator { &mut self, entity: Option, key: Vec2, - runtime: &Runtime, + slowjob_pool: &SlowJobPool, world: Arc, index: IndexOwned, ) { @@ -53,8 +52,7 @@ impl ChunkGenerator { v.insert(Arc::clone(&cancel)); let chunk_tx = self.chunk_tx.clone(); self.metrics.chunks_requested.inc(); - runtime.spawn_blocking(move || { - common_base::prof_span!(_guard, "generate_chunk"); + slowjob_pool.spawn("CHUNK_GENERATOR", move || { let index = index.as_index_ref(); let payload = world .generate_chunk(index, key, || cancel.load(Ordering::Relaxed)) diff --git a/server/src/lib.rs b/server/src/lib.rs index 05179fb637..b2741aefd4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -64,6 +64,7 @@ use common::{ recipe::default_recipe_book, resources::TimeOfDay, rtsim::RtSimEntity, + slowjob::SlowJobPool, terrain::TerrainChunkSize, uid::UidAllocator, vol::{ReadVol, RectVolSize}, @@ -197,6 +198,10 @@ impl Server { state.ecs_mut().insert(ecs_system_metrics); state.ecs_mut().insert(tick_metrics); state.ecs_mut().insert(physics_metrics); + state + .ecs_mut() + .write_resource::() + .configure("CHUNK_GENERATOR", |n| n / 2 + n / 4); state .ecs_mut() .insert(ChunkGenerator::new(chunk_gen_metrics)); @@ -646,9 +651,9 @@ impl Server { // only work we do here on the fast path is perform a relaxed read on an atomic. // boolean. let index = &mut self.index; - let runtime = &mut self.runtime; let world = &mut self.world; let ecs = self.state.ecs_mut(); + let slow_jobs = ecs.write_resource::(); index.reload_colors_if_changed(|index| { let mut chunk_generator = ecs.write_resource::(); @@ -667,7 +672,7 @@ impl Server { chunk_generator.generate_chunk( None, pos, - runtime, + &slow_jobs, Arc::clone(&world), index.clone(), ); @@ -811,16 +816,15 @@ impl Server { pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); } pub fn generate_chunk(&mut self, entity: EcsEntity, key: Vec2) { - self.state - .ecs() - .write_resource::() - .generate_chunk( - Some(entity), - key, - &self.runtime, - Arc::clone(&self.world), - self.index.clone(), - ); + let ecs = self.state.ecs(); + let slow_jobs = ecs.write_resource::(); + ecs.write_resource::().generate_chunk( + Some(entity), + key, + &slow_jobs, + Arc::clone(&self.world), + self.index.clone(), + ); } fn process_chat_cmd(&mut self, entity: EcsEntity, cmd: String) { @@ -1005,7 +1009,7 @@ impl Server { // rng.gen_range(-e/2..e/2 + 1)); let pos = comp::Pos(Vec3::from(world_dims_blocks.map(|e| e as f32 / 2.0))); self.state - .create_persister(pos, view_distance, &self.world, &self.index, &self.runtime) + .create_persister(pos, view_distance, &self.world, &self.index) .build(); } } diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index e0069e0dda..d49c9989f3 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -11,6 +11,7 @@ use common::{ Inventory, }, effect::Effect, + slowjob::SlowJobPool, uid::{Uid, UidAllocator}, }; use common_net::{ @@ -80,7 +81,6 @@ pub trait StateExt { view_distance: u32, world: &std::sync::Arc, index: &world::IndexOwned, - runtime: &tokio::runtime::Runtime, ) -> EcsEntityBuilder; /// Insert common/default components for a new character joining the server fn initialize_character_data(&mut self, entity: EcsEntity, character_id: CharacterId); @@ -327,15 +327,15 @@ impl StateExt for State { view_distance: u32, world: &std::sync::Arc, index: &world::IndexOwned, - runtime: &tokio::runtime::Runtime, ) -> EcsEntityBuilder { use common::{terrain::TerrainChunkSize, vol::RectVolSize}; use std::sync::Arc; // Request chunks { - let mut chunk_generator = self - .ecs() - .write_resource::(); + let ecs = self.ecs(); + let slow_jobs = ecs.write_resource::(); + let mut chunk_generator = + ecs.write_resource::(); let chunk_pos = self.terrain().pos_key(pos.0.map(|e| e as i32)); (-(view_distance as i32)..view_distance as i32 + 1) .flat_map(|x| { @@ -351,7 +351,7 @@ impl StateExt for State { * TerrainChunkSize::RECT_SIZE.x as f64 }) .for_each(|chunk_key| { - chunk_generator.generate_chunk(None, chunk_key, runtime, Arc::clone(world), index.clone()); + chunk_generator.generate_chunk(None, chunk_key, &slow_jobs, Arc::clone(world), index.clone()); }); } diff --git a/voxygen/src/ecs/mod.rs b/voxygen/src/ecs/mod.rs index 8323c598bc..0f432ebf9e 100644 --- a/voxygen/src/ecs/mod.rs +++ b/voxygen/src/ecs/mod.rs @@ -2,7 +2,7 @@ pub mod comp; pub mod sys; use crate::audio::sfx::SfxEventItem; -use common::event::EventBus; +use common::{event::EventBus, slowjob::SlowJobPool}; use specs::{Entity, World, WorldExt}; #[derive(Copy, Clone, Debug)] @@ -12,6 +12,12 @@ pub fn init(world: &mut World) { world.register::(); world.register::(); + { + let pool = world.read_resource::(); + pool.configure("FIGURE_MESHING", |n| n / 2); + pool.configure("TERRAIN_MESHING", |n| n / 2); + } + // Voxygen event buses world.insert(EventBus::::default()); } diff --git a/voxygen/src/menu/char_selection/mod.rs b/voxygen/src/menu/char_selection/mod.rs index 6f2055b2ae..b51f4bfdd1 100644 --- a/voxygen/src/menu/char_selection/mod.rs +++ b/voxygen/src/menu/char_selection/mod.rs @@ -148,7 +148,7 @@ impl PlayState for CharSelectionState { time: client.state().get_time(), delta_time: client.state().ecs().read_resource::().0, tick: client.get_tick(), - runtime: client.runtime(), + slow_job_pool: &client.state().slow_job_pool(), body: humanoid_body, gamma: global_state.settings.graphics.gamma, exposure: global_state.settings.graphics.exposure, diff --git a/voxygen/src/scene/figure/cache.rs b/voxygen/src/scene/figure/cache.rs index 487a6d2dbe..8ac24de101 100644 --- a/voxygen/src/scene/figure/cache.rs +++ b/voxygen/src/scene/figure/cache.rs @@ -21,13 +21,13 @@ use common::{ CharacterState, }, figure::Segment, + slowjob::SlowJobPool, vol::BaseVol, }; use core::{hash::Hash, ops::Range}; use crossbeam::atomic; use hashbrown::{hash_map::Entry, HashMap}; use std::sync::Arc; -use tokio::runtime::Runtime; use vek::*; /// A type produced by mesh worker threads corresponding to the information @@ -338,7 +338,7 @@ where tick: u64, camera_mode: CameraMode, character_state: Option<&CharacterState>, - runtime: &Runtime, + slow_jobs: &SlowJobPool, ) -> (FigureModelEntryLod<'c>, &'c Skel::Attr) where for<'a> &'a Skel::Body: Into, @@ -404,7 +404,7 @@ where let manifests = self.manifests; let slot_ = Arc::clone(&slot); - runtime.spawn_blocking(move || { + slow_jobs.spawn("FIGURE_MESHING", move || { // First, load all the base vertex data. let manifests = &*manifests.read(); let meshes = ::bone_meshes(&key, manifests); diff --git a/voxygen/src/scene/figure/mod.rs b/voxygen/src/scene/figure/mod.rs index c931efcd7f..0771ec4a1e 100644 --- a/voxygen/src/scene/figure/mod.rs +++ b/voxygen/src/scene/figure/mod.rs @@ -561,6 +561,7 @@ impl FigureMgr { }; let camera_mode = camera.get_mode(); let character_state_storage = state.read_storage::(); + let slow_jobs = state.slow_job_pool(); let character_state = character_state_storage.get(scene_data.player_entity); let focus_pos = anim::vek::Vec3::::from(camera.get_focus_pos()); @@ -756,7 +757,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -1552,7 +1553,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -1754,7 +1755,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -2081,7 +2082,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -2440,7 +2441,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -2550,7 +2551,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -2639,7 +2640,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -2985,7 +2986,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = @@ -3079,7 +3080,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -3266,7 +3267,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -3357,7 +3358,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -3446,7 +3447,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self @@ -3882,7 +3883,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = @@ -4066,7 +4067,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = @@ -4194,7 +4195,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.runtime, + &slow_jobs, ); let state = self diff --git a/voxygen/src/scene/mod.rs b/voxygen/src/scene/mod.rs index cc3ffb706e..ca08799cfd 100644 --- a/voxygen/src/scene/mod.rs +++ b/voxygen/src/scene/mod.rs @@ -36,7 +36,6 @@ use common_sys::state::State; use comp::item::Reagent; use num::traits::{Float, FloatConst}; use specs::{Entity as EcsEntity, Join, WorldExt}; -use tokio::runtime::Runtime; use vek::*; // TODO: Don't hard-code this. @@ -115,7 +114,6 @@ pub struct SceneData<'a> { pub loaded_distance: f32, pub view_distance: u32, pub tick: u64, - pub runtime: &'a Runtime, pub gamma: f32, pub exposure: f32, pub ambiance: f32, diff --git a/voxygen/src/scene/simple.rs b/voxygen/src/scene/simple.rs index bc735af9e1..4901a5972c 100644 --- a/voxygen/src/scene/simple.rs +++ b/voxygen/src/scene/simple.rs @@ -26,10 +26,10 @@ use common::{ item::ItemKind, }, figure::Segment, + slowjob::SlowJobPool, terrain::BlockKind, vol::{BaseVol, ReadVol}, }; -use tokio::runtime::Runtime; use tracing::error; use vek::*; use winit::event::MouseButton; @@ -97,7 +97,7 @@ pub struct SceneData<'a> { pub time: f64, pub delta_time: f32, pub tick: u64, - pub runtime: &'a Runtime, + pub slow_job_pool: &'a SlowJobPool, pub body: Option, pub gamma: f32, pub exposure: f32, @@ -357,7 +357,7 @@ impl Scene { scene_data.tick, CameraMode::default(), None, - scene_data.runtime, + scene_data.slow_job_pool, ) .0; let mut buf = [Default::default(); anim::MAX_BONE_COUNT]; diff --git a/voxygen/src/scene/terrain.rs b/voxygen/src/scene/terrain.rs index 6faf7aa42e..5052d1e9bd 100644 --- a/voxygen/src/scene/terrain.rs +++ b/voxygen/src/scene/terrain.rs @@ -768,21 +768,24 @@ impl Terrain { let sprite_config = Arc::clone(&self.sprite_config); let cnt = Arc::clone(&self.mesh_todos_active); cnt.fetch_add(1, Ordering::Relaxed); - scene_data.runtime.spawn_blocking(move || { - let sprite_data = sprite_data; - let _ = send.send(mesh_worker( - pos, - (min_z as f32, max_z as f32), - started_tick, - volume, - max_texture_size, - chunk, - aabb, - &sprite_data, - &sprite_config, - )); - cnt.fetch_sub(1, Ordering::Relaxed); - }); + scene_data + .state + .slow_job_pool() + .spawn("TERRAIN_MESHING", move || { + let sprite_data = sprite_data; + let _ = send.send(mesh_worker( + pos, + (min_z as f32, max_z as f32), + started_tick, + volume, + max_texture_size, + chunk, + aabb, + &sprite_data, + &sprite_config, + )); + cnt.fetch_sub(1, Ordering::Relaxed); + }); todo.is_worker_active = true; } drop(guard); diff --git a/voxygen/src/session.rs b/voxygen/src/session.rs index 791db45343..b5d1425b42 100644 --- a/voxygen/src/session.rs +++ b/voxygen/src/session.rs @@ -1442,7 +1442,6 @@ impl PlayState for SessionState { loaded_distance: client.loaded_distance(), view_distance: client.view_distance().unwrap_or(1), tick: client.get_tick(), - runtime: &client.runtime(), gamma: global_state.settings.graphics.gamma, exposure: global_state.settings.graphics.exposure, ambiance: global_state.settings.graphics.ambiance, @@ -1510,7 +1509,6 @@ impl PlayState for SessionState { loaded_distance: client.loaded_distance(), view_distance: client.view_distance().unwrap_or(1), tick: client.get_tick(), - runtime: &client.runtime(), gamma: settings.graphics.gamma, exposure: settings.graphics.exposure, ambiance: settings.graphics.ambiance,