From 60060a9913de2f8a576818a4b26ab0c4441f588f Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Mon, 12 Sep 2022 20:26:22 -0700 Subject: [PATCH] [WIP] improving thread pool usage and bot client. --- Cargo.lock | 9 + client/Cargo.toml | 1 + client/examples/chat-cli/main.rs | 8 +- client/src/bin/bot/main.rs | 24 +- client/src/bin/swarm/main.rs | 193 ++++---- client/src/lib.rs | 665 ++++++++++++++------------- common/state/Cargo.toml | 1 + common/state/src/state.rs | 92 +++- server-cli/src/main.rs | 25 +- server/src/lib.rs | 6 +- server/src/sys/terrain.rs | 10 +- voxygen/src/lib.rs | 2 - voxygen/src/main.rs | 26 +- voxygen/src/menu/main/client_init.rs | 13 +- voxygen/src/menu/main/mod.rs | 16 +- voxygen/src/render/renderer.rs | 4 +- voxygen/src/singleplayer.rs | 10 +- voxygen/src/window.rs | 7 +- 18 files changed, 591 insertions(+), 521 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05edd1c188..a0448b6075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1639,6 +1639,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" +[[package]] +name = "drop_guard" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4a817d8b683f6e649aed359aab0c47a875377516bb5791d0f7e46d9066d209" + [[package]] name = "egui" version = "0.12.0" @@ -6561,6 +6567,7 @@ dependencies = [ "hashbrown 0.12.0", "image", "num 0.4.0", + "num_cpus", "quinn", "rayon", "ron 0.7.0", @@ -6718,6 +6725,7 @@ dependencies = [ "specs", "tar", "thread-priority", + "tokio", "toml", "tracing", "vek 0.15.8", @@ -6836,6 +6844,7 @@ dependencies = [ "chrono", "chrono-tz", "crossbeam-channel", + "drop_guard", "enumset", "futures-util", "hashbrown 0.12.0", diff --git a/client/Cargo.toml b/client/Cargo.toml index 04999fc671..1c3f94c448 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -24,6 +24,7 @@ network = { package = "veloren-network", path = "../network", features = ["compr bincode = "1.3.2" byteorder = "1.3.2" crossbeam-channel = "0.5" +num_cpus = "1.0" tokio = { version = "1.14", default-features = false, features = ["rt-multi-thread"] } quinn = "0.8" image = { version = "0.24", default-features = false, features = ["png"] } diff --git a/client/examples/chat-cli/main.rs b/client/examples/chat-cli/main.rs index 87f9113870..e84057d2d2 100644 --- a/client/examples/chat-cli/main.rs +++ b/client/examples/chat-cli/main.rs @@ -50,11 +50,15 @@ fn main() { }; // Create a client. - let mut client = runtime + let pools = common_state::State::pools( + common::resources::GameMode::Client, + tokio::runtime::Builder::new_multi_thread(), + ); + let mut client = pools.runtime .block_on(Client::new( addr, - runtime2, &mut None, + pools.clone(), &username, &password, |provider| provider == "https://auth.veloren.net", diff --git a/client/src/bin/bot/main.rs b/client/src/bin/bot/main.rs index fa34344ffd..f5a14431e2 100644 --- a/client/src/bin/bot/main.rs +++ b/client/src/bin/bot/main.rs @@ -46,29 +46,28 @@ pub fn main() { pub struct BotClient { settings: Settings, - runtime: Arc, + pools: common_state::Pools, server_info: ServerInfo, bot_clients: HashMap, clock: Clock, } pub fn make_client( - runtime: &Arc, + pools: &common_state::Pools, server: &str, server_info: &mut Option, username: &str, password: &str, ) -> Option { - let runtime_clone = Arc::clone(runtime); let addr = ConnectionArgs::Tcp { prefer_ipv6: false, hostname: server.to_owned(), }; - runtime + pools.runtime .block_on(Client::new( addr, - runtime_clone, server_info, + pools.clone(), username, password, |_| true, @@ -78,15 +77,18 @@ pub fn make_client( impl BotClient { pub fn new(settings: Settings) -> BotClient { - let runtime = Arc::new(Runtime::new().unwrap()); + let pools = common_state::State::pools( + common::resources::GameMode::Client, + tokio::runtime::Builder::new_multi_thread(), + ); let mut server_info = None; // Don't care if we connect, just trying to grab the server info. - let _ = make_client(&runtime, &settings.server, &mut server_info, "", ""); + let _ = make_client(&pools, &settings.server, &mut server_info, "", ""); let server_info = server_info.expect("Failed to connect to server."); let clock = Clock::new(Duration::from_secs_f64(1.0 / 60.0)); BotClient { settings, - runtime, + pools, server_info, bot_clients: HashMap::new(), clock, @@ -142,7 +144,7 @@ impl BotClient { { continue; } - match self.runtime.block_on(authc.register(username, password)) { + match self.pools.runtime.block_on(authc.register(username, password)) { Ok(()) => { self.settings.bot_logins.push(BotCreds { username: username.to_string(), @@ -171,15 +173,13 @@ impl BotClient { .cloned() .collect(); for cred in creds.iter() { - let runtime = Arc::clone(&self.runtime); - let server = &self.settings.server; // TODO: log the clients in in parallel instead of in series let client = self .bot_clients .entry(cred.username.clone()) .or_insert_with(|| { - make_client(&runtime, server, &mut None, &cred.username, &cred.password) + make_client(&self.pools, server, &mut None, &cred.username, &cred.password) .expect("Failed to connect to server") }); diff --git a/client/src/bin/swarm/main.rs b/client/src/bin/swarm/main.rs index b7d78b6610..aeee520890 100644 --- a/client/src/bin/swarm/main.rs +++ b/client/src/bin/swarm/main.rs @@ -9,7 +9,7 @@ use std::{ time::{Duration, SystemTime}, }; use structopt::StructOpt; -use tokio::runtime::Runtime; +use tokio::runtime::{self, Runtime}; use vek::*; use common_state::State; use veloren_client::{addr::ConnectionArgs, Client}; @@ -48,9 +48,12 @@ fn main() { let to_adminify = usernames.clone(); let finished_init = Arc::new(AtomicU32::new(0)); - let runtime = Arc::new(Runtime::new().unwrap()); - let mut pools = common_state::State::pools(common::resources::GameMode::Client); - pools.0 = 0; + let mut builder = runtime::Builder::new_multi_thread(); + builder + .max_blocking_threads(opt.size as usize + 1) + .thread_name("swarm"); + let mut pools = common_state::State::pools(common::resources::GameMode::Client, builder); + pools.slowjob_threads = 0; // TODO: calculate and log the required chunks per second to maintain the // selected scenario with full vd loaded @@ -59,7 +62,6 @@ fn main() { admin_username, 0, to_adminify, - &runtime, &pools, opt, &finished_init, @@ -70,7 +72,6 @@ fn main() { name, index as u32, Vec::new(), - &runtime, &pools, opt, &finished_init, @@ -84,16 +85,15 @@ fn run_client_new_thread( username: String, index: u32, to_adminify: Vec, - runtime: &Arc, pools: &common_state::Pools, opt: Opt, finished_init: &Arc, ) { - let runtime = Arc::clone(runtime); + let runtime = Arc::clone(&pools.runtime); let pools = pools.clone(); let finished_init = Arc::clone(finished_init); thread::spawn(move || { - if let Err(err) = run_client(username, index, to_adminify, pools, runtime, opt, finished_init) { + if let Err(err) = run_client(username, index, to_adminify, pools, opt, finished_init) { tracing::error!("swarm member {} exited with an error: {:?}", index, err); } }); @@ -104,112 +104,123 @@ fn run_client( index: u32, to_adminify: Vec, pools: common_state::Pools, - runtime: Arc, opt: Opt, finished_init: Arc, ) -> Result<(), veloren_client::Error> { - let mut client = loop { - // Connect to localhost - let addr = ConnectionArgs::Tcp { - prefer_ipv6: false, - hostname: "localhost".into(), - }; - let runtime_clone = Arc::clone(&runtime); - // NOTE: use a no-auth server - match runtime - .block_on(Client::new( - addr, - runtime_clone, - &mut None, - pools.clone(), - &username, - "", - |_| false, - )) { - Err(e) => tracing::warn!(?e, "Client {} disconnected", index), - Ok(client) => break client, - } - }; - drop(pools); - let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0)); let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> { clock.tick(); client.tick_network(clock.dt())?; + client.cleanup(); Ok(()) }; - // Wait for character list to load - client.load_character_list(); - while client.character_list().loading { - tick(&mut client)?; - } - - // Create character if none exist - if client.character_list().characters.is_empty() { - client.create_character( - username.clone(), - Some("common.items.weapons.sword.starter".into()), - None, - body(), - ); + let mut run = || -> Result<_, veloren_client::Error> { + // Connect to localhost + let addr = ConnectionArgs::Tcp { + prefer_ipv6: false, + hostname: "localhost".into(), + }; + // NOTE: use a no-auth server + let mut client = pools.runtime.block_on(Client::new( + addr, + &mut None, + pools.clone(), + &username, + "", + |_| false, + ))?; + tracing::info!("Client {} connected", index); + // Wait for character list to load client.load_character_list(); - - while client.character_list().loading || client.character_list().characters.is_empty() { + while client.character_list().loading { tick(&mut client)?; } - } + tracing::info!("Client {} loaded character list", index); - // Select the first character - client.request_character( - client - .character_list() - .characters - .first() - .expect("Just created new character if non were listed!!!") - .character - .id - .expect("Why is this an option?"), - ); + // Create character if none exist + if client.character_list().characters.is_empty() { + client.create_character( + username.clone(), + Some("common.items.weapons.sword.starter".into()), + None, + body(), + ); - client.set_view_distance(opt.vd); + client.load_character_list(); - // If this is the admin client then adminify the other swarm members - if !to_adminify.is_empty() { - // Wait for other clients to connect - loop { - tick(&mut client)?; - // NOTE: it's expected that each swarm member will have a unique alias - let players = client.players().collect::>(); - if to_adminify - .iter() - .all(|name| players.contains(&name.as_str())) - { - break; + while client.character_list().loading || client.character_list().characters.is_empty() { + tick(&mut client)?; } } - // Assert that we are a moderator (assumes we are an admin if so) - assert!( - client.is_moderator(), - "The user needs to ensure \"{}\" is registered as an admin on the server", - username - ); - // Send commands to adminify others - to_adminify.iter().for_each(|name| { - client.send_command("adminify".into(), vec![name.into(), "admin".into()]) - }); - } + tracing::info!("Client {} found or created character", index); - // Wait for moderator - while !client.is_moderator() { - tick(&mut client)?; - } + client.set_view_distance(opt.vd); + + // Select the first character + client.request_character( + client + .character_list() + .characters + .first() + .expect("Just created new character if non were listed!!!") + .character + .id + .expect("Why is this an option?"), + ); + + // If this is the admin client then adminify the other swarm members + if !to_adminify.is_empty() { + // Wait for other clients to connect + loop { + tick(&mut client)?; + // NOTE: it's expected that each swarm member will have a unique alias + let players = client.players().collect::>(); + if to_adminify + .iter() + .all(|name| players.contains(&name.as_str())) + { + break; + } + } + // Assert that we are a moderator (assumes we are an admin if so) + assert!( + client.is_moderator(), + "The user needs to ensure \"{}\" is registered as an admin on the server", + username + ); + // Send commands to adminify others + to_adminify.iter().for_each(|name| { + client.send_command("adminify".into(), vec![name.into(), "admin".into()]) + }); + } + + // Wait for moderator + while !client.is_moderator() { + tick(&mut client)?; + } + client.clear_terrain(); + client.request_player_physics(false); + + Ok(client) + }; + + let mut client = loop { + match run() { + Err(e) => tracing::warn!(?e, "Client {} disconnected", index), + Ok(client) => { + thread::sleep(Duration::from_secs(1)); + break client + }, + } + }; + drop(pools); finished_init.fetch_add(1, Ordering::Relaxed); // Wait for initialization of all other swarm clients to finish - while !finished_init.load(Ordering::Relaxed) == opt.size { + while finished_init.load(Ordering::Relaxed) != opt.size { tick(&mut client)?; } diff --git a/client/src/lib.rs b/client/src/lib.rs index ff124c8d6d..0bdd0e0997 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -281,12 +281,11 @@ pub struct CharacterList { } /// Higher than what's needed at VD = 65. -const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*/13800; +const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*//*13800*/12; impl Client { pub async fn new( addr: ConnectionArgs, - runtime: Arc, // TODO: refactor to avoid needing to use this out parameter mismatched_server_info: &mut Option, pools: common_state::Pools, @@ -294,7 +293,8 @@ impl Client { password: &str, auth_trusted: impl FnMut(&str) -> bool, ) -> Result { - let network = Network::new(Pid::new(), &runtime); + let network = Network::new(Pid::new(), &pools.runtime); + let runtime = Arc::clone(&pools.runtime); let mut participant = match addr { ConnectionArgs::Tcp { @@ -374,10 +374,324 @@ impl Client { let msg = bincode::deserialize(&msg)?; Self::handle_server_terrain_msg(msg) }; - terrain_tx_.send(handle_msg()); + if terrain_tx_.send(handle_msg()).is_err() { + break; + } } }); + let ServerInit::GameSync { + entity_package, + time_of_day, + max_group_size, + client_timeout, + world_map, + recipe_book, + component_recipe_book, + material_stats, + ability_map, + } = loop { + tokio::select! { + // Spawn in a blocking thread (leaving the network thread free). This is mostly + // useful for bots. + res = register_stream.recv() => break res?, + _ = ping_interval.tick() => ping_stream.send(PingMsg::Ping)?, + } + }; + + // Spawn in a blocking thread (leaving the network thread free). This is mostly + // useful for bots. + let mut task = tokio::task::spawn_blocking(move || { + let map_size_lg = common::terrain::MapSizeLg::new(world_map.dimensions_lg) + .map_err(|_| { + Error::Other(format!( + "Server sent bad world map dimensions: {:?}", + world_map.dimensions_lg, + )) + })?; + let sea_level = world_map.default_chunk.get_min_z() as f32; + + // Initialize `State` + let mut state = State::client(pools, map_size_lg, world_map.default_chunk); + // Client-only components + state.ecs_mut().register::>(); + state.ecs_mut().write_resource::() + .configure(&"TERRAIN_DROP", |_n| 1); + /* state.ecs_mut().write_resource::() + .configure("TERRAIN_DESERIALIZING", |n| n / 2); */ + let entity = state.ecs_mut().apply_entity_package(entity_package); + *state.ecs_mut().write_resource() = time_of_day; + *state.ecs_mut().write_resource() = PlayerEntity(Some(entity)); + state.ecs_mut().insert(material_stats); + state.ecs_mut().insert(ability_map); + + let map_size = map_size_lg.chunks(); + let max_height = world_map.max_height; + let rgba = world_map.rgba; + let alt = world_map.alt; + if rgba.size() != map_size.map(|e| e as i32) { + return Err(Error::Other("Server sent a bad world map image".into())); + } + if alt.size() != map_size.map(|e| e as i32) { + return Err(Error::Other("Server sent a bad altitude map.".into())); + } + let [west, east] = world_map.horizons; + let scale_angle = + |a: u8| (a as f32 / 255.0 * ::FRAC_PI_2()).tan(); + let scale_height = |h: u8| h as f32 / 255.0 * max_height; + let scale_height_big = |h: u32| (h >> 3) as f32 / 8191.0 * max_height; + + debug!("Preparing image..."); + let unzip_horizons = |(angles, heights): &(Vec<_>, Vec<_>)| { + ( + angles.iter().copied().map(scale_angle).collect::>(), + heights + .iter() + .copied() + .map(scale_height) + .collect::>(), + ) + }; + let horizons = [unzip_horizons(&west), unzip_horizons(&east)]; + + // Redraw map (with shadows this time). + let mut world_map_rgba = vec![0u32; rgba.size().product() as usize]; + let mut world_map_topo = vec![0u32; rgba.size().product() as usize]; + let mut map_config = common::terrain::map::MapConfig::orthographic( + map_size_lg, + core::ops::RangeInclusive::new(0.0, max_height), + ); + map_config.horizons = Some(&horizons); + let rescale_height = |h: f32| h / max_height; + let bounds_check = |pos: Vec2| { + pos.reduce_partial_min() >= 0 + && pos.x < map_size.x as i32 + && pos.y < map_size.y as i32 + }; + fn sample_pos( + map_config: &MapConfig, + pos: Vec2, + alt: &Grid, + rgba: &Grid, + map_size: &Vec2, + map_size_lg: &common::terrain::MapSizeLg, + max_height: f32, + ) -> common::terrain::map::MapSample { + let rescale_height = |h: f32| h / max_height; + let scale_height_big = |h: u32| (h >> 3) as f32 / 8191.0 * max_height; + let bounds_check = |pos: Vec2| { + pos.reduce_partial_min() >= 0 + && pos.x < map_size.x as i32 + && pos.y < map_size.y as i32 + }; + let MapConfig { + gain, + is_contours, + is_height_map, + is_stylized_topo, + .. + } = *map_config; + let mut is_contour_line = false; + let mut is_border = false; + let (rgb, alt, downhill_wpos) = if bounds_check(pos) { + let posi = pos.y as usize * map_size.x as usize + pos.x as usize; + let [r, g, b, _a] = rgba[pos].to_le_bytes(); + let is_water = r == 0 && b > 102 && g < 77; + let alti = alt[pos]; + // Compute contours (chunks are assigned in the river code below) + let altj = rescale_height(scale_height_big(alti)); + let contour_interval = 150.0; + let chunk_contour = (altj * gain / contour_interval) as u32; + + // Compute downhill. + let downhill = { + let mut best = -1; + let mut besth = alti; + for nposi in neighbors(*map_size_lg, posi) { + let nbh = alt.raw()[nposi]; + let nalt = rescale_height(scale_height_big(nbh)); + let nchunk_contour = (nalt * gain / contour_interval) as u32; + if !is_contour_line && chunk_contour > nchunk_contour { + is_contour_line = true; + } + let [nr, ng, nb, _na] = rgba.raw()[nposi].to_le_bytes(); + let n_is_water = nr == 0 && nb > 102 && ng < 77; + + if !is_border && is_water && !n_is_water { + is_border = true; + } + + if nbh < besth { + besth = nbh; + best = nposi as isize; + } + } + best + }; + let downhill_wpos = if downhill < 0 { + None + } else { + Some( + Vec2::new( + (downhill as usize % map_size.x as usize) as i32, + (downhill as usize / map_size.x as usize) as i32, + ) * TerrainChunkSize::RECT_SIZE.map(|e| e as i32), + ) + }; + (Rgb::new(r, g, b), alti, downhill_wpos) + } else { + (Rgb::zero(), 0, None) + }; + let alt = f64::from(rescale_height(scale_height_big(alt))); + let wpos = pos * TerrainChunkSize::RECT_SIZE.map(|e| e as i32); + let downhill_wpos = downhill_wpos + .unwrap_or(wpos + TerrainChunkSize::RECT_SIZE.map(|e| e as i32)); + let is_path = rgb.r == 0x37 && rgb.g == 0x29 && rgb.b == 0x23; + let rgb = rgb.map(|e: u8| e as f64 / 255.0); + let is_water = rgb.r == 0.0 && rgb.b > 0.4 && rgb.g < 0.3; + + let rgb = if is_height_map { + if is_path { + // Path color is Rgb::new(0x37, 0x29, 0x23) + Rgb::new(0.9, 0.9, 0.63) + } else if is_water { + Rgb::new(0.23, 0.47, 0.53) + } else if is_contours && is_contour_line { + // Color contour lines + Rgb::new(0.15, 0.15, 0.15) + } else { + // Color hill shading + let lightness = (alt + 0.2).min(1.0) as f64; + Rgb::new(lightness, 0.9 * lightness, 0.5 * lightness) + } + } else if is_stylized_topo { + if is_path { + Rgb::new(0.9, 0.9, 0.63) + } else if is_water { + if is_border { + Rgb::new(0.10, 0.34, 0.50) + } else { + Rgb::new(0.23, 0.47, 0.63) + } + } else if is_contour_line { + Rgb::new(0.25, 0.25, 0.25) + } else { + // Stylized colors + Rgb::new( + (rgb.r + 0.25).min(1.0), + (rgb.g + 0.23).min(1.0), + (rgb.b + 0.10).min(1.0), + ) + } + } else { + Rgb::new(rgb.r, rgb.g, rgb.b) + } + .map(|e| (e * 255.0) as u8); + common::terrain::map::MapSample { + rgb, + alt, + downhill_wpos, + connections: None, + } + } + // Generate standard shaded map + map_config.is_shaded = true; + map_config.generate( + |pos| { + sample_pos( + &map_config, + pos, + &alt, + &rgba, + &map_size, + &map_size_lg, + max_height, + ) + }, + |wpos| { + let pos = wpos.map2(TerrainChunkSize::RECT_SIZE, |e, f| e / f as i32); + rescale_height(if bounds_check(pos) { + scale_height_big(alt[pos]) + } else { + 0.0 + }) + }, + |pos, (r, g, b, a)| { + world_map_rgba[pos.y * map_size.x as usize + pos.x] = + u32::from_le_bytes([r, g, b, a]); + }, + ); + // Generate map with topographical lines and stylized colors + map_config.is_contours = true; + map_config.is_stylized_topo = true; + map_config.generate( + |pos| { + sample_pos( + &map_config, + pos, + &alt, + &rgba, + &map_size, + &map_size_lg, + max_height, + ) + }, + |wpos| { + let pos = wpos.map2(TerrainChunkSize::RECT_SIZE, |e, f| e / f as i32); + rescale_height(if bounds_check(pos) { + scale_height_big(alt[pos]) + } else { + 0.0 + }) + }, + |pos, (r, g, b, a)| { + world_map_topo[pos.y * map_size.x as usize + pos.x] = + u32::from_le_bytes([r, g, b, a]); + }, + ); + let make_raw = |rgb| -> Result<_, Error> { + let mut raw = vec![0u8; 4 * world_map_rgba.len()]; + LittleEndian::write_u32_into(rgb, &mut raw); + Ok(Arc::new( + DynamicImage::ImageRgba8({ + // Should not fail if the dimensions are correct. + let map = + image::ImageBuffer::from_raw(u32::from(map_size.x), u32::from(map_size.y), raw); + map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? + }) + // Flip the image, since Voxygen uses an orientation where rotation from + // positive x axis to positive y axis is counterclockwise around the z axis. + .flipv(), + )) + }; + let lod_base = rgba; + let lod_alt = alt; + let world_map_rgb_img = make_raw(&world_map_rgba)?; + let world_map_topo_img = make_raw(&world_map_topo)?; + let world_map_layers = vec![world_map_rgb_img, world_map_topo_img]; + let horizons = (west.0, west.1, east.0, east.1) + .into_par_iter() + .map(|(wa, wh, ea, eh)| u32::from_le_bytes([wa, wh, ea, eh])) + .collect::>(); + let lod_horizon = horizons; + let map_bounds = Vec2::new(sea_level, max_height); + debug!("Done preparing image..."); + + Ok(( + state, + lod_base, + lod_alt, + Grid::from_raw(map_size.map(|e| e as i32), lod_horizon), + (world_map_layers, map_size, map_bounds), + world_map.sites, + world_map.pois, + recipe_book, + component_recipe_book, + max_group_size, + client_timeout, + )) + }); + let ( state, lod_base, @@ -392,312 +706,7 @@ impl Client { client_timeout, ) = loop { tokio::select! { - // Spawn in a blocking thread (leaving the network thread free). This is mostly - // useful for bots. - res = register_stream.recv() => { - let ServerInit::GameSync { - entity_package, - time_of_day, - max_group_size, - client_timeout, - world_map, - recipe_book, - component_recipe_book, - material_stats, - ability_map, - } = res?; - - break tokio::task::spawn_blocking(move || { - let map_size_lg = common::terrain::MapSizeLg::new(world_map.dimensions_lg) - .map_err(|_| { - Error::Other(format!( - "Server sent bad world map dimensions: {:?}", - world_map.dimensions_lg, - )) - })?; - let sea_level = world_map.default_chunk.get_min_z() as f32; - - // Initialize `State` - let mut state = State::client(pools, map_size_lg, world_map.default_chunk); - // Client-only components - state.ecs_mut().register::>(); - state.ecs_mut().write_resource::() - .configure(&"TERRAIN_DROP", |_n| 1); - /* state.ecs_mut().write_resource::() - .configure("TERRAIN_DESERIALIZING", |n| n / 2); */ - let entity = state.ecs_mut().apply_entity_package(entity_package); - *state.ecs_mut().write_resource() = time_of_day; - *state.ecs_mut().write_resource() = PlayerEntity(Some(entity)); - state.ecs_mut().insert(material_stats); - state.ecs_mut().insert(ability_map); - - let map_size = map_size_lg.chunks(); - let max_height = world_map.max_height; - let rgba = world_map.rgba; - let alt = world_map.alt; - if rgba.size() != map_size.map(|e| e as i32) { - return Err(Error::Other("Server sent a bad world map image".into())); - } - if alt.size() != map_size.map(|e| e as i32) { - return Err(Error::Other("Server sent a bad altitude map.".into())); - } - let [west, east] = world_map.horizons; - let scale_angle = - |a: u8| (a as f32 / 255.0 * ::FRAC_PI_2()).tan(); - let scale_height = |h: u8| h as f32 / 255.0 * max_height; - let scale_height_big = |h: u32| (h >> 3) as f32 / 8191.0 * max_height; - - debug!("Preparing image..."); - let unzip_horizons = |(angles, heights): &(Vec<_>, Vec<_>)| { - ( - angles.iter().copied().map(scale_angle).collect::>(), - heights - .iter() - .copied() - .map(scale_height) - .collect::>(), - ) - }; - let horizons = [unzip_horizons(&west), unzip_horizons(&east)]; - - // Redraw map (with shadows this time). - let mut world_map_rgba = vec![0u32; rgba.size().product() as usize]; - let mut world_map_topo = vec![0u32; rgba.size().product() as usize]; - let mut map_config = common::terrain::map::MapConfig::orthographic( - map_size_lg, - core::ops::RangeInclusive::new(0.0, max_height), - ); - map_config.horizons = Some(&horizons); - let rescale_height = |h: f32| h / max_height; - let bounds_check = |pos: Vec2| { - pos.reduce_partial_min() >= 0 - && pos.x < map_size.x as i32 - && pos.y < map_size.y as i32 - }; - fn sample_pos( - map_config: &MapConfig, - pos: Vec2, - alt: &Grid, - rgba: &Grid, - map_size: &Vec2, - map_size_lg: &common::terrain::MapSizeLg, - max_height: f32, - ) -> common::terrain::map::MapSample { - let rescale_height = |h: f32| h / max_height; - let scale_height_big = |h: u32| (h >> 3) as f32 / 8191.0 * max_height; - let bounds_check = |pos: Vec2| { - pos.reduce_partial_min() >= 0 - && pos.x < map_size.x as i32 - && pos.y < map_size.y as i32 - }; - let MapConfig { - gain, - is_contours, - is_height_map, - is_stylized_topo, - .. - } = *map_config; - let mut is_contour_line = false; - let mut is_border = false; - let (rgb, alt, downhill_wpos) = if bounds_check(pos) { - let posi = pos.y as usize * map_size.x as usize + pos.x as usize; - let [r, g, b, _a] = rgba[pos].to_le_bytes(); - let is_water = r == 0 && b > 102 && g < 77; - let alti = alt[pos]; - // Compute contours (chunks are assigned in the river code below) - let altj = rescale_height(scale_height_big(alti)); - let contour_interval = 150.0; - let chunk_contour = (altj * gain / contour_interval) as u32; - - // Compute downhill. - let downhill = { - let mut best = -1; - let mut besth = alti; - for nposi in neighbors(*map_size_lg, posi) { - let nbh = alt.raw()[nposi]; - let nalt = rescale_height(scale_height_big(nbh)); - let nchunk_contour = (nalt * gain / contour_interval) as u32; - if !is_contour_line && chunk_contour > nchunk_contour { - is_contour_line = true; - } - let [nr, ng, nb, _na] = rgba.raw()[nposi].to_le_bytes(); - let n_is_water = nr == 0 && nb > 102 && ng < 77; - - if !is_border && is_water && !n_is_water { - is_border = true; - } - - if nbh < besth { - besth = nbh; - best = nposi as isize; - } - } - best - }; - let downhill_wpos = if downhill < 0 { - None - } else { - Some( - Vec2::new( - (downhill as usize % map_size.x as usize) as i32, - (downhill as usize / map_size.x as usize) as i32, - ) * TerrainChunkSize::RECT_SIZE.map(|e| e as i32), - ) - }; - (Rgb::new(r, g, b), alti, downhill_wpos) - } else { - (Rgb::zero(), 0, None) - }; - let alt = f64::from(rescale_height(scale_height_big(alt))); - let wpos = pos * TerrainChunkSize::RECT_SIZE.map(|e| e as i32); - let downhill_wpos = downhill_wpos - .unwrap_or(wpos + TerrainChunkSize::RECT_SIZE.map(|e| e as i32)); - let is_path = rgb.r == 0x37 && rgb.g == 0x29 && rgb.b == 0x23; - let rgb = rgb.map(|e: u8| e as f64 / 255.0); - let is_water = rgb.r == 0.0 && rgb.b > 0.4 && rgb.g < 0.3; - - let rgb = if is_height_map { - if is_path { - // Path color is Rgb::new(0x37, 0x29, 0x23) - Rgb::new(0.9, 0.9, 0.63) - } else if is_water { - Rgb::new(0.23, 0.47, 0.53) - } else if is_contours && is_contour_line { - // Color contour lines - Rgb::new(0.15, 0.15, 0.15) - } else { - // Color hill shading - let lightness = (alt + 0.2).min(1.0) as f64; - Rgb::new(lightness, 0.9 * lightness, 0.5 * lightness) - } - } else if is_stylized_topo { - if is_path { - Rgb::new(0.9, 0.9, 0.63) - } else if is_water { - if is_border { - Rgb::new(0.10, 0.34, 0.50) - } else { - Rgb::new(0.23, 0.47, 0.63) - } - } else if is_contour_line { - Rgb::new(0.25, 0.25, 0.25) - } else { - // Stylized colors - Rgb::new( - (rgb.r + 0.25).min(1.0), - (rgb.g + 0.23).min(1.0), - (rgb.b + 0.10).min(1.0), - ) - } - } else { - Rgb::new(rgb.r, rgb.g, rgb.b) - } - .map(|e| (e * 255.0) as u8); - common::terrain::map::MapSample { - rgb, - alt, - downhill_wpos, - connections: None, - } - } - // Generate standard shaded map - map_config.is_shaded = true; - map_config.generate( - |pos| { - sample_pos( - &map_config, - pos, - &alt, - &rgba, - &map_size, - &map_size_lg, - max_height, - ) - }, - |wpos| { - let pos = wpos.map2(TerrainChunkSize::RECT_SIZE, |e, f| e / f as i32); - rescale_height(if bounds_check(pos) { - scale_height_big(alt[pos]) - } else { - 0.0 - }) - }, - |pos, (r, g, b, a)| { - world_map_rgba[pos.y * map_size.x as usize + pos.x] = - u32::from_le_bytes([r, g, b, a]); - }, - ); - // Generate map with topographical lines and stylized colors - map_config.is_contours = true; - map_config.is_stylized_topo = true; - map_config.generate( - |pos| { - sample_pos( - &map_config, - pos, - &alt, - &rgba, - &map_size, - &map_size_lg, - max_height, - ) - }, - |wpos| { - let pos = wpos.map2(TerrainChunkSize::RECT_SIZE, |e, f| e / f as i32); - rescale_height(if bounds_check(pos) { - scale_height_big(alt[pos]) - } else { - 0.0 - }) - }, - |pos, (r, g, b, a)| { - world_map_topo[pos.y * map_size.x as usize + pos.x] = - u32::from_le_bytes([r, g, b, a]); - }, - ); - let make_raw = |rgb| -> Result<_, Error> { - let mut raw = vec![0u8; 4 * world_map_rgba.len()]; - LittleEndian::write_u32_into(rgb, &mut raw); - Ok(Arc::new( - DynamicImage::ImageRgba8({ - // Should not fail if the dimensions are correct. - let map = - image::ImageBuffer::from_raw(u32::from(map_size.x), u32::from(map_size.y), raw); - map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? - }) - // Flip the image, since Voxygen uses an orientation where rotation from - // positive x axis to positive y axis is counterclockwise around the z axis. - .flipv(), - )) - }; - let lod_base = rgba; - let lod_alt = alt; - let world_map_rgb_img = make_raw(&world_map_rgba)?; - let world_map_topo_img = make_raw(&world_map_topo)?; - let world_map_layers = vec![world_map_rgb_img, world_map_topo_img]; - let horizons = (west.0, west.1, east.0, east.1) - .into_par_iter() - .map(|(wa, wh, ea, eh)| u32::from_le_bytes([wa, wh, ea, eh])) - .collect::>(); - let lod_horizon = horizons; - let map_bounds = Vec2::new(sea_level, max_height); - debug!("Done preparing image..."); - - Ok(( - state, - lod_base, - lod_alt, - Grid::from_raw(map_size.map(|e| e as i32), lod_horizon), - (world_map_layers, map_size, map_bounds), - world_map.sites, - world_map.pois, - recipe_book, - component_recipe_book, - max_group_size, - client_timeout, - )) - }).await.expect("Client thread should not panic")?; - }, + res = &mut task => break res.expect("Client thread should not panic")?, _ = ping_interval.tick() => ping_stream.send(PingMsg::Ping)?, } }; @@ -1900,7 +1909,7 @@ impl Client { + TerrainChunkSize::RECT_SIZE.map(|x| x as f32) / 2.0) .distance_squared(pos.0.into()); - let mut terrain = self.state.terrain(); + let terrain = self.state.terrain(); if let Some(chunk) = terrain.get_key_arc(*key) { if !skip_mode && !terrain.contains_key_real(*key) { let chunk = Arc::clone(chunk); @@ -1910,10 +1919,10 @@ impl Client { } else { drop(terrain); if !skip_mode && !self.pending_chunks.contains_key(key) { - const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = 8 * 4; + const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = /*8 * 4*/2; if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT - && /* current_tick_send_chunk_requests - < CURRENT_TICK_PENDING_CHUNKS_LIMIT */true + && current_tick_send_chunk_requests + < CURRENT_TICK_PENDING_CHUNKS_LIMIT { self.send_msg_err(ClientGeneral::TerrainChunkRequest { key: *key, @@ -2477,7 +2486,22 @@ impl Client { { terrain_cnt += msg.size(); } - self.terrain_tx.send(msg); + match self.terrain_tx.try_send(msg) { + Ok(()) => {}, + Err(mpsc::TrySendError::Full(msg)) => { + // The message thread is fully backed up, which means we can blocking + // process at least the size of the channel in chunk receives. For now, to + // avoid blocking for too long, we just ask for a single chunk. Note that + // since the thread is processed serially, try_recv would actually be + // guaranteed to succeed here, but we avoid relying on that assumption. + let msg_ = self.terrain_rx.recv() + .expect("Deserialization thread shouold not panic")?; + self.handle_terrain_msg(msg_); + self.terrain_tx.send(msg).expect("Deserialization thread should not panic"); + }, + Err(mpsc::TrySendError::Disconnected(_)) => + unreachable!("Deserialization thread should not panic."), + } } /* if !terrain_messages.is_empty() { cnt += terrain_messages.len() as u64; @@ -2537,8 +2561,9 @@ impl Client { return Err(Error::ServerTimeout); } - // ignore network events - while let Some(Ok(Some(event))) = self.participant.as_mut().map(|p| p.try_fetch_event()) { + + while let Some(res) = self.participant.as_mut().and_then(|p| p.try_fetch_event().transpose()) { + let event = res?; trace!(?event, "received network event"); } @@ -2868,6 +2893,12 @@ impl Client { // Handle new messages from the server. self.handle_new_messages()?; + // TODO: avoid emitting these in the first place + self.state + .ecs() + .fetch::>() + .recv_all(); + // 5) Terrain self.tick_terrain()?; let empty = Arc::new(TerrainChunk::new( diff --git a/common/state/Cargo.toml b/common/state/Cargo.toml index 604b8efb6b..80d0ed3fbe 100644 --- a/common/state/Cargo.toml +++ b/common/state/Cargo.toml @@ -20,6 +20,7 @@ core_affinity = "0.5" rayon = "1.5" num_cpus = "1.0" thread-priority = { version = "0.9.2" } +tokio = { version = "1.14", default-features = false, features = ["rt"] } tracing = { version = "0.1", default-features = false } vek = { version = "0.15.8", features = ["serde"] } diff --git a/common/state/src/state.rs b/common/state/src/state.rs index 17e29152e6..a3f10f341c 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -36,7 +36,7 @@ use specs::{ Component, DispatcherBuilder, Entity as EcsEntity, WorldExt, }; use thread_priority::{ThreadBuilder, ThreadPriority}; -use std::sync::Arc; +use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; use vek::*; /// How much faster should an in-game day be compared to a real day? @@ -95,36 +95,50 @@ pub struct State { thread_pool: Arc, } -pub type Pools = (usize, GameMode/*u64*/, Arc/*, slowjob::SlowJobPool*/); - +#[derive(Clone,Debug)] +pub struct Pools { + pub slowjob_threads: usize, + game_mode: GameMode, + pub rayon_pool: Arc, + pub runtime: Arc, + /* slowjob: slowjob::SlowJobPool,*/ +} impl State { - pub fn pools(game_mode: GameMode) -> Pools { + pub fn pools(game_mode: GameMode, mut tokio_builder: tokio::runtime::Builder) -> Pools { let num_cpu = num_cpus::get()/* - 1*/; - let (thread_name_infix, rayon_offset) = match game_mode { + let (thread_name_infix, rayon_offset, tokio_count) = match game_mode { // The server does work on both the main thread and the rayon pool, but not at the same // time, so there's no reason to hold back a core from rayon. It does effectively no - // other non-slowjob, non-rayon work that blocks the main thread. - GameMode::Server => ("s", 0), + // other non-slowjob, non-rayon work that blocks the main thread, but we do want to + // leave num_cpu / 4 cores available for tokio, since otherwise TCP buffers can build + // up to unreasonable degrees. + GameMode::Server => ("s", 0, num_cpu / 4), // The client does work on both the main thread and the rayon pool, but not at the same // time. It does run some other non-slowjob and non-rayon threads, but none of them - // block work on the main thread. - GameMode::Client => ("c", 0), + // block work on the main thread. It does not do enough networking for it to be worth + // dedicating anything to the tokio pool. + GameMode::Client => ("c", 0, 0), // Singleplayer does work on both the main thread and the rayon pool; unlike the server // and client cases, the rayon work may interfere with work on one of the main threads, // since the server and client don't coordinate their rayon work. Therefore, we // reserve a core for the main thread(s) from both slowjob and rayon tasks. Since many // CPUs can't afford to lose an entire core during the common periods when the server // and client are not interfering with each other, we still spawn an extra rayon thread - // in this case, but leave it floating so the OS can schedule it. - GameMode::Singleplayer => ("sp", /*2*/1), + // in this case, but leave it floating so the OS can schedule it. Of course, + // singleplayer need not devote any resources to the tokio pool, as it's not expected + // to be in use. + GameMode::Singleplayer => ("sp", /*2*/1, 0), }; let rayon_threads = /*match game_mode { GameMode::Server | GameMode::Client => num_cpu / 2, GameMode::Singleplayer => num_cpu / 4, - }*/num_cpu.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); + }*/num_cpu/*.saturating_sub(tokio_count)*/.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); let core_ids = /*(rayon_threads >= 16).then(|| */core_affinity::get_core_ids().unwrap_or(vec![])/*).unwrap_or(vec![])*/; + // Don't pin rayon threads to the cores expected to be used by tokio threads. + /* core_ids.truncate(core_ids.len() - tokio_count); */ + let mut core_ids_ = core_ids.clone(); let core_count = core_ids.len(); let rayon_pool = Arc::new( ThreadPoolBuilder::new() @@ -141,14 +155,14 @@ impl State { } // pinned rayon threads run with high priority let index = thread.index(); - if index.checked_sub(rayon_offset).map_or(false, |i| i < core_count) { + if index.checked_sub(rayon_offset + tokio_count).map_or(false, |i| i < core_count) { b = b.priority(ThreadPriority::Max); } b.spawn_careless(|| thread.run())?; Ok(()) }) .start_handler(move |i| { - if let Some(&core_id) = i.checked_sub(rayon_offset).and_then(|i| core_ids.get(i)) { + if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) { core_affinity::set_for_current(core_id); } }) @@ -173,9 +187,49 @@ impl State { /*Arc::clone(*/slow_pool/*)*/, ); */ + // Setup tokio runtime + use tokio::runtime::Builder; + + // TODO: evaluate std::thread::available_concurrency as a num_cpus replacement + let cores = num_cpus::get(); + // We don't need that many threads in the async pool, at least 2 but generally + // 25% of all available will do + let mut core_ids_ = core_ids.clone(); + let runtime = Arc::new( + tokio_builder + .enable_all() + .worker_threads(tokio_count.max(common::consts::MIN_RECOMMENDED_TOKIO_THREADS)) + .thread_name_fn(move || { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("tokio-{}-{}", thread_name_infix, id) + }) + .on_thread_start(move || { + if tokio_count > 0 { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let index = ATOMIC_ID.fetch_add(1, Ordering::SeqCst) % tokio_count; + if let Some(&core_id) = index.checked_add(num_cpu - tokio_count - 1).and_then(|i| core_ids_.get(i)) { + core_affinity::set_for_current(core_id); + } + } + }) + .build() + .unwrap(), + ); + // We always reserve at least one non-slowjob core, if possible, to make sure systems get a - // chance to run unobstructed. - (num_cpu - 1/*slow_limit*//* as u64*/, game_mode, rayon_pool/*, slowjob*/) + // chance to run unobstructed. We also leave any dedicated tokio threads their own + // reserved cores, since networking is quite latency critical on a server. + // + // Note that for all n > 1, n - n / 4 - 1 > 0 (using integer arithmetic), which is fine + // since we require at least 2 hardware threads. + Pools { + slowjob_threads: num_cpu - tokio_count - 1/*slow_limit*//* as u64*/, + game_mode, + rayon_pool, + runtime, + /*slowjob,*/ + } } /// Create a new `State` in client mode. @@ -195,8 +249,8 @@ impl State { GameMode::Singleplayer => "sp", }; */ - let num_cpu = /*num_cpus::get()*/pools.0/* / 2 + pools.0 / 4*/; - let game_mode = pools.1; + let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/; + let game_mode = pools.game_mode; /* let rayon_threads = match game_mode { GameMode::Server | GameMode::Client => num_cpu/* / 2*/, GameMode::Singleplayer => num_cpu/* / 4*// 2, @@ -248,7 +302,7 @@ impl State { Self { ecs: Self::setup_ecs_world(ecs_role, /*num_cpu as u64*//*, &thread_pool, *//*pools.1*/slowjob/*pools.3*/, map_size_lg, default_chunk), - thread_pool: pools.2, + thread_pool: pools.rayon_pool, } } diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index 5b81d2d68a..4a30823531 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -18,9 +18,8 @@ use crate::{ tui_runner::Tui, tuilog::TuiLog, }; -use common::{clock::Clock, consts::MIN_RECOMMENDED_TOKIO_THREADS}; +use common::clock::Clock; use common_base::span; -use core::sync::atomic::{AtomicUsize, Ordering}; use server::{persistence::DatabaseSettings, settings::Protocol, Event, Input, Server}; use std::{ io, @@ -71,20 +70,9 @@ fn main() -> io::Result<()> { path }; - // We don't need that many threads in the async pool, at least 2 but generally - // 25% of all available will do - // TODO: evaluate std::thread::available_concurrency as a num_cpus replacement - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads((num_cpus::get() / 4).max(MIN_RECOMMENDED_TOKIO_THREADS)) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("tokio-server-{}", id) - }) - .build() - .unwrap(), + let pools = common_state::State::pools( + common::resources::GameMode::Server, + tokio::runtime::Builder::new_multi_thread(), ); // Load server settings @@ -109,7 +97,7 @@ fn main() -> io::Result<()> { ArgvCommand::Shared(SharedCommand::Admin { command }) => { let login_provider = server::login_provider::LoginProvider::new( server_settings.auth_server_address, - runtime, + pools.runtime, ); match command { @@ -168,8 +156,7 @@ fn main() -> io::Result<()> { editable_settings, database_settings, &server_data_dir, - runtime, - common_state::State::pools(common::resources::GameMode::Server), + pools, ) .expect("Failed to create server instance!"); diff --git a/server/src/lib.rs b/server/src/lib.rs index 18d4c50177..17dc19679b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -212,7 +212,6 @@ impl Server { editable_settings: EditableSettings, database_settings: DatabaseSettings, data_dir: &std::path::Path, - runtime: Arc, pools: common_state::Pools, ) -> Result { info!("Server data dir is: {}", data_dir.display()); @@ -251,13 +250,13 @@ impl Server { }, calendar: Some(settings.calendar_mode.calendar_now()), }, - &pools.2, + &pools.rayon_pool, ); #[cfg(not(feature = "worldgen"))] let (world, index) = World::generate(settings.world_seed); #[cfg(feature = "worldgen")] - let map = world.get_map_data(index.as_index_ref(), &pools.2); + let map = world.get_map_data(index.as_index_ref(), &pools.rayon_pool); #[cfg(not(feature = "worldgen"))] let map = WorldMapMsg { dimensions_lg: Vec2::zero(), @@ -270,6 +269,7 @@ impl Server { default_chunk: Arc::new(world.generate_oob_chunk()), }; + let runtime = Arc::clone(&pools.runtime); let mut state = State::server( pools, world.sim().map_size_lg(), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 6eb77c6d0e..c9c3d90323 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -415,10 +415,12 @@ impl<'a> System<'a> for Sys { chunk }) }).collect::>(); - // Drop chunks in a background thread. - slow_jobs.spawn(&"CHUNK_DROP", async move { - drop(chunks_to_remove); - }); + if !chunks_to_remove.is_empty() { + // Drop chunks in a background thread. + slow_jobs.spawn(&"CHUNK_DROP", async move { + drop(chunks_to_remove); + }); + } } } diff --git a/voxygen/src/lib.rs b/voxygen/src/lib.rs index e04c9062f1..0caa1dfc65 100644 --- a/voxygen/src/lib.rs +++ b/voxygen/src/lib.rs @@ -59,7 +59,6 @@ use i18n::LocalizationHandle; use std::path::PathBuf; use std::sync::Arc; -use tokio::runtime::Runtime; /// A type used to store state that is shared between all play states. pub struct GlobalState { @@ -68,7 +67,6 @@ pub struct GlobalState { pub settings: Settings, pub profile: Profile, pub window: Window, - pub tokio_runtime: Arc, #[cfg(feature = "egui-ui")] pub egui_state: EguiState, pub lazy_init: scene::terrain::SpriteRenderContextLazy, diff --git a/voxygen/src/main.rs b/voxygen/src/main.rs index 6f6c712bde..8ba3c37a0f 100644 --- a/voxygen/src/main.rs +++ b/voxygen/src/main.rs @@ -181,29 +181,6 @@ fn main() { default_hook(panic_info); })); - // Setup tokio runtime - use common::consts::MIN_RECOMMENDED_TOKIO_THREADS; - use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }; - use tokio::runtime::Builder; - - // TODO: evaluate std::thread::available_concurrency as a num_cpus replacement - let cores = num_cpus::get(); - let tokio_runtime = Arc::new( - Builder::new_multi_thread() - .enable_all() - .worker_threads((cores / 4).max(MIN_RECOMMENDED_TOKIO_THREADS)) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("tokio-voxygen-{}", id) - }) - .build() - .unwrap(), - ); - #[cfg(feature = "hot-reloading")] assets::start_hot_reloading(); @@ -253,7 +230,7 @@ fn main() { // Create window use veloren_voxygen::{error::Error, render::RenderError}; - let (mut window, event_loop) = match Window::new(&settings, &tokio_runtime) { + let (mut window, event_loop) = match Window::new(&settings) { Ok(ok) => ok, // Custom panic message when a graphics backend could not be found Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => { @@ -290,7 +267,6 @@ fn main() { audio, profile, window, - tokio_runtime, #[cfg(feature = "egui-ui")] egui_state, lazy_init, diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 2d9bbad53c..ef2ca6afa6 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -11,7 +11,6 @@ use std::{ }, time::Duration, }; -use tokio::runtime; use tracing::{trace, warn}; #[derive(Debug)] @@ -47,7 +46,6 @@ impl ClientInit { connection_args: ConnectionArgs, username: String, password: String, - runtime: Arc, pools: common_state::Pools, ) -> Self { let (tx, rx) = unbounded(); @@ -55,9 +53,7 @@ impl ClientInit { let cancel = Arc::new(AtomicBool::new(false)); let cancel2 = Arc::clone(&cancel); - let runtime2 = Arc::clone(&runtime); - - runtime.spawn(async move { + pools.runtime.spawn(async move { let trust_fn = |auth_server: &str| { let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); trust_rx @@ -76,7 +72,6 @@ impl ClientInit { let mut mismatched_server_info = None; match Client::new( connection_args.clone(), - Arc::clone(&runtime2), &mut mismatched_server_info, pools.clone(), &username, @@ -87,8 +82,7 @@ impl ClientInit { { Ok(client) => { let _ = tx.send(Msg::Done(Ok(client))); - drop(pools); - tokio::task::block_in_place(move || drop(runtime2)); + tokio::task::block_in_place(move || drop(pools)); return; }, Err(ClientError::NetworkErr(NetworkError::ConnectFailed( @@ -112,9 +106,8 @@ impl ClientInit { // address and all the attempts timed out. let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::ServerNotFound)))); - drop(pools); // Safe drop runtime - tokio::task::block_in_place(move || drop(runtime2)); + tokio::task::block_in_place(move || drop(pools)); }); ClientInit { diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index b2d5131a54..753af2b280 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -22,7 +22,7 @@ use common_base::span; use i18n::LocalizationHandle; use scene::Scene; use std::sync::Arc; -use tokio::runtime; +use tokio::Builder; use tracing::error; use ui::{Event as MainMenuEvent, MainMenuUi}; @@ -104,7 +104,7 @@ impl PlayState for MainMenuState { "".to_owned(), ConnectionArgs::Mpsc(14004), &mut self.init, - &global_state.tokio_runtime, + &pools.runtime, &global_state.i18n, Some(pools), ); @@ -300,7 +300,6 @@ impl PlayState for MainMenuState { password, connection_args, &mut self.init, - &global_state.tokio_runtime, &global_state.i18n, None, ); @@ -331,7 +330,8 @@ impl PlayState for MainMenuState { }, #[cfg(feature = "singleplayer")] MainMenuEvent::StartSingleplayer => { - let singleplayer = Singleplayer::new(&global_state.tokio_runtime); + + let singleplayer = Singleplayer::new(); global_state.singleplayer = Some(singleplayer); }, @@ -500,7 +500,6 @@ fn attempt_login( password: String, connection_args: ConnectionArgs, init: &mut InitState, - runtime: &Arc, localized_strings: &LocalizationHandle, pools: Option, ) { @@ -533,7 +532,12 @@ fn attempt_login( username, password, Arc::clone(runtime), - pools.unwrap_or_else(|| common_state::State::pools(common::resources::GameMode::Client)), + pools.unwrap_or_else(|| { + common_state::State::pools( + common::resources::GameMode::Client, + tokio::runtime::Builder::new_multi_thread(), + ) + }), )); } } diff --git a/voxygen/src/render/renderer.rs b/voxygen/src/render/renderer.rs index c4193f2b11..f01c2b765d 100644 --- a/voxygen/src/render/renderer.rs +++ b/voxygen/src/render/renderer.rs @@ -198,7 +198,6 @@ impl Renderer { pub fn new( window: &winit::window::Window, mode: RenderMode, - runtime: &tokio::runtime::Runtime, ) -> Result { let (pipeline_modes, mut other_modes) = mode.split(); // Enable seamless cubemaps globally, where available--they are essentially a @@ -312,6 +311,9 @@ impl Renderer { path }); + let runtime = runtime::Builder::new_current_thread().build() + .expect("Failed to create single-threaded tokio runtime (for renderer initialization)."); + let (device, queue) = runtime.block_on(adapter.request_device( &wgpu::DeviceDescriptor { // TODO diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index daad5c90ee..aa6071efad 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -12,7 +12,6 @@ use std::{ thread::{self, JoinHandle}, time::Duration, }; -use tokio::runtime::Runtime; use tracing::{error, info, trace, warn}; const TPS: u64 = 30; @@ -30,7 +29,7 @@ pub struct Singleplayer { } impl Singleplayer { - pub fn new(runtime: &Arc) -> Self { + pub fn new() -> Self { let (stop_server_s, stop_server_r) = unbounded(); // Determine folder to save server data in @@ -100,18 +99,19 @@ impl Singleplayer { let (result_sender, result_receiver) = bounded(1); let builder = thread::Builder::new().name("singleplayer-server-thread".into()); - let runtime = Arc::clone(runtime); let thread = builder .spawn(move || { trace!("starting singleplayer server thread"); - let pools = common_state::State::pools(common::resources::GameMode::Singleplayer); + let pools = common_state::State::pools( + common::resources::GameMode::Singleplayer, + tokio::runtime::Builder::new_multi_thread(), + ); let (server, init_result) = match Server::new( settings2, editable_settings, database_settings, &server_data_dir, - runtime, pools.clone(), ) { Ok(server) => (Some(server), Ok(pools)), diff --git a/voxygen/src/window.rs b/voxygen/src/window.rs index 575a9f6dbd..bdfefac062 100644 --- a/voxygen/src/window.rs +++ b/voxygen/src/window.rs @@ -411,10 +411,7 @@ pub struct Window { } impl Window { - pub fn new( - settings: &Settings, - runtime: &tokio::runtime::Runtime, - ) -> Result<(Window, EventLoop), Error> { + pub fn new(settings: &Settings) -> Result<(Window, EventLoop), Error> { let event_loop = EventLoop::new(); let size = settings.graphics.window_size; @@ -434,7 +431,7 @@ impl Window { let window = win_builder.build(&event_loop).unwrap(); - let renderer = Renderer::new(&window, settings.graphics.render_mode.clone(), runtime)?; + let renderer = Renderer::new(&window, settings.graphics.render_mode.clone())?; let keypress_map = HashMap::new();