create a wrapper around rayon, slowjobpool, that spawns on rayon but drop feed it

This commit is contained in:
Marcel Märtens 2021-03-15 23:50:26 +01:00
parent 0ec863b236
commit a286eb084a
15 changed files with 454 additions and 167 deletions

View File

@ -61,6 +61,7 @@ pub mod resources;
#[cfg(not(target_arch = "wasm32"))] pub mod rtsim; #[cfg(not(target_arch = "wasm32"))] pub mod rtsim;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub mod skillset_builder; pub mod skillset_builder;
pub mod slowjob;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub mod spiral; pub mod spiral;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -76,7 +77,6 @@ pub mod uid;
#[cfg(not(target_arch = "wasm32"))] pub mod vol; #[cfg(not(target_arch = "wasm32"))] pub mod vol;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub mod volumes; pub mod volumes;
pub mod slowjob;
pub use combat::DamageSource; pub use combat::DamageSource;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]

View File

@ -1,37 +1,35 @@
use std::sync::RwLock; use hashbrown::HashMap;
use std::collections::HashMap; use rayon::ThreadPool;
use std::sync::Arc; use std::sync::{
use std::sync::atomic::{Ordering, AtomicU64}; atomic::{AtomicU64, Ordering},
use core::any::Any; Arc, RwLock,
use crossbeam_channel::{Receiver, Sender}; };
type LimitFn = dyn Fn(u64) -> u64; /// Provides a Wrapper around rayon threadpool to execute slow-jobs.
/// slow means, the job doesn't need to not complete within the same tick.
/// a slow job is a CPU heavy task, that is not I/O blocking. /// DO NOT USE I/O blocking jobs, but only CPU heavy jobs.
/// It usually takes longer than a tick to compute, so it's outsourced /// Jobs run here, will reduce the ammount of threads rayon can use during the
/// Internally the rayon threadpool is used to calculate t /// main tick.
pub struct SlowJobGroup<D> { ///
name: String, /// This Pool allows you to configure certain names of jobs and assign them a
next_id: Arc<AtomicU64>, /// maximum number of threads # Example
queue: Arc<RwLock<HashMap<u64, Queue>>>, /// Your system has 16 cores, you assign 12 cores for slow-jobs.
local_running_jobs: Arc<AtomicU64>, /// Then you can configure all jobs with the name `CHUNK_GENERATOR` to spawn on
global_running_jobs: Arc<AtomicU64>, /// max 50% (6 = cores) ```rust
receiver: Arc<Receiver<(String, D)>>, /// # use veloren_common::slowjob::SlowJobPool;
sender: Arc<Sender<(String, D)>>, /// # use std::sync::Arc;
} ///
/// let threadpool = rayon::ThreadPoolBuilder::new()
/// a slow job is a CPU heavy task, that is not I/O blocking. /// .num_threads(16)
/// It usually takes longer than a tick to compute, so it's outsourced /// .build()
/// Internally the rayon threadpool is used to calculate t /// .unwrap();
/// let pool = SlowJobPool::new(3, Arc::new(threadpool));
/// pool.configure("CHUNK_GENERATOR", |n| n / 2);
/// pool.spawn("CHUNK_GENERATOR", move || println("this is a job"));
/// ```
#[derive(Clone)]
pub struct SlowJobPool { pub struct SlowJobPool {
next_id: AtomicU64, internal: Arc<InternalSlowJobPool>,
groups: RwLock<HashMap<String, Arc<SlowJobGroup<Box<dyn Any>>>>>,
queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
finished: RwLock<HashMap<String, Vec<Box<dyn Any>>>>,
running_jobs: RwLock<HashMap<String, Arc<AtomicU64>>>,
receiver: Receiver<(String, Box<dyn Any>)>,
sender: Sender<(String, Box<dyn Any>)>,
global_limit: Box<LimitFn>,
} }
pub struct SlowJob { pub struct SlowJob {
@ -39,85 +37,350 @@ pub struct SlowJob {
id: u64, id: u64,
} }
struct Queue { struct InternalSlowJobPool {
task: Box<dyn FnOnce() -> ()>, next_id: Arc<AtomicU64>,
running_cnt: Arc<AtomicU64>, queue: RwLock<HashMap<String, HashMap<u64, Queue>>>,
running_jobs: RwLock<HashMap<String, Arc<AtomicU64>>>,
configs: RwLock<HashMap<String, Config>>,
global_running_jobs: Arc<AtomicU64>,
global_limit: u64,
threadpool: Arc<ThreadPool>,
} }
impl<D> SlowJobGroup<D> where struct Config {
D: Any + Send + 'static max_local: u64,
{ spawned_total: Arc<AtomicU64>,
/// spawn a new slow job }
pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob where
F: FnOnce() -> D + 'static, struct Queue {
{ task: Box<dyn FnOnce() + Send + Sync + 'static>,
let id = self.next_id.fetch_add(1, Ordering::Relaxed); spawned_total: Arc<AtomicU64>,
let local_running_jobs_clone = Arc::clone(&self.local_running_jobs); local_running_jobs: Arc<AtomicU64>,
let global_running_jobs_clone = Arc::clone(&self.global_running_jobs); }
let sender = self.sender.clone();
let name_clone = name.to_string(); impl InternalSlowJobPool {
let queue = Queue { pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
task: Box::new(move || { Self {
let result = f(); next_id: Arc::new(AtomicU64::new(0)),
let _ = sender.send((name_clone, result)); queue: RwLock::new(HashMap::new()),
local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); running_jobs: RwLock::new(HashMap::new()),
global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed); configs: RwLock::new(HashMap::new()),
}), global_running_jobs: Arc::new(AtomicU64::new(0)),
running_cnt: Arc::clone(&self.local_running_jobs), global_limit,
threadpool,
}
}
fn maintain(&self) {
let jobs_available = self.global_limit - self.global_running_jobs.load(Ordering::Relaxed);
if jobs_available == 0 {
// we run at limit, can't spawn
return;
}
let possible = {
let lock = self.queue.read().unwrap();
lock.iter()
.map(|(name, queues)| {
if !queues.is_empty() {
Some(name.clone())
} else {
None
}
})
.flatten()
.collect::<Vec<_>>()
}; };
self.queue.write().unwrap().insert(id, queue);
let mut possible_total = {
let mut possible = possible;
let lock = self.configs.read().unwrap();
possible
.drain(..)
.map(|name| {
let c = lock.get(&name).unwrap();
(
name,
c.spawned_total.load(Ordering::Relaxed) / c.max_local,
c.max_local,
)
})
.collect::<Vec<_>>()
};
possible_total.sort_by_key(|(_, i, _)| *i);
let mut lock = self.queue.write().unwrap();
for i in 0..jobs_available as usize {
if let Some((name, _, max)) = possible_total.get(i) {
if let Some(map) = lock.get_mut(name) {
let firstkey = match map.keys().next() {
Some(k) => *k,
None => continue,
};
if let Some(queue) = map.remove(&firstkey) {
if queue.local_running_jobs.load(Ordering::Relaxed) < *max {
self.fire(queue);
} else {
map.insert(firstkey, queue);
}
}
}
}
}
}
fn fire(&self, queue: Queue) {
queue.spawned_total.fetch_add(1, Ordering::Relaxed);
queue.local_running_jobs.fetch_add(1, Ordering::Relaxed);
self.global_running_jobs.fetch_add(1, Ordering::Relaxed);
self.threadpool.spawn(queue.task);
}
}
impl SlowJobPool {
pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self {
Self {
internal: Arc::new(InternalSlowJobPool::new(global_limit, threadpool)),
}
}
/// configure a NAME to spawn up to f(n) threads, depending on how many
/// threads we globally have available
pub fn configure<F>(&self, name: &str, f: F)
where
F: Fn(u64) -> u64,
{
let cnf = Config {
max_local: f(self.internal.global_limit),
spawned_total: Arc::new(AtomicU64::new(0)),
};
let mut lock = self.internal.configs.write().unwrap();
lock.insert(name.to_string(), cnf);
}
/// spawn a new slow job on a certain NAME
pub fn spawn<F>(&self, name: &str, f: F) -> SlowJob
where
F: FnOnce() + Send + Sync + 'static,
{
let id = self.internal.next_id.fetch_add(1, Ordering::Relaxed);
self.internal
.queue
.write()
.unwrap()
.entry(name.to_string())
.or_default()
.insert(id, self.queue(name, f));
self.maintain();
SlowJob { SlowJob {
name: name.to_string(), name: name.to_string(),
id, id,
} }
} }
pub fn cancel(&self, job: SlowJob) { fn queue<F>(&self, name: &str, f: F) -> Queue
self.queue.write().unwrap().remove(&job.id); where
} F: FnOnce() + Send + Sync + 'static,
{
/// collect all slow jobs finished let internal = Arc::clone(&self.internal);
pub fn collect(&self, name: &str) -> Vec<D> { let spawned_total = Arc::clone(
let mut result = vec!(); &self
for (name, data) in self.receiver.try_iter() { .internal
result.push(data); .configs
} .read()
result .unwrap()
} .get(name)
} .expect("can't spawn a non-configued slowjob")
.spawned_total,
);
impl SlowJobPool { let local_running_jobs_clone = {
pub fn new() -> Self { let mut lock = self.internal.running_jobs.write().unwrap();
let (sender,receiver) = crossbeam_channel::unbounded(); Arc::clone(&lock.entry(name.to_string()).or_default())
Self {
next_id: AtomicU64::new(0),
groups: RwLock::new(HashMap::new()),
queue: RwLock::new(HashMap::new()),
finished: RwLock::new(HashMap::new()),
running_jobs: RwLock::new(HashMap::new()),
receiver,
sender,
global_limit: Box::new(|n| n/2 + n/4),
}
}
pub fn get<D>(&self, name: &str) -> Arc<SlowJobGroup<D>> where D: Sized + Send + 'static {
let lock = self.groups.write().unwrap();
if let Some(group) = lock.get(name) {
if group.type_id() == Arc<SlowJobGroup<Box>>
}; };
panic!("Unconfigured group name!"); let local_running_jobs = Arc::clone(&local_running_jobs_clone);
let global_running_jobs_clone = Arc::clone(&self.internal.global_running_jobs);
let _name_clones = name.to_string();
Queue {
task: Box::new(move || {
common_base::prof_span!(_guard, &_name_clones);
f();
local_running_jobs_clone.fetch_sub(1, Ordering::Relaxed);
global_running_jobs_clone.fetch_sub(1, Ordering::Relaxed);
// directly maintain the next task afterwards
internal.maintain();
}),
spawned_total,
local_running_jobs,
}
} }
fn maintain(&self) { pub fn cancel(&self, job: SlowJob) {
/* let mut lock = self.internal.queue.write().unwrap();
let mut lock = self.queue.write().unwrap();
if let Some(map) = lock.get_mut(&job.name) { if let Some(map) = lock.get_mut(&job.name) {
map.remove(&job.id); map.remove(&job.id);
} }
*/
//let d = rayon::spawn(f);
} }
}
fn maintain(&self) { self.internal.maintain() }
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
sync::Mutex,
time::{Duration, Instant},
};
fn mock_fn(
name: &str,
start_time: &Arc<Mutex<Option<Instant>>>,
done: &Arc<AtomicU64>,
) -> impl FnOnce() {
let name = name.to_string();
let start_time = Arc::clone(start_time);
let done = Arc::clone(done);
move || {
println!("Start {}", name);
*start_time.lock().unwrap() = Some(Instant::now());
std::thread::sleep(Duration::from_millis(500));
done.fetch_add(1, Ordering::Relaxed);
println!("Finished {}", name);
}
}
#[test]
fn global_limit() {
let threadpool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
let pool = SlowJobPool::new(3, Arc::new(threadpool));
pool.configure("FOO", |_| 1000);
let start = Instant::now();
let f1 = Arc::new(Mutex::new(None));
let f2 = Arc::new(Mutex::new(None));
let f3 = Arc::new(Mutex::new(None));
let f4 = Arc::new(Mutex::new(None));
let f5 = Arc::new(Mutex::new(None));
let f6 = Arc::new(Mutex::new(None));
let f7 = Arc::new(Mutex::new(None));
let done = Arc::new(AtomicU64::new(0));
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
std::thread::sleep(Duration::from_millis(300));
pool.spawn("FOO", mock_fn("foo4", &f4, &done));
pool.spawn("FOO", mock_fn("foo5", &f5, &done));
pool.spawn("FOO", mock_fn("foo6", &f6, &done));
std::thread::sleep(Duration::from_millis(300));
pool.spawn("FOO", mock_fn("foo7", &f7, &done));
std::thread::sleep(Duration::from_secs(1));
let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
a.lock().unwrap().unwrap().duration_since(s).as_millis()
};
let f1 = measure(f1, start);
let f2 = measure(f2, start);
let f3 = measure(f3, start);
let f4 = measure(f4, start);
let f5 = measure(f5, start);
let f6 = measure(f6, start);
let f7 = measure(f7, start);
assert_eq!(done.load(Ordering::Relaxed), 7);
assert!(f1 < 500);
assert!(f2 < 500);
assert!(f3 < 500);
assert!(f4 < 1000);
assert!(f5 < 1000);
assert!(f6 < 1000);
assert!(f7 < 1500);
}
#[test]
fn local_limit() {
let threadpool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
let pool = SlowJobPool::new(100, Arc::new(threadpool));
pool.configure("FOO", |_| 3);
let start = Instant::now();
let f1 = Arc::new(Mutex::new(None));
let f2 = Arc::new(Mutex::new(None));
let f3 = Arc::new(Mutex::new(None));
let f4 = Arc::new(Mutex::new(None));
let f5 = Arc::new(Mutex::new(None));
let f6 = Arc::new(Mutex::new(None));
let f7 = Arc::new(Mutex::new(None));
let done = Arc::new(AtomicU64::new(0));
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
pool.spawn("FOO", mock_fn("foo3", &f3, &done));
std::thread::sleep(Duration::from_millis(300));
pool.spawn("FOO", mock_fn("foo4", &f4, &done));
pool.spawn("FOO", mock_fn("foo5", &f5, &done));
pool.spawn("FOO", mock_fn("foo6", &f6, &done));
std::thread::sleep(Duration::from_millis(300));
pool.spawn("FOO", mock_fn("foo7", &f7, &done));
std::thread::sleep(Duration::from_secs(1));
let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
a.lock().unwrap().unwrap().duration_since(s).as_millis()
};
let f1 = measure(f1, start);
let f2 = measure(f2, start);
let f3 = measure(f3, start);
let f4 = measure(f4, start);
let f5 = measure(f5, start);
let f6 = measure(f6, start);
let f7 = measure(f7, start);
assert_eq!(done.load(Ordering::Relaxed), 7);
assert!(f1 < 500);
assert!(f2 < 500);
assert!(f3 < 500);
assert!(f4 < 1000);
assert!(f5 < 1000);
assert!(f6 < 1000);
assert!(f7 < 1500);
}
#[test]
fn pool() {
let threadpool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
let pool = SlowJobPool::new(2, Arc::new(threadpool));
pool.configure("FOO", |n| n);
pool.configure("BAR", |n| n / 2);
let start = Instant::now();
let f1 = Arc::new(Mutex::new(None));
let f2 = Arc::new(Mutex::new(None));
let b1 = Arc::new(Mutex::new(None));
let b2 = Arc::new(Mutex::new(None));
let done = Arc::new(AtomicU64::new(0));
pool.spawn("FOO", mock_fn("foo1", &f1, &done));
pool.spawn("FOO", mock_fn("foo2", &f2, &done));
std::thread::sleep(Duration::from_millis(1000));
pool.spawn("BAR", mock_fn("bar1", &b1, &done));
pool.spawn("BAR", mock_fn("bar2", &b2, &done));
std::thread::sleep(Duration::from_secs(2));
let measure = |a: Arc<Mutex<Option<Instant>>>, s: Instant| {
a.lock().unwrap().unwrap().duration_since(s).as_millis()
};
let f1 = measure(f1, start);
let f2 = measure(f2, start);
let b1 = measure(b1, start);
let b2 = measure(b2, start);
// Expect:
// [F1, F2]
// [B1]
// [B2]
assert_eq!(done.load(Ordering::Relaxed), 4);
assert!(f1 < 500);
assert!(f2 < 500);
println!("b1 {}", b1);
println!("b2 {}", b2);
assert!((1000..1500).contains(&b1));
assert!((1500..2000).contains(&b2));
}
}

View File

@ -7,6 +7,7 @@ use common::{
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
region::RegionMap, region::RegionMap,
resources::{DeltaTime, GameMode, PlayerEntity, Time, TimeOfDay}, resources::{DeltaTime, GameMode, PlayerEntity, Time, TimeOfDay},
slowjob::SlowJobPool,
terrain::{Block, TerrainChunk, TerrainGrid}, terrain::{Block, TerrainChunk, TerrainGrid},
time::DayPeriod, time::DayPeriod,
trade::Trades, trade::Trades,
@ -110,7 +111,7 @@ impl State {
.unwrap(), .unwrap(),
); );
Self { Self {
ecs: Self::setup_ecs_world(game_mode), ecs: Self::setup_ecs_world(game_mode, &thread_pool),
thread_pool, thread_pool,
} }
} }
@ -118,7 +119,7 @@ impl State {
/// Creates ecs world and registers all the common components and resources /// Creates ecs world and registers all the common components and resources
// TODO: Split up registering into server and client (e.g. move // TODO: Split up registering into server and client (e.g. move
// EventBus<ServerEvent> to the server) // EventBus<ServerEvent> to the server)
fn setup_ecs_world(game_mode: GameMode) -> specs::World { fn setup_ecs_world(game_mode: GameMode, thread_pool: &Arc<ThreadPool>) -> specs::World {
let mut ecs = specs::World::new(); let mut ecs = specs::World::new();
// Uids for sync // Uids for sync
ecs.register_sync_marker(); ecs.register_sync_marker();
@ -206,6 +207,12 @@ impl State {
ecs.insert(EventBus::<LocalEvent>::default()); ecs.insert(EventBus::<LocalEvent>::default());
ecs.insert(game_mode); ecs.insert(game_mode);
ecs.insert(Vec::<common::outcome::Outcome>::new()); ecs.insert(Vec::<common::outcome::Outcome>::new());
let slow_limit = thread_pool.current_num_threads().max(2) as u64;
let slow_limit = slow_limit / 2 + slow_limit / 4;
tracing::trace!(?slow_limit, "Slow Thread limit");
ecs.insert(SlowJobPool::new(slow_limit, Arc::clone(&thread_pool)));
// TODO: only register on the server // TODO: only register on the server
ecs.insert(EventBus::<ServerEvent>::default()); ecs.insert(EventBus::<ServerEvent>::default());
ecs.insert(comp::group::GroupManager::default()); ecs.insert(comp::group::GroupManager::default());
@ -317,6 +324,9 @@ impl State {
/// Get a reference to this state's terrain. /// Get a reference to this state's terrain.
pub fn terrain(&self) -> Fetch<TerrainGrid> { self.ecs.read_resource() } pub fn terrain(&self) -> Fetch<TerrainGrid> { self.ecs.read_resource() }
/// Get a reference to this state's terrain.
pub fn slow_job_pool(&self) -> Fetch<SlowJobPool> { self.ecs.read_resource() }
/// Get a writable reference to this state's terrain. /// Get a writable reference to this state's terrain.
pub fn terrain_mut(&self) -> FetchMut<TerrainGrid> { self.ecs.write_resource() } pub fn terrain_mut(&self) -> FetchMut<TerrainGrid> { self.ecs.write_resource() }

View File

@ -16,6 +16,7 @@ use crate::{cmd::Message, shutdown_coordinator::ShutdownCoordinator, tui_runner:
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use common::clock::Clock; use common::clock::Clock;
use common_base::span; use common_base::span;
use core::sync::atomic::{AtomicUsize, Ordering};
use server::{Event, Input, Server}; use server::{Event, Input, Server};
use std::{ use std::{
io, io,
@ -95,6 +96,11 @@ fn main() -> io::Result<()> {
let runtime = Arc::new( let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-server-{}", id)
})
.build() .build()
.unwrap(), .unwrap(),
); );

View File

@ -1,14 +1,13 @@
use crate::metrics::ChunkGenMetrics; use crate::metrics::ChunkGenMetrics;
#[cfg(not(feature = "worldgen"))] #[cfg(not(feature = "worldgen"))]
use crate::test_world::{IndexOwned, World}; use crate::test_world::{IndexOwned, World};
use common::{generation::ChunkSupplement, terrain::TerrainChunk}; use common::{generation::ChunkSupplement, slowjob::SlowJobPool, terrain::TerrainChunk};
use hashbrown::{hash_map::Entry, HashMap}; use hashbrown::{hash_map::Entry, HashMap};
use specs::Entity as EcsEntity; use specs::Entity as EcsEntity;
use std::sync::{ use std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}; };
use tokio::runtime::Runtime;
use vek::*; use vek::*;
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
use world::{IndexOwned, World}; use world::{IndexOwned, World};
@ -40,7 +39,7 @@ impl ChunkGenerator {
&mut self, &mut self,
entity: Option<EcsEntity>, entity: Option<EcsEntity>,
key: Vec2<i32>, key: Vec2<i32>,
runtime: &Runtime, slowjob_pool: &SlowJobPool,
world: Arc<World>, world: Arc<World>,
index: IndexOwned, index: IndexOwned,
) { ) {
@ -53,8 +52,7 @@ impl ChunkGenerator {
v.insert(Arc::clone(&cancel)); v.insert(Arc::clone(&cancel));
let chunk_tx = self.chunk_tx.clone(); let chunk_tx = self.chunk_tx.clone();
self.metrics.chunks_requested.inc(); self.metrics.chunks_requested.inc();
runtime.spawn_blocking(move || { slowjob_pool.spawn("CHUNK_GENERATOR", move || {
common_base::prof_span!(_guard, "generate_chunk");
let index = index.as_index_ref(); let index = index.as_index_ref();
let payload = world let payload = world
.generate_chunk(index, key, || cancel.load(Ordering::Relaxed)) .generate_chunk(index, key, || cancel.load(Ordering::Relaxed))

View File

@ -64,6 +64,7 @@ use common::{
recipe::default_recipe_book, recipe::default_recipe_book,
resources::TimeOfDay, resources::TimeOfDay,
rtsim::RtSimEntity, rtsim::RtSimEntity,
slowjob::SlowJobPool,
terrain::TerrainChunkSize, terrain::TerrainChunkSize,
uid::UidAllocator, uid::UidAllocator,
vol::{ReadVol, RectVolSize}, vol::{ReadVol, RectVolSize},
@ -197,6 +198,10 @@ impl Server {
state.ecs_mut().insert(ecs_system_metrics); state.ecs_mut().insert(ecs_system_metrics);
state.ecs_mut().insert(tick_metrics); state.ecs_mut().insert(tick_metrics);
state.ecs_mut().insert(physics_metrics); state.ecs_mut().insert(physics_metrics);
state
.ecs_mut()
.write_resource::<SlowJobPool>()
.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4);
state state
.ecs_mut() .ecs_mut()
.insert(ChunkGenerator::new(chunk_gen_metrics)); .insert(ChunkGenerator::new(chunk_gen_metrics));
@ -646,9 +651,9 @@ impl Server {
// only work we do here on the fast path is perform a relaxed read on an atomic. // only work we do here on the fast path is perform a relaxed read on an atomic.
// boolean. // boolean.
let index = &mut self.index; let index = &mut self.index;
let runtime = &mut self.runtime;
let world = &mut self.world; let world = &mut self.world;
let ecs = self.state.ecs_mut(); let ecs = self.state.ecs_mut();
let slow_jobs = ecs.write_resource::<SlowJobPool>();
index.reload_colors_if_changed(|index| { index.reload_colors_if_changed(|index| {
let mut chunk_generator = ecs.write_resource::<ChunkGenerator>(); let mut chunk_generator = ecs.write_resource::<ChunkGenerator>();
@ -667,7 +672,7 @@ impl Server {
chunk_generator.generate_chunk( chunk_generator.generate_chunk(
None, None,
pos, pos,
runtime, &slow_jobs,
Arc::clone(&world), Arc::clone(&world),
index.clone(), index.clone(),
); );
@ -811,16 +816,15 @@ impl Server {
pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); } pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); }
pub fn generate_chunk(&mut self, entity: EcsEntity, key: Vec2<i32>) { pub fn generate_chunk(&mut self, entity: EcsEntity, key: Vec2<i32>) {
self.state let ecs = self.state.ecs();
.ecs() let slow_jobs = ecs.write_resource::<SlowJobPool>();
.write_resource::<ChunkGenerator>() ecs.write_resource::<ChunkGenerator>().generate_chunk(
.generate_chunk( Some(entity),
Some(entity), key,
key, &slow_jobs,
&self.runtime, Arc::clone(&self.world),
Arc::clone(&self.world), self.index.clone(),
self.index.clone(), );
);
} }
fn process_chat_cmd(&mut self, entity: EcsEntity, cmd: String) { fn process_chat_cmd(&mut self, entity: EcsEntity, cmd: String) {
@ -1005,7 +1009,7 @@ impl Server {
// rng.gen_range(-e/2..e/2 + 1)); // rng.gen_range(-e/2..e/2 + 1));
let pos = comp::Pos(Vec3::from(world_dims_blocks.map(|e| e as f32 / 2.0))); let pos = comp::Pos(Vec3::from(world_dims_blocks.map(|e| e as f32 / 2.0)));
self.state self.state
.create_persister(pos, view_distance, &self.world, &self.index, &self.runtime) .create_persister(pos, view_distance, &self.world, &self.index)
.build(); .build();
} }
} }

View File

@ -11,6 +11,7 @@ use common::{
Inventory, Inventory,
}, },
effect::Effect, effect::Effect,
slowjob::SlowJobPool,
uid::{Uid, UidAllocator}, uid::{Uid, UidAllocator},
}; };
use common_net::{ use common_net::{
@ -80,7 +81,6 @@ pub trait StateExt {
view_distance: u32, view_distance: u32,
world: &std::sync::Arc<world::World>, world: &std::sync::Arc<world::World>,
index: &world::IndexOwned, index: &world::IndexOwned,
runtime: &tokio::runtime::Runtime,
) -> EcsEntityBuilder; ) -> EcsEntityBuilder;
/// Insert common/default components for a new character joining the server /// Insert common/default components for a new character joining the server
fn initialize_character_data(&mut self, entity: EcsEntity, character_id: CharacterId); fn initialize_character_data(&mut self, entity: EcsEntity, character_id: CharacterId);
@ -327,15 +327,15 @@ impl StateExt for State {
view_distance: u32, view_distance: u32,
world: &std::sync::Arc<world::World>, world: &std::sync::Arc<world::World>,
index: &world::IndexOwned, index: &world::IndexOwned,
runtime: &tokio::runtime::Runtime,
) -> EcsEntityBuilder { ) -> EcsEntityBuilder {
use common::{terrain::TerrainChunkSize, vol::RectVolSize}; use common::{terrain::TerrainChunkSize, vol::RectVolSize};
use std::sync::Arc; use std::sync::Arc;
// Request chunks // Request chunks
{ {
let mut chunk_generator = self let ecs = self.ecs();
.ecs() let slow_jobs = ecs.write_resource::<SlowJobPool>();
.write_resource::<crate::chunk_generator::ChunkGenerator>(); let mut chunk_generator =
ecs.write_resource::<crate::chunk_generator::ChunkGenerator>();
let chunk_pos = self.terrain().pos_key(pos.0.map(|e| e as i32)); let chunk_pos = self.terrain().pos_key(pos.0.map(|e| e as i32));
(-(view_distance as i32)..view_distance as i32 + 1) (-(view_distance as i32)..view_distance as i32 + 1)
.flat_map(|x| { .flat_map(|x| {
@ -351,7 +351,7 @@ impl StateExt for State {
* TerrainChunkSize::RECT_SIZE.x as f64 * TerrainChunkSize::RECT_SIZE.x as f64
}) })
.for_each(|chunk_key| { .for_each(|chunk_key| {
chunk_generator.generate_chunk(None, chunk_key, runtime, Arc::clone(world), index.clone()); chunk_generator.generate_chunk(None, chunk_key, &slow_jobs, Arc::clone(world), index.clone());
}); });
} }

View File

@ -2,7 +2,7 @@ pub mod comp;
pub mod sys; pub mod sys;
use crate::audio::sfx::SfxEventItem; use crate::audio::sfx::SfxEventItem;
use common::event::EventBus; use common::{event::EventBus, slowjob::SlowJobPool};
use specs::{Entity, World, WorldExt}; use specs::{Entity, World, WorldExt};
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
@ -12,6 +12,12 @@ pub fn init(world: &mut World) {
world.register::<comp::HpFloaterList>(); world.register::<comp::HpFloaterList>();
world.register::<comp::Interpolated>(); world.register::<comp::Interpolated>();
{
let pool = world.read_resource::<SlowJobPool>();
pool.configure("FIGURE_MESHING", |n| n / 2);
pool.configure("TERRAIN_MESHING", |n| n / 2);
}
// Voxygen event buses // Voxygen event buses
world.insert(EventBus::<SfxEventItem>::default()); world.insert(EventBus::<SfxEventItem>::default());
} }

View File

@ -148,7 +148,7 @@ impl PlayState for CharSelectionState {
time: client.state().get_time(), time: client.state().get_time(),
delta_time: client.state().ecs().read_resource::<DeltaTime>().0, delta_time: client.state().ecs().read_resource::<DeltaTime>().0,
tick: client.get_tick(), tick: client.get_tick(),
runtime: client.runtime(), slow_job_pool: &client.state().slow_job_pool(),
body: humanoid_body, body: humanoid_body,
gamma: global_state.settings.graphics.gamma, gamma: global_state.settings.graphics.gamma,
exposure: global_state.settings.graphics.exposure, exposure: global_state.settings.graphics.exposure,

View File

@ -21,13 +21,13 @@ use common::{
CharacterState, CharacterState,
}, },
figure::Segment, figure::Segment,
slowjob::SlowJobPool,
vol::BaseVol, vol::BaseVol,
}; };
use core::{hash::Hash, ops::Range}; use core::{hash::Hash, ops::Range};
use crossbeam::atomic; use crossbeam::atomic;
use hashbrown::{hash_map::Entry, HashMap}; use hashbrown::{hash_map::Entry, HashMap};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Runtime;
use vek::*; use vek::*;
/// A type produced by mesh worker threads corresponding to the information /// A type produced by mesh worker threads corresponding to the information
@ -338,7 +338,7 @@ where
tick: u64, tick: u64,
camera_mode: CameraMode, camera_mode: CameraMode,
character_state: Option<&CharacterState>, character_state: Option<&CharacterState>,
runtime: &Runtime, slow_jobs: &SlowJobPool,
) -> (FigureModelEntryLod<'c>, &'c Skel::Attr) ) -> (FigureModelEntryLod<'c>, &'c Skel::Attr)
where where
for<'a> &'a Skel::Body: Into<Skel::Attr>, for<'a> &'a Skel::Body: Into<Skel::Attr>,
@ -404,7 +404,7 @@ where
let manifests = self.manifests; let manifests = self.manifests;
let slot_ = Arc::clone(&slot); let slot_ = Arc::clone(&slot);
runtime.spawn_blocking(move || { slow_jobs.spawn("FIGURE_MESHING", move || {
// First, load all the base vertex data. // First, load all the base vertex data.
let manifests = &*manifests.read(); let manifests = &*manifests.read();
let meshes = <Skel::Body as BodySpec>::bone_meshes(&key, manifests); let meshes = <Skel::Body as BodySpec>::bone_meshes(&key, manifests);

View File

@ -561,6 +561,7 @@ impl FigureMgr {
}; };
let camera_mode = camera.get_mode(); let camera_mode = camera.get_mode();
let character_state_storage = state.read_storage::<common::comp::CharacterState>(); let character_state_storage = state.read_storage::<common::comp::CharacterState>();
let slow_jobs = state.slow_job_pool();
let character_state = character_state_storage.get(scene_data.player_entity); let character_state = character_state_storage.get(scene_data.player_entity);
let focus_pos = anim::vek::Vec3::<f32>::from(camera.get_focus_pos()); let focus_pos = anim::vek::Vec3::<f32>::from(camera.get_focus_pos());
@ -756,7 +757,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -1552,7 +1553,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -1754,7 +1755,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -2081,7 +2082,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -2440,7 +2441,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -2550,7 +2551,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -2639,7 +2640,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -2985,7 +2986,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = let state =
@ -3079,7 +3080,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -3266,7 +3267,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -3357,7 +3358,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -3446,7 +3447,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self
@ -3882,7 +3883,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = let state =
@ -4066,7 +4067,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = let state =
@ -4194,7 +4195,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.runtime, &slow_jobs,
); );
let state = self let state = self

View File

@ -36,7 +36,6 @@ use common_sys::state::State;
use comp::item::Reagent; use comp::item::Reagent;
use num::traits::{Float, FloatConst}; use num::traits::{Float, FloatConst};
use specs::{Entity as EcsEntity, Join, WorldExt}; use specs::{Entity as EcsEntity, Join, WorldExt};
use tokio::runtime::Runtime;
use vek::*; use vek::*;
// TODO: Don't hard-code this. // TODO: Don't hard-code this.
@ -115,7 +114,6 @@ pub struct SceneData<'a> {
pub loaded_distance: f32, pub loaded_distance: f32,
pub view_distance: u32, pub view_distance: u32,
pub tick: u64, pub tick: u64,
pub runtime: &'a Runtime,
pub gamma: f32, pub gamma: f32,
pub exposure: f32, pub exposure: f32,
pub ambiance: f32, pub ambiance: f32,

View File

@ -26,10 +26,10 @@ use common::{
item::ItemKind, item::ItemKind,
}, },
figure::Segment, figure::Segment,
slowjob::SlowJobPool,
terrain::BlockKind, terrain::BlockKind,
vol::{BaseVol, ReadVol}, vol::{BaseVol, ReadVol},
}; };
use tokio::runtime::Runtime;
use tracing::error; use tracing::error;
use vek::*; use vek::*;
use winit::event::MouseButton; use winit::event::MouseButton;
@ -97,7 +97,7 @@ pub struct SceneData<'a> {
pub time: f64, pub time: f64,
pub delta_time: f32, pub delta_time: f32,
pub tick: u64, pub tick: u64,
pub runtime: &'a Runtime, pub slow_job_pool: &'a SlowJobPool,
pub body: Option<humanoid::Body>, pub body: Option<humanoid::Body>,
pub gamma: f32, pub gamma: f32,
pub exposure: f32, pub exposure: f32,
@ -357,7 +357,7 @@ impl Scene {
scene_data.tick, scene_data.tick,
CameraMode::default(), CameraMode::default(),
None, None,
scene_data.runtime, scene_data.slow_job_pool,
) )
.0; .0;
let mut buf = [Default::default(); anim::MAX_BONE_COUNT]; let mut buf = [Default::default(); anim::MAX_BONE_COUNT];

View File

@ -768,21 +768,24 @@ impl<V: RectRasterableVol> Terrain<V> {
let sprite_config = Arc::clone(&self.sprite_config); let sprite_config = Arc::clone(&self.sprite_config);
let cnt = Arc::clone(&self.mesh_todos_active); let cnt = Arc::clone(&self.mesh_todos_active);
cnt.fetch_add(1, Ordering::Relaxed); cnt.fetch_add(1, Ordering::Relaxed);
scene_data.runtime.spawn_blocking(move || { scene_data
let sprite_data = sprite_data; .state
let _ = send.send(mesh_worker( .slow_job_pool()
pos, .spawn("TERRAIN_MESHING", move || {
(min_z as f32, max_z as f32), let sprite_data = sprite_data;
started_tick, let _ = send.send(mesh_worker(
volume, pos,
max_texture_size, (min_z as f32, max_z as f32),
chunk, started_tick,
aabb, volume,
&sprite_data, max_texture_size,
&sprite_config, chunk,
)); aabb,
cnt.fetch_sub(1, Ordering::Relaxed); &sprite_data,
}); &sprite_config,
));
cnt.fetch_sub(1, Ordering::Relaxed);
});
todo.is_worker_active = true; todo.is_worker_active = true;
} }
drop(guard); drop(guard);

View File

@ -1442,7 +1442,6 @@ impl PlayState for SessionState {
loaded_distance: client.loaded_distance(), loaded_distance: client.loaded_distance(),
view_distance: client.view_distance().unwrap_or(1), view_distance: client.view_distance().unwrap_or(1),
tick: client.get_tick(), tick: client.get_tick(),
runtime: &client.runtime(),
gamma: global_state.settings.graphics.gamma, gamma: global_state.settings.graphics.gamma,
exposure: global_state.settings.graphics.exposure, exposure: global_state.settings.graphics.exposure,
ambiance: global_state.settings.graphics.ambiance, ambiance: global_state.settings.graphics.ambiance,
@ -1510,7 +1509,6 @@ impl PlayState for SessionState {
loaded_distance: client.loaded_distance(), loaded_distance: client.loaded_distance(),
view_distance: client.view_distance().unwrap_or(1), view_distance: client.view_distance().unwrap_or(1),
tick: client.get_tick(), tick: client.get_tick(),
runtime: &client.runtime(),
gamma: settings.graphics.gamma, gamma: settings.graphics.gamma,
exposure: settings.graphics.exposure, exposure: settings.graphics.exposure,
ambiance: settings.graphics.ambiance, ambiance: settings.graphics.ambiance,