From b1db5ef488931da3fea04b36c0911f263f636165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sun, 4 Oct 2020 20:20:18 +0200 Subject: [PATCH] Redo Network Frontend. Rather than having a single Stream to handle ALL data, seperate into multiple streams: - Ping Stream, for seperate PINGS - Register Stream, only used till the client is registered, then no longer used! - General Stream, used for msg that can occur always - NotInGame Stream, used for everything NOT ingame, e.g. Character Screen - InGame Stream, used for all GAME data, players, terrain, entities, etc... This version does compile, and gets the client registered (with auth too) but doesnt get to the char screen yet. This fixes also the ignoring messages problem we had, as we are not sending data to the register stream! This fixes also the problem that the server had to sleep for the Stream Creation, as the Server is now creating the streams and client has to sleep. --- client/src/lib.rs | 1276 +++++++++++----------- common/src/msg/client.rs | 51 +- common/src/msg/mod.rs | 16 +- common/src/msg/server.rs | 63 +- server/src/client.rs | 95 +- server/src/cmd.rs | 6 +- server/src/events/entity_manipulation.rs | 4 +- server/src/events/group_manip.rs | 52 +- server/src/events/interaction.rs | 2 +- server/src/events/inventory_manip.rs | 6 +- server/src/events/player.rs | 7 +- server/src/lib.rs | 160 +-- server/src/state_ext.rs | 26 +- server/src/sys/entity_sync.rs | 24 +- server/src/sys/invite_timeout.rs | 4 +- server/src/sys/message.rs | 871 +++++++++------ server/src/sys/subscription.rs | 12 +- server/src/sys/terrain.rs | 6 +- server/src/sys/terrain_sync.rs | 8 +- server/src/sys/waypoint.rs | 2 +- voxygen/src/menu/char_selection/mod.rs | 6 +- voxygen/src/session.rs | 10 +- 22 files changed, 1488 insertions(+), 1219 deletions(-) diff --git a/client/src/lib.rs b/client/src/lib.rs index fdd3cd795e..74a7526ad7 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -25,9 +25,11 @@ use common::{ }, event::{EventBus, LocalEvent}, msg::{ - validate_chat_msg, ChatMsgValidationError, ClientMsg, ClientState, DisconnectReason, - InviteAnswer, Notification, PlayerInfo, PlayerListUpdate, RegisterError, RequestStateError, - ServerInfo, ServerMsg, MAX_BYTES_CHAT_MSG, + validate_chat_msg, ChatMsgValidationError, ClientInGameMsg, ClientIngame, ClientMsg, + ClientNotInGameMsg, ClientRegisterMsg, ClientType, DisconnectReason, InviteAnswer, + Notification, PingMsg, PlayerInfo, PlayerListUpdate, RegisterError, ServerInGameMsg, + ServerInfo, ServerInitMsg, ServerMsg, ServerNotInGameMsg, ServerRegisterAnswerMsg, + MAX_BYTES_CHAT_MSG, }, outcome::Outcome, recipe::RecipeBook, @@ -41,7 +43,7 @@ use futures_timer::Delay; use futures_util::{select, FutureExt}; use hashbrown::{HashMap, HashSet}; use image::DynamicImage; -use network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; +use network::{Network, Participant, Pid, ProtocolAddr, Stream}; use num::traits::FloatConst; use rayon::prelude::*; use std::{ @@ -68,7 +70,8 @@ pub enum Event { } pub struct Client { - client_state: ClientState, + registered: bool, + client_ingame: Option, thread_pool: ThreadPool, pub server_info: ServerInfo, /// Just the "base" layer for LOD; currently includes colors and nothing @@ -111,6 +114,10 @@ pub struct Client { _network: Network, participant: Option, singleton_stream: Stream, + ping_stream: Stream, + register_stream: Stream, + not_in_game_stream: Stream, + in_game_stream: Stream, client_timeout: Duration, last_server_ping: f64, @@ -141,8 +148,6 @@ pub struct CharacterList { impl Client { /// Create a new `Client`. pub fn new>(addr: A, view_distance: Option) -> Result { - let client_state = ClientState::Connected; - let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) .build(); @@ -153,16 +158,33 @@ impl Client { thread_pool.execute(scheduler); let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?; - let mut stream = block_on(participant.open( - 10, - Promises::ORDERED | Promises::CONSISTENCY | Promises::COMPRESSED, - ))?; + let stream = block_on(participant.opened())?; + let mut ping_stream = block_on(participant.opened())?; + let mut register_stream = block_on(participant.opened())?; + let not_in_game_stream = block_on(participant.opened())?; + let in_game_stream = block_on(participant.opened())?; + + register_stream.send(ClientType::Game)?; + let server_info: ServerInfo = block_on(register_stream.recv())?; + + // TODO: Display that versions don't match in Voxygen + if server_info.git_hash != *common::util::GIT_HASH { + warn!( + "Server is running {}[{}], you are running {}[{}], versions might be incompatible!", + server_info.git_hash, + server_info.git_date, + common::util::GIT_HASH.to_string(), + common::util::GIT_DATE.to_string(), + ); + } + debug!("Auth Server: {:?}", server_info.auth_provider); + + ping_stream.send(PingMsg::Ping)?; // Wait for initial sync let ( state, entity, - server_info, lod_base, lod_alt, lod_horizon, @@ -170,204 +192,179 @@ impl Client { recipe_book, max_group_size, client_timeout, - ) = block_on(async { - loop { - match stream.recv().await? { - ServerMsg::InitialSync { - entity_package, - server_info, - time_of_day, - max_group_size, - client_timeout, - world_map, - recipe_book, - } => { - // TODO: Display that versions don't match in Voxygen - if server_info.git_hash != *common::util::GIT_HASH { - warn!( - "Server is running {}[{}], you are running {}[{}], versions might \ - be incompatible!", - server_info.git_hash, - server_info.git_date, - common::util::GIT_HASH.to_string(), - common::util::GIT_DATE.to_string(), - ); - } + ) = match block_on(register_stream.recv())? { + ServerInitMsg::GameSync { + entity_package, + time_of_day, + max_group_size, + client_timeout, + world_map, + recipe_book, + } => { + // Initialize `State` + let mut state = State::default(); + // Client-only components + state + .ecs_mut() + .register::>(); - debug!("Auth Server: {:?}", server_info.auth_provider); + let entity = state.ecs_mut().apply_entity_package(entity_package); + *state.ecs_mut().write_resource() = time_of_day; - // Initialize `State` - let mut state = State::default(); - // Client-only components - state - .ecs_mut() - .register::>(); - - let entity = state.ecs_mut().apply_entity_package(entity_package); - *state.ecs_mut().write_resource() = time_of_day; - - let map_size_lg = common::terrain::MapSizeLg::new(world_map.dimensions_lg) - .map_err(|_| { - Error::Other(format!( - "Server sent bad world map dimensions: {:?}", - world_map.dimensions_lg, - )) - })?; - let map_size = map_size_lg.chunks(); - let max_height = world_map.max_height; - let sea_level = world_map.sea_level; - let rgba = world_map.rgba; - let alt = world_map.alt; - let expected_size = - (u32::from(map_size.x) * u32::from(map_size.y)) as usize; - if rgba.len() != expected_size { - return Err(Error::Other("Server sent a bad world map image".into())); - } - if alt.len() != expected_size { - return Err(Error::Other("Server sent a bad altitude map.".into())); - } - let [west, east] = world_map.horizons; - let scale_angle = - |a: u8| (a as f32 / 255.0 * ::FRAC_PI_2()).tan(); - let scale_height = |h: u8| h as f32 / 255.0 * max_height; - let scale_height_big = |h: u32| (h >> 3) as f32 / 8191.0 * max_height; - - debug!("Preparing image..."); - let unzip_horizons = |(angles, heights): &(Vec<_>, Vec<_>)| { - ( - angles.iter().copied().map(scale_angle).collect::>(), - heights - .iter() - .copied() - .map(scale_height) - .collect::>(), - ) - }; - let horizons = [unzip_horizons(&west), unzip_horizons(&east)]; - - // Redraw map (with shadows this time). - let mut world_map = vec![0u32; rgba.len()]; - let mut map_config = common::terrain::map::MapConfig::orthographic( - map_size_lg, - core::ops::RangeInclusive::new(0.0, max_height), - ); - map_config.horizons = Some(&horizons); - let rescale_height = |h: f32| h / max_height; - let bounds_check = |pos: Vec2| { - pos.reduce_partial_min() >= 0 - && pos.x < map_size.x as i32 - && pos.y < map_size.y as i32 - }; - map_config.generate( - |pos| { - let (rgba, alt, downhill_wpos) = if bounds_check(pos) { - let posi = - pos.y as usize * map_size.x as usize + pos.x as usize; - let [r, g, b, a] = rgba[posi].to_le_bytes(); - let alti = alt[posi]; - // Compute downhill. - let downhill = { - let mut best = -1; - let mut besth = alti; - for nposi in neighbors(map_size_lg, posi) { - let nbh = alt[nposi]; - if nbh < besth { - besth = nbh; - best = nposi as isize; - } - } - best - }; - let downhill_wpos = if downhill < 0 { - None - } else { - Some( - Vec2::new( - (downhill as usize % map_size.x as usize) as i32, - (downhill as usize / map_size.x as usize) as i32, - ) * TerrainChunkSize::RECT_SIZE.map(|e| e as i32), - ) - }; - (Rgba::new(r, g, b, a), alti, downhill_wpos) - } else { - (Rgba::zero(), 0, None) - }; - let wpos = pos * TerrainChunkSize::RECT_SIZE.map(|e| e as i32); - let downhill_wpos = downhill_wpos.unwrap_or( - wpos + TerrainChunkSize::RECT_SIZE.map(|e| e as i32), - ); - let alt = rescale_height(scale_height_big(alt)); - common::terrain::map::MapSample { - rgb: Rgb::from(rgba), - alt: f64::from(alt), - downhill_wpos, - connections: None, - } - }, - |wpos| { - let pos = - wpos.map2(TerrainChunkSize::RECT_SIZE, |e, f| e / f as i32); - rescale_height(if bounds_check(pos) { - let posi = - pos.y as usize * map_size.x as usize + pos.x as usize; - scale_height_big(alt[posi]) - } else { - 0.0 - }) - }, - |pos, (r, g, b, a)| { - world_map[pos.y * map_size.x as usize + pos.x] = - u32::from_le_bytes([r, g, b, a]); - }, - ); - let make_raw = |rgba| -> Result<_, Error> { - let mut raw = vec![0u8; 4 * world_map.len()]; - LittleEndian::write_u32_into(rgba, &mut raw); - Ok(Arc::new( - image::DynamicImage::ImageRgba8({ - // Should not fail if the dimensions are correct. - let map = - image::ImageBuffer::from_raw(u32::from(map_size.x), u32::from(map_size.y), raw); - map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? - }) - // Flip the image, since Voxygen uses an orientation where rotation from - // positive x axis to positive y axis is counterclockwise around the z axis. - .flipv(), - )) - }; - let lod_base = rgba; - let lod_alt = alt; - let world_map = make_raw(&world_map)?; - let horizons = (west.0, west.1, east.0, east.1) - .into_par_iter() - .map(|(wa, wh, ea, eh)| u32::from_le_bytes([wa, wh, ea, eh])) - .collect::>(); - let lod_horizon = horizons; - let map_bounds = Vec2::new(sea_level, max_height); - debug!("Done preparing image..."); - - break Ok(( - state, - entity, - server_info, - lod_base, - lod_alt, - lod_horizon, - (world_map, map_size, map_bounds), - recipe_book, - max_group_size, - client_timeout, - )); - }, - ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers), - err => { - warn!("whoops, server mad {:?}, ignoring", err); - }, + let map_size_lg = common::terrain::MapSizeLg::new(world_map.dimensions_lg) + .map_err(|_| { + Error::Other(format!( + "Server sent bad world map dimensions: {:?}", + world_map.dimensions_lg, + )) + })?; + let map_size = map_size_lg.chunks(); + let max_height = world_map.max_height; + let sea_level = world_map.sea_level; + let rgba = world_map.rgba; + let alt = world_map.alt; + let expected_size = (u32::from(map_size.x) * u32::from(map_size.y)) as usize; + if rgba.len() != expected_size { + return Err(Error::Other("Server sent a bad world map image".into())); } - } - })?; + if alt.len() != expected_size { + return Err(Error::Other("Server sent a bad altitude map.".into())); + } + let [west, east] = world_map.horizons; + let scale_angle = + |a: u8| (a as f32 / 255.0 * ::FRAC_PI_2()).tan(); + let scale_height = |h: u8| h as f32 / 255.0 * max_height; + let scale_height_big = |h: u32| (h >> 3) as f32 / 8191.0 * max_height; + ping_stream.send(PingMsg::Ping)?; - stream.send(ClientMsg::Ping)?; + debug!("Preparing image..."); + let unzip_horizons = |(angles, heights): &(Vec<_>, Vec<_>)| { + ( + angles.iter().copied().map(scale_angle).collect::>(), + heights + .iter() + .copied() + .map(scale_height) + .collect::>(), + ) + }; + let horizons = [unzip_horizons(&west), unzip_horizons(&east)]; + + // Redraw map (with shadows this time). + let mut world_map = vec![0u32; rgba.len()]; + let mut map_config = common::terrain::map::MapConfig::orthographic( + map_size_lg, + core::ops::RangeInclusive::new(0.0, max_height), + ); + map_config.horizons = Some(&horizons); + let rescale_height = |h: f32| h / max_height; + let bounds_check = |pos: Vec2| { + pos.reduce_partial_min() >= 0 + && pos.x < map_size.x as i32 + && pos.y < map_size.y as i32 + }; + ping_stream.send(PingMsg::Ping)?; + map_config.generate( + |pos| { + let (rgba, alt, downhill_wpos) = if bounds_check(pos) { + let posi = pos.y as usize * map_size.x as usize + pos.x as usize; + let [r, g, b, a] = rgba[posi].to_le_bytes(); + let alti = alt[posi]; + // Compute downhill. + let downhill = { + let mut best = -1; + let mut besth = alti; + for nposi in neighbors(map_size_lg, posi) { + let nbh = alt[nposi]; + if nbh < besth { + besth = nbh; + best = nposi as isize; + } + } + best + }; + let downhill_wpos = if downhill < 0 { + None + } else { + Some( + Vec2::new( + (downhill as usize % map_size.x as usize) as i32, + (downhill as usize / map_size.x as usize) as i32, + ) * TerrainChunkSize::RECT_SIZE.map(|e| e as i32), + ) + }; + (Rgba::new(r, g, b, a), alti, downhill_wpos) + } else { + (Rgba::zero(), 0, None) + }; + let wpos = pos * TerrainChunkSize::RECT_SIZE.map(|e| e as i32); + let downhill_wpos = downhill_wpos + .unwrap_or(wpos + TerrainChunkSize::RECT_SIZE.map(|e| e as i32)); + let alt = rescale_height(scale_height_big(alt)); + common::terrain::map::MapSample { + rgb: Rgb::from(rgba), + alt: f64::from(alt), + downhill_wpos, + connections: None, + } + }, + |wpos| { + let pos = wpos.map2(TerrainChunkSize::RECT_SIZE, |e, f| e / f as i32); + rescale_height(if bounds_check(pos) { + let posi = pos.y as usize * map_size.x as usize + pos.x as usize; + scale_height_big(alt[posi]) + } else { + 0.0 + }) + }, + |pos, (r, g, b, a)| { + world_map[pos.y * map_size.x as usize + pos.x] = + u32::from_le_bytes([r, g, b, a]); + }, + ); + ping_stream.send(PingMsg::Ping)?; + let make_raw = |rgba| -> Result<_, Error> { + let mut raw = vec![0u8; 4 * world_map.len()]; + LittleEndian::write_u32_into(rgba, &mut raw); + Ok(Arc::new( + image::DynamicImage::ImageRgba8({ + // Should not fail if the dimensions are correct. + let map = + image::ImageBuffer::from_raw(u32::from(map_size.x), u32::from(map_size.y), raw); + map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? + }) + // Flip the image, since Voxygen uses an orientation where rotation from + // positive x axis to positive y axis is counterclockwise around the z axis. + .flipv(), + )) + }; + ping_stream.send(PingMsg::Ping)?; + let lod_base = rgba; + let lod_alt = alt; + let world_map = make_raw(&world_map)?; + let horizons = (west.0, west.1, east.0, east.1) + .into_par_iter() + .map(|(wa, wh, ea, eh)| u32::from_le_bytes([wa, wh, ea, eh])) + .collect::>(); + let lod_horizon = horizons; + let map_bounds = Vec2::new(sea_level, max_height); + debug!("Done preparing image..."); + + Ok(( + state, + entity, + lod_base, + lod_alt, + lod_horizon, + (world_map, map_size, map_bounds), + recipe_book, + max_group_size, + client_timeout, + )) + }, + ServerInitMsg::TooManyPlayers => Err(Error::TooManyPlayers), + }?; + ping_stream.send(PingMsg::Ping)?; let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) @@ -375,8 +372,11 @@ impl Client { // We reduce the thread count by 1 to keep rendering smooth thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); + debug!("Initial sync done"); + Ok(Self { - client_state, + registered: false, + client_ingame: None, thread_pool, server_info, world_map, @@ -398,6 +398,11 @@ impl Client { _network: network, participant: Some(participant), singleton_stream: stream, + ping_stream, + register_stream, + not_in_game_stream, + in_game_stream, + client_timeout, last_server_ping: 0.0, @@ -439,72 +444,52 @@ impl Client { } ).unwrap_or(Ok(username))?; - self.singleton_stream.send(ClientMsg::Register { + //TODO move ViewDistance out of register + self.register_stream.send(ClientRegisterMsg { view_distance: self.view_distance, token_or_username, })?; - self.client_state = ClientState::Pending; - block_on(async { - loop { - match self.singleton_stream.recv().await? { - ServerMsg::StateAnswer(Err(( - RequestStateError::RegisterDenied(err), - state, - ))) => { - self.client_state = state; - break Err(match err { - RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, - RegisterError::AuthError(err) => Error::AuthErr(err), - RegisterError::InvalidCharacter => Error::InvalidCharacter, - RegisterError::NotOnWhitelist => Error::NotOnWhitelist, - RegisterError::Banned(reason) => Error::Banned(reason), - }); - }, - ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), - ignore => { - warn!( - "Ignoring what the server send till registered: {:? }", - ignore - ); - //return Err(Error::ServerWentMad) - }, - } - } - }) + match block_on(self.register_stream.recv::())? { + Err(RegisterError::AlreadyLoggedIn) => Err(Error::AlreadyLoggedIn), + Err(RegisterError::AuthError(err)) => Err(Error::AuthErr(err)), + Err(RegisterError::InvalidCharacter) => Err(Error::InvalidCharacter), + Err(RegisterError::NotOnWhitelist) => Err(Error::NotOnWhitelist), + Err(RegisterError::Banned(reason)) => Err(Error::Banned(reason)), + Ok(()) => Ok(()), + } } /// Request a state transition to `ClientState::Character`. pub fn request_character(&mut self, character_id: CharacterId) { - self.singleton_stream - .send(ClientMsg::Character(character_id)) + self.not_in_game_stream + .send(ClientNotInGameMsg::Character(character_id)) .unwrap(); self.active_character_id = Some(character_id); - self.client_state = ClientState::Pending; } /// Load the current players character list pub fn load_character_list(&mut self) { self.character_list.loading = true; - self.singleton_stream - .send(ClientMsg::RequestCharacterList) + self.not_in_game_stream + .send(ClientNotInGameMsg::RequestCharacterList) .unwrap(); } /// New character creation pub fn create_character(&mut self, alias: String, tool: Option, body: comp::Body) { self.character_list.loading = true; - self.singleton_stream - .send(ClientMsg::CreateCharacter { alias, tool, body }) + self.not_in_game_stream + .send(ClientNotInGameMsg::CreateCharacter { alias, tool, body }) .unwrap(); } /// Character deletion pub fn delete_character(&mut self, character_id: CharacterId) { self.character_list.loading = true; - self.singleton_stream - .send(ClientMsg::DeleteCharacter(character_id)) + self.not_in_game_stream + .send(ClientNotInGameMsg::DeleteCharacter(character_id)) .unwrap(); } @@ -522,37 +507,40 @@ impl Client { /// Request a state transition to `ClientState::Registered` from an ingame /// state. pub fn request_remove_character(&mut self) { - self.singleton_stream.send(ClientMsg::ExitIngame).unwrap(); - self.client_state = ClientState::Pending; + self.in_game_stream + .send(ClientInGameMsg::ExitInGame) + .unwrap(); } pub fn set_view_distance(&mut self, view_distance: u32) { self.view_distance = Some(view_distance.max(1).min(65)); - self.singleton_stream - .send(ClientMsg::SetViewDistance(self.view_distance.unwrap())) + self.in_game_stream + .send(ClientInGameMsg::SetViewDistance( + self.view_distance.unwrap(), + )) .unwrap(); // Can't fail } pub fn use_slot(&mut self, slot: comp::slot::Slot) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Use(slot), ))) .unwrap(); } pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Swap(a, b), ))) .unwrap(); } pub fn drop_slot(&mut self, slot: comp::slot::Slot) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Drop(slot), ))) .unwrap(); @@ -560,8 +548,8 @@ impl Client { pub fn pick_up(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.read_component_copied(entity) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Pickup(uid), ))) .unwrap(); @@ -582,8 +570,8 @@ impl Client { pub fn craft_recipe(&mut self, recipe: &str) -> bool { if self.can_craft_recipe(recipe) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::CraftRecipe(recipe.to_string()), ))) .unwrap(); @@ -604,13 +592,13 @@ impl Client { pub fn enable_lantern(&mut self) { self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::EnableLantern)) + .send(ClientInGameMsg::ControlEvent(ControlEvent::EnableLantern)) .unwrap(); } pub fn disable_lantern(&mut self) { self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::DisableLantern)) + .send(ClientInGameMsg::ControlEvent(ControlEvent::DisableLantern)) .unwrap(); } @@ -629,8 +617,8 @@ impl Client { pub fn pending_invites(&self) -> &HashSet { &self.pending_invites } pub fn send_group_invite(&mut self, invitee: Uid) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::GroupManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::GroupManip( GroupManip::Invite(invitee), ))) .unwrap() @@ -639,8 +627,8 @@ impl Client { pub fn accept_group_invite(&mut self) { // Clear invite self.group_invite.take(); - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::GroupManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::GroupManip( GroupManip::Accept, ))) .unwrap(); @@ -649,32 +637,32 @@ impl Client { pub fn decline_group_invite(&mut self) { // Clear invite self.group_invite.take(); - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::GroupManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::GroupManip( GroupManip::Decline, ))) .unwrap(); } pub fn leave_group(&mut self) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::GroupManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::GroupManip( GroupManip::Leave, ))) .unwrap(); } pub fn kick_from_group(&mut self, uid: Uid) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::GroupManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::GroupManip( GroupManip::Kick(uid), ))) .unwrap(); } pub fn assign_group_leader(&mut self, uid: Uid) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::GroupManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::GroupManip( GroupManip::AssignLeader(uid), ))) .unwrap(); @@ -698,15 +686,15 @@ impl Client { pub fn mount(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.read_component_copied(entity) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))) + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::Mount(uid))) .unwrap(); } } pub fn unmount(&mut self) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::Unmount)) + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::Unmount)) .unwrap(); } @@ -718,8 +706,8 @@ impl Client { .get(self.entity) .map_or(false, |s| s.is_dead) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::Respawn)) + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::Respawn)) .unwrap(); } } @@ -817,8 +805,8 @@ impl Client { { controller.actions.push(control_action); } - self.singleton_stream - .send(ClientMsg::ControlAction(control_action)) + self.in_game_stream + .send(ClientInGameMsg::ControlAction(control_action)) .unwrap(); } @@ -866,20 +854,20 @@ impl Client { } pub fn place_block(&mut self, pos: Vec3, block: Block) { - self.singleton_stream - .send(ClientMsg::PlaceBlock(pos, block)) + self.in_game_stream + .send(ClientInGameMsg::PlaceBlock(pos, block)) .unwrap(); } pub fn remove_block(&mut self, pos: Vec3) { - self.singleton_stream - .send(ClientMsg::BreakBlock(pos)) + self.in_game_stream + .send(ClientInGameMsg::BreakBlock(pos)) .unwrap(); } pub fn collect_block(&mut self, pos: Vec3) { - self.singleton_stream - .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.in_game_stream + .send(ClientInGameMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Collect(pos), ))) .unwrap(); @@ -914,7 +902,7 @@ impl Client { // 1) Handle input from frontend. // Pass character actions from frontend input to the player's entity. - if let ClientState::Character = self.client_state { + if self.client_ingame.is_some() { if let Err(e) = self .state .ecs() @@ -937,8 +925,8 @@ impl Client { "Couldn't access controller component on client entity" ); } - self.singleton_stream - .send(ClientMsg::ControllerInputs(inputs))?; + self.in_game_stream + .send(ClientInGameMsg::ControllerInputs(inputs))?; } // 2) Build up a list of events for this frame, to be passed to the frontend. @@ -1042,8 +1030,8 @@ impl Client { if self.state.terrain().get_key(*key).is_none() { if !skip_mode && !self.pending_chunks.contains_key(key) { if self.pending_chunks.len() < 4 { - self.singleton_stream - .send(ClientMsg::TerrainChunkRequest { key: *key })?; + self.in_game_stream + .send(ClientInGameMsg::TerrainChunkRequest { key: *key })?; self.pending_chunks.insert(*key, Instant::now()); } else { skip_mode = true; @@ -1075,19 +1063,19 @@ impl Client { // Send a ping to the server once every second if self.state.get_time() - self.last_server_ping > 1. { - self.singleton_stream.send(ClientMsg::Ping)?; + self.ping_stream.send(PingMsg::Ping)?; self.last_server_ping = self.state.get_time(); } // 6) Update the server about the player's physics attributes. - if let ClientState::Character = self.client_state { + if self.client_ingame.is_some() { if let (Some(pos), Some(vel), Some(ori)) = ( self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), ) { - self.singleton_stream - .send(ClientMsg::PlayerPhysics { pos, vel, ori })?; + self.in_game_stream + .send(ClientInGameMsg::PlayerPhysics { pos, vel, ori })?; } } @@ -1114,343 +1102,368 @@ impl Client { self.state.cleanup(); } - async fn handle_message( + fn handle_server_msg( + &mut self, + frontend_events: &mut Vec, + msg: ServerMsg, + ) -> Result<(), Error> { + match msg { + ServerMsg::Disconnect(reason) => match reason { + DisconnectReason::Shutdown => return Err(Error::ServerShutdown), + DisconnectReason::Requested => { + debug!("finally sending ClientMsg::Terminate"); + frontend_events.push(Event::Disconnect); + self.singleton_stream.send(ClientMsg::Terminate)?; + }, + DisconnectReason::Kicked(reason) => { + debug!("sending ClientMsg::Terminate because we got kicked"); + frontend_events.push(Event::Kicked(reason.clone())); + self.singleton_stream.send(ClientMsg::Terminate)?; + }, + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => self.player_list = list, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { + if let Some(old_player_info) = self.player_list.insert(uid, player_info.clone()) { + warn!( + "Received msg to insert {} with uid {} into the player list but there was \ + already an entry for {} with the same uid that was overwritten!", + player_info.player_alias, uid, old_player_info.player_alias + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.is_admin = admin; + } else { + warn!( + "Received msg to update admin status of uid {}, but they were not in the \ + list.", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter(uid, char_info)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.character = Some(char_info); + } else { + warn!( + "Received msg to update character info for uid {}, but they were not in \ + the list.", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.character = match &player_info.character { + Some(character) => Some(common::msg::CharacterInfo { + name: character.name.to_string(), + level: next_level, + }), + None => { + warn!( + "Received msg to update character level info to {} for uid {}, \ + but this player's character is None.", + next_level, uid + ); + + None + }, + }; + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => { + // Instead of removing players, mark them as offline because we need to + // remember the names of disconnected players in chat. + // + // TODO the server should re-use uids of players that log out and log back + // in. + + if let Some(player_info) = self.player_list.get_mut(&uid) { + if player_info.is_online { + player_info.is_online = false; + } else { + warn!( + "Received msg to remove uid {} from the player list by they were \ + already marked offline", + uid + ); + } + } else { + warn!( + "Received msg to remove uid {} from the player list by they weren't in \ + the list!", + uid + ); + } + }, + ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => { + if let Some(player_info) = self.player_list.get_mut(&uid) { + player_info.player_alias = new_name; + } else { + warn!( + "Received msg to alias player with uid {} to {} but this uid is not in \ + the player list", + uid, new_name + ); + } + }, + ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), + ServerMsg::SetPlayerEntity(uid) => { + if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) { + self.entity = entity; + } else { + return Err(Error::Other("Failed to find entity from uid.".to_owned())); + } + }, + ServerMsg::TimeOfDay(time_of_day) => { + *self.state.ecs_mut().write_resource() = time_of_day; + }, + ServerMsg::EntitySync(entity_sync_package) => { + self.state + .ecs_mut() + .apply_entity_sync_package(entity_sync_package); + }, + ServerMsg::CompSync(comp_sync_package) => { + self.state + .ecs_mut() + .apply_comp_sync_package(comp_sync_package); + }, + ServerMsg::CreateEntity(entity_package) => { + self.state.ecs_mut().apply_entity_package(entity_package); + }, + ServerMsg::DeleteEntity(entity) => { + if self.uid() != Some(entity) { + self.state + .ecs_mut() + .delete_entity_and_clear_from_uid_allocator(entity.0); + } + }, + ServerMsg::Notification(n) => { + frontend_events.push(Event::Notification(n)); + }, + } + Ok(()) + } + + fn handle_server_in_game_msg( + &mut self, + frontend_events: &mut Vec, + msg: ServerInGameMsg, + ) -> Result<(), Error> { + match msg { + ServerInGameMsg::GroupUpdate(change_notification) => { + use comp::group::ChangeNotification::*; + // Note: we use a hashmap since this would not work with entities outside + // the view distance + match change_notification { + Added(uid, role) => { + // Check if this is a newly formed group by looking for absence of + // other non pet group members + if !matches!(role, group::Role::Pet) + && !self + .group_members + .values() + .any(|r| !matches!(r, group::Role::Pet)) + { + frontend_events + .push(Event::Chat(comp::ChatType::Meta.chat_msg( + "Type /g or /group to chat with your group members", + ))); + } + if let Some(player_info) = self.player_list.get(&uid) { + frontend_events.push(Event::Chat( + comp::ChatType::GroupMeta("Group".into()).chat_msg(format!( + "[{}] joined group", + self.personalize_alias(uid, player_info.player_alias.clone()) + )), + )); + } + if self.group_members.insert(uid, role) == Some(role) { + warn!( + "Received msg to add uid {} to the group members but they were \ + already there", + uid + ); + } + }, + Removed(uid) => { + if let Some(player_info) = self.player_list.get(&uid) { + frontend_events.push(Event::Chat( + comp::ChatType::GroupMeta("Group".into()).chat_msg(format!( + "[{}] left group", + self.personalize_alias(uid, player_info.player_alias.clone()) + )), + )); + } + if self.group_members.remove(&uid).is_none() { + warn!( + "Received msg to remove uid {} from group members but by they \ + weren't in there!", + uid + ); + } + }, + NewLeader(leader) => { + self.group_leader = Some(leader); + }, + NewGroup { leader, members } => { + self.group_leader = Some(leader); + self.group_members = members.into_iter().collect(); + // Currently add/remove messages treat client as an implicit member + // of the group whereas this message explicitly includes them so to + // be consistent for now we will remove the client from the + // received hashset + if let Some(uid) = self.uid() { + self.group_members.remove(&uid); + } + }, + NoGroup => { + self.group_leader = None; + self.group_members = HashMap::new(); + }, + } + }, + ServerInGameMsg::GroupInvite { inviter, timeout } => { + self.group_invite = Some((inviter, std::time::Instant::now(), timeout)); + }, + ServerInGameMsg::InvitePending(uid) => { + if !self.pending_invites.insert(uid) { + warn!("Received message about pending invite that was already pending"); + } + }, + ServerInGameMsg::InviteComplete { target, answer } => { + if !self.pending_invites.remove(&target) { + warn!( + "Received completed invite message for invite that was not in the list of \ + pending invites" + ) + } + // TODO: expose this as a new event variant instead of going + // through the chat + let msg = match answer { + // TODO: say who accepted/declined/timed out the invite + InviteAnswer::Accepted => "Invite accepted", + InviteAnswer::Declined => "Invite declined", + InviteAnswer::TimedOut => "Invite timed out", + }; + frontend_events.push(Event::Chat(comp::ChatType::Meta.chat_msg(msg))); + }, + // Cleanup for when the client goes back to the `in_game = None` + ServerInGameMsg::ExitInGameSuccess => { + self.clean_state(); + }, + ServerInGameMsg::InventoryUpdate(mut inventory, event) => { + match event { + InventoryUpdateEvent::CollectFailed => {}, + _ => { + inventory.recount_items(); + // Push the updated inventory component to the client + self.state.write_component(self.entity, inventory); + }, + } + + self.update_available_recipes(); + + frontend_events.push(Event::InventoryUpdated(event)); + }, + ServerInGameMsg::TerrainChunkUpdate { key, chunk } => { + if let Ok(chunk) = chunk { + self.state.insert_chunk(key, *chunk); + } + self.pending_chunks.remove(&key); + }, + ServerInGameMsg::TerrainBlockUpdates(mut blocks) => { + blocks.drain().for_each(|(pos, block)| { + self.state.set_block(pos, block); + }); + }, + ServerInGameMsg::SetViewDistance(vd) => { + self.view_distance = Some(vd); + frontend_events.push(Event::SetViewDistance(vd)); + }, + ServerInGameMsg::Outcomes(outcomes) => { + frontend_events.extend(outcomes.into_iter().map(Event::Outcome)) + }, + ServerInGameMsg::Knockback(impulse) => { + self.state + .ecs() + .read_resource::>() + .emit_now(LocalEvent::ApplyImpulse { + entity: self.entity, + impulse, + }); + }, + } + Ok(()) + } + + fn handle_server_not_in_game_msg(&mut self, msg: ServerNotInGameMsg) -> Result<(), Error> { + match msg { + ServerNotInGameMsg::CharacterListUpdate(character_list) => { + self.character_list.characters = character_list; + self.character_list.loading = false; + }, + ServerNotInGameMsg::CharacterActionError(error) => { + warn!("CharacterActionError: {:?}.", error); + self.character_list.error = Some(error); + }, + ServerNotInGameMsg::CharacterDataLoadError(error) => { + self.clean_state(); + self.character_list.error = Some(error); + }, + ServerNotInGameMsg::CharacterSuccess => { + warn!("WOOP88u8yeah"); + }, + } + Ok(()) + } + + fn handle_ping_msg(&mut self, msg: PingMsg) -> Result<(), Error> { + match msg { + PingMsg::Ping => { + self.ping_stream.send(PingMsg::Pong)?; + }, + PingMsg::Pong => { + self.last_server_pong = self.state.get_time(); + self.last_ping_delta = self.state.get_time() - self.last_server_ping; + + // Maintain the correct number of deltas for calculating the rolling average + // ping. The client sends a ping to the server every second so we should be + // receiving a pong reply roughly every second. + while self.ping_deltas.len() > PING_ROLLING_AVERAGE_SECS - 1 { + self.ping_deltas.pop_front(); + } + self.ping_deltas.push_back(self.last_ping_delta); + }, + } + Ok(()) + } + + async fn handle_messages( &mut self, frontend_events: &mut Vec, cnt: &mut u64, ) -> Result<(), Error> { loop { - let msg = self.singleton_stream.recv().await?; + let (m1, m2, m3, m4) = select!( + msg = self.singleton_stream.recv().fuse() => (Some(msg?), None, None, None), + msg = self.ping_stream.recv().fuse() => (None, Some(msg?), None, None), + msg = self.not_in_game_stream.recv().fuse() => (None, None, Some(msg?), None), + msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg?)), + ); *cnt += 1; - match msg { - ServerMsg::TooManyPlayers => { - return Err(Error::ServerWentMad); - }, - ServerMsg::Disconnect(reason) => match reason { - DisconnectReason::Shutdown => return Err(Error::ServerShutdown), - DisconnectReason::Requested => { - debug!("finally sending ClientMsg::Terminate"); - frontend_events.push(Event::Disconnect); - self.singleton_stream.send(ClientMsg::Terminate)?; - break Ok(()); - }, - DisconnectReason::Kicked(reason) => { - debug!("sending ClientMsg::Terminate because we got kicked"); - frontend_events.push(Event::Kicked(reason.clone())); - self.singleton_stream.send(ClientMsg::Terminate)?; - }, - }, - ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad), - ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => { - self.player_list = list - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => { - if let Some(old_player_info) = self.player_list.insert(uid, player_info.clone()) - { - warn!( - "Received msg to insert {} with uid {} into the player list but there \ - was already an entry for {} with the same uid that was overwritten!", - player_info.player_alias, uid, old_player_info.player_alias - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.is_admin = admin; - } else { - warn!( - "Received msg to update admin status of uid {}, but they were not in \ - the list.", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter( - uid, - char_info, - )) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.character = Some(char_info); - } else { - warn!( - "Received msg to update character info for uid {}, but they were not \ - in the list.", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.character = match &player_info.character { - Some(character) => Some(common::msg::CharacterInfo { - name: character.name.to_string(), - level: next_level, - }), - None => { - warn!( - "Received msg to update character level info to {} for uid \ - {}, but this player's character is None.", - next_level, uid - ); - - None - }, - }; - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => { - // Instead of removing players, mark them as offline because we need to - // remember the names of disconnected players in chat. - // - // TODO the server should re-use uids of players that log out and log back - // in. - - if let Some(player_info) = self.player_list.get_mut(&uid) { - if player_info.is_online { - player_info.is_online = false; - } else { - warn!( - "Received msg to remove uid {} from the player list by they were \ - already marked offline", - uid - ); - } - } else { - warn!( - "Received msg to remove uid {} from the player list by they weren't \ - in the list!", - uid - ); - } - }, - ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => { - if let Some(player_info) = self.player_list.get_mut(&uid) { - player_info.player_alias = new_name; - } else { - warn!( - "Received msg to alias player with uid {} to {} but this uid is not \ - in the player list", - uid, new_name - ); - } - }, - ServerMsg::GroupUpdate(change_notification) => { - use comp::group::ChangeNotification::*; - // Note: we use a hashmap since this would not work with entities outside - // the view distance - match change_notification { - Added(uid, role) => { - // Check if this is a newly formed group by looking for absence of - // other non pet group members - if !matches!(role, group::Role::Pet) - && !self - .group_members - .values() - .any(|r| !matches!(r, group::Role::Pet)) - { - frontend_events.push(Event::Chat(comp::ChatType::Meta.chat_msg( - "Type /g or /group to chat with your group members", - ))); - } - if let Some(player_info) = self.player_list.get(&uid) { - frontend_events.push(Event::Chat( - comp::ChatType::GroupMeta("Group".into()).chat_msg(format!( - "[{}] joined group", - self.personalize_alias( - uid, - player_info.player_alias.clone() - ) - )), - )); - } - if self.group_members.insert(uid, role) == Some(role) { - warn!( - "Received msg to add uid {} to the group members but they \ - were already there", - uid - ); - } - }, - Removed(uid) => { - if let Some(player_info) = self.player_list.get(&uid) { - frontend_events.push(Event::Chat( - comp::ChatType::GroupMeta("Group".into()).chat_msg(format!( - "[{}] left group", - self.personalize_alias( - uid, - player_info.player_alias.clone() - ) - )), - )); - } - if self.group_members.remove(&uid).is_none() { - warn!( - "Received msg to remove uid {} from group members but by they \ - weren't in there!", - uid - ); - } - }, - NewLeader(leader) => { - self.group_leader = Some(leader); - }, - NewGroup { leader, members } => { - self.group_leader = Some(leader); - self.group_members = members.into_iter().collect(); - // Currently add/remove messages treat client as an implicit member - // of the group whereas this message explicitly includes them so to - // be consistent for now we will remove the client from the - // received hashset - if let Some(uid) = self.uid() { - self.group_members.remove(&uid); - } - }, - NoGroup => { - self.group_leader = None; - self.group_members = HashMap::new(); - }, - } - }, - ServerMsg::GroupInvite { inviter, timeout } => { - self.group_invite = Some((inviter, std::time::Instant::now(), timeout)); - }, - ServerMsg::InvitePending(uid) => { - if !self.pending_invites.insert(uid) { - warn!("Received message about pending invite that was already pending"); - } - }, - ServerMsg::InviteComplete { target, answer } => { - if !self.pending_invites.remove(&target) { - warn!( - "Received completed invite message for invite that was not in the \ - list of pending invites" - ) - } - // TODO: expose this as a new event variant instead of going - // through the chat - let msg = match answer { - // TODO: say who accepted/declined/timed out the invite - InviteAnswer::Accepted => "Invite accepted", - InviteAnswer::Declined => "Invite declined", - InviteAnswer::TimedOut => "Invite timed out", - }; - frontend_events.push(Event::Chat(comp::ChatType::Meta.chat_msg(msg))); - }, - ServerMsg::Ping => { - self.singleton_stream.send(ClientMsg::Pong)?; - }, - ServerMsg::Pong => { - self.last_server_pong = self.state.get_time(); - self.last_ping_delta = self.state.get_time() - self.last_server_ping; - - // Maintain the correct number of deltas for calculating the rolling average - // ping. The client sends a ping to the server every second so we should be - // receiving a pong reply roughly every second. - while self.ping_deltas.len() > PING_ROLLING_AVERAGE_SECS - 1 { - self.ping_deltas.pop_front(); - } - self.ping_deltas.push_back(self.last_ping_delta); - }, - ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)), - ServerMsg::SetPlayerEntity(uid) => { - if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) { - self.entity = entity; - } else { - return Err(Error::Other("Failed to find entity from uid.".to_owned())); - } - }, - ServerMsg::TimeOfDay(time_of_day) => { - *self.state.ecs_mut().write_resource() = time_of_day; - }, - ServerMsg::EntitySync(entity_sync_package) => { - self.state - .ecs_mut() - .apply_entity_sync_package(entity_sync_package); - }, - ServerMsg::CompSync(comp_sync_package) => { - self.state - .ecs_mut() - .apply_comp_sync_package(comp_sync_package); - }, - ServerMsg::CreateEntity(entity_package) => { - self.state.ecs_mut().apply_entity_package(entity_package); - }, - ServerMsg::DeleteEntity(entity) => { - if self.uid() != Some(entity) { - self.state - .ecs_mut() - .delete_entity_and_clear_from_uid_allocator(entity.0); - } - }, - // Cleanup for when the client goes back to the `Registered` state - ServerMsg::ExitIngameCleanup => { - self.clean_state(); - }, - ServerMsg::InventoryUpdate(mut inventory, event) => { - match event { - InventoryUpdateEvent::CollectFailed => {}, - _ => { - inventory.recount_items(); - // Push the updated inventory component to the client - self.state.write_component(self.entity, inventory); - }, - } - - self.update_available_recipes(); - - frontend_events.push(Event::InventoryUpdated(event)); - }, - ServerMsg::TerrainChunkUpdate { key, chunk } => { - if let Ok(chunk) = chunk { - self.state.insert_chunk(key, *chunk); - } - self.pending_chunks.remove(&key); - }, - ServerMsg::TerrainBlockUpdates(mut blocks) => { - blocks.drain().for_each(|(pos, block)| { - self.state.set_block(pos, block); - }); - }, - ServerMsg::StateAnswer(Ok(state)) => { - self.client_state = state; - }, - ServerMsg::StateAnswer(Err((error, state))) => { - warn!( - "StateAnswer: {:?}. Server thinks client is in state {:?}.", - error, state - ); - }, - ServerMsg::CharacterListUpdate(character_list) => { - self.character_list.characters = character_list; - self.character_list.loading = false; - }, - ServerMsg::CharacterActionError(error) => { - warn!("CharacterActionError: {:?}.", error); - self.character_list.error = Some(error); - }, - ServerMsg::Notification(n) => { - frontend_events.push(Event::Notification(n)); - }, - ServerMsg::CharacterDataLoadError(error) => { - self.clean_state(); - self.character_list.error = Some(error); - }, - ServerMsg::SetViewDistance(vd) => { - self.view_distance = Some(vd); - frontend_events.push(Event::SetViewDistance(vd)); - }, - ServerMsg::Outcomes(outcomes) => { - frontend_events.extend(outcomes.into_iter().map(Event::Outcome)) - }, - ServerMsg::Knockback(impulse) => { - self.state - .ecs() - .read_resource::>() - .emit_now(LocalEvent::ApplyImpulse { - entity: self.entity, - impulse, - }); - }, + if let Some(msg) = m1 { + self.handle_server_msg(frontend_events, msg)?; + } + if let Some(msg) = m2 { + self.handle_ping_msg(msg)?; + } + if let Some(msg) = m3 { + self.handle_server_not_in_game_msg(msg)?; + } + if let Some(msg) = m4 { + self.handle_server_in_game_msg(frontend_events, msg)?; } } } @@ -1484,7 +1497,7 @@ impl Client { //TIMEOUT 0.01 ms for msg handling select!( _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), - err = self.handle_message(&mut frontend_events, &mut handles_msg).fuse() => err, + err = self.handle_messages(&mut frontend_events, &mut handles_msg).fuse() => err, ) })?; @@ -1503,8 +1516,11 @@ impl Client { /// Get the player's Uid. pub fn uid(&self) -> Option { self.state.read_component_copied(self.entity) } - /// Get the client state - pub fn get_client_state(&self) -> ClientState { self.client_state } + pub fn get_client_type(&self) -> ClientType { ClientType::Game } + + pub fn get_in_game(&self) -> Option { self.client_ingame } + + pub fn get_registered(&self) -> bool { self.registered } /// Get the current tick number. pub fn get_tick(&self) -> u64 { self.tick } diff --git a/common/src/msg/client.rs b/common/src/msg/client.rs index b39e21316b..5e464fc5b9 100644 --- a/common/src/msg/client.rs +++ b/common/src/msg/client.rs @@ -7,13 +7,26 @@ use crate::{ use serde::{Deserialize, Serialize}; use vek::*; -/// Messages sent from the client to the server -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ClientMsg { - Register { - view_distance: Option, - token_or_username: String, - }, +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub enum ClientType { + // Regular Client like Voxygen who plays the game + Game, + // A Chatonly client, which doesn't want to connect via its character + ChatOnly, + // A unprivileged bot, e.g. to request world information + // Or a privileged bot, e.g. to run admin commands used by server-cli + Bot { privileged: bool }, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ClientRegisterMsg { + pub view_distance: Option, + pub token_or_username: String, +} + +//messages send by clients only valid when NOT ingame +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum ClientNotInGameMsg { RequestCharacterList, CreateCharacter { alias: String, @@ -22,20 +35,19 @@ pub enum ClientMsg { }, DeleteCharacter(CharacterId), Character(CharacterId), - /// Request `ClientState::Registered` from an ingame state - ExitIngame, - /// Request `ClientState::Spectator` from a registered or ingame state Spectate, +} + +//messages send by clients only valid when ingame +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum ClientInGameMsg { ControllerInputs(comp::ControllerInputs), ControlEvent(comp::ControlEvent), ControlAction(comp::ControlAction), SetViewDistance(u32), BreakBlock(Vec3), PlaceBlock(Vec3, Block), - Ping, - Pong, - /// Send the chat message or command to be processed by the server - ChatMsg(String), + ExitInGame, PlayerPhysics { pos: comp::Pos, vel: comp::Vel, @@ -44,9 +56,16 @@ pub enum ClientMsg { TerrainChunkRequest { key: Vec2, }, - Disconnect, - Terminate, UnlockSkill(Skill), RefundSkill(Skill), UnlockSkillGroup(SkillGroupType), } + +/// Messages sent from the client to the server +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ClientMsg { + ChatMsg(String), + Command(String), + Disconnect, + Terminate, +} diff --git a/common/src/msg/mod.rs b/common/src/msg/mod.rs index 64a6d4be1f..964ee157fc 100644 --- a/common/src/msg/mod.rs +++ b/common/src/msg/mod.rs @@ -4,24 +4,28 @@ pub mod server; // Reexports pub use self::{ - client::ClientMsg, + client::{ClientInGameMsg, ClientMsg, ClientNotInGameMsg, ClientRegisterMsg, ClientType}, ecs_packet::EcsCompPacket, server::{ CharacterInfo, DisconnectReason, InviteAnswer, Notification, PlayerInfo, PlayerListUpdate, - RegisterError, RequestStateError, ServerInfo, ServerMsg, + RegisterError, ServerInGameMsg, ServerInfo, ServerInitMsg, ServerMsg, ServerNotInGameMsg, + ServerRegisterAnswerMsg, }, }; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -pub enum ClientState { - Pending, - Connected, - Registered, +pub enum ClientIngame { Spectator, Character, } +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub enum PingMsg { + Ping, + Pong, +} + pub const MAX_BYTES_CHAT_MSG: usize = 256; pub enum ChatMsgValidationError { diff --git a/common/src/msg/server.rs b/common/src/msg/server.rs index ce86e46c73..8aaa9873c4 100644 --- a/common/src/msg/server.rs +++ b/common/src/msg/server.rs @@ -1,4 +1,4 @@ -use super::{ClientState, EcsCompPacket}; +use super::EcsCompPacket; use crate::{ character::CharacterItem, comp, @@ -192,25 +192,37 @@ pub enum DisconnectReason { Kicked(String), } -/// Messages sent from the server to the client +/// Reponse To ClientType #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ServerMsg { - InitialSync { +pub enum ServerInitMsg { + TooManyPlayers, + GameSync { entity_package: sync::EntityPackage, - server_info: ServerInfo, time_of_day: state::TimeOfDay, max_group_size: u32, client_timeout: Duration, world_map: WorldMapMsg, recipe_book: RecipeBook, }, +} + +pub type ServerRegisterAnswerMsg = Result<(), RegisterError>; + +//Messages only allowed while client ingame +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ServerNotInGameMsg { /// An error occurred while loading character data CharacterDataLoadError(String), /// A list of characters belonging to the a authenticated player was sent CharacterListUpdate(Vec), /// An error occurred while creating or deleting a character CharacterActionError(String), - PlayerListUpdate(PlayerListUpdate), + CharacterSuccess, +} + +//Messages only allowed while client ingame +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ServerInGameMsg { GroupUpdate(comp::group::ChangeNotification), // Indicate to the client that they are invited to join a group GroupInvite { @@ -227,12 +239,24 @@ pub enum ServerMsg { target: sync::Uid, answer: InviteAnswer, }, - StateAnswer(Result), /// Trigger cleanup for when the client goes back to the `Registered` state /// from an ingame state - ExitIngameCleanup, - Ping, - Pong, + ExitInGameSuccess, + InventoryUpdate(comp::Inventory, comp::InventoryUpdateEvent), + TerrainChunkUpdate { + key: Vec2, + chunk: Result, ()>, + }, + TerrainBlockUpdates(HashMap, Block>), + SetViewDistance(u32), + Outcomes(Vec), + Knockback(Vec3), +} + +/// Messages sent from the server to the client +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ServerMsg { + PlayerListUpdate(PlayerListUpdate), /// A message to go into the client chat box. The client is responsible for /// formatting the message and turning it into a speech bubble. ChatMsg(comp::ChatMsg), @@ -242,28 +266,9 @@ pub enum ServerMsg { CompSync(sync::CompSyncPackage), CreateEntity(sync::EntityPackage), DeleteEntity(Uid), - InventoryUpdate(comp::Inventory, comp::InventoryUpdateEvent), - TerrainChunkUpdate { - key: Vec2, - chunk: Result, ()>, - }, - TerrainBlockUpdates(HashMap, Block>), Disconnect(DisconnectReason), - TooManyPlayers, /// Send a popup notification such as "Waypoint Saved" Notification(Notification), - SetViewDistance(u32), - Outcomes(Vec), - Knockback(Vec3), -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum RequestStateError { - RegisterDenied(RegisterError), - Denied, - Already, - Impossible, - WrongMessage, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/server/src/client.rs b/server/src/client.rs index 3cb76f659b..7423589d1f 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,20 +1,30 @@ use crate::error::Error; -use common::msg::{ClientMsg, ClientState, RequestStateError, ServerMsg}; +use common::msg::{ + ClientInGameMsg, ClientIngame, ClientMsg, ClientNotInGameMsg, ClientType, PingMsg, + ServerInGameMsg, ServerInitMsg, ServerMsg, ServerNotInGameMsg, +}; use hashbrown::HashSet; -use network::{Participant, Stream}; +use network::{MessageBuffer, Participant, Stream}; +use serde::{de::DeserializeOwned, Serialize}; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; use std::sync::{ atomic::{AtomicBool, Ordering}, - Mutex, + Arc, Mutex, }; use tracing::debug; use vek::*; pub struct Client { - pub client_state: ClientState, + pub registered: bool, + pub client_type: ClientType, + pub in_game: Option, pub participant: Mutex>, pub singleton_stream: Stream, + pub ping_stream: Stream, + pub register_stream: Stream, + pub in_game_stream: Stream, + pub not_in_game_stream: Stream, pub network_error: AtomicBool, pub last_ping: f64, pub login_msg_sent: bool, @@ -25,22 +35,58 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { - if !self.network_error.load(Ordering::Relaxed) { - if let Err(e) = self.singleton_stream.send(msg) { + fn internal_send(b: &AtomicBool, s: &mut Stream, msg: M) { + if !b.load(Ordering::Relaxed) { + if let Err(e) = s.send(msg) { debug!(?e, "got a network error with client"); - self.network_error.store(true, Ordering::Relaxed); + b.store(true, Ordering::Relaxed); } } } - pub async fn recv(&mut self) -> Result { - if !self.network_error.load(Ordering::Relaxed) { - match self.singleton_stream.recv().await { + fn internal_send_raw(b: &AtomicBool, s: &mut Stream, msg: Arc) { + if !b.load(Ordering::Relaxed) { + if let Err(e) = s.send_raw(msg) { + debug!(?e, "got a network error with client"); + b.store(true, Ordering::Relaxed); + } + } + } + + pub fn send_init(&mut self, msg: ServerInitMsg) { + Self::internal_send(&self.network_error, &mut self.register_stream, msg); + } + + pub fn send_msg(&mut self, msg: ServerMsg) { + Self::internal_send(&self.network_error, &mut self.singleton_stream, msg); + } + + pub fn send_in_game(&mut self, msg: ServerInGameMsg) { + Self::internal_send(&self.network_error, &mut self.in_game_stream, msg); + } + + pub fn send_not_in_game(&mut self, msg: ServerNotInGameMsg) { + Self::internal_send(&self.network_error, &mut self.not_in_game_stream, msg); + } + + pub fn send_ping(&mut self, msg: PingMsg) { + Self::internal_send(&self.network_error, &mut self.ping_stream, msg); + } + + pub fn send_msg_raw(&mut self, msg: Arc) { + Self::internal_send_raw(&self.network_error, &mut self.singleton_stream, msg); + } + + pub async fn internal_recv( + b: &AtomicBool, + s: &mut Stream, + ) -> Result { + if !b.load(Ordering::Relaxed) { + match s.recv().await { Ok(r) => Ok(r), Err(e) => { debug!(?e, "got a network error with client while recv"); - self.network_error.store(true, Ordering::Relaxed); + b.store(true, Ordering::Relaxed); Err(Error::StreamErr(e)) }, } @@ -49,29 +95,20 @@ impl Client { } } - pub fn is_registered(&self) -> bool { - matches!( - self.client_state, - ClientState::Registered | ClientState::Spectator | ClientState::Character - ) + pub async fn recv_msg(&mut self) -> Result { + Self::internal_recv(&self.network_error, &mut self.singleton_stream).await } - pub fn is_ingame(&self) -> bool { - matches!( - self.client_state, - ClientState::Spectator | ClientState::Character - ) + pub async fn recv_in_game_msg(&mut self) -> Result { + Self::internal_recv(&self.network_error, &mut self.in_game_stream).await } - pub fn allow_state(&mut self, new_state: ClientState) { - self.client_state = new_state; - let _ = self - .singleton_stream - .send(ServerMsg::StateAnswer(Ok(new_state))); + pub async fn recv_not_in_game_msg(&mut self) -> Result { + Self::internal_recv(&self.network_error, &mut self.not_in_game_stream).await } - pub fn error_state(&mut self, error: RequestStateError) { - let _ = self.notify(ServerMsg::StateAnswer(Err((error, self.client_state)))); + pub async fn recv_ping_msg(&mut self) -> Result { + Self::internal_recv(&self.network_error, &mut self.ping_stream).await } } diff --git a/server/src/cmd.rs b/server/src/cmd.rs index 704b290d9e..ccd9ca219f 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -12,7 +12,7 @@ use common::{ cmd::{ChatCommand, CHAT_COMMANDS, CHAT_SHORTCUTS}, comp::{self, ChatType, Item, LightEmitter, WaypointArea}, event::{EventBus, ServerEvent}, - msg::{DisconnectReason, Notification, PlayerListUpdate, ServerMsg}, + msg::{DisconnectReason, Notification, PlayerListUpdate, ServerInGameMsg, ServerMsg}, npc::{self, get_npc_name}, state::TimeOfDay, sync::{Uid, WorldSyncExt}, @@ -667,7 +667,9 @@ fn handle_spawn( .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.notify(ServerMsg::GroupUpdate(g))); + .map(|(g, c)| { + c.send_in_game(ServerInGameMsg::GroupUpdate(g)) + }); }, ); } else if let Some(group) = match alignment { diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index 308363bc22..edb13243eb 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -12,7 +12,7 @@ use common::{ Player, Pos, Stats, }, lottery::Lottery, - msg::{PlayerListUpdate, ServerMsg}, + msg::{PlayerListUpdate, ServerInGameMsg, ServerMsg}, outcome::Outcome, state::BlockChange, sync::{Uid, UidAllocator, WorldSyncExt}, @@ -44,7 +44,7 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) } let mut clients = state.ecs().write_storage::(); if let Some(client) = clients.get_mut(entity) { - client.notify(ServerMsg::Knockback(impulse)); + client.send_in_game(ServerInGameMsg::Knockback(impulse)); } } diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index fb028729de..6c96644968 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -5,7 +5,7 @@ use common::{ group::{self, Group, GroupManager, Invite, PendingInvites}, ChatType, GroupManip, }, - msg::{InviteAnswer, ServerMsg}, + msg::{InviteAnswer, ServerInGameMsg}, sync, sync::WorldSyncExt, }; @@ -31,7 +31,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta .server_msg("Invite failed, target does not exist.".to_owned()), ); @@ -63,7 +63,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if already_in_same_group { // Inform of failure if let Some(client) = clients.get_mut(entity) { - client.notify(ChatType::Meta.server_msg( + client.send_msg(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); } @@ -93,7 +93,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if group_size_limit_reached { // Inform inviter that they have reached the group size limit if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ the group size limit" @@ -110,7 +110,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta .server_msg("This player already has a pending invite.".to_owned()), ); @@ -155,7 +155,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani (clients.get_mut(invitee), uids.get(entity).copied()) { if send_invite() { - client.notify(ServerMsg::GroupInvite { + client.send_in_game(ServerInGameMsg::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); @@ -163,7 +163,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } else if agents.contains(invitee) { send_invite(); } else if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } @@ -171,7 +171,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Notify inviter that the invite is pending if invite_sent { if let Some(client) = clients.get_mut(entity) { - client.notify(ServerMsg::InvitePending(uid)); + client.send_in_game(ServerInGameMsg::InvitePending(uid)); } } }, @@ -196,7 +196,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if let (Some(client), Some(target)) = (clients.get_mut(inviter), uids.get(entity).copied()) { - client.notify(ServerMsg::InviteComplete { + client.send_in_game(ServerInGameMsg::InviteComplete { target, answer: InviteAnswer::Accepted, }) @@ -217,7 +217,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.notify(ServerMsg::GroupUpdate(g))); + .map(|(g, c)| c.send_in_game(ServerInGameMsg::GroupUpdate(g))); }, ); } @@ -244,7 +244,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if let (Some(client), Some(target)) = (clients.get_mut(inviter), uids.get(entity).copied()) { - client.notify(ServerMsg::InviteComplete { + client.send_in_game(ServerInGameMsg::InviteComplete { target, answer: InviteAnswer::Declined, }) @@ -269,7 +269,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.notify(ServerMsg::GroupUpdate(g))); + .map(|(g, c)| c.send_in_game(ServerInGameMsg::GroupUpdate(g))); }, ); }, @@ -283,7 +283,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -296,7 +296,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); } @@ -305,7 +305,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -336,26 +336,26 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.notify(ServerMsg::GroupUpdate(g))); + .map(|(g, c)| c.send_in_game(ServerInGameMsg::GroupUpdate(g))); }, ); // Tell them the have been kicked if let Some(client) = clients.get_mut(target) { - client.notify( + client.send_msg( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); } // Tell kicker that they were succesful if let Some(client) = clients.get_mut(entity) { - client.notify(ChatType::Meta.server_msg("Player kicked.".to_owned())); + client.send_msg(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader if let Some(client) = clients.get_mut(entity) { - client.notify(ChatType::Meta.server_msg( + client.send_msg(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } @@ -363,7 +363,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform kicker that the target is not in a group if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), ), @@ -380,7 +380,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(client) = clients.get_mut(entity) { - client.notify(ChatType::Meta.server_msg( + client.send_msg(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -410,18 +410,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.notify(ServerMsg::GroupUpdate(g))); + .map(|(g, c)| c.send_in_game(ServerInGameMsg::GroupUpdate(g))); }, ); // Tell them they are the leader if let Some(client) = clients.get_mut(target) { - client.notify( + client.send_msg( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful if let Some(client) = clients.get_mut(target) { - client.notify( + client.send_msg( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -430,7 +430,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(_) => { // Inform transferer that they are not the leader if let Some(client) = clients.get_mut(entity) { - client.notify( + client.send_msg( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -441,7 +441,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform transferer that the target is not in a group if let Some(client) = clients.get_mut(entity) { - client.notify(ChatType::Meta.server_msg( + client.send_msg(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index b5060208f5..e34ea80d52 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -116,7 +116,7 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { let mut clients = ecs.write_storage::(); if clients.get_mut(possesse).is_none() { if let Some(mut client) = clients.remove(possessor) { - client.notify(ServerMsg::SetPlayerEntity(possesse_uid)); + client.send_msg(ServerMsg::SetPlayerEntity(possesse_uid)); clients .insert(possesse, client) .err() diff --git a/server/src/events/inventory_manip.rs b/server/src/events/inventory_manip.rs index 1f879bcbc4..c894479dc8 100644 --- a/server/src/events/inventory_manip.rs +++ b/server/src/events/inventory_manip.rs @@ -5,7 +5,7 @@ use common::{ slot::{self, Slot}, Pos, MAX_PICKUP_RANGE_SQR, }, - msg::ServerMsg, + msg::ServerInGameMsg, recipe::default_recipe_book, sync::{Uid, WorldSyncExt}, vol::ReadVol, @@ -281,7 +281,9 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv .map(|g| (g, c)) }) .map(|(g, c)| { - c.notify(ServerMsg::GroupUpdate(g)) + c.send_in_game( + ServerInGameMsg::GroupUpdate(g), + ) }); }, ); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index f92167bda8..910e932131 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -5,7 +5,7 @@ use crate::{ use common::{ comp, comp::{group, Player}, - msg::{ClientState, PlayerListUpdate, ServerMsg}, + msg::{PlayerListUpdate, ServerInGameMsg, ServerMsg}, span, sync::{Uid, UidAllocator}, }; @@ -33,9 +33,8 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { .cloned(); if let (Some(mut client), Some(uid), Some(player)) = (maybe_client, maybe_uid, maybe_player) { // Tell client its request was successful - client.allow_state(ClientState::Registered); - // Tell client to clear out other entities and its own components - client.notify(ServerMsg::ExitIngameCleanup); + client.in_game = None; + client.send_in_game(ServerInGameMsg::ExitInGameSuccess); let entity_builder = state.ecs_mut().create_entity().with(client).with(player); diff --git a/server/src/lib.rs b/server/src/lib.rs index 2cb6b1e1d9..c0b0e25526 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -44,7 +44,10 @@ use common::{ cmd::ChatCommand, comp::{self, ChatType}, event::{EventBus, ServerEvent}, - msg::{server::WorldMapMsg, ClientState, DisconnectReason, ServerInfo, ServerMsg}, + msg::{ + server::WorldMapMsg, ClientType, DisconnectReason, ServerInGameMsg, ServerInfo, + ServerInitMsg, ServerMsg, ServerNotInGameMsg, + }, outcome::Outcome, recipe::default_recipe_book, state::{State, TimeOfDay}, @@ -56,7 +59,7 @@ use futures_executor::block_on; use futures_timer::Delay; use futures_util::{select, FutureExt}; use metrics::{ServerMetrics, StateTickMetrics, TickMetrics}; -use network::{Network, Pid, ProtocolAddr}; +use network::{Network, Pid, Promises, ProtocolAddr}; use persistence::{ character_loader::{CharacterLoader, CharacterLoaderResponseType}, character_updater::CharacterUpdater, @@ -70,7 +73,7 @@ use std::{ }; #[cfg(not(feature = "worldgen"))] use test_world::{IndexOwned, World}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; #[cfg(feature = "worldgen")] @@ -521,13 +524,13 @@ impl Server { .messages() .for_each(|query_result| match query_result.result { CharacterLoaderResponseType::CharacterList(result) => match result { - Ok(character_list_data) => self.notify_client( + Ok(character_list_data) => self.notify_not_in_game_client( query_result.entity, - ServerMsg::CharacterListUpdate(character_list_data), + ServerNotInGameMsg::CharacterListUpdate(character_list_data), ), - Err(error) => self.notify_client( + Err(error) => self.notify_not_in_game_client( query_result.entity, - ServerMsg::CharacterActionError(error.to_string()), + ServerNotInGameMsg::CharacterActionError(error.to_string()), ), }, CharacterLoaderResponseType::CharacterData(result) => { @@ -540,9 +543,9 @@ impl Server { // We failed to load data for the character from the DB. Notify the // client to push the state back to character selection, with the error // to display - self.notify_client( + self.notify_not_in_game_client( query_result.entity, - ServerMsg::CharacterDataLoadError(error.to_string()), + ServerNotInGameMsg::CharacterDataLoadError(error.to_string()), ); // Clean up the entity data on the server @@ -797,7 +800,6 @@ impl Server { ) -> Result<(), Error> { //TIMEOUT 0.1 ms for msg handling const TIMEOUT: Duration = Duration::from_micros(100); - const SLOWLORIS_TIMEOUT: Duration = Duration::from_millis(300); loop { let participant = match select!( _ = Delay::new(TIMEOUT).fuse() => None, @@ -807,71 +809,77 @@ impl Server { Some(pr) => pr?, }; debug!("New Participant connected to the server"); + let reliable = Promises::ORDERED | Promises::CONSISTENCY; + let reliablec = reliable | Promises::COMPRESSED; - let singleton_stream = match select!( - _ = Delay::new(SLOWLORIS_TIMEOUT).fuse() => None, - sr = participant.opened().fuse() => Some(sr), - ) { - None => { - warn!("Either Slowloris attack or very slow client, dropping"); - return Ok(()); //return rather then continue to give removes a tick more to send data. - }, - Some(Ok(s)) => s, - Some(Err(e)) => { - warn!(?e, "Failed to open a Stream from remote client. dropping"); - continue; - }, - }; + let stream = participant.open(10, reliablec).await?; + let ping_stream = participant.open(5, reliable).await?; + let mut register_stream = participant.open(10, reliablec).await?; + let in_game_stream = participant.open(10, reliablec).await?; + let not_in_game_stream = participant.open(10, reliablec).await?; - let mut client = Client { - client_state: ClientState::Connected, + register_stream.send(self.get_server_info())?; + let client_type: ClientType = register_stream.recv().await?; + + if self.settings().max_players + <= self.state.ecs().read_storage::().join().count() + { + trace!( + ?participant, + "to many players, wont allow participant to connect" + ); + register_stream.send(ServerInitMsg::TooManyPlayers)?; + continue; + } + + let client = Client { + registered: false, + client_type, + in_game: None, participant: std::sync::Mutex::new(Some(participant)), - singleton_stream, + singleton_stream: stream, + ping_stream, + register_stream, + in_game_stream, + not_in_game_stream, network_error: std::sync::atomic::AtomicBool::new(false), last_ping: self.state.get_time(), login_msg_sent: false, }; - if self.settings().max_players - <= self.state.ecs().read_storage::().join().count() - { - // Note: in this case the client is dropped - client.notify(ServerMsg::TooManyPlayers); - } else { - let entity = self - .state - .ecs_mut() - .create_entity_synced() - .with(client) - .build(); - self.state - .ecs() - .read_resource::() - .clients_connected - .inc(); - // Send client all the tracked components currently attached to its entity as - // well as synced resources (currently only `TimeOfDay`) - debug!("Starting initial sync with client."); - self.state - .ecs() - .write_storage::() - .get_mut(entity) - .unwrap() - .notify(ServerMsg::InitialSync { - // Send client their entity - entity_package: TrackedComps::fetch(&self.state.ecs()) - .create_entity_package(entity, None, None, None), - server_info: self.get_server_info(), - time_of_day: *self.state.ecs().read_resource(), - max_group_size: self.settings().max_player_group_size, - client_timeout: self.settings().client_timeout, - world_map: self.map.clone(), - recipe_book: (&*default_recipe_book()).clone(), - }); + let entity = self + .state + .ecs_mut() + .create_entity_synced() + .with(client) + .build(); + self.state + .ecs() + .read_resource::() + .clients_connected + .inc(); + // Send client all the tracked components currently attached to its entity as + // well as synced resources (currently only `TimeOfDay`) + debug!("Starting initial sync with client."); + self.state + .ecs() + .write_storage::() + .get_mut(entity) + .unwrap() + .register_stream + .send(ServerInitMsg::GameSync { + // Send client their entity + entity_package: TrackedComps::fetch(&self.state.ecs()) + .create_entity_package(entity, None, None, None), + time_of_day: *self.state.ecs().read_resource(), + max_group_size: self.settings().max_player_group_size, + client_timeout: self.settings().client_timeout, + world_map: self.map.clone(), + recipe_book: (&*default_recipe_book()).clone(), + })?; - frontend_events.push(Event::ClientConnected { entity }); - debug!("Done initial sync with client."); - } + frontend_events.push(Event::ClientConnected { entity }); + debug!("Done initial sync with client."); } } @@ -880,7 +888,25 @@ impl Server { S: Into, { if let Some(client) = self.state.ecs().write_storage::().get_mut(entity) { - client.notify(msg.into()) + client.send_msg(msg.into()) + } + } + + pub fn notify_in_game_client(&self, entity: EcsEntity, msg: S) + where + S: Into, + { + if let Some(client) = self.state.ecs().write_storage::().get_mut(entity) { + client.send_in_game(msg.into()) + } + } + + pub fn notify_not_in_game_client(&self, entity: EcsEntity, msg: S) + where + S: Into, + { + if let Some(client) = self.state.ecs().write_storage::().get_mut(entity) { + client.send_not_in_game(msg.into()) } } diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index e51de63761..14ef5bbf70 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -5,7 +5,10 @@ use common::{ character::CharacterId, comp, effect::Effect, - msg::{CharacterInfo, ClientState, PlayerListUpdate, ServerMsg}, + msg::{ + CharacterInfo, ClientIngame, PlayerListUpdate, ServerInGameMsg, ServerMsg, + ServerNotInGameMsg, + }, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, util::Dir, @@ -216,7 +219,8 @@ impl StateExt for State { // Tell the client its request was successful. if let Some(client) = self.ecs().write_storage::().get_mut(entity) { - client.allow_state(ClientState::Character); + client.in_game = Some(ClientIngame::Character); + client.send_not_in_game(ServerNotInGameMsg::CharacterSuccess) } } @@ -282,7 +286,7 @@ impl StateExt for State { .join() { if uid == u || uid == t { - client.notify(ServerMsg::ChatMsg(resolved_msg.clone())); + client.send_msg(ServerMsg::ChatMsg(resolved_msg.clone())); } } }, @@ -294,7 +298,7 @@ impl StateExt for State { if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { - client.notify(ServerMsg::ChatMsg(resolved_msg.clone())); + client.send_msg(ServerMsg::ChatMsg(resolved_msg.clone())); } } } @@ -306,7 +310,7 @@ impl StateExt for State { if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { - client.notify(ServerMsg::ChatMsg(resolved_msg.clone())); + client.send_msg(ServerMsg::ChatMsg(resolved_msg.clone())); } } } @@ -318,7 +322,7 @@ impl StateExt for State { if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { - client.notify(ServerMsg::ChatMsg(resolved_msg.clone())); + client.send_msg(ServerMsg::ChatMsg(resolved_msg.clone())); } } } @@ -332,7 +336,7 @@ impl StateExt for State { .join() { if s == &faction.0 { - client.notify(ServerMsg::ChatMsg(resolved_msg.clone())); + client.send_msg(ServerMsg::ChatMsg(resolved_msg.clone())); } } }, @@ -344,7 +348,7 @@ impl StateExt for State { .join() { if g == group { - client.notify(ServerMsg::ChatMsg(resolved_msg.clone())); + client.send_msg(ServerMsg::ChatMsg(resolved_msg.clone())); } } }, @@ -355,9 +359,9 @@ impl StateExt for State { fn notify_registered_clients(&self, msg: ServerMsg) { for client in (&mut self.ecs().write_storage::()) .join() - .filter(|c| c.is_registered()) + .filter(|c| c.registered) { - client.notify(msg.clone()); + client.send_msg(msg.clone()); } } @@ -384,7 +388,7 @@ impl StateExt for State { .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.notify(ServerMsg::GroupUpdate(g))); + .map(|(g, c)| c.send_in_game(ServerInGameMsg::GroupUpdate(g))); }, ); } diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index 1dfd92e4aa..3bd9b1d415 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -8,7 +8,7 @@ use crate::{ }; use common::{ comp::{ForceUpdate, Inventory, InventoryUpdate, Last, Ori, Player, Pos, Vel}, - msg::ServerMsg, + msg::{ServerInGameMsg, ServerMsg}, outcome::Outcome, region::{Event as RegionEvent, RegionMap}, span, @@ -107,7 +107,7 @@ impl<'a> System<'a> for Sys { let mut subscribers = (&mut clients, &entities, &subscriptions, &positions) .join() .filter_map(|(client, entity, subscription, pos)| { - if client.is_ingame() && subscription.regions.contains(&key) { + if client.in_game.is_some() && subscription.regions.contains(&key) { Some((client, &subscription.regions, entity, *pos)) } else { None @@ -143,7 +143,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - client.notify(create_msg.clone()); + client.send_msg(create_msg.clone()); } } } @@ -157,7 +157,7 @@ impl<'a> System<'a> for Sys { .map(|key| !regions.contains(key)) .unwrap_or(true) { - client.notify(ServerMsg::DeleteEntity(uid)); + client.send_msg(ServerMsg::DeleteEntity(uid)); } } } @@ -177,8 +177,8 @@ impl<'a> System<'a> for Sys { let entity_sync_msg = ServerMsg::EntitySync(entity_sync_package); let comp_sync_msg = ServerMsg::CompSync(comp_sync_package); subscribers.iter_mut().for_each(move |(client, _, _, _)| { - client.notify(entity_sync_msg.clone()); - client.notify(comp_sync_msg.clone()); + client.send_msg(entity_sync_msg.clone()); + client.send_msg(comp_sync_msg.clone()); }); let mut send_msg = |msg: ServerMsg, @@ -212,7 +212,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - client.notify(msg.clone()); + client.send_msg(msg.clone()); } } }; @@ -303,7 +303,7 @@ impl<'a> System<'a> for Sys { (&mut clients, &subscriptions) .join() .filter_map(|(client, subscription)| { - if client.is_ingame() && subscription.regions.contains(®ion_key) { + if client.in_game.is_some() && subscription.regions.contains(®ion_key) { Some(client) } else { None @@ -311,7 +311,7 @@ impl<'a> System<'a> for Sys { }) { for uid in &deleted { - client.notify(ServerMsg::DeleteEntity(Uid(*uid))); + client.send_msg(ServerMsg::DeleteEntity(Uid(*uid))); } } } @@ -320,7 +320,7 @@ impl<'a> System<'a> for Sys { // Sync inventories for (client, inventory, update) in (&mut clients, &inventories, &inventory_updates).join() { - client.notify(ServerMsg::InventoryUpdate( + client.send_in_game(ServerInGameMsg::InventoryUpdate( inventory.clone(), update.event(), )); @@ -341,7 +341,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - client.notify(ServerMsg::Outcomes(outcomes)); + client.send_in_game(ServerInGameMsg::Outcomes(outcomes)); } } outcomes.clear(); @@ -355,7 +355,7 @@ impl<'a> System<'a> for Sys { // system?) let tof_msg = ServerMsg::TimeOfDay(*time_of_day); for client in (&mut clients).join() { - client.notify(tof_msg.clone()); + client.send_msg(tof_msg.clone()); } timer.end(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index 834f812a84..2afd405c11 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -2,7 +2,7 @@ use super::SysTimer; use crate::client::Client; use common::{ comp::group::{Invite, PendingInvites}, - msg::{InviteAnswer, ServerMsg}, + msg::{InviteAnswer, ServerInGameMsg}, span, sync::Uid, }; @@ -54,7 +54,7 @@ impl<'a> System<'a> for Sys { if let (Some(client), Some(target)) = (clients.get_mut(*inviter), uids.get(invitee).copied()) { - client.notify(ServerMsg::InviteComplete { + client.send_in_game(ServerInGameMsg::InviteComplete { target, answer: InviteAnswer::TimedOut, }) diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index ed6eb8ee4a..8cbbb947be 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -15,9 +15,10 @@ use common::{ }, event::{EventBus, ServerEvent}, msg::{ - validate_chat_msg, CharacterInfo, ChatMsgValidationError, ClientMsg, ClientState, - DisconnectReason, PlayerInfo, PlayerListUpdate, RequestStateError, ServerMsg, - MAX_BYTES_CHAT_MSG, + validate_chat_msg, CharacterInfo, ChatMsgValidationError, ClientInGameMsg, ClientIngame, + ClientMsg, ClientNotInGameMsg, ClientRegisterMsg, DisconnectReason, PingMsg, PlayerInfo, + PlayerListUpdate, RegisterError, ServerInGameMsg, ServerMsg, ServerNotInGameMsg, + ServerRegisterAnswerMsg, MAX_BYTES_CHAT_MSG, }, span, state::{BlockChange, Time}, @@ -32,13 +33,435 @@ use hashbrown::HashMap; use specs::{ Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_client_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, + entity: specs::Entity, + client: &mut Client, + player_metrics: &ReadExpect<'_, PlayerMetrics>, + uids: &ReadStorage<'_, Uid>, + chat_modes: &ReadStorage<'_, ChatMode>, + msg: ClientMsg, + ) -> Result<(), crate::error::Error> { + match msg { + ClientMsg::ChatMsg(message) => { + if client.registered { + match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + warn!(?len, ?max, "Received a chat message that's too long") + }, + } + } + }, + ClientMsg::Command(message) => { + if client.registered { + match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + warn!(?len, ?max, "Received a chat message that's too long") + }, + } + } + }, + ClientMsg::Disconnect => { + client.send_msg(ServerMsg::Disconnect(DisconnectReason::Requested)); + }, + ClientMsg::Terminate => { + debug!(?entity, "Client send message to termitate session"); + player_metrics + .clients_disconnected + .with_label_values(&["gracefully"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + }, + } + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn handle_client_in_game_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + entity: specs::Entity, + client: &mut Client, + terrain: &ReadExpect<'_, TerrainGrid>, + network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, + can_build: &ReadStorage<'_, CanBuild>, + force_updates: &ReadStorage<'_, ForceUpdate>, + stats: &mut WriteStorage<'_, Stats>, + block_changes: &mut Write<'_, BlockChange>, + positions: &mut WriteStorage<'_, Pos>, + velocities: &mut WriteStorage<'_, Vel>, + orientations: &mut WriteStorage<'_, Ori>, + players: &mut WriteStorage<'_, Player>, + controllers: &mut WriteStorage<'_, Controller>, + settings: &Read<'_, Settings>, + msg: ClientInGameMsg, + ) -> Result<(), crate::error::Error> { + if !client.in_game.is_some() { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientInGameMsg::TerrainChunkRequest{ .. }) { + network_metrics.chunks_request_dropped.inc(); + } + return Ok(()); + } + match msg { + // Go back to registered state (char selection screen) + ClientInGameMsg::ExitInGame => { + client.in_game = None; + server_emitter.emit(ServerEvent::ExitIngame { entity }); + client.send_in_game(ServerInGameMsg::ExitInGameSuccess); + }, + ClientInGameMsg::SetViewDistance(view_distance) => { + players.get_mut(entity).map(|player| { + player.view_distance = Some( + settings + .max_view_distance + .map(|max| view_distance.min(max)) + .unwrap_or(view_distance), + ) + }); + + if settings + .max_view_distance + .map(|max| view_distance > max) + .unwrap_or(false) + { + client.send_in_game(ServerInGameMsg::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + )); + } + }, + ClientInGameMsg::ControllerInputs(inputs) => { + if let Some(ClientIngame::Character) = client.in_game { + if let Some(controller) = controllers.get_mut(entity) { + controller.inputs.update_with_new(inputs); + } + } + }, + ClientInGameMsg::ControlEvent(event) => { + if let Some(ClientIngame::Character) = client.in_game { + // Skip respawn if client entity is alive + if let ControlEvent::Respawn = event { + if stats.get(entity).map_or(true, |s| !s.is_dead) { + //Todo: comment why return! + return Ok(()); + } + } + if let Some(controller) = controllers.get_mut(entity) { + controller.events.push(event); + } + } + }, + ClientInGameMsg::ControlAction(event) => { + if let Some(ClientIngame::Character) = client.in_game { + if let Some(controller) = controllers.get_mut(entity) { + controller.actions.push(event); + } + } + }, + ClientInGameMsg::PlayerPhysics { pos, vel, ori } => { + if let Some(ClientIngame::Character) = client.in_game { + if force_updates.get(entity).is_none() + && stats.get(entity).map_or(true, |s| !s.is_dead) + { + let _ = positions.insert(entity, pos); + let _ = velocities.insert(entity, vel); + let _ = orientations.insert(entity, ori); + } + } + }, + ClientInGameMsg::BreakBlock(pos) => { + if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) { + block_changes.set(pos, block.into_vacant()); + } + }, + ClientInGameMsg::PlaceBlock(pos, block) => { + if can_build.get(entity).is_some() { + block_changes.try_set(pos, block); + } + }, + ClientInGameMsg::TerrainChunkRequest { key } => { + let in_vd = if let (Some(view_distance), Some(pos)) = ( + players.get(entity).and_then(|p| p.view_distance), + positions.get(entity), + ) { + pos.0.xy().map(|e| e as f64).distance( + key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64 + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => { + network_metrics.chunks_served_from_memory.inc(); + client.send_in_game(ServerInGameMsg::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + }) + }, + None => { + network_metrics.chunks_generation_triggered.inc(); + server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) + }, + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + ClientInGameMsg::UnlockSkill(skill) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.unlock_skill(skill)); + }, + ClientInGameMsg::RefundSkill(skill) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.refund_skill(skill)); + }, + ClientInGameMsg::UnlockSkillGroup(skill_group_type) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.unlock_skill_group(skill_group_type)); + }, + } + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn handle_client_not_in_game_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, + entity: specs::Entity, + client: &mut Client, + character_loader: &ReadExpect<'_, CharacterLoader>, + uids: &ReadStorage<'_, Uid>, + players: &mut WriteStorage<'_, Player>, + editable_settings: &ReadExpect<'_, EditableSettings>, + alias_validator: &ReadExpect<'_, AliasValidator>, + msg: ClientNotInGameMsg, + ) -> Result<(), crate::error::Error> { + match msg { + // Request spectator state + ClientNotInGameMsg::Spectate => { + if client.registered { + client.in_game = Some(ClientIngame::Spectator) + } else { + debug!("dropped Spectate msg from unregistered client"); + } + }, + ClientNotInGameMsg::Character(character_id) => { + if client.registered && !client.in_game.is_some() { + // Only send login message if it wasn't already + // sent previously + if let Some(player) = players.get(entity) { + // Send a request to load the character's component data from the + // DB. Once loaded, persisted components such as stats and inventory + // will be inserted for the entity + character_loader.load_character_data( + entity, + player.uuid().to_string(), + character_id, + ); + + // Start inserting non-persisted/default components for the entity + // while we load the DB data + server_emitter.emit(ServerEvent::InitCharacterData { + entity, + character_id, + }); + + // Give the player a welcome message + if !editable_settings.server_description.is_empty() { + client.send_msg( + ChatType::CommandInfo.server_msg(String::from( + &*editable_settings.server_description, + )), + ); + } + + // Only send login message if it wasn't already + // sent previously + if !client.login_msg_sent { + if let Some(player_uid) = uids.get(entity) { + new_chat_msgs.push((None, UnresolvedChatMsg { + chat_type: ChatType::Online(*player_uid), + message: "".to_string(), + })); + + client.login_msg_sent = true; + } + } + } else { + client.send_not_in_game(ServerNotInGameMsg::CharacterDataLoadError( + String::from("Failed to fetch player entity"), + )) + } + } else { + let registered = client.registered; + let in_game = client.in_game; + debug!(?registered, ?in_game, "dropped Character msg from client"); + } + }, + ClientNotInGameMsg::RequestCharacterList => { + if let Some(player) = players.get(entity) { + character_loader.load_character_list(entity, player.uuid().to_string()) + } + }, + ClientNotInGameMsg::CreateCharacter { alias, tool, body } => { + if let Err(error) = alias_validator.validate(&alias) { + debug!(?error, ?alias, "denied alias as it contained a banned word"); + client.send_not_in_game(ServerNotInGameMsg::CharacterActionError( + error.to_string(), + )); + } else if let Some(player) = players.get(entity) { + character_creator::create_character( + entity, + player.uuid().to_string(), + alias, + tool, + body, + character_loader, + ); + } + }, + ClientNotInGameMsg::DeleteCharacter(character_id) => { + if let Some(player) = players.get(entity) { + character_loader.delete_character( + entity, + player.uuid().to_string(), + character_id, + ); + } + }, + } + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn handle_ping_msg(client: &mut Client, msg: PingMsg) -> Result<(), crate::error::Error> { + match msg { + PingMsg::Ping => client.send_ping(PingMsg::Pong), + PingMsg::Pong => {}, + } + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn handle_register_msg( + player_list: &HashMap, + new_players: &mut Vec, + entity: specs::Entity, + client: &mut Client, + player_metrics: &ReadExpect<'_, PlayerMetrics>, + login_provider: &mut WriteExpect<'_, LoginProvider>, + admins: &mut WriteStorage<'_, Admin>, + players: &mut WriteStorage<'_, Player>, + settings: &Read<'_, Settings>, + editable_settings: &ReadExpect<'_, EditableSettings>, + msg: ClientRegisterMsg, + ) -> Result<(), crate::error::Error> { + let (username, uuid) = match login_provider.try_login( + &msg.token_or_username, + &*editable_settings.admins, + &*editable_settings.whitelist, + &*editable_settings.banlist, + ) { + Err(err) => { + client + .register_stream + .send(ServerRegisterAnswerMsg::Err(err))?; + return Ok(()); + }, + Ok((username, uuid)) => (username, uuid), + }; + + let vd = msg + .view_distance + .map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); + let player = Player::new(username.clone(), None, vd, uuid); + let is_admin = editable_settings.admins.contains(&uuid); + + if !player.is_valid() { + // Invalid player + client.register_stream.send(ServerRegisterAnswerMsg::Err( + RegisterError::InvalidCharacter, + ))?; + return Ok(()); + } + + if !client.registered && client.in_game.is_none() { + // Add Player component to this client + let _ = players.insert(entity, player); + player_metrics.players_connected.inc(); + + // Give the Admin component to the player if their name exists in + // admin list + if is_admin { + let _ = admins.insert(entity, Admin); + } + + // Tell the client its request was successful. + client.registered = true; + client + .register_stream + .send(ServerRegisterAnswerMsg::Ok(()))?; + + // Send initial player list + client.send_msg(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + ))); + + // Add to list to notify all clients of the new player + new_players.push(entity); + } + + // Limit view distance if it's too high + // This comes after state registration so that the client actually hears it + if settings + .max_view_distance + .zip(msg.view_distance) + .map(|(max, vd)| vd > max) + .unwrap_or(false) + { + client.send_in_game(ServerInGameMsg::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + )); + }; + Ok(()) + } + ///We needed to move this to a async fn, if we would use a async closures /// the compiler generates to much recursion and fails to compile this #[allow(clippy::too_many_arguments)] - async fn handle_client_msg( + async fn handle_messages( server_emitter: &mut common::event::Emitter<'_, ServerEvent>, new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, player_list: &HashMap, @@ -64,357 +487,87 @@ impl Sys { players: &mut WriteStorage<'_, Player>, controllers: &mut WriteStorage<'_, Controller>, settings: &Read<'_, Settings>, + editable_settings: &ReadExpect<'_, EditableSettings>, alias_validator: &ReadExpect<'_, AliasValidator>, - editable_settings: &EditableSettings, ) -> Result<(), crate::error::Error> { loop { - let msg = client.recv().await?; + let q1 = Client::internal_recv(&client.network_error, &mut client.singleton_stream); + let q2 = Client::internal_recv(&client.network_error, &mut client.in_game_stream); + let q3 = Client::internal_recv(&client.network_error, &mut client.not_in_game_stream); + let q4 = Client::internal_recv(&client.network_error, &mut client.ping_stream); + let q5 = Client::internal_recv(&client.network_error, &mut client.register_stream); + + let (m1, m2, m3, m4, m5) = select!( + msg = q1.fuse() => (Some(msg?), None, None, None, None), + msg = q2.fuse() => (None, Some(msg?), None, None, None), + msg = q3.fuse() => (None, None, Some(msg?), None, None), + msg = q4.fuse() => (None, None, None, Some(msg?), None), + msg = q5.fuse() => (None, None, None, None,Some(msg?)), + ); *cnt += 1; - match msg { - // Go back to registered state (char selection screen) - ClientMsg::ExitIngame => match client.client_state { - // Use ClientMsg::Register instead. - ClientState::Connected => client.error_state(RequestStateError::WrongMessage), - ClientState::Registered => client.error_state(RequestStateError::Already), - ClientState::Spectator | ClientState::Character => { - server_emitter.emit(ServerEvent::ExitIngame { entity }); - }, - ClientState::Pending => {}, - }, - // Request spectator state - ClientMsg::Spectate => match client.client_state { - // Become Registered first. - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Spectator => client.error_state(RequestStateError::Already), - ClientState::Registered | ClientState::Character => { - client.allow_state(ClientState::Spectator) - }, - ClientState::Pending => {}, - }, - // Request registered state (login) - ClientMsg::Register { - view_distance, - token_or_username, - } => { - let (username, uuid) = match login_provider.try_login( - &token_or_username, - &*editable_settings.admins, - &*editable_settings.whitelist, - &*editable_settings.banlist, - ) { - Err(err) => { - client.error_state(RequestStateError::RegisterDenied(err)); - break Ok(()); - }, - Ok((username, uuid)) => (username, uuid), - }; - - let vd = - view_distance.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd))); - let is_admin = editable_settings.admins.contains(&uuid); - let player = Player::new(username.clone(), None, vd, uuid); - - if !player.is_valid() { - // Invalid player - client.error_state(RequestStateError::Impossible); - break Ok(()); - } - - match client.client_state { - ClientState::Connected => { - // Add Player component to this client - let _ = players.insert(entity, player); - player_metrics.players_connected.inc(); - - // Give the Admin component to the player if their name exists in - // admin list - if is_admin { - let _ = admins.insert(entity, Admin); - } - - // Tell the client its request was successful. - client.allow_state(ClientState::Registered); - - // Send initial player list - client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - ))); - - // Add to list to notify all clients of the new player - new_players.push(entity); - }, - // Use RequestState instead (No need to send `player` again). - _ => client.error_state(RequestStateError::Impossible), - } - //client.allow_state(ClientState::Registered); - - // Limit view distance if it's too high - // This comes after state registration so that the client actually hears it - if settings - .max_view_distance - .zip(view_distance) - .map(|(max, vd)| vd > max) - .unwrap_or(false) - { - client.notify(ServerMsg::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - }; - }, - ClientMsg::SetViewDistance(view_distance) => { - if let ClientState::Character { .. } = client.client_state { - players.get_mut(entity).map(|player| { - player.view_distance = Some( - settings - .max_view_distance - .map(|max| view_distance.min(max)) - .unwrap_or(view_distance), - ) - }); - - if settings - .max_view_distance - .map(|max| view_distance > max) - .unwrap_or(false) - { - client.notify(ServerMsg::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - } - } - }, - ClientMsg::Character(character_id) => match client.client_state { - // Become Registered first. - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered | ClientState::Spectator => { - // Only send login message if it wasn't already - // sent previously - if let Some(player) = players.get(entity) { - // Send a request to load the character's component data from the - // DB. Once loaded, persisted components such as stats and inventory - // will be inserted for the entity - character_loader.load_character_data( - entity, - player.uuid().to_string(), - character_id, - ); - - // Start inserting non-persisted/default components for the entity - // while we load the DB data - server_emitter.emit(ServerEvent::InitCharacterData { - entity, - character_id, - }); - - // Give the player a welcome message - if !editable_settings.server_description.is_empty() { - client.notify(ChatType::CommandInfo.server_msg(String::from( - &*editable_settings.server_description, - ))); - } - - // Only send login message if it wasn't already - // sent previously - if !client.login_msg_sent { - if let Some(player_uid) = uids.get(entity) { - new_chat_msgs.push((None, UnresolvedChatMsg { - chat_type: ChatType::Online(*player_uid), - message: "".to_string(), - })); - - client.login_msg_sent = true; - } - } - } else { - client.notify(ServerMsg::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - ))) - } - }, - ClientState::Character => client.error_state(RequestStateError::Already), - ClientState::Pending => {}, - }, - ClientMsg::ControllerInputs(inputs) => match client.client_state { - ClientState::Connected | ClientState::Registered | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - if let Some(controller) = controllers.get_mut(entity) { - controller.inputs.update_with_new(inputs); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ControlEvent(event) => match client.client_state { - ClientState::Connected | ClientState::Registered | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - // Skip respawn if client entity is alive - if let ControlEvent::Respawn = event { - if stats.get(entity).map_or(true, |s| !s.is_dead) { - continue; - } - } - if let Some(controller) = controllers.get_mut(entity) { - controller.events.push(event); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ControlAction(event) => match client.client_state { - ClientState::Connected | ClientState::Registered | ClientState::Spectator => { - client.error_state(RequestStateError::Impossible) - }, - ClientState::Character => { - if let Some(controller) = controllers.get_mut(entity) { - controller.actions.push(event); - } - }, - ClientState::Pending => {}, - }, - ClientMsg::ChatMsg(message) => match client.client_state { - ClientState::Connected => client.error_state(RequestStateError::Impossible), - ClientState::Registered | ClientState::Spectator | ClientState::Character => { - match validate_chat_msg(&message) { - Ok(()) => { - if let Some(from) = uids.get(entity) { - let mode = chat_modes.get(entity).cloned().unwrap_or_default(); - let msg = mode.new_message(*from, message); - new_chat_msgs.push((Some(entity), msg)); - } else { - error!("Could not send message. Missing player uid"); - } - }, - Err(ChatMsgValidationError::TooLong) => { - let max = MAX_BYTES_CHAT_MSG; - let len = message.len(); - warn!(?len, ?max, "Received a chat message that's too long") - }, - } - }, - ClientState::Pending => {}, - }, - ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state { - ClientState::Character => { - if force_updates.get(entity).is_none() - && stats.get(entity).map_or(true, |s| !s.is_dead) - { - let _ = positions.insert(entity, pos); - let _ = velocities.insert(entity, vel); - let _ = orientations.insert(entity, ori); - } - }, - // Only characters can send positions. - _ => client.error_state(RequestStateError::Impossible), - }, - ClientMsg::BreakBlock(pos) => { - if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) { - block_changes.set(pos, block.into_vacant()); - } - }, - ClientMsg::PlaceBlock(pos, block) => { - if can_build.get(entity).is_some() { - block_changes.try_set(pos, block); - } - }, - ClientMsg::TerrainChunkRequest { key } => match client.client_state { - ClientState::Connected | ClientState::Registered => { - network_metrics.chunks_request_dropped.inc(); - client.error_state(RequestStateError::Impossible); - }, - ClientState::Spectator | ClientState::Character => { - let in_vd = if let (Some(view_distance), Some(pos)) = ( - players.get(entity).and_then(|p| p.view_distance), - positions.get(entity), - ) { - pos.0.xy().map(|e| e as f64).distance( - key.map(|e| e as f64 + 0.5) - * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64 - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - client.notify(ServerMsg::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - }) - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, - } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, - ClientState::Pending => {}, - }, - // Always possible. - ClientMsg::Ping => client.notify(ServerMsg::Pong), - ClientMsg::Pong => {}, - ClientMsg::Disconnect => { - client.notify(ServerMsg::Disconnect(DisconnectReason::Requested)); - }, - ClientMsg::Terminate => { - debug!(?entity, "Client send message to termitate session"); - player_metrics - .clients_disconnected - .with_label_values(&["gracefully"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - break Ok(()); - }, - ClientMsg::RequestCharacterList => { - if let Some(player) = players.get(entity) { - character_loader.load_character_list(entity, player.uuid().to_string()) - } - }, - ClientMsg::CreateCharacter { alias, tool, body } => { - if let Err(error) = alias_validator.validate(&alias) { - debug!(?error, ?alias, "denied alias as it contained a banned word"); - client.notify(ServerMsg::CharacterActionError(error.to_string())); - } else if let Some(player) = players.get(entity) { - character_creator::create_character( - entity, - player.uuid().to_string(), - alias, - tool, - body, - character_loader, - ); - } - }, - ClientMsg::DeleteCharacter(character_id) => { - if let Some(player) = players.get(entity) { - character_loader.delete_character( - entity, - player.uuid().to_string(), - character_id, - ); - } - }, - ClientMsg::UnlockSkill(skill) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.unlock_skill(skill)); - }, - ClientMsg::RefundSkill(skill) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.refund_skill(skill)); - }, - ClientMsg::UnlockSkillGroup(skill_group_type) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.unlock_skill_group(skill_group_type)); - }, + if let Some(msg) = m1 { + Self::handle_client_msg( + server_emitter, + new_chat_msgs, + entity, + client, + player_metrics, + uids, + chat_modes, + msg, + )?; + } + if let Some(msg) = m2 { + Self::handle_client_in_game_msg( + server_emitter, + entity, + client, + terrain, + network_metrics, + can_build, + force_updates, + stats, + block_changes, + positions, + velocities, + orientations, + players, + controllers, + settings, + msg, + )?; + } + if let Some(msg) = m3 { + Self::handle_client_not_in_game_msg( + server_emitter, + new_chat_msgs, + entity, + client, + character_loader, + uids, + players, + editable_settings, + alias_validator, + msg, + )?; + } + if let Some(msg) = m4 { + Self::handle_ping_msg(client, msg)?; + } + if let Some(msg) = m5 { + Self::handle_register_msg( + player_list, + new_players, + entity, + client, + player_metrics, + login_provider, + admins, + players, + settings, + editable_settings, + msg, + )?; } } } @@ -448,8 +601,8 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Client>, WriteStorage<'a, Controller>, Read<'a, Settings>, - ReadExpect<'a, AliasValidator>, ReadExpect<'a, EditableSettings>, + ReadExpect<'a, AliasValidator>, ); #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 @@ -481,8 +634,8 @@ impl<'a> System<'a> for Sys { mut clients, mut controllers, settings, - alias_validator, editable_settings, + alias_validator, ): Self::SystemData, ) { span!(_guard, "run", "message::Sys::run"); @@ -515,7 +668,7 @@ impl<'a> System<'a> for Sys { let network_err: Result<(), crate::error::Error> = block_on(async { //TIMEOUT 0.02 ms for msg handling - let work_future = Self::handle_client_msg( + let work_future = Self::handle_messages( &mut server_emitter, &mut new_chat_msgs, &player_list, @@ -541,8 +694,8 @@ impl<'a> System<'a> for Sys { &mut players, &mut controllers, &settings, - &alias_validator, &editable_settings, + &alias_validator, ); select!( _ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()), @@ -573,7 +726,7 @@ impl<'a> System<'a> for Sys { server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { // Try pinging the client if the timeout is nearing. - client.notify(ServerMsg::Ping); + client.send_ping(PingMsg::Ping); } } @@ -587,8 +740,8 @@ impl<'a> System<'a> for Sys { is_admin: admins.get(entity).is_some(), character: None, // new players will be on character select. })); - for client in (&mut clients).join().filter(|c| c.is_registered()) { - client.notify(msg.clone()) + for client in (&mut clients).join().filter(|c| c.registered) { + client.send_msg(msg.clone()) } } } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 1e0df762bd..2ee739bada 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -80,7 +80,7 @@ impl<'a> System<'a> for Sys { ) .join() .filter_map(|(client, s, pos, player, e)| { - if client.is_ingame() { + if client.in_game.is_some() { player.view_distance.map(|v| (client, s, pos, v, e)) } else { None @@ -153,7 +153,7 @@ impl<'a> System<'a> for Sys { .map(|key| subscription.regions.contains(key)) .unwrap_or(false) { - client.notify(ServerMsg::DeleteEntity(uid)); + client.send_msg(ServerMsg::DeleteEntity(uid)); } } }, @@ -161,7 +161,7 @@ impl<'a> System<'a> for Sys { } // Tell client to delete entities in the region for (&uid, _) in (&uids, region.entities()).join() { - client.notify(ServerMsg::DeleteEntity(uid)); + client.send_msg(ServerMsg::DeleteEntity(uid)); } } // Send deleted entities since they won't be processed for this client in entity @@ -171,7 +171,7 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - client.notify(ServerMsg::DeleteEntity(Uid(*uid))); + client.send_msg(ServerMsg::DeleteEntity(Uid(*uid))); } } @@ -196,7 +196,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - client.notify(ServerMsg::CreateEntity( + client.send_msg(ServerMsg::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -249,7 +249,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - client.notify(ServerMsg::CreateEntity( + client.send_msg(ServerMsg::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 9be68b799b..8ca477e0fd 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -4,7 +4,7 @@ use common::{ comp::{self, bird_medium, Alignment, Player, Pos}, event::{EventBus, ServerEvent}, generation::get_npc_name, - msg::ServerMsg, + msg::ServerInGameMsg, npc::NPC_NAMES, span, state::TerrainChanges, @@ -63,7 +63,7 @@ impl<'a> System<'a> for Sys { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { if let Some(client) = clients.get_mut(entity) { - client.notify(ServerMsg::TerrainChunkUpdate { + client.send_in_game(ServerInGameMsg::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -90,7 +90,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= view_distance.pow(2) { - client.notify(ServerMsg::TerrainChunkUpdate { + client.send_in_game(ServerInGameMsg::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 8d0e8b5f23..d0d1df5376 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -2,7 +2,7 @@ use super::SysTimer; use crate::client::Client; use common::{ comp::{Player, Pos}, - msg::ServerMsg, + msg::ServerInGameMsg, span, state::TerrainChanges, terrain::TerrainGrid, @@ -38,7 +38,7 @@ impl<'a> System<'a> for Sys { .map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd)) .unwrap_or(false) { - client.notify(ServerMsg::TerrainChunkUpdate { + client.send_in_game(ServerInGameMsg::TerrainChunkUpdate { key: *chunk_key, chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { Some(chunk) => chunk.clone(), @@ -51,10 +51,10 @@ impl<'a> System<'a> for Sys { // TODO: Don't send all changed blocks to all clients // Sync changed blocks - let msg = ServerMsg::TerrainBlockUpdates(terrain_changes.modified_blocks.clone()); + let msg = ServerInGameMsg::TerrainBlockUpdates(terrain_changes.modified_blocks.clone()); for (player, client) in (&players, &mut clients).join() { if player.view_distance.is_some() { - client.notify(msg.clone()); + client.send_in_game(msg.clone()); } } diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index d9b0f0e739..a30911f4cc 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -42,7 +42,7 @@ impl<'a> System<'a> for Sys { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - client.notify(ServerMsg::Notification(Notification::WaypointSaved)); + client.send_msg(ServerMsg::Notification(Notification::WaypointSaved)); } } } diff --git a/voxygen/src/menu/char_selection/mod.rs b/voxygen/src/menu/char_selection/mod.rs index 1b9d85bf1f..4ec8eb27a7 100644 --- a/voxygen/src/menu/char_selection/mod.rs +++ b/voxygen/src/menu/char_selection/mod.rs @@ -10,7 +10,7 @@ use crate::{ Direction, GlobalState, PlayState, PlayStateResult, }; use client::{self, Client}; -use common::{assets::Asset, comp, msg::ClientState, span, state::DeltaTime}; +use common::{assets::Asset, comp, span, state::DeltaTime}; use specs::WorldExt; use std::{cell::RefCell, rc::Rc}; use tracing::error; @@ -61,8 +61,8 @@ impl PlayState for CharSelectionState { fn tick(&mut self, global_state: &mut GlobalState, events: Vec) -> PlayStateResult { span!(_guard, "tick", "::tick"); - let client_state = self.client.borrow().get_client_state(); - if let ClientState::Pending | ClientState::Registered = client_state { + let client_in_game = self.client.borrow().get_in_game(); + if client_in_game.is_none() { // Handle window events for event in events { if self.char_selection_ui.handle_event(event.clone()) { diff --git a/voxygen/src/session.rs b/voxygen/src/session.rs index 0d0c39a118..6529573f08 100644 --- a/voxygen/src/session.rs +++ b/voxygen/src/session.rs @@ -20,7 +20,6 @@ use common::{ MAX_PICKUP_RANGE_SQR, }, event::EventBus, - msg::ClientState, outcome::Outcome, span, terrain::{Block, BlockKind}, @@ -213,8 +212,11 @@ impl PlayState for SessionState { // TODO: can this be a method on the session or are there borrowcheck issues? - let client_state = self.client.borrow().get_client_state(); - if let ClientState::Pending | ClientState::Character = client_state { + let (client_in_game, client_registered) = { + let client = self.client.borrow(); + (client.get_in_game(), client.get_registered()) + }; + if client_in_game.is_none() { // Update MyEntity // Note: Alternatively, the client could emit an event when the entity changes // which may or may not be more elegant @@ -1088,7 +1090,7 @@ impl PlayState for SessionState { self.cleanup(); PlayStateResult::Continue - } else if let ClientState::Registered = client_state { + } else if client_registered && client_in_game.is_none() { PlayStateResult::Switch(Box::new(CharSelectionState::new( global_state, Rc::clone(&self.client),