diff --git a/client/src/lib.rs b/client/src/lib.rs index 85c77c3e6b..46fefef2d2 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -25,9 +25,9 @@ use common::{ }, event::{EventBus, LocalEvent}, msg::{ - validate_chat_msg, ChatMsgValidationError, ClientGeneral, ClientInGame, ClientMsg, - ClientRegister, ClientType, DisconnectReason, InviteAnswer, Notification, PingMsg, - PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral, ServerInfo, ServerInit, + validate_chat_msg, ChatMsgValidationError, ClientGeneral, ClientMsg, ClientRegister, + ClientType, DisconnectReason, InviteAnswer, Notification, PingMsg, PlayerInfo, + PlayerListUpdate, PresenceKind, RegisterError, ServerGeneral, ServerInfo, ServerInit, ServerRegisterAnswer, MAX_BYTES_CHAT_MSG, }, outcome::Outcome, @@ -71,7 +71,7 @@ pub enum Event { pub struct Client { registered: bool, - in_game: Option, + presence: Option, thread_pool: ThreadPool, pub server_info: ServerInfo, /// Just the "base" layer for LOD; currently includes colors and nothing @@ -98,7 +98,6 @@ pub struct Client { pub world_map: (Arc, Vec2, Vec2), pub player_list: HashMap, pub character_list: CharacterList, - pub active_character_id: Option, recipe_book: RecipeBook, available_recipes: HashSet, @@ -376,7 +375,7 @@ impl Client { Ok(Self { registered: false, - in_game: None, + presence: None, thread_pool, server_info, world_map, @@ -385,7 +384,6 @@ impl Client { lod_horizon, player_list: HashMap::new(), character_list: CharacterList::default(), - active_character_id: None, recipe_book, available_recipes: HashSet::default(), @@ -467,12 +465,12 @@ impl Client { #[cfg(debug_assertions)] { const C_TYPE: ClientType = ClientType::Game; - let verified = msg.verify(C_TYPE, self.registered, self.in_game); + let verified = msg.verify(C_TYPE, self.registered, self.presence); assert!( verified, format!( - "c_type: {:?}, registered: {}, in_game: {:?}, msg: {:?}", - C_TYPE, self.registered, self.in_game, msg + "c_type: {:?}, registered: {}, presence: {:?}, msg: {:?}", + C_TYPE, self.registered, self.presence, msg ) ); } @@ -500,9 +498,9 @@ impl Client { | ClientGeneral::RefundSkill(_) | ClientGeneral::UnlockSkillGroup(_) => &mut self.in_game_stream, //Always possible - ClientGeneral::ChatMsg(_) - | ClientGeneral::Disconnect - | ClientGeneral::Terminate => &mut self.general_stream, + ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => { + &mut self.general_stream + }, }; stream.send(msg) }, @@ -528,9 +526,7 @@ impl Client { self.send_msg(ClientGeneral::Character(character_id)); //Assume we are in_game unless server tells us otherwise - self.in_game = Some(ClientInGame::Character); - - self.active_character_id = Some(character_id); + self.presence = Some(PresenceKind::Character(character_id)); } /// Load the current players character list @@ -552,9 +548,11 @@ impl Client { } /// Send disconnect message to the server - pub fn request_logout(&mut self) { - debug!("Requesting logout from server"); - self.send_msg(ClientGeneral::Disconnect); + pub fn logout(&mut self) { + debug!("Sending logout from server"); + self.send_msg(ClientGeneral::Terminate); + self.registered = false; + self.presence = None; } /// Request a state transition to `ClientState::Registered` from an ingame @@ -920,7 +918,7 @@ impl Client { // 1) Handle input from frontend. // Pass character actions from frontend input to the player's entity. - if self.in_game.is_some() { + if self.presence.is_some() { if let Err(e) = self .state .ecs() @@ -1091,7 +1089,7 @@ impl Client { } // 6) Update the server about the player's physics attributes. - if self.in_game.is_some() { + if self.presence.is_some() { if let (Some(pos), Some(vel), Some(ori)) = ( self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), @@ -1133,11 +1131,6 @@ impl Client { match msg { ServerGeneral::Disconnect(reason) => match reason { DisconnectReason::Shutdown => return Err(Error::ServerShutdown), - DisconnectReason::Requested => { - debug!("finally sending ClientMsg::Terminate"); - frontend_events.push(Event::Disconnect); - self.send_msg_err(ClientGeneral::Terminate)?; - }, DisconnectReason::Kicked(reason) => { debug!("sending ClientMsg::Terminate because we got kicked"); frontend_events.push(Event::Kicked(reason)); @@ -1378,9 +1371,9 @@ impl Client { }; frontend_events.push(Event::Chat(comp::ChatType::Meta.chat_msg(msg))); }, - // Cleanup for when the client goes back to the `in_game = None` + // Cleanup for when the client goes back to the `presence = None` ServerGeneral::ExitInGameSuccess => { - self.in_game = None; + self.presence = None; self.clean_state(); }, ServerGeneral::InventoryUpdate(mut inventory, event) => { @@ -1441,7 +1434,7 @@ impl Client { }, ServerGeneral::CharacterDataLoadError(error) => { trace!("Handling join error by server"); - self.in_game = None; + self.presence = None; self.clean_state(); self.character_list.error = Some(error); }, @@ -1551,7 +1544,7 @@ impl Client { pub fn uid(&self) -> Option { self.state.read_component_copied(self.entity) } - pub fn in_game(&self) -> Option { self.in_game } + pub fn presence(&self) -> Option { self.presence } pub fn registered(&self) -> bool { self.registered } @@ -1840,7 +1833,7 @@ impl Drop for Client { fn drop(&mut self) { trace!("Dropping client"); if self.registered { - if let Err(e) = self.send_msg_err(ClientGeneral::Disconnect) { + if let Err(e) = self.send_msg_err(ClientGeneral::Terminate) { warn!( ?e, "Error during drop of client, couldn't send disconnect package, is the \ diff --git a/common/src/comp/player.rs b/common/src/comp/player.rs index 7b12b8741a..d8a5a90307 100644 --- a/common/src/comp/player.rs +++ b/common/src/comp/player.rs @@ -1,4 +1,3 @@ -use crate::character::CharacterId; use authc::Uuid; use serde::{Deserialize, Serialize}; use specs::{Component, FlaggedStorage, NullStorage}; @@ -9,25 +8,11 @@ const MAX_ALIAS_LEN: usize = 32; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Player { pub alias: String, - pub character_id: Option, - pub view_distance: Option, uuid: Uuid, } impl Player { - pub fn new( - alias: String, - character_id: Option, - view_distance: Option, - uuid: Uuid, - ) -> Self { - Self { - alias, - character_id, - view_distance, - uuid, - } - } + pub fn new(alias: String, uuid: Uuid) -> Self { Self { alias, uuid } } pub fn is_valid(&self) -> bool { Self::alias_is_valid(&self.alias) } diff --git a/common/src/msg/client.rs b/common/src/msg/client.rs index e8b8be3b02..c9c5bb1fba 100644 --- a/common/src/msg/client.rs +++ b/common/src/msg/client.rs @@ -78,7 +78,6 @@ pub enum ClientGeneral { UnlockSkillGroup(SkillGroupType), //Always possible ChatMsg(String), - Disconnect, Terminate, } @@ -87,21 +86,21 @@ impl ClientMsg { &self, c_type: ClientType, registered: bool, - in_game: Option, + presence: Option, ) -> bool { match self { ClientMsg::Type(t) => c_type == *t, - ClientMsg::Register(_) => !registered && in_game.is_none(), + ClientMsg::Register(_) => !registered && presence.is_none(), ClientMsg::General(g) => { registered && match g { ClientGeneral::RequestCharacterList | ClientGeneral::CreateCharacter { .. } | ClientGeneral::DeleteCharacter(_) => { - c_type != ClientType::ChatOnly && in_game.is_none() + c_type != ClientType::ChatOnly && presence.is_none() }, ClientGeneral::Character(_) | ClientGeneral::Spectate => { - c_type == ClientType::Game && in_game.is_none() + c_type == ClientType::Game && presence.is_none() }, //Only in game ClientGeneral::ControllerInputs(_) @@ -116,12 +115,10 @@ impl ClientMsg { | ClientGeneral::UnlockSkill(_) | ClientGeneral::RefundSkill(_) | ClientGeneral::UnlockSkillGroup(_) => { - c_type == ClientType::Game && in_game.is_some() + c_type == ClientType::Game && presence.is_some() }, //Always possible - ClientGeneral::ChatMsg(_) - | ClientGeneral::Disconnect - | ClientGeneral::Terminate => true, + ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => true, } }, ClientMsg::Ping(_) => true, diff --git a/common/src/msg/mod.rs b/common/src/msg/mod.rs index c25cb8a8a1..4580088795 100644 --- a/common/src/msg/mod.rs +++ b/common/src/msg/mod.rs @@ -13,12 +13,13 @@ pub use self::{ }, world_msg::WorldMapMsg, }; +use crate::character::CharacterId; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -pub enum ClientInGame { +pub enum PresenceKind { Spectator, - Character, + Character(CharacterId), } #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] diff --git a/common/src/msg/server.rs b/common/src/msg/server.rs index eda77ac4ab..625d07b0e1 100644 --- a/common/src/msg/server.rs +++ b/common/src/msg/server.rs @@ -163,8 +163,6 @@ pub enum Notification { pub enum DisconnectReason { /// Server shut down Shutdown, - /// Client sent disconnect message - Requested, /// Client was kicked Kicked(String), } @@ -184,11 +182,11 @@ impl ServerMsg { &self, c_type: ClientType, registered: bool, - in_game: Option, + presence: Option, ) -> bool { match self { ServerMsg::Info(_) | ServerMsg::Init(_) | ServerMsg::RegisterAnswer(_) => { - !registered && in_game.is_none() + !registered && presence.is_none() }, ServerMsg::General(g) => { registered @@ -197,10 +195,10 @@ impl ServerMsg { ServerGeneral::CharacterDataLoadError(_) | ServerGeneral::CharacterListUpdate(_) | ServerGeneral::CharacterActionError(_) => { - c_type != ClientType::ChatOnly && in_game.is_none() + c_type != ClientType::ChatOnly && presence.is_none() }, ServerGeneral::CharacterSuccess => { - c_type == ClientType::Game && in_game.is_none() + c_type == ClientType::Game && presence.is_none() }, //Ingame related ServerGeneral::GroupUpdate(_) @@ -214,7 +212,7 @@ impl ServerMsg { | ServerGeneral::SetViewDistance(_) | ServerGeneral::Outcomes(_) | ServerGeneral::Knockback(_) => { - c_type == ClientType::Game && in_game.is_some() + c_type == ClientType::Game && presence.is_some() }, // Always possible ServerGeneral::PlayerListUpdate(_) diff --git a/server-cli/src/shutdown_coordinator.rs b/server-cli/src/shutdown_coordinator.rs index 59705c2003..fa9be90b25 100644 --- a/server-cli/src/shutdown_coordinator.rs +++ b/server-cli/src/shutdown_coordinator.rs @@ -155,7 +155,7 @@ impl ShutdownCoordinator { /// Logs and sends a message to all connected clients fn send_msg(server: &mut Server, msg: String) { info!("{}", &msg); - server.notify_registered_clients(ChatType::CommandError.server_msg(msg)); + server.notify_players(ChatType::CommandError.server_msg(msg)); } /// Converts a `Duration` into text in the format XsXm for example 1 minute diff --git a/server/src/client.rs b/server/src/client.rs index e706689d80..8c4b2655de 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,72 +1,81 @@ -use crate::error::Error; -use common::msg::{ClientInGame, ClientType, ServerGeneral, ServerMsg}; -use hashbrown::HashSet; -use network::{Participant, Stream}; +use common::msg::{ClientType, ServerGeneral, ServerMsg}; +use network::{Message, Participant, Stream, StreamError}; use serde::{de::DeserializeOwned, Serialize}; -use specs::{Component, FlaggedStorage}; +use specs::Component; use specs_idvs::IdvStorage; -use tracing::debug; -use vek::*; +use std::sync::{atomic::AtomicBool, Mutex}; +/// Client handles ALL network related information of everything that connects +/// to the server Client DOES NOT handle game states +/// Client DOES NOT handle network information that is only relevant to some +/// "things" connecting to the server (there is currently no such case). First a +/// Client connects to the game, when it registers, it gets the `Player` +/// component, when he enters the game he gets the `InGame` component. pub struct Client { - pub registered: bool, pub client_type: ClientType, - pub in_game: Option, pub participant: Option, - pub general_stream: Stream, - pub ping_stream: Stream, - pub register_stream: Stream, - pub character_screen_stream: Stream, - pub in_game_stream: Stream, - pub network_error: bool, - pub last_ping: f64, - pub login_msg_sent: bool, + pub last_ping: Mutex, + pub login_msg_sent: AtomicBool, + pub terminate_msg_recv: AtomicBool, + + //TODO: improve network crate so that `send` is no longer `&mut self` and we can get rid of + // this Mutex. This Mutex is just to please the compiler as we do not get into contention + general_stream: Mutex, + ping_stream: Mutex, + register_stream: Mutex, + character_screen_stream: Mutex, + in_game_stream: Mutex, +} + +pub struct PreparedMsg { + stream_id: u8, + message: Message, } impl Component for Client { - type Storage = FlaggedStorage>; + type Storage = IdvStorage; } impl Client { - fn internal_send(err: &mut bool, s: &mut Stream, msg: M) { - if !*err { - if let Err(e) = s.send(msg) { - debug!(?e, "got a network error with client"); - *err = true; - } + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + client_type: ClientType, + participant: Participant, + last_ping: f64, + general_stream: Stream, + ping_stream: Stream, + register_stream: Stream, + character_screen_stream: Stream, + in_game_stream: Stream, + ) -> Self { + Client { + client_type, + participant: Some(participant), + last_ping: Mutex::new(last_ping), + login_msg_sent: AtomicBool::new(false), + terminate_msg_recv: AtomicBool::new(false), + general_stream: Mutex::new(general_stream), + ping_stream: Mutex::new(ping_stream), + register_stream: Mutex::new(register_stream), + character_screen_stream: Mutex::new(character_screen_stream), + in_game_stream: Mutex::new(in_game_stream), } } - /* - fn internal_send_raw(b: &AtomicBool, s: &mut Stream, msg: Arc) { - if !b.load(Ordering::Relaxed) { - if let Err(e) = s.send_raw(msg) { - debug!(?e, "got a network error with client"); - b.store(true, Ordering::Relaxed); - } - } - } - */ - - pub fn send_msg(&mut self, msg: S) - where - S: Into, - { - const ERR: &str = - "Don't do that. Sending these messages is only done ONCE at connect and not by this fn"; + pub(crate) fn send>(&self, msg: M) -> Result<(), StreamError> { match msg.into() { - ServerMsg::Info(_) => panic!(ERR), - ServerMsg::Init(_) => panic!(ERR), - ServerMsg::RegisterAnswer(msg) => { - Self::internal_send(&mut self.network_error, &mut self.register_stream, &msg) - }, - ServerMsg::General(msg) => { - let stream = match &msg { + ServerMsg::Info(m) => self.register_stream.try_lock().unwrap().send(m), + ServerMsg::Init(m) => self.register_stream.try_lock().unwrap().send(m), + ServerMsg::RegisterAnswer(m) => self.register_stream.try_lock().unwrap().send(m), + ServerMsg::General(g) => { + match g { //Character Screen related ServerGeneral::CharacterDataLoadError(_) | ServerGeneral::CharacterListUpdate(_) | ServerGeneral::CharacterActionError(_) - | ServerGeneral::CharacterSuccess => &mut self.character_screen_stream, + | ServerGeneral::CharacterSuccess => { + self.character_screen_stream.try_lock().unwrap().send(g) + }, //Ingame related ServerGeneral::GroupUpdate(_) | ServerGeneral::GroupInvite { .. } @@ -78,7 +87,9 @@ impl Client { | ServerGeneral::TerrainBlockUpdates(_) | ServerGeneral::SetViewDistance(_) | ServerGeneral::Outcomes(_) - | ServerGeneral::Knockback(_) => &mut self.in_game_stream, + | ServerGeneral::Knockback(_) => { + self.in_game_stream.try_lock().unwrap().send(g) + }, // Always possible ServerGeneral::PlayerListUpdate(_) | ServerGeneral::ChatMsg(_) @@ -89,46 +100,109 @@ impl Client { | ServerGeneral::CreateEntity(_) | ServerGeneral::DeleteEntity(_) | ServerGeneral::Disconnect(_) - | ServerGeneral::Notification(_) => &mut self.general_stream, - }; - Self::internal_send(&mut self.network_error, stream, &msg) + | ServerGeneral::Notification(_) => { + self.general_stream.try_lock().unwrap().send(g) + }, + } }, - ServerMsg::Ping(msg) => { - Self::internal_send(&mut self.network_error, &mut self.ping_stream, &msg) - }, - }; + ServerMsg::Ping(m) => self.ping_stream.try_lock().unwrap().send(m), + } } - pub async fn internal_recv( - err: &mut bool, - s: &mut Stream, - ) -> Result { - if !*err { - match s.recv().await { - Ok(r) => Ok(r), - Err(e) => { - debug!(?e, "got a network error with client while recv"); - *err = true; - Err(Error::StreamErr(e)) - }, - } - } else { - Err(Error::StreamErr(network::StreamError::StreamClosed)) + pub(crate) fn send_fallible>(&self, msg: M) { let _ = self.send(msg); } + + pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> { + match msg.stream_id { + 0 => self + .register_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 1 => self + .character_screen_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 2 => self + .in_game_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 3 => self + .general_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message), + _ => unreachable!("invalid stream id"), + } + } + + pub(crate) fn prepare>(&self, msg: M) -> PreparedMsg { + match msg.into() { + ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::General(g) => { + match g { + //Character Screen related + ServerGeneral::CharacterDataLoadError(_) + | ServerGeneral::CharacterListUpdate(_) + | ServerGeneral::CharacterActionError(_) + | ServerGeneral::CharacterSuccess => { + PreparedMsg::new(1, &g, &self.character_screen_stream) + }, + //Ingame related + ServerGeneral::GroupUpdate(_) + | ServerGeneral::GroupInvite { .. } + | ServerGeneral::InvitePending(_) + | ServerGeneral::InviteComplete { .. } + | ServerGeneral::ExitInGameSuccess + | ServerGeneral::InventoryUpdate(_, _) + | ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) + | ServerGeneral::SetViewDistance(_) + | ServerGeneral::Outcomes(_) + | ServerGeneral::Knockback(_) => PreparedMsg::new(2, &g, &self.in_game_stream), + // Always possible + ServerGeneral::PlayerListUpdate(_) + | ServerGeneral::ChatMsg(_) + | ServerGeneral::SetPlayerEntity(_) + | ServerGeneral::TimeOfDay(_) + | ServerGeneral::EntitySync(_) + | ServerGeneral::CompSync(_) + | ServerGeneral::CreateEntity(_) + | ServerGeneral::DeleteEntity(_) + | ServerGeneral::Disconnect(_) + | ServerGeneral::Notification(_) => { + PreparedMsg::new(3, &g, &self.general_stream) + }, + } + }, + ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream), + } + } + + pub(crate) fn recv( + &self, + stream_id: u8, + ) -> Result, StreamError> { + match stream_id { + 0 => self.register_stream.try_lock().unwrap().try_recv(), + 1 => self.character_screen_stream.try_lock().unwrap().try_recv(), + 2 => self.in_game_stream.try_lock().unwrap().try_recv(), + 3 => self.general_stream.try_lock().unwrap().try_recv(), + 4 => self.ping_stream.try_lock().unwrap().try_recv(), + _ => unreachable!("invalid stream id"), } } } -// Distance from fuzzy_chunk before snapping to current chunk -pub const CHUNK_FUZZ: u32 = 2; -// Distance out of the range of a region before removing it from subscriptions -pub const REGION_FUZZ: u32 = 16; - -#[derive(Clone, Debug)] -pub struct RegionSubscription { - pub fuzzy_chunk: Vec2, - pub regions: HashSet>, -} - -impl Component for RegionSubscription { - type Storage = FlaggedStorage>; +impl PreparedMsg { + fn new(id: u8, msg: &M, stream: &Mutex) -> PreparedMsg { + Self { + stream_id: id, + message: Message::serialize(&msg, &stream.try_lock().unwrap()), + } + } } diff --git a/server/src/cmd.rs b/server/src/cmd.rs index e747e725ad..a573c94878 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -3,7 +3,6 @@ //! `CHAT_COMMANDS` and provide a handler function. use crate::{ - client::Client, settings::{BanRecord, EditableSetting}, Server, StateExt, }; @@ -27,7 +26,7 @@ use std::convert::TryFrom; use vek::*; use world::util::Sampler; -use crate::login_provider::LoginProvider; +use crate::{client::Client, login_provider::LoginProvider}; use scan_fmt::{scan_fmt, scan_fmt_some}; use tracing::error; @@ -508,11 +507,11 @@ fn handle_alias( *uid, player.alias.clone(), )); - server.state.notify_registered_clients(msg); + server.state.notify_players(msg); // Announce alias change if target has a Body. if ecs.read_storage::().get(target).is_some() { - server.state.notify_registered_clients( + server.state.notify_players( ChatType::CommandInfo .server_msg(format!("{} is now known as {}.", old_alias, player.alias)), ); @@ -650,7 +649,7 @@ fn handle_spawn( // Add to group system if a pet if matches!(alignment, comp::Alignment::Owned { .. }) { let state = server.state(); - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); @@ -663,14 +662,14 @@ fn handle_spawn( &uids, &mut |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) .map(|(g, c)| { - c.send_msg(ServerGeneral::GroupUpdate(g)) + c.send_fallible(ServerGeneral::GroupUpdate(g)); }); }, ); @@ -1211,7 +1210,7 @@ fn handle_adminify( .expect("Player should have uid"), is_admin, )); - server.state.notify_registered_clients(msg); + server.state.notify_players(msg); }, None => { server.notify_client( @@ -1668,11 +1667,9 @@ fn handle_set_level( .read_storage::() .get(player) .expect("Failed to get uid for player"); - server - .state - .notify_registered_clients(ServerGeneral::PlayerListUpdate( - PlayerListUpdate::LevelChange(uid, lvl), - )); + server.state.notify_players(ServerGeneral::PlayerListUpdate( + PlayerListUpdate::LevelChange(uid, lvl), + )); if let Some(stats) = server .state diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 2f79f1795d..bbc04605cd 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -13,10 +13,12 @@ pub(crate) struct ServerInfoPacket { pub time: f64, } +pub(crate) type IncomingClient = Client; + pub(crate) struct ConnectionHandler { _network: Arc, thread_handle: Option>, - pub client_receiver: Receiver, + pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, stop_sender: Option>, } @@ -31,7 +33,7 @@ impl ConnectionHandler { let network_clone = Arc::clone(&network); let (stop_sender, stop_receiver) = oneshot::channel(); - let (client_sender, client_receiver) = unbounded::(); + let (client_sender, client_receiver) = unbounded::(); let (info_requester_sender, info_requester_receiver) = bounded::>(1); @@ -55,7 +57,7 @@ impl ConnectionHandler { async fn work( network: Arc, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, stop_receiver: oneshot::Receiver<()>, ) { @@ -92,7 +94,7 @@ impl ConnectionHandler { async fn init_participant( participant: Participant, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, ) -> Result<(), Box> { debug!("New Participant connected to the server"); @@ -118,26 +120,22 @@ impl ConnectionHandler { t = register_stream.recv::().fuse() => Some(t), ) { None => { - debug!("slow client connection detected, dropping it"); + debug!("Timeout for incoming client elapsed, aborting connection"); return Ok(()); }, Some(client_type) => client_type?, }; - let client = Client { - registered: false, + let client = Client::new( client_type, - in_game: None, - participant: Some(participant), + participant, + server_data.time, general_stream, ping_stream, register_stream, - in_game_stream, character_screen_stream, - network_error: false, - last_ping: server_data.time, - login_msg_sent: false, - }; + in_game_stream, + ); client_sender.send(client)?; Ok(()) diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index cb175c4c20..48ae6b7563 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -42,9 +42,9 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) if let Some(vel) = velocities.get_mut(entity) { vel.0 = impulse; } - let mut clients = state.ecs().write_storage::(); - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::Knockback(impulse)); + let clients = state.ecs().read_storage::(); + if let Some(client) = clients.get(entity) { + client.send_fallible(ServerGeneral::Knockback(impulse)); } } @@ -199,9 +199,8 @@ pub fn handle_destroy(server: &mut Server, entity: EcsEntity, cause: HealthSourc | HealthSource::Healing { by: _ } | HealthSource::Unknown => KillSource::Other, }; - state.notify_registered_clients( - comp::ChatType::Kill(kill_source, *uid).server_msg("".to_string()), - ); + state + .notify_players(comp::ChatType::Kill(kill_source, *uid).server_msg("".to_string())); } } @@ -667,11 +666,9 @@ pub fn handle_level_up(server: &mut Server, entity: EcsEntity, new_level: u32) { .get(entity) .expect("Failed to fetch uid component for entity."); - server - .state - .notify_registered_clients(ServerGeneral::PlayerListUpdate( - PlayerListUpdate::LevelChange(*uid, new_level), - )); + server.state.notify_players(ServerGeneral::PlayerListUpdate( + PlayerListUpdate::LevelChange(*uid, new_level), + )); } pub fn handle_buff(server: &mut Server, entity: EcsEntity, buff_change: buff::BuffChange) { diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index 78c0df462b..65598019c9 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -25,13 +25,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani match manip { GroupManip::Invite(uid) => { - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let invitee = match state.ecs().entity_from_uid(uid.into()) { Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("Invite failed, target does not exist.".to_owned()), ); @@ -62,8 +62,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }); if already_in_same_group { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = clients.get(entity) { + general_stream.send_fallible(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); } @@ -92,8 +92,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani >= max_group_size as usize; if group_size_limit_reached { // Inform inviter that they have reached the group size limit - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = clients.get(entity) { + general_stream.send_fallible( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ the group size limit" @@ -109,8 +109,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("This player already has a pending invite.".to_owned()), ); @@ -151,32 +151,31 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }; // If client comp - if let (Some(client), Some(inviter)) = - (clients.get_mut(invitee), uids.get(entity).copied()) + if let (Some(client), Some(inviter)) = (clients.get(invitee), uids.get(entity).copied()) { if send_invite() { - client.send_msg(ServerGeneral::GroupInvite { + client.send_fallible(ServerGeneral::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); } } else if agents.contains(invitee) { send_invite(); - } else if let Some(client) = clients.get_mut(entity) { - client.send_msg( + } else if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } // Notify inviter that the invite is pending if invite_sent { - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::InvitePending(uid)); + if let Some(client) = clients.get(entity) { + client.send_fallible(ServerGeneral::InvitePending(uid)); } } }, GroupManip::Accept => { - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -194,12 +193,12 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { if let (Some(client), Some(target)) = - (clients.get_mut(inviter), uids.get(entity).copied()) + (clients.get(inviter), uids.get(entity).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + client.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Accepted, - }) + }); } let mut group_manager = state.ecs().write_resource::(); group_manager.add_group_member( @@ -211,19 +210,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); } }, GroupManip::Decline => { - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -242,17 +241,17 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }) { // Inform inviter of rejection if let (Some(client), Some(target)) = - (clients.get_mut(inviter), uids.get(entity).copied()) + (clients.get(inviter), uids.get(entity).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + client.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Declined, - }) + }); } } }, GroupManip::Leave => { - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); group_manager.leave_group( @@ -263,18 +262,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().entities(), &mut |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); }, GroupManip::Kick(uid) => { - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let alignments = state.ecs().read_storage::(); @@ -282,8 +281,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -295,8 +294,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick pet if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = clients.get(entity) { + general_stream.send_fallible( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); } @@ -304,8 +303,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -330,40 +329,41 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().entities(), &mut |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them the have been kicked - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(client) = clients.get(target) { + client.send_fallible( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); } // Tell kicker that they were succesful - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg("Player kicked.".to_owned())); + if let Some(client) = clients.get(entity) { + client + .send_fallible(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } }, None => { // Inform kicker that the target is not in a group - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), ), @@ -373,14 +373,14 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } }, GroupManip::AssignLeader(uid) => { - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let target = match state.ecs().entity_from_uid(uid.into()) { Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -404,24 +404,24 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them they are the leader - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(client) = clients.get(target) { + client.send_fallible( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(client) = clients.get(target) { + client.send_fallible( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -429,8 +429,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, Some(_) => { // Inform transferer that they are not the leader - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -440,8 +440,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, None => { // Inform transferer that the target is not in a group - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index ff9f2e467a..9a0cfcb32f 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -1,7 +1,4 @@ -use crate::{ - client::{Client, RegionSubscription}, - Server, -}; +use crate::{client::Client, presence::RegionSubscription, Server}; use common::{ comp::{self, item, Pos}, consts::MAX_MOUNT_RANGE, @@ -118,85 +115,80 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { return; } - // You can't possess other players let mut clients = ecs.write_storage::(); - if clients.get_mut(possesse).is_none() { - if let Some(mut client) = clients.remove(possessor) { - client.send_msg(ServerGeneral::SetPlayerEntity(possesse_uid)); - clients - .insert(possesse, client) - .err() - .map(|e| error!(?e, "Error inserting client component during possession")); - // Put possess item into loadout - let mut loadouts = ecs.write_storage::(); - let loadout = loadouts - .entry(possesse) - .expect("Could not read loadouts component while possessing") - .or_insert(comp::Loadout::default()); - let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); - if let item::ItemKind::Tool(tool) = item.kind() { - let mut abilities = tool.get_abilities(); - let mut ability_drain = abilities.drain(..); - let debug_item = comp::ItemConfig { - item, - ability1: ability_drain.next(), - ability2: ability_drain.next(), - ability3: ability_drain.next(), - block_ability: None, - dodge_ability: None, - }; - std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); - loadout.active_item = Some(debug_item); - } - - // Move player component - { - let mut players = ecs.write_storage::(); - if let Some(player) = players.remove(possessor) { - players.insert(possesse, player).err().map(|e| { - error!(?e, "Error inserting player component during possession") - }); - } - } - // Transfer region subscription - { - let mut subscriptions = ecs.write_storage::(); - if let Some(s) = subscriptions.remove(possessor) { - subscriptions.insert(possesse, s).err().map(|e| { - error!( - ?e, - "Error inserting subscription component during possession" - ) - }); - } - } - // Remove will of the entity - ecs.write_storage::().remove(possesse); - // Reset controller of former shell - ecs.write_storage::() - .get_mut(possessor) - .map(|c| c.reset()); - // Transfer admin powers - { - let mut admins = ecs.write_storage::(); - if let Some(admin) = admins.remove(possessor) { - admins.insert(possesse, admin).err().map(|e| { - error!(?e, "Error inserting admin component during possession") - }); - } - } - // Transfer waypoint - { - let mut waypoints = ecs.write_storage::(); - if let Some(waypoint) = waypoints.remove(possessor) { - waypoints.insert(possesse, waypoint).err().map(|e| { - error!(?e, "Error inserting waypoint component during possession",) - }); - } - } - } + if clients.get_mut(possesse).is_some() { + error!("can't possess other players"); + return; } + + match (|| -> Option> { + let c = clients.remove(possessor)?; + clients.insert(possesse, c).ok()?; + //optional entities + let mut players = ecs.write_storage::(); + let mut subscriptions = ecs.write_storage::(); + let mut admins = ecs.write_storage::(); + let mut waypoints = ecs.write_storage::(); + players + .remove(possessor) + .map(|p| players.insert(possesse, p).ok()?); + subscriptions + .remove(possessor) + .map(|s| subscriptions.insert(possesse, s).ok()?); + admins + .remove(possessor) + .map(|a| admins.insert(possesse, a).ok()?); + waypoints + .remove(possessor) + .map(|w| waypoints.insert(possesse, w).ok()?); + + Some(Ok(())) + })() { + Some(Ok(())) => (), + Some(Err(e)) => { + error!(?e, ?possesse, "Error inserting component during possession"); + return; + }, + None => { + error!(?possessor, "Error removing component during possession"); + return; + }, + } + + clients + .get_mut(possesse) + .map(|c| c.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid))); + + // Put possess item into loadout + let mut loadouts = ecs.write_storage::(); + let loadout = loadouts + .entry(possesse) + .expect("Could not read loadouts component while possessing") + .or_insert(comp::Loadout::default()); + + let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); + if let item::ItemKind::Tool(tool) = item.kind() { + let mut abilities = tool.get_abilities(); + let mut ability_drain = abilities.drain(..); + let debug_item = comp::ItemConfig { + item, + ability1: ability_drain.next(), + ability2: ability_drain.next(), + ability3: ability_drain.next(), + block_ability: None, + dodge_ability: None, + }; + std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); + loadout.active_item = Some(debug_item); + } + + // Remove will of the entity + ecs.write_storage::().remove(possesse); + // Reset controller of former shell + ecs.write_storage::() + .get_mut(possessor) + .map(|c| c.reset()); } } diff --git a/server/src/events/inventory_manip.rs b/server/src/events/inventory_manip.rs index e82f292385..a9b936da70 100644 --- a/server/src/events/inventory_manip.rs +++ b/server/src/events/inventory_manip.rs @@ -279,7 +279,7 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv .insert(tameable_entity, comp::Alignment::Owned(uid)); // Add to group system - let mut clients = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state .ecs() @@ -294,14 +294,14 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv &uids, &mut |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) .map(|(g, c)| { - c.send_msg(ServerGeneral::GroupUpdate(g)) + c.send(ServerGeneral::GroupUpdate(g)) }); }, ); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index e466a5b116..e522fe28f0 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -1,11 +1,12 @@ use super::Event; use crate::{ - client::Client, login_provider::LoginProvider, persistence, state_ext::StateExt, Server, + client::Client, login_provider::LoginProvider, persistence, presence::Presence, + state_ext::StateExt, Server, }; use common::{ comp, comp::{group, Player}, - msg::{PlayerListUpdate, ServerGeneral}, + msg::{PlayerListUpdate, PresenceKind, ServerGeneral}, span, sync::{Uid, UidAllocator}, }; @@ -17,24 +18,28 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { span!(_guard, "handle_exit_ingame"); let state = server.state_mut(); - // Create new entity with just `Client`, `Uid`, and `Player` components - // Easier than checking and removing all other known components + // Create new entity with just `Client`, `Uid`, `Player`, and `...Stream` + // components Easier than checking and removing all other known components // Note: If other `ServerEvent`s are referring to this entity they will be // disrupted - let maybe_client = state.ecs().write_storage::().remove(entity); - let maybe_uid = state.read_component_copied::(entity); - let maybe_player = state.ecs().write_storage::().remove(entity); - let maybe_admin = state.ecs().write_storage::().remove(entity); + let maybe_admin = state.ecs().write_storage::().remove(entity); let maybe_group = state .ecs() .write_storage::() .get(entity) .cloned(); - if let (Some(mut client), Some(uid), Some(player)) = (maybe_client, maybe_uid, maybe_player) { + + if let Some((client, uid, player)) = (|| { + let ecs = state.ecs(); + Some(( + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + )) + })() { // Tell client its request was successful - client.in_game = None; - client.send_msg(ServerGeneral::ExitInGameSuccess); + client.send_fallible(ServerGeneral::ExitInGameSuccess); let entity_builder = state.ecs_mut().create_entity().with(client).with(player); @@ -127,9 +132,9 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event state.read_storage::().get(entity), state.read_storage::().get(entity), ) { - state.notify_registered_clients(comp::ChatType::Offline(*uid).server_msg("")); + state.notify_players(comp::ChatType::Offline(*uid).server_msg("")); - state.notify_registered_clients(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Remove( + state.notify_players(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Remove( *uid, ))); } @@ -141,8 +146,8 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event } // Sync the player's character data to the database - if let (Some(player), Some(stats), Some(inventory), Some(loadout), updater) = ( - state.read_storage::().get(entity), + if let (Some(presences), Some(stats), Some(inventory), Some(loadout), updater) = ( + state.read_storage::().get(entity), state.read_storage::().get(entity), state.read_storage::().get(entity), state.read_storage::().get(entity), @@ -150,7 +155,7 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event .ecs() .read_resource::(), ) { - if let Some(character_id) = player.character_id { + if let PresenceKind::Character(character_id) = presences.kind { updater.update(character_id, stats, inventory, loadout); } } diff --git a/server/src/lib.rs b/server/src/lib.rs index e3acfa2f02..8db6c0532f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -17,6 +17,7 @@ pub mod input; pub mod login_provider; pub mod metrics; pub mod persistence; +pub mod presence; pub mod settings; pub mod state_ext; pub mod sys; @@ -34,11 +35,12 @@ pub use crate::{ use crate::{ alias_validator::AliasValidator, chunk_generator::ChunkGenerator, - client::{Client, RegionSubscription}, + client::Client, cmd::ChatCommandExt, connection_handler::ConnectionHandler, data_dir::DataDir, login_provider::LoginProvider, + presence::{Presence, RegionSubscription}, state_ext::StateExt, sys::sentinel::{DeletedEntities, TrackedComps}, }; @@ -163,7 +165,13 @@ impl Server { // System timers for performance monitoring state.ecs_mut().insert(sys::EntitySyncTimer::default()); - state.ecs_mut().insert(sys::MessageTimer::default()); + state.ecs_mut().insert(sys::GeneralMsgTimer::default()); + state.ecs_mut().insert(sys::PingMsgTimer::default()); + state.ecs_mut().insert(sys::RegisterMsgTimer::default()); + state + .ecs_mut() + .insert(sys::CharacterScreenMsgTimer::default()); + state.ecs_mut().insert(sys::InGameMsgTimer::default()); state.ecs_mut().insert(sys::SentinelTimer::default()); state.ecs_mut().insert(sys::SubscriptionTimer::default()); state.ecs_mut().insert(sys::TerrainSyncTimer::default()); @@ -180,6 +188,7 @@ impl Server { // Server-only components state.ecs_mut().register::(); state.ecs_mut().register::(); + state.ecs_mut().register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; @@ -452,13 +461,18 @@ impl Server { let before_new_connections = Instant::now(); // 3) Handle inputs from clients - self.handle_new_connections(&mut frontend_events)?; + self.handle_new_connections(&mut frontend_events); let before_message_system = Instant::now(); // Run message receiving sys before the systems in common for decreased latency // (e.g. run before controller system) - sys::message::Sys.run_now(&self.state.ecs()); + //TODO: run in parallel + sys::msg::general::Sys.run_now(&self.state.ecs()); + sys::msg::register::Sys.run_now(&self.state.ecs()); + sys::msg::character_screen::Sys.run_now(&self.state.ecs()); + sys::msg::in_game::Sys.run_now(&self.state.ecs()); + sys::msg::ping::Sys.run_now(&self.state.ecs()); let before_state_tick = Instant::now(); @@ -607,7 +621,14 @@ impl Server { .ecs() .read_resource::() .nanos as i64; - let message_nanos = self.state.ecs().read_resource::().nanos as i64; + let message_nanos = { + let state = self.state.ecs(); + (state.read_resource::().nanos + + state.read_resource::().nanos + + state.read_resource::().nanos + + state.read_resource::().nanos + + state.read_resource::().nanos) as i64 + }; let sentinel_nanos = self.state.ecs().read_resource::().nanos as i64; let subscription_nanos = self .state @@ -793,8 +814,53 @@ impl Server { self.state.cleanup(); } + fn initialize_client( + &mut self, + client: crate::connection_handler::IncomingClient, + ) -> Result, Error> { + if self.settings().max_players <= self.state.ecs().read_storage::().join().count() { + trace!( + ?client.participant, + "to many players, wont allow participant to connect" + ); + client.send(ServerInit::TooManyPlayers)?; + return Ok(None); + } + + let entity = self + .state + .ecs_mut() + .create_entity_synced() + .with(client) + .build(); + self.state + .ecs() + .read_resource::() + .clients_connected + .inc(); + // Send client all the tracked components currently attached to its entity as + // well as synced resources (currently only `TimeOfDay`) + debug!("Starting initial sync with client."); + self.state + .ecs() + .read_storage::() + .get(entity) + .unwrap() + .send(ServerInit::GameSync { + // Send client their entity + entity_package: TrackedComps::fetch(&self.state.ecs()) + .create_entity_package(entity, None, None, None), + time_of_day: *self.state.ecs().read_resource(), + max_group_size: self.settings().max_player_group_size, + client_timeout: self.settings().client_timeout, + world_map: self.map.clone(), + recipe_book: (&*default_recipe_book()).clone(), + })?; + Ok(Some(entity)) + } + /// Handle new client connections. - fn handle_new_connections(&mut self, frontend_events: &mut Vec) -> Result<(), Error> { + fn handle_new_connections(&mut self, frontend_events: &mut Vec) { while let Ok(sender) = self.connection_handler.info_requester_receiver.try_recv() { // can fail, e.g. due to timeout or network prob. trace!("sending info to connection_handler"); @@ -804,69 +870,32 @@ impl Server { }); } - while let Ok(data) = self.connection_handler.client_receiver.try_recv() { - let mut client = data; - - if self.settings().max_players - <= self.state.ecs().read_storage::().join().count() - { - trace!( - ?client.participant, - "to many players, wont allow participant to connect" - ); - client.register_stream.send(ServerInit::TooManyPlayers)?; - continue; + while let Ok(incoming) = self.connection_handler.client_receiver.try_recv() { + match self.initialize_client(incoming) { + Ok(None) => (), + Ok(Some(entity)) => { + frontend_events.push(Event::ClientConnected { entity }); + debug!("Done initial sync with client."); + }, + Err(e) => { + debug!(?e, "failed initializing a new client"); + }, } - - let entity = self - .state - .ecs_mut() - .create_entity_synced() - .with(client) - .build(); - self.state - .ecs() - .read_resource::() - .clients_connected - .inc(); - // Send client all the tracked components currently attached to its entity as - // well as synced resources (currently only `TimeOfDay`) - debug!("Starting initial sync with client."); - self.state - .ecs() - .write_storage::() - .get_mut(entity) - .unwrap() - .register_stream - .send(ServerInit::GameSync { - // Send client their entity - entity_package: TrackedComps::fetch(&self.state.ecs()) - .create_entity_package(entity, None, None, None), - time_of_day: *self.state.ecs().read_resource(), - max_group_size: self.settings().max_player_group_size, - client_timeout: self.settings().client_timeout, - world_map: self.map.clone(), - recipe_book: (&*default_recipe_book()).clone(), - })?; - - frontend_events.push(Event::ClientConnected { entity }); - debug!("Done initial sync with client."); } - Ok(()) } pub fn notify_client(&self, entity: EcsEntity, msg: S) where S: Into, { - if let Some(client) = self.state.ecs().write_storage::().get_mut(entity) { - client.send_msg(msg.into()) - } + self.state + .ecs() + .read_storage::() + .get(entity) + .map(|c| c.send(msg)); } - pub fn notify_registered_clients(&mut self, msg: ServerGeneral) { - self.state.notify_registered_clients(msg); - } + pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); } pub fn generate_chunk(&mut self, entity: EcsEntity, key: Vec2) { self.state @@ -944,7 +973,7 @@ impl Server { impl Drop for Server { fn drop(&mut self) { self.state - .notify_registered_clients(ServerGeneral::Disconnect(DisconnectReason::Shutdown)); + .notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown)); } } diff --git a/server/src/presence.rs b/server/src/presence.rs new file mode 100644 index 0000000000..5cba90dd72 --- /dev/null +++ b/server/src/presence.rs @@ -0,0 +1,40 @@ +use common::msg::PresenceKind; +use hashbrown::HashSet; +use serde::{Deserialize, Serialize}; +use specs::{Component, FlaggedStorage}; +use specs_idvs::IdvStorage; +use vek::*; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Presence { + pub view_distance: u32, + pub kind: PresenceKind, +} + +impl Presence { + pub fn new(view_distance: u32, kind: PresenceKind) -> Self { + Self { + view_distance, + kind, + } + } +} + +impl Component for Presence { + type Storage = FlaggedStorage>; +} + +// Distance from fuzzy_chunk before snapping to current chunk +pub const CHUNK_FUZZ: u32 = 2; +// Distance out of the range of a region before removing it from subscriptions +pub const REGION_FUZZ: u32 = 16; + +#[derive(Clone, Debug)] +pub struct RegionSubscription { + pub fuzzy_chunk: Vec2, + pub regions: HashSet>, +} + +impl Component for RegionSubscription { + type Storage = FlaggedStorage>; +} diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 06d7c523e3..46fe38fb12 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,11 +1,12 @@ use crate::{ - client::Client, persistence::PersistedComponents, sys::sentinel::DeletedEntities, SpawnPoint, + client::Client, persistence::PersistedComponents, presence::Presence, + sys::sentinel::DeletedEntities, SpawnPoint, }; use common::{ character::CharacterId, comp, effect::Effect, - msg::{CharacterInfo, ClientInGame, PlayerListUpdate, ServerGeneral, ServerMsg}, + msg::{CharacterInfo, PlayerListUpdate, PresenceKind, ServerGeneral}, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, util::Dir, @@ -59,7 +60,7 @@ pub trait StateExt { fn update_character_data(&mut self, entity: EcsEntity, components: PersistedComponents); /// Iterates over registered clients and send each `ServerMsg` fn send_chat(&self, msg: comp::UnresolvedChatMsg); - fn notify_registered_clients(&self, msg: ServerGeneral); + fn notify_players(&self, msg: ServerGeneral); fn notify_in_game_clients(&self, msg: ServerGeneral); /// Delete an entity, recording the deletion in [`DeletedEntities`] fn delete_entity_recorded( @@ -208,22 +209,15 @@ impl StateExt for State { // Make sure physics components are updated self.write_component(entity, comp::ForceUpdate); - // Set the character id for the player - // TODO this results in a warning in the console: "Error modifying synced - // component, it doesn't seem to exist" - // It appears to be caused by the player not yet existing on the client at this - // point, despite being able to write the data on the server - self.ecs() - .write_storage::() - .get_mut(entity) - .map(|player| { - player.character_id = Some(character_id); - }); + const INITIAL_VD: u32 = 5; //will be changed after login + self.write_component( + entity, + Presence::new(INITIAL_VD, PresenceKind::Character(character_id)), + ); // Tell the client its request was successful. - if let Some(client) = self.ecs().write_storage::().get_mut(entity) { - client.in_game = Some(ClientInGame::Character); - client.send_msg(ServerGeneral::CharacterSuccess) + if let Some(client) = self.ecs().read_storage::().get(entity) { + client.send_fallible(ServerGeneral::CharacterSuccess); } } @@ -232,7 +226,7 @@ impl StateExt for State { if let Some(player_uid) = self.read_component_copied::(entity) { // Notify clients of a player list update - self.notify_registered_clients(ServerGeneral::PlayerListUpdate( + self.notify_players(ServerGeneral::PlayerListUpdate( PlayerListUpdate::SelectedCharacter(player_uid, CharacterInfo { name: String::from(&stats.name), level: stats.level.level(), @@ -277,30 +271,22 @@ impl StateExt for State { | comp::ChatType::Loot | comp::ChatType::Kill(_, _) | comp::ChatType::Meta - | comp::ChatType::World(_) => { - self.notify_registered_clients(ServerGeneral::ChatMsg(resolved_msg)) - }, + | comp::ChatType::World(_) => self.notify_players(ServerGeneral::ChatMsg(resolved_msg)), comp::ChatType::Online(u) => { - for (client, uid) in ( - &mut ecs.write_storage::(), - &ecs.read_storage::(), - ) - .join() + for (client, uid) in + (&ecs.read_storage::(), &ecs.read_storage::()).join() { if uid != u { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, comp::ChatType::Tell(u, t) => { - for (client, uid) in ( - &mut ecs.write_storage::(), - &ecs.read_storage::(), - ) - .join() + for (client, uid) in + (&ecs.read_storage::(), &ecs.read_storage::()).join() { if uid == u || uid == t { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -310,9 +296,9 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (client, pos) in (&ecs.read_storage::(), &positions).join() { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -322,9 +308,9 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (client, pos) in (&ecs.read_storage::(), &positions).join() { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -334,9 +320,9 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (client, pos) in (&ecs.read_storage::(), &positions).join() { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -344,25 +330,25 @@ impl StateExt for State { comp::ChatType::FactionMeta(s) | comp::ChatType::Faction(_, s) => { for (client, faction) in ( - &mut ecs.write_storage::(), + &ecs.read_storage::(), &ecs.read_storage::(), ) .join() { if s == &faction.0 { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, comp::ChatType::GroupMeta(g) | comp::ChatType::Group(_, g) => { for (client, group) in ( - &mut ecs.write_storage::(), + &ecs.read_storage::(), &ecs.read_storage::(), ) .join() { if g == group { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -370,24 +356,36 @@ impl StateExt for State { } /// Sends the message to all connected clients - fn notify_registered_clients(&self, msg: ServerGeneral) { - let msg: ServerMsg = msg.into(); - for client in (&mut self.ecs().write_storage::()) + fn notify_players(&self, msg: ServerGeneral) { + let mut msg = Some(msg); + let mut lazy_msg = None; + for (client, _) in ( + &self.ecs().read_storage::(), + &self.ecs().read_storage::(), + ) .join() - .filter(|c| c.registered) { - client.send_msg(msg.clone()); + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(msg.take().unwrap())); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } /// Sends the message to all clients playing in game fn notify_in_game_clients(&self, msg: ServerGeneral) { - let msg: ServerMsg = msg.into(); - for client in (&mut self.ecs().write_storage::()) + let mut msg = Some(msg); + let mut lazy_msg = None; + for (client, _) in ( + &mut self.ecs().write_storage::(), + &self.ecs().read_storage::(), + ) .join() - .filter(|c| c.in_game.is_some()) { - client.send_msg(msg.clone()); + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(msg.take().unwrap())); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } @@ -397,7 +395,7 @@ impl StateExt for State { ) -> Result<(), specs::error::WrongGeneration> { // Remove entity from a group if they are in one { - let mut clients = self.ecs().write_storage::(); + let clients = self.ecs().read_storage::(); let uids = self.ecs().read_storage::(); let mut group_manager = self.ecs().write_resource::(); group_manager.entity_deleted( @@ -408,13 +406,13 @@ impl StateExt for State { &self.ecs().entities(), &mut |entity, group_change| { clients - .get_mut(entity) + .get(entity) .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, c)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); } diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index 0442a04e8e..fffa97871e 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -3,11 +3,12 @@ use super::{ SysTimer, }; use crate::{ - client::{Client, RegionSubscription}, + client::Client, + presence::{Presence, RegionSubscription}, Tick, }; use common::{ - comp::{ForceUpdate, Inventory, InventoryUpdate, Last, Ori, Player, Pos, Vel}, + comp::{ForceUpdate, Inventory, InventoryUpdate, Last, Ori, Pos, Vel}, msg::ServerGeneral, outcome::Outcome, region::{Event as RegionEvent, RegionMap}, @@ -38,11 +39,11 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Ori>, ReadStorage<'a, Inventory>, ReadStorage<'a, RegionSubscription>, - ReadStorage<'a, Player>, + ReadStorage<'a, Presence>, WriteStorage<'a, Last>, WriteStorage<'a, Last>, WriteStorage<'a, Last>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, InventoryUpdate>, Write<'a, DeletedEntities>, @@ -65,11 +66,11 @@ impl<'a> System<'a> for Sys { orientations, inventories, subscriptions, - players, + presences, mut last_pos, mut last_vel, mut last_ori, - mut clients, + clients, mut force_updates, mut inventory_updates, mut deleted_entities, @@ -104,10 +105,16 @@ impl<'a> System<'a> for Sys { for (key, region) in region_map.iter() { // Assemble subscriber list for this region by iterating through clients and // checking if they are subscribed to this region - let mut subscribers = (&mut clients, &entities, &subscriptions, &positions) + let mut subscribers = ( + &clients, + &entities, + presences.maybe(), + &subscriptions, + &positions, + ) .join() - .filter_map(|(client, entity, subscription, pos)| { - if client.in_game.is_some() && subscription.regions.contains(&key) { + .filter_map(|(client, entity, presence, subscription, pos)| { + if presence.is_some() && subscription.regions.contains(&key) { Some((client, &subscription.regions, entity, *pos)) } else { None @@ -143,7 +150,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - client.send_msg(create_msg.clone()); + client.send_fallible(create_msg.clone()); } } } @@ -157,7 +164,7 @@ impl<'a> System<'a> for Sys { .map(|key| !regions.contains(key)) .unwrap_or(true) { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + client.send_fallible(ServerGeneral::DeleteEntity(uid)); } } } @@ -174,18 +181,32 @@ impl<'a> System<'a> for Sys { .take_deleted_in_region(key) .unwrap_or_default(), ); - let entity_sync_msg = ServerGeneral::EntitySync(entity_sync_package); - let comp_sync_msg = ServerGeneral::CompSync(comp_sync_package); + let mut entity_sync_package = Some(entity_sync_package); + let mut comp_sync_package = Some(comp_sync_package); + let mut entity_sync_lazymsg = None; + let mut comp_sync_lazymsg = None; subscribers.iter_mut().for_each(move |(client, _, _, _)| { - client.send_msg(entity_sync_msg.clone()); - client.send_msg(comp_sync_msg.clone()); + if entity_sync_lazymsg.is_none() { + entity_sync_lazymsg = Some(client.prepare(ServerGeneral::EntitySync( + entity_sync_package.take().unwrap(), + ))); + comp_sync_lazymsg = Some( + client.prepare(ServerGeneral::CompSync(comp_sync_package.take().unwrap())), + ); + } + entity_sync_lazymsg + .as_ref() + .map(|msg| client.send_prepared(&msg)); + comp_sync_lazymsg + .as_ref() + .map(|msg| client.send_prepared(&msg)); }); - let mut send_msg = |msg: ServerGeneral, - entity: EcsEntity, - pos: Pos, - force_update: Option<&ForceUpdate>, - throttle: bool| { + let mut send_general = |msg: ServerGeneral, + entity: EcsEntity, + pos: Pos, + force_update: Option<&ForceUpdate>, + throttle: bool| { for (client, _, client_entity, client_pos) in &mut subscribers { if if client_entity == &entity { // Don't send client physics updates about itself unless force update is set @@ -212,7 +233,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - client.send_msg(msg.clone()); + client.send_fallible(msg.clone()); } } }; @@ -286,7 +307,7 @@ impl<'a> System<'a> for Sys { comp_sync_package.comp_removed::(uid); } - send_msg( + send_general( ServerGeneral::CompSync(comp_sync_package), entity, pos, @@ -299,19 +320,18 @@ impl<'a> System<'a> for Sys { // Handle entity deletion in regions that don't exist in RegionMap // (theoretically none) for (region_key, deleted) in deleted_entities.take_remaining_deleted() { - for client in - (&mut clients, &subscriptions) - .join() - .filter_map(|(client, subscription)| { - if client.in_game.is_some() && subscription.regions.contains(®ion_key) { - Some(client) - } else { - None - } - }) + for client in (presences.maybe(), &subscriptions, &clients) + .join() + .filter_map(|(presence, subscription, client)| { + if presence.is_some() && subscription.regions.contains(®ion_key) { + Some(client) + } else { + None + } + }) { for uid in &deleted { - client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid))); + client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } } @@ -319,19 +339,20 @@ impl<'a> System<'a> for Sys { // TODO: Sync clients that don't have a position? // Sync inventories - for (client, inventory, update) in (&mut clients, &inventories, &inventory_updates).join() { - client.send_msg(ServerGeneral::InventoryUpdate( + for (inventory, update, client) in (&inventories, &inventory_updates, &clients).join() { + client.send_fallible(ServerGeneral::InventoryUpdate( inventory.clone(), update.event(), )); } // Sync outcomes - for (client, player, pos) in (&mut clients, &players, positions.maybe()).join() { + for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() { let is_near = |o_pos: Vec3| { - pos.zip_with(player.view_distance, |pos, vd| { + pos.zip_with(presence, |pos, presence| { pos.0.xy().distance_squared(o_pos.xy()) - < (vd as f32 * TerrainChunkSize::RECT_SIZE.x as f32).powf(2.0) + < (presence.view_distance as f32 * TerrainChunkSize::RECT_SIZE.x as f32) + .powf(2.0) }) }; @@ -341,7 +362,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - client.send_msg(ServerGeneral::Outcomes(outcomes)); + client.send_fallible(ServerGeneral::Outcomes(outcomes)); } } outcomes.clear(); @@ -353,9 +374,12 @@ impl<'a> System<'a> for Sys { // Sync resources // TODO: doesn't really belong in this system (rename system or create another // system?) - let tof_msg = ServerGeneral::TimeOfDay(*time_of_day); - for client in (&mut clients).join() { - client.send_msg(tof_msg.clone()); + let mut tof_lazymsg = None; + for client in (&clients).join() { + if tof_lazymsg.is_none() { + tof_lazymsg = Some(client.prepare(ServerGeneral::TimeOfDay(*time_of_day))); + } + tof_lazymsg.as_ref().map(|msg| client.send_prepared(&msg)); } timer.end(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index 42f6603fe8..27819c317f 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -16,14 +16,14 @@ impl<'a> System<'a> for Sys { Entities<'a>, WriteStorage<'a, Invite>, WriteStorage<'a, PendingInvites>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, ReadStorage<'a, Uid>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, mut invites, mut pending_invites, mut clients, uids, mut timer): Self::SystemData, + (entities, mut invites, mut pending_invites, clients, uids, mut timer): Self::SystemData, ) { span!(_guard, "run", "invite_timeout::Sys::run"); timer.start(); @@ -52,12 +52,12 @@ impl<'a> System<'a> for Sys { // Inform inviter of timeout if let (Some(client), Some(target)) = - (clients.get_mut(*inviter), uids.get(invitee).copied()) + (clients.get(*inviter), uids.get(invitee).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + client.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::TimedOut, - }) + }); } Some(invitee) diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs deleted file mode 100644 index 9a9dfc92d3..0000000000 --- a/server/src/sys/message.rs +++ /dev/null @@ -1,737 +0,0 @@ -use super::SysTimer; -use crate::{ - alias_validator::AliasValidator, - character_creator, - client::Client, - login_provider::LoginProvider, - metrics::{NetworkRequestMetrics, PlayerMetrics}, - persistence::character_loader::CharacterLoader, - EditableSettings, Settings, -}; -use common::{ - comp::{ - Admin, CanBuild, ChatMode, ChatType, ControlEvent, Controller, ForceUpdate, Ori, Player, - Pos, Stats, UnresolvedChatMsg, Vel, - }, - event::{EventBus, ServerEvent}, - msg::{ - validate_chat_msg, CharacterInfo, ChatMsgValidationError, ClientGeneral, ClientInGame, - ClientRegister, DisconnectReason, PingMsg, PlayerInfo, PlayerListUpdate, RegisterError, - ServerGeneral, ServerRegisterAnswer, MAX_BYTES_CHAT_MSG, - }, - span, - state::{BlockChange, Time}, - sync::Uid, - terrain::{TerrainChunkSize, TerrainGrid}, - vol::{ReadVol, RectVolSize}, -}; -use futures_executor::block_on; -use futures_timer::Delay; -use futures_util::{select, FutureExt}; -use hashbrown::HashMap; -use specs::{ - Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, -}; -use tracing::{debug, error, info, trace, warn}; - -impl Sys { - #[allow(clippy::too_many_arguments)] - fn handle_client_msg( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, - entity: specs::Entity, - client: &mut Client, - player_metrics: &ReadExpect<'_, PlayerMetrics>, - uids: &ReadStorage<'_, Uid>, - chat_modes: &ReadStorage<'_, ChatMode>, - msg: ClientGeneral, - ) -> Result<(), crate::error::Error> { - match msg { - ClientGeneral::ChatMsg(message) => { - if client.registered { - match validate_chat_msg(&message) { - Ok(()) => { - if let Some(from) = uids.get(entity) { - let mode = chat_modes.get(entity).cloned().unwrap_or_default(); - let msg = mode.new_message(*from, message); - new_chat_msgs.push((Some(entity), msg)); - } else { - error!("Could not send message. Missing player uid"); - } - }, - Err(ChatMsgValidationError::TooLong) => { - let max = MAX_BYTES_CHAT_MSG; - let len = message.len(); - warn!(?len, ?max, "Received a chat message that's too long") - }, - } - } - }, - ClientGeneral::Disconnect => { - client.send_msg(ServerGeneral::Disconnect(DisconnectReason::Requested)); - }, - ClientGeneral::Terminate => { - debug!(?entity, "Client send message to termitate session"); - player_metrics - .clients_disconnected - .with_label_values(&["gracefully"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - }, - _ => unreachable!("not a client_general msg"), - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_client_in_game_msg( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - entity: specs::Entity, - client: &mut Client, - terrain: &ReadExpect<'_, TerrainGrid>, - network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, - can_build: &ReadStorage<'_, CanBuild>, - force_updates: &ReadStorage<'_, ForceUpdate>, - stats: &mut WriteStorage<'_, Stats>, - block_changes: &mut Write<'_, BlockChange>, - positions: &mut WriteStorage<'_, Pos>, - velocities: &mut WriteStorage<'_, Vel>, - orientations: &mut WriteStorage<'_, Ori>, - players: &mut WriteStorage<'_, Player>, - controllers: &mut WriteStorage<'_, Controller>, - settings: &Read<'_, Settings>, - msg: ClientGeneral, - ) -> Result<(), crate::error::Error> { - if client.in_game.is_none() { - debug!(?entity, "client is not in_game, ignoring msg"); - trace!(?msg, "ignored msg content"); - if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) { - network_metrics.chunks_request_dropped.inc(); - } - return Ok(()); - } - match msg { - // Go back to registered state (char selection screen) - ClientGeneral::ExitInGame => { - client.in_game = None; - server_emitter.emit(ServerEvent::ExitIngame { entity }); - client.send_msg(ServerGeneral::ExitInGameSuccess); - }, - ClientGeneral::SetViewDistance(view_distance) => { - players.get_mut(entity).map(|player| { - player.view_distance = Some( - settings - .max_view_distance - .map(|max| view_distance.min(max)) - .unwrap_or(view_distance), - ) - }); - - //correct client if its VD is to high - if settings - .max_view_distance - .map(|max| view_distance > max) - .unwrap_or(false) - { - client.send_msg(ServerGeneral::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - )); - } - }, - ClientGeneral::ControllerInputs(inputs) => { - if let Some(ClientInGame::Character) = client.in_game { - if let Some(controller) = controllers.get_mut(entity) { - controller.inputs.update_with_new(inputs); - } - } - }, - ClientGeneral::ControlEvent(event) => { - if let Some(ClientInGame::Character) = client.in_game { - // Skip respawn if client entity is alive - if let ControlEvent::Respawn = event { - if stats.get(entity).map_or(true, |s| !s.is_dead) { - //Todo: comment why return! - return Ok(()); - } - } - if let Some(controller) = controllers.get_mut(entity) { - controller.events.push(event); - } - } - }, - ClientGeneral::ControlAction(event) => { - if let Some(ClientInGame::Character) = client.in_game { - if let Some(controller) = controllers.get_mut(entity) { - controller.actions.push(event); - } - } - }, - ClientGeneral::PlayerPhysics { pos, vel, ori } => { - if let Some(ClientInGame::Character) = client.in_game { - if force_updates.get(entity).is_none() - && stats.get(entity).map_or(true, |s| !s.is_dead) - { - let _ = positions.insert(entity, pos); - let _ = velocities.insert(entity, vel); - let _ = orientations.insert(entity, ori); - } - } - }, - ClientGeneral::BreakBlock(pos) => { - if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) { - block_changes.set(pos, block.into_vacant()); - } - }, - ClientGeneral::PlaceBlock(pos, block) => { - if can_build.get(entity).is_some() { - block_changes.try_set(pos, block); - } - }, - ClientGeneral::TerrainChunkRequest { key } => { - let in_vd = if let (Some(view_distance), Some(pos)) = ( - players.get(entity).and_then(|p| p.view_distance), - positions.get(entity), - ) { - pos.0.xy().map(|e| e as f64).distance( - key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64 - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - client.send_msg(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - }) - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, - } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, - ClientGeneral::UnlockSkill(skill) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.unlock_skill(skill)); - }, - ClientGeneral::RefundSkill(skill) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.refund_skill(skill)); - }, - ClientGeneral::UnlockSkillGroup(skill_group_type) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.unlock_skill_group(skill_group_type)); - }, - _ => unreachable!("not a client_in_game msg"), - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_client_character_screen_msg( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, - entity: specs::Entity, - client: &mut Client, - character_loader: &ReadExpect<'_, CharacterLoader>, - uids: &ReadStorage<'_, Uid>, - players: &mut WriteStorage<'_, Player>, - editable_settings: &ReadExpect<'_, EditableSettings>, - alias_validator: &ReadExpect<'_, AliasValidator>, - msg: ClientGeneral, - ) -> Result<(), crate::error::Error> { - match msg { - // Request spectator state - ClientGeneral::Spectate if client.registered => { - client.in_game = Some(ClientInGame::Spectator) - }, - ClientGeneral::Spectate => debug!("dropped Spectate msg from unregistered client"), - ClientGeneral::Character(character_id) - if client.registered && client.in_game.is_none() => - { - if let Some(player) = players.get(entity) { - // Send a request to load the character's component data from the - // DB. Once loaded, persisted components such as stats and inventory - // will be inserted for the entity - character_loader.load_character_data( - entity, - player.uuid().to_string(), - character_id, - ); - - // Start inserting non-persisted/default components for the entity - // while we load the DB data - server_emitter.emit(ServerEvent::InitCharacterData { - entity, - character_id, - }); - - // Give the player a welcome message - if !editable_settings.server_description.is_empty() { - client.send_msg( - ChatType::CommandInfo - .server_msg(String::from(&*editable_settings.server_description)), - ); - } - - if !client.login_msg_sent { - if let Some(player_uid) = uids.get(entity) { - new_chat_msgs.push((None, UnresolvedChatMsg { - chat_type: ChatType::Online(*player_uid), - message: "".to_string(), - })); - - client.login_msg_sent = true; - } - } - } else { - client.send_msg(ServerGeneral::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - ))) - } - } - ClientGeneral::Character(_) => { - let registered = client.registered; - let in_game = client.in_game; - debug!(?registered, ?in_game, "dropped Character msg from client"); - }, - ClientGeneral::RequestCharacterList => { - if let Some(player) = players.get(entity) { - character_loader.load_character_list(entity, player.uuid().to_string()) - } - }, - ClientGeneral::CreateCharacter { alias, tool, body } => { - if let Err(error) = alias_validator.validate(&alias) { - debug!(?error, ?alias, "denied alias as it contained a banned word"); - client.send_msg(ServerGeneral::CharacterActionError(error.to_string())); - } else if let Some(player) = players.get(entity) { - character_creator::create_character( - entity, - player.uuid().to_string(), - alias, - tool, - body, - character_loader, - ); - } - }, - ClientGeneral::DeleteCharacter(character_id) => { - if let Some(player) = players.get(entity) { - character_loader.delete_character( - entity, - player.uuid().to_string(), - character_id, - ); - } - }, - _ => unreachable!("not a client_character_screen msg"), - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_ping_msg(client: &mut Client, msg: PingMsg) -> Result<(), crate::error::Error> { - match msg { - PingMsg::Ping => client.send_msg(PingMsg::Pong), - PingMsg::Pong => {}, - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_register_msg( - player_list: &HashMap, - new_players: &mut Vec, - entity: specs::Entity, - client: &mut Client, - player_metrics: &ReadExpect<'_, PlayerMetrics>, - login_provider: &mut WriteExpect<'_, LoginProvider>, - admins: &mut WriteStorage<'_, Admin>, - players: &mut WriteStorage<'_, Player>, - editable_settings: &ReadExpect<'_, EditableSettings>, - msg: ClientRegister, - ) -> Result<(), crate::error::Error> { - let (username, uuid) = match login_provider.try_login( - &msg.token_or_username, - &*editable_settings.admins, - &*editable_settings.whitelist, - &*editable_settings.banlist, - ) { - Err(err) => { - client - .register_stream - .send(ServerRegisterAnswer::Err(err))?; - return Ok(()); - }, - Ok((username, uuid)) => (username, uuid), - }; - - const INITIAL_VD: Option = Some(5); //will be changed after login - let player = Player::new(username, None, INITIAL_VD, uuid); - let is_admin = editable_settings.admins.contains(&uuid); - - if !player.is_valid() { - // Invalid player - client - .register_stream - .send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; - return Ok(()); - } - - if !client.registered && client.in_game.is_none() { - // Add Player component to this client - let _ = players.insert(entity, player); - player_metrics.players_connected.inc(); - - // Give the Admin component to the player if their name exists in - // admin list - if is_admin { - let _ = admins.insert(entity, Admin); - } - - // Tell the client its request was successful. - client.registered = true; - client.register_stream.send(ServerRegisterAnswer::Ok(()))?; - - // Send initial player list - client.send_msg(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - ))); - - // Add to list to notify all clients of the new player - new_players.push(entity); - } - Ok(()) - } - - ///We needed to move this to a async fn, if we would use a async closures - /// the compiler generates to much recursion and fails to compile this - #[allow(clippy::too_many_arguments)] - async fn handle_messages( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, - player_list: &HashMap, - new_players: &mut Vec, - entity: specs::Entity, - client: &mut Client, - cnt: &mut u64, - character_loader: &ReadExpect<'_, CharacterLoader>, - terrain: &ReadExpect<'_, TerrainGrid>, - network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, - player_metrics: &ReadExpect<'_, PlayerMetrics>, - uids: &ReadStorage<'_, Uid>, - can_build: &ReadStorage<'_, CanBuild>, - force_updates: &ReadStorage<'_, ForceUpdate>, - stats: &mut WriteStorage<'_, Stats>, - chat_modes: &ReadStorage<'_, ChatMode>, - login_provider: &mut WriteExpect<'_, LoginProvider>, - block_changes: &mut Write<'_, BlockChange>, - admins: &mut WriteStorage<'_, Admin>, - positions: &mut WriteStorage<'_, Pos>, - velocities: &mut WriteStorage<'_, Vel>, - orientations: &mut WriteStorage<'_, Ori>, - players: &mut WriteStorage<'_, Player>, - controllers: &mut WriteStorage<'_, Controller>, - settings: &Read<'_, Settings>, - editable_settings: &ReadExpect<'_, EditableSettings>, - alias_validator: &ReadExpect<'_, AliasValidator>, - ) -> Result<(), crate::error::Error> { - let (mut b1, mut b2, mut b3, mut b4, mut b5) = ( - client.network_error, - client.network_error, - client.network_error, - client.network_error, - client.network_error, - ); - loop { - /* - waiting for 1 of the 5 streams to return a massage asynchronous. - If so, handle that msg type. This code will be refactored soon - */ - - let q1 = Client::internal_recv(&mut b1, &mut client.general_stream); - let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream); - let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream); - let q4 = Client::internal_recv(&mut b4, &mut client.ping_stream); - let q5 = Client::internal_recv(&mut b5, &mut client.register_stream); - - let (m1, m2, m3, m4, m5) = select!( - msg = q1.fuse() => (Some(msg), None, None, None, None), - msg = q2.fuse() => (None, Some(msg), None, None, None), - msg = q3.fuse() => (None, None, Some(msg), None, None), - msg = q4.fuse() => (None, None, None, Some(msg), None), - msg = q5.fuse() => (None, None, None, None,Some(msg)), - ); - *cnt += 1; - if let Some(msg) = m1 { - client.network_error |= b1; - Self::handle_client_msg( - server_emitter, - new_chat_msgs, - entity, - client, - player_metrics, - uids, - chat_modes, - msg?, - )?; - } - if let Some(msg) = m2 { - client.network_error |= b2; - Self::handle_client_in_game_msg( - server_emitter, - entity, - client, - terrain, - network_metrics, - can_build, - force_updates, - stats, - block_changes, - positions, - velocities, - orientations, - players, - controllers, - settings, - msg?, - )?; - } - if let Some(msg) = m3 { - client.network_error |= b3; - Self::handle_client_character_screen_msg( - server_emitter, - new_chat_msgs, - entity, - client, - character_loader, - uids, - players, - editable_settings, - alias_validator, - msg?, - )?; - } - if let Some(msg) = m4 { - client.network_error |= b4; - Self::handle_ping_msg(client, msg?)?; - } - if let Some(msg) = m5 { - client.network_error |= b5; - Self::handle_register_msg( - player_list, - new_players, - entity, - client, - player_metrics, - login_provider, - admins, - players, - editable_settings, - msg?, - )?; - } - } - } -} - -/// This system will handle new messages from clients -pub struct Sys; -impl<'a> System<'a> for Sys { - #[allow(clippy::type_complexity)] // TODO: Pending review in #587 - type SystemData = ( - Entities<'a>, - Read<'a, EventBus>, - Read<'a, Time>, - ReadExpect<'a, CharacterLoader>, - ReadExpect<'a, TerrainGrid>, - ReadExpect<'a, NetworkRequestMetrics>, - ReadExpect<'a, PlayerMetrics>, - Write<'a, SysTimer>, - ReadStorage<'a, Uid>, - ReadStorage<'a, CanBuild>, - ReadStorage<'a, ForceUpdate>, - WriteStorage<'a, Stats>, - ReadStorage<'a, ChatMode>, - WriteExpect<'a, LoginProvider>, - Write<'a, BlockChange>, - WriteStorage<'a, Admin>, - WriteStorage<'a, Pos>, - WriteStorage<'a, Vel>, - WriteStorage<'a, Ori>, - WriteStorage<'a, Player>, - WriteStorage<'a, Client>, - WriteStorage<'a, Controller>, - Read<'a, Settings>, - ReadExpect<'a, EditableSettings>, - ReadExpect<'a, AliasValidator>, - ); - - #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 - #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 - #[allow(clippy::single_match)] // TODO: Pending review in #587 - fn run( - &mut self, - ( - entities, - server_event_bus, - time, - character_loader, - terrain, - network_metrics, - player_metrics, - mut timer, - uids, - can_build, - force_updates, - mut stats, - chat_modes, - mut accounts, - mut block_changes, - mut admins, - mut positions, - mut velocities, - mut orientations, - mut players, - mut clients, - mut controllers, - settings, - editable_settings, - alias_validator, - ): Self::SystemData, - ) { - span!(_guard, "run", "message::Sys::run"); - timer.start(); - - let mut server_emitter = server_event_bus.emitter(); - - let mut new_chat_msgs = Vec::new(); - - // Player list to send new players. - let player_list = (&uids, &players, stats.maybe(), admins.maybe()) - .join() - .map(|(uid, player, stats, admin)| { - (*uid, PlayerInfo { - is_online: true, - is_admin: admin.is_some(), - player_alias: player.alias.clone(), - character: stats.map(|stats| CharacterInfo { - name: stats.name.clone(), - level: stats.level.level(), - }), - }) - }) - .collect::>(); - // List of new players to update player lists of all clients. - let mut new_players = Vec::new(); - - for (entity, client) in (&entities, &mut clients).join() { - let mut cnt = 0; - - let network_err: Result<(), crate::error::Error> = block_on(async { - //TIMEOUT 0.02 ms for msg handling - let work_future = Self::handle_messages( - &mut server_emitter, - &mut new_chat_msgs, - &player_list, - &mut new_players, - entity, - client, - &mut cnt, - &character_loader, - &terrain, - &network_metrics, - &player_metrics, - &uids, - &can_build, - &force_updates, - &mut stats, - &chat_modes, - &mut accounts, - &mut block_changes, - &mut admins, - &mut positions, - &mut velocities, - &mut orientations, - &mut players, - &mut controllers, - &settings, - &editable_settings, - &alias_validator, - ); - select!( - _ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()), - err = work_future.fuse() => err, - ) - }); - - // Network error - if network_err.is_err() { - debug!(?entity, "postbox error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["network_error"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - } else if cnt > 0 { - // Update client ping. - client.last_ping = time.0 - } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 - // Timeout - { - info!(?entity, "timeout error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["timeout"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { - // Try pinging the client if the timeout is nearing. - client.send_msg(PingMsg::Ping); - } - } - - // Handle new players. - // Tell all clients to add them to the player list. - for entity in new_players { - if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) { - let msg = - ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo { - player_alias: player.alias.clone(), - is_online: true, - is_admin: admins.get(entity).is_some(), - character: None, // new players will be on character select. - })); - for client in (&mut clients).join().filter(|c| c.registered) { - client.send_msg(msg.clone()) - } - } - } - - // Handle new chat messages. - for (entity, msg) in new_chat_msgs { - // Handle chat commands. - if msg.message.starts_with("/") { - if let (Some(entity), true) = (entity, msg.message.len() > 1) { - let argv = String::from(&msg.message[1..]); - server_emitter.emit(ServerEvent::ChatCmd(entity, argv)); - } - } else { - // Send chat message - server_emitter.emit(ServerEvent::Chat(msg)); - } - } - - timer.end() - } -} diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index 6b98fc9edb..7e34ea933b 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,6 +1,6 @@ pub mod entity_sync; pub mod invite_timeout; -pub mod message; +pub mod msg; pub mod object; pub mod persistence; pub mod sentinel; @@ -16,7 +16,11 @@ use std::{ }; pub type EntitySyncTimer = SysTimer; -pub type MessageTimer = SysTimer; +pub type GeneralMsgTimer = SysTimer; +pub type PingMsgTimer = SysTimer; +pub type RegisterMsgTimer = SysTimer; +pub type CharacterScreenMsgTimer = SysTimer; +pub type InGameMsgTimer = SysTimer; pub type SentinelTimer = SysTimer; pub type SubscriptionTimer = SysTimer; pub type TerrainTimer = SysTimer; diff --git a/server/src/sys/msg/character_screen.rs b/server/src/sys/msg/character_screen.rs new file mode 100644 index 0000000000..6dae2482fb --- /dev/null +++ b/server/src/sys/msg/character_screen.rs @@ -0,0 +1,194 @@ +use super::super::SysTimer; +use crate::{ + alias_validator::AliasValidator, character_creator, client::Client, + persistence::character_loader::CharacterLoader, presence::Presence, EditableSettings, +}; +use common::{ + comp::{ChatType, Player, UnresolvedChatMsg}, + event::{EventBus, ServerEvent}, + msg::{ClientGeneral, ServerGeneral}, + span, + sync::Uid, +}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write}; +use std::sync::atomic::Ordering; +use tracing::{debug, warn}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_client_character_screen_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, + entity: specs::Entity, + client: &Client, + character_loader: &ReadExpect<'_, CharacterLoader>, + uids: &ReadStorage<'_, Uid>, + players: &ReadStorage<'_, Player>, + presences: &ReadStorage<'_, Presence>, + editable_settings: &ReadExpect<'_, EditableSettings>, + alias_validator: &ReadExpect<'_, AliasValidator>, + msg: ClientGeneral, + ) -> Result<(), crate::error::Error> { + match msg { + // Request spectator state + ClientGeneral::Spectate => { + if players.contains(entity) { + warn!("Spectator mode not yet implemented on server"); + } else { + debug!("dropped Spectate msg from unregistered client") + } + }, + ClientGeneral::Character(character_id) => { + if let Some(player) = players.get(entity) { + if presences.contains(entity) { + debug!("player already ingame, aborting"); + } else { + // Send a request to load the character's component data from the + // DB. Once loaded, persisted components such as stats and inventory + // will be inserted for the entity + character_loader.load_character_data( + entity, + player.uuid().to_string(), + character_id, + ); + + // Start inserting non-persisted/default components for the entity + // while we load the DB data + server_emitter.emit(ServerEvent::InitCharacterData { + entity, + character_id, + }); + + // Give the player a welcome message + if !editable_settings.server_description.is_empty() { + client.send(ChatType::CommandInfo.server_msg(String::from( + &*editable_settings.server_description, + )))?; + } + + if !client.login_msg_sent.load(Ordering::Relaxed) { + if let Some(player_uid) = uids.get(entity) { + new_chat_msgs.push((None, UnresolvedChatMsg { + chat_type: ChatType::Online(*player_uid), + message: "".to_string(), + })); + + client.login_msg_sent.store(true, Ordering::Relaxed); + } + } + } + } else { + debug!("Client is not yet registered"); + client.send(ServerGeneral::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + )))? + } + }, + ClientGeneral::RequestCharacterList => { + if let Some(player) = players.get(entity) { + character_loader.load_character_list(entity, player.uuid().to_string()) + } + }, + ClientGeneral::CreateCharacter { alias, tool, body } => { + if let Err(error) = alias_validator.validate(&alias) { + debug!(?error, ?alias, "denied alias as it contained a banned word"); + client.send(ServerGeneral::CharacterActionError(error.to_string()))?; + } else if let Some(player) = players.get(entity) { + character_creator::create_character( + entity, + player.uuid().to_string(), + alias, + tool, + body, + character_loader, + ); + } + }, + ClientGeneral::DeleteCharacter(character_id) => { + if let Some(player) = players.get(entity) { + character_loader.delete_character( + entity, + player.uuid().to_string(), + character_id, + ); + } + }, + _ => unreachable!("not a client_character_screen msg"), + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + ReadExpect<'a, CharacterLoader>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + ReadStorage<'a, Client>, + ReadStorage<'a, Player>, + ReadStorage<'a, Presence>, + ReadExpect<'a, EditableSettings>, + ReadExpect<'a, AliasValidator>, + ); + + fn run( + &mut self, + ( + entities, + server_event_bus, + character_loader, + mut timer, + uids, + clients, + players, + presences, + editable_settings, + alias_validator, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::character_screen::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + let mut new_chat_msgs = Vec::new(); + + for (entity, client) in (&entities, &clients).join() { + let _ = super::try_recv_all(client, 1, |client, msg| { + Self::handle_client_character_screen_msg( + &mut server_emitter, + &mut new_chat_msgs, + entity, + client, + &character_loader, + &uids, + &players, + &presences, + &editable_settings, + &alias_validator, + msg, + ) + }); + } + + // Handle new chat messages. + for (entity, msg) in new_chat_msgs { + // Handle chat commands. + if msg.message.starts_with('/') { + if let (Some(entity), true) = (entity, msg.message.len() > 1) { + let argv = String::from(&msg.message[1..]); + server_emitter.emit(ServerEvent::ChatCmd(entity, argv)); + } + } else { + // Send chat message + server_emitter.emit(ServerEvent::Chat(msg)); + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/general.rs b/server/src/sys/msg/general.rs new file mode 100644 index 0000000000..8051da65b5 --- /dev/null +++ b/server/src/sys/msg/general.rs @@ -0,0 +1,137 @@ +use super::super::SysTimer; +use crate::{client::Client, metrics::PlayerMetrics}; +use common::{ + comp::{ChatMode, Player, UnresolvedChatMsg}, + event::{EventBus, ServerEvent}, + msg::{validate_chat_msg, ChatMsgValidationError, ClientGeneral, MAX_BYTES_CHAT_MSG}, + span, + state::Time, + sync::Uid, +}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write}; +use std::sync::atomic::Ordering; +use tracing::{debug, error, warn}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_general_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, + entity: specs::Entity, + client: &Client, + player: Option<&Player>, + player_metrics: &ReadExpect<'_, PlayerMetrics>, + uids: &ReadStorage<'_, Uid>, + chat_modes: &ReadStorage<'_, ChatMode>, + msg: ClientGeneral, + ) -> Result<(), crate::error::Error> { + match msg { + ClientGeneral::ChatMsg(message) => { + if player.is_some() { + match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + warn!(?len, ?max, "Received a chat message that's too long") + }, + } + } + }, + ClientGeneral::Terminate => { + debug!(?entity, "Client send message to termitate session"); + player_metrics + .clients_disconnected + .with_label_values(&["gracefully"]) + .inc(); + client.terminate_msg_recv.store(true, Ordering::Relaxed); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + }, + _ => unreachable!("not a client_general msg"), + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + Read<'a, Time>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + ReadStorage<'a, ChatMode>, + ReadStorage<'a, Player>, + ReadStorage<'a, Client>, + ); + + fn run( + &mut self, + ( + entities, + server_event_bus, + time, + player_metrics, + mut timer, + uids, + chat_modes, + players, + clients, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::general::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + let mut new_chat_msgs = Vec::new(); + + for (entity, client, player) in (&entities, &clients, (&players).maybe()).join() { + let res = super::try_recv_all(client, 3, |client, msg| { + Self::handle_general_msg( + &mut server_emitter, + &mut new_chat_msgs, + entity, + client, + player, + &player_metrics, + &uids, + &chat_modes, + msg, + ) + }); + + if let Ok(1_u64..=u64::MAX) = res { + // Update client ping. + *client.last_ping.lock().unwrap() = time.0 + } + } + + // Handle new chat messages. + for (entity, msg) in new_chat_msgs { + // Handle chat commands. + if msg.message.starts_with('/') { + if let (Some(entity), true) = (entity, msg.message.len() > 1) { + let argv = String::from(&msg.message[1..]); + server_emitter.emit(ServerEvent::ChatCmd(entity, argv)); + } + } else { + // Send chat message + server_emitter.emit(ServerEvent::Chat(msg)); + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs new file mode 100644 index 0000000000..ae49d6d8e2 --- /dev/null +++ b/server/src/sys/msg/in_game.rs @@ -0,0 +1,242 @@ +use super::super::SysTimer; +use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings}; +use common::{ + comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Ori, Pos, Stats, Vel}, + event::{EventBus, ServerEvent}, + msg::{ClientGeneral, PresenceKind, ServerGeneral}, + span, + state::BlockChange, + terrain::{TerrainChunkSize, TerrainGrid}, + vol::{ReadVol, RectVolSize}, +}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use tracing::{debug, trace}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_client_in_game_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + entity: specs::Entity, + client: &Client, + maybe_presence: &mut Option<&mut Presence>, + terrain: &ReadExpect<'_, TerrainGrid>, + network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, + can_build: &ReadStorage<'_, CanBuild>, + force_updates: &ReadStorage<'_, ForceUpdate>, + stats: &mut WriteStorage<'_, Stats>, + block_changes: &mut Write<'_, BlockChange>, + positions: &mut WriteStorage<'_, Pos>, + velocities: &mut WriteStorage<'_, Vel>, + orientations: &mut WriteStorage<'_, Ori>, + controllers: &mut WriteStorage<'_, Controller>, + settings: &Read<'_, Settings>, + msg: ClientGeneral, + ) -> Result<(), crate::error::Error> { + let presence = match maybe_presence { + Some(g) => g, + None => { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) { + network_metrics.chunks_request_dropped.inc(); + } + return Ok(()); + }, + }; + match msg { + // Go back to registered state (char selection screen) + ClientGeneral::ExitInGame => { + server_emitter.emit(ServerEvent::ExitIngame { entity }); + client.send(ServerGeneral::ExitInGameSuccess)?; + *maybe_presence = None; + }, + ClientGeneral::SetViewDistance(view_distance) => { + presence.view_distance = settings + .max_view_distance + .map(|max| view_distance.min(max)) + .unwrap_or(view_distance); + + //correct client if its VD is to high + if settings + .max_view_distance + .map(|max| view_distance > max) + .unwrap_or(false) + { + client.send(ServerGeneral::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + ))?; + } + }, + ClientGeneral::ControllerInputs(inputs) => { + if matches!(presence.kind, PresenceKind::Character(_)) { + if let Some(controller) = controllers.get_mut(entity) { + controller.inputs.update_with_new(inputs); + } + } + }, + ClientGeneral::ControlEvent(event) => { + if matches!(presence.kind, PresenceKind::Character(_)) { + // Skip respawn if client entity is alive + if let ControlEvent::Respawn = event { + if stats.get(entity).map_or(true, |s| !s.is_dead) { + //Todo: comment why return! + return Ok(()); + } + } + if let Some(controller) = controllers.get_mut(entity) { + controller.events.push(event); + } + } + }, + ClientGeneral::ControlAction(event) => { + if matches!(presence.kind, PresenceKind::Character(_)) { + if let Some(controller) = controllers.get_mut(entity) { + controller.actions.push(event); + } + } + }, + ClientGeneral::PlayerPhysics { pos, vel, ori } => { + if matches!(presence.kind, PresenceKind::Character(_)) + && force_updates.get(entity).is_none() + && stats.get(entity).map_or(true, |s| !s.is_dead) + { + let _ = positions.insert(entity, pos); + let _ = velocities.insert(entity, vel); + let _ = orientations.insert(entity, ori); + } + }, + ClientGeneral::BreakBlock(pos) => { + if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) { + block_changes.set(pos, block.into_vacant()); + } + }, + ClientGeneral::PlaceBlock(pos, block) => { + if can_build.get(entity).is_some() { + block_changes.try_set(pos, block); + } + }, + ClientGeneral::TerrainChunkRequest { key } => { + let in_vd = if let Some(pos) = positions.get(entity) { + pos.0.xy().map(|e| e as f64).distance( + key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < (presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64 + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => { + network_metrics.chunks_served_from_memory.inc(); + client.send(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + })? + }, + None => { + network_metrics.chunks_generation_triggered.inc(); + server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) + }, + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + ClientGeneral::UnlockSkill(skill) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.unlock_skill(skill)); + }, + ClientGeneral::RefundSkill(skill) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.refund_skill(skill)); + }, + ClientGeneral::UnlockSkillGroup(skill_group_type) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.unlock_skill_group(skill_group_type)); + }, + _ => unreachable!("not a client_in_game msg"), + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + ReadExpect<'a, TerrainGrid>, + ReadExpect<'a, NetworkRequestMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, CanBuild>, + ReadStorage<'a, ForceUpdate>, + WriteStorage<'a, Stats>, + Write<'a, BlockChange>, + WriteStorage<'a, Pos>, + WriteStorage<'a, Vel>, + WriteStorage<'a, Ori>, + WriteStorage<'a, Presence>, + WriteStorage<'a, Client>, + WriteStorage<'a, Controller>, + Read<'a, Settings>, + ); + + fn run( + &mut self, + ( + entities, + server_event_bus, + terrain, + network_metrics, + mut timer, + can_build, + force_updates, + mut stats, + mut block_changes, + mut positions, + mut velocities, + mut orientations, + mut presences, + mut clients, + mut controllers, + settings, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::in_game::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + + for (entity, client, mut maybe_presence) in + (&entities, &mut clients, (&mut presences).maybe()).join() + { + let _ = super::try_recv_all(client, 2, |client, msg| { + Self::handle_client_in_game_msg( + &mut server_emitter, + entity, + client, + &mut maybe_presence, + &terrain, + &network_metrics, + &can_build, + &force_updates, + &mut stats, + &mut block_changes, + &mut positions, + &mut velocities, + &mut orientations, + &mut controllers, + &settings, + msg, + ) + }); + } + + timer.end() + } +} diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs new file mode 100644 index 0000000000..0242d83794 --- /dev/null +++ b/server/src/sys/msg/mod.rs @@ -0,0 +1,33 @@ +pub mod character_screen; +pub mod general; +pub mod in_game; +pub mod ping; +pub mod register; + +use crate::client::Client; +use serde::de::DeserializeOwned; + +/// handles all send msg and calls a handle fn +/// Aborts when a error occurred returns cnt of successful msg otherwise +pub(in crate::sys::msg) fn try_recv_all( + client: &Client, + stream_id: u8, + mut f: F, +) -> Result +where + M: DeserializeOwned, + F: FnMut(&Client, M) -> Result<(), crate::error::Error>, +{ + let mut cnt = 0u64; + loop { + let msg = match client.recv(stream_id) { + Ok(Some(msg)) => msg, + Ok(None) => break Ok(cnt), + Err(e) => break Err(e.into()), + }; + if let Err(e) = f(client, msg) { + break Err(e); + } + cnt += 1; + } +} diff --git a/server/src/sys/msg/ping.rs b/server/src/sys/msg/ping.rs new file mode 100644 index 0000000000..aa476ac2ae --- /dev/null +++ b/server/src/sys/msg/ping.rs @@ -0,0 +1,95 @@ +use super::super::SysTimer; +use crate::{client::Client, metrics::PlayerMetrics, Settings}; +use common::{ + event::{EventBus, ServerEvent}, + msg::PingMsg, + span, + state::Time, +}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write}; +use std::sync::atomic::Ordering; +use tracing::{debug, info}; + +impl Sys { + fn handle_ping_msg(client: &Client, msg: PingMsg) -> Result<(), crate::error::Error> { + match msg { + PingMsg::Ping => client.send(PingMsg::Pong)?, + PingMsg::Pong => {}, + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + Read<'a, Time>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, Client>, + Read<'a, Settings>, + ); + + fn run( + &mut self, + ( + entities, + server_event_bus, + time, + player_metrics, + mut timer, + clients, + settings, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::ping::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + + for (entity, client) in (&entities, &clients).join() { + let res = super::try_recv_all(client, 4, Self::handle_ping_msg); + + match res { + Err(e) => { + if !client.terminate_msg_recv.load(Ordering::Relaxed) { + debug!(?entity, ?e, "network error with client, disconnecting"); + player_metrics + .clients_disconnected + .with_label_values(&["network_error"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + } + }, + Ok(1_u64..=u64::MAX) => { + // Update client ping. + *client.last_ping.lock().unwrap() = time.0 + }, + Ok(0) => { + let last_ping: f64 = *client.last_ping.lock().unwrap(); + if time.0 - last_ping > settings.client_timeout.as_secs() as f64 + // Timeout + { + if !client.terminate_msg_recv.load(Ordering::Relaxed) { + info!(?entity, "timeout error with client, disconnecting"); + player_metrics + .clients_disconnected + .with_label_values(&["timeout"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + } + } else if time.0 - last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { + // Try pinging the client if the timeout is nearing. + client.send_fallible(PingMsg::Ping); + } + }, + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/register.rs b/server/src/sys/msg/register.rs new file mode 100644 index 0000000000..d072ada6c8 --- /dev/null +++ b/server/src/sys/msg/register.rs @@ -0,0 +1,173 @@ +use super::super::SysTimer; +use crate::{ + client::Client, login_provider::LoginProvider, metrics::PlayerMetrics, EditableSettings, +}; +use common::{ + comp::{Admin, Player, Stats}, + msg::{ + CharacterInfo, ClientRegister, PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral, + ServerRegisterAnswer, + }, + span, + sync::Uid, +}; +use hashbrown::HashMap; +use specs::{Entities, Join, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_register_msg( + player_list: &HashMap, + new_players: &mut Vec, + entity: specs::Entity, + client: &Client, + player_metrics: &ReadExpect<'_, PlayerMetrics>, + login_provider: &mut WriteExpect<'_, LoginProvider>, + admins: &mut WriteStorage<'_, Admin>, + players: &mut WriteStorage<'_, Player>, + editable_settings: &ReadExpect<'_, EditableSettings>, + msg: ClientRegister, + ) -> Result<(), crate::error::Error> { + let (username, uuid) = match login_provider.try_login( + &msg.token_or_username, + &*editable_settings.admins, + &*editable_settings.whitelist, + &*editable_settings.banlist, + ) { + Err(err) => { + client.send(ServerRegisterAnswer::Err(err))?; + return Ok(()); + }, + Ok((username, uuid)) => (username, uuid), + }; + + let player = Player::new(username, uuid); + let is_admin = editable_settings.admins.contains(&uuid); + + if !player.is_valid() { + // Invalid player + client.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; + return Ok(()); + } + + if !players.contains(entity) { + // Add Player component to this client + let _ = players.insert(entity, player); + player_metrics.players_connected.inc(); + + // Give the Admin component to the player if their name exists in + // admin list + if is_admin { + let _ = admins.insert(entity, Admin); + } + + // Tell the client its request was successful. + client.send(ServerRegisterAnswer::Ok(()))?; + + // Send initial player list + client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + )))?; + + // Add to list to notify all clients of the new player + new_players.push(entity); + } + + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] + type SystemData = ( + Entities<'a>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + ReadStorage<'a, Client>, + WriteStorage<'a, Player>, + ReadStorage<'a, Stats>, + WriteExpect<'a, LoginProvider>, + WriteStorage<'a, Admin>, + ReadExpect<'a, EditableSettings>, + ); + + fn run( + &mut self, + ( + entities, + player_metrics, + mut timer, + uids, + clients, + mut players, + stats, + mut login_provider, + mut admins, + editable_settings, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::register::Sys::run"); + timer.start(); + + // Player list to send new players. + let player_list = (&uids, &players, stats.maybe(), admins.maybe()) + .join() + .map(|(uid, player, stats, admin)| { + (*uid, PlayerInfo { + is_online: true, + is_admin: admin.is_some(), + player_alias: player.alias.clone(), + character: stats.map(|stats| CharacterInfo { + name: stats.name.clone(), + level: stats.level.level(), + }), + }) + }) + .collect::>(); + // List of new players to update player lists of all clients. + let mut new_players = Vec::new(); + + for (entity, client) in (&entities, &clients).join() { + let _ = super::try_recv_all(client, 0, |client, msg| { + Self::handle_register_msg( + &player_list, + &mut new_players, + entity, + client, + &player_metrics, + &mut login_provider, + &mut admins, + &mut players, + &editable_settings, + msg, + ) + }); + } + + // Handle new players. + // Tell all clients to add them to the player list. + for entity in new_players { + if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) { + let mut lazy_msg = None; + for (_, client) in (&players, &clients).join() { + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(ServerGeneral::PlayerListUpdate( + PlayerListUpdate::Add(*uid, PlayerInfo { + player_alias: player.alias.clone(), + is_online: true, + is_admin: admins.get(entity).is_some(), + character: None, // new players will be on character select. + }), + ))); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); + } + } + } + + timer.end() + } +} diff --git a/server/src/sys/persistence.rs b/server/src/sys/persistence.rs index 91bc9c4fe4..60c578d7d8 100644 --- a/server/src/sys/persistence.rs +++ b/server/src/sys/persistence.rs @@ -1,9 +1,11 @@ use crate::{ persistence::character_updater, + presence::Presence, sys::{SysScheduler, SysTimer}, }; use common::{ - comp::{Inventory, Loadout, Player, Stats}, + comp::{Inventory, Loadout, Stats}, + msg::PresenceKind, span, }; use specs::{Join, ReadExpect, ReadStorage, System, Write}; @@ -13,7 +15,7 @@ pub struct Sys; impl<'a> System<'a> for Sys { #[allow(clippy::type_complexity)] // TODO: Pending review in #587 type SystemData = ( - ReadStorage<'a, Player>, + ReadStorage<'a, Presence>, ReadStorage<'a, Stats>, ReadStorage<'a, Inventory>, ReadStorage<'a, Loadout>, @@ -25,7 +27,7 @@ impl<'a> System<'a> for Sys { fn run( &mut self, ( - players, + presences, player_stats, player_inventories, player_loadouts, @@ -39,17 +41,18 @@ impl<'a> System<'a> for Sys { timer.start(); updater.batch_update( ( - &players, + &presences, &player_stats, &player_inventories, &player_loadouts, ) .join() - .filter_map(|(player, stats, inventory, loadout)| { - player - .character_id - .map(|id| (id, stats, inventory, loadout)) - }), + .filter_map( + |(presence, stats, inventory, loadout)| match presence.kind { + PresenceKind::Character(id) => Some((id, stats, inventory, loadout)), + PresenceKind::Spectator => None, + }, + ), ); timer.end(); } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 2eac60f2a6..b1fa4e4d71 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -2,9 +2,12 @@ use super::{ sentinel::{DeletedEntities, TrackedComps}, SysTimer, }; -use crate::client::{self, Client, RegionSubscription}; +use crate::{ + client::Client, + presence::{self, Presence, RegionSubscription}, +}; use common::{ - comp::{Ori, Player, Pos, Vel}, + comp::{Ori, Pos, Vel}, msg::ServerGeneral, region::{region_in_vd, regions_in_vd, Event as RegionEvent, RegionMap}, span, @@ -31,8 +34,8 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Pos>, ReadStorage<'a, Vel>, ReadStorage<'a, Ori>, - ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + ReadStorage<'a, Presence>, + ReadStorage<'a, Client>, WriteStorage<'a, RegionSubscription>, Write<'a, DeletedEntities>, TrackedComps<'a>, @@ -49,8 +52,8 @@ impl<'a> System<'a> for Sys { positions, velocities, orientations, - players, - mut clients, + presences, + clients, mut subscriptions, mut deleted_entities, tracked_comps, @@ -71,22 +74,16 @@ impl<'a> System<'a> for Sys { // 7. Determine list of regions that are in range and iterate through it // - check if in hashset (hash calc) if not add it let mut regions_to_remove = Vec::new(); - for (client, subscription, pos, vd, client_entity) in ( - &mut clients, + for (subscription, pos, presence, client_entity, client) in ( &mut subscriptions, &positions, - &players, + &presences, &entities, + &clients, ) .join() - .filter_map(|(client, s, pos, player, e)| { - if client.in_game.is_some() { - player.view_distance.map(|v| (client, s, pos, v, e)) - } else { - None - } - }) { + let vd = presence.view_distance; // Calculate current chunk let chunk = (Vec2::::from(pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); @@ -101,7 +98,7 @@ impl<'a> System<'a> for Sys { }) - Vec2::from(pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| { - e.abs() > (sz / 2 + client::CHUNK_FUZZ) as f32 + e.abs() > (sz / 2 + presence::CHUNK_FUZZ) as f32 }) .reduce_or() { @@ -117,7 +114,9 @@ impl<'a> System<'a> for Sys { *key, pos.0, (vd as f32 * chunk_size) - + (client::CHUNK_FUZZ as f32 + client::REGION_FUZZ as f32 + chunk_size) + + (presence::CHUNK_FUZZ as f32 + + presence::REGION_FUZZ as f32 + + chunk_size) * 2.0f32.sqrt(), ) { // Add to the list of regions to remove @@ -153,7 +152,7 @@ impl<'a> System<'a> for Sys { .map(|key| subscription.regions.contains(key)) .unwrap_or(false) { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + client.send_fallible(ServerGeneral::DeleteEntity(uid)); } } }, @@ -161,7 +160,7 @@ impl<'a> System<'a> for Sys { } // Tell client to delete entities in the region for (&uid, _) in (&uids, region.entities()).join() { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + client.send_fallible(ServerGeneral::DeleteEntity(uid)); } } // Send deleted entities since they won't be processed for this client in entity @@ -171,14 +170,14 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid))); + client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } for key in regions_in_vd( pos.0, (vd as f32 * chunk_size) - + (client::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(), + + (presence::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(), ) { // Send client initial info about the entities in this region if it was not // already within the set of subscribed regions @@ -196,7 +195,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - client.send_msg(ServerGeneral::CreateEntity( + client.send_fallible(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -217,22 +216,18 @@ impl<'a> System<'a> for Sys { /// Initialize region subscription pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { - if let (Some(client_pos), Some(client_vd), Some(client)) = ( + if let (Some(client_pos), Some(presence), Some(client)) = ( world.read_storage::().get(entity), - world - .read_storage::() - .get(entity) - .map(|pl| pl.view_distance) - .and_then(|v| v), - world.write_storage::().get_mut(entity), + world.read_storage::().get(entity), + world.write_storage::().get(entity), ) { let fuzzy_chunk = (Vec2::::from(client_pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); let chunk_size = TerrainChunkSize::RECT_SIZE.reduce_max() as f32; let regions = common::region::regions_in_vd( client_pos.0, - (client_vd as f32 * chunk_size) as f32 - + (client::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(), + (presence.view_distance as f32 * chunk_size) as f32 + + (presence::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(), ); let region_map = world.read_resource::(); @@ -249,7 +244,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - client.send_msg(ServerGeneral::CreateEntity( + client.send_fallible(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index eafa24b0ff..b2da237a3c 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -1,7 +1,7 @@ use super::SysTimer; -use crate::{chunk_generator::ChunkGenerator, client::Client, Tick}; +use crate::{chunk_generator::ChunkGenerator, client::Client, presence::Presence, Tick}; use common::{ - comp::{self, bird_medium, Alignment, Player, Pos}, + comp::{self, bird_medium, Alignment, Pos}, event::{EventBus, ServerEvent}, generation::get_npc_name, msg::ServerGeneral, @@ -12,7 +12,7 @@ use common::{ LoadoutBuilder, }; use rand::Rng; -use specs::{Join, Read, ReadStorage, System, Write, WriteExpect, WriteStorage}; +use specs::{Join, Read, ReadStorage, System, Write, WriteExpect}; use std::sync::Arc; use vek::*; @@ -33,8 +33,8 @@ impl<'a> System<'a> for Sys { WriteExpect<'a, TerrainGrid>, Write<'a, TerrainChanges>, ReadStorage<'a, Pos>, - ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + ReadStorage<'a, Presence>, + ReadStorage<'a, Client>, ); fn run( @@ -47,8 +47,8 @@ impl<'a> System<'a> for Sys { mut terrain, mut terrain_changes, positions, - players, - mut clients, + presences, + clients, ): Self::SystemData, ) { span!(_guard, "run", "terrain::Sys::run"); @@ -62,8 +62,8 @@ impl<'a> System<'a> for Sys { let (chunk, supplement) = match res { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + if let Some(client) = clients.get(entity) { + client.send_fallible(ServerGeneral::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -75,12 +75,7 @@ impl<'a> System<'a> for Sys { }, }; // Send the chunk to all nearby players. - for (view_distance, pos, client) in (&players, &positions, &mut clients) - .join() - .filter_map(|(player, pos, client)| { - player.view_distance.map(|vd| (vd, pos, client)) - }) - { + for (presence, pos, client) in (&presences, &positions, &clients).join() { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); // Subtract 2 from the offset before computing squared magnitude // 1 since chunks need neighbors to be meshed @@ -89,8 +84,8 @@ impl<'a> System<'a> for Sys { .map(|e: i32| (e.abs() as u32).saturating_sub(2)) .magnitude_squared(); - if adjusted_dist_sqr <= view_distance.pow(2) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + if adjusted_dist_sqr <= presence.view_distance.pow(2) { + client.send_fallible(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); @@ -206,12 +201,8 @@ impl<'a> System<'a> for Sys { let mut should_drop = true; // For each player with a position, calculate the distance. - for (player, pos) in (&players, &positions).join() { - if player - .view_distance - .map(|vd| chunk_in_vd(pos.0, chunk_key, &terrain, vd)) - .unwrap_or(false) - { + for (presence, pos) in (&presences, &positions).join() { + if chunk_in_vd(pos.0, chunk_key, &terrain, presence.view_distance) { should_drop = false; break; } diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index a2e21f42a9..900248b6f1 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,13 +1,7 @@ use super::SysTimer; -use crate::client::Client; -use common::{ - comp::{Player, Pos}, - msg::ServerGeneral, - span, - state::TerrainChanges, - terrain::TerrainGrid, -}; -use specs::{Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use crate::{client::Client, presence::Presence}; +use common::{comp::Pos, msg::ServerGeneral, span, state::TerrainChanges, terrain::TerrainGrid}; +use specs::{Join, Read, ReadExpect, ReadStorage, System, Write}; /// This systems sends new chunks to clients as well as changes to existing /// chunks @@ -19,43 +13,48 @@ impl<'a> System<'a> for Sys { Read<'a, TerrainChanges>, Write<'a, SysTimer>, ReadStorage<'a, Pos>, - ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + ReadStorage<'a, Presence>, + ReadStorage<'a, Client>, ); fn run( &mut self, - (terrain, terrain_changes, mut timer, positions, players, mut clients): Self::SystemData, + (terrain, terrain_changes, mut timer, positions, presences, clients): Self::SystemData, ) { span!(_guard, "run", "terrain_sync::Sys::run"); timer.start(); // Sync changed chunks 'chunk: for chunk_key in &terrain_changes.modified_chunks { - for (player, pos, client) in (&players, &positions, &mut clients).join() { - if player - .view_distance - .map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd)) - .unwrap_or(false) + let mut lazy_msg = None; + + for (presence, pos, client) in (&presences, &positions, &clients).join() { + if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { - Some(chunk) => chunk.clone(), - None => break 'chunk, - })), - }); + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key: *chunk_key, + chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { + Some(chunk) => chunk.clone(), + None => break 'chunk, + })), + })); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } } // TODO: Don't send all changed blocks to all clients // Sync changed blocks - let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone()); - for (player, client) in (&players, &mut clients).join() { - if player.view_distance.is_some() { - client.send_msg(msg.clone()); + let mut lazy_msg = None; + for (_, client) in (&presences, &clients).join() { + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates( + terrain_changes.modified_blocks.clone(), + ))); } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } timer.end(); diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index bf8034baa0..89e16be7c5 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -22,28 +22,36 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Player>, ReadStorage<'a, WaypointArea>, WriteStorage<'a, Waypoint>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, Read<'a, Time>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, positions, players, waypoint_areas, mut waypoints, mut clients, time, mut timer): Self::SystemData, + ( + entities, + positions, + players, + waypoint_areas, + mut waypoints, + clients, + time, + mut timer, + ): Self::SystemData, ) { span!(_guard, "run", "waypoint::Sys::run"); timer.start(); - for (entity, player_pos, _, client) in - (&entities, &positions, &players, &mut clients).join() - { + for (entity, player_pos, _, client) in (&entities, &positions, &players, &clients).join() { for (waypoint_pos, waypoint_area) in (&positions, &waypoint_areas).join() { if player_pos.0.distance_squared(waypoint_pos.0) < waypoint_area.radius().powi(2) { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - client - .send_msg(ServerGeneral::Notification(Notification::WaypointSaved)); + client.send_fallible(ServerGeneral::Notification( + Notification::WaypointSaved, + )); } } } diff --git a/voxygen/src/hud/mod.rs b/voxygen/src/hud/mod.rs index a0496cadf5..1022a935dd 100644 --- a/voxygen/src/hud/mod.rs +++ b/voxygen/src/hud/mod.rs @@ -64,6 +64,7 @@ use common::{ item::{ItemDesc, Quality}, BuffKind, }, + msg::PresenceKind, span, sync::Uid, terrain::TerrainChunk, @@ -658,7 +659,12 @@ impl Hud { let server = &client.server_info.name; // Get the id, unwrap is safe because this CANNOT be None at this // point. - let character_id = client.active_character_id.unwrap(); + + let character_id = match client.presence().unwrap() { + PresenceKind::Character(id) => id, + PresenceKind::Spectator => unreachable!("HUD creation in Spectator mode!"), + }; + // Create a new HotbarState from the persisted slots. let hotbar_state = HotbarState::new(global_state.profile.get_hotbar_slots(server, character_id)); diff --git a/voxygen/src/menu/char_selection/mod.rs b/voxygen/src/menu/char_selection/mod.rs index fe80710b07..cd3adc0229 100644 --- a/voxygen/src/menu/char_selection/mod.rs +++ b/voxygen/src/menu/char_selection/mod.rs @@ -61,11 +61,11 @@ impl PlayState for CharSelectionState { fn tick(&mut self, global_state: &mut GlobalState, events: Vec) -> PlayStateResult { span!(_guard, "tick", "::tick"); - let (client_in_game, client_registered) = { + let (client_presence, client_registered) = { let client = self.client.borrow(); - (client.in_game(), client.registered()) + (client.presence(), client.registered()) }; - if client_in_game.is_none() && client_registered { + if client_presence.is_none() && client_registered { // Handle window events for event in events { if self.char_selection_ui.handle_event(event.clone()) { diff --git a/voxygen/src/session.rs b/voxygen/src/session.rs index f99d8890c4..b51a178346 100644 --- a/voxygen/src/session.rs +++ b/voxygen/src/session.rs @@ -18,6 +18,7 @@ use common::{ comp::{ChatMsg, ChatType, InventoryUpdateEvent, Pos, Vel}, consts::{MAX_MOUNT_RANGE, MAX_PICKUP_RANGE}, event::EventBus, + msg::PresenceKind, outcome::Outcome, span, terrain::{Block, BlockKind}, @@ -211,11 +212,11 @@ impl PlayState for SessionState { )); // TODO: can this be a method on the session or are there borrowcheck issues? - let (client_in_game, client_registered) = { + let (client_presence, client_registered) = { let client = self.client.borrow(); - (client.in_game(), client.registered()) + (client.presence(), client.registered()) }; - if client_in_game.is_some() { + if client_presence.is_some() { // Update MyEntity // Note: Alternatively, the client could emit an event when the entity changes // which may or may not be more elegant @@ -763,7 +764,10 @@ impl PlayState for SessionState { HudEvent::CharacterSelection => { self.client.borrow_mut().request_remove_character() }, - HudEvent::Logout => self.client.borrow_mut().request_logout(), + HudEvent::Logout => { + self.client.borrow_mut().logout(); + return PlayStateResult::Pop; + }, HudEvent::Quit => { return PlayStateResult::Shutdown; }, @@ -924,7 +928,12 @@ impl PlayState for SessionState { let server = &client.server_info.name; // If we are changing the hotbar state this CANNOT be None. - let character_id = client.active_character_id.unwrap(); + let character_id = match client.presence().unwrap() { + PresenceKind::Character(id) => id, + PresenceKind::Spectator => { + unreachable!("HUD adaption in Spectator mode!") + }, + }; // Get or update the ServerProfile. global_state @@ -1077,7 +1086,7 @@ impl PlayState for SessionState { self.cleanup(); PlayStateResult::Continue - } else if client_registered && client_in_game.is_none() { + } else if client_registered && client_presence.is_none() { PlayStateResult::Switch(Box::new(CharSelectionState::new( global_state, Rc::clone(&self.client),