From 77c90b2c7c851c1beec0e1d755d92d5a32901efd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 1 Jul 2020 09:30:38 +0200 Subject: [PATCH 1/3] doing a simple 1:1 swap out network coding this is the part which prob has less Merge conflics and is easy to rebase the next commit will have prob alot of merge conflics followed by a fmt commit --- Cargo.lock | 16 +- client/Cargo.toml | 4 + client/src/error.rs | 18 +- client/src/lib.rs | 244 ++++++----- common/src/lib.rs | 28 +- common/src/net/data.rs | 19 - common/src/net/mod.rs | 14 - common/src/net/post.rs | 628 --------------------------- common/src/net/post2.rs | 439 ------------------- server/Cargo.toml | 4 + server/src/client.rs | 16 +- server/src/error.rs | 18 +- server/src/lib.rs | 44 +- server/src/sys/waypoint.rs | 3 +- voxygen/src/menu/main/client_init.rs | 7 +- voxygen/src/menu/main/mod.rs | 12 +- voxygen/src/singleplayer.rs | 14 +- 17 files changed, 243 insertions(+), 1285 deletions(-) delete mode 100644 common/src/net/data.rs delete mode 100644 common/src/net/mod.rs delete mode 100644 common/src/net/post.rs delete mode 100644 common/src/net/post2.rs diff --git a/Cargo.lock b/Cargo.lock index 97153bac97..b2d47b3ea8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ dependencies = [ "crossbeam-utils 0.7.2", "futures-core", "futures-io", - "futures-timer", + "futures-timer 2.0.2", "kv-log-macro", "log", "memchr", @@ -1370,6 +1370,12 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.5" @@ -4448,6 +4454,9 @@ version = "0.6.0" dependencies = [ "authc", "byteorder 1.3.4", + "futures-executor", + "futures-timer 3.0.2", + "futures-util", "hashbrown", "image", "num_cpus", @@ -4456,6 +4465,7 @@ dependencies = [ "uvth 3.1.1", "vek", "veloren-common", + "veloren_network", ] [[package]] @@ -4499,6 +4509,9 @@ dependencies = [ "diesel", "diesel_migrations", "dotenv", + "futures-executor", + "futures-timer 3.0.2", + "futures-util", "hashbrown", "lazy_static", "libsqlite3-sys", @@ -4519,6 +4532,7 @@ dependencies = [ "vek", "veloren-common", "veloren-world", + "veloren_network", ] [[package]] diff --git a/client/Cargo.toml b/client/Cargo.toml index 73fe3dc06e..7a3c36edfa 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -6,9 +6,13 @@ edition = "2018" [dependencies] common = { package = "veloren-common", path = "../common", features = ["no-assets"] } +network = { package = "veloren_network", path = "../network", default-features = false } byteorder = "1.3.2" uvth = "3.1.1" +futures-util = "0.3" +futures-executor = "0.3" +futures-timer = "3.0" image = { version = "0.22.3", default-features = false, features = ["png"] } num_cpus = "1.10.1" tracing = { version = "0.1", default-features = false } diff --git a/client/src/error.rs b/client/src/error.rs index 873b1ce2e8..cd816026e4 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -1,9 +1,11 @@ use authc::AuthClientError; -use common::net::PostError; +use network::{NetworkError, ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { - Network(PostError), + NetworkErr(NetworkError), + ParticipantErr(ParticipantError), + StreamErr(StreamError), ServerWentMad, ServerTimeout, ServerShutdown, @@ -19,8 +21,16 @@ pub enum Error { Other(String), } -impl From for Error { - fn from(err: PostError) -> Self { Self::Network(err) } +impl From for Error { + fn from(err: NetworkError) -> Self { Self::NetworkErr(err) } +} + +impl From for Error { + fn from(err: ParticipantError) -> Self { Self::ParticipantErr(err) } +} + +impl From for Error { + fn from(err: StreamError) -> Self { Self::StreamErr(err) } } impl From for Error { diff --git a/client/src/lib.rs b/client/src/lib.rs index 64a75024f3..4719f2cdd7 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -25,12 +25,12 @@ use common::{ PlayerInfo, PlayerListUpdate, RegisterError, RequestStateError, ServerInfo, ServerMsg, MAX_BYTES_CHAT_MSG, }, - net::PostBox, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, terrain::{block::Block, TerrainChunk, TerrainChunkSize}, vol::RectVolSize, }; +use network::{Network, Participant, Stream, Address, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; use hashbrown::HashMap; use image::DynamicImage; use std::{ @@ -38,6 +38,9 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use futures_util::{select, FutureExt}; +use futures_executor::block_on; +use futures_timer::Delay; use tracing::{debug, error, warn}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -69,7 +72,9 @@ pub struct Client { pub character_list: CharacterList, pub active_character_id: Option, - postbox: PostBox, + _network: Network, + _participant: Arc, + singleton_stream: Stream, last_server_ping: f64, last_server_pong: f64, @@ -99,64 +104,80 @@ impl Client { /// Create a new `Client`. pub fn new>(addr: A, view_distance: Option) -> Result { let client_state = ClientState::Connected; - let mut postbox = PostBox::to(addr)?; + + let mut thread_pool = ThreadPoolBuilder::new() + .name("veloren-worker".into()) + .build(); + // We reduce the thread count by 1 to keep rendering smooth + thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); + + let (network, f) = Network::new(Pid::new(), None); + thread_pool.execute(f); + + let participant = block_on(network.connect(Address::Tcp(addr.into())))?; + let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?; // Wait for initial sync - let (state, entity, server_info, world_map) = match postbox.next_message()? { - ServerMsg::InitialSync { - entity_package, - server_info, - time_of_day, - world_map: (map_size, world_map), - } => { - // TODO: Display that versions don't match in Voxygen - if &server_info.git_hash != *common::util::GIT_HASH { - warn!( - "Server is running {}[{}], you are running {}[{}], versions might be \ + let (state, entity, server_info, world_map) = block_on(async{ + loop { + match stream.recv().await? { + ServerMsg::InitialSync { + entity_package, + server_info, + time_of_day, + world_map: (map_size, world_map), + } => { + // TODO: Display that versions don't match in Voxygen + if &server_info.git_hash != *common::util::GIT_HASH { + warn!( + "Server is running {}[{}], you are running {}[{}], versions might be \ incompatible!", server_info.git_hash, server_info.git_date, common::util::GIT_HASH.to_string(), common::util::GIT_DATE.to_string(), ); - } + } - debug!("Auth Server: {:?}", server_info.auth_provider); + debug!("Auth Server: {:?}", server_info.auth_provider); - // Initialize `State` - let mut state = State::default(); - // Client-only components - state - .ecs_mut() - .register::>(); + // Initialize `State` + let mut state = State::default(); + // Client-only components + state + .ecs_mut() + .register::>(); - let entity = state.ecs_mut().apply_entity_package(entity_package); - *state.ecs_mut().write_resource() = time_of_day; + let entity = state.ecs_mut().apply_entity_package(entity_package); + *state.ecs_mut().write_resource() = time_of_day; - assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); - let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; - LittleEndian::write_u32_into(&world_map, &mut world_map_raw); - debug!("Preparing image..."); - let world_map = Arc::new( - image::DynamicImage::ImageRgba8({ - // Should not fail if the dimensions are correct. - let world_map = - image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw); - world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? - }) - // Flip the image, since Voxygen uses an orientation where rotation from - // positive x axis to positive y axis is counterclockwise around the z axis. - .flipv(), - ); - debug!("Done preparing image..."); + assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); + let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; + LittleEndian::write_u32_into(&world_map, &mut world_map_raw); + debug!("Preparing image..."); + let world_map = Arc::new( + image::DynamicImage::ImageRgba8({ + // Should not fail if the dimensions are correct. + let world_map = + image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw); + world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? + }) + // Flip the image, since Voxygen uses an orientation where rotation from + // positive x axis to positive y axis is counterclockwise around the z axis. + .flipv(), + ); + debug!("Done preparing image..."); - (state, entity, server_info, (world_map, map_size)) - }, - ServerMsg::TooManyPlayers => return Err(Error::TooManyPlayers), - _ => return Err(Error::ServerWentMad), - }; + break Ok((state, entity, server_info, (world_map, map_size))) + }, + ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers), + err => { + warn!("whoops, server mad {:?}, ignoring", err); + }, + }} + })?; - postbox.send_message(ClientMsg::Ping); + stream.send(ClientMsg::Ping)?; let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) @@ -173,7 +194,9 @@ impl Client { character_list: CharacterList::default(), active_character_id: None, - postbox, + _network: network, + _participant: participant, + singleton_stream: stream, last_server_ping: 0.0, last_server_pong: 0.0, @@ -213,33 +236,39 @@ impl Client { } ).unwrap_or(Ok(username))?; - self.postbox.send_message(ClientMsg::Register { + self.singleton_stream.send(ClientMsg::Register { view_distance: self.view_distance, token_or_username, - }); + })?; self.client_state = ClientState::Pending; - loop { - match self.postbox.next_message()? { - ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => { - self.client_state = state; - break Err(match err { - RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, - RegisterError::AuthError(err) => Error::AuthErr(err), - RegisterError::InvalidCharacter => Error::InvalidCharacter, - RegisterError::NotOnWhitelist => Error::NotOnWhitelist, - }); - }, - ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), - _ => {}, + block_on( + async { + loop { + match self.singleton_stream.recv().await? { + ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => { + self.client_state = state; + break Err(match err { + RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, + RegisterError::AuthError(err) => Error::AuthErr(err), + RegisterError::InvalidCharacter => Error::InvalidCharacter, + }) + }, + ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), + ignore => { + warn!("Ignoring what the server send till registered: {:? }", ignore); + //return Err(Error::ServerWentMad) + }, + } + } } - } + ) } /// Request a state transition to `ClientState::Character`. pub fn request_character(&mut self, character_id: i32) { - self.postbox - .send_message(ClientMsg::Character(character_id)); + self.singleton_stream + .send(ClientMsg::Character(character_id)).unwrap(); self.active_character_id = Some(character_id); self.client_state = ClientState::Pending; @@ -248,73 +277,75 @@ impl Client { /// Load the current players character list pub fn load_character_list(&mut self) { self.character_list.loading = true; - self.postbox.send_message(ClientMsg::RequestCharacterList); + self.singleton_stream.send(ClientMsg::RequestCharacterList).unwrap(); } /// New character creation pub fn create_character(&mut self, alias: String, tool: Option, body: comp::Body) { self.character_list.loading = true; - self.postbox - .send_message(ClientMsg::CreateCharacter { alias, tool, body }); + self.singleton_stream + .send(ClientMsg::CreateCharacter { alias, tool, body }).unwrap(); } /// Character deletion pub fn delete_character(&mut self, character_id: i32) { self.character_list.loading = true; - self.postbox - .send_message(ClientMsg::DeleteCharacter(character_id)); + self.singleton_stream + .send(ClientMsg::DeleteCharacter(character_id)).unwrap(); } /// Send disconnect message to the server - pub fn request_logout(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); } + pub fn request_logout(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + error!(?e, "couldn't send disconnect package to server, did server close already?"); + }} /// Request a state transition to `ClientState::Registered` from an ingame /// state. pub fn request_remove_character(&mut self) { - self.postbox.send_message(ClientMsg::ExitIngame); + self.singleton_stream.send(ClientMsg::ExitIngame).unwrap(); self.client_state = ClientState::Pending; } pub fn set_view_distance(&mut self, view_distance: u32) { self.view_distance = Some(view_distance.max(1).min(65)); - self.postbox - .send_message(ClientMsg::SetViewDistance(self.view_distance.unwrap())); + self.singleton_stream + .send(ClientMsg::SetViewDistance(self.view_distance.unwrap())).unwrap(); // Can't fail } pub fn use_slot(&mut self, slot: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Use(slot), - ))); + ))).unwrap(); } pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Swap(a, b), - ))); + ))).unwrap(); } pub fn drop_slot(&mut self, slot: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Drop(slot), - ))); + ))).unwrap(); } pub fn pick_up(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Pickup(uid), - ))); + ))).unwrap(); } } pub fn toggle_lantern(&mut self) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)).unwrap(); } pub fn is_mounted(&self) -> bool { @@ -327,8 +358,8 @@ impl Client { pub fn mount(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Mount(uid))); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))).unwrap(); } } @@ -345,8 +376,8 @@ impl Client { .get(self.entity) .map_or(false, |s| s.is_dead) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Respawn)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Respawn)).unwrap(); } } @@ -428,8 +459,8 @@ impl Client { { controller.actions.push(control_action); } - self.postbox - .send_message(ClientMsg::ControlAction(control_action)); + self.singleton_stream + .send(ClientMsg::ControlAction(control_action)).unwrap(); } pub fn view_distance(&self) -> Option { self.view_distance } @@ -473,18 +504,18 @@ impl Client { } pub fn place_block(&mut self, pos: Vec3, block: Block) { - self.postbox.send_message(ClientMsg::PlaceBlock(pos, block)); + self.singleton_stream.send(ClientMsg::PlaceBlock(pos, block)).unwrap(); } pub fn remove_block(&mut self, pos: Vec3) { - self.postbox.send_message(ClientMsg::BreakBlock(pos)); + self.singleton_stream.send(ClientMsg::BreakBlock(pos)).unwrap(); } pub fn collect_block(&mut self, pos: Vec3) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Collect(pos), - ))); + ))).unwrap(); } /// Execute a single client tick, handle input and update the game state by @@ -539,8 +570,7 @@ impl Client { "Couldn't access controller component on client entity" ); } - self.postbox - .send_message(ClientMsg::ControllerInputs(inputs)); + self.singleton_stream.send(ClientMsg::ControllerInputs(inputs)).unwrap(); } // 2) Build up a list of events for this frame, to be passed to the frontend. @@ -637,8 +667,7 @@ impl Client { if self.state.terrain().get_key(*key).is_none() { if !skip_mode && !self.pending_chunks.contains_key(key) { if self.pending_chunks.len() < 4 { - self.postbox - .send_message(ClientMsg::TerrainChunkRequest { key: *key }); + self.singleton_stream.send(ClientMsg::TerrainChunkRequest { key: *key })?; self.pending_chunks.insert(*key, Instant::now()); } else { skip_mode = true; @@ -670,7 +699,7 @@ impl Client { // Send a ping to the server once every second if self.state.get_time() - self.last_server_ping > 1. { - self.postbox.send_message(ClientMsg::Ping); + self.singleton_stream.send(ClientMsg::Ping)?; self.last_server_ping = self.state.get_time(); } @@ -681,8 +710,7 @@ impl Client { self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), ) { - self.postbox - .send_message(ClientMsg::PlayerPhysics { pos, vel, ori }); + self.singleton_stream.send(ClientMsg::PlayerPhysics { pos, vel, ori })?; } } @@ -1072,5 +1100,7 @@ impl Client { } impl Drop for Client { - fn drop(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); } + fn drop(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + warn!("error during drop of client, couldn't send disconnect package, is the connection already closed? : {}", e); + } } } diff --git a/common/src/lib.rs b/common/src/lib.rs index a53cba2716..d3211b3abf 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -39,30 +39,4 @@ pub mod util; pub mod vol; pub mod volumes; -pub use loadout_builder::LoadoutBuilder; - -/// The networking module containing high-level wrappers of `TcpListener` and -/// `TcpStream` (`PostOffice` and `PostBox` respectively) and data types used by -/// both the server and client. # Examples -/// ``` -/// use std::net::SocketAddr; -/// use veloren_common::net::{PostBox, PostOffice}; -/// -/// let listen_addr = SocketAddr::from(([0, 0, 0, 0], 12345u16)); -/// let conn_addr = SocketAddr::from(([127, 0, 0, 1], 12345u16)); -/// -/// let mut server: PostOffice = PostOffice::bind(listen_addr).unwrap(); -/// let mut client: PostBox = PostBox::to(conn_addr).unwrap(); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// let mut scon = server.new_postboxes().next().unwrap(); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// scon.send_message(String::from("foo")); -/// client.send_message(String::from("bar")); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// assert_eq!("foo", client.next_message().unwrap()); -/// assert_eq!("bar", scon.next_message().unwrap()); -/// ``` -pub mod net; +pub use loadout_builder::LoadoutBuilder; \ No newline at end of file diff --git a/common/src/net/data.rs b/common/src/net/data.rs deleted file mode 100644 index ff16b1fe94..0000000000 --- a/common/src/net/data.rs +++ /dev/null @@ -1,19 +0,0 @@ -/// Messages server sends to client. -#[derive(Deserialize, Serialize, Debug)] -pub enum ServerMsg { - // VersionInfo MUST always stay first in this struct. - VersionInfo {}, -} - -/// Messages client sends to server. -#[derive(Deserialize, Serialize, Debug)] -pub enum ClientMsg { - // VersionInfo MUST always stay first in this struct. - VersionInfo {}, -} - -/// Control message type, used in [PostBox](super::PostBox) and -/// [PostOffice](super::PostOffice) to control threads. -pub enum ControlMsg { - Shutdown, -} diff --git a/common/src/net/mod.rs b/common/src/net/mod.rs deleted file mode 100644 index a27e61bbf2..0000000000 --- a/common/src/net/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub mod data; -//pub mod post; -pub mod post2; - -pub use post2 as post; - -// Reexports -pub use self::{ - data::{ClientMsg, ServerMsg}, - post::{Error as PostError, PostBox, PostOffice}, -}; - -pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + std::marker::Send + std::fmt::Debug; diff --git a/common/src/net/post.rs b/common/src/net/post.rs deleted file mode 100644 index 5912e323e9..0000000000 --- a/common/src/net/post.rs +++ /dev/null @@ -1,628 +0,0 @@ -use bincode; -use middleman::Middleman; -use mio::{ - net::{TcpListener, TcpStream}, - Events, Poll, PollOpt, Ready, Token, -}; -use mio_extras::channel::{channel, Receiver, Sender}; -use serde; -use std::{ - collections::VecDeque, - convert::TryFrom, - fmt, - io::{self, Read, Write}, - net::{Shutdown, SocketAddr}, - sync::mpsc::TryRecvError, - thread, - time::Duration, -}; - -#[derive(Clone, Debug, PartialEq)] -pub enum Error { - Disconnect, - Network, - InvalidMsg, - Internal, -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Network } -} - -impl From for Error { - fn from(err: TryRecvError) -> Self { Error::Internal } -} - -impl From for Error { - fn from(err: bincode::ErrorKind) -> Self { Error::InvalidMsg } -} - -impl From> for Error { - fn from(err: mio_extras::channel::SendError) -> Self { Error::Internal } -} - -pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message; - -const TCP_TOK: Token = Token(0); -const CTRL_TOK: Token = Token(1); -const POSTBOX_TOK: Token = Token(2); -const SEND_TOK: Token = Token(3); -const RECV_TOK: Token = Token(4); -const MIDDLEMAN_TOK: Token = Token(5); - -const MAX_MSG_BYTES: usize = 1 << 28; - -enum CtrlMsg { - Shutdown, -} - -pub struct PostOffice { - worker: Option>>, - ctrl_tx: Sender, - postbox_rx: Receiver, Error>>, - poll: Poll, - err: Option, -} - -impl PostOffice { - pub fn bind>(addr: A) -> Result { - let tcp_listener = TcpListener::bind(&addr.into())?; - - let (ctrl_tx, ctrl_rx) = channel(); - let (postbox_tx, postbox_rx) = channel(); - - let worker_poll = Poll::new()?; - worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?; - worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; - - let office_poll = Poll::new()?; - office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?; - - let worker = - thread::spawn(move || office_worker(worker_poll, tcp_listener, ctrl_rx, postbox_tx)); - - Ok(Self { - worker: Some(worker), - ctrl_tx, - postbox_rx, - poll: office_poll, - err: None, - }) - } - - pub fn error(&self) -> Option { self.err.clone() } - - pub fn new_connections(&mut self) -> impl ExactSizeIterator> { - let mut conns = VecDeque::new(); - - if self.err.is_some() { - return conns.into_iter(); - } - - let mut events = Events::with_capacity(64); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return conns.into_iter(); - } - - for event in events { - match event.token() { - // Keep reading new postboxes from the channel - POSTBOX_TOK => loop { - match self.postbox_rx.try_recv() { - Ok(Ok(conn)) => conns.push_back(conn), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return conns.into_iter(); - }, - Ok(Err(err)) => { - self.err = Some(err); - return conns.into_iter(); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - - conns.into_iter() - } -} - -impl Drop for PostOffice { - fn drop(&mut self) { - let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); - let _ = self.worker.take().map(|w| w.join()); - } -} - -fn office_worker( - poll: Poll, - tcp_listener: TcpListener, - ctrl_rx: Receiver, - postbox_tx: Sender, Error>>, -) -> Result<(), Error> { - let mut events = Events::with_capacity(64); - loop { - if let Err(err) = poll.poll(&mut events, None) { - postbox_tx.send(Err(err.into()))?; - return Ok(()); - } - - for event in &events { - match event.token() { - CTRL_TOK => loop { - match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => return Ok(()), - Err(TryRecvError::Empty) => {}, - Err(err) => { - postbox_tx.send(Err(err.into()))?; - return Ok(()); - }, - } - }, - TCP_TOK => postbox_tx.send(match tcp_listener.accept() { - Ok((stream, _)) => PostBox::from_tcpstream(stream), - Err(err) => Err(err.into()), - })?, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } -} - -pub struct PostBox { - worker: Option>>, - ctrl_tx: Sender, - send_tx: Sender, - recv_rx: Receiver>, - poll: Poll, - err: Option, -} - -impl PostBox { - pub fn to_server>(addr: A) -> Result { - Self::from_tcpstream(TcpStream::from_stream(std::net::TcpStream::connect( - &addr.into(), - )?)?) - } - - fn from_tcpstream(tcp_stream: TcpStream) -> Result { - let (ctrl_tx, ctrl_rx) = channel(); - let (send_tx, send_rx) = channel(); - let (recv_tx, recv_rx) = channel(); - - let worker_poll = Poll::new()?; - worker_poll.register( - &tcp_stream, - TCP_TOK, - Ready::readable() | Ready::writable(), - PollOpt::edge(), - )?; - worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; - worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?; - - let postbox_poll = Poll::new()?; - postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?; - - let worker = thread::spawn(move || { - postbox_worker(worker_poll, tcp_stream, ctrl_rx, send_rx, recv_tx) - }); - - Ok(Self { - worker: Some(worker), - ctrl_tx, - send_tx, - recv_rx, - poll: postbox_poll, - err: None, - }) - } - - pub fn error(&self) -> Option { self.err.clone() } - - pub fn send(&mut self, data: S) { let _ = self.send_tx.send(data); } - - // TODO: This method is super messy. - pub fn next_message(&mut self) -> Option { - if self.err.is_some() { - return None; - } - - loop { - let mut events = Events::with_capacity(10); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return None; - } - - for event in events { - match event.token() { - // Keep reading new messages from the channel - RECV_TOK => loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => return Some(msg), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return None; - }, - Ok(Err(err)) => { - self.err = Some(err); - return None; - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } - } - - pub fn new_messages(&mut self) -> impl ExactSizeIterator { - let mut msgs = VecDeque::new(); - - if self.err.is_some() { - return msgs.into_iter(); - } - - let mut events = Events::with_capacity(64); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return msgs.into_iter(); - } - - for event in events { - match event.token() { - // Keep reading new messages from the channel - RECV_TOK => loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => msgs.push_back(msg), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return msgs.into_iter(); - }, - Ok(Err(err)) => { - self.err = Some(err); - return msgs.into_iter(); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - - msgs.into_iter() - } -} - -impl Drop for PostBox { - fn drop(&mut self) { - let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); - let _ = self.worker.take().map(|w| w.join()); - } -} - -fn postbox_worker( - poll: Poll, - mut tcp_stream: TcpStream, - ctrl_rx: Receiver, - send_rx: Receiver, - recv_tx: Sender>, -) -> Result<(), Error> { - fn try_tcp_send( - tcp_stream: &mut TcpStream, - chunks: &mut VecDeque>, - ) -> Result<(), Error> { - loop { - let chunk = match chunks.pop_front() { - Some(chunk) => chunk, - None => break, - }; - - match tcp_stream.write_all(&chunk) { - Ok(()) => {}, - Err(err) if err.kind() == io::ErrorKind::WouldBlock => { - chunks.push_front(chunk); - break; - }, - Err(err) => { - println!("Error: {:?}", err); - return Err(err.into()); - }, - } - } - - Ok(()) - } - - enum RecvState { - ReadHead(Vec), - ReadBody(usize, Vec), - } - - let mut recv_state = RecvState::ReadHead(Vec::new()); - let mut chunks = VecDeque::new(); - - //let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - let mut events = Events::with_capacity(64); - - 'work: loop { - if let Err(err) = poll.poll(&mut events, None) { - recv_tx.send(Err(err.into()))?; - break 'work; - } - - for event in &events { - match event.token() { - CTRL_TOK => match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => { - break 'work; - }, - Err(TryRecvError::Empty) => (), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - }, - SEND_TOK => loop { - match send_rx.try_recv() { - Ok(outgoing_msg) => { - let mut msg_bytes = match bincode::serialize(&outgoing_msg) { - Ok(bytes) => bytes, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - }; - - let mut bytes = msg_bytes.len().to_le_bytes().as_ref().to_vec(); - bytes.append(&mut msg_bytes); - - bytes - .chunks(1024) - .map(|chunk| chunk.to_vec()) - .for_each(|chunk| chunks.push_back(chunk)); - - match try_tcp_send(&mut tcp_stream, &mut chunks) { - Ok(_) => {}, - Err(err) => { - recv_tx.send(Err(err.into()))?; - return Err(Error::Network); - }, - } - }, - Err(TryRecvError::Empty) => break, - Err(err) => Err(err)?, - } - }, - TCP_TOK => { - loop { - // Check TCP error - match tcp_stream.take_error() { - Ok(None) => {}, - Ok(Some(err)) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - match &mut recv_state { - RecvState::ReadHead(head) => { - if head.len() == 8 { - let len = usize::from_le_bytes( - <[u8; 8]>::try_from(head.as_slice()).unwrap(), - ); - if len > MAX_MSG_BYTES { - println!("TOO BIG! {:x}", len); - recv_tx.send(Err(Error::InvalidMsg))?; - break 'work; - } else if len == 0 { - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - break; - } else { - recv_state = RecvState::ReadBody(len, Vec::new()); - } - } else { - let mut b = [0; 1]; - match tcp_stream.read(&mut b) { - Ok(0) => {}, - Ok(_) => head.push(b[0]), - Err(_) => break, - } - } - }, - RecvState::ReadBody(len, body) => { - if body.len() == *len { - match bincode::deserialize(&body) { - Ok(msg) => { - recv_tx.send(Ok(msg))?; - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - }, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - } - } else { - let left = *len - body.len(); - let mut buf = vec![0; left]; - match tcp_stream.read(&mut buf) { - Ok(_) => body.append(&mut buf), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - } - }, - } - } - - // Now, try sending TCP stuff - match try_tcp_send(&mut tcp_stream, &mut chunks) { - Ok(_) => {}, - Err(err) => { - recv_tx.send(Err(err.into()))?; - return Err(Error::Network); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } - - //tcp_stream.shutdown(Shutdown::Both)?; - Ok(()) -} - -// TESTS - -/* -#[derive(Serialize, Deserialize)] -struct TestMsg(T); - -#[test] -fn connect() { - let srv_addr = ([127, 0, 0, 1], 12345); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - let postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - // Now a postbox has been created, we should have 1 new. - thread::sleep(Duration::from_millis(250)); - let incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 1); - assert_eq!(postoffice.error(), None); -} - -#[test] -fn connect_fail() { - let listen_addr = ([0; 4], 12345); - let connect_addr = ([127, 0, 0, 1], 12212); - - let mut postoffice = PostOffice::, TestMsg>::bind(listen_addr).unwrap(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - assert!(PostBox::, TestMsg>::to_server(connect_addr).is_err()); -} - -#[test] -fn connection_count() { - let srv_addr = ([127, 0, 0, 1], 12346); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - let mut postboxes = Vec::new(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - for _ in 0..5 { - postboxes.push(PostBox::, TestMsg>::to_server(srv_addr).unwrap()); - } - - // 5 postboxes created, we should have 5. - thread::sleep(Duration::from_millis(3500)); - let incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 5); - assert_eq!(postoffice.error(), None); -} - -#[test] -fn disconnect() { - let srv_addr = ([127, 0, 0, 1], 12347); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut server_postbox = { - let mut client_postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - let mut incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 1); - assert_eq!(postoffice.error(), None); - - incoming.next().unwrap() - }; - - // The client postbox has since been disconnected. - thread::sleep(Duration::from_millis(2050)); - let incoming_msgs = server_postbox.new_messages(); - assert_eq!(incoming_msgs.len(), 0); - // TODO - // assert_eq!(server_postbox.error(), Some(Error::Disconnect)); -} - -#[test] -fn client_to_server() { - let srv_addr = ([127, 0, 0, 1], 12348); - - let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - - let mut server_pb = po.new_connections().next().unwrap(); - - client_pb.send(TestMsg(1337.0)); - client_pb.send(TestMsg(9821.0)); - client_pb.send(TestMsg(-3.2)); - client_pb.send(TestMsg(17.0)); - - thread::sleep(Duration::from_millis(250)); - - let mut incoming_msgs = server_pb.new_messages(); - assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0)); -} - -#[test] -fn server_to_client() { - let srv_addr = ([127, 0, 0, 1], 12349); - - let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - - let mut server_pb = po.new_connections().next().unwrap(); - - server_pb.send(TestMsg(1337)); - server_pb.send(TestMsg(9821)); - server_pb.send(TestMsg(39999999)); - server_pb.send(TestMsg(17)); - - thread::sleep(Duration::from_millis(250)); - - let mut incoming_msgs = client_pb.new_messages(); - assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17)); -} -*/ diff --git a/common/src/net/post2.rs b/common/src/net/post2.rs deleted file mode 100644 index e4ba5b31cb..0000000000 --- a/common/src/net/post2.rs +++ /dev/null @@ -1,439 +0,0 @@ -use crossbeam::channel; -use serde::{de::DeserializeOwned, Serialize}; -use std::{ - collections::VecDeque, - convert::TryFrom, - io::{self, Read, Write}, - marker::PhantomData, - net::{Shutdown, SocketAddr, TcpListener, TcpStream}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; -use tracing::warn; - -#[derive(Clone, Debug)] -pub enum Error { - Io(Arc), - Bincode(Arc), - ChannelFailure, - InvalidMessage, -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Io(Arc::new(err)) } -} - -impl From for Error { - fn from(err: bincode::Error) -> Self { Error::Bincode(Arc::new(err)) } -} - -impl From for Error { - fn from(_error: channel::TryRecvError) -> Self { Error::ChannelFailure } -} - -pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send; - -const MAX_MSG_SIZE: usize = 1 << 28; - -pub struct PostOffice { - listener: TcpListener, - error: Option, - phantom: PhantomData<(S, R)>, -} - -impl PostOffice { - pub fn bind>(addr: A) -> Result { - let listener = TcpListener::bind(addr.into())?; - listener.set_nonblocking(true)?; - - Ok(Self { - listener, - error: None, - phantom: PhantomData, - }) - } - - pub fn error(&self) -> Option { self.error.clone() } - - pub fn new_postboxes(&mut self) -> impl ExactSizeIterator> { - let mut new = Vec::new(); - - if self.error.is_some() { - return new.into_iter(); - } - - loop { - match self.listener.accept() { - Ok((stream, _sock)) => new.push(PostBox::from_stream(stream).unwrap()), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => {}, - Err(e) => { - self.error = Some(e.into()); - break; - }, - } - } - - new.into_iter() - } -} - -pub struct PostBox { - send_tx: channel::Sender, - recv_rx: channel::Receiver>, - worker: Option>, - running: Arc, - error: Option, -} - -impl PostBox { - pub fn to>(addr: A) -> Result { - Self::from_stream(TcpStream::connect(addr.into())?) - } - - fn from_stream(stream: TcpStream) -> Result { - stream.set_nonblocking(true)?; - - let running = Arc::new(AtomicBool::new(true)); - let worker_running = running.clone(); - - let (send_tx, send_rx) = channel::unbounded(); - let (recv_tx, recv_rx) = channel::unbounded(); - - let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running)); - - Ok(Self { - send_tx, - recv_rx, - worker: Some(worker), - running, - error: None, - }) - } - - pub fn error(&self) -> Option { self.error.clone() } - - pub fn send_message(&mut self, msg: S) { let _ = self.send_tx.send(msg); } - - pub fn next_message(&mut self) -> Result { - if let Some(e) = self.error.clone() { - return Err(e); - } - - match self.recv_rx.recv().map_err(|_| Error::ChannelFailure)? { - Ok(msg) => Ok(msg), - Err(e) => { - self.error = Some(e.clone()); - Err(e) - }, - } - } - - pub fn new_messages(&mut self) -> impl ExactSizeIterator { - let mut new = Vec::new(); - - if self.error.is_some() { - return new.into_iter(); - } - - loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => new.push(msg), - Err(channel::TryRecvError::Empty) => break, - Err(e) => { - self.error = Some(e.into()); - break; - }, - Ok(Err(e)) => { - self.error = Some(e); - break; - }, - } - } - - new.into_iter() - } - - fn worker( - mut stream: TcpStream, - send_rx: channel::Receiver, - recv_tx: channel::Sender>, - running: Arc, - ) { - let mut outgoing_chunks = VecDeque::new(); - let mut incoming_buf = Vec::new(); - - 'work: while running.load(Ordering::Relaxed) { - for _ in 0..30 { - // Get stream errors. - match stream.take_error() { - Ok(Some(e)) | Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - Ok(None) => {}, - } - - // Try getting messages from the send channel. - for _ in 0..1000 { - match send_rx.try_recv() { - Ok(send_msg) => { - // Serialize message - let msg_bytes = bincode::serialize(&send_msg).unwrap(); - let mut msg_bytes = lz4_compress::compress(&msg_bytes); - - /* - if msg_bytes.len() > 512 { - println!("MSG SIZE: {}", msg_bytes.len()); - } - */ - - // Assemble into packet. - let mut packet_bytes = - (msg_bytes.len() as u64).to_le_bytes().as_ref().to_vec(); - packet_bytes.push(msg_bytes.iter().fold(0, |a, x| a ^ *x)); - packet_bytes.append(&mut msg_bytes); - - // Split packet into chunks. - packet_bytes - .chunks(4096) - .map(|chunk| chunk.to_vec()) - .for_each(|chunk| outgoing_chunks.push_back(chunk)) - }, - Err(channel::TryRecvError::Empty) => break, - // Worker error - Err(e) => { - let _ = recv_tx.send(Err(e.into())); - break 'work; - }, - } - } - - // Try sending bytes through the TCP stream. - for _ in 0..1000 { - match outgoing_chunks.pop_front() { - Some(mut chunk) => match stream.write(&chunk) { - Ok(n) if n == chunk.len() => {}, - Ok(n) => { - outgoing_chunks.push_front(chunk.split_off(n)); - break; - }, - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // Return chunk to the queue to try again later. - outgoing_chunks.push_front(chunk); - break; - }, - // Worker error - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - }, - None => break, - } - } - - // Try receiving bytes from the TCP stream. - for _ in 0..1000 { - let mut buf = [0; 4096]; - - match stream.read(&mut buf) { - Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => {}, - // Worker error - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - } - } - - // Try turning bytes into messages. - for _ in 0..1000 { - match incoming_buf.get(0..9) { - Some(len_bytes) => { - let len = - u64::from_le_bytes(<[u8; 8]>::try_from(&len_bytes[0..8]).unwrap()) - as usize; // Can't fail - - if len > MAX_MSG_SIZE { - recv_tx.send(Err(Error::InvalidMessage)).unwrap(); - break 'work; - } else if incoming_buf.len() >= len + 9 { - let checksum_found = - incoming_buf[9..len + 9].iter().fold(0, |a, x| a ^ *x); - let checksum_expected = len_bytes[8]; - - assert_eq!( - checksum_found, checksum_expected, - "Message checksum failed!" - ); - - let msg_bytes = - lz4_compress::decompress(&incoming_buf[9..len + 9]).unwrap(); - - match bincode::deserialize(&msg_bytes) { - Ok(msg) => recv_tx.send(Ok(msg)).unwrap(), - Err(err) => { - println!("BINCODE ERROR: {:?}", err); - recv_tx.send(Err(err.into())).unwrap() - }, - } - - incoming_buf = incoming_buf.split_off(len + 9); - } else { - break; - } - }, - None => break, - } - } - } - - thread::sleep(Duration::from_millis(10)); - } - - if let Err(err) = stream.shutdown(Shutdown::Both) { - warn!("TCP worker stream shutdown failed: {:?}", err); - } - } -} - -impl Drop for PostBox { - fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); - self.worker.take().map(|handle| handle.join()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Instant; - - fn create_postoffice( - id: u16, - ) -> Result<(PostOffice, SocketAddr), Error> { - let sock = ([0; 4], 12345 + id).into(); - Ok((PostOffice::bind(sock)?, sock)) - } - - fn loop_for(duration: Duration, mut f: F) { - let start = Instant::now(); - while start.elapsed() < duration { - f(); - } - } - - #[test] - fn connect() { - let (mut postoffice, bound) = create_postoffice::<(), ()>(0).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - - let _client0 = PostBox::<(), ()>::to(sock).unwrap(); - let _client1 = PostBox::<(), ()>::to(sock).unwrap(); - let _client2 = PostBox::<(), ()>::to(sock).unwrap(); - - let mut new_clients = 0; - loop_for(Duration::from_millis(250), || { - new_clients += postoffice.new_postboxes().count(); - }); - - assert_eq!(new_clients, 3); - } - - /* - #[test] - fn disconnect() { - let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap(); - - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().unwrap().next().unwrap(); - - drop(client); - loop_for(Duration::from_millis(300), || ()); - - assert!(server.new_messages().is_err()); - } - */ - - #[test] - fn send_recv() { - let (mut postoffice, bound) = create_postoffice::<(), i32>(2).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let test_msgs = vec![1, 1337, 42, -48]; - - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - for msg in &test_msgs { - client.send_message(*msg); - } - - let mut recv_msgs = Vec::new(); - loop_for(Duration::from_millis(250), || { - server.new_messages().for_each(|msg| recv_msgs.push(msg)) - }); - - assert_eq!(test_msgs, recv_msgs); - } - - #[test] - #[ignore] - fn send_recv_huge() { - let (mut postoffice, bound) = create_postoffice::<(), Vec>(3).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let test_msgs: Vec> = (0..5) - .map(|i| (0..100000).map(|j| i * 2 + j).collect()) - .collect(); - - let mut client = PostBox::, ()>::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - for msg in &test_msgs { - client.send_message(msg.clone()); - } - - let mut recv_msgs = Vec::new(); - loop_for(Duration::from_millis(3000), || { - server.new_messages().for_each(|msg| recv_msgs.push(msg)) - }); - - assert_eq!(test_msgs.len(), recv_msgs.len()); - assert!(test_msgs == recv_msgs); - } - - #[test] - fn send_recv_both() { - let (mut postoffice, bound) = create_postoffice::(4).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - let test_msgs = vec![ - (0xDEADBEAD, 0xBEEEEEEF), - (0x1BADB002, 0xBAADF00D), - (0xBAADA555, 0xC0DED00D), - (0xCAFEBABE, 0xDEADC0DE), - ]; - - for (to, from) in test_msgs { - client.send_message(to); - server.send_message(from); - - loop_for(Duration::from_millis(250), || ()); - - assert_eq!(client.new_messages().next().unwrap(), from); - assert_eq!(server.new_messages().next().unwrap(), to); - } - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index 0a33b95e71..ade46b9c6c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,6 +11,7 @@ default = ["worldgen"] [dependencies] common = { package = "veloren-common", path = "../common" } world = { package = "veloren-world", path = "../world" } +network = { package = "veloren_network", path = "../network", default-features = false } specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git" } @@ -18,6 +19,9 @@ tracing = "0.1" specs = { version = "0.15.1", features = ["shred-derive"] } vek = "0.11.0" uvth = "3.1.1" +futures-util = "0.3" +futures-executor = "0.3" +futures-timer = "3.0" lazy_static = "1.4.0" scan_fmt = "0.2.4" ron = { version = "0.6", default-features = false } diff --git a/server/src/client.rs b/server/src/client.rs index 994a5126ff..e02a48392d 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,7 +1,7 @@ use common::{ - msg::{ClientMsg, ClientState, RequestStateError, ServerMsg}, - net::PostBox, + msg::{ClientState, RequestStateError, ServerMsg}, }; +use network::Stream; use hashbrown::HashSet; use specs::{Component, FlaggedStorage}; use specs_idvs::IDVStorage; @@ -9,7 +9,7 @@ use vek::*; pub struct Client { pub client_state: ClientState, - pub postbox: PostBox, + pub singleton_stream: std::sync::Mutex, pub last_ping: f64, pub login_msg_sent: bool, } @@ -19,7 +19,7 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { self.postbox.send_message(msg); } + pub fn notify(&mut self, msg: ServerMsg) { self.singleton_stream.lock().unwrap().send(msg); } pub fn is_registered(&self) -> bool { match self.client_state { @@ -37,13 +37,13 @@ impl Client { pub fn allow_state(&mut self, new_state: ClientState) { self.client_state = new_state; - self.postbox - .send_message(ServerMsg::StateAnswer(Ok(new_state))); + self.singleton_stream + .lock().unwrap().send(ServerMsg::StateAnswer(Ok(new_state))); } pub fn error_state(&mut self, error: RequestStateError) { - self.postbox - .send_message(ServerMsg::StateAnswer(Err((error, self.client_state)))); + self.singleton_stream + .lock().unwrap().send(ServerMsg::StateAnswer(Err((error, self.client_state)))); } } diff --git a/server/src/error.rs b/server/src/error.rs index e98ec644f7..a231b3ec34 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -1,11 +1,21 @@ -use common::net::PostError; +use network::{NetworkError, ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { - Network(PostError), + NetworkErr(NetworkError), + ParticipantErr(ParticipantError), + StreamErr(StreamError), Other(String), } -impl From for Error { - fn from(err: PostError) -> Self { Error::Network(err) } +impl From for Error { + fn from(err: NetworkError) -> Self { Error::NetworkErr(err) } +} + +impl From for Error { + fn from(err: ParticipantError) -> Self { Error::ParticipantErr(err) } +} + +impl From for Error { + fn from(err: StreamError) -> Self { Error::StreamErr(err) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 288537277a..898004616d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -31,13 +31,13 @@ use common::{ cmd::ChatCommand, comp::{self, ChatType}, event::{EventBus, ServerEvent}, - msg::{ClientMsg, ClientState, ServerInfo, ServerMsg}, - net::PostOffice, + msg::{ClientState, ServerInfo, ServerMsg}, state::{State, TimeOfDay}, sync::WorldSyncExt, terrain::TerrainChunkSize, vol::{ReadVol, RectVolSize}, }; +use network::{Network, Address, Pid}; use metrics::{ServerMetrics, TickMetrics}; use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; @@ -47,6 +47,9 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use futures_util::{select, FutureExt}; +use futures_executor::block_on; +use futures_timer::Delay; #[cfg(not(feature = "worldgen"))] use test_world::{World, WORLD_SIZE}; use tracing::{debug, error, info}; @@ -77,7 +80,7 @@ pub struct Server { world: Arc, map: Vec, - postoffice: PostOffice, + network: Network, thread_pool: ThreadPool, @@ -233,16 +236,19 @@ impl Server { .run(settings.metrics_address) .expect("Failed to initialize server metrics submodule."); + let thread_pool = ThreadPoolBuilder::new().name("veloren-worker".to_string()).build(); + let (network, f) = Network::new(Pid::new(), None); + thread_pool.execute(f); + block_on(network.listen(Address::Tcp(settings.gameserver_address)))?; + let this = Self { state, world: Arc::new(world), map, - postoffice: PostOffice::bind(settings.gameserver_address)?, + network, - thread_pool: ThreadPoolBuilder::new() - .name("veloren-worker".into()) - .build(), + thread_pool, metrics, tick_metrics, @@ -329,17 +335,18 @@ impl Server { // 1) Build up a list of events for this frame, to be passed to the frontend. let mut frontend_events = Vec::new(); - // If networking has problems, handle them. - if let Some(err) = self.postoffice.error() { - return Err(err.into()); - } - // 2) let before_new_connections = Instant::now(); // 3) Handle inputs from clients - frontend_events.append(&mut self.handle_new_connections()?); + block_on(async{ + //TIMEOUT 0.01 ms for msg handling + let x = select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = self.handle_new_connections(&mut frontend_events).fuse() => err, + ); + }); let before_message_system = Instant::now(); @@ -582,13 +589,14 @@ impl Server { } /// Handle new client connections. - fn handle_new_connections(&mut self) -> Result, Error> { - let mut frontend_events = Vec::new(); + async fn handle_new_connections(&mut self, frontend_events: &mut Vec) -> Result<(), Error> { + loop { + let participant = self.network.connected().await?; + let singleton_stream = participant.opened().await?; - for postbox in self.postoffice.new_postboxes() { let mut client = Client { client_state: ClientState::Connected, - postbox, + singleton_stream: std::sync::Mutex::new(singleton_stream), last_ping: self.state.get_time(), login_msg_sent: false, }; @@ -626,8 +634,6 @@ impl Server { frontend_events.push(Event::ClientConnected { entity }); } } - - Ok(frontend_events) } pub fn notify_client(&self, entity: EcsEntity, msg: S) diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index c7b3ebbfb7..7ee8ff1253 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -41,8 +41,7 @@ impl<'a> System<'a> for Sys { { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { client - .postbox - .send_message(ServerMsg::Notification(Notification::WaypointSaved)); + .notify(ServerMsg::Notification(Notification::WaypointSaved)); } } } diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 02c171ac5b..455ced5066 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -1,5 +1,4 @@ use client::{error::Error as ClientError, Client}; -use common::net::PostError; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use std::{ net::ToSocketAddrs, @@ -98,12 +97,10 @@ impl ClientInit { }, Err(err) => { match err { - ClientError::Network(PostError::Bincode(_)) => { + ClientError::NetworkErr(_) => { last_err = Some(Error::ClientError(err)); break 'tries; - }, - // Assume the connection failed and try again soon - ClientError::Network(_) => {}, + } // Non-connection error, stop attempts err => { last_err = Some(Error::ClientError(err)); diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index 0cdc9b6e17..cdd9410545 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -109,7 +109,17 @@ impl PlayState for MainMenuState { client::Error::NotOnWhitelist => { localized_strings.get("main.login.not_on_whitelist").into() }, - client::Error::Network(e) => format!( + client::Error::NetworkErr(e) => format!( + "{}: {:?}", + localized_strings.get("main.login.network_error"), + e + ), + client::Error::ParticipantErr(e) => format!( + "{}: {:?}", + localized_strings.get("main.login.network_error"), + e + ), + client::Error::StreamErr(e) => format!( "{}: {:?}", localized_strings.get("main.login.network_error"), e diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index aee967caf4..7184d39dd5 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -47,14 +47,14 @@ impl Singleplayer { let paused = Arc::new(AtomicBool::new(false)); let paused1 = paused.clone(); + let server = Server::new(settings2).expect("Failed to create server instance!"); + + let server = match thread_pool { + Some(pool) => server.with_thread_pool(pool), + None => server, + }; + let thread = thread::spawn(move || { - let server = Server::new(settings2).expect("Failed to create server instance!"); - - let server = match thread_pool { - Some(pool) => server.with_thread_pool(pool), - None => server, - }; - run_server(server, receiver, paused1); }); From 4e92c0160e7c36478d4e764e7c7724bb0fa05f71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 1 Jul 2020 11:45:39 +0200 Subject: [PATCH 2/3] network layer switch, doing the stuff that might confict. - mostly its the message handling put now in a async wrapper - add some fixes to pass without warnings and make clippy happy - some network error handling is ignored, this can be improved but is no blocker --- client/src/error.rs | 3 +- client/src/lib.rs | 429 ++++++++-------- server/src/client.rs | 6 +- server/src/lib.rs | 6 +- server/src/sys/message.rs | 730 +++++++++++++++------------ voxygen/src/menu/main/client_init.rs | 7 +- 6 files changed, 627 insertions(+), 554 deletions(-) diff --git a/client/src/error.rs b/client/src/error.rs index cd816026e4..2321f5944f 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -1,5 +1,6 @@ use authc::AuthClientError; -use network::{NetworkError, ParticipantError, StreamError}; +use network::{ParticipantError, StreamError}; +pub use network::NetworkError; #[derive(Debug)] pub enum Error { diff --git a/client/src/lib.rs b/client/src/lib.rs index 4719f2cdd7..c5d6df2c60 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -252,6 +252,7 @@ impl Client { RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, RegisterError::AuthError(err) => Error::AuthErr(err), RegisterError::InvalidCharacter => Error::InvalidCharacter, + RegisterError::NotOnWhitelist => Error::NotOnWhitelist, }) }, ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), @@ -364,8 +365,7 @@ impl Client { } pub fn unmount(&mut self) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Unmount)); + self.singleton_stream.send(ClientMsg::ControlEvent(ControlEvent::Unmount)).unwrap(); } pub fn respawn(&mut self) { @@ -489,7 +489,7 @@ impl Client { /// Send a chat message to the server. pub fn send_chat(&mut self, message: String) { match validate_chat_msg(&message) { - Ok(()) => self.postbox.send_message(ClientMsg::ChatMsg(message)), + Ok(()) => self.singleton_stream.send(ClientMsg::ChatMsg(message)).unwrap(), Err(ChatMsgValidationError::TooLong) => tracing::warn!( "Attempted to send a message that's too long (Over {} bytes)", MAX_BYTES_CHAT_MSG @@ -737,8 +737,213 @@ impl Client { self.state.cleanup(); } - /// Handle new server messages. + async fn handle_message(&mut self, frontend_events: &mut Vec, cnt: &mut u64) -> Result<(), Error> { + loop { + let msg = self.singleton_stream.recv().await?; + *cnt += 1; + match msg { + ServerMsg::TooManyPlayers => { + return Err(Error::ServerWentMad); + }, + ServerMsg::Shutdown => return Err(Error::ServerShutdown), + ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad), + ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => { + self.player_list = list + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { + if let Some(old_player_info) = + self.player_list.insert(uid, player_info.clone()) + { + warn!( + "Received msg to insert {} with uid {} into the player list but \ + there was already an entry for {} with the same uid that was \ + overwritten!", + player_info.player_alias, uid, old_player_info.player_alias + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.is_admin = admin; + } else { + warn!( + "Received msg to update admin status of uid {}, but they were not \ + in the list.", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter( + uid, + char_info, + )) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.character = Some(char_info); + } else { + warn!( + "Received msg to update character info for uid {}, but they were \ + not in the list.", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.character = match &player_info.character { + Some(character) => Some(common::msg::CharacterInfo { + name: character.name.to_string(), + level: next_level, + }), + None => { + warn!( + "Received msg to update character level info to {} for \ + uid {}, but this player's character is None.", + next_level, uid + ); + None + }, + }; + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => { + // Instead of removing players, mark them as offline because we need to + // remember the names of disconnected players in chat. + // + // TODO the server should re-use uids of players that log out and log back + // in. + + if let Some(player_info) = self.player_list.get_mut(&uid) { + if player_info.is_online { + player_info.is_online = false; + } else { + warn!( + "Received msg to remove uid {} from the player list by they \ + were already marked offline", + uid + ); + } + } else { + warn!( + "Received msg to remove uid {} from the player list by they \ + weren't in the list!", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.player_alias = new_name; + } else { + warn!( + "Received msg to alias player with uid {} to {} but this uid is \ + not in the player list", + uid, new_name + ); + } + }, + + ServerMsg::Ping => {self.singleton_stream.send(ClientMsg::Pong)?;}, + ServerMsg::Pong => { + self.last_server_pong = self.state.get_time(); + + self.last_ping_delta = + (self.state.get_time() - self.last_server_ping).round(); + }, + ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), + ServerMsg::SetPlayerEntity(uid) => { + if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) { + self.entity = entity; + } else { + return Err(Error::Other("Failed to find entity from uid.".to_owned())); + } + }, + ServerMsg::TimeOfDay(time_of_day) => { + *self.state.ecs_mut().write_resource() = time_of_day; + }, + ServerMsg::EntitySync(entity_sync_package) => { + self.state + .ecs_mut() + .apply_entity_sync_package(entity_sync_package); + }, + ServerMsg::CompSync(comp_sync_package) => { + self.state + .ecs_mut() + .apply_comp_sync_package(comp_sync_package); + }, + ServerMsg::CreateEntity(entity_package) => { + self.state.ecs_mut().apply_entity_package(entity_package); + }, + ServerMsg::DeleteEntity(entity) => { + if self.state.read_component_cloned::(self.entity) != Some(entity) { + self.state + .ecs_mut() + .delete_entity_and_clear_from_uid_allocator(entity.0); + } + }, + // Cleanup for when the client goes back to the `Registered` state + ServerMsg::ExitIngameCleanup => { + self.clean_state(); + }, + ServerMsg::InventoryUpdate(inventory, event) => { + match event { + InventoryUpdateEvent::CollectFailed => {}, + _ => { + // Push the updated inventory component to the client + self.state.write_component(self.entity, inventory); + }, + } + + frontend_events.push(Event::InventoryUpdated(event)); + }, + ServerMsg::TerrainChunkUpdate { key, chunk } => { + if let Ok(chunk) = chunk { + self.state.insert_chunk(key, *chunk); + } + self.pending_chunks.remove(&key); + }, + ServerMsg::TerrainBlockUpdates(mut blocks) => { + blocks.drain().for_each(|(pos, block)| { + self.state.set_block(pos, block); + }); + }, + ServerMsg::StateAnswer(Ok(state)) => { + self.client_state = state; + }, + ServerMsg::StateAnswer(Err((error, state))) => { + warn!( + "StateAnswer: {:?}. Server thinks client is in state {:?}.", + error, state + ); + }, + ServerMsg::Disconnect => { + frontend_events.push(Event::Disconnect); + self.singleton_stream.send(ClientMsg::Terminate)?; + }, + ServerMsg::CharacterListUpdate(character_list) => { + self.character_list.characters = character_list; + self.character_list.loading = false; + }, + ServerMsg::CharacterActionError(error) => { + warn!("CharacterActionError: {:?}.", error); + self.character_list.error = Some(error); + }, + ServerMsg::Notification(n) => { + frontend_events.push(Event::Notification(n)); + }, + ServerMsg::CharacterDataLoadError(error) => { + self.clean_state(); + self.character_list.error = Some(error); + }, + ServerMsg::SetViewDistance(vd) => { + self.view_distance = Some(vd); + frontend_events.push(Event::SetViewDistance(vd)); + }, + } + } + } + + /// Handle new server messages. fn handle_new_messages(&mut self) -> Result, Error> { let mut frontend_events = Vec::new(); @@ -758,216 +963,20 @@ impl Client { } } - let new_msgs = self.postbox.new_messages(); + let mut handles_msg = 0; - if new_msgs.len() > 0 { - for msg in new_msgs { - match msg { - ServerMsg::TooManyPlayers => { - return Err(Error::ServerWentMad); - }, - ServerMsg::Shutdown => return Err(Error::ServerShutdown), - ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad), - ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => { - self.player_list = list - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { - if let Some(old_player_info) = - self.player_list.insert(uid, player_info.clone()) - { - warn!( - "Received msg to insert {} with uid {} into the player list but \ - there was already an entry for {} with the same uid that was \ - overwritten!", - player_info.player_alias, uid, old_player_info.player_alias - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.is_admin = admin; - } else { - warn!( - "Received msg to update admin status of uid {}, but they were not \ - in the list.", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter( - uid, - char_info, - )) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.character = Some(char_info); - } else { - warn!( - "Received msg to update character info for uid {}, but they were \ - not in the list.", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.character = match &player_info.character { - Some(character) => Some(common::msg::CharacterInfo { - name: character.name.to_string(), - level: next_level, - }), - None => { - warn!( - "Received msg to update character level info to {} for \ - uid {}, but this player's character is None.", - next_level, uid - ); + block_on(async{ + //TIMEOUT 0.01 ms for msg handling + select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = self.handle_message(&mut frontend_events, &mut handles_msg).fuse() => err, + ) + })?; - None - }, - }; - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => { - // Instead of removing players, mark them as offline because we need to - // remember the names of disconnected players in chat. - // - // TODO the server should re-use uids of players that log out and log back - // in. - - if let Some(player_info) = self.player_list.get_mut(&uid) { - if player_info.is_online { - player_info.is_online = false; - } else { - warn!( - "Received msg to remove uid {} from the player list by they \ - were already marked offline", - uid - ); - } - } else { - warn!( - "Received msg to remove uid {} from the player list by they \ - weren't in the list!", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.player_alias = new_name; - } else { - warn!( - "Received msg to alias player with uid {} to {} but this uid is \ - not in the player list", - uid, new_name - ); - } - }, - - ServerMsg::Ping => self.postbox.send_message(ClientMsg::Pong), - ServerMsg::Pong => { - self.last_server_pong = self.state.get_time(); - - self.last_ping_delta = - (self.state.get_time() - self.last_server_ping).round(); - }, - ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), - ServerMsg::SetPlayerEntity(uid) => { - if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) { - self.entity = entity; - } else { - return Err(Error::Other("Failed to find entity from uid.".to_owned())); - } - }, - ServerMsg::TimeOfDay(time_of_day) => { - *self.state.ecs_mut().write_resource() = time_of_day; - }, - ServerMsg::EntitySync(entity_sync_package) => { - self.state - .ecs_mut() - .apply_entity_sync_package(entity_sync_package); - }, - ServerMsg::CompSync(comp_sync_package) => { - self.state - .ecs_mut() - .apply_comp_sync_package(comp_sync_package); - }, - ServerMsg::CreateEntity(entity_package) => { - self.state.ecs_mut().apply_entity_package(entity_package); - }, - ServerMsg::DeleteEntity(entity) => { - if self.state.read_component_cloned::(self.entity) != Some(entity) { - self.state - .ecs_mut() - .delete_entity_and_clear_from_uid_allocator(entity.0); - } - }, - // Cleanup for when the client goes back to the `Registered` state - ServerMsg::ExitIngameCleanup => { - self.clean_state(); - }, - ServerMsg::InventoryUpdate(inventory, event) => { - match event { - InventoryUpdateEvent::CollectFailed => {}, - _ => { - // Push the updated inventory component to the client - self.state.write_component(self.entity, inventory); - }, - } - - frontend_events.push(Event::InventoryUpdated(event)); - }, - ServerMsg::TerrainChunkUpdate { key, chunk } => { - if let Ok(chunk) = chunk { - self.state.insert_chunk(key, *chunk); - } - self.pending_chunks.remove(&key); - }, - ServerMsg::TerrainBlockUpdates(mut blocks) => { - blocks.drain().for_each(|(pos, block)| { - self.state.set_block(pos, block); - }); - }, - ServerMsg::StateAnswer(Ok(state)) => { - self.client_state = state; - }, - ServerMsg::StateAnswer(Err((error, state))) => { - warn!( - "StateAnswer: {:?}. Server thinks client is in state {:?}.", - error, state - ); - }, - ServerMsg::Disconnect => { - frontend_events.push(Event::Disconnect); - self.postbox.send_message(ClientMsg::Terminate); - }, - ServerMsg::CharacterListUpdate(character_list) => { - self.character_list.characters = character_list; - self.character_list.loading = false; - }, - ServerMsg::CharacterActionError(error) => { - warn!("CharacterActionError: {:?}.", error); - self.character_list.error = Some(error); - }, - ServerMsg::Notification(n) => { - frontend_events.push(Event::Notification(n)); - }, - ServerMsg::CharacterDataLoadError(error) => { - self.clean_state(); - self.character_list.error = Some(error); - }, - ServerMsg::SetViewDistance(vd) => { - self.view_distance = Some(vd); - frontend_events.push(Event::SetViewDistance(vd)); - }, - } - } - } else if let Some(err) = self.postbox.error() { - return Err(err.into()); - // We regularily ping in the tick method - } else if self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT { + if handles_msg == 0 && self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT { return Err(Error::ServerTimeout); } + Ok(frontend_events) } diff --git a/server/src/client.rs b/server/src/client.rs index e02a48392d..0cdea63718 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -19,7 +19,7 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { self.singleton_stream.lock().unwrap().send(msg); } + pub fn notify(&mut self, msg: ServerMsg) { let _ = self.singleton_stream.lock().unwrap().send(msg); } pub fn is_registered(&self) -> bool { match self.client_state { @@ -37,12 +37,12 @@ impl Client { pub fn allow_state(&mut self, new_state: ClientState) { self.client_state = new_state; - self.singleton_stream + let _ = self.singleton_stream .lock().unwrap().send(ServerMsg::StateAnswer(Ok(new_state))); } pub fn error_state(&mut self, error: RequestStateError) { - self.singleton_stream + let _ = self.singleton_stream .lock().unwrap().send(ServerMsg::StateAnswer(Err((error, self.client_state)))); } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 898004616d..ffae562df9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -342,11 +342,11 @@ impl Server { // 3) Handle inputs from clients block_on(async{ //TIMEOUT 0.01 ms for msg handling - let x = select!( + select!( _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), err = self.handle_new_connections(&mut frontend_events).fuse() => err, - ); - }); + ) + })?; let before_message_system = Instant::now(); diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 0038449bb3..27ee12623d 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -22,6 +22,366 @@ use hashbrown::HashMap; use specs::{ Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, }; +use futures_util::{select, FutureExt}; +use futures_executor::block_on; +use futures_timer::Delay; + +impl Sys { + ///We need to move this to a async fn, otherwise the compiler generates to much recursive fn, and async closures dont work yet + #[allow(clippy::too_many_arguments)] + async fn handle_client_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, ChatMsg)>, + player_list: &HashMap, + new_players: &mut Vec, + entity: specs::Entity, + client: &mut Client, + cnt: &mut u64, + character_loader: &ReadExpect<'_, CharacterLoader>, + terrain: &ReadExpect<'_, TerrainGrid>, + uids: &ReadStorage<'_, Uid>, + can_build: &ReadStorage<'_, CanBuild>, + force_updates: &ReadStorage<'_, ForceUpdate>, + stats: &ReadStorage<'_, Stats>, + chat_modes: &ReadStorage<'_, ChatMode>, + accounts: &mut WriteExpect<'_, AuthProvider>, + block_changes: &mut Write<'_, BlockChange>, + admin_list: &ReadExpect<'_, AdminList>, + admins: &mut WriteStorage<'_, Admin>, + positions: &mut WriteStorage<'_, Pos>, + velocities: &mut WriteStorage<'_, Vel>, + orientations: &mut WriteStorage<'_, Ori>, + players: &mut WriteStorage<'_, Player>, + controllers: &mut WriteStorage<'_, Controller>, + settings: &Read<'_, ServerSettings>, + ) -> Result<(), crate::error::Error> { + loop { + let msg = client.singleton_stream.lock().unwrap().recv().await?; + *cnt += 1; + match msg { + // Go back to registered state (char selection screen) + ClientMsg::ExitIngame => match client.client_state { + // Use ClientMsg::Register instead. + ClientState::Connected => { + client.error_state(RequestStateError::WrongMessage) + }, + ClientState::Registered => client.error_state(RequestStateError::Already), + ClientState::Spectator | ClientState::Character => { + server_emitter.emit(ServerEvent::ExitIngame { entity }); + }, + ClientState::Pending => {}, + }, + // Request spectator state + ClientMsg::Spectate => match client.client_state { + // Become Registered first. + ClientState::Connected => client.error_state(RequestStateError::Impossible), + ClientState::Spectator => client.error_state(RequestStateError::Already), + ClientState::Registered | ClientState::Character => { + client.allow_state(ClientState::Spectator) + }, + ClientState::Pending => {}, + }, + // Request registered state (login) + ClientMsg::Register { + view_distance, + token_or_username, + } => { + let (username, uuid) = match accounts.query(token_or_username.clone()) { + Err(err) => { + client.error_state(RequestStateError::RegisterDenied(err)); + break Ok(()); + }, + Ok((username, uuid)) => (username, uuid), + }; + + let vd = view_distance + .map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); + let player = Player::new(username.clone(), None, vd, uuid); + let is_admin = admin_list.contains(&username); + + if !player.is_valid() { + // Invalid player + client.error_state(RequestStateError::Impossible); + break Ok(()); + } + + match client.client_state { + ClientState::Connected => { + // Add Player component to this client + let _ = players.insert(entity, player); + + // Give the Admin component to the player if their name exists in + // admin list + if is_admin { + let _ = admins.insert(entity, Admin); + } + + // Tell the client its request was successful. + client.allow_state(ClientState::Registered); + + // Send initial player list + client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + ))); + + // Add to list to notify all clients of the new player + new_players.push(entity); + }, + // Use RequestState instead (No need to send `player` again). + _ => client.error_state(RequestStateError::Impossible), + } + //client.allow_state(ClientState::Registered); + + // Limit view distance if it's too high + // This comes after state registration so that the client actually hears it + if settings + .max_view_distance + .zip(view_distance) + .map(|(vd, max)| vd > max) + .unwrap_or(false) + { + client.notify(ServerMsg::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + )); + }; + }, + ClientMsg::SetViewDistance(view_distance) => if let ClientState::Character { .. } = client.client_state { + if settings + .max_view_distance + .map(|max| view_distance <= max) + .unwrap_or(true) + { + players.get_mut(entity).map(|player| { + player.view_distance = Some( + settings + .max_view_distance + .map(|max| view_distance.min(max)) + .unwrap_or(view_distance), + ) + }); + } else { + client.notify(ServerMsg::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + )); + } + }, + ClientMsg::Character(character_id) => match client.client_state { + // Become Registered first. + ClientState::Connected => client.error_state(RequestStateError::Impossible), + ClientState::Registered | ClientState::Spectator => { + // Only send login message if it wasn't already + // sent previously + if let (Some(player), false) = + (players.get(entity), client.login_msg_sent) + { + // Send a request to load the character's component data from the + // DB. Once loaded, persisted components such as stats and inventory + // will be inserted for the entity + character_loader.load_character_data( + entity, + player.uuid().to_string(), + character_id, + ); + + // Start inserting non-persisted/default components for the entity + // while we load the DB data + server_emitter.emit(ServerEvent::InitCharacterData { + entity, + character_id, + }); + + // Give the player a welcome message + if settings.server_description.len() > 0 { + client.notify( + ChatType::CommandInfo + .server_msg(settings.server_description.clone()), + ); + } + + // Only send login message if it wasn't already + // sent previously + if !client.login_msg_sent { + new_chat_msgs.push((None, ChatMsg { + chat_type: ChatType::Online, + message: format!("[{}] is now online.", &player.alias), // TODO: Localize this + })); + + client.login_msg_sent = true; + } + } else { + client.notify(ServerMsg::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + ))) + } + }, + ClientState::Character => client.error_state(RequestStateError::Already), + ClientState::Pending => {}, + }, + ClientMsg::ControllerInputs(inputs) => match client.client_state { + ClientState::Connected + | ClientState::Registered + | ClientState::Spectator => { + client.error_state(RequestStateError::Impossible) + }, + ClientState::Character => { + if let Some(controller) = controllers.get_mut(entity) { + controller.inputs.update_with_new(inputs); + } + }, + ClientState::Pending => {}, + }, + ClientMsg::ControlEvent(event) => match client.client_state { + ClientState::Connected + | ClientState::Registered + | ClientState::Spectator => { + client.error_state(RequestStateError::Impossible) + }, + ClientState::Character => { + // Skip respawn if client entity is alive + if let ControlEvent::Respawn = event { + if stats.get(entity).map_or(true, |s| !s.is_dead) { + continue; + } + } + if let Some(controller) = controllers.get_mut(entity) { + controller.events.push(event); + } + }, + ClientState::Pending => {}, + }, + ClientMsg::ControlAction(event) => match client.client_state { + ClientState::Connected + | ClientState::Registered + | ClientState::Spectator => { + client.error_state(RequestStateError::Impossible) + }, + ClientState::Character => { + if let Some(controller) = controllers.get_mut(entity) { + controller.actions.push(event); + } + }, + ClientState::Pending => {}, + }, + ClientMsg::ChatMsg(message) => match client.client_state { + ClientState::Connected => client.error_state(RequestStateError::Impossible), + ClientState::Registered + | ClientState::Spectator + | ClientState::Character => match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + tracing::error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + tracing::warn!( + ?len, + ?max, + "Recieved a chat message that's too long" + ) + }, + }, + ClientState::Pending => {}, + }, + ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state { + ClientState::Character => { + if force_updates.get(entity).is_none() + && stats.get(entity).map_or(true, |s| !s.is_dead) + { + let _ = positions.insert(entity, pos); + let _ = velocities.insert(entity, vel); + let _ = orientations.insert(entity, ori); + } + }, + // Only characters can send positions. + _ => client.error_state(RequestStateError::Impossible), + }, + ClientMsg::BreakBlock(pos) => { + if can_build.get(entity).is_some() { + block_changes.set(pos, Block::empty()); + } + }, + ClientMsg::PlaceBlock(pos, block) => { + if can_build.get(entity).is_some() { + block_changes.try_set(pos, block); + } + }, + ClientMsg::TerrainChunkRequest { key } => match client.client_state { + ClientState::Connected | ClientState::Registered => { + client.error_state(RequestStateError::Impossible); + }, + ClientState::Spectator | ClientState::Character => { + let in_vd = if let (Some(view_distance), Some(pos)) = ( + players.get(entity).and_then(|p| p.view_distance), + positions.get(entity), + ) { + pos.0.xy().map(|e| e as f64).distance( + key.map(|e| e as f64 + 0.5) + * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < (view_distance as f64 + 1.5) + * TerrainChunkSize::RECT_SIZE.x as f64 + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => { + client.notify(ServerMsg::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + }) + }, + None => { + server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) + }, + } + } + }, + ClientState::Pending => {}, + }, + // Always possible. + ClientMsg::Ping => client.notify(ServerMsg::Pong), + ClientMsg::Pong => {}, + ClientMsg::Disconnect => { + client.notify(ServerMsg::Disconnect); + }, + ClientMsg::Terminate => { + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + }, + ClientMsg::RequestCharacterList => { + if let Some(player) = players.get(entity) { + character_loader.load_character_list(entity, player.uuid().to_string()) + } + }, + ClientMsg::CreateCharacter { alias, tool, body } => { + if let Some(player) = players.get(entity) { + character_loader.create_character( + entity, + player.uuid().to_string(), + alias, + tool, + body, + ); + } + }, + ClientMsg::DeleteCharacter(character_id) => { + if let Some(player) = players.get(entity) { + character_loader.delete_character( + entity, + player.uuid().to_string(), + character_id, + ); + } + }, + } + } + } +} /// This system will handle new messages from clients pub struct Sys; @@ -107,347 +467,53 @@ impl<'a> System<'a> for Sys { let mut new_players = Vec::new(); for (entity, client) in (&entities, &mut clients).join() { - let new_msgs = client.postbox.new_messages(); + let mut cnt = 0; + + let network_err: Result<(), crate::error::Error> = block_on(async{ + //TIMEOUT 0.01 ms for msg handling + select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = Self::handle_client_msg( + &mut server_emitter, + &mut new_chat_msgs, + &player_list, + &mut new_players, + entity, + client, + &mut cnt, + + &character_loader, + &terrain, + &uids, + &can_build, + &force_updates, + &stats, + &chat_modes, + &mut accounts, + &mut block_changes, + &admin_list, + &mut admins, + &mut positions, + &mut velocities, + &mut orientations, + &mut players, + &mut controllers, + &settings, + ).fuse() => err, + ) + }); // Update client ping. - if new_msgs.len() > 0 { + if cnt > 0 { client.last_ping = time.0 } else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout - || client.postbox.error().is_some() + || network_err.is_err() // Postbox error { server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 { // Try pinging the client if the timeout is nearing. - client.postbox.send_message(ServerMsg::Ping); - } - - // Process incoming messages. - for msg in new_msgs { - match msg { - // Go back to registered state (char selection screen) - ClientMsg::ExitIngame => match client.client_state { - // Use ClientMsg::Register instead. - ClientState::Connected => { - client.error_state(RequestStateError::WrongMessage) - }, - ClientState::Registered => client.error_state(RequestStateError::Already), - ClientState::Spectator | ClientState::Character => { - server_emitter.emit(ServerEvent::ExitIngame { entity }); - }, - ClientState::Pending => {}, - }, - // Request spectator state - ClientMsg::Spectate => match client.client_state { - // Become Registered first. - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Spectator => client.error_state(RequestStateError::Already), - ClientState::Registered | ClientState::Character => { - client.allow_state(ClientState::Spectator) - }, - ClientState::Pending => {}, - }, - // Request registered state (login) - ClientMsg::Register { - view_distance, - token_or_username, - } => { - let (username, uuid) = match accounts.query(token_or_username.clone()) { - Err(err) => { - client.error_state(RequestStateError::RegisterDenied(err)); - break; - }, - Ok((username, uuid)) => (username, uuid), - }; - - let vd = view_distance - .map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); - let player = Player::new(username.clone(), None, vd, uuid); - let is_admin = admin_list.contains(&username); - - if !player.is_valid() { - // Invalid player - client.error_state(RequestStateError::Impossible); - break; - } - - match client.client_state { - ClientState::Connected => { - // Add Player component to this client - let _ = players.insert(entity, player); - - // Give the Admin component to the player if their name exists in - // admin list - if is_admin { - let _ = admins.insert(entity, Admin); - } - - // Tell the client its request was successful. - client.allow_state(ClientState::Registered); - - // Send initial player list - client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - ))); - - // Add to list to notify all clients of the new player - new_players.push(entity); - }, - // Use RequestState instead (No need to send `player` again). - _ => client.error_state(RequestStateError::Impossible), - } - //client.allow_state(ClientState::Registered); - - // Limit view distance if it's too high - // This comes after state registration so that the client actually hears it - if settings - .max_view_distance - .zip(view_distance) - .map(|(vd, max)| vd > max) - .unwrap_or(false) - { - client.notify(ServerMsg::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - }; - }, - ClientMsg::SetViewDistance(view_distance) => match client.client_state { - ClientState::Character { .. } => { - if settings - .max_view_distance - .map(|max| view_distance <= max) - .unwrap_or(true) - { - players.get_mut(entity).map(|player| { - player.view_distance = Some( - settings - .max_view_distance - .map(|max| view_distance.min(max)) - .unwrap_or(view_distance), - ) - }); - } else { - client.notify(ServerMsg::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - } - }, - _ => {}, - }, - ClientMsg::Character(character_id) => match client.client_state { - // Become Registered first. - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered | ClientState::Spectator => { - // Only send login message if it wasn't already - // sent previously - if let (Some(player), false) = - (players.get(entity), client.login_msg_sent) - { - // Send a request to load the character's component data from the - // DB. Once loaded, persisted components such as stats and inventory - // will be inserted for the entity - character_loader.load_character_data( - entity, - player.uuid().to_string(), - character_id, - ); - - // Start inserting non-persisted/default components for the entity - // while we load the DB data - server_emitter.emit(ServerEvent::InitCharacterData { - entity, - character_id, - }); - - // Give the player a welcome message - if settings.server_description.len() > 0 { - client.notify( - ChatType::CommandInfo - .server_msg(settings.server_description.clone()), - ); - } - - // Only send login message if it wasn't already - // sent previously - if !client.login_msg_sent { - new_chat_msgs.push((None, ChatMsg { - chat_type: ChatType::Online, - message: format!("[{}] is now online.", &player.alias), // TODO: Localize this - })); - - client.login_msg_sent = true; - } - } else { - client.notify(ServerMsg::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - ))) - } - }, - ClientState::Character => client.error_state(RequestStateError::Already), - ClientState::Pending => {}, - }, - ClientMsg::ControllerInputs(inputs) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - if let Some(controller) = controllers.get_mut(entity) { - controller.inputs.update_with_new(inputs); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ControlEvent(event) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - // Skip respawn if client entity is alive - if let &ControlEvent::Respawn = &event { - if stats.get(entity).map_or(true, |s| !s.is_dead) { - continue; - } - } - if let Some(controller) = controllers.get_mut(entity) { - controller.events.push(event); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ControlAction(event) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - if let Some(controller) = controllers.get_mut(entity) { - controller.actions.push(event); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ChatMsg(message) => match client.client_state { - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered - | ClientState::Spectator - | ClientState::Character => match validate_chat_msg(&message) { - Ok(()) => { - if let Some(from) = uids.get(entity) { - let mode = chat_modes.get(entity).cloned().unwrap_or_default(); - let msg = mode.new_message(*from, message); - new_chat_msgs.push((Some(entity), msg)); - } else { - tracing::error!("Could not send message. Missing player uid"); - } - }, - Err(ChatMsgValidationError::TooLong) => { - let max = MAX_BYTES_CHAT_MSG; - let len = message.len(); - tracing::warn!( - ?len, - ?max, - "Recieved a chat message that's too long" - ) - }, - }, - ClientState::Pending => {}, - }, - ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state { - ClientState::Character => { - if force_updates.get(entity).is_none() - && stats.get(entity).map_or(true, |s| !s.is_dead) - { - let _ = positions.insert(entity, pos); - let _ = velocities.insert(entity, vel); - let _ = orientations.insert(entity, ori); - } - }, - // Only characters can send positions. - _ => client.error_state(RequestStateError::Impossible), - }, - ClientMsg::BreakBlock(pos) => { - if can_build.get(entity).is_some() { - block_changes.set(pos, Block::empty()); - } - }, - ClientMsg::PlaceBlock(pos, block) => { - if can_build.get(entity).is_some() { - block_changes.try_set(pos, block); - } - }, - ClientMsg::TerrainChunkRequest { key } => match client.client_state { - ClientState::Connected | ClientState::Registered => { - client.error_state(RequestStateError::Impossible); - }, - ClientState::Spectator | ClientState::Character => { - let in_vd = if let (Some(view_distance), Some(pos)) = ( - players.get(entity).and_then(|p| p.view_distance), - positions.get(entity), - ) { - pos.0.xy().map(|e| e as f64).distance( - key.map(|e| e as f64 + 0.5) - * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (view_distance as f64 + 1.5) - * TerrainChunkSize::RECT_SIZE.x as f64 - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - client.postbox.send_message(ServerMsg::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - }) - }, - None => { - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, - } - } - }, - ClientState::Pending => {}, - }, - // Always possible. - ClientMsg::Ping => client.postbox.send_message(ServerMsg::Pong), - ClientMsg::Pong => {}, - ClientMsg::Disconnect => { - client.postbox.send_message(ServerMsg::Disconnect); - }, - ClientMsg::Terminate => { - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - }, - ClientMsg::RequestCharacterList => { - if let Some(player) = players.get(entity) { - character_loader.load_character_list(entity, player.uuid().to_string()) - } - }, - ClientMsg::CreateCharacter { alias, tool, body } => { - if let Some(player) = players.get(entity) { - character_loader.create_character( - entity, - player.uuid().to_string(), - alias, - tool, - body, - ); - } - }, - ClientMsg::DeleteCharacter(character_id) => { - if let Some(player) = players.get(entity) { - character_loader.delete_character( - entity, - player.uuid().to_string(), - character_id, - ); - } - }, - } + client.notify(ServerMsg::Ping); } } diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 455ced5066..77d4f2890e 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -1,4 +1,4 @@ -use client::{error::Error as ClientError, Client}; +use client::{error::Error as ClientError, Client, error::NetworkError}; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use std::{ net::ToSocketAddrs, @@ -97,10 +97,7 @@ impl ClientInit { }, Err(err) => { match err { - ClientError::NetworkErr(_) => { - last_err = Some(Error::ClientError(err)); - break 'tries; - } + ClientError::NetworkErr(NetworkError::ListenFailed(..)) => {}, // Non-connection error, stop attempts err => { last_err = Some(Error::ClientError(err)); From 15ff58cd6a16f1ee352773f79ff5744ec242751f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 1 Jul 2020 11:51:37 +0200 Subject: [PATCH 3/3] simple fmt in order to make the replacement commit as simple as possible --- client/src/error.rs | 2 +- client/src/lib.rs | 228 ++++++++++++++++----------- common/src/lib.rs | 2 +- server/src/client.rs | 24 +-- server/src/lib.rs | 19 ++- server/src/sys/message.rs | 89 +++++------ server/src/sys/waypoint.rs | 3 +- voxygen/src/menu/main/client_init.rs | 8 +- 8 files changed, 213 insertions(+), 162 deletions(-) diff --git a/client/src/error.rs b/client/src/error.rs index 2321f5944f..ae7b1725aa 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -1,6 +1,6 @@ use authc::AuthClientError; -use network::{ParticipantError, StreamError}; pub use network::NetworkError; +use network::{ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { diff --git a/client/src/lib.rs b/client/src/lib.rs index c5d6df2c60..329faaa51a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -30,17 +30,17 @@ use common::{ terrain::{block::Block, TerrainChunk, TerrainChunkSize}, vol::RectVolSize, }; -use network::{Network, Participant, Stream, Address, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; +use futures_executor::block_on; +use futures_timer::Delay; +use futures_util::{select, FutureExt}; use hashbrown::HashMap; use image::DynamicImage; +use network::{Address, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED}; use std::{ net::SocketAddr, sync::Arc, time::{Duration, Instant}, }; -use futures_util::{select, FutureExt}; -use futures_executor::block_on; -use futures_timer::Delay; use tracing::{debug, error, warn}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -118,7 +118,7 @@ impl Client { let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?; // Wait for initial sync - let (state, entity, server_info, world_map) = block_on(async{ + let (state, entity, server_info, world_map) = block_on(async { loop { match stream.recv().await? { ServerMsg::InitialSync { @@ -130,13 +130,13 @@ impl Client { // TODO: Display that versions don't match in Voxygen if &server_info.git_hash != *common::util::GIT_HASH { warn!( - "Server is running {}[{}], you are running {}[{}], versions might be \ - incompatible!", - server_info.git_hash, - server_info.git_date, - common::util::GIT_HASH.to_string(), - common::util::GIT_DATE.to_string(), - ); + "Server is running {}[{}], you are running {}[{}], versions might \ + be incompatible!", + server_info.git_hash, + server_info.git_date, + common::util::GIT_HASH.to_string(), + common::util::GIT_DATE.to_string(), + ); } debug!("Auth Server: {:?}", server_info.auth_provider); @@ -152,7 +152,8 @@ impl Client { *state.ecs_mut().write_resource() = time_of_day; assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); - let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; + let mut world_map_raw = + vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; LittleEndian::write_u32_into(&world_map, &mut world_map_raw); debug!("Preparing image..."); let world_map = Arc::new( @@ -168,13 +169,14 @@ impl Client { ); debug!("Done preparing image..."); - break Ok((state, entity, server_info, (world_map, map_size))) + break Ok((state, entity, server_info, (world_map, map_size))); }, ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers), err => { warn!("whoops, server mad {:?}, ignoring", err); }, - }} + } + } })?; stream.send(ClientMsg::Ping)?; @@ -242,34 +244,39 @@ impl Client { })?; self.client_state = ClientState::Pending; - block_on( - async { - loop { - match self.singleton_stream.recv().await? { - ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => { - self.client_state = state; - break Err(match err { - RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, - RegisterError::AuthError(err) => Error::AuthErr(err), - RegisterError::InvalidCharacter => Error::InvalidCharacter, - RegisterError::NotOnWhitelist => Error::NotOnWhitelist, - }) - }, - ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), - ignore => { - warn!("Ignoring what the server send till registered: {:? }", ignore); - //return Err(Error::ServerWentMad) - }, - } + block_on(async { + loop { + match self.singleton_stream.recv().await? { + ServerMsg::StateAnswer(Err(( + RequestStateError::RegisterDenied(err), + state, + ))) => { + self.client_state = state; + break Err(match err { + RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, + RegisterError::AuthError(err) => Error::AuthErr(err), + RegisterError::InvalidCharacter => Error::InvalidCharacter, + RegisterError::NotOnWhitelist => Error::NotOnWhitelist, + }); + }, + ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), + ignore => { + warn!( + "Ignoring what the server send till registered: {:? }", + ignore + ); + //return Err(Error::ServerWentMad) + }, } } - ) + }) } /// Request a state transition to `ClientState::Character`. pub fn request_character(&mut self, character_id: i32) { self.singleton_stream - .send(ClientMsg::Character(character_id)).unwrap(); + .send(ClientMsg::Character(character_id)) + .unwrap(); self.active_character_id = Some(character_id); self.client_state = ClientState::Pending; @@ -278,27 +285,36 @@ impl Client { /// Load the current players character list pub fn load_character_list(&mut self) { self.character_list.loading = true; - self.singleton_stream.send(ClientMsg::RequestCharacterList).unwrap(); + self.singleton_stream + .send(ClientMsg::RequestCharacterList) + .unwrap(); } /// New character creation pub fn create_character(&mut self, alias: String, tool: Option, body: comp::Body) { self.character_list.loading = true; self.singleton_stream - .send(ClientMsg::CreateCharacter { alias, tool, body }).unwrap(); + .send(ClientMsg::CreateCharacter { alias, tool, body }) + .unwrap(); } /// Character deletion pub fn delete_character(&mut self, character_id: i32) { self.character_list.loading = true; self.singleton_stream - .send(ClientMsg::DeleteCharacter(character_id)).unwrap(); + .send(ClientMsg::DeleteCharacter(character_id)) + .unwrap(); } /// Send disconnect message to the server - pub fn request_logout(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { - error!(?e, "couldn't send disconnect package to server, did server close already?"); - }} + pub fn request_logout(&mut self) { + if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + error!( + ?e, + "couldn't send disconnect package to server, did server close already?" + ); + } + } /// Request a state transition to `ClientState::Registered` from an ingame /// state. @@ -310,7 +326,8 @@ impl Client { pub fn set_view_distance(&mut self, view_distance: u32) { self.view_distance = Some(view_distance.max(1).min(65)); self.singleton_stream - .send(ClientMsg::SetViewDistance(self.view_distance.unwrap())).unwrap(); + .send(ClientMsg::SetViewDistance(self.view_distance.unwrap())) + .unwrap(); // Can't fail } @@ -318,21 +335,24 @@ impl Client { self.singleton_stream .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Use(slot), - ))).unwrap(); + ))) + .unwrap(); } pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) { self.singleton_stream .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Swap(a, b), - ))).unwrap(); + ))) + .unwrap(); } pub fn drop_slot(&mut self, slot: comp::slot::Slot) { self.singleton_stream .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Drop(slot), - ))).unwrap(); + ))) + .unwrap(); } pub fn pick_up(&mut self, entity: EcsEntity) { @@ -340,13 +360,15 @@ impl Client { self.singleton_stream .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Pickup(uid), - ))).unwrap(); + ))) + .unwrap(); } } pub fn toggle_lantern(&mut self) { self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)).unwrap(); + .send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)) + .unwrap(); } pub fn is_mounted(&self) -> bool { @@ -360,12 +382,15 @@ impl Client { pub fn mount(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))).unwrap(); + .send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))) + .unwrap(); } } pub fn unmount(&mut self) { - self.singleton_stream.send(ClientMsg::ControlEvent(ControlEvent::Unmount)).unwrap(); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Unmount)) + .unwrap(); } pub fn respawn(&mut self) { @@ -377,7 +402,8 @@ impl Client { .map_or(false, |s| s.is_dead) { self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::Respawn)).unwrap(); + .send(ClientMsg::ControlEvent(ControlEvent::Respawn)) + .unwrap(); } } @@ -460,7 +486,8 @@ impl Client { controller.actions.push(control_action); } self.singleton_stream - .send(ClientMsg::ControlAction(control_action)).unwrap(); + .send(ClientMsg::ControlAction(control_action)) + .unwrap(); } pub fn view_distance(&self) -> Option { self.view_distance } @@ -489,7 +516,10 @@ impl Client { /// Send a chat message to the server. pub fn send_chat(&mut self, message: String) { match validate_chat_msg(&message) { - Ok(()) => self.singleton_stream.send(ClientMsg::ChatMsg(message)).unwrap(), + Ok(()) => self + .singleton_stream + .send(ClientMsg::ChatMsg(message)) + .unwrap(), Err(ChatMsgValidationError::TooLong) => tracing::warn!( "Attempted to send a message that's too long (Over {} bytes)", MAX_BYTES_CHAT_MSG @@ -504,18 +534,23 @@ impl Client { } pub fn place_block(&mut self, pos: Vec3, block: Block) { - self.singleton_stream.send(ClientMsg::PlaceBlock(pos, block)).unwrap(); + self.singleton_stream + .send(ClientMsg::PlaceBlock(pos, block)) + .unwrap(); } pub fn remove_block(&mut self, pos: Vec3) { - self.singleton_stream.send(ClientMsg::BreakBlock(pos)).unwrap(); + self.singleton_stream + .send(ClientMsg::BreakBlock(pos)) + .unwrap(); } pub fn collect_block(&mut self, pos: Vec3) { self.singleton_stream .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Collect(pos), - ))).unwrap(); + ))) + .unwrap(); } /// Execute a single client tick, handle input and update the game state by @@ -570,7 +605,9 @@ impl Client { "Couldn't access controller component on client entity" ); } - self.singleton_stream.send(ClientMsg::ControllerInputs(inputs)).unwrap(); + self.singleton_stream + .send(ClientMsg::ControllerInputs(inputs)) + .unwrap(); } // 2) Build up a list of events for this frame, to be passed to the frontend. @@ -667,7 +704,8 @@ impl Client { if self.state.terrain().get_key(*key).is_none() { if !skip_mode && !self.pending_chunks.contains_key(key) { if self.pending_chunks.len() < 4 { - self.singleton_stream.send(ClientMsg::TerrainChunkRequest { key: *key })?; + self.singleton_stream + .send(ClientMsg::TerrainChunkRequest { key: *key })?; self.pending_chunks.insert(*key, Instant::now()); } else { skip_mode = true; @@ -710,7 +748,8 @@ impl Client { self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), ) { - self.singleton_stream.send(ClientMsg::PlayerPhysics { pos, vel, ori })?; + self.singleton_stream + .send(ClientMsg::PlayerPhysics { pos, vel, ori })?; } } @@ -737,9 +776,13 @@ impl Client { self.state.cleanup(); } - async fn handle_message(&mut self, frontend_events: &mut Vec, cnt: &mut u64) -> Result<(), Error> { + async fn handle_message( + &mut self, + frontend_events: &mut Vec, + cnt: &mut u64, + ) -> Result<(), Error> { loop { - let msg = self.singleton_stream.recv().await?; + let msg = self.singleton_stream.recv().await?; *cnt += 1; match msg { ServerMsg::TooManyPlayers => { @@ -751,13 +794,11 @@ impl Client { self.player_list = list }, ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { - if let Some(old_player_info) = - self.player_list.insert(uid, player_info.clone()) + if let Some(old_player_info) = self.player_list.insert(uid, player_info.clone()) { warn!( - "Received msg to insert {} with uid {} into the player list but \ - there was already an entry for {} with the same uid that was \ - overwritten!", + "Received msg to insert {} with uid {} into the player list but there \ + was already an entry for {} with the same uid that was overwritten!", player_info.player_alias, uid, old_player_info.player_alias ); } @@ -767,22 +808,22 @@ impl Client { player_info.is_admin = admin; } else { warn!( - "Received msg to update admin status of uid {}, but they were not \ - in the list.", + "Received msg to update admin status of uid {}, but they were not in \ + the list.", uid ); } }, ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter( - uid, - char_info, - )) => { + uid, + char_info, + )) => { if let Some(player_info) = self.player_list.get_mut(&uid) { player_info.character = Some(char_info); } else { warn!( - "Received msg to update character info for uid {}, but they were \ - not in the list.", + "Received msg to update character info for uid {}, but they were not \ + in the list.", uid ); } @@ -796,8 +837,8 @@ impl Client { }), None => { warn!( - "Received msg to update character level info to {} for \ - uid {}, but this player's character is None.", + "Received msg to update character level info to {} for uid \ + {}, but this player's character is None.", next_level, uid ); @@ -818,15 +859,15 @@ impl Client { player_info.is_online = false; } else { warn!( - "Received msg to remove uid {} from the player list by they \ - were already marked offline", + "Received msg to remove uid {} from the player list by they were \ + already marked offline", uid ); } } else { warn!( - "Received msg to remove uid {} from the player list by they \ - weren't in the list!", + "Received msg to remove uid {} from the player list by they weren't \ + in the list!", uid ); } @@ -836,19 +877,20 @@ impl Client { player_info.player_alias = new_name; } else { warn!( - "Received msg to alias player with uid {} to {} but this uid is \ - not in the player list", + "Received msg to alias player with uid {} to {} but this uid is not \ + in the player list", uid, new_name ); } }, - ServerMsg::Ping => {self.singleton_stream.send(ClientMsg::Pong)?;}, + ServerMsg::Ping => { + self.singleton_stream.send(ClientMsg::Pong)?; + }, ServerMsg::Pong => { self.last_server_pong = self.state.get_time(); - self.last_ping_delta = - (self.state.get_time() - self.last_server_ping).round(); + self.last_ping_delta = (self.state.get_time() - self.last_server_ping).round(); }, ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), ServerMsg::SetPlayerEntity(uid) => { @@ -943,7 +985,7 @@ impl Client { } } - /// Handle new server messages. + /// Handle new server messages. fn handle_new_messages(&mut self) -> Result, Error> { let mut frontend_events = Vec::new(); @@ -963,9 +1005,9 @@ impl Client { } } - let mut handles_msg = 0; + let mut handles_msg = 0; - block_on(async{ + block_on(async { //TIMEOUT 0.01 ms for msg handling select!( _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), @@ -1109,7 +1151,13 @@ impl Client { } impl Drop for Client { - fn drop(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { - warn!("error during drop of client, couldn't send disconnect package, is the connection already closed? : {}", e); - } } + fn drop(&mut self) { + if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + warn!( + "error during drop of client, couldn't send disconnect package, is the connection \ + already closed? : {}", + e + ); + } + } } diff --git a/common/src/lib.rs b/common/src/lib.rs index d3211b3abf..92c913e50f 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -39,4 +39,4 @@ pub mod util; pub mod vol; pub mod volumes; -pub use loadout_builder::LoadoutBuilder; \ No newline at end of file +pub use loadout_builder::LoadoutBuilder; diff --git a/server/src/client.rs b/server/src/client.rs index 0cdea63718..43803a42ae 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,8 +1,6 @@ -use common::{ - msg::{ClientState, RequestStateError, ServerMsg}, -}; -use network::Stream; +use common::msg::{ClientState, RequestStateError, ServerMsg}; use hashbrown::HashSet; +use network::Stream; use specs::{Component, FlaggedStorage}; use specs_idvs::IDVStorage; use vek::*; @@ -19,7 +17,9 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { let _ = self.singleton_stream.lock().unwrap().send(msg); } + pub fn notify(&mut self, msg: ServerMsg) { + let _ = self.singleton_stream.lock().unwrap().send(msg); + } pub fn is_registered(&self) -> bool { match self.client_state { @@ -37,13 +37,19 @@ impl Client { pub fn allow_state(&mut self, new_state: ClientState) { self.client_state = new_state; - let _ = self.singleton_stream - .lock().unwrap().send(ServerMsg::StateAnswer(Ok(new_state))); + let _ = self + .singleton_stream + .lock() + .unwrap() + .send(ServerMsg::StateAnswer(Ok(new_state))); } pub fn error_state(&mut self, error: RequestStateError) { - let _ = self.singleton_stream - .lock().unwrap().send(ServerMsg::StateAnswer(Err((error, self.client_state)))); + let _ = self + .singleton_stream + .lock() + .unwrap() + .send(ServerMsg::StateAnswer(Err((error, self.client_state)))); } } diff --git a/server/src/lib.rs b/server/src/lib.rs index ffae562df9..753d2b1fb4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -37,8 +37,11 @@ use common::{ terrain::TerrainChunkSize, vol::{ReadVol, RectVolSize}, }; -use network::{Network, Address, Pid}; +use futures_executor::block_on; +use futures_timer::Delay; +use futures_util::{select, FutureExt}; use metrics::{ServerMetrics, TickMetrics}; +use network::{Address, Network, Pid}; use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; use std::{ @@ -47,9 +50,6 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use futures_util::{select, FutureExt}; -use futures_executor::block_on; -use futures_timer::Delay; #[cfg(not(feature = "worldgen"))] use test_world::{World, WORLD_SIZE}; use tracing::{debug, error, info}; @@ -236,7 +236,9 @@ impl Server { .run(settings.metrics_address) .expect("Failed to initialize server metrics submodule."); - let thread_pool = ThreadPoolBuilder::new().name("veloren-worker".to_string()).build(); + let thread_pool = ThreadPoolBuilder::new() + .name("veloren-worker".to_string()) + .build(); let (network, f) = Network::new(Pid::new(), None); thread_pool.execute(f); block_on(network.listen(Address::Tcp(settings.gameserver_address)))?; @@ -340,7 +342,7 @@ impl Server { let before_new_connections = Instant::now(); // 3) Handle inputs from clients - block_on(async{ + block_on(async { //TIMEOUT 0.01 ms for msg handling select!( _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), @@ -589,7 +591,10 @@ impl Server { } /// Handle new client connections. - async fn handle_new_connections(&mut self, frontend_events: &mut Vec) -> Result<(), Error> { + async fn handle_new_connections( + &mut self, + frontend_events: &mut Vec, + ) -> Result<(), Error> { loop { let participant = self.network.connected().await?; let singleton_stream = participant.opened().await?; diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 27ee12623d..b13fe3753e 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -18,16 +18,17 @@ use common::{ terrain::{Block, TerrainChunkSize, TerrainGrid}, vol::{RectVolSize, Vox}, }; +use futures_executor::block_on; +use futures_timer::Delay; +use futures_util::{select, FutureExt}; use hashbrown::HashMap; use specs::{ Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, }; -use futures_util::{select, FutureExt}; -use futures_executor::block_on; -use futures_timer::Delay; impl Sys { - ///We need to move this to a async fn, otherwise the compiler generates to much recursive fn, and async closures dont work yet + ///We need to move this to a async fn, otherwise the compiler generates to + /// much recursive fn, and async closures dont work yet #[allow(clippy::too_many_arguments)] async fn handle_client_msg( server_emitter: &mut common::event::Emitter<'_, ServerEvent>, @@ -62,9 +63,7 @@ impl Sys { // Go back to registered state (char selection screen) ClientMsg::ExitIngame => match client.client_state { // Use ClientMsg::Register instead. - ClientState::Connected => { - client.error_state(RequestStateError::WrongMessage) - }, + ClientState::Connected => client.error_state(RequestStateError::WrongMessage), ClientState::Registered => client.error_state(RequestStateError::Already), ClientState::Spectator | ClientState::Character => { server_emitter.emit(ServerEvent::ExitIngame { entity }); @@ -94,8 +93,8 @@ impl Sys { Ok((username, uuid)) => (username, uuid), }; - let vd = view_distance - .map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); + let vd = + view_distance.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); let player = Player::new(username.clone(), None, vd, uuid); let is_admin = admin_list.contains(&username); @@ -145,7 +144,8 @@ impl Sys { )); }; }, - ClientMsg::SetViewDistance(view_distance) => if let ClientState::Character { .. } = client.client_state { + ClientMsg::SetViewDistance(view_distance) => { + if let ClientState::Character { .. } = client.client_state { if settings .max_view_distance .map(|max| view_distance <= max) @@ -164,6 +164,7 @@ impl Sys { settings.max_view_distance.unwrap_or(0), )); } + } }, ClientMsg::Character(character_id) => match client.client_state { // Become Registered first. @@ -171,8 +172,7 @@ impl Sys { ClientState::Registered | ClientState::Spectator => { // Only send login message if it wasn't already // sent previously - if let (Some(player), false) = - (players.get(entity), client.login_msg_sent) + if let (Some(player), false) = (players.get(entity), client.login_msg_sent) { // Send a request to load the character's component data from the // DB. Once loaded, persisted components such as stats and inventory @@ -218,9 +218,7 @@ impl Sys { ClientState::Pending => {}, }, ClientMsg::ControllerInputs(inputs) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { + ClientState::Connected | ClientState::Registered | ClientState::Spectator => { client.error_state(RequestStateError::Impossible) }, ClientState::Character => { @@ -231,9 +229,7 @@ impl Sys { ClientState::Pending => {}, }, ClientMsg::ControlEvent(event) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { + ClientState::Connected | ClientState::Registered | ClientState::Spectator => { client.error_state(RequestStateError::Impossible) }, ClientState::Character => { @@ -250,9 +246,7 @@ impl Sys { ClientState::Pending => {}, }, ClientMsg::ControlAction(event) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { + ClientState::Connected | ClientState::Registered | ClientState::Spectator => { client.error_state(RequestStateError::Impossible) }, ClientState::Character => { @@ -264,27 +258,27 @@ impl Sys { }, ClientMsg::ChatMsg(message) => match client.client_state { ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered - | ClientState::Spectator - | ClientState::Character => match validate_chat_msg(&message) { - Ok(()) => { - if let Some(from) = uids.get(entity) { - let mode = chat_modes.get(entity).cloned().unwrap_or_default(); - let msg = mode.new_message(*from, message); - new_chat_msgs.push((Some(entity), msg)); - } else { - tracing::error!("Could not send message. Missing player uid"); - } - }, - Err(ChatMsgValidationError::TooLong) => { - let max = MAX_BYTES_CHAT_MSG; - let len = message.len(); - tracing::warn!( + ClientState::Registered | ClientState::Spectator | ClientState::Character => { + match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + tracing::error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + tracing::warn!( ?len, ?max, "Recieved a chat message that's too long" ) - }, + }, + } }, ClientState::Pending => {}, }, @@ -323,22 +317,17 @@ impl Sys { pos.0.xy().map(|e| e as f64).distance( key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (view_distance as f64 + 1.5) - * TerrainChunkSize::RECT_SIZE.x as f64 + ) < (view_distance as f64 + 1.5) * TerrainChunkSize::RECT_SIZE.x as f64 } else { true }; if in_vd { match terrain.get_key(key) { - Some(chunk) => { - client.notify(ServerMsg::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - }) - }, - None => { - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, + Some(chunk) => client.notify(ServerMsg::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + }), + None => server_emitter.emit(ServerEvent::ChunkRequest(entity, key)), } } }, @@ -469,7 +458,7 @@ impl<'a> System<'a> for Sys { for (entity, client) in (&entities, &mut clients).join() { let mut cnt = 0; - let network_err: Result<(), crate::error::Error> = block_on(async{ + let network_err: Result<(), crate::error::Error> = block_on(async { //TIMEOUT 0.01 ms for msg handling select!( _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index 7ee8ff1253..9129af814a 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -40,8 +40,7 @@ impl<'a> System<'a> for Sys { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - client - .notify(ServerMsg::Notification(Notification::WaypointSaved)); + client.notify(ServerMsg::Notification(Notification::WaypointSaved)); } } } diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 77d4f2890e..0bc8fc29f3 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -1,4 +1,7 @@ -use client::{error::Error as ClientError, Client, error::NetworkError}; +use client::{ + error::{Error as ClientError, NetworkError}, + Client, +}; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use std::{ net::ToSocketAddrs, @@ -97,7 +100,8 @@ impl ClientInit { }, Err(err) => { match err { - ClientError::NetworkErr(NetworkError::ListenFailed(..)) => {}, + ClientError::NetworkErr(NetworkError::ListenFailed(..)) => { + }, // Non-connection error, stop attempts err => { last_err = Some(Error::ClientError(err));