From 37c41c449facc6735c0f75ba87f2ed29bdcda362 Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Wed, 14 Sep 2022 01:00:01 -0700 Subject: [PATCH] WIP changes --- Cargo.lock | 1 + client/src/bin/swarm/main.rs | 214 ++++++++++++++------------- client/src/lib.rs | 6 +- common/Cargo.toml | 1 + common/src/clock.rs | 15 +- server-cli/src/main.rs | 4 +- server/src/lib.rs | 6 + voxygen/src/lib.rs | 2 + voxygen/src/main.rs | 9 +- voxygen/src/menu/main/client_init.rs | 2 +- voxygen/src/menu/main/mod.rs | 3 - voxygen/src/render/renderer.rs | 4 +- voxygen/src/run.rs | 2 +- voxygen/src/singleplayer.rs | 4 +- voxygen/src/window.rs | 7 +- 15 files changed, 157 insertions(+), 123 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 581b869604..4748122aec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6876,6 +6876,7 @@ dependencies = [ "spin_sleep", "structopt", "strum", + "tokio", "tracing", "tracing-subscriber", "uuid 0.8.2", diff --git a/client/src/bin/swarm/main.rs b/client/src/bin/swarm/main.rs index 58cf31a806..ce612a0ac3 100644 --- a/client/src/bin/swarm/main.rs +++ b/client/src/bin/swarm/main.rs @@ -94,17 +94,16 @@ fn run_client_new_thread( opt: Opt, finished_init: &Arc, ) { - let runtime = Arc::clone(&pools.runtime); - let pools = pools.clone(); + let pools_ = pools.clone(); let finished_init = Arc::clone(finished_init); - thread::spawn(move || { - if let Err(err) = run_client(username, index, to_adminify, pools, opt, finished_init) { + pools.runtime.spawn(async move { + if let Err(err) = run_client(username, index, to_adminify, pools_, opt, finished_init).await { tracing::error!("swarm member {} exited with an error: {:?}", index, err); } }); } -fn run_client( +async fn run_client( username: String, index: u32, to_adminify: Vec, @@ -113,110 +112,115 @@ fn run_client( finished_init: Arc, ) -> Result<(), veloren_client::Error> { let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0)); - - let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> { - clock.tick(); + async fn tick<'a>(client: &'a mut Client, clock: &'a mut common::clock::Clock) -> Result<(), veloren_client::Error> { + clock.tick_slow().await; client.tick_network(clock.dt())?; client.cleanup(); Ok(()) - }; - - let mut run = || -> Result<_, veloren_client::Error> { - // Connect to localhost - let addr = ConnectionArgs::Tcp { - prefer_ipv6: false, - hostname: "localhost".into(), - }; - // NOTE: use a no-auth server - let mut client = pools.runtime.block_on(Client::new( - addr, - &mut None, - pools.clone(), - &username, - "", - |_| false, - ))?; - tracing::info!("Client {} connected", index); - - // Wait for character list to load - client.load_character_list(); - while client.character_list().loading { - tick(&mut client)?; - } - tracing::info!("Client {} loaded character list", index); - - // Create character if none exist - if client.character_list().characters.is_empty() { - client.create_character( - username.clone(), - Some("common.items.weapons.sword.starter".into()), - None, - body(), - ); - - client.load_character_list(); - - while client.character_list().loading || client.character_list().characters.is_empty() { - tick(&mut client)?; - } - } - tracing::info!("Client {} found or created character", index); - - client.set_view_distance(opt.vd); - - // Select the first character - client.request_character( - client - .character_list() - .characters - .first() - .expect("Just created new character if non were listed!!!") - .character - .id - .expect("Why is this an option?"), - ); - - // If this is the admin client then adminify the other swarm members - if !to_adminify.is_empty() { - // Wait for other clients to connect - loop { - tick(&mut client)?; - // NOTE: it's expected that each swarm member will have a unique alias - let players = client.players().collect::>(); - if to_adminify - .iter() - .all(|name| players.contains(&name.as_str())) - { - break; - } - } - // Assert that we are a moderator (assumes we are an admin if so) - assert!( - client.is_moderator(), - "The user needs to ensure \"{}\" is registered as an admin on the server", - username - ); - // Send commands to adminify others - to_adminify.iter().for_each(|name| { - client.send_command("adminify".into(), vec![name.into(), "admin".into()]) - }); - } - - // Wait for moderator - while !client.is_moderator() { - tick(&mut client)?; - } - client.clear_terrain(); - client.request_player_physics(false); - - Ok(client) - }; + } let mut client = loop { - match run() { - Err(e) => tracing::warn!(?e, "Client {} disconnected", index), + let pools = &pools; + let to_adminify = &*to_adminify; + let username = &username; + let clock = &mut clock; + let run = move || async move { + // Connect to localhost + let addr = ConnectionArgs::Tcp { + prefer_ipv6: false, + hostname: "localhost".into(), + }; + // NOTE: use a no-auth server + let mut client = Client::new( + addr, + &mut None, + pools.clone(), + &username, + "", + |_| false, + ).await?; + tracing::info!("Client {} connected", index); + + // Wait for character list to load + client.load_character_list(); + while client.character_list().loading { + tick(&mut client, clock).await?; + } + tracing::info!("Client {} loaded character list", index); + + // Create character if none exist + if client.character_list().characters.is_empty() { + client.create_character( + username.clone(), + Some("common.items.weapons.sword.starter".into()), + None, + body(), + ); + + client.load_character_list(); + + while client.character_list().loading || client.character_list().characters.is_empty() { + tick(&mut client, clock).await?; + } + } + tracing::info!("Client {} found or created character", index); + + client.set_view_distance(opt.vd); + + // Select the first character + client.request_character( + client + .character_list() + .characters + .first() + .expect("Just created new character if non were listed!!!") + .character + .id + .expect("Why is this an option?"), + ); + + // If this is the admin client then adminify the other swarm members + if !to_adminify.is_empty() { + // Wait for other clients to connect + loop { + tick(&mut client, clock).await?; + // NOTE: it's expected that each swarm member will have a unique alias + let players = client.players().collect::>(); + if to_adminify + .iter() + .all(|name| players.contains(&name.as_str())) + { + break; + } + } + // Assert that we are a moderator (assumes we are an admin if so) + assert!( + client.is_moderator(), + "The user needs to ensure \"{}\" is registered as an admin on the server", + username + ); + // Send commands to adminify others + to_adminify.iter().for_each(|name| { + client.send_command("adminify".into(), vec![name.into(), "admin".into()]) + }); + } + + // Wait for moderator + while !client.is_moderator() { + tick(&mut client, clock).await?; + } + client.clear_terrain(); + client.request_player_physics(false); + + Ok::<_, veloren_client::Error>(client) + }; + + match run().await { + Err(e) => { + tokio::time::sleep(Duration::from_secs(1)); + tracing::warn!(?e, "Client {} disconnected", index) + }, Ok(client) => { - thread::sleep(Duration::from_secs(1)); break client }, } @@ -226,7 +230,7 @@ fn run_client( finished_init.fetch_add(1, Ordering::Relaxed); // Wait for initialization of all other swarm clients to finish while finished_init.load(Ordering::Relaxed) != opt.size { - tick(&mut client)?; + tick(&mut client, &mut clock).await?; } // Use this check so this is only printed once @@ -239,7 +243,7 @@ fn run_client( loop { // TODO: doesn't seem to produce an error when server is shutdown (process keeps // running) - tick(&mut client)?; + tick(&mut client, &mut clock).await?; let entity = client.entity(); // Move or stay still depending on specified options // TODO: make sure server cheat protections aren't triggering diff --git a/client/src/lib.rs b/client/src/lib.rs index 6382b86dee..c4e32fde7e 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -295,7 +295,7 @@ pub struct CharacterList { } /// Higher than what's needed at VD = 65. -const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*//*13800*/12; +const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*/13800; impl Client { pub async fn new( @@ -2010,8 +2010,8 @@ impl Client { if !skip_mode && !self.pending_chunks.contains_key(key) { const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = /*8 * 4*/2; if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT - && current_tick_send_chunk_requests - < CURRENT_TICK_PENDING_CHUNKS_LIMIT + && /* current_tick_send_chunk_requests + < CURRENT_TICK_PENDING_CHUNKS_LIMIT */true { self.send_msg_err(ClientGeneral::TerrainChunkRequest { key: *key, diff --git a/common/Cargo.toml b/common/Cargo.toml index 1abe07a529..5a2dc52956 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -65,6 +65,7 @@ rayon = "1.5" roots = "0.0.6" spin_sleep = "1.0" tracing = { version = "0.1", default-features = false } +tokio = { version = "1.14", default-features = false, features = ["time", "rt"] } uuid = { version = "0.8.1", default-features = false, features = ["serde", "v4"] } rand = "0.8" fxhash = "0.2.1" diff --git a/common/src/clock.rs b/common/src/clock.rs index cbc86839ca..aebcf2b0f5 100644 --- a/common/src/clock.rs +++ b/common/src/clock.rs @@ -1,4 +1,5 @@ use common_base::span; +use core::future::Future; use ordered_float::NotNan; use std::{ collections::VecDeque, @@ -103,8 +104,18 @@ impl Clock { } } + pub async fn tick(&mut self) { + self.tick_inner(|duration| async move { spin_sleep::sleep(duration); }).await; + } + + pub async fn tick_slow(&mut self) { + self.tick_inner(tokio::time::sleep).await; + } + /// Do not modify without asking @xMAC94x first! - pub fn tick(&mut self) { + pub async fn tick_inner(&mut self, sleep: impl FnOnce(Duration) -> F) + where F: Future, + { span!(_guard, "tick", "Clock::tick"); span!(guard, "clock work"); let current_sys_time = Instant::now(); @@ -116,7 +127,7 @@ impl Clock { drop(guard); // Attempt to sleep to fill the gap. if let Some(sleep_dur) = self.target_dt.checked_sub(busy_delta) { - spin_sleep::sleep(sleep_dur); + sleep(sleep_dur).await; } let after_sleep_sys_time = Instant::now(); diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index d4405e207a..935d99a3bb 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -198,6 +198,7 @@ fn main() -> io::Result<()> { // Wait for a tick so we don't start with a zero dt let mut tick_no = 0u64; + server.runtime().clone().block_on(async { loop { tick_no += 1; span!(guard, "work"); @@ -273,10 +274,11 @@ fn main() -> io::Result<()> { drop(guard); // Wait for the next tick. - clock.tick(); + clock.tick().await; #[cfg(feature = "tracy")] common_base::tracy_client::frame_mark(); } + }); Ok(()) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 4fb8f9ce2c..e49d83e82c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -630,6 +630,12 @@ impl Server { self.state.ecs().fetch::() } + /// Get a reference to the client's runtime thread pool. This pool should be + /// used for any computationally expensive operations that run outside + /// of the main thread (i.e., threads that block on I/O operations are + /// exempt). + pub fn runtime(&self) -> &Arc { &self.runtime } + /// Get a reference to the server's game state. pub fn state(&self) -> &State { &self.state } diff --git a/voxygen/src/lib.rs b/voxygen/src/lib.rs index c4de90adf4..98db36fd37 100644 --- a/voxygen/src/lib.rs +++ b/voxygen/src/lib.rs @@ -61,6 +61,7 @@ use i18n::LocalizationHandle; use std::path::PathBuf; use std::sync::Arc; +use tokio::runtime::Runtime; /// A type used to store state that is shared between all play states. pub struct GlobalState { @@ -69,6 +70,7 @@ pub struct GlobalState { pub settings: Settings, pub profile: Profile, pub window: Window, + pub runtime: Runtime, #[cfg(feature = "egui-ui")] pub egui_state: EguiState, pub lazy_init: scene::terrain::SpriteRenderContextLazy, diff --git a/voxygen/src/main.rs b/voxygen/src/main.rs index a5aab7e4c3..3c4b187ec7 100644 --- a/voxygen/src/main.rs +++ b/voxygen/src/main.rs @@ -142,9 +142,15 @@ fn main() { }); i18n.set_english_fallback(settings.language.use_english_fallback); + let runtime = tokio::runtime::Builder::new_current_thread() + .max_blocking_threads(1) + .enable_time() + .build() + .expect("Failed to create tokio runtime."); + // Create window use veloren_voxygen::{error::Error, render::RenderError}; - let (mut window, event_loop) = match Window::new(&settings) { + let (mut window, event_loop) = match Window::new(&settings, &runtime) { Ok(ok) => ok, // Custom panic message when a graphics backend could not be found Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => { @@ -188,6 +194,7 @@ fn main() { audio, profile, window, + runtime, #[cfg(feature = "egui-ui")] egui_state, lazy_init, diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index ef2ca6afa6..05090c4fbd 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -53,7 +53,7 @@ impl ClientInit { let cancel = Arc::new(AtomicBool::new(false)); let cancel2 = Arc::clone(&cancel); - pools.runtime.spawn(async move { + pools.clone().runtime.spawn(async move { let trust_fn = |auth_server: &str| { let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); trust_rx diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index 7721b30975..486b2a2538 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -22,7 +22,6 @@ use common_base::span; use i18n::LocalizationHandle; use scene::Scene; use std::sync::Arc; -use tokio::Builder; use tracing::error; use ui::{Event as MainMenuEvent, MainMenuUi}; @@ -107,7 +106,6 @@ impl PlayState for MainMenuState { "".to_owned(), ConnectionArgs::Mpsc(14004), &mut self.init, - &pools.runtime, &global_state.i18n, Some(pools), ); @@ -559,7 +557,6 @@ fn attempt_login( connection_args, username, password, - Arc::clone(runtime), pools.unwrap_or_else(|| { common_state::State::pools( common::resources::GameMode::Client, diff --git a/voxygen/src/render/renderer.rs b/voxygen/src/render/renderer.rs index 5bd6bca1fe..f2b14baeb5 100644 --- a/voxygen/src/render/renderer.rs +++ b/voxygen/src/render/renderer.rs @@ -199,6 +199,7 @@ impl Renderer { pub fn new( window: &winit::window::Window, mode: RenderMode, + runtime: &tokio::runtime::Runtime, ) -> Result { let (pipeline_modes, mut other_modes) = mode.split(); // Enable seamless cubemaps globally, where available--they are essentially a @@ -312,9 +313,6 @@ impl Renderer { path }); - let runtime = runtime::Builder::new_current_thread().build() - .expect("Failed to create single-threaded tokio runtime (for renderer initialization)."); - let (device, queue) = runtime.block_on(adapter.request_device( &wgpu::DeviceDescriptor { // TODO diff --git a/voxygen/src/run.rs b/voxygen/src/run.rs index aa30002f50..efeb553c84 100644 --- a/voxygen/src/run.rs +++ b/voxygen/src/run.rs @@ -269,7 +269,7 @@ fn handle_main_events_cleared( global_state .clock .set_target_dt(Duration::from_secs_f64(1.0 / target_fps as f64)); - global_state.clock.tick(); + global_state.runtime.block_on(global_state.clock.tick_slow()); drop(guard); #[cfg(feature = "tracy")] common_base::tracy_client::frame_mark(); diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index aa6071efad..bd2c494d39 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -162,6 +162,7 @@ impl Drop for Singleplayer { fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc) { info!("Starting server-cli..."); + Arc::clone(&server.runtime()).block_on(async { // Set up an fps clock let mut clock = Clock::new(Duration::from_secs_f64(1.0 / TPS as f64)); @@ -174,7 +175,7 @@ fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc, paused: Arc Result<(Window, EventLoop), Error> { + pub fn new( + settings: &Settings, + runtime: &tokio::runtime::Runtime, + ) -> Result<(Window, EventLoop), Error> { let event_loop = EventLoop::new(); let size = settings.graphics.window_size; @@ -431,7 +434,7 @@ impl Window { let window = win_builder.build(&event_loop).unwrap(); - let renderer = Renderer::new(&window, settings.graphics.render_mode.clone())?; + let renderer = Renderer::new(&window, settings.graphics.render_mode.clone(), runtime)?; let keypress_map = HashMap::new();