diff --git a/Cargo.lock b/Cargo.lock index 97153bac97..b2d47b3ea8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ dependencies = [ "crossbeam-utils 0.7.2", "futures-core", "futures-io", - "futures-timer", + "futures-timer 2.0.2", "kv-log-macro", "log", "memchr", @@ -1370,6 +1370,12 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.5" @@ -4448,6 +4454,9 @@ version = "0.6.0" dependencies = [ "authc", "byteorder 1.3.4", + "futures-executor", + "futures-timer 3.0.2", + "futures-util", "hashbrown", "image", "num_cpus", @@ -4456,6 +4465,7 @@ dependencies = [ "uvth 3.1.1", "vek", "veloren-common", + "veloren_network", ] [[package]] @@ -4499,6 +4509,9 @@ dependencies = [ "diesel", "diesel_migrations", "dotenv", + "futures-executor", + "futures-timer 3.0.2", + "futures-util", "hashbrown", "lazy_static", "libsqlite3-sys", @@ -4519,6 +4532,7 @@ dependencies = [ "vek", "veloren-common", "veloren-world", + "veloren_network", ] [[package]] diff --git a/client/Cargo.toml b/client/Cargo.toml index 73fe3dc06e..7a3c36edfa 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -6,9 +6,13 @@ edition = "2018" [dependencies] common = { package = "veloren-common", path = "../common", features = ["no-assets"] } +network = { package = "veloren_network", path = "../network", default-features = false } byteorder = "1.3.2" uvth = "3.1.1" +futures-util = "0.3" +futures-executor = "0.3" +futures-timer = "3.0" image = { version = "0.22.3", default-features = false, features = ["png"] } num_cpus = "1.10.1" tracing = { version = "0.1", default-features = false } diff --git a/client/src/error.rs b/client/src/error.rs index 873b1ce2e8..ae7b1725aa 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -1,9 +1,12 @@ use authc::AuthClientError; -use common::net::PostError; +pub use network::NetworkError; +use network::{ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { - Network(PostError), + NetworkErr(NetworkError), + ParticipantErr(ParticipantError), + StreamErr(StreamError), ServerWentMad, ServerTimeout, ServerShutdown, @@ -19,8 +22,16 @@ pub enum Error { Other(String), } -impl From for Error { - fn from(err: PostError) -> Self { Self::Network(err) } +impl From for Error { + fn from(err: NetworkError) -> Self { Self::NetworkErr(err) } +} + +impl From for Error { + fn from(err: ParticipantError) -> Self { Self::ParticipantErr(err) } +} + +impl From for Error { + fn from(err: StreamError) -> Self { Self::StreamErr(err) } } impl From for Error { diff --git a/client/src/lib.rs b/client/src/lib.rs index 64a75024f3..329faaa51a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -25,14 +25,17 @@ use common::{ PlayerInfo, PlayerListUpdate, RegisterError, RequestStateError, ServerInfo, ServerMsg, MAX_BYTES_CHAT_MSG, }, - net::PostBox, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, terrain::{block::Block, TerrainChunk, TerrainChunkSize}, vol::RectVolSize, }; +use futures_executor::block_on; +use futures_timer::Delay; +use futures_util::{select, FutureExt}; use hashbrown::HashMap; use image::DynamicImage; +use network::{Address, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED}; use std::{ net::SocketAddr, sync::Arc, @@ -69,7 +72,9 @@ pub struct Client { pub character_list: CharacterList, pub active_character_id: Option, - postbox: PostBox, + _network: Network, + _participant: Arc, + singleton_stream: Stream, last_server_ping: f64, last_server_pong: f64, @@ -99,64 +104,82 @@ impl Client { /// Create a new `Client`. pub fn new>(addr: A, view_distance: Option) -> Result { let client_state = ClientState::Connected; - let mut postbox = PostBox::to(addr)?; + + let mut thread_pool = ThreadPoolBuilder::new() + .name("veloren-worker".into()) + .build(); + // We reduce the thread count by 1 to keep rendering smooth + thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); + + let (network, f) = Network::new(Pid::new(), None); + thread_pool.execute(f); + + let participant = block_on(network.connect(Address::Tcp(addr.into())))?; + let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?; // Wait for initial sync - let (state, entity, server_info, world_map) = match postbox.next_message()? { - ServerMsg::InitialSync { - entity_package, - server_info, - time_of_day, - world_map: (map_size, world_map), - } => { - // TODO: Display that versions don't match in Voxygen - if &server_info.git_hash != *common::util::GIT_HASH { - warn!( - "Server is running {}[{}], you are running {}[{}], versions might be \ - incompatible!", - server_info.git_hash, - server_info.git_date, - common::util::GIT_HASH.to_string(), - common::util::GIT_DATE.to_string(), - ); + let (state, entity, server_info, world_map) = block_on(async { + loop { + match stream.recv().await? { + ServerMsg::InitialSync { + entity_package, + server_info, + time_of_day, + world_map: (map_size, world_map), + } => { + // TODO: Display that versions don't match in Voxygen + if &server_info.git_hash != *common::util::GIT_HASH { + warn!( + "Server is running {}[{}], you are running {}[{}], versions might \ + be incompatible!", + server_info.git_hash, + server_info.git_date, + common::util::GIT_HASH.to_string(), + common::util::GIT_DATE.to_string(), + ); + } + + debug!("Auth Server: {:?}", server_info.auth_provider); + + // Initialize `State` + let mut state = State::default(); + // Client-only components + state + .ecs_mut() + .register::>(); + + let entity = state.ecs_mut().apply_entity_package(entity_package); + *state.ecs_mut().write_resource() = time_of_day; + + assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); + let mut world_map_raw = + vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; + LittleEndian::write_u32_into(&world_map, &mut world_map_raw); + debug!("Preparing image..."); + let world_map = Arc::new( + image::DynamicImage::ImageRgba8({ + // Should not fail if the dimensions are correct. + let world_map = + image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw); + world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? + }) + // Flip the image, since Voxygen uses an orientation where rotation from + // positive x axis to positive y axis is counterclockwise around the z axis. + .flipv(), + ); + debug!("Done preparing image..."); + + break Ok((state, entity, server_info, (world_map, map_size))); + }, + ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers), + err => { + warn!("whoops, server mad {:?}, ignoring", err); + }, } + } + })?; - debug!("Auth Server: {:?}", server_info.auth_provider); - - // Initialize `State` - let mut state = State::default(); - // Client-only components - state - .ecs_mut() - .register::>(); - - let entity = state.ecs_mut().apply_entity_package(entity_package); - *state.ecs_mut().write_resource() = time_of_day; - - assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); - let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; - LittleEndian::write_u32_into(&world_map, &mut world_map_raw); - debug!("Preparing image..."); - let world_map = Arc::new( - image::DynamicImage::ImageRgba8({ - // Should not fail if the dimensions are correct. - let world_map = - image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw); - world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? - }) - // Flip the image, since Voxygen uses an orientation where rotation from - // positive x axis to positive y axis is counterclockwise around the z axis. - .flipv(), - ); - debug!("Done preparing image..."); - - (state, entity, server_info, (world_map, map_size)) - }, - ServerMsg::TooManyPlayers => return Err(Error::TooManyPlayers), - _ => return Err(Error::ServerWentMad), - }; - - postbox.send_message(ClientMsg::Ping); + stream.send(ClientMsg::Ping)?; let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) @@ -173,7 +196,9 @@ impl Client { character_list: CharacterList::default(), active_character_id: None, - postbox, + _network: network, + _participant: participant, + singleton_stream: stream, last_server_ping: 0.0, last_server_pong: 0.0, @@ -213,33 +238,45 @@ impl Client { } ).unwrap_or(Ok(username))?; - self.postbox.send_message(ClientMsg::Register { + self.singleton_stream.send(ClientMsg::Register { view_distance: self.view_distance, token_or_username, - }); + })?; self.client_state = ClientState::Pending; - loop { - match self.postbox.next_message()? { - ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => { - self.client_state = state; - break Err(match err { - RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, - RegisterError::AuthError(err) => Error::AuthErr(err), - RegisterError::InvalidCharacter => Error::InvalidCharacter, - RegisterError::NotOnWhitelist => Error::NotOnWhitelist, - }); - }, - ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), - _ => {}, + block_on(async { + loop { + match self.singleton_stream.recv().await? { + ServerMsg::StateAnswer(Err(( + RequestStateError::RegisterDenied(err), + state, + ))) => { + self.client_state = state; + break Err(match err { + RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, + RegisterError::AuthError(err) => Error::AuthErr(err), + RegisterError::InvalidCharacter => Error::InvalidCharacter, + RegisterError::NotOnWhitelist => Error::NotOnWhitelist, + }); + }, + ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), + ignore => { + warn!( + "Ignoring what the server send till registered: {:? }", + ignore + ); + //return Err(Error::ServerWentMad) + }, + } } - } + }) } /// Request a state transition to `ClientState::Character`. pub fn request_character(&mut self, character_id: i32) { - self.postbox - .send_message(ClientMsg::Character(character_id)); + self.singleton_stream + .send(ClientMsg::Character(character_id)) + .unwrap(); self.active_character_id = Some(character_id); self.client_state = ClientState::Pending; @@ -248,73 +285,90 @@ impl Client { /// Load the current players character list pub fn load_character_list(&mut self) { self.character_list.loading = true; - self.postbox.send_message(ClientMsg::RequestCharacterList); + self.singleton_stream + .send(ClientMsg::RequestCharacterList) + .unwrap(); } /// New character creation pub fn create_character(&mut self, alias: String, tool: Option, body: comp::Body) { self.character_list.loading = true; - self.postbox - .send_message(ClientMsg::CreateCharacter { alias, tool, body }); + self.singleton_stream + .send(ClientMsg::CreateCharacter { alias, tool, body }) + .unwrap(); } /// Character deletion pub fn delete_character(&mut self, character_id: i32) { self.character_list.loading = true; - self.postbox - .send_message(ClientMsg::DeleteCharacter(character_id)); + self.singleton_stream + .send(ClientMsg::DeleteCharacter(character_id)) + .unwrap(); } /// Send disconnect message to the server - pub fn request_logout(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); } + pub fn request_logout(&mut self) { + if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + error!( + ?e, + "couldn't send disconnect package to server, did server close already?" + ); + } + } /// Request a state transition to `ClientState::Registered` from an ingame /// state. pub fn request_remove_character(&mut self) { - self.postbox.send_message(ClientMsg::ExitIngame); + self.singleton_stream.send(ClientMsg::ExitIngame).unwrap(); self.client_state = ClientState::Pending; } pub fn set_view_distance(&mut self, view_distance: u32) { self.view_distance = Some(view_distance.max(1).min(65)); - self.postbox - .send_message(ClientMsg::SetViewDistance(self.view_distance.unwrap())); + self.singleton_stream + .send(ClientMsg::SetViewDistance(self.view_distance.unwrap())) + .unwrap(); // Can't fail } pub fn use_slot(&mut self, slot: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Use(slot), - ))); + ))) + .unwrap(); } pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Swap(a, b), - ))); + ))) + .unwrap(); } pub fn drop_slot(&mut self, slot: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Drop(slot), - ))); + ))) + .unwrap(); } pub fn pick_up(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Pickup(uid), - ))); + ))) + .unwrap(); } } pub fn toggle_lantern(&mut self) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)) + .unwrap(); } pub fn is_mounted(&self) -> bool { @@ -327,14 +381,16 @@ impl Client { pub fn mount(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Mount(uid))); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))) + .unwrap(); } } pub fn unmount(&mut self) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Unmount)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Unmount)) + .unwrap(); } pub fn respawn(&mut self) { @@ -345,8 +401,9 @@ impl Client { .get(self.entity) .map_or(false, |s| s.is_dead) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Respawn)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Respawn)) + .unwrap(); } } @@ -428,8 +485,9 @@ impl Client { { controller.actions.push(control_action); } - self.postbox - .send_message(ClientMsg::ControlAction(control_action)); + self.singleton_stream + .send(ClientMsg::ControlAction(control_action)) + .unwrap(); } pub fn view_distance(&self) -> Option { self.view_distance } @@ -458,7 +516,10 @@ impl Client { /// Send a chat message to the server. pub fn send_chat(&mut self, message: String) { match validate_chat_msg(&message) { - Ok(()) => self.postbox.send_message(ClientMsg::ChatMsg(message)), + Ok(()) => self + .singleton_stream + .send(ClientMsg::ChatMsg(message)) + .unwrap(), Err(ChatMsgValidationError::TooLong) => tracing::warn!( "Attempted to send a message that's too long (Over {} bytes)", MAX_BYTES_CHAT_MSG @@ -473,18 +534,23 @@ impl Client { } pub fn place_block(&mut self, pos: Vec3, block: Block) { - self.postbox.send_message(ClientMsg::PlaceBlock(pos, block)); + self.singleton_stream + .send(ClientMsg::PlaceBlock(pos, block)) + .unwrap(); } pub fn remove_block(&mut self, pos: Vec3) { - self.postbox.send_message(ClientMsg::BreakBlock(pos)); + self.singleton_stream + .send(ClientMsg::BreakBlock(pos)) + .unwrap(); } pub fn collect_block(&mut self, pos: Vec3) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Collect(pos), - ))); + ))) + .unwrap(); } /// Execute a single client tick, handle input and update the game state by @@ -539,8 +605,9 @@ impl Client { "Couldn't access controller component on client entity" ); } - self.postbox - .send_message(ClientMsg::ControllerInputs(inputs)); + self.singleton_stream + .send(ClientMsg::ControllerInputs(inputs)) + .unwrap(); } // 2) Build up a list of events for this frame, to be passed to the frontend. @@ -637,8 +704,8 @@ impl Client { if self.state.terrain().get_key(*key).is_none() { if !skip_mode && !self.pending_chunks.contains_key(key) { if self.pending_chunks.len() < 4 { - self.postbox - .send_message(ClientMsg::TerrainChunkRequest { key: *key }); + self.singleton_stream + .send(ClientMsg::TerrainChunkRequest { key: *key })?; self.pending_chunks.insert(*key, Instant::now()); } else { skip_mode = true; @@ -670,7 +737,7 @@ impl Client { // Send a ping to the server once every second if self.state.get_time() - self.last_server_ping > 1. { - self.postbox.send_message(ClientMsg::Ping); + self.singleton_stream.send(ClientMsg::Ping)?; self.last_server_ping = self.state.get_time(); } @@ -681,8 +748,8 @@ impl Client { self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), ) { - self.postbox - .send_message(ClientMsg::PlayerPhysics { pos, vel, ori }); + self.singleton_stream + .send(ClientMsg::PlayerPhysics { pos, vel, ori })?; } } @@ -709,8 +776,216 @@ impl Client { self.state.cleanup(); } - /// Handle new server messages. + async fn handle_message( + &mut self, + frontend_events: &mut Vec, + cnt: &mut u64, + ) -> Result<(), Error> { + loop { + let msg = self.singleton_stream.recv().await?; + *cnt += 1; + match msg { + ServerMsg::TooManyPlayers => { + return Err(Error::ServerWentMad); + }, + ServerMsg::Shutdown => return Err(Error::ServerShutdown), + ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad), + ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => { + self.player_list = list + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { + if let Some(old_player_info) = self.player_list.insert(uid, player_info.clone()) + { + warn!( + "Received msg to insert {} with uid {} into the player list but there \ + was already an entry for {} with the same uid that was overwritten!", + player_info.player_alias, uid, old_player_info.player_alias + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.is_admin = admin; + } else { + warn!( + "Received msg to update admin status of uid {}, but they were not in \ + the list.", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter( + uid, + char_info, + )) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.character = Some(char_info); + } else { + warn!( + "Received msg to update character info for uid {}, but they were not \ + in the list.", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.character = match &player_info.character { + Some(character) => Some(common::msg::CharacterInfo { + name: character.name.to_string(), + level: next_level, + }), + None => { + warn!( + "Received msg to update character level info to {} for uid \ + {}, but this player's character is None.", + next_level, uid + ); + None + }, + }; + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => { + // Instead of removing players, mark them as offline because we need to + // remember the names of disconnected players in chat. + // + // TODO the server should re-use uids of players that log out and log back + // in. + + if let Some(player_info) = self.player_list.get_mut(&uid) { + if player_info.is_online { + player_info.is_online = false; + } else { + warn!( + "Received msg to remove uid {} from the player list by they were \ + already marked offline", + uid + ); + } + } else { + warn!( + "Received msg to remove uid {} from the player list by they weren't \ + in the list!", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.player_alias = new_name; + } else { + warn!( + "Received msg to alias player with uid {} to {} but this uid is not \ + in the player list", + uid, new_name + ); + } + }, + + ServerMsg::Ping => { + self.singleton_stream.send(ClientMsg::Pong)?; + }, + ServerMsg::Pong => { + self.last_server_pong = self.state.get_time(); + + self.last_ping_delta = (self.state.get_time() - self.last_server_ping).round(); + }, + ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), + ServerMsg::SetPlayerEntity(uid) => { + if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) { + self.entity = entity; + } else { + return Err(Error::Other("Failed to find entity from uid.".to_owned())); + } + }, + ServerMsg::TimeOfDay(time_of_day) => { + *self.state.ecs_mut().write_resource() = time_of_day; + }, + ServerMsg::EntitySync(entity_sync_package) => { + self.state + .ecs_mut() + .apply_entity_sync_package(entity_sync_package); + }, + ServerMsg::CompSync(comp_sync_package) => { + self.state + .ecs_mut() + .apply_comp_sync_package(comp_sync_package); + }, + ServerMsg::CreateEntity(entity_package) => { + self.state.ecs_mut().apply_entity_package(entity_package); + }, + ServerMsg::DeleteEntity(entity) => { + if self.state.read_component_cloned::(self.entity) != Some(entity) { + self.state + .ecs_mut() + .delete_entity_and_clear_from_uid_allocator(entity.0); + } + }, + // Cleanup for when the client goes back to the `Registered` state + ServerMsg::ExitIngameCleanup => { + self.clean_state(); + }, + ServerMsg::InventoryUpdate(inventory, event) => { + match event { + InventoryUpdateEvent::CollectFailed => {}, + _ => { + // Push the updated inventory component to the client + self.state.write_component(self.entity, inventory); + }, + } + + frontend_events.push(Event::InventoryUpdated(event)); + }, + ServerMsg::TerrainChunkUpdate { key, chunk } => { + if let Ok(chunk) = chunk { + self.state.insert_chunk(key, *chunk); + } + self.pending_chunks.remove(&key); + }, + ServerMsg::TerrainBlockUpdates(mut blocks) => { + blocks.drain().for_each(|(pos, block)| { + self.state.set_block(pos, block); + }); + }, + ServerMsg::StateAnswer(Ok(state)) => { + self.client_state = state; + }, + ServerMsg::StateAnswer(Err((error, state))) => { + warn!( + "StateAnswer: {:?}. Server thinks client is in state {:?}.", + error, state + ); + }, + ServerMsg::Disconnect => { + frontend_events.push(Event::Disconnect); + self.singleton_stream.send(ClientMsg::Terminate)?; + }, + ServerMsg::CharacterListUpdate(character_list) => { + self.character_list.characters = character_list; + self.character_list.loading = false; + }, + ServerMsg::CharacterActionError(error) => { + warn!("CharacterActionError: {:?}.", error); + self.character_list.error = Some(error); + }, + ServerMsg::Notification(n) => { + frontend_events.push(Event::Notification(n)); + }, + ServerMsg::CharacterDataLoadError(error) => { + self.clean_state(); + self.character_list.error = Some(error); + }, + ServerMsg::SetViewDistance(vd) => { + self.view_distance = Some(vd); + frontend_events.push(Event::SetViewDistance(vd)); + }, + } + } + } + + /// Handle new server messages. fn handle_new_messages(&mut self) -> Result, Error> { let mut frontend_events = Vec::new(); @@ -730,216 +1005,20 @@ impl Client { } } - let new_msgs = self.postbox.new_messages(); + let mut handles_msg = 0; - if new_msgs.len() > 0 { - for msg in new_msgs { - match msg { - ServerMsg::TooManyPlayers => { - return Err(Error::ServerWentMad); - }, - ServerMsg::Shutdown => return Err(Error::ServerShutdown), - ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad), - ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => { - self.player_list = list - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { - if let Some(old_player_info) = - self.player_list.insert(uid, player_info.clone()) - { - warn!( - "Received msg to insert {} with uid {} into the player list but \ - there was already an entry for {} with the same uid that was \ - overwritten!", - player_info.player_alias, uid, old_player_info.player_alias - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.is_admin = admin; - } else { - warn!( - "Received msg to update admin status of uid {}, but they were not \ - in the list.", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter( - uid, - char_info, - )) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.character = Some(char_info); - } else { - warn!( - "Received msg to update character info for uid {}, but they were \ - not in the list.", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.character = match &player_info.character { - Some(character) => Some(common::msg::CharacterInfo { - name: character.name.to_string(), - level: next_level, - }), - None => { - warn!( - "Received msg to update character level info to {} for \ - uid {}, but this player's character is None.", - next_level, uid - ); + block_on(async { + //TIMEOUT 0.01 ms for msg handling + select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = self.handle_message(&mut frontend_events, &mut handles_msg).fuse() => err, + ) + })?; - None - }, - }; - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => { - // Instead of removing players, mark them as offline because we need to - // remember the names of disconnected players in chat. - // - // TODO the server should re-use uids of players that log out and log back - // in. - - if let Some(player_info) = self.player_list.get_mut(&uid) { - if player_info.is_online { - player_info.is_online = false; - } else { - warn!( - "Received msg to remove uid {} from the player list by they \ - were already marked offline", - uid - ); - } - } else { - warn!( - "Received msg to remove uid {} from the player list by they \ - weren't in the list!", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.player_alias = new_name; - } else { - warn!( - "Received msg to alias player with uid {} to {} but this uid is \ - not in the player list", - uid, new_name - ); - } - }, - - ServerMsg::Ping => self.postbox.send_message(ClientMsg::Pong), - ServerMsg::Pong => { - self.last_server_pong = self.state.get_time(); - - self.last_ping_delta = - (self.state.get_time() - self.last_server_ping).round(); - }, - ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), - ServerMsg::SetPlayerEntity(uid) => { - if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) { - self.entity = entity; - } else { - return Err(Error::Other("Failed to find entity from uid.".to_owned())); - } - }, - ServerMsg::TimeOfDay(time_of_day) => { - *self.state.ecs_mut().write_resource() = time_of_day; - }, - ServerMsg::EntitySync(entity_sync_package) => { - self.state - .ecs_mut() - .apply_entity_sync_package(entity_sync_package); - }, - ServerMsg::CompSync(comp_sync_package) => { - self.state - .ecs_mut() - .apply_comp_sync_package(comp_sync_package); - }, - ServerMsg::CreateEntity(entity_package) => { - self.state.ecs_mut().apply_entity_package(entity_package); - }, - ServerMsg::DeleteEntity(entity) => { - if self.state.read_component_cloned::(self.entity) != Some(entity) { - self.state - .ecs_mut() - .delete_entity_and_clear_from_uid_allocator(entity.0); - } - }, - // Cleanup for when the client goes back to the `Registered` state - ServerMsg::ExitIngameCleanup => { - self.clean_state(); - }, - ServerMsg::InventoryUpdate(inventory, event) => { - match event { - InventoryUpdateEvent::CollectFailed => {}, - _ => { - // Push the updated inventory component to the client - self.state.write_component(self.entity, inventory); - }, - } - - frontend_events.push(Event::InventoryUpdated(event)); - }, - ServerMsg::TerrainChunkUpdate { key, chunk } => { - if let Ok(chunk) = chunk { - self.state.insert_chunk(key, *chunk); - } - self.pending_chunks.remove(&key); - }, - ServerMsg::TerrainBlockUpdates(mut blocks) => { - blocks.drain().for_each(|(pos, block)| { - self.state.set_block(pos, block); - }); - }, - ServerMsg::StateAnswer(Ok(state)) => { - self.client_state = state; - }, - ServerMsg::StateAnswer(Err((error, state))) => { - warn!( - "StateAnswer: {:?}. Server thinks client is in state {:?}.", - error, state - ); - }, - ServerMsg::Disconnect => { - frontend_events.push(Event::Disconnect); - self.postbox.send_message(ClientMsg::Terminate); - }, - ServerMsg::CharacterListUpdate(character_list) => { - self.character_list.characters = character_list; - self.character_list.loading = false; - }, - ServerMsg::CharacterActionError(error) => { - warn!("CharacterActionError: {:?}.", error); - self.character_list.error = Some(error); - }, - ServerMsg::Notification(n) => { - frontend_events.push(Event::Notification(n)); - }, - ServerMsg::CharacterDataLoadError(error) => { - self.clean_state(); - self.character_list.error = Some(error); - }, - ServerMsg::SetViewDistance(vd) => { - self.view_distance = Some(vd); - frontend_events.push(Event::SetViewDistance(vd)); - }, - } - } - } else if let Some(err) = self.postbox.error() { - return Err(err.into()); - // We regularily ping in the tick method - } else if self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT { + if handles_msg == 0 && self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT { return Err(Error::ServerTimeout); } + Ok(frontend_events) } @@ -1072,5 +1151,13 @@ impl Client { } impl Drop for Client { - fn drop(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); } + fn drop(&mut self) { + if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + warn!( + "error during drop of client, couldn't send disconnect package, is the connection \ + already closed? : {}", + e + ); + } + } } diff --git a/common/src/lib.rs b/common/src/lib.rs index a53cba2716..92c913e50f 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -40,29 +40,3 @@ pub mod vol; pub mod volumes; pub use loadout_builder::LoadoutBuilder; - -/// The networking module containing high-level wrappers of `TcpListener` and -/// `TcpStream` (`PostOffice` and `PostBox` respectively) and data types used by -/// both the server and client. # Examples -/// ``` -/// use std::net::SocketAddr; -/// use veloren_common::net::{PostBox, PostOffice}; -/// -/// let listen_addr = SocketAddr::from(([0, 0, 0, 0], 12345u16)); -/// let conn_addr = SocketAddr::from(([127, 0, 0, 1], 12345u16)); -/// -/// let mut server: PostOffice = PostOffice::bind(listen_addr).unwrap(); -/// let mut client: PostBox = PostBox::to(conn_addr).unwrap(); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// let mut scon = server.new_postboxes().next().unwrap(); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// scon.send_message(String::from("foo")); -/// client.send_message(String::from("bar")); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// assert_eq!("foo", client.next_message().unwrap()); -/// assert_eq!("bar", scon.next_message().unwrap()); -/// ``` -pub mod net; diff --git a/common/src/net/data.rs b/common/src/net/data.rs deleted file mode 100644 index ff16b1fe94..0000000000 --- a/common/src/net/data.rs +++ /dev/null @@ -1,19 +0,0 @@ -/// Messages server sends to client. -#[derive(Deserialize, Serialize, Debug)] -pub enum ServerMsg { - // VersionInfo MUST always stay first in this struct. - VersionInfo {}, -} - -/// Messages client sends to server. -#[derive(Deserialize, Serialize, Debug)] -pub enum ClientMsg { - // VersionInfo MUST always stay first in this struct. - VersionInfo {}, -} - -/// Control message type, used in [PostBox](super::PostBox) and -/// [PostOffice](super::PostOffice) to control threads. -pub enum ControlMsg { - Shutdown, -} diff --git a/common/src/net/mod.rs b/common/src/net/mod.rs deleted file mode 100644 index a27e61bbf2..0000000000 --- a/common/src/net/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub mod data; -//pub mod post; -pub mod post2; - -pub use post2 as post; - -// Reexports -pub use self::{ - data::{ClientMsg, ServerMsg}, - post::{Error as PostError, PostBox, PostOffice}, -}; - -pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + std::marker::Send + std::fmt::Debug; diff --git a/common/src/net/post.rs b/common/src/net/post.rs deleted file mode 100644 index 5912e323e9..0000000000 --- a/common/src/net/post.rs +++ /dev/null @@ -1,628 +0,0 @@ -use bincode; -use middleman::Middleman; -use mio::{ - net::{TcpListener, TcpStream}, - Events, Poll, PollOpt, Ready, Token, -}; -use mio_extras::channel::{channel, Receiver, Sender}; -use serde; -use std::{ - collections::VecDeque, - convert::TryFrom, - fmt, - io::{self, Read, Write}, - net::{Shutdown, SocketAddr}, - sync::mpsc::TryRecvError, - thread, - time::Duration, -}; - -#[derive(Clone, Debug, PartialEq)] -pub enum Error { - Disconnect, - Network, - InvalidMsg, - Internal, -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Network } -} - -impl From for Error { - fn from(err: TryRecvError) -> Self { Error::Internal } -} - -impl From for Error { - fn from(err: bincode::ErrorKind) -> Self { Error::InvalidMsg } -} - -impl From> for Error { - fn from(err: mio_extras::channel::SendError) -> Self { Error::Internal } -} - -pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message; - -const TCP_TOK: Token = Token(0); -const CTRL_TOK: Token = Token(1); -const POSTBOX_TOK: Token = Token(2); -const SEND_TOK: Token = Token(3); -const RECV_TOK: Token = Token(4); -const MIDDLEMAN_TOK: Token = Token(5); - -const MAX_MSG_BYTES: usize = 1 << 28; - -enum CtrlMsg { - Shutdown, -} - -pub struct PostOffice { - worker: Option>>, - ctrl_tx: Sender, - postbox_rx: Receiver, Error>>, - poll: Poll, - err: Option, -} - -impl PostOffice { - pub fn bind>(addr: A) -> Result { - let tcp_listener = TcpListener::bind(&addr.into())?; - - let (ctrl_tx, ctrl_rx) = channel(); - let (postbox_tx, postbox_rx) = channel(); - - let worker_poll = Poll::new()?; - worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?; - worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; - - let office_poll = Poll::new()?; - office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?; - - let worker = - thread::spawn(move || office_worker(worker_poll, tcp_listener, ctrl_rx, postbox_tx)); - - Ok(Self { - worker: Some(worker), - ctrl_tx, - postbox_rx, - poll: office_poll, - err: None, - }) - } - - pub fn error(&self) -> Option { self.err.clone() } - - pub fn new_connections(&mut self) -> impl ExactSizeIterator> { - let mut conns = VecDeque::new(); - - if self.err.is_some() { - return conns.into_iter(); - } - - let mut events = Events::with_capacity(64); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return conns.into_iter(); - } - - for event in events { - match event.token() { - // Keep reading new postboxes from the channel - POSTBOX_TOK => loop { - match self.postbox_rx.try_recv() { - Ok(Ok(conn)) => conns.push_back(conn), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return conns.into_iter(); - }, - Ok(Err(err)) => { - self.err = Some(err); - return conns.into_iter(); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - - conns.into_iter() - } -} - -impl Drop for PostOffice { - fn drop(&mut self) { - let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); - let _ = self.worker.take().map(|w| w.join()); - } -} - -fn office_worker( - poll: Poll, - tcp_listener: TcpListener, - ctrl_rx: Receiver, - postbox_tx: Sender, Error>>, -) -> Result<(), Error> { - let mut events = Events::with_capacity(64); - loop { - if let Err(err) = poll.poll(&mut events, None) { - postbox_tx.send(Err(err.into()))?; - return Ok(()); - } - - for event in &events { - match event.token() { - CTRL_TOK => loop { - match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => return Ok(()), - Err(TryRecvError::Empty) => {}, - Err(err) => { - postbox_tx.send(Err(err.into()))?; - return Ok(()); - }, - } - }, - TCP_TOK => postbox_tx.send(match tcp_listener.accept() { - Ok((stream, _)) => PostBox::from_tcpstream(stream), - Err(err) => Err(err.into()), - })?, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } -} - -pub struct PostBox { - worker: Option>>, - ctrl_tx: Sender, - send_tx: Sender, - recv_rx: Receiver>, - poll: Poll, - err: Option, -} - -impl PostBox { - pub fn to_server>(addr: A) -> Result { - Self::from_tcpstream(TcpStream::from_stream(std::net::TcpStream::connect( - &addr.into(), - )?)?) - } - - fn from_tcpstream(tcp_stream: TcpStream) -> Result { - let (ctrl_tx, ctrl_rx) = channel(); - let (send_tx, send_rx) = channel(); - let (recv_tx, recv_rx) = channel(); - - let worker_poll = Poll::new()?; - worker_poll.register( - &tcp_stream, - TCP_TOK, - Ready::readable() | Ready::writable(), - PollOpt::edge(), - )?; - worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; - worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?; - - let postbox_poll = Poll::new()?; - postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?; - - let worker = thread::spawn(move || { - postbox_worker(worker_poll, tcp_stream, ctrl_rx, send_rx, recv_tx) - }); - - Ok(Self { - worker: Some(worker), - ctrl_tx, - send_tx, - recv_rx, - poll: postbox_poll, - err: None, - }) - } - - pub fn error(&self) -> Option { self.err.clone() } - - pub fn send(&mut self, data: S) { let _ = self.send_tx.send(data); } - - // TODO: This method is super messy. - pub fn next_message(&mut self) -> Option { - if self.err.is_some() { - return None; - } - - loop { - let mut events = Events::with_capacity(10); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return None; - } - - for event in events { - match event.token() { - // Keep reading new messages from the channel - RECV_TOK => loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => return Some(msg), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return None; - }, - Ok(Err(err)) => { - self.err = Some(err); - return None; - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } - } - - pub fn new_messages(&mut self) -> impl ExactSizeIterator { - let mut msgs = VecDeque::new(); - - if self.err.is_some() { - return msgs.into_iter(); - } - - let mut events = Events::with_capacity(64); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return msgs.into_iter(); - } - - for event in events { - match event.token() { - // Keep reading new messages from the channel - RECV_TOK => loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => msgs.push_back(msg), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return msgs.into_iter(); - }, - Ok(Err(err)) => { - self.err = Some(err); - return msgs.into_iter(); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - - msgs.into_iter() - } -} - -impl Drop for PostBox { - fn drop(&mut self) { - let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); - let _ = self.worker.take().map(|w| w.join()); - } -} - -fn postbox_worker( - poll: Poll, - mut tcp_stream: TcpStream, - ctrl_rx: Receiver, - send_rx: Receiver, - recv_tx: Sender>, -) -> Result<(), Error> { - fn try_tcp_send( - tcp_stream: &mut TcpStream, - chunks: &mut VecDeque>, - ) -> Result<(), Error> { - loop { - let chunk = match chunks.pop_front() { - Some(chunk) => chunk, - None => break, - }; - - match tcp_stream.write_all(&chunk) { - Ok(()) => {}, - Err(err) if err.kind() == io::ErrorKind::WouldBlock => { - chunks.push_front(chunk); - break; - }, - Err(err) => { - println!("Error: {:?}", err); - return Err(err.into()); - }, - } - } - - Ok(()) - } - - enum RecvState { - ReadHead(Vec), - ReadBody(usize, Vec), - } - - let mut recv_state = RecvState::ReadHead(Vec::new()); - let mut chunks = VecDeque::new(); - - //let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - let mut events = Events::with_capacity(64); - - 'work: loop { - if let Err(err) = poll.poll(&mut events, None) { - recv_tx.send(Err(err.into()))?; - break 'work; - } - - for event in &events { - match event.token() { - CTRL_TOK => match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => { - break 'work; - }, - Err(TryRecvError::Empty) => (), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - }, - SEND_TOK => loop { - match send_rx.try_recv() { - Ok(outgoing_msg) => { - let mut msg_bytes = match bincode::serialize(&outgoing_msg) { - Ok(bytes) => bytes, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - }; - - let mut bytes = msg_bytes.len().to_le_bytes().as_ref().to_vec(); - bytes.append(&mut msg_bytes); - - bytes - .chunks(1024) - .map(|chunk| chunk.to_vec()) - .for_each(|chunk| chunks.push_back(chunk)); - - match try_tcp_send(&mut tcp_stream, &mut chunks) { - Ok(_) => {}, - Err(err) => { - recv_tx.send(Err(err.into()))?; - return Err(Error::Network); - }, - } - }, - Err(TryRecvError::Empty) => break, - Err(err) => Err(err)?, - } - }, - TCP_TOK => { - loop { - // Check TCP error - match tcp_stream.take_error() { - Ok(None) => {}, - Ok(Some(err)) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - match &mut recv_state { - RecvState::ReadHead(head) => { - if head.len() == 8 { - let len = usize::from_le_bytes( - <[u8; 8]>::try_from(head.as_slice()).unwrap(), - ); - if len > MAX_MSG_BYTES { - println!("TOO BIG! {:x}", len); - recv_tx.send(Err(Error::InvalidMsg))?; - break 'work; - } else if len == 0 { - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - break; - } else { - recv_state = RecvState::ReadBody(len, Vec::new()); - } - } else { - let mut b = [0; 1]; - match tcp_stream.read(&mut b) { - Ok(0) => {}, - Ok(_) => head.push(b[0]), - Err(_) => break, - } - } - }, - RecvState::ReadBody(len, body) => { - if body.len() == *len { - match bincode::deserialize(&body) { - Ok(msg) => { - recv_tx.send(Ok(msg))?; - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - }, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - } - } else { - let left = *len - body.len(); - let mut buf = vec![0; left]; - match tcp_stream.read(&mut buf) { - Ok(_) => body.append(&mut buf), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - } - }, - } - } - - // Now, try sending TCP stuff - match try_tcp_send(&mut tcp_stream, &mut chunks) { - Ok(_) => {}, - Err(err) => { - recv_tx.send(Err(err.into()))?; - return Err(Error::Network); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } - - //tcp_stream.shutdown(Shutdown::Both)?; - Ok(()) -} - -// TESTS - -/* -#[derive(Serialize, Deserialize)] -struct TestMsg(T); - -#[test] -fn connect() { - let srv_addr = ([127, 0, 0, 1], 12345); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - let postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - // Now a postbox has been created, we should have 1 new. - thread::sleep(Duration::from_millis(250)); - let incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 1); - assert_eq!(postoffice.error(), None); -} - -#[test] -fn connect_fail() { - let listen_addr = ([0; 4], 12345); - let connect_addr = ([127, 0, 0, 1], 12212); - - let mut postoffice = PostOffice::, TestMsg>::bind(listen_addr).unwrap(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - assert!(PostBox::, TestMsg>::to_server(connect_addr).is_err()); -} - -#[test] -fn connection_count() { - let srv_addr = ([127, 0, 0, 1], 12346); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - let mut postboxes = Vec::new(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - for _ in 0..5 { - postboxes.push(PostBox::, TestMsg>::to_server(srv_addr).unwrap()); - } - - // 5 postboxes created, we should have 5. - thread::sleep(Duration::from_millis(3500)); - let incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 5); - assert_eq!(postoffice.error(), None); -} - -#[test] -fn disconnect() { - let srv_addr = ([127, 0, 0, 1], 12347); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut server_postbox = { - let mut client_postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - let mut incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 1); - assert_eq!(postoffice.error(), None); - - incoming.next().unwrap() - }; - - // The client postbox has since been disconnected. - thread::sleep(Duration::from_millis(2050)); - let incoming_msgs = server_postbox.new_messages(); - assert_eq!(incoming_msgs.len(), 0); - // TODO - // assert_eq!(server_postbox.error(), Some(Error::Disconnect)); -} - -#[test] -fn client_to_server() { - let srv_addr = ([127, 0, 0, 1], 12348); - - let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - - let mut server_pb = po.new_connections().next().unwrap(); - - client_pb.send(TestMsg(1337.0)); - client_pb.send(TestMsg(9821.0)); - client_pb.send(TestMsg(-3.2)); - client_pb.send(TestMsg(17.0)); - - thread::sleep(Duration::from_millis(250)); - - let mut incoming_msgs = server_pb.new_messages(); - assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0)); -} - -#[test] -fn server_to_client() { - let srv_addr = ([127, 0, 0, 1], 12349); - - let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - - let mut server_pb = po.new_connections().next().unwrap(); - - server_pb.send(TestMsg(1337)); - server_pb.send(TestMsg(9821)); - server_pb.send(TestMsg(39999999)); - server_pb.send(TestMsg(17)); - - thread::sleep(Duration::from_millis(250)); - - let mut incoming_msgs = client_pb.new_messages(); - assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17)); -} -*/ diff --git a/common/src/net/post2.rs b/common/src/net/post2.rs deleted file mode 100644 index e4ba5b31cb..0000000000 --- a/common/src/net/post2.rs +++ /dev/null @@ -1,439 +0,0 @@ -use crossbeam::channel; -use serde::{de::DeserializeOwned, Serialize}; -use std::{ - collections::VecDeque, - convert::TryFrom, - io::{self, Read, Write}, - marker::PhantomData, - net::{Shutdown, SocketAddr, TcpListener, TcpStream}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; -use tracing::warn; - -#[derive(Clone, Debug)] -pub enum Error { - Io(Arc), - Bincode(Arc), - ChannelFailure, - InvalidMessage, -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Io(Arc::new(err)) } -} - -impl From for Error { - fn from(err: bincode::Error) -> Self { Error::Bincode(Arc::new(err)) } -} - -impl From for Error { - fn from(_error: channel::TryRecvError) -> Self { Error::ChannelFailure } -} - -pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send; - -const MAX_MSG_SIZE: usize = 1 << 28; - -pub struct PostOffice { - listener: TcpListener, - error: Option, - phantom: PhantomData<(S, R)>, -} - -impl PostOffice { - pub fn bind>(addr: A) -> Result { - let listener = TcpListener::bind(addr.into())?; - listener.set_nonblocking(true)?; - - Ok(Self { - listener, - error: None, - phantom: PhantomData, - }) - } - - pub fn error(&self) -> Option { self.error.clone() } - - pub fn new_postboxes(&mut self) -> impl ExactSizeIterator> { - let mut new = Vec::new(); - - if self.error.is_some() { - return new.into_iter(); - } - - loop { - match self.listener.accept() { - Ok((stream, _sock)) => new.push(PostBox::from_stream(stream).unwrap()), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => {}, - Err(e) => { - self.error = Some(e.into()); - break; - }, - } - } - - new.into_iter() - } -} - -pub struct PostBox { - send_tx: channel::Sender, - recv_rx: channel::Receiver>, - worker: Option>, - running: Arc, - error: Option, -} - -impl PostBox { - pub fn to>(addr: A) -> Result { - Self::from_stream(TcpStream::connect(addr.into())?) - } - - fn from_stream(stream: TcpStream) -> Result { - stream.set_nonblocking(true)?; - - let running = Arc::new(AtomicBool::new(true)); - let worker_running = running.clone(); - - let (send_tx, send_rx) = channel::unbounded(); - let (recv_tx, recv_rx) = channel::unbounded(); - - let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running)); - - Ok(Self { - send_tx, - recv_rx, - worker: Some(worker), - running, - error: None, - }) - } - - pub fn error(&self) -> Option { self.error.clone() } - - pub fn send_message(&mut self, msg: S) { let _ = self.send_tx.send(msg); } - - pub fn next_message(&mut self) -> Result { - if let Some(e) = self.error.clone() { - return Err(e); - } - - match self.recv_rx.recv().map_err(|_| Error::ChannelFailure)? { - Ok(msg) => Ok(msg), - Err(e) => { - self.error = Some(e.clone()); - Err(e) - }, - } - } - - pub fn new_messages(&mut self) -> impl ExactSizeIterator { - let mut new = Vec::new(); - - if self.error.is_some() { - return new.into_iter(); - } - - loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => new.push(msg), - Err(channel::TryRecvError::Empty) => break, - Err(e) => { - self.error = Some(e.into()); - break; - }, - Ok(Err(e)) => { - self.error = Some(e); - break; - }, - } - } - - new.into_iter() - } - - fn worker( - mut stream: TcpStream, - send_rx: channel::Receiver, - recv_tx: channel::Sender>, - running: Arc, - ) { - let mut outgoing_chunks = VecDeque::new(); - let mut incoming_buf = Vec::new(); - - 'work: while running.load(Ordering::Relaxed) { - for _ in 0..30 { - // Get stream errors. - match stream.take_error() { - Ok(Some(e)) | Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - Ok(None) => {}, - } - - // Try getting messages from the send channel. - for _ in 0..1000 { - match send_rx.try_recv() { - Ok(send_msg) => { - // Serialize message - let msg_bytes = bincode::serialize(&send_msg).unwrap(); - let mut msg_bytes = lz4_compress::compress(&msg_bytes); - - /* - if msg_bytes.len() > 512 { - println!("MSG SIZE: {}", msg_bytes.len()); - } - */ - - // Assemble into packet. - let mut packet_bytes = - (msg_bytes.len() as u64).to_le_bytes().as_ref().to_vec(); - packet_bytes.push(msg_bytes.iter().fold(0, |a, x| a ^ *x)); - packet_bytes.append(&mut msg_bytes); - - // Split packet into chunks. - packet_bytes - .chunks(4096) - .map(|chunk| chunk.to_vec()) - .for_each(|chunk| outgoing_chunks.push_back(chunk)) - }, - Err(channel::TryRecvError::Empty) => break, - // Worker error - Err(e) => { - let _ = recv_tx.send(Err(e.into())); - break 'work; - }, - } - } - - // Try sending bytes through the TCP stream. - for _ in 0..1000 { - match outgoing_chunks.pop_front() { - Some(mut chunk) => match stream.write(&chunk) { - Ok(n) if n == chunk.len() => {}, - Ok(n) => { - outgoing_chunks.push_front(chunk.split_off(n)); - break; - }, - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // Return chunk to the queue to try again later. - outgoing_chunks.push_front(chunk); - break; - }, - // Worker error - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - }, - None => break, - } - } - - // Try receiving bytes from the TCP stream. - for _ in 0..1000 { - let mut buf = [0; 4096]; - - match stream.read(&mut buf) { - Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => {}, - // Worker error - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - } - } - - // Try turning bytes into messages. - for _ in 0..1000 { - match incoming_buf.get(0..9) { - Some(len_bytes) => { - let len = - u64::from_le_bytes(<[u8; 8]>::try_from(&len_bytes[0..8]).unwrap()) - as usize; // Can't fail - - if len > MAX_MSG_SIZE { - recv_tx.send(Err(Error::InvalidMessage)).unwrap(); - break 'work; - } else if incoming_buf.len() >= len + 9 { - let checksum_found = - incoming_buf[9..len + 9].iter().fold(0, |a, x| a ^ *x); - let checksum_expected = len_bytes[8]; - - assert_eq!( - checksum_found, checksum_expected, - "Message checksum failed!" - ); - - let msg_bytes = - lz4_compress::decompress(&incoming_buf[9..len + 9]).unwrap(); - - match bincode::deserialize(&msg_bytes) { - Ok(msg) => recv_tx.send(Ok(msg)).unwrap(), - Err(err) => { - println!("BINCODE ERROR: {:?}", err); - recv_tx.send(Err(err.into())).unwrap() - }, - } - - incoming_buf = incoming_buf.split_off(len + 9); - } else { - break; - } - }, - None => break, - } - } - } - - thread::sleep(Duration::from_millis(10)); - } - - if let Err(err) = stream.shutdown(Shutdown::Both) { - warn!("TCP worker stream shutdown failed: {:?}", err); - } - } -} - -impl Drop for PostBox { - fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); - self.worker.take().map(|handle| handle.join()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Instant; - - fn create_postoffice( - id: u16, - ) -> Result<(PostOffice, SocketAddr), Error> { - let sock = ([0; 4], 12345 + id).into(); - Ok((PostOffice::bind(sock)?, sock)) - } - - fn loop_for(duration: Duration, mut f: F) { - let start = Instant::now(); - while start.elapsed() < duration { - f(); - } - } - - #[test] - fn connect() { - let (mut postoffice, bound) = create_postoffice::<(), ()>(0).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - - let _client0 = PostBox::<(), ()>::to(sock).unwrap(); - let _client1 = PostBox::<(), ()>::to(sock).unwrap(); - let _client2 = PostBox::<(), ()>::to(sock).unwrap(); - - let mut new_clients = 0; - loop_for(Duration::from_millis(250), || { - new_clients += postoffice.new_postboxes().count(); - }); - - assert_eq!(new_clients, 3); - } - - /* - #[test] - fn disconnect() { - let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap(); - - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().unwrap().next().unwrap(); - - drop(client); - loop_for(Duration::from_millis(300), || ()); - - assert!(server.new_messages().is_err()); - } - */ - - #[test] - fn send_recv() { - let (mut postoffice, bound) = create_postoffice::<(), i32>(2).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let test_msgs = vec![1, 1337, 42, -48]; - - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - for msg in &test_msgs { - client.send_message(*msg); - } - - let mut recv_msgs = Vec::new(); - loop_for(Duration::from_millis(250), || { - server.new_messages().for_each(|msg| recv_msgs.push(msg)) - }); - - assert_eq!(test_msgs, recv_msgs); - } - - #[test] - #[ignore] - fn send_recv_huge() { - let (mut postoffice, bound) = create_postoffice::<(), Vec>(3).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let test_msgs: Vec> = (0..5) - .map(|i| (0..100000).map(|j| i * 2 + j).collect()) - .collect(); - - let mut client = PostBox::, ()>::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - for msg in &test_msgs { - client.send_message(msg.clone()); - } - - let mut recv_msgs = Vec::new(); - loop_for(Duration::from_millis(3000), || { - server.new_messages().for_each(|msg| recv_msgs.push(msg)) - }); - - assert_eq!(test_msgs.len(), recv_msgs.len()); - assert!(test_msgs == recv_msgs); - } - - #[test] - fn send_recv_both() { - let (mut postoffice, bound) = create_postoffice::(4).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - let test_msgs = vec![ - (0xDEADBEAD, 0xBEEEEEEF), - (0x1BADB002, 0xBAADF00D), - (0xBAADA555, 0xC0DED00D), - (0xCAFEBABE, 0xDEADC0DE), - ]; - - for (to, from) in test_msgs { - client.send_message(to); - server.send_message(from); - - loop_for(Duration::from_millis(250), || ()); - - assert_eq!(client.new_messages().next().unwrap(), from); - assert_eq!(server.new_messages().next().unwrap(), to); - } - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index 0a33b95e71..ade46b9c6c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,6 +11,7 @@ default = ["worldgen"] [dependencies] common = { package = "veloren-common", path = "../common" } world = { package = "veloren-world", path = "../world" } +network = { package = "veloren_network", path = "../network", default-features = false } specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git" } @@ -18,6 +19,9 @@ tracing = "0.1" specs = { version = "0.15.1", features = ["shred-derive"] } vek = "0.11.0" uvth = "3.1.1" +futures-util = "0.3" +futures-executor = "0.3" +futures-timer = "3.0" lazy_static = "1.4.0" scan_fmt = "0.2.4" ron = { version = "0.6", default-features = false } diff --git a/server/src/client.rs b/server/src/client.rs index 994a5126ff..43803a42ae 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,15 +1,13 @@ -use common::{ - msg::{ClientMsg, ClientState, RequestStateError, ServerMsg}, - net::PostBox, -}; +use common::msg::{ClientState, RequestStateError, ServerMsg}; use hashbrown::HashSet; +use network::Stream; use specs::{Component, FlaggedStorage}; use specs_idvs::IDVStorage; use vek::*; pub struct Client { pub client_state: ClientState, - pub postbox: PostBox, + pub singleton_stream: std::sync::Mutex, pub last_ping: f64, pub login_msg_sent: bool, } @@ -19,7 +17,9 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { self.postbox.send_message(msg); } + pub fn notify(&mut self, msg: ServerMsg) { + let _ = self.singleton_stream.lock().unwrap().send(msg); + } pub fn is_registered(&self) -> bool { match self.client_state { @@ -37,13 +37,19 @@ impl Client { pub fn allow_state(&mut self, new_state: ClientState) { self.client_state = new_state; - self.postbox - .send_message(ServerMsg::StateAnswer(Ok(new_state))); + let _ = self + .singleton_stream + .lock() + .unwrap() + .send(ServerMsg::StateAnswer(Ok(new_state))); } pub fn error_state(&mut self, error: RequestStateError) { - self.postbox - .send_message(ServerMsg::StateAnswer(Err((error, self.client_state)))); + let _ = self + .singleton_stream + .lock() + .unwrap() + .send(ServerMsg::StateAnswer(Err((error, self.client_state)))); } } diff --git a/server/src/error.rs b/server/src/error.rs index e98ec644f7..a231b3ec34 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -1,11 +1,21 @@ -use common::net::PostError; +use network::{NetworkError, ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { - Network(PostError), + NetworkErr(NetworkError), + ParticipantErr(ParticipantError), + StreamErr(StreamError), Other(String), } -impl From for Error { - fn from(err: PostError) -> Self { Error::Network(err) } +impl From for Error { + fn from(err: NetworkError) -> Self { Error::NetworkErr(err) } +} + +impl From for Error { + fn from(err: ParticipantError) -> Self { Error::ParticipantErr(err) } +} + +impl From for Error { + fn from(err: StreamError) -> Self { Error::StreamErr(err) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 288537277a..753d2b1fb4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -31,14 +31,17 @@ use common::{ cmd::ChatCommand, comp::{self, ChatType}, event::{EventBus, ServerEvent}, - msg::{ClientMsg, ClientState, ServerInfo, ServerMsg}, - net::PostOffice, + msg::{ClientState, ServerInfo, ServerMsg}, state::{State, TimeOfDay}, sync::WorldSyncExt, terrain::TerrainChunkSize, vol::{ReadVol, RectVolSize}, }; +use futures_executor::block_on; +use futures_timer::Delay; +use futures_util::{select, FutureExt}; use metrics::{ServerMetrics, TickMetrics}; +use network::{Address, Network, Pid}; use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; use std::{ @@ -77,7 +80,7 @@ pub struct Server { world: Arc, map: Vec, - postoffice: PostOffice, + network: Network, thread_pool: ThreadPool, @@ -233,16 +236,21 @@ impl Server { .run(settings.metrics_address) .expect("Failed to initialize server metrics submodule."); + let thread_pool = ThreadPoolBuilder::new() + .name("veloren-worker".to_string()) + .build(); + let (network, f) = Network::new(Pid::new(), None); + thread_pool.execute(f); + block_on(network.listen(Address::Tcp(settings.gameserver_address)))?; + let this = Self { state, world: Arc::new(world), map, - postoffice: PostOffice::bind(settings.gameserver_address)?, + network, - thread_pool: ThreadPoolBuilder::new() - .name("veloren-worker".into()) - .build(), + thread_pool, metrics, tick_metrics, @@ -329,17 +337,18 @@ impl Server { // 1) Build up a list of events for this frame, to be passed to the frontend. let mut frontend_events = Vec::new(); - // If networking has problems, handle them. - if let Some(err) = self.postoffice.error() { - return Err(err.into()); - } - // 2) let before_new_connections = Instant::now(); // 3) Handle inputs from clients - frontend_events.append(&mut self.handle_new_connections()?); + block_on(async { + //TIMEOUT 0.01 ms for msg handling + select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = self.handle_new_connections(&mut frontend_events).fuse() => err, + ) + })?; let before_message_system = Instant::now(); @@ -582,13 +591,17 @@ impl Server { } /// Handle new client connections. - fn handle_new_connections(&mut self) -> Result, Error> { - let mut frontend_events = Vec::new(); + async fn handle_new_connections( + &mut self, + frontend_events: &mut Vec, + ) -> Result<(), Error> { + loop { + let participant = self.network.connected().await?; + let singleton_stream = participant.opened().await?; - for postbox in self.postoffice.new_postboxes() { let mut client = Client { client_state: ClientState::Connected, - postbox, + singleton_stream: std::sync::Mutex::new(singleton_stream), last_ping: self.state.get_time(), login_msg_sent: false, }; @@ -626,8 +639,6 @@ impl Server { frontend_events.push(Event::ClientConnected { entity }); } } - - Ok(frontend_events) } pub fn notify_client(&self, entity: EcsEntity, msg: S) diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 0038449bb3..b13fe3753e 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -18,11 +18,360 @@ use common::{ terrain::{Block, TerrainChunkSize, TerrainGrid}, vol::{RectVolSize, Vox}, }; +use futures_executor::block_on; +use futures_timer::Delay; +use futures_util::{select, FutureExt}; use hashbrown::HashMap; use specs::{ Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, }; +impl Sys { + ///We need to move this to a async fn, otherwise the compiler generates to + /// much recursive fn, and async closures dont work yet + #[allow(clippy::too_many_arguments)] + async fn handle_client_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, ChatMsg)>, + player_list: &HashMap, + new_players: &mut Vec, + entity: specs::Entity, + client: &mut Client, + cnt: &mut u64, + character_loader: &ReadExpect<'_, CharacterLoader>, + terrain: &ReadExpect<'_, TerrainGrid>, + uids: &ReadStorage<'_, Uid>, + can_build: &ReadStorage<'_, CanBuild>, + force_updates: &ReadStorage<'_, ForceUpdate>, + stats: &ReadStorage<'_, Stats>, + chat_modes: &ReadStorage<'_, ChatMode>, + accounts: &mut WriteExpect<'_, AuthProvider>, + block_changes: &mut Write<'_, BlockChange>, + admin_list: &ReadExpect<'_, AdminList>, + admins: &mut WriteStorage<'_, Admin>, + positions: &mut WriteStorage<'_, Pos>, + velocities: &mut WriteStorage<'_, Vel>, + orientations: &mut WriteStorage<'_, Ori>, + players: &mut WriteStorage<'_, Player>, + controllers: &mut WriteStorage<'_, Controller>, + settings: &Read<'_, ServerSettings>, + ) -> Result<(), crate::error::Error> { + loop { + let msg = client.singleton_stream.lock().unwrap().recv().await?; + *cnt += 1; + match msg { + // Go back to registered state (char selection screen) + ClientMsg::ExitIngame => match client.client_state { + // Use ClientMsg::Register instead. + ClientState::Connected => client.error_state(RequestStateError::WrongMessage), + ClientState::Registered => client.error_state(RequestStateError::Already), + ClientState::Spectator | ClientState::Character => { + server_emitter.emit(ServerEvent::ExitIngame { entity }); + }, + ClientState::Pending => {}, + }, + // Request spectator state + ClientMsg::Spectate => match client.client_state { + // Become Registered first. + ClientState::Connected => client.error_state(RequestStateError::Impossible), + ClientState::Spectator => client.error_state(RequestStateError::Already), + ClientState::Registered | ClientState::Character => { + client.allow_state(ClientState::Spectator) + }, + ClientState::Pending => {}, + }, + // Request registered state (login) + ClientMsg::Register { + view_distance, + token_or_username, + } => { + let (username, uuid) = match accounts.query(token_or_username.clone()) { + Err(err) => { + client.error_state(RequestStateError::RegisterDenied(err)); + break Ok(()); + }, + Ok((username, uuid)) => (username, uuid), + }; + + let vd = + view_distance.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); + let player = Player::new(username.clone(), None, vd, uuid); + let is_admin = admin_list.contains(&username); + + if !player.is_valid() { + // Invalid player + client.error_state(RequestStateError::Impossible); + break Ok(()); + } + + match client.client_state { + ClientState::Connected => { + // Add Player component to this client + let _ = players.insert(entity, player); + + // Give the Admin component to the player if their name exists in + // admin list + if is_admin { + let _ = admins.insert(entity, Admin); + } + + // Tell the client its request was successful. + client.allow_state(ClientState::Registered); + + // Send initial player list + client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + ))); + + // Add to list to notify all clients of the new player + new_players.push(entity); + }, + // Use RequestState instead (No need to send `player` again). + _ => client.error_state(RequestStateError::Impossible), + } + //client.allow_state(ClientState::Registered); + + // Limit view distance if it's too high + // This comes after state registration so that the client actually hears it + if settings + .max_view_distance + .zip(view_distance) + .map(|(vd, max)| vd > max) + .unwrap_or(false) + { + client.notify(ServerMsg::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + )); + }; + }, + ClientMsg::SetViewDistance(view_distance) => { + if let ClientState::Character { .. } = client.client_state { + if settings + .max_view_distance + .map(|max| view_distance <= max) + .unwrap_or(true) + { + players.get_mut(entity).map(|player| { + player.view_distance = Some( + settings + .max_view_distance + .map(|max| view_distance.min(max)) + .unwrap_or(view_distance), + ) + }); + } else { + client.notify(ServerMsg::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + )); + } + } + }, + ClientMsg::Character(character_id) => match client.client_state { + // Become Registered first. + ClientState::Connected => client.error_state(RequestStateError::Impossible), + ClientState::Registered | ClientState::Spectator => { + // Only send login message if it wasn't already + // sent previously + if let (Some(player), false) = (players.get(entity), client.login_msg_sent) + { + // Send a request to load the character's component data from the + // DB. Once loaded, persisted components such as stats and inventory + // will be inserted for the entity + character_loader.load_character_data( + entity, + player.uuid().to_string(), + character_id, + ); + + // Start inserting non-persisted/default components for the entity + // while we load the DB data + server_emitter.emit(ServerEvent::InitCharacterData { + entity, + character_id, + }); + + // Give the player a welcome message + if settings.server_description.len() > 0 { + client.notify( + ChatType::CommandInfo + .server_msg(settings.server_description.clone()), + ); + } + + // Only send login message if it wasn't already + // sent previously + if !client.login_msg_sent { + new_chat_msgs.push((None, ChatMsg { + chat_type: ChatType::Online, + message: format!("[{}] is now online.", &player.alias), // TODO: Localize this + })); + + client.login_msg_sent = true; + } + } else { + client.notify(ServerMsg::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + ))) + } + }, + ClientState::Character => client.error_state(RequestStateError::Already), + ClientState::Pending => {}, + }, + ClientMsg::ControllerInputs(inputs) => match client.client_state { + ClientState::Connected | ClientState::Registered | ClientState::Spectator => { + client.error_state(RequestStateError::Impossible) + }, + ClientState::Character => { + if let Some(controller) = controllers.get_mut(entity) { + controller.inputs.update_with_new(inputs); + } + }, + ClientState::Pending => {}, + }, + ClientMsg::ControlEvent(event) => match client.client_state { + ClientState::Connected | ClientState::Registered | ClientState::Spectator => { + client.error_state(RequestStateError::Impossible) + }, + ClientState::Character => { + // Skip respawn if client entity is alive + if let ControlEvent::Respawn = event { + if stats.get(entity).map_or(true, |s| !s.is_dead) { + continue; + } + } + if let Some(controller) = controllers.get_mut(entity) { + controller.events.push(event); + } + }, + ClientState::Pending => {}, + }, + ClientMsg::ControlAction(event) => match client.client_state { + ClientState::Connected | ClientState::Registered | ClientState::Spectator => { + client.error_state(RequestStateError::Impossible) + }, + ClientState::Character => { + if let Some(controller) = controllers.get_mut(entity) { + controller.actions.push(event); + } + }, + ClientState::Pending => {}, + }, + ClientMsg::ChatMsg(message) => match client.client_state { + ClientState::Connected => client.error_state(RequestStateError::Impossible), + ClientState::Registered | ClientState::Spectator | ClientState::Character => { + match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + tracing::error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + tracing::warn!( + ?len, + ?max, + "Recieved a chat message that's too long" + ) + }, + } + }, + ClientState::Pending => {}, + }, + ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state { + ClientState::Character => { + if force_updates.get(entity).is_none() + && stats.get(entity).map_or(true, |s| !s.is_dead) + { + let _ = positions.insert(entity, pos); + let _ = velocities.insert(entity, vel); + let _ = orientations.insert(entity, ori); + } + }, + // Only characters can send positions. + _ => client.error_state(RequestStateError::Impossible), + }, + ClientMsg::BreakBlock(pos) => { + if can_build.get(entity).is_some() { + block_changes.set(pos, Block::empty()); + } + }, + ClientMsg::PlaceBlock(pos, block) => { + if can_build.get(entity).is_some() { + block_changes.try_set(pos, block); + } + }, + ClientMsg::TerrainChunkRequest { key } => match client.client_state { + ClientState::Connected | ClientState::Registered => { + client.error_state(RequestStateError::Impossible); + }, + ClientState::Spectator | ClientState::Character => { + let in_vd = if let (Some(view_distance), Some(pos)) = ( + players.get(entity).and_then(|p| p.view_distance), + positions.get(entity), + ) { + pos.0.xy().map(|e| e as f64).distance( + key.map(|e| e as f64 + 0.5) + * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < (view_distance as f64 + 1.5) * TerrainChunkSize::RECT_SIZE.x as f64 + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => client.notify(ServerMsg::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + }), + None => server_emitter.emit(ServerEvent::ChunkRequest(entity, key)), + } + } + }, + ClientState::Pending => {}, + }, + // Always possible. + ClientMsg::Ping => client.notify(ServerMsg::Pong), + ClientMsg::Pong => {}, + ClientMsg::Disconnect => { + client.notify(ServerMsg::Disconnect); + }, + ClientMsg::Terminate => { + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + }, + ClientMsg::RequestCharacterList => { + if let Some(player) = players.get(entity) { + character_loader.load_character_list(entity, player.uuid().to_string()) + } + }, + ClientMsg::CreateCharacter { alias, tool, body } => { + if let Some(player) = players.get(entity) { + character_loader.create_character( + entity, + player.uuid().to_string(), + alias, + tool, + body, + ); + } + }, + ClientMsg::DeleteCharacter(character_id) => { + if let Some(player) = players.get(entity) { + character_loader.delete_character( + entity, + player.uuid().to_string(), + character_id, + ); + } + }, + } + } + } +} + /// This system will handle new messages from clients pub struct Sys; impl<'a> System<'a> for Sys { @@ -107,347 +456,53 @@ impl<'a> System<'a> for Sys { let mut new_players = Vec::new(); for (entity, client) in (&entities, &mut clients).join() { - let new_msgs = client.postbox.new_messages(); + let mut cnt = 0; + + let network_err: Result<(), crate::error::Error> = block_on(async { + //TIMEOUT 0.01 ms for msg handling + select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = Self::handle_client_msg( + &mut server_emitter, + &mut new_chat_msgs, + &player_list, + &mut new_players, + entity, + client, + &mut cnt, + + &character_loader, + &terrain, + &uids, + &can_build, + &force_updates, + &stats, + &chat_modes, + &mut accounts, + &mut block_changes, + &admin_list, + &mut admins, + &mut positions, + &mut velocities, + &mut orientations, + &mut players, + &mut controllers, + &settings, + ).fuse() => err, + ) + }); // Update client ping. - if new_msgs.len() > 0 { + if cnt > 0 { client.last_ping = time.0 } else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout - || client.postbox.error().is_some() + || network_err.is_err() // Postbox error { server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 { // Try pinging the client if the timeout is nearing. - client.postbox.send_message(ServerMsg::Ping); - } - - // Process incoming messages. - for msg in new_msgs { - match msg { - // Go back to registered state (char selection screen) - ClientMsg::ExitIngame => match client.client_state { - // Use ClientMsg::Register instead. - ClientState::Connected => { - client.error_state(RequestStateError::WrongMessage) - }, - ClientState::Registered => client.error_state(RequestStateError::Already), - ClientState::Spectator | ClientState::Character => { - server_emitter.emit(ServerEvent::ExitIngame { entity }); - }, - ClientState::Pending => {}, - }, - // Request spectator state - ClientMsg::Spectate => match client.client_state { - // Become Registered first. - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Spectator => client.error_state(RequestStateError::Already), - ClientState::Registered | ClientState::Character => { - client.allow_state(ClientState::Spectator) - }, - ClientState::Pending => {}, - }, - // Request registered state (login) - ClientMsg::Register { - view_distance, - token_or_username, - } => { - let (username, uuid) = match accounts.query(token_or_username.clone()) { - Err(err) => { - client.error_state(RequestStateError::RegisterDenied(err)); - break; - }, - Ok((username, uuid)) => (username, uuid), - }; - - let vd = view_distance - .map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); - let player = Player::new(username.clone(), None, vd, uuid); - let is_admin = admin_list.contains(&username); - - if !player.is_valid() { - // Invalid player - client.error_state(RequestStateError::Impossible); - break; - } - - match client.client_state { - ClientState::Connected => { - // Add Player component to this client - let _ = players.insert(entity, player); - - // Give the Admin component to the player if their name exists in - // admin list - if is_admin { - let _ = admins.insert(entity, Admin); - } - - // Tell the client its request was successful. - client.allow_state(ClientState::Registered); - - // Send initial player list - client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - ))); - - // Add to list to notify all clients of the new player - new_players.push(entity); - }, - // Use RequestState instead (No need to send `player` again). - _ => client.error_state(RequestStateError::Impossible), - } - //client.allow_state(ClientState::Registered); - - // Limit view distance if it's too high - // This comes after state registration so that the client actually hears it - if settings - .max_view_distance - .zip(view_distance) - .map(|(vd, max)| vd > max) - .unwrap_or(false) - { - client.notify(ServerMsg::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - }; - }, - ClientMsg::SetViewDistance(view_distance) => match client.client_state { - ClientState::Character { .. } => { - if settings - .max_view_distance - .map(|max| view_distance <= max) - .unwrap_or(true) - { - players.get_mut(entity).map(|player| { - player.view_distance = Some( - settings - .max_view_distance - .map(|max| view_distance.min(max)) - .unwrap_or(view_distance), - ) - }); - } else { - client.notify(ServerMsg::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - } - }, - _ => {}, - }, - ClientMsg::Character(character_id) => match client.client_state { - // Become Registered first. - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered | ClientState::Spectator => { - // Only send login message if it wasn't already - // sent previously - if let (Some(player), false) = - (players.get(entity), client.login_msg_sent) - { - // Send a request to load the character's component data from the - // DB. Once loaded, persisted components such as stats and inventory - // will be inserted for the entity - character_loader.load_character_data( - entity, - player.uuid().to_string(), - character_id, - ); - - // Start inserting non-persisted/default components for the entity - // while we load the DB data - server_emitter.emit(ServerEvent::InitCharacterData { - entity, - character_id, - }); - - // Give the player a welcome message - if settings.server_description.len() > 0 { - client.notify( - ChatType::CommandInfo - .server_msg(settings.server_description.clone()), - ); - } - - // Only send login message if it wasn't already - // sent previously - if !client.login_msg_sent { - new_chat_msgs.push((None, ChatMsg { - chat_type: ChatType::Online, - message: format!("[{}] is now online.", &player.alias), // TODO: Localize this - })); - - client.login_msg_sent = true; - } - } else { - client.notify(ServerMsg::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - ))) - } - }, - ClientState::Character => client.error_state(RequestStateError::Already), - ClientState::Pending => {}, - }, - ClientMsg::ControllerInputs(inputs) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - if let Some(controller) = controllers.get_mut(entity) { - controller.inputs.update_with_new(inputs); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ControlEvent(event) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - // Skip respawn if client entity is alive - if let &ControlEvent::Respawn = &event { - if stats.get(entity).map_or(true, |s| !s.is_dead) { - continue; - } - } - if let Some(controller) = controllers.get_mut(entity) { - controller.events.push(event); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ControlAction(event) => match client.client_state { - ClientState::Connected - | ClientState::Registered - | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - if let Some(controller) = controllers.get_mut(entity) { - controller.actions.push(event); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ChatMsg(message) => match client.client_state { - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered - | ClientState::Spectator - | ClientState::Character => match validate_chat_msg(&message) { - Ok(()) => { - if let Some(from) = uids.get(entity) { - let mode = chat_modes.get(entity).cloned().unwrap_or_default(); - let msg = mode.new_message(*from, message); - new_chat_msgs.push((Some(entity), msg)); - } else { - tracing::error!("Could not send message. Missing player uid"); - } - }, - Err(ChatMsgValidationError::TooLong) => { - let max = MAX_BYTES_CHAT_MSG; - let len = message.len(); - tracing::warn!( - ?len, - ?max, - "Recieved a chat message that's too long" - ) - }, - }, - ClientState::Pending => {}, - }, - ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state { - ClientState::Character => { - if force_updates.get(entity).is_none() - && stats.get(entity).map_or(true, |s| !s.is_dead) - { - let _ = positions.insert(entity, pos); - let _ = velocities.insert(entity, vel); - let _ = orientations.insert(entity, ori); - } - }, - // Only characters can send positions. - _ => client.error_state(RequestStateError::Impossible), - }, - ClientMsg::BreakBlock(pos) => { - if can_build.get(entity).is_some() { - block_changes.set(pos, Block::empty()); - } - }, - ClientMsg::PlaceBlock(pos, block) => { - if can_build.get(entity).is_some() { - block_changes.try_set(pos, block); - } - }, - ClientMsg::TerrainChunkRequest { key } => match client.client_state { - ClientState::Connected | ClientState::Registered => { - client.error_state(RequestStateError::Impossible); - }, - ClientState::Spectator | ClientState::Character => { - let in_vd = if let (Some(view_distance), Some(pos)) = ( - players.get(entity).and_then(|p| p.view_distance), - positions.get(entity), - ) { - pos.0.xy().map(|e| e as f64).distance( - key.map(|e| e as f64 + 0.5) - * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (view_distance as f64 + 1.5) - * TerrainChunkSize::RECT_SIZE.x as f64 - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - client.postbox.send_message(ServerMsg::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - }) - }, - None => { - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, - } - } - }, - ClientState::Pending => {}, - }, - // Always possible. - ClientMsg::Ping => client.postbox.send_message(ServerMsg::Pong), - ClientMsg::Pong => {}, - ClientMsg::Disconnect => { - client.postbox.send_message(ServerMsg::Disconnect); - }, - ClientMsg::Terminate => { - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - }, - ClientMsg::RequestCharacterList => { - if let Some(player) = players.get(entity) { - character_loader.load_character_list(entity, player.uuid().to_string()) - } - }, - ClientMsg::CreateCharacter { alias, tool, body } => { - if let Some(player) = players.get(entity) { - character_loader.create_character( - entity, - player.uuid().to_string(), - alias, - tool, - body, - ); - } - }, - ClientMsg::DeleteCharacter(character_id) => { - if let Some(player) = players.get(entity) { - character_loader.delete_character( - entity, - player.uuid().to_string(), - character_id, - ); - } - }, - } + client.notify(ServerMsg::Ping); } } diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index c7b3ebbfb7..9129af814a 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -40,9 +40,7 @@ impl<'a> System<'a> for Sys { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - client - .postbox - .send_message(ServerMsg::Notification(Notification::WaypointSaved)); + client.notify(ServerMsg::Notification(Notification::WaypointSaved)); } } } diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 02c171ac5b..0bc8fc29f3 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -1,5 +1,7 @@ -use client::{error::Error as ClientError, Client}; -use common::net::PostError; +use client::{ + error::{Error as ClientError, NetworkError}, + Client, +}; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use std::{ net::ToSocketAddrs, @@ -98,12 +100,8 @@ impl ClientInit { }, Err(err) => { match err { - ClientError::Network(PostError::Bincode(_)) => { - last_err = Some(Error::ClientError(err)); - break 'tries; + ClientError::NetworkErr(NetworkError::ListenFailed(..)) => { }, - // Assume the connection failed and try again soon - ClientError::Network(_) => {}, // Non-connection error, stop attempts err => { last_err = Some(Error::ClientError(err)); diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index 0cdc9b6e17..cdd9410545 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -109,7 +109,17 @@ impl PlayState for MainMenuState { client::Error::NotOnWhitelist => { localized_strings.get("main.login.not_on_whitelist").into() }, - client::Error::Network(e) => format!( + client::Error::NetworkErr(e) => format!( + "{}: {:?}", + localized_strings.get("main.login.network_error"), + e + ), + client::Error::ParticipantErr(e) => format!( + "{}: {:?}", + localized_strings.get("main.login.network_error"), + e + ), + client::Error::StreamErr(e) => format!( "{}: {:?}", localized_strings.get("main.login.network_error"), e diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index aee967caf4..7184d39dd5 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -47,14 +47,14 @@ impl Singleplayer { let paused = Arc::new(AtomicBool::new(false)); let paused1 = paused.clone(); + let server = Server::new(settings2).expect("Failed to create server instance!"); + + let server = match thread_pool { + Some(pool) => server.with_thread_pool(pool), + None => server, + }; + let thread = thread::spawn(move || { - let server = Server::new(settings2).expect("Failed to create server instance!"); - - let server = match thread_pool { - Some(pool) => server.with_thread_pool(pool), - None => server, - }; - run_server(server, receiver, paused1); });