From 48d7f7dafd87534eb92b602ee18edda103b7c36d Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Mon, 12 Sep 2022 20:26:22 -0700 Subject: [PATCH] [WIP] improving thread pool usage and bot client. --- Cargo.lock | 5 + client/Cargo.toml | 1 + client/examples/chat-cli/main.rs | 8 +- client/src/bin/bot/main.rs | 24 ++-- client/src/bin/swarm/main.rs | 197 +++++++++++++++------------ client/src/lib.rs | 16 +-- common/state/Cargo.toml | 1 + common/state/src/state.rs | 91 ++++++++++--- server-cli/src/main.rs | 25 +--- server/src/lib.rs | 6 +- voxygen/src/lib.rs | 2 - voxygen/src/main.rs | 26 +--- voxygen/src/menu/main/client_init.rs | 13 +- voxygen/src/menu/main/mod.rs | 16 ++- voxygen/src/render/renderer.rs | 4 +- voxygen/src/singleplayer.rs | 10 +- voxygen/src/window.rs | 7 +- 17 files changed, 245 insertions(+), 207 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ded12b246..bbd4ba7baf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6786,6 +6786,7 @@ dependencies = [ "hashbrown 0.12.3", "image", "num 0.4.0", + "num_cpus", "quinn", "rayon", "ron 0.8.0", @@ -6973,7 +6974,11 @@ dependencies = [ "specs", "tar", "thread-priority", +<<<<<<< HEAD "timer-queue", +======= + "tokio", +>>>>>>> 60060a9913 ([WIP] improving thread pool usage and bot client.) "toml", "tracing", "vek 0.15.8", diff --git a/client/Cargo.toml b/client/Cargo.toml index a0385375e6..dbe04f3a61 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -24,6 +24,7 @@ network = { package = "veloren-network", path = "../network", features = ["compr bincode = "1.3.2" byteorder = "1.3.2" crossbeam-channel = "0.5" +num_cpus = "1.0" tokio = { version = "1.14", default-features = false, features = ["rt-multi-thread"] } quinn = "0.8" image = { version = "0.24", default-features = false, features = ["png"] } diff --git a/client/examples/chat-cli/main.rs b/client/examples/chat-cli/main.rs index 4f0a525098..77856cce12 100644 --- a/client/examples/chat-cli/main.rs +++ b/client/examples/chat-cli/main.rs @@ -56,11 +56,15 @@ fn main() { }; // Create a client. - let mut client = runtime + let pools = common_state::State::pools( + common::resources::GameMode::Client, + tokio::runtime::Builder::new_multi_thread(), + ); + let mut client = pools.runtime .block_on(Client::new( addr, - runtime2, &mut None, + pools.clone(), &username, &password, |provider| provider == "https://auth.veloren.net", diff --git a/client/src/bin/bot/main.rs b/client/src/bin/bot/main.rs index 9fcdade262..62bb1ef7ba 100644 --- a/client/src/bin/bot/main.rs +++ b/client/src/bin/bot/main.rs @@ -46,29 +46,28 @@ pub fn main() { pub struct BotClient { settings: Settings, - runtime: Arc, + pools: common_state::Pools, server_info: ServerInfo, bot_clients: HashMap, clock: Clock, } pub fn make_client( - runtime: &Arc, + pools: &common_state::Pools, server: &str, server_info: &mut Option, username: &str, password: &str, ) -> Option { - let runtime_clone = Arc::clone(runtime); let addr = ConnectionArgs::Tcp { prefer_ipv6: false, hostname: server.to_owned(), }; - runtime + pools.runtime .block_on(Client::new( addr, - runtime_clone, server_info, + pools.clone(), username, password, |_| true, @@ -78,15 +77,18 @@ pub fn make_client( impl BotClient { pub fn new(settings: Settings) -> BotClient { - let runtime = Arc::new(Runtime::new().unwrap()); + let pools = common_state::State::pools( + common::resources::GameMode::Client, + tokio::runtime::Builder::new_multi_thread(), + ); let mut server_info = None; // Don't care if we connect, just trying to grab the server info. - let _ = make_client(&runtime, &settings.server, &mut server_info, "", ""); + let _ = make_client(&pools, &settings.server, &mut server_info, "", ""); let server_info = server_info.expect("Failed to connect to server."); let clock = Clock::new(Duration::from_secs_f64(1.0 / 60.0)); BotClient { settings, - runtime, + pools, server_info, bot_clients: HashMap::new(), clock, @@ -142,7 +144,7 @@ impl BotClient { { continue; } - match self.runtime.block_on(authc.register(username, password)) { + match self.pools.runtime.block_on(authc.register(username, password)) { Ok(()) => { self.settings.bot_logins.push(BotCreds { username: username.to_string(), @@ -171,15 +173,13 @@ impl BotClient { .cloned() .collect(); for cred in creds.iter() { - let runtime = Arc::clone(&self.runtime); - let server = &self.settings.server; // TODO: log the clients in in parallel instead of in series let client = self .bot_clients .entry(cred.username.clone()) .or_insert_with(|| { - make_client(&runtime, server, &mut None, &cred.username, &cred.password) + make_client(&self.pools, server, &mut None, &cred.username, &cred.password) .expect("Failed to connect to server") }); diff --git a/client/src/bin/swarm/main.rs b/client/src/bin/swarm/main.rs index 735ad30b0c..58cf31a806 100644 --- a/client/src/bin/swarm/main.rs +++ b/client/src/bin/swarm/main.rs @@ -13,7 +13,7 @@ use std::{ time::{Duration, SystemTime}, }; use structopt::StructOpt; -use tokio::runtime::Runtime; +use tokio::runtime::{self, Runtime}; use vek::*; use veloren_client::{addr::ConnectionArgs, Client}; @@ -53,7 +53,12 @@ fn main() { let to_adminify = usernames.clone(); let finished_init = Arc::new(AtomicU32::new(0)); - let runtime = Arc::new(Runtime::new().unwrap()); + let mut builder = runtime::Builder::new_multi_thread(); + builder + .max_blocking_threads(opt.size as usize + 1) + .thread_name("swarm"); + let mut pools = common_state::State::pools(common::resources::GameMode::Client, builder); + pools.slowjob_threads = 0; // TODO: calculate and log the required chunks per second to maintain the // selected scenario with full vd loaded @@ -62,7 +67,7 @@ fn main() { admin_username, 0, to_adminify, - &runtime, + &pools, opt, &finished_init, ); @@ -72,7 +77,7 @@ fn main() { name, index as u32, Vec::new(), - &runtime, + &pools, opt, &finished_init, ); @@ -85,14 +90,15 @@ fn run_client_new_thread( username: String, index: u32, to_adminify: Vec, - runtime: &Arc, + pools: &common_state::Pools, opt: Opt, finished_init: &Arc, ) { - let runtime = Arc::clone(runtime); + let runtime = Arc::clone(&pools.runtime); + let pools = pools.clone(); let finished_init = Arc::clone(finished_init); thread::spawn(move || { - if let Err(err) = run_client(username, index, to_adminify, runtime, opt, finished_init) { + if let Err(err) = run_client(username, index, to_adminify, pools, opt, finished_init) { tracing::error!("swarm member {} exited with an error: {:?}", index, err); } }); @@ -102,111 +108,124 @@ fn run_client( username: String, index: u32, to_adminify: Vec, - runtime: Arc, + pools: common_state::Pools, opt: Opt, finished_init: Arc, ) -> Result<(), veloren_client::Error> { - let mut client = loop { - // Connect to localhost - let addr = ConnectionArgs::Tcp { - prefer_ipv6: false, - hostname: "localhost".into(), - }; - let runtime_clone = Arc::clone(&runtime); - // NOTE: use a no-auth server - match runtime.block_on(Client::new( - addr, - runtime_clone, - &mut None, - &username, - "", - |_| false, - )) { - Err(e) => tracing::warn!(?e, "Client {} disconnected", index), - Ok(client) => break client, - } - }; - let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0)); let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> { clock.tick(); client.tick_network(clock.dt())?; + client.cleanup(); Ok(()) }; - // Wait for character list to load - client.load_character_list(); - while client.character_list().loading { - tick(&mut client)?; - } - - // Create character if none exist - if client.character_list().characters.is_empty() { - client.create_character( - username.clone(), - Some("common.items.weapons.sword.starter".into()), - None, - body(), - ); + let mut run = || -> Result<_, veloren_client::Error> { + // Connect to localhost + let addr = ConnectionArgs::Tcp { + prefer_ipv6: false, + hostname: "localhost".into(), + }; + // NOTE: use a no-auth server + let mut client = pools.runtime.block_on(Client::new( + addr, + &mut None, + pools.clone(), + &username, + "", + |_| false, + ))?; + tracing::info!("Client {} connected", index); + // Wait for character list to load client.load_character_list(); - - while client.character_list().loading || client.character_list().characters.is_empty() { + while client.character_list().loading { tick(&mut client)?; } - } + tracing::info!("Client {} loaded character list", index); - // Select the first character - client.request_character( - client - .character_list() - .characters - .first() - .expect("Just created new character if non were listed!!!") - .character - .id - .expect("Why is this an option?"), - common::ViewDistances { - terrain: opt.vd, - entity: opt.vd, - }, - ); + // Create character if none exist + if client.character_list().characters.is_empty() { + client.create_character( + username.clone(), + Some("common.items.weapons.sword.starter".into()), + None, + body(), + ); - // If this is the admin client then adminify the other swarm members - if !to_adminify.is_empty() { - // Wait for other clients to connect - loop { - tick(&mut client)?; - // NOTE: it's expected that each swarm member will have a unique alias - let players = client.players().collect::>(); - if to_adminify - .iter() - .all(|name| players.contains(&name.as_str())) - { - break; + client.load_character_list(); + + while client.character_list().loading || client.character_list().characters.is_empty() { + tick(&mut client)?; } } - // Assert that we are a moderator (assumes we are an admin if so) - assert!( - client.is_moderator(), - "The user needs to ensure \"{}\" is registered as an admin on the server", - username - ); - // Send commands to adminify others - to_adminify.iter().for_each(|name| { - client.send_command("adminify".into(), vec![name.into(), "admin".into()]) - }); - } + tracing::info!("Client {} found or created character", index); - // Wait for moderator - while !client.is_moderator() { - tick(&mut client)?; - } + client.set_view_distance(opt.vd); + + // Select the first character + client.request_character( + client + .character_list() + .characters + .first() + .expect("Just created new character if non were listed!!!") + .character + .id + .expect("Why is this an option?"), + ); + + // If this is the admin client then adminify the other swarm members + if !to_adminify.is_empty() { + // Wait for other clients to connect + loop { + tick(&mut client)?; + // NOTE: it's expected that each swarm member will have a unique alias + let players = client.players().collect::>(); + if to_adminify + .iter() + .all(|name| players.contains(&name.as_str())) + { + break; + } + } + // Assert that we are a moderator (assumes we are an admin if so) + assert!( + client.is_moderator(), + "The user needs to ensure \"{}\" is registered as an admin on the server", + username + ); + // Send commands to adminify others + to_adminify.iter().for_each(|name| { + client.send_command("adminify".into(), vec![name.into(), "admin".into()]) + }); + } + + // Wait for moderator + while !client.is_moderator() { + tick(&mut client)?; + } + client.clear_terrain(); + client.request_player_physics(false); + + Ok(client) + }; + + let mut client = loop { + match run() { + Err(e) => tracing::warn!(?e, "Client {} disconnected", index), + Ok(client) => { + thread::sleep(Duration::from_secs(1)); + break client + }, + } + }; + drop(pools); finished_init.fetch_add(1, Ordering::Relaxed); // Wait for initialization of all other swarm clients to finish - while !finished_init.load(Ordering::Relaxed) == opt.size { + while finished_init.load(Ordering::Relaxed) != opt.size { tick(&mut client)?; } diff --git a/client/src/lib.rs b/client/src/lib.rs index 32ccef782d..6382b86dee 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -295,12 +295,11 @@ pub struct CharacterList { } /// Higher than what's needed at VD = 65. -const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*/13800; +const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*//*13800*/12; impl Client { pub async fn new( addr: ConnectionArgs, - runtime: Arc, // TODO: refactor to avoid needing to use this out parameter mismatched_server_info: &mut Option, pools: common_state::Pools, @@ -308,7 +307,8 @@ impl Client { password: &str, auth_trusted: impl FnMut(&str) -> bool, ) -> Result { - let network = Network::new(Pid::new(), &runtime); + let network = Network::new(Pid::new(), &pools.runtime); + let runtime = Arc::clone(&pools.runtime); let mut participant = match addr { ConnectionArgs::Tcp { @@ -437,10 +437,6 @@ impl Client { *state.ecs_mut().write_resource() = PlayerEntity(Some(entity)); state.ecs_mut().insert(material_stats); state.ecs_mut().insert(ability_map); - state - .ecs_mut() - .write_resource::() - .configure(&"CHUNK_DROP", |_n| 1); let map_size = map_size_lg.chunks(); let max_height = world_map.max_height; @@ -2012,10 +2008,10 @@ impl Client { } else { drop(terrain); if !skip_mode && !self.pending_chunks.contains_key(key) { - const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = 8 * 4; + const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = /*8 * 4*/2; if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT - && /* current_tick_send_chunk_requests - < CURRENT_TICK_PENDING_CHUNKS_LIMIT */true + && current_tick_send_chunk_requests + < CURRENT_TICK_PENDING_CHUNKS_LIMIT { self.send_msg_err(ClientGeneral::TerrainChunkRequest { key: *key, diff --git a/common/state/Cargo.toml b/common/state/Cargo.toml index c5a7f04b50..f51420a6b0 100644 --- a/common/state/Cargo.toml +++ b/common/state/Cargo.toml @@ -20,6 +20,7 @@ core_affinity = "0.5" rayon = "1.5" num_cpus = "1.0" thread-priority = { version = "0.9.2" } +tokio = { version = "1.14", default-features = false, features = ["rt"] } tracing = { version = "0.1", default-features = false } vek = { version = "0.15.8", features = ["serde"] } diff --git a/common/state/src/state.rs b/common/state/src/state.rs index 6018b46582..53ebb4260c 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -37,7 +37,7 @@ use specs::{ Component, DispatcherBuilder, Entity as EcsEntity, WorldExt, }; use thread_priority::{ThreadBuilder, ThreadPriority}; -use std::{sync::Arc, time::Instant}; +use std::{sync::{atomic::{AtomicUsize, Ordering}, Arc}, time::Instant}; use timer_queue::TimerQueue; use vek::*; @@ -123,35 +123,50 @@ pub struct State { thread_pool: Arc, } -pub type Pools = (usize, GameMode/*u64*/, Arc/*, slowjob::SlowJobPool*/); +#[derive(Clone,Debug)] +pub struct Pools { + pub slowjob_threads: usize, + game_mode: GameMode, + pub rayon_pool: Arc, + pub runtime: Arc, + /* slowjob: slowjob::SlowJobPool,*/ +} impl State { - pub fn pools(game_mode: GameMode) -> Pools { + pub fn pools(game_mode: GameMode, mut tokio_builder: tokio::runtime::Builder) -> Pools { let num_cpu = num_cpus::get()/* - 1*/; - let (thread_name_infix, rayon_offset) = match game_mode { + let (thread_name_infix, rayon_offset, tokio_count) = match game_mode { // The server does work on both the main thread and the rayon pool, but not at the same // time, so there's no reason to hold back a core from rayon. It does effectively no - // other non-slowjob, non-rayon work that blocks the main thread. - GameMode::Server => ("s", 0), + // other non-slowjob, non-rayon work that blocks the main thread, but we do want to + // leave num_cpu / 4 cores available for tokio, since otherwise TCP buffers can build + // up to unreasonable degrees. + GameMode::Server => ("s", 0, num_cpu / 4), // The client does work on both the main thread and the rayon pool, but not at the same // time. It does run some other non-slowjob and non-rayon threads, but none of them - // block work on the main thread. - GameMode::Client => ("c", 0), + // block work on the main thread. It does not do enough networking for it to be worth + // dedicating anything to the tokio pool. + GameMode::Client => ("c", 0, 0), // Singleplayer does work on both the main thread and the rayon pool; unlike the server // and client cases, the rayon work may interfere with work on one of the main threads, // since the server and client don't coordinate their rayon work. Therefore, we // reserve a core for the main thread(s) from both slowjob and rayon tasks. Since many // CPUs can't afford to lose an entire core during the common periods when the server // and client are not interfering with each other, we still spawn an extra rayon thread - // in this case, but leave it floating so the OS can schedule it. - GameMode::Singleplayer => ("sp", /*2*/1), + // in this case, but leave it floating so the OS can schedule it. Of course, + // singleplayer need not devote any resources to the tokio pool, as it's not expected + // to be in use. + GameMode::Singleplayer => ("sp", /*2*/1, 0), }; let rayon_threads = /*match game_mode { GameMode::Server | GameMode::Client => num_cpu / 2, GameMode::Singleplayer => num_cpu / 4, - }*/num_cpu.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); + }*/num_cpu/*.saturating_sub(tokio_count)*/.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); let core_ids = /*(rayon_threads >= 16).then(|| */core_affinity::get_core_ids().unwrap_or(vec![])/*).unwrap_or(vec![])*/; + // Don't pin rayon threads to the cores expected to be used by tokio threads. + /* core_ids.truncate(core_ids.len() - tokio_count); */ + let mut core_ids_ = core_ids.clone(); let core_count = core_ids.len(); let rayon_pool = Arc::new( ThreadPoolBuilder::new() @@ -168,14 +183,14 @@ impl State { } // pinned rayon threads run with high priority let index = thread.index(); - if index.checked_sub(rayon_offset).map_or(false, |i| i < core_count) { + if index.checked_sub(rayon_offset + tokio_count).map_or(false, |i| i < core_count) { b = b.priority(ThreadPriority::Max); } b.spawn_careless(|| thread.run())?; Ok(()) }) .start_handler(move |i| { - if let Some(&core_id) = i.checked_sub(rayon_offset).and_then(|i| core_ids.get(i)) { + if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) { core_affinity::set_for_current(core_id); } }) @@ -200,9 +215,49 @@ impl State { /*Arc::clone(*/slow_pool/*)*/, ); */ + // Setup tokio runtime + use tokio::runtime::Builder; + + // TODO: evaluate std::thread::available_concurrency as a num_cpus replacement + let cores = num_cpus::get(); + // We don't need that many threads in the async pool, at least 2 but generally + // 25% of all available will do + let mut core_ids_ = core_ids.clone(); + let runtime = Arc::new( + tokio_builder + .enable_all() + .worker_threads(tokio_count.max(common::consts::MIN_RECOMMENDED_TOKIO_THREADS)) + .thread_name_fn(move || { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("tokio-{}-{}", thread_name_infix, id) + }) + .on_thread_start(move || { + if tokio_count > 0 { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let index = ATOMIC_ID.fetch_add(1, Ordering::SeqCst) % tokio_count; + if let Some(&core_id) = index.checked_add(num_cpu - tokio_count - 1).and_then(|i| core_ids_.get(i)) { + core_affinity::set_for_current(core_id); + } + } + }) + .build() + .unwrap(), + ); + // We always reserve at least one non-slowjob core, if possible, to make sure systems get a - // chance to run unobstructed. - (num_cpu - 1/*slow_limit*//* as u64*/, game_mode, rayon_pool/*, slowjob*/) + // chance to run unobstructed. We also leave any dedicated tokio threads their own + // reserved cores, since networking is quite latency critical on a server. + // + // Note that for all n > 1, n - n / 4 - 1 > 0 (using integer arithmetic), which is fine + // since we require at least 2 hardware threads. + Pools { + slowjob_threads: num_cpu - tokio_count - 1/*slow_limit*//* as u64*/, + game_mode, + rayon_pool, + runtime, + /*slowjob,*/ + } } /// Create a new `State` in client mode. @@ -227,8 +282,8 @@ impl State { GameMode::Singleplayer => "sp", }; */ - let num_cpu = /*num_cpus::get()*/pools.0/* / 2 + pools.0 / 4*/; - let game_mode = pools.1; + let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/; + let game_mode = pools.game_mode; /* let rayon_threads = match game_mode { GameMode::Server | GameMode::Client => num_cpu/* / 2*/, GameMode::Singleplayer => num_cpu/* / 4*// 2, @@ -280,7 +335,7 @@ impl State { Self { ecs: Self::setup_ecs_world(ecs_role, /*num_cpu as u64*//*, &thread_pool, *//*pools.1*/slowjob/*pools.3*/, map_size_lg, default_chunk), - thread_pool: pools.2, + thread_pool: pools.rayon_pool, } } diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index f776118c24..d4405e207a 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -22,9 +22,8 @@ use crate::{ tui_runner::Tui, tuilog::TuiLog, }; -use common::{clock::Clock, consts::MIN_RECOMMENDED_TOKIO_THREADS}; +use common::clock::Clock; use common_base::span; -use core::sync::atomic::{AtomicUsize, Ordering}; use server::{persistence::DatabaseSettings, settings::Protocol, Event, Input, Server}; use std::{ io, @@ -75,20 +74,9 @@ fn main() -> io::Result<()> { path }; - // We don't need that many threads in the async pool, at least 2 but generally - // 25% of all available will do - // TODO: evaluate std::thread::available_concurrency as a num_cpus replacement - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads((num_cpus::get() / 4).max(MIN_RECOMMENDED_TOKIO_THREADS)) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("tokio-server-{}", id) - }) - .build() - .unwrap(), + let pools = common_state::State::pools( + common::resources::GameMode::Server, + tokio::runtime::Builder::new_multi_thread(), ); #[cfg(feature = "hot-agent")] @@ -122,7 +110,7 @@ fn main() -> io::Result<()> { ArgvCommand::Shared(SharedCommand::Admin { command }) => { let login_provider = server::login_provider::LoginProvider::new( server_settings.auth_server_address, - runtime, + pools.runtime, ); match command { @@ -181,8 +169,7 @@ fn main() -> io::Result<()> { editable_settings, database_settings, &server_data_dir, - runtime, - common_state::State::pools(common::resources::GameMode::Server), + pools, ) .expect("Failed to create server instance!"); diff --git a/server/src/lib.rs b/server/src/lib.rs index e6da6f71a1..9801b16fee 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -217,7 +217,6 @@ impl Server { editable_settings: EditableSettings, database_settings: DatabaseSettings, data_dir: &std::path::Path, - runtime: Arc, pools: common_state::Pools, ) -> Result { info!("Server data dir is: {}", data_dir.display()); @@ -262,13 +261,13 @@ impl Server { }, calendar: Some(settings.calendar_mode.calendar_now()), }, - &pools.2, + &pools.rayon_pool, ); #[cfg(not(feature = "worldgen"))] let (world, index) = World::generate(settings.world_seed); #[cfg(feature = "worldgen")] - let map = world.get_map_data(index.as_index_ref(), &pools.2); + let map = world.get_map_data(index.as_index_ref(), &pools.rayon_pool); #[cfg(not(feature = "worldgen"))] let map = WorldMapMsg { dimensions_lg: Vec2::zero(), @@ -281,6 +280,7 @@ impl Server { default_chunk: Arc::new(world.generate_oob_chunk()), }; + let runtime = Arc::clone(&pools.runtime); let mut state = State::server( pools, world.sim().map_size_lg(), diff --git a/voxygen/src/lib.rs b/voxygen/src/lib.rs index 1a20c8c683..c4de90adf4 100644 --- a/voxygen/src/lib.rs +++ b/voxygen/src/lib.rs @@ -61,7 +61,6 @@ use i18n::LocalizationHandle; use std::path::PathBuf; use std::sync::Arc; -use tokio::runtime::Runtime; /// A type used to store state that is shared between all play states. pub struct GlobalState { @@ -70,7 +69,6 @@ pub struct GlobalState { pub settings: Settings, pub profile: Profile, pub window: Window, - pub tokio_runtime: Arc, #[cfg(feature = "egui-ui")] pub egui_state: EguiState, pub lazy_init: scene::terrain::SpriteRenderContextLazy, diff --git a/voxygen/src/main.rs b/voxygen/src/main.rs index 2854e5b0c1..a5aab7e4c3 100644 --- a/voxygen/src/main.rs +++ b/voxygen/src/main.rs @@ -95,29 +95,6 @@ fn main() { use clap::Parser; let args = cli::Args::parse(); - // Setup tokio runtime - use common::consts::MIN_RECOMMENDED_TOKIO_THREADS; - use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }; - use tokio::runtime::Builder; - - // TODO: evaluate std::thread::available_concurrency as a num_cpus replacement - let cores = num_cpus::get(); - let tokio_runtime = Arc::new( - Builder::new_multi_thread() - .enable_all() - .worker_threads((cores / 4).max(MIN_RECOMMENDED_TOKIO_THREADS)) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("tokio-voxygen-{}", id) - }) - .build() - .unwrap(), - ); - #[cfg(feature = "hot-reloading")] assets::start_hot_reloading(); @@ -167,7 +144,7 @@ fn main() { // Create window use veloren_voxygen::{error::Error, render::RenderError}; - let (mut window, event_loop) = match Window::new(&settings, &tokio_runtime) { + let (mut window, event_loop) = match Window::new(&settings) { Ok(ok) => ok, // Custom panic message when a graphics backend could not be found Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => { @@ -211,7 +188,6 @@ fn main() { audio, profile, window, - tokio_runtime, #[cfg(feature = "egui-ui")] egui_state, lazy_init, diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 2d9bbad53c..ef2ca6afa6 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -11,7 +11,6 @@ use std::{ }, time::Duration, }; -use tokio::runtime; use tracing::{trace, warn}; #[derive(Debug)] @@ -47,7 +46,6 @@ impl ClientInit { connection_args: ConnectionArgs, username: String, password: String, - runtime: Arc, pools: common_state::Pools, ) -> Self { let (tx, rx) = unbounded(); @@ -55,9 +53,7 @@ impl ClientInit { let cancel = Arc::new(AtomicBool::new(false)); let cancel2 = Arc::clone(&cancel); - let runtime2 = Arc::clone(&runtime); - - runtime.spawn(async move { + pools.runtime.spawn(async move { let trust_fn = |auth_server: &str| { let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); trust_rx @@ -76,7 +72,6 @@ impl ClientInit { let mut mismatched_server_info = None; match Client::new( connection_args.clone(), - Arc::clone(&runtime2), &mut mismatched_server_info, pools.clone(), &username, @@ -87,8 +82,7 @@ impl ClientInit { { Ok(client) => { let _ = tx.send(Msg::Done(Ok(client))); - drop(pools); - tokio::task::block_in_place(move || drop(runtime2)); + tokio::task::block_in_place(move || drop(pools)); return; }, Err(ClientError::NetworkErr(NetworkError::ConnectFailed( @@ -112,9 +106,8 @@ impl ClientInit { // address and all the attempts timed out. let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::ServerNotFound)))); - drop(pools); // Safe drop runtime - tokio::task::block_in_place(move || drop(runtime2)); + tokio::task::block_in_place(move || drop(pools)); }); ClientInit { diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index e13fef8cfc..7721b30975 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -22,7 +22,7 @@ use common_base::span; use i18n::LocalizationHandle; use scene::Scene; use std::sync::Arc; -use tokio::runtime; +use tokio::Builder; use tracing::error; use ui::{Event as MainMenuEvent, MainMenuUi}; @@ -107,7 +107,7 @@ impl PlayState for MainMenuState { "".to_owned(), ConnectionArgs::Mpsc(14004), &mut self.init, - &global_state.tokio_runtime, + &pools.runtime, &global_state.i18n, Some(pools), ); @@ -318,7 +318,6 @@ impl PlayState for MainMenuState { password, connection_args, &mut self.init, - &global_state.tokio_runtime, &global_state.i18n, None, ); @@ -348,7 +347,8 @@ impl PlayState for MainMenuState { }, #[cfg(feature = "singleplayer")] MainMenuEvent::StartSingleplayer => { - let singleplayer = Singleplayer::new(&global_state.tokio_runtime); + + let singleplayer = Singleplayer::new(); global_state.singleplayer = Some(singleplayer); }, @@ -527,7 +527,6 @@ fn attempt_login( password: String, connection_args: ConnectionArgs, init: &mut InitState, - runtime: &Arc, localized_strings: &LocalizationHandle, pools: Option, ) { @@ -561,7 +560,12 @@ fn attempt_login( username, password, Arc::clone(runtime), - pools.unwrap_or_else(|| common_state::State::pools(common::resources::GameMode::Client)), + pools.unwrap_or_else(|| { + common_state::State::pools( + common::resources::GameMode::Client, + tokio::runtime::Builder::new_multi_thread(), + ) + }), )); } } diff --git a/voxygen/src/render/renderer.rs b/voxygen/src/render/renderer.rs index f2b14baeb5..5bd6bca1fe 100644 --- a/voxygen/src/render/renderer.rs +++ b/voxygen/src/render/renderer.rs @@ -199,7 +199,6 @@ impl Renderer { pub fn new( window: &winit::window::Window, mode: RenderMode, - runtime: &tokio::runtime::Runtime, ) -> Result { let (pipeline_modes, mut other_modes) = mode.split(); // Enable seamless cubemaps globally, where available--they are essentially a @@ -313,6 +312,9 @@ impl Renderer { path }); + let runtime = runtime::Builder::new_current_thread().build() + .expect("Failed to create single-threaded tokio runtime (for renderer initialization)."); + let (device, queue) = runtime.block_on(adapter.request_device( &wgpu::DeviceDescriptor { // TODO diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index daad5c90ee..aa6071efad 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -12,7 +12,6 @@ use std::{ thread::{self, JoinHandle}, time::Duration, }; -use tokio::runtime::Runtime; use tracing::{error, info, trace, warn}; const TPS: u64 = 30; @@ -30,7 +29,7 @@ pub struct Singleplayer { } impl Singleplayer { - pub fn new(runtime: &Arc) -> Self { + pub fn new() -> Self { let (stop_server_s, stop_server_r) = unbounded(); // Determine folder to save server data in @@ -100,18 +99,19 @@ impl Singleplayer { let (result_sender, result_receiver) = bounded(1); let builder = thread::Builder::new().name("singleplayer-server-thread".into()); - let runtime = Arc::clone(runtime); let thread = builder .spawn(move || { trace!("starting singleplayer server thread"); - let pools = common_state::State::pools(common::resources::GameMode::Singleplayer); + let pools = common_state::State::pools( + common::resources::GameMode::Singleplayer, + tokio::runtime::Builder::new_multi_thread(), + ); let (server, init_result) = match Server::new( settings2, editable_settings, database_settings, &server_data_dir, - runtime, pools.clone(), ) { Ok(server) => (Some(server), Ok(pools)), diff --git a/voxygen/src/window.rs b/voxygen/src/window.rs index fd1e62ae04..0c543ac162 100644 --- a/voxygen/src/window.rs +++ b/voxygen/src/window.rs @@ -411,10 +411,7 @@ pub struct Window { } impl Window { - pub fn new( - settings: &Settings, - runtime: &tokio::runtime::Runtime, - ) -> Result<(Window, EventLoop), Error> { + pub fn new(settings: &Settings) -> Result<(Window, EventLoop), Error> { let event_loop = EventLoop::new(); let size = settings.graphics.window_size; @@ -434,7 +431,7 @@ impl Window { let window = win_builder.build(&event_loop).unwrap(); - let renderer = Renderer::new(&window, settings.graphics.render_mode.clone(), runtime)?; + let renderer = Renderer::new(&window, settings.graphics.render_mode.clone())?; let keypress_map = HashMap::new();