diff --git a/Cargo.lock b/Cargo.lock index c16404c034..752f4e4387 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4818,6 +4818,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "std-semaphore" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" + [[package]] name = "stdweb" version = "0.1.3" @@ -5539,6 +5545,7 @@ dependencies = [ "rustyline", "serde", "specs", + "std-semaphore", "termcolor", "tokio", "tracing", @@ -5750,6 +5757,7 @@ dependencies = [ "slab", "specs", "specs-idvs", + "std-semaphore", "tokio", "tracing", "vek 0.14.1", @@ -5771,9 +5779,11 @@ dependencies = [ "clap", "crossterm 0.19.0", "lazy_static", + "num_cpus", "ron", "serde", "signal-hook 0.3.6", + "std-semaphore", "termcolor", "tokio", "tracing", @@ -5833,6 +5843,7 @@ dependencies = [ "serde", "specs", "specs-idvs", + "std-semaphore", "termcolor", "tokio", "tracing", diff --git a/client/Cargo.toml b/client/Cargo.toml index 366bc4c7de..021db09809 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -27,6 +27,7 @@ num = "0.4" tracing = { version = "0.1", default-features = false } rayon = "1.5" specs = { git = "https://github.com/amethyst/specs.git", rev = "5a9b71035007be0e3574f35184acac1cd4530496" } +std-semaphore = "0.1.0" vek = { version = "=0.14.1", features = ["serde"] } hashbrown = { version = "0.9", features = ["rayon", "serde", "nightly"] } authc = { git = "https://gitlab.com/veloren/auth.git", rev = "fb3dcbc4962b367253f8f2f92760ef44d2679c9a" } diff --git a/client/examples/chat-cli/main.rs b/client/examples/chat-cli/main.rs index 495930c0b9..698fb238b4 100644 --- a/client/examples/chat-cli/main.rs +++ b/client/examples/chat-cli/main.rs @@ -44,7 +44,9 @@ fn main() { let password = read_input(); let runtime = Arc::new(Runtime::new().unwrap()); + let background_threads = Arc::new(std_semaphore::Semaphore::new(3)); let runtime2 = Arc::clone(&runtime); + let background_threads2 = Arc::clone(&background_threads); // Create a client. let mut client = runtime @@ -52,7 +54,7 @@ fn main() { let addr = ConnectionArgs::resolve(&server_addr, false) .await .expect("dns resolve failed"); - Client::new(addr, None, runtime2).await + Client::new(addr, None, runtime2, background_threads2).await }) .expect("Failed to create client instance"); diff --git a/client/src/lib.rs b/client/src/lib.rs index 1edebd7015..bb0dc530b4 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -138,6 +138,7 @@ pub struct Client { registered: bool, presence: Option, runtime: Arc, + background_threads: Arc, server_info: ServerInfo, world_data: WorldData, player_list: HashMap, @@ -198,6 +199,7 @@ impl Client { addr: ConnectionArgs, view_distance: Option, runtime: Arc, + background_threads: Arc, ) -> Result { let network = Network::new(Pid::new(), &runtime); @@ -446,6 +448,7 @@ impl Client { registered: false, presence: None, runtime, + background_threads, server_info, world_data: WorldData { lod_base, @@ -1842,6 +1845,10 @@ impl Client { /// exempt). pub fn runtime(&self) -> &Arc { &self.runtime } + /// Get a reference to the semaphore used to guard the number of cpu heavy + /// background threads running at once + pub fn background_threads(&self) -> &Arc { &self.background_threads } + /// Get a reference to the client's game state. pub fn state(&self) -> &State { &self.state } @@ -2176,11 +2183,13 @@ mod tests { let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000); let view_distance: Option = None; let runtime = Arc::new(Runtime::new().unwrap()); + let background_threads = Arc::new(std_semaphore::Semaphore::new(3)); let runtime2 = Arc::clone(&runtime); let veloren_client: Result = runtime.block_on(Client::new( ConnectionArgs::IpAndPort(vec![socket]), view_distance, runtime2, + background_threads, )); let _ = veloren_client.map(|mut client| { diff --git a/server-cli/Cargo.toml b/server-cli/Cargo.toml index 4fc4646e65..b8cf563cb6 100644 --- a/server-cli/Cargo.toml +++ b/server-cli/Cargo.toml @@ -21,12 +21,14 @@ ansi-parser = "0.7" clap = "2.33" crossterm = "0.19" lazy_static = "1" +num_cpus = "1.0" signal-hook = "0.3.6" termcolor = "1.1" tracing = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } ron = {version = "0.6", default-features = false} serde = {version = "1.0", features = [ "rc", "derive" ]} +std-semaphore = "0.1.0" # Tracy tracing-tracy = { version = "0.6.0", optional = true } diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index 550709a4d7..e9924c7f5e 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -16,6 +16,7 @@ use crate::{cmd::Message, shutdown_coordinator::ShutdownCoordinator, tui_runner: use clap::{App, Arg, SubCommand}; use common::clock::Clock; use common_base::span; +use core::sync::atomic::{AtomicUsize, Ordering}; use server::{Event, Input, Server}; use std::{ io, @@ -92,12 +93,23 @@ fn main() -> io::Result<()> { path }; + let cores = num_cpus::get(); let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() .enable_all() + .worker_threads(if cores < 2 { 1 } else { cores / 2 + cores / 4 }) + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("tokio-server-{}", id) + }) .build() .unwrap(), ); + let background_cpu_threads = + Arc::new(std_semaphore::Semaphore::new( + if cores < 2 { 1 } else { cores / 2 + cores / 4 } as isize, + )); // Load server settings let mut server_settings = server::Settings::load(&server_data_dir); @@ -143,6 +155,7 @@ fn main() -> io::Result<()> { editable_settings, &server_data_dir, runtime, + background_cpu_threads, ) .expect("Failed to create server instance!"); diff --git a/server/Cargo.toml b/server/Cargo.toml index b2634cf066..9a959a3b00 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -49,6 +49,7 @@ diesel = { version = "1.4.3", features = ["sqlite"] } diesel_migrations = "1.4.0" dotenv = "0.15.0" slab = "0.4" +std-semaphore = "0.1.0" # Plugins plugin-api = { package = "veloren-plugin-api", path = "../plugin/api"} diff --git a/server/src/chunk_generator.rs b/server/src/chunk_generator.rs index de28f441c3..7ddc05d170 100644 --- a/server/src/chunk_generator.rs +++ b/server/src/chunk_generator.rs @@ -22,16 +22,21 @@ pub struct ChunkGenerator { chunk_tx: crossbeam_channel::Sender, chunk_rx: crossbeam_channel::Receiver, pending_chunks: HashMap, Arc>, + background_threads: Arc, metrics: Arc, } impl ChunkGenerator { #[allow(clippy::new_without_default)] // TODO: Pending review in #587 - pub fn new(metrics: ChunkGenMetrics) -> Self { + pub fn new( + metrics: ChunkGenMetrics, + background_threads: Arc, + ) -> Self { let (chunk_tx, chunk_rx) = crossbeam_channel::unbounded(); Self { chunk_tx, chunk_rx, pending_chunks: HashMap::new(), + background_threads, metrics: Arc::new(metrics), } } @@ -52,8 +57,10 @@ impl ChunkGenerator { let cancel = Arc::new(AtomicBool::new(false)); v.insert(Arc::clone(&cancel)); let chunk_tx = self.chunk_tx.clone(); + let background_threads = Arc::clone(&self.background_threads); self.metrics.chunks_requested.inc(); runtime.spawn_blocking(move || { + let _cpu_guard = background_threads.access(); common_base::prof_span!(_guard, "generate_chunk"); let index = index.as_index_ref(); let payload = world diff --git a/server/src/lib.rs b/server/src/lib.rs index 05179fb637..e480373f61 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -151,6 +151,7 @@ impl Server { editable_settings: EditableSettings, data_dir: &std::path::Path, runtime: Arc, + background_threads: Arc, ) -> Result { info!("Server is data dir is: {}", data_dir.display()); if settings.auth_server_address.is_none() { @@ -199,7 +200,7 @@ impl Server { state.ecs_mut().insert(physics_metrics); state .ecs_mut() - .insert(ChunkGenerator::new(chunk_gen_metrics)); + .insert(ChunkGenerator::new(chunk_gen_metrics, background_threads)); state .ecs_mut() .insert(CharacterUpdater::new(&persistence_db_dir)?); diff --git a/voxygen/Cargo.toml b/voxygen/Cargo.toml index 4c9d5342e6..7bdf6aa6f5 100644 --- a/voxygen/Cargo.toml +++ b/voxygen/Cargo.toml @@ -82,6 +82,7 @@ rand = "0.8" rodio = {version = "0.13", default-features = false, features = ["wav", "vorbis"]} ron = {version = "0.6", default-features = false} serde = {version = "1.0", features = [ "rc", "derive" ]} +std-semaphore = "0.1.0" treeculler = "0.1.0" tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } num_cpus = "1.0" diff --git a/voxygen/src/menu/char_selection/mod.rs b/voxygen/src/menu/char_selection/mod.rs index 6f2055b2ae..e323f48e14 100644 --- a/voxygen/src/menu/char_selection/mod.rs +++ b/voxygen/src/menu/char_selection/mod.rs @@ -149,6 +149,7 @@ impl PlayState for CharSelectionState { delta_time: client.state().ecs().read_resource::().0, tick: client.get_tick(), runtime: client.runtime(), + background_threads: client.background_threads(), body: humanoid_body, gamma: global_state.settings.graphics.gamma, exposure: global_state.settings.graphics.exposure, diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index d6574ff2ab..120c786e28 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -50,19 +50,19 @@ impl ClientInit { username: String, view_distance: Option, password: String, - runtime: Option>, + runtime_and_threads: Option<(Arc, Arc)>, ) -> Self { let (tx, rx) = unbounded(); let (trust_tx, trust_rx) = unbounded(); let cancel = Arc::new(AtomicBool::new(false)); let cancel2 = Arc::clone(&cancel); - let runtime = runtime.unwrap_or_else(|| { + let (runtime, background_threads) = runtime_and_threads.unwrap_or_else(|| { let cores = num_cpus::get(); - Arc::new( + let runtime = Arc::new( runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(if cores > 4 { cores - 1 } else { cores }) + .worker_threads(if cores < 2 { 1 } else { cores / 2 + cores / 4 }) .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); @@ -70,9 +70,20 @@ impl ClientInit { }) .build() .unwrap(), - ) + ); + + let background_threads = Arc::new(std_semaphore::Semaphore::new(if cores < 2 { + 1 + } else { + cores / 2 + cores / 4 + } + as isize)); + + (runtime, background_threads) }); + let runtime2 = Arc::clone(&runtime); + let background_threads2 = Arc::clone(&background_threads); runtime.spawn(async move { let trust_fn = |auth_server: &str| { @@ -106,6 +117,7 @@ impl ClientInit { connection_args.clone(), view_distance, Arc::clone(&runtime2), + Arc::clone(&background_threads2), ) .await { diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index af82105301..44ba0bd994 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -69,7 +69,7 @@ impl PlayState for MainMenuState { { if let Some(singleplayer) = &global_state.singleplayer { match singleplayer.receiver.try_recv() { - Ok(Ok(runtime)) => { + Ok(Ok((runtime, background_threads))) => { // Attempt login after the server is finished initializing attempt_login( &mut global_state.settings, @@ -78,7 +78,7 @@ impl PlayState for MainMenuState { "".to_owned(), ClientConnArgs::Resolved(ConnectionArgs::Mpsc(14004)), &mut self.client_init, - Some(runtime), + Some((runtime, background_threads)), ); }, Ok(Err(e)) => { @@ -342,7 +342,7 @@ fn attempt_login( password: String, connection_args: ClientConnArgs, client_init: &mut Option, - runtime: Option>, + runtime_and_threads: Option<(Arc, Arc)>, ) { if comp::Player::alias_is_valid(&username) { // Don't try to connect if there is already a connection in progress. @@ -352,7 +352,7 @@ fn attempt_login( username, Some(settings.graphics.view_distance), password, - runtime, + runtime_and_threads, )); } } else { diff --git a/voxygen/src/scene/figure/cache.rs b/voxygen/src/scene/figure/cache.rs index 487a6d2dbe..671f92c55b 100644 --- a/voxygen/src/scene/figure/cache.rs +++ b/voxygen/src/scene/figure/cache.rs @@ -339,6 +339,7 @@ where camera_mode: CameraMode, character_state: Option<&CharacterState>, runtime: &Runtime, + background_threads: &Arc, ) -> (FigureModelEntryLod<'c>, &'c Skel::Attr) where for<'a> &'a Skel::Body: Into, @@ -403,8 +404,10 @@ where let slot = Arc::new(atomic::AtomicCell::new(None)); let manifests = self.manifests; let slot_ = Arc::clone(&slot); + let background_threads_ = Arc::clone(background_threads); runtime.spawn_blocking(move || { + let _cpu_guard = background_threads_.access(); // First, load all the base vertex data. let manifests = &*manifests.read(); let meshes = ::bone_meshes(&key, manifests); diff --git a/voxygen/src/scene/figure/mod.rs b/voxygen/src/scene/figure/mod.rs index c931efcd7f..261cf75a60 100644 --- a/voxygen/src/scene/figure/mod.rs +++ b/voxygen/src/scene/figure/mod.rs @@ -757,6 +757,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -1553,6 +1554,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -1755,6 +1757,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -2082,6 +2085,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -2441,6 +2445,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -2551,6 +2556,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -2640,6 +2646,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -2986,6 +2993,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = @@ -3080,6 +3088,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -3267,6 +3276,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -3358,6 +3368,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -3447,6 +3458,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self @@ -3883,6 +3895,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = @@ -4067,6 +4080,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = @@ -4195,6 +4209,7 @@ impl FigureMgr { player_camera_mode, player_character_state, scene_data.runtime, + scene_data.background_threads, ); let state = self diff --git a/voxygen/src/scene/mod.rs b/voxygen/src/scene/mod.rs index cc3ffb706e..37c32e3597 100644 --- a/voxygen/src/scene/mod.rs +++ b/voxygen/src/scene/mod.rs @@ -116,6 +116,7 @@ pub struct SceneData<'a> { pub view_distance: u32, pub tick: u64, pub runtime: &'a Runtime, + pub background_threads: &'a std::sync::Arc, pub gamma: f32, pub exposure: f32, pub ambiance: f32, diff --git a/voxygen/src/scene/simple.rs b/voxygen/src/scene/simple.rs index bc735af9e1..10da8138e5 100644 --- a/voxygen/src/scene/simple.rs +++ b/voxygen/src/scene/simple.rs @@ -98,6 +98,7 @@ pub struct SceneData<'a> { pub delta_time: f32, pub tick: u64, pub runtime: &'a Runtime, + pub background_threads: &'a std::sync::Arc, pub body: Option, pub gamma: f32, pub exposure: f32, @@ -358,6 +359,7 @@ impl Scene { CameraMode::default(), None, scene_data.runtime, + scene_data.background_threads, ) .0; let mut buf = [Default::default(); anim::MAX_BONE_COUNT]; diff --git a/voxygen/src/scene/terrain.rs b/voxygen/src/scene/terrain.rs index 6faf7aa42e..c9bc2688bf 100644 --- a/voxygen/src/scene/terrain.rs +++ b/voxygen/src/scene/terrain.rs @@ -694,10 +694,12 @@ impl Terrain { // Limit ourselves to u16::MAX even if larger textures are supported. let max_texture_size = renderer.max_texture_size(); + // TODO: we need a blocking and non-blocking semaphore + // so we can use it in a non-blocking fashion on the main thread here + // and use it to block in other places that need that (chunk gen) let meshing_cores = match num_cpus::get() as u64 { - n if n < 4 => 1, - n if n < 8 => n - 3, - n => n - 4, + n if n < 2 => 1, + n => n / 2 + n / 4, }; span!(guard, "Queue meshing from todo list"); @@ -768,7 +770,9 @@ impl Terrain { let sprite_config = Arc::clone(&self.sprite_config); let cnt = Arc::clone(&self.mesh_todos_active); cnt.fetch_add(1, Ordering::Relaxed); + let background_threads = Arc::clone(scene_data.background_threads); scene_data.runtime.spawn_blocking(move || { + let _cpu_guard = background_threads.access(); let sprite_data = sprite_data; let _ = send.send(mesh_worker( pos, diff --git a/voxygen/src/session.rs b/voxygen/src/session.rs index 791db45343..b1bb4a414a 100644 --- a/voxygen/src/session.rs +++ b/voxygen/src/session.rs @@ -1443,6 +1443,7 @@ impl PlayState for SessionState { view_distance: client.view_distance().unwrap_or(1), tick: client.get_tick(), runtime: &client.runtime(), + background_threads: client.background_threads(), gamma: global_state.settings.graphics.gamma, exposure: global_state.settings.graphics.exposure, ambiance: global_state.settings.graphics.ambiance, @@ -1511,6 +1512,7 @@ impl PlayState for SessionState { view_distance: client.view_distance().unwrap_or(1), tick: client.get_tick(), runtime: &client.runtime(), + background_threads: client.background_threads(), gamma: settings.graphics.gamma, exposure: settings.graphics.exposure, ambiance: settings.graphics.ambiance, diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index 3e994d1361..8dbb4014b0 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -19,7 +19,8 @@ const TPS: u64 = 30; pub struct Singleplayer { _server_thread: JoinHandle<()>, stop_server_s: Sender<()>, - pub receiver: Receiver, ServerError>>, + #[allow(clippy::type_complexity)] // TODO: create new type for the sent things + pub receiver: Receiver, Arc), ServerError>>, // Wether the server is stopped or not paused: Arc, // Settings that the server was started with @@ -83,7 +84,7 @@ impl Singleplayer { let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads(if cores > 4 { cores - 1 } else { cores }) + .worker_threads(if cores < 2 { 1 } else { cores / 2 + cores / 4 }) .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); @@ -92,6 +93,10 @@ impl Singleplayer { .build() .unwrap(), ); + let background_threads = + Arc::new(std_semaphore::Semaphore::new( + if cores < 2 { 1 } else { cores / 2 + cores / 4 } as isize, + )); let settings2 = settings.clone(); @@ -111,10 +116,11 @@ impl Singleplayer { editable_settings, &server_data_dir, Arc::clone(&runtime), + Arc::clone(&background_threads), ) { Ok(s) => { server = Some(s); - Ok(runtime) + Ok((runtime, background_threads)) }, Err(e) => Err(e), },