WIP changes

This commit is contained in:
Joshua Yanovski
2022-09-14 01:00:01 -07:00
parent f12812c7ee
commit ecb68acb77
15 changed files with 157 additions and 123 deletions

1
Cargo.lock generated
View File

@ -6636,6 +6636,7 @@ dependencies = [
"spin_sleep", "spin_sleep",
"structopt", "structopt",
"strum", "strum",
"tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid", "uuid",

View File

@ -89,17 +89,16 @@ fn run_client_new_thread(
opt: Opt, opt: Opt,
finished_init: &Arc<AtomicU32>, finished_init: &Arc<AtomicU32>,
) { ) {
let runtime = Arc::clone(&pools.runtime); let pools_ = pools.clone();
let pools = pools.clone();
let finished_init = Arc::clone(finished_init); let finished_init = Arc::clone(finished_init);
thread::spawn(move || { pools.runtime.spawn(async move {
if let Err(err) = run_client(username, index, to_adminify, pools, opt, finished_init) { if let Err(err) = run_client(username, index, to_adminify, pools_, opt, finished_init).await {
tracing::error!("swarm member {} exited with an error: {:?}", index, err); tracing::error!("swarm member {} exited with an error: {:?}", index, err);
} }
}); });
} }
fn run_client( async fn run_client(
username: String, username: String,
index: u32, index: u32,
to_adminify: Vec<String>, to_adminify: Vec<String>,
@ -108,110 +107,115 @@ fn run_client(
finished_init: Arc<AtomicU32>, finished_init: Arc<AtomicU32>,
) -> Result<(), veloren_client::Error> { ) -> Result<(), veloren_client::Error> {
let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0)); let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0));
async fn tick<'a>(client: &'a mut Client, clock: &'a mut common::clock::Clock) -> Result<(), veloren_client::Error> {
let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> { clock.tick_slow().await;
clock.tick();
client.tick_network(clock.dt())?; client.tick_network(clock.dt())?;
client.cleanup(); client.cleanup();
Ok(()) Ok(())
}; }
let mut run = || -> Result<_, veloren_client::Error> {
// Connect to localhost
let addr = ConnectionArgs::Tcp {
prefer_ipv6: false,
hostname: "localhost".into(),
};
// NOTE: use a no-auth server
let mut client = pools.runtime.block_on(Client::new(
addr,
&mut None,
pools.clone(),
&username,
"",
|_| false,
))?;
tracing::info!("Client {} connected", index);
// Wait for character list to load
client.load_character_list();
while client.character_list().loading {
tick(&mut client)?;
}
tracing::info!("Client {} loaded character list", index);
// Create character if none exist
if client.character_list().characters.is_empty() {
client.create_character(
username.clone(),
Some("common.items.weapons.sword.starter".into()),
None,
body(),
);
client.load_character_list();
while client.character_list().loading || client.character_list().characters.is_empty() {
tick(&mut client)?;
}
}
tracing::info!("Client {} found or created character", index);
client.set_view_distance(opt.vd);
// Select the first character
client.request_character(
client
.character_list()
.characters
.first()
.expect("Just created new character if non were listed!!!")
.character
.id
.expect("Why is this an option?"),
);
// If this is the admin client then adminify the other swarm members
if !to_adminify.is_empty() {
// Wait for other clients to connect
loop {
tick(&mut client)?;
// NOTE: it's expected that each swarm member will have a unique alias
let players = client.players().collect::<HashSet<&str>>();
if to_adminify
.iter()
.all(|name| players.contains(&name.as_str()))
{
break;
}
}
// Assert that we are a moderator (assumes we are an admin if so)
assert!(
client.is_moderator(),
"The user needs to ensure \"{}\" is registered as an admin on the server",
username
);
// Send commands to adminify others
to_adminify.iter().for_each(|name| {
client.send_command("adminify".into(), vec![name.into(), "admin".into()])
});
}
// Wait for moderator
while !client.is_moderator() {
tick(&mut client)?;
}
client.clear_terrain();
client.request_player_physics(false);
Ok(client)
};
let mut client = loop { let mut client = loop {
match run() { let pools = &pools;
Err(e) => tracing::warn!(?e, "Client {} disconnected", index), let to_adminify = &*to_adminify;
let username = &username;
let clock = &mut clock;
let run = move || async move {
// Connect to localhost
let addr = ConnectionArgs::Tcp {
prefer_ipv6: false,
hostname: "localhost".into(),
};
// NOTE: use a no-auth server
let mut client = Client::new(
addr,
&mut None,
pools.clone(),
&username,
"",
|_| false,
).await?;
tracing::info!("Client {} connected", index);
// Wait for character list to load
client.load_character_list();
while client.character_list().loading {
tick(&mut client, clock).await?;
}
tracing::info!("Client {} loaded character list", index);
// Create character if none exist
if client.character_list().characters.is_empty() {
client.create_character(
username.clone(),
Some("common.items.weapons.sword.starter".into()),
None,
body(),
);
client.load_character_list();
while client.character_list().loading || client.character_list().characters.is_empty() {
tick(&mut client, clock).await?;
}
}
tracing::info!("Client {} found or created character", index);
client.set_view_distance(opt.vd);
// Select the first character
client.request_character(
client
.character_list()
.characters
.first()
.expect("Just created new character if non were listed!!!")
.character
.id
.expect("Why is this an option?"),
);
// If this is the admin client then adminify the other swarm members
if !to_adminify.is_empty() {
// Wait for other clients to connect
loop {
tick(&mut client, clock).await?;
// NOTE: it's expected that each swarm member will have a unique alias
let players = client.players().collect::<HashSet<&str>>();
if to_adminify
.iter()
.all(|name| players.contains(&name.as_str()))
{
break;
}
}
// Assert that we are a moderator (assumes we are an admin if so)
assert!(
client.is_moderator(),
"The user needs to ensure \"{}\" is registered as an admin on the server",
username
);
// Send commands to adminify others
to_adminify.iter().for_each(|name| {
client.send_command("adminify".into(), vec![name.into(), "admin".into()])
});
}
// Wait for moderator
while !client.is_moderator() {
tick(&mut client, clock).await?;
}
client.clear_terrain();
client.request_player_physics(false);
Ok::<_, veloren_client::Error>(client)
};
match run().await {
Err(e) => {
tokio::time::sleep(Duration::from_secs(1));
tracing::warn!(?e, "Client {} disconnected", index)
},
Ok(client) => { Ok(client) => {
thread::sleep(Duration::from_secs(1));
break client break client
}, },
} }
@ -221,7 +225,7 @@ fn run_client(
finished_init.fetch_add(1, Ordering::Relaxed); finished_init.fetch_add(1, Ordering::Relaxed);
// Wait for initialization of all other swarm clients to finish // Wait for initialization of all other swarm clients to finish
while finished_init.load(Ordering::Relaxed) != opt.size { while finished_init.load(Ordering::Relaxed) != opt.size {
tick(&mut client)?; tick(&mut client, &mut clock).await?;
} }
// Use this check so this is only printed once // Use this check so this is only printed once
@ -239,7 +243,7 @@ fn run_client(
loop { loop {
// TODO: doesn't seem to produce an error when server is shutdown (process keeps // TODO: doesn't seem to produce an error when server is shutdown (process keeps
// running) // running)
tick(&mut client)?; tick(&mut client, &mut clock).await?;
let entity = client.entity(); let entity = client.entity();
// Move or stay still depending on specified options // Move or stay still depending on specified options
// TODO: make sure server cheat protections aren't triggering // TODO: make sure server cheat protections aren't triggering

View File

@ -281,7 +281,7 @@ pub struct CharacterList {
} }
/// Higher than what's needed at VD = 65. /// Higher than what's needed at VD = 65.
const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*//*13800*/12; const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*/13800;
impl Client { impl Client {
pub async fn new( pub async fn new(
@ -1921,8 +1921,8 @@ impl Client {
if !skip_mode && !self.pending_chunks.contains_key(key) { if !skip_mode && !self.pending_chunks.contains_key(key) {
const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = /*8 * 4*/2; const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = /*8 * 4*/2;
if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT
&& current_tick_send_chunk_requests && /* current_tick_send_chunk_requests
< CURRENT_TICK_PENDING_CHUNKS_LIMIT < CURRENT_TICK_PENDING_CHUNKS_LIMIT */true
{ {
self.send_msg_err(ClientGeneral::TerrainChunkRequest { self.send_msg_err(ClientGeneral::TerrainChunkRequest {
key: *key, key: *key,

View File

@ -64,6 +64,7 @@ rayon = "1.5"
roots = "0.0.6" roots = "0.0.6"
spin_sleep = "1.0" spin_sleep = "1.0"
tracing = { version = "0.1", default-features = false } tracing = { version = "0.1", default-features = false }
tokio = { version = "1.14", default-features = false, features = ["time", "rt"] }
uuid = { version = "0.8.1", default-features = false, features = ["serde", "v4"] } uuid = { version = "0.8.1", default-features = false, features = ["serde", "v4"] }
rand = "0.8" rand = "0.8"
fxhash = "0.2.1" fxhash = "0.2.1"

View File

@ -1,4 +1,5 @@
use common_base::span; use common_base::span;
use core::future::Future;
use ordered_float::NotNan; use ordered_float::NotNan;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
@ -92,8 +93,18 @@ impl Clock {
} }
} }
pub async fn tick(&mut self) {
self.tick_inner(|duration| async move { spin_sleep::sleep(duration); }).await;
}
pub async fn tick_slow(&mut self) {
self.tick_inner(tokio::time::sleep).await;
}
/// Do not modify without asking @xMAC94x first! /// Do not modify without asking @xMAC94x first!
pub fn tick(&mut self) { pub async fn tick_inner<F>(&mut self, sleep: impl FnOnce(Duration) -> F)
where F: Future<Output=()>,
{
span!(_guard, "tick", "Clock::tick"); span!(_guard, "tick", "Clock::tick");
span!(guard, "clock work"); span!(guard, "clock work");
let current_sys_time = Instant::now(); let current_sys_time = Instant::now();
@ -105,7 +116,7 @@ impl Clock {
drop(guard); drop(guard);
// Attempt to sleep to fill the gap. // Attempt to sleep to fill the gap.
if let Some(sleep_dur) = self.target_dt.checked_sub(busy_delta) { if let Some(sleep_dur) = self.target_dt.checked_sub(busy_delta) {
spin_sleep::sleep(sleep_dur); sleep(sleep_dur).await;
} }
let after_sleep_sys_time = Instant::now(); let after_sleep_sys_time = Instant::now();

View File

@ -185,6 +185,7 @@ fn main() -> io::Result<()> {
// Wait for a tick so we don't start with a zero dt // Wait for a tick so we don't start with a zero dt
let mut tick_no = 0u64; let mut tick_no = 0u64;
server.runtime().clone().block_on(async {
loop { loop {
tick_no += 1; tick_no += 1;
span!(guard, "work"); span!(guard, "work");
@ -260,10 +261,11 @@ fn main() -> io::Result<()> {
drop(guard); drop(guard);
// Wait for the next tick. // Wait for the next tick.
clock.tick(); clock.tick().await;
#[cfg(feature = "tracy")] #[cfg(feature = "tracy")]
common_base::tracy_client::frame_mark(); common_base::tracy_client::frame_mark();
} }
});
Ok(()) Ok(())
} }

View File

@ -634,6 +634,12 @@ impl Server {
self.state.ecs().fetch::<DataDir>() self.state.ecs().fetch::<DataDir>()
} }
/// Get a reference to the client's runtime thread pool. This pool should be
/// used for any computationally expensive operations that run outside
/// of the main thread (i.e., threads that block on I/O operations are
/// exempt).
pub fn runtime(&self) -> &Arc<Runtime> { &self.runtime }
/// Get a reference to the server's game state. /// Get a reference to the server's game state.
pub fn state(&self) -> &State { &self.state } pub fn state(&self) -> &State { &self.state }

View File

@ -59,6 +59,7 @@ use i18n::LocalizationHandle;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Runtime;
/// A type used to store state that is shared between all play states. /// A type used to store state that is shared between all play states.
pub struct GlobalState { pub struct GlobalState {
@ -67,6 +68,7 @@ pub struct GlobalState {
pub settings: Settings, pub settings: Settings,
pub profile: Profile, pub profile: Profile,
pub window: Window, pub window: Window,
pub runtime: Runtime,
#[cfg(feature = "egui-ui")] #[cfg(feature = "egui-ui")]
pub egui_state: EguiState, pub egui_state: EguiState,
pub lazy_init: scene::terrain::SpriteRenderContextLazy, pub lazy_init: scene::terrain::SpriteRenderContextLazy,

View File

@ -228,9 +228,15 @@ fn main() {
i18n.read().log_missing_entries(); i18n.read().log_missing_entries();
i18n.set_english_fallback(settings.language.use_english_fallback); i18n.set_english_fallback(settings.language.use_english_fallback);
let runtime = tokio::runtime::Builder::new_current_thread()
.max_blocking_threads(1)
.enable_time()
.build()
.expect("Failed to create tokio runtime.");
// Create window // Create window
use veloren_voxygen::{error::Error, render::RenderError}; use veloren_voxygen::{error::Error, render::RenderError};
let (mut window, event_loop) = match Window::new(&settings) { let (mut window, event_loop) = match Window::new(&settings, &runtime) {
Ok(ok) => ok, Ok(ok) => ok,
// Custom panic message when a graphics backend could not be found // Custom panic message when a graphics backend could not be found
Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => { Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => {
@ -267,6 +273,7 @@ fn main() {
audio, audio,
profile, profile,
window, window,
runtime,
#[cfg(feature = "egui-ui")] #[cfg(feature = "egui-ui")]
egui_state, egui_state,
lazy_init, lazy_init,

View File

@ -53,7 +53,7 @@ impl ClientInit {
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
let cancel2 = Arc::clone(&cancel); let cancel2 = Arc::clone(&cancel);
pools.runtime.spawn(async move { pools.clone().runtime.spawn(async move {
let trust_fn = |auth_server: &str| { let trust_fn = |auth_server: &str| {
let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string()));
trust_rx trust_rx

View File

@ -22,7 +22,6 @@ use common_base::span;
use i18n::LocalizationHandle; use i18n::LocalizationHandle;
use scene::Scene; use scene::Scene;
use std::sync::Arc; use std::sync::Arc;
use tokio::Builder;
use tracing::error; use tracing::error;
use ui::{Event as MainMenuEvent, MainMenuUi}; use ui::{Event as MainMenuEvent, MainMenuUi};
@ -104,7 +103,6 @@ impl PlayState for MainMenuState {
"".to_owned(), "".to_owned(),
ConnectionArgs::Mpsc(14004), ConnectionArgs::Mpsc(14004),
&mut self.init, &mut self.init,
&pools.runtime,
&global_state.i18n, &global_state.i18n,
Some(pools), Some(pools),
); );
@ -531,7 +529,6 @@ fn attempt_login(
connection_args, connection_args,
username, username,
password, password,
Arc::clone(runtime),
pools.unwrap_or_else(|| { pools.unwrap_or_else(|| {
common_state::State::pools( common_state::State::pools(
common::resources::GameMode::Client, common::resources::GameMode::Client,

View File

@ -198,6 +198,7 @@ impl Renderer {
pub fn new( pub fn new(
window: &winit::window::Window, window: &winit::window::Window,
mode: RenderMode, mode: RenderMode,
runtime: &tokio::runtime::Runtime,
) -> Result<Self, RenderError> { ) -> Result<Self, RenderError> {
let (pipeline_modes, mut other_modes) = mode.split(); let (pipeline_modes, mut other_modes) = mode.split();
// Enable seamless cubemaps globally, where available--they are essentially a // Enable seamless cubemaps globally, where available--they are essentially a
@ -311,9 +312,6 @@ impl Renderer {
path path
}); });
let runtime = runtime::Builder::new_current_thread().build()
.expect("Failed to create single-threaded tokio runtime (for renderer initialization).");
let (device, queue) = runtime.block_on(adapter.request_device( let (device, queue) = runtime.block_on(adapter.request_device(
&wgpu::DeviceDescriptor { &wgpu::DeviceDescriptor {
// TODO // TODO

View File

@ -260,7 +260,7 @@ fn handle_main_events_cleared(
global_state global_state
.clock .clock
.set_target_dt(Duration::from_secs_f64(1.0 / target_fps as f64)); .set_target_dt(Duration::from_secs_f64(1.0 / target_fps as f64));
global_state.clock.tick(); global_state.runtime.block_on(global_state.clock.tick_slow());
drop(guard); drop(guard);
#[cfg(feature = "tracy")] #[cfg(feature = "tracy")]
common_base::tracy_client::frame_mark(); common_base::tracy_client::frame_mark();

View File

@ -162,6 +162,7 @@ impl Drop for Singleplayer {
fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc<AtomicBool>) { fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc<AtomicBool>) {
info!("Starting server-cli..."); info!("Starting server-cli...");
Arc::clone(&server.runtime()).block_on(async {
// Set up an fps clock // Set up an fps clock
let mut clock = Clock::new(Duration::from_secs_f64(1.0 / TPS as f64)); let mut clock = Clock::new(Duration::from_secs_f64(1.0 / TPS as f64));
@ -174,7 +175,7 @@ fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc<Atomi
} }
// Wait for the next tick. // Wait for the next tick.
clock.tick(); clock.tick().await;
// Skip updating the server if it's paused // Skip updating the server if it's paused
if paused.load(Ordering::SeqCst) && server.number_of_players() < 2 { if paused.load(Ordering::SeqCst) && server.number_of_players() < 2 {
@ -198,4 +199,5 @@ fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc<Atomi
// Clean up the server after a tick. // Clean up the server after a tick.
server.cleanup(); server.cleanup();
} }
});
} }

View File

@ -411,7 +411,10 @@ pub struct Window {
} }
impl Window { impl Window {
pub fn new(settings: &Settings) -> Result<(Window, EventLoop), Error> { pub fn new(
settings: &Settings,
runtime: &tokio::runtime::Runtime,
) -> Result<(Window, EventLoop), Error> {
let event_loop = EventLoop::new(); let event_loop = EventLoop::new();
let size = settings.graphics.window_size; let size = settings.graphics.window_size;
@ -431,7 +434,7 @@ impl Window {
let window = win_builder.build(&event_loop).unwrap(); let window = win_builder.build(&event_loop).unwrap();
let renderer = Renderer::new(&window, settings.graphics.render_mode.clone())?; let renderer = Renderer::new(&window, settings.graphics.render_mode.clone(), runtime)?;
let keypress_map = HashMap::new(); let keypress_map = HashMap::new();