diff --git a/server/src/client.rs b/server/src/client.rs index eec6385676..9462713c78 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,34 +1,10 @@ use common::msg::{ClientInGame, ClientType}; use hashbrown::HashSet; -use network::{Participant, Stream}; +use network::Participant; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; use vek::*; -// Streams -// we ignore errors on send, and do unified error handling in recv -pub struct GeneralStream(pub Stream); -pub struct PingStream(pub Stream); -pub struct RegisterStream(pub Stream); -pub struct CharacterScreenStream(pub Stream); -pub struct InGameStream(pub Stream); - -impl Component for GeneralStream { - type Storage = FlaggedStorage>; -} -impl Component for PingStream { - type Storage = FlaggedStorage>; -} -impl Component for RegisterStream { - type Storage = FlaggedStorage>; -} -impl Component for CharacterScreenStream { - type Storage = FlaggedStorage>; -} -impl Component for InGameStream { - type Storage = FlaggedStorage>; -} - pub struct Client { pub registered: bool, pub client_type: ClientType, diff --git a/server/src/cmd.rs b/server/src/cmd.rs index 462d6c3e05..dfa5c9b064 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -26,7 +26,10 @@ use std::convert::TryFrom; use vek::*; use world::util::Sampler; -use crate::{client::InGameStream, login_provider::LoginProvider}; +use crate::{ + login_provider::LoginProvider, + streams::{GetStream, InGameStream}, +}; use scan_fmt::{scan_fmt, scan_fmt_some}; use tracing::error; @@ -670,7 +673,7 @@ fn handle_spawn( .map(|g| (g, s)) }) .map(|(g, s)| { - let _ = s.0.send(ServerGeneral::GroupUpdate(g)); + s.send_unchecked(ServerGeneral::GroupUpdate(g)); }); }, ); diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 82c4f191f9..6a790f367a 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -1,6 +1,6 @@ use crate::{ - CharacterScreenStream, Client, ClientType, GeneralStream, InGameStream, PingStream, - RegisterStream, ServerInfo, + streams::{CharacterScreenStream, GeneralStream, InGameStream, PingStream, RegisterStream}, + Client, ClientType, ServerInfo, }; use crossbeam::{bounded, unbounded, Receiver, Sender}; use futures_channel::oneshot; diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index 1e3dd4af62..39bb1c39e9 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -1,6 +1,7 @@ use crate::{ - client::{Client, InGameStream}, + client::Client, comp::{biped_large, quadruped_medium, quadruped_small}, + streams::{GetStream, InGameStream}, Server, SpawnPoint, StateExt, }; use common::{ @@ -44,7 +45,7 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) } let mut in_game_streams = state.ecs().write_storage::(); if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - let _ = in_game_stream.0.send(ServerGeneral::Knockback(impulse)); + in_game_stream.send_unchecked(ServerGeneral::Knockback(impulse)); } } diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index 8d8e00d150..17729c436b 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -1,5 +1,5 @@ use crate::{ - client::{GeneralStream, InGameStream}, + streams::{GeneralStream, GetStream, InGameStream}, Server, }; use common::{ @@ -29,20 +29,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani match manip { GroupManip::Invite(uid) => { let mut general_streams = state.ecs().write_storage::(); - let invitee = - match state.ecs().entity_from_uid(uid.into()) { - Some(t) => t, - None => { - // Inform of failure - if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = - general_stream.0.send(ChatType::Meta.server_msg( - "Invite failed, target does not exist.".to_owned(), - )); - } - return; - }, - }; + let invitee = match state.ecs().entity_from_uid(uid.into()) { + Some(t) => t, + None => { + // Inform of failure + if let Some(general_stream) = general_streams.get_mut(entity) { + general_stream.send_unchecked( + ChatType::Meta + .server_msg("Invite failed, target does not exist.".to_owned()), + ); + } + return; + }, + }; let uids = state.ecs().read_storage::(); @@ -67,7 +66,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if already_in_same_group { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send(ChatType::Meta.server_msg( + general_stream.send_unchecked(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); } @@ -97,7 +96,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if group_size_limit_reached { // Inform inviter that they have reached the group size limit if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ the group size limit" @@ -114,12 +113,10 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = - general_stream - .0 - .send(ChatType::Meta.server_msg( - "This player already has a pending invite.".to_owned(), - )); + general_stream.send_unchecked( + ChatType::Meta + .server_msg("This player already has a pending invite.".to_owned()), + ); } return; } @@ -163,7 +160,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani (in_game_streams.get_mut(invitee), uids.get(entity).copied()) { if send_invite() { - let _ = in_game_stream.0.send(ServerGeneral::GroupInvite { + in_game_stream.send_unchecked(ServerGeneral::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); @@ -171,7 +168,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } else if agents.contains(invitee) { send_invite(); } else if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } @@ -179,7 +176,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Notify inviter that the invite is pending if invite_sent { if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - let _ = in_game_stream.0.send(ServerGeneral::InvitePending(uid)); + in_game_stream.send_unchecked(ServerGeneral::InvitePending(uid)); } } }, @@ -204,7 +201,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if let (Some(in_game_stream), Some(target)) = (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { + in_game_stream.send_unchecked(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Accepted, }); @@ -225,7 +222,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, s)) }) - .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); }, ); } @@ -252,7 +249,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if let (Some(in_game_stream), Some(target)) = (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { + in_game_stream.send_unchecked(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Declined, }); @@ -277,7 +274,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, s)) }) - .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); }, ); }, @@ -291,7 +288,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -304,7 +301,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); } @@ -313,7 +310,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -345,28 +342,27 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, s)) }) - .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them the have been kicked if let Some(general_stream) = general_streams.get_mut(target) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); } // Tell kicker that they were succesful if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream - .0 - .send(ChatType::Meta.server_msg("Player kicked.".to_owned())); + general_stream + .send_unchecked(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send(ChatType::Meta.server_msg( + general_stream.send_unchecked(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } @@ -374,10 +370,11 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform kicker that the target is not in a group if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = - general_stream.0.send(ChatType::Meta.server_msg( + general_stream.send_unchecked( + ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), - )); + ), + ); } }, } @@ -390,7 +387,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send(ChatType::Meta.server_msg( + general_stream.send_unchecked(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -421,18 +418,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani .try_map(|e| uids.get(e).copied()) .map(|g| (g, s)) }) - .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them they are the leader if let Some(general_stream) = general_streams.get_mut(target) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful if let Some(general_stream) = general_streams.get_mut(target) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -442,7 +439,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Inform transferer that they are not the leader let mut general_streams = state.ecs().write_storage::(); if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send( + general_stream.send_unchecked( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -454,7 +451,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Inform transferer that the target is not in a group let mut general_streams = state.ecs().write_storage::(); if let Some(general_stream) = general_streams.get_mut(entity) { - let _ = general_stream.0.send(ChatType::Meta.server_msg( + general_stream.send_unchecked(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index 21a27b01e2..cd1da4e31b 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -1,7 +1,7 @@ use crate::{ - client::{ - CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription, - RegisterStream, + client::{Client, RegionSubscription}, + streams::{ + CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream, }, Server, }; @@ -153,55 +153,34 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { Some(c) => c, None => return, }; - let _ = general_stream - .0 - .send(ServerGeneral::SetPlayerEntity(possesse_uid)); - clients - .insert(possesse, client) - .err() - .map(|e| error!(?e, "Error inserting client component during possession")); - general_streams - .insert(possesse, general_stream) - .err() - .map(|e| { - error!( - ?e, - "Error inserting general_streams component during possession" - ) - }); - ping_streams.insert(possesse, ping_stream).err().map(|e| { - error!( - ?e, - "Error inserting ping_streams component during possession" - ) - }); - register_streams - .insert(possesse, register_stream) - .err() - .map(|e| { - error!( - ?e, - "Error inserting register_streams component during possession" - ) - }); - character_screen_streams - .insert(possesse, character_screen_stream) - .err() - .map(|e| { - error!( - ?e, - "Error inserting character_screen_streams component during possession" - ) - }); - in_game_streams - .insert(possesse, in_game_stream) - .err() - .map(|e| { - error!( - ?e, - "Error inserting in_game_streams component during possession" - ) - }); + general_stream.send_unchecked(ServerGeneral::SetPlayerEntity(possesse_uid)); + let err_fn = |c, e: Option| { + e.map(|e| error!(?e, "Error inserting {} component during possession", c)); + }; + + err_fn("client", clients.insert(possesse, client).err()); + err_fn( + "general_streams", + general_streams.insert(possesse, general_stream).err(), + ); + err_fn( + "ping_streams", + ping_streams.insert(possesse, ping_stream).err(), + ); + err_fn( + "register_streams", + register_streams.insert(possesse, register_stream).err(), + ); + err_fn( + "character_screen_streams", + character_screen_streams + .insert(possesse, character_screen_stream) + .err(), + ); + err_fn( + "in_game_streams", + in_game_streams.insert(possesse, in_game_stream).err(), + ); // Put possess item into loadout let mut loadouts = ecs.write_storage::(); let loadout = loadouts @@ -229,22 +208,14 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { { let mut players = ecs.write_storage::(); if let Some(player) = players.remove(possessor) { - players - .insert(possesse, player) - .err() - .map(|e| error!(?e, "Error inserting player component during possession")); + err_fn("player", players.insert(possesse, player).err()); } } // Transfer region subscription { let mut subscriptions = ecs.write_storage::(); if let Some(s) = subscriptions.remove(possessor) { - subscriptions.insert(possesse, s).err().map(|e| { - error!( - ?e, - "Error inserting subscription component during possession" - ) - }); + err_fn("subscription", subscriptions.insert(possesse, s).err()); } } // Remove will of the entity @@ -257,19 +228,14 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { { let mut admins = ecs.write_storage::(); if let Some(admin) = admins.remove(possessor) { - admins - .insert(possesse, admin) - .err() - .map(|e| error!(?e, "Error inserting admin component during possession")); + err_fn("admin", admins.insert(possesse, admin).err()); } } // Transfer waypoint { let mut waypoints = ecs.write_storage::(); if let Some(waypoint) = waypoints.remove(possessor) { - waypoints.insert(possesse, waypoint).err().map(|e| { - error!(?e, "Error inserting waypoint component during possession",) - }); + err_fn("waypoints", waypoints.insert(possesse, waypoint).err()); } } } diff --git a/server/src/events/inventory_manip.rs b/server/src/events/inventory_manip.rs index 3bc648aea1..76dd82f638 100644 --- a/server/src/events/inventory_manip.rs +++ b/server/src/events/inventory_manip.rs @@ -1,4 +1,7 @@ -use crate::{client::InGameStream, Server, StateExt}; +use crate::{ + streams::{GetStream, InGameStream}, + Server, StateExt, +}; use common::{ comp::{ self, item, @@ -302,7 +305,7 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv .map(|g| (g, s)) }) .map(|(g, s)| { - s.0.send(ServerGeneral::GroupUpdate(g)) + s.send(ServerGeneral::GroupUpdate(g)) }); }, ); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index 383de143bf..0d35b28d07 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -1,11 +1,12 @@ use super::Event; use crate::{ - client::{ - CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream, - }, + client::Client, login_provider::LoginProvider, persistence, state_ext::StateExt, + streams::{ + CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream, + }, Server, }; use common::{ @@ -49,11 +50,11 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { Some(mut client), Some(uid), Some(player), - Some(mut general_stream), + Some(general_stream), Some(ping_stream), Some(register_stream), Some(character_screen_stream), - Some(in_game_stream), + Some(mut in_game_stream), ) = ( maybe_client, maybe_uid, @@ -66,7 +67,7 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { ) { // Tell client its request was successful client.in_game = None; - let _ = general_stream.0.send(ServerGeneral::ExitInGameSuccess); + in_game_stream.send_unchecked(ServerGeneral::ExitInGameSuccess); let entity_builder = state .ecs_mut() diff --git a/server/src/lib.rs b/server/src/lib.rs index 976c9f4d9c..9f5eced842 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -19,6 +19,7 @@ pub mod metrics; pub mod persistence; pub mod settings; pub mod state_ext; +pub mod streams; pub mod sys; #[cfg(not(feature = "worldgen"))] mod test_world; @@ -34,15 +35,15 @@ pub use crate::{ use crate::{ alias_validator::AliasValidator, chunk_generator::ChunkGenerator, - client::{ - CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription, - RegisterStream, - }, + client::{Client, RegionSubscription}, cmd::ChatCommandExt, connection_handler::ConnectionHandler, data_dir::DataDir, login_provider::LoginProvider, state_ext::StateExt, + streams::{ + CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream, + }, sys::sentinel::{DeletedEntities, TrackedComps}, }; use common::{ @@ -166,7 +167,13 @@ impl Server { // System timers for performance monitoring state.ecs_mut().insert(sys::EntitySyncTimer::default()); - state.ecs_mut().insert(sys::MessageTimer::default()); + state.ecs_mut().insert(sys::GeneralMsgTimer::default()); + state.ecs_mut().insert(sys::PingMsgTimer::default()); + state.ecs_mut().insert(sys::RegisterMsgTimer::default()); + state + .ecs_mut() + .insert(sys::CharacterScreenMsgTimer::default()); + state.ecs_mut().insert(sys::InGameMsgTimer::default()); state.ecs_mut().insert(sys::SentinelTimer::default()); state.ecs_mut().insert(sys::SubscriptionTimer::default()); state.ecs_mut().insert(sys::TerrainSyncTimer::default()); @@ -466,7 +473,12 @@ impl Server { // Run message receiving sys before the systems in common for decreased latency // (e.g. run before controller system) - sys::message::Sys.run_now(&self.state.ecs()); + //TODO: run in parallel + sys::msg::general::Sys.run_now(&self.state.ecs()); + sys::msg::register::Sys.run_now(&self.state.ecs()); + sys::msg::character_screen::Sys.run_now(&self.state.ecs()); + sys::msg::in_game::Sys.run_now(&self.state.ecs()); + sys::msg::ping::Sys.run_now(&self.state.ecs()); let before_state_tick = Instant::now(); @@ -615,7 +627,14 @@ impl Server { .ecs() .read_resource::() .nanos as i64; - let message_nanos = self.state.ecs().read_resource::().nanos as i64; + let message_nanos = { + let state = self.state.ecs(); + (state.read_resource::().nanos + + state.read_resource::().nanos + + state.read_resource::().nanos + + state.read_resource::().nanos + + state.read_resource::().nanos) as i64 + }; let sentinel_nanos = self.state.ecs().read_resource::().nanos as i64; let subscription_nanos = self .state @@ -882,7 +901,7 @@ impl Server { .ecs() .write_storage::() .get_mut(entity) - .map(|s| s.0.send(msg)); + .map(|s| s.send(msg)); }, ServerMsg::General(msg) => { match &msg { @@ -895,7 +914,7 @@ impl Server { .ecs() .write_storage::() .get_mut(entity) - .map(|s| s.0.send(msg)), + .map(|s| s.send(msg)), //Ingame related ServerGeneral::GroupUpdate(_) | ServerGeneral::GroupInvite { .. } @@ -912,7 +931,7 @@ impl Server { .ecs() .write_storage::() .get_mut(entity) - .map(|s| s.0.send(msg)), + .map(|s| s.send(msg)), // Always possible ServerGeneral::PlayerListUpdate(_) | ServerGeneral::ChatMsg(_) @@ -928,7 +947,7 @@ impl Server { .ecs() .write_storage::() .get_mut(entity) - .map(|s| s.0.send(msg)), + .map(|s| s.send(msg)), }; }, ServerMsg::Ping(msg) => { @@ -936,7 +955,7 @@ impl Server { .ecs() .write_storage::() .get_mut(entity) - .map(|s| s.0.send(msg)); + .map(|s| s.send(msg)); }, } } diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 57d714838b..d7cb4a3a57 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,6 +1,7 @@ use crate::{ - client::{CharacterScreenStream, Client, GeneralStream}, + client::Client, persistence::PersistedComponents, + streams::{CharacterScreenStream, GeneralStream, GetStream, InGameStream}, sys::sentinel::DeletedEntities, SpawnPoint, }; @@ -231,9 +232,7 @@ impl StateExt for State { .get_mut(entity) { client.in_game = Some(ClientInGame::Character); - let _ = character_screen_stream - .0 - .send(ServerGeneral::CharacterSuccess); + character_screen_stream.send_unchecked(ServerGeneral::CharacterSuccess); } } } @@ -292,14 +291,14 @@ impl StateExt for State { self.notify_registered_clients(ServerGeneral::ChatMsg(resolved_msg)) }, comp::ChatType::Online(u) => { - for (client, uid) in ( - &mut ecs.write_storage::(), + for (general_stream, uid) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if uid != u { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -311,9 +310,7 @@ impl StateExt for State { .join() { if uid == u || uid == t { - let _ = general_stream - .0 - .send(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -327,9 +324,8 @@ impl StateExt for State { (&mut ecs.write_storage::(), &positions).join() { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { - let _ = general_stream - .0 - .send(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream + .send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -343,9 +339,8 @@ impl StateExt for State { (&mut ecs.write_storage::(), &positions).join() { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { - let _ = general_stream - .0 - .send(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream + .send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -359,9 +354,8 @@ impl StateExt for State { (&mut ecs.write_storage::(), &positions).join() { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { - let _ = general_stream - .0 - .send(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream + .send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -375,9 +369,7 @@ impl StateExt for State { .join() { if s == &faction.0 { - let _ = general_stream - .0 - .send(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -389,9 +381,7 @@ impl StateExt for State { .join() { if g == group { - let _ = general_stream - .0 - .send(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -407,7 +397,7 @@ impl StateExt for State { .join() .filter(|(_, c)| c.registered) { - let _ = general_stream.0.send(msg.clone()); + general_stream.send_unchecked(msg.clone()); } } @@ -420,7 +410,7 @@ impl StateExt for State { .join() .filter(|(_, c)| c.in_game.is_some()) { - let _ = general_stream.0.send(msg.clone()); + general_stream.send_unchecked(msg.clone()); } } @@ -430,7 +420,7 @@ impl StateExt for State { ) -> Result<(), specs::error::WrongGeneration> { // Remove entity from a group if they are in one { - let mut general_streams = self.ecs().write_storage::(); + let mut in_game_streams = self.ecs().write_storage::(); let uids = self.ecs().read_storage::(); let mut group_manager = self.ecs().write_resource::(); group_manager.entity_deleted( @@ -440,14 +430,14 @@ impl StateExt for State { &uids, &self.ecs().entities(), &mut |entity, group_change| { - general_streams + in_game_streams .get_mut(entity) .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) .map(|g| (g, s)) }) - .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); }, ); } diff --git a/server/src/streams.rs b/server/src/streams.rs new file mode 100644 index 0000000000..4fa4bfd4ad --- /dev/null +++ b/server/src/streams.rs @@ -0,0 +1,118 @@ +use common::msg::{ClientGeneral, ClientRegister, PingMsg, ServerGeneral, ServerRegisterAnswer}; + +use network::{Stream, StreamError}; +use serde::{de::DeserializeOwned, Serialize}; + +use specs::Component; +use specs_idvs::IdvStorage; + +/// helped to reduce code duplication +pub(crate) trait GetStream { + type RecvMsg: DeserializeOwned; + type SendMsg: Serialize + core::fmt::Debug; + fn get_mut(&mut self) -> &mut Stream; + fn verify(msg: &Self::SendMsg) -> bool; + + fn send(&mut self, msg: Self::SendMsg) -> Result<(), StreamError> { + if Self::verify(&msg) { + self.get_mut().send(msg) + } else { + unreachable!("sending this msg isn't allowed! got: {:?}", msg) + } + } + + fn send_unchecked(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); } +} + +// Streams +// we ignore errors on send, and do unified error handling in recv +pub struct GeneralStream(pub(crate) Stream); +pub struct PingStream(pub(crate) Stream); +pub struct RegisterStream(pub(crate) Stream); +pub struct CharacterScreenStream(pub(crate) Stream); +pub struct InGameStream(pub(crate) Stream); + +impl Component for GeneralStream { + type Storage = IdvStorage; +} +impl Component for PingStream { + type Storage = IdvStorage; +} +impl Component for RegisterStream { + type Storage = IdvStorage; +} +impl Component for CharacterScreenStream { + type Storage = IdvStorage; +} +impl Component for InGameStream { + type Storage = IdvStorage; +} + +impl GetStream for GeneralStream { + type RecvMsg = ClientGeneral; + type SendMsg = ServerGeneral; + + fn get_mut(&mut self) -> &mut Stream { &mut self.0 } + + fn verify(msg: &Self::SendMsg) -> bool { + matches!(&msg, ServerGeneral::PlayerListUpdate(_) + | ServerGeneral::ChatMsg(_) + | ServerGeneral::SetPlayerEntity(_) + | ServerGeneral::TimeOfDay(_) + | ServerGeneral::EntitySync(_) + | ServerGeneral::CompSync(_) + | ServerGeneral::CreateEntity(_) + | ServerGeneral::DeleteEntity(_) + | ServerGeneral::Disconnect(_) + | ServerGeneral::Notification(_)) + } +} +impl GetStream for PingStream { + type RecvMsg = PingMsg; + type SendMsg = PingMsg; + + fn get_mut(&mut self) -> &mut Stream { &mut self.0 } + + fn verify(_: &Self::SendMsg) -> bool { true } +} +impl GetStream for RegisterStream { + type RecvMsg = ClientRegister; + type SendMsg = ServerRegisterAnswer; + + fn get_mut(&mut self) -> &mut Stream { &mut self.0 } + + fn verify(_: &Self::SendMsg) -> bool { true } +} +impl GetStream for CharacterScreenStream { + type RecvMsg = ClientGeneral; + type SendMsg = ServerGeneral; + + fn get_mut(&mut self) -> &mut Stream { &mut self.0 } + + fn verify(msg: &Self::SendMsg) -> bool { + matches!(&msg, ServerGeneral::CharacterDataLoadError(_) + | ServerGeneral::CharacterListUpdate(_) + | ServerGeneral::CharacterActionError(_) + | ServerGeneral::CharacterSuccess) + } +} +impl GetStream for InGameStream { + type RecvMsg = ClientGeneral; + type SendMsg = ServerGeneral; + + fn get_mut(&mut self) -> &mut Stream { &mut self.0 } + + fn verify(msg: &Self::SendMsg) -> bool { + matches!(&msg, ServerGeneral::GroupUpdate(_) + | ServerGeneral::GroupInvite { .. } + | ServerGeneral::InvitePending(_) + | ServerGeneral::InviteComplete { .. } + | ServerGeneral::ExitInGameSuccess + | ServerGeneral::InventoryUpdate(_, _) + | ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) + | ServerGeneral::SetViewDistance(_) + | ServerGeneral::Outcomes(_) + | ServerGeneral::Knockback(_)) + } +} diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index 465a77d728..96c61d0ba4 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -3,7 +3,8 @@ use super::{ SysTimer, }; use crate::{ - client::{Client, GeneralStream, InGameStream, RegionSubscription}, + client::{Client, RegionSubscription}, + streams::{GeneralStream, GetStream, InGameStream}, Tick, }; use common::{ @@ -165,7 +166,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - let _ = general_stream.0.send(create_msg.clone()); + general_stream.send_unchecked(create_msg.clone()); } } } @@ -179,7 +180,7 @@ impl<'a> System<'a> for Sys { .map(|key| !regions.contains(key)) .unwrap_or(true) { - let _ = general_stream.0.send(ServerGeneral::DeleteEntity(uid)); + general_stream.send_unchecked(ServerGeneral::DeleteEntity(uid)); } } } @@ -201,8 +202,8 @@ impl<'a> System<'a> for Sys { subscribers .iter_mut() .for_each(move |(_, _, _, _, _, general_stream)| { - let _ = general_stream.0.send(entity_sync_msg.clone()); - let _ = general_stream.0.send(comp_sync_msg.clone()); + general_stream.send_unchecked(entity_sync_msg.clone()); + general_stream.send_unchecked(comp_sync_msg.clone()); }); let mut send_general = |msg: ServerGeneral, @@ -236,7 +237,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - let _ = general_stream.0.send(msg.clone()); + general_stream.send_unchecked(msg.clone()); } } }; @@ -334,9 +335,7 @@ impl<'a> System<'a> for Sys { }) { for uid in &deleted { - let _ = general_stream - .0 - .send(ServerGeneral::DeleteEntity(Uid(*uid))); + general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid))); } } } @@ -347,7 +346,7 @@ impl<'a> System<'a> for Sys { for (inventory, update, in_game_stream) in (&inventories, &inventory_updates, &mut in_game_streams).join() { - let _ = in_game_stream.0.send(ServerGeneral::InventoryUpdate( + in_game_stream.send_unchecked(ServerGeneral::InventoryUpdate( inventory.clone(), update.event(), )); @@ -370,7 +369,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - let _ = in_game_stream.0.send(ServerGeneral::Outcomes(outcomes)); + in_game_stream.send_unchecked(ServerGeneral::Outcomes(outcomes)); } } outcomes.clear(); @@ -384,7 +383,7 @@ impl<'a> System<'a> for Sys { // system?) let tof_msg = ServerGeneral::TimeOfDay(*time_of_day); for general_stream in (&mut general_streams).join() { - let _ = general_stream.0.send(tof_msg.clone()); + general_stream.send_unchecked(tof_msg.clone()); } timer.end(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index 87fa70b25c..fdb1d392f9 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::InGameStream; +use crate::streams::{GetStream, InGameStream}; use common::{ comp::group::{Invite, PendingInvites}, msg::{InviteAnswer, ServerGeneral}, @@ -55,7 +55,7 @@ impl<'a> System<'a> for Sys { in_game_streams.get_mut(*inviter), uids.get(invitee).copied(), ) { - let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { + in_game_stream.send_unchecked(ServerGeneral::InviteComplete { target, answer: InviteAnswer::TimedOut, }); diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs deleted file mode 100644 index 39ef2537c2..0000000000 --- a/server/src/sys/message.rs +++ /dev/null @@ -1,764 +0,0 @@ -use super::SysTimer; -use crate::{ - alias_validator::AliasValidator, - character_creator, - client::{ - CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream, - }, - login_provider::LoginProvider, - metrics::{NetworkRequestMetrics, PlayerMetrics}, - persistence::character_loader::CharacterLoader, - EditableSettings, Settings, -}; -use common::{ - comp::{ - Admin, CanBuild, ChatMode, ChatType, ControlEvent, Controller, ForceUpdate, Ori, Player, - Pos, Stats, UnresolvedChatMsg, Vel, - }, - event::{EventBus, ServerEvent}, - msg::{ - validate_chat_msg, CharacterInfo, ChatMsgValidationError, ClientGeneral, ClientInGame, - ClientRegister, DisconnectReason, PingMsg, PlayerInfo, PlayerListUpdate, RegisterError, - ServerGeneral, ServerRegisterAnswer, MAX_BYTES_CHAT_MSG, - }, - span, - state::{BlockChange, Time}, - sync::Uid, - terrain::{TerrainChunkSize, TerrainGrid}, - vol::{ReadVol, RectVolSize}, -}; -use futures_executor::block_on; -use futures_timer::Delay; -use futures_util::{select, FutureExt}; -use hashbrown::HashMap; -use specs::{ - Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, -}; -use tracing::{debug, error, info, trace, warn}; - -impl Sys { - #[allow(clippy::too_many_arguments)] - fn handle_client_msg( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, - entity: specs::Entity, - client: &mut Client, - general_stream: &mut GeneralStream, - player_metrics: &ReadExpect<'_, PlayerMetrics>, - uids: &ReadStorage<'_, Uid>, - chat_modes: &ReadStorage<'_, ChatMode>, - msg: ClientGeneral, - ) -> Result<(), crate::error::Error> { - match msg { - ClientGeneral::ChatMsg(message) => { - if client.registered { - match validate_chat_msg(&message) { - Ok(()) => { - if let Some(from) = uids.get(entity) { - let mode = chat_modes.get(entity).cloned().unwrap_or_default(); - let msg = mode.new_message(*from, message); - new_chat_msgs.push((Some(entity), msg)); - } else { - error!("Could not send message. Missing player uid"); - } - }, - Err(ChatMsgValidationError::TooLong) => { - let max = MAX_BYTES_CHAT_MSG; - let len = message.len(); - warn!(?len, ?max, "Received a chat message that's too long") - }, - } - } - }, - ClientGeneral::Disconnect => { - general_stream - .0 - .send(ServerGeneral::Disconnect(DisconnectReason::Requested))?; - }, - ClientGeneral::Terminate => { - debug!(?entity, "Client send message to termitate session"); - player_metrics - .clients_disconnected - .with_label_values(&["gracefully"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - }, - _ => unreachable!("not a client_general msg"), - } - Ok(()) - } - - /* - #[allow(clippy::too_many_arguments)] - fn handle_client_in_game_msg( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - entity: specs::Entity, - client: &mut Client, - in_game_stream: &mut InGameStream, - terrain: &ReadExpect<'_, TerrainGrid>, - network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, - can_build: &ReadStorage<'_, CanBuild>, - force_updates: &ReadStorage<'_, ForceUpdate>, - stats: &mut WriteStorage<'_, Stats>, - block_changes: &mut Write<'_, BlockChange>, - positions: &mut WriteStorage<'_, Pos>, - velocities: &mut WriteStorage<'_, Vel>, - orientations: &mut WriteStorage<'_, Ori>, - players: &mut WriteStorage<'_, Player>, - controllers: &mut WriteStorage<'_, Controller>, - settings: &Read<'_, Settings>, - msg: ClientGeneral, - ) -> Result<(), crate::error::Error> { - if client.in_game.is_none() { - debug!(?entity, "client is not in_game, ignoring msg"); - trace!(?msg, "ignored msg content"); - if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) { - network_metrics.chunks_request_dropped.inc(); - } - return Ok(()); - } - match msg { - // Go back to registered state (char selection screen) - ClientGeneral::ExitInGame => { - client.in_game = None; - server_emitter.emit(ServerEvent::ExitIngame { entity }); - in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?; - }, - ClientGeneral::SetViewDistance(view_distance) => { - players.get_mut(entity).map(|player| { - player.view_distance = Some( - settings - .max_view_distance - .map(|max| view_distance.min(max)) - .unwrap_or(view_distance), - ) - }); - - //correct client if its VD is to high - if settings - .max_view_distance - .map(|max| view_distance > max) - .unwrap_or(false) - { - in_game_stream.0.send(ServerGeneral::SetViewDistance( - settings.max_view_distance.unwrap_or(0), - ))?; - } - }, - ClientGeneral::ControllerInputs(inputs) => { - if let Some(ClientInGame::Character) = client.in_game { - if let Some(controller) = controllers.get_mut(entity) { - controller.inputs.update_with_new(inputs); - } - } - }, - ClientGeneral::ControlEvent(event) => { - if let Some(ClientInGame::Character) = client.in_game { - // Skip respawn if client entity is alive - if let ControlEvent::Respawn = event { - if stats.get(entity).map_or(true, |s| !s.is_dead) { - //Todo: comment why return! - return Ok(()); - } - } - if let Some(controller) = controllers.get_mut(entity) { - controller.events.push(event); - } - } - }, - ClientGeneral::ControlAction(event) => { - if let Some(ClientInGame::Character) = client.in_game { - if let Some(controller) = controllers.get_mut(entity) { - controller.actions.push(event); - } - } - }, - ClientGeneral::PlayerPhysics { pos, vel, ori } => { - if let Some(ClientInGame::Character) = client.in_game { - if force_updates.get(entity).is_none() - && stats.get(entity).map_or(true, |s| !s.is_dead) - { - let _ = positions.insert(entity, pos); - let _ = velocities.insert(entity, vel); - let _ = orientations.insert(entity, ori); - } - } - }, - ClientGeneral::BreakBlock(pos) => { - if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) { - block_changes.set(pos, block.into_vacant()); - } - }, - ClientGeneral::PlaceBlock(pos, block) => { - if can_build.get(entity).is_some() { - block_changes.try_set(pos, block); - } - }, - ClientGeneral::TerrainChunkRequest { key } => { - let in_vd = if let (Some(view_distance), Some(pos)) = ( - players.get(entity).and_then(|p| p.view_distance), - positions.get(entity), - ) { - pos.0.xy().map(|e| e as f64).distance( - key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64 - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - })? - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, - } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, - ClientGeneral::UnlockSkill(skill) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.unlock_skill(skill)); - }, - ClientGeneral::RefundSkill(skill) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.refund_skill(skill)); - }, - ClientGeneral::UnlockSkillGroup(skill_group_type) => { - stats - .get_mut(entity) - .map(|s| s.skill_set.unlock_skill_group(skill_group_type)); - }, - _ => unreachable!("not a client_in_game msg"), - } - Ok(()) - } - */ - - #[allow(clippy::too_many_arguments)] - fn handle_client_character_screen_msg( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, - entity: specs::Entity, - client: &mut Client, - character_screen_stream: &mut CharacterScreenStream, - character_loader: &ReadExpect<'_, CharacterLoader>, - uids: &ReadStorage<'_, Uid>, - players: &mut WriteStorage<'_, Player>, - editable_settings: &ReadExpect<'_, EditableSettings>, - alias_validator: &ReadExpect<'_, AliasValidator>, - msg: ClientGeneral, - ) -> Result<(), crate::error::Error> { - match msg { - // Request spectator state - ClientGeneral::Spectate if client.registered => { - client.in_game = Some(ClientInGame::Spectator) - }, - ClientGeneral::Spectate => debug!("dropped Spectate msg from unregistered client"), - ClientGeneral::Character(character_id) - if client.registered && client.in_game.is_none() => - { - if let Some(player) = players.get(entity) { - // Send a request to load the character's component data from the - // DB. Once loaded, persisted components such as stats and inventory - // will be inserted for the entity - character_loader.load_character_data( - entity, - player.uuid().to_string(), - character_id, - ); - - // Start inserting non-persisted/default components for the entity - // while we load the DB data - server_emitter.emit(ServerEvent::InitCharacterData { - entity, - character_id, - }); - - // Give the player a welcome message - if !editable_settings.server_description.is_empty() { - character_screen_stream - .0 - .send(ChatType::CommandInfo.server_msg(String::from( - &*editable_settings.server_description, - )))?; - } - - if !client.login_msg_sent { - if let Some(player_uid) = uids.get(entity) { - new_chat_msgs.push((None, UnresolvedChatMsg { - chat_type: ChatType::Online(*player_uid), - message: "".to_string(), - })); - - client.login_msg_sent = true; - } - } - } else { - character_screen_stream - .0 - .send(ServerGeneral::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - )))? - } - } - ClientGeneral::Character(_) => { - let registered = client.registered; - let in_game = client.in_game; - debug!(?registered, ?in_game, "dropped Character msg from client"); - }, - ClientGeneral::RequestCharacterList => { - if let Some(player) = players.get(entity) { - character_loader.load_character_list(entity, player.uuid().to_string()) - } - }, - ClientGeneral::CreateCharacter { alias, tool, body } => { - if let Err(error) = alias_validator.validate(&alias) { - debug!(?error, ?alias, "denied alias as it contained a banned word"); - character_screen_stream - .0 - .send(ServerGeneral::CharacterActionError(error.to_string()))?; - } else if let Some(player) = players.get(entity) { - character_creator::create_character( - entity, - player.uuid().to_string(), - alias, - tool, - body, - character_loader, - ); - } - }, - ClientGeneral::DeleteCharacter(character_id) => { - if let Some(player) = players.get(entity) { - character_loader.delete_character( - entity, - player.uuid().to_string(), - character_id, - ); - } - }, - _ => unreachable!("not a client_character_screen msg"), - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_ping_msg( - client: &mut Client, - ping_stream: &mut PingStream, - msg: PingMsg, - ) -> Result<(), crate::error::Error> { - match msg { - PingMsg::Ping => ping_stream.0.send(PingMsg::Pong)?, - PingMsg::Pong => {}, - } - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - fn handle_register_msg( - player_list: &HashMap, - new_players: &mut Vec, - entity: specs::Entity, - client: &mut Client, - register_stream: &mut RegisterStream, - player_metrics: &ReadExpect<'_, PlayerMetrics>, - login_provider: &mut WriteExpect<'_, LoginProvider>, - admins: &mut WriteStorage<'_, Admin>, - players: &mut WriteStorage<'_, Player>, - editable_settings: &ReadExpect<'_, EditableSettings>, - msg: ClientRegister, - ) -> Result<(), crate::error::Error> { - let (username, uuid) = match login_provider.try_login( - &msg.token_or_username, - &*editable_settings.admins, - &*editable_settings.whitelist, - &*editable_settings.banlist, - ) { - Err(err) => { - register_stream.0.send(ServerRegisterAnswer::Err(err))?; - return Ok(()); - }, - Ok((username, uuid)) => (username, uuid), - }; - - const INITIAL_VD: Option = Some(5); //will be changed after login - let player = Player::new(username, None, INITIAL_VD, uuid); - let is_admin = editable_settings.admins.contains(&uuid); - - if !player.is_valid() { - // Invalid player - register_stream - .0 - .send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; - return Ok(()); - } - - if !client.registered && client.in_game.is_none() { - // Add Player component to this client - let _ = players.insert(entity, player); - player_metrics.players_connected.inc(); - - // Give the Admin component to the player if their name exists in - // admin list - if is_admin { - let _ = admins.insert(entity, Admin); - } - - // Tell the client its request was successful. - client.registered = true; - register_stream.0.send(ServerRegisterAnswer::Ok(()))?; - - // Send initial player list - register_stream - .0 - .send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - )))?; - - // Add to list to notify all clients of the new player - new_players.push(entity); - } - Ok(()) - } - - ///We needed to move this to a async fn, if we would use a async closures - /// the compiler generates to much recursion and fails to compile this - #[allow(clippy::too_many_arguments)] - async fn handle_messages( - server_emitter: &mut common::event::Emitter<'_, ServerEvent>, - new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, - player_list: &HashMap, - new_players: &mut Vec, - entity: specs::Entity, - client: &mut Client, - cnt: &mut u64, - character_loader: &ReadExpect<'_, CharacterLoader>, - terrain: &ReadExpect<'_, TerrainGrid>, - network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, - player_metrics: &ReadExpect<'_, PlayerMetrics>, - uids: &ReadStorage<'_, Uid>, - can_build: &ReadStorage<'_, CanBuild>, - force_updates: &ReadStorage<'_, ForceUpdate>, - stats: &mut WriteStorage<'_, Stats>, - chat_modes: &ReadStorage<'_, ChatMode>, - login_provider: &mut WriteExpect<'_, LoginProvider>, - block_changes: &mut Write<'_, BlockChange>, - admins: &mut WriteStorage<'_, Admin>, - //positions: &mut WriteStorage<'_, Pos>, - //velocities: &mut WriteStorage<'_, Vel>, - //orientations: &mut WriteStorage<'_, Ori>, - players: &mut WriteStorage<'_, Player>, - general_stream: &mut GeneralStream, - ping_stream: &mut PingStream, - register_stream: &mut RegisterStream, - character_screen_stream: &mut CharacterScreenStream, - //in_game_stream: &mut InGameStream, - controllers: &mut WriteStorage<'_, Controller>, - settings: &Read<'_, Settings>, - editable_settings: &ReadExpect<'_, EditableSettings>, - alias_validator: &ReadExpect<'_, AliasValidator>, - ) -> Result<(), crate::error::Error> { - loop { - if let Some(msg) = general_stream.0.try_recv()? { - Self::handle_client_msg( - server_emitter, - new_chat_msgs, - entity, - client, - general_stream, - player_metrics, - uids, - chat_modes, - msg, - )?; - *cnt += 1; - continue; - } - - if let Some(msg) = ping_stream.0.try_recv()? { - Self::handle_ping_msg(client, ping_stream, msg)?; - *cnt += 1; - continue; - } - - if let Some(msg) = register_stream.0.try_recv()? { - Self::handle_register_msg( - player_list, - new_players, - entity, - client, - register_stream, - player_metrics, - login_provider, - admins, - players, - editable_settings, - msg, - )?; - *cnt += 1; - continue; - } - - if let Some(msg) = character_screen_stream.0.try_recv()? { - Self::handle_client_character_screen_msg( - server_emitter, - new_chat_msgs, - entity, - client, - character_screen_stream, - character_loader, - uids, - players, - editable_settings, - alias_validator, - msg, - )?; - *cnt += 1; - continue; - } - - break Ok(()) - /* - if let Some(msg) = m2 { - client.network_error |= b2; - Self::handle_client_in_game_msg( - server_emitter, - entity, - client, - terrain, - network_metrics, - can_build, - force_updates, - stats, - block_changes, - positions, - velocities, - orientations, - players, - controllers, - settings, - msg?, - )?; - }*/ - } - } -} - -/// This system will handle new messages from clients -pub struct Sys; -impl<'a> System<'a> for Sys { - #[allow(clippy::type_complexity)] // TODO: Pending review in #587 - type SystemData = ( - Entities<'a>, - Read<'a, EventBus>, - Read<'a, Time>, - ReadExpect<'a, CharacterLoader>, - ReadExpect<'a, TerrainGrid>, - ReadExpect<'a, NetworkRequestMetrics>, - ReadExpect<'a, PlayerMetrics>, - Write<'a, SysTimer>, - ReadStorage<'a, Uid>, - ReadStorage<'a, CanBuild>, - ReadStorage<'a, ForceUpdate>, - WriteStorage<'a, Stats>, - ReadStorage<'a, ChatMode>, - WriteExpect<'a, LoginProvider>, - Write<'a, BlockChange>, - WriteStorage<'a, Admin>, - //WriteStorage<'a, Pos>, - //WriteStorage<'a, Vel>, - //WriteStorage<'a, Ori>, - WriteStorage<'a, Player>, - WriteStorage<'a, Client>, - WriteStorage<'a, GeneralStream>, - WriteStorage<'a, PingStream>, - WriteStorage<'a, RegisterStream>, - WriteStorage<'a, CharacterScreenStream>, - //WriteStorage<'a, InGameStream>, - WriteStorage<'a, Controller>, - Read<'a, Settings>, - ReadExpect<'a, EditableSettings>, - ReadExpect<'a, AliasValidator>, - ); - - #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 - #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 - #[allow(clippy::single_match)] // TODO: Pending review in #587 - fn run( - &mut self, - ( - entities, - server_event_bus, - time, - character_loader, - terrain, - network_metrics, - player_metrics, - mut timer, - uids, - can_build, - force_updates, - mut stats, - chat_modes, - mut accounts, - mut block_changes, - mut admins, - //mut positions, - //mut velocities, - //mut orientations, - mut players, - mut clients, - mut general_streams, - mut ping_streams, - mut register_streams, - mut character_screen_streams, - //mut in_game_streams, - mut controllers, - settings, - editable_settings, - alias_validator, - ): Self::SystemData, - ) { - span!(_guard, "run", "message::Sys::run"); - timer.start(); - - let mut server_emitter = server_event_bus.emitter(); - - let mut new_chat_msgs = Vec::new(); - - // Player list to send new players. - let player_list = (&uids, &players, stats.maybe(), admins.maybe()) - .join() - .map(|(uid, player, stats, admin)| { - (*uid, PlayerInfo { - is_online: true, - is_admin: admin.is_some(), - player_alias: player.alias.clone(), - character: stats.map(|stats| CharacterInfo { - name: stats.name.clone(), - level: stats.level.level(), - }), - }) - }) - .collect::>(); - // List of new players to update player lists of all clients. - let mut new_players = Vec::new(); - - for (entity, client, general_stream, ping_stream, register_stream, character_screen_stream) in (&entities, &mut clients, &mut general_streams, &mut ping_streams, &mut register_streams, &mut character_screen_streams).join() { - let mut cnt = 0; - - let network_err: Result<(), crate::error::Error> = block_on(async { - //TIMEOUT 0.02 ms for msg handling - let work_future = Self::handle_messages( - &mut server_emitter, - &mut new_chat_msgs, - &player_list, - &mut new_players, - entity, - client, - &mut cnt, - &character_loader, - &terrain, - &network_metrics, - &player_metrics, - &uids, - &can_build, - &force_updates, - &mut stats, - &chat_modes, - &mut accounts, - &mut block_changes, - &mut admins, - //&mut positions, - //&mut velocities, - //&mut orientations, - &mut players, - general_stream, - ping_stream, - register_stream, - character_screen_stream, - &mut controllers, - &settings, - &editable_settings, - &alias_validator, - ); - select!( - _ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()), - err = work_future.fuse() => err, - ) - }); - - // Network error - if network_err.is_err() { - debug!(?entity, "postbox error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["network_error"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - } else if cnt > 0 { - // Update client ping. - client.last_ping = time.0 - } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 - // Timeout - { - info!(?entity, "timeout error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["timeout"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { - // Try pinging the client if the timeout is nearing. - //FIXME - //client.send_msg(PingMsg::Ping); - } - } - - // Handle new players. - // Tell all clients to add them to the player list. - for entity in new_players { - if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) { - let msg = - ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo { - player_alias: player.alias.clone(), - is_online: true, - is_admin: admins.get(entity).is_some(), - character: None, // new players will be on character select. - })); - for client in (&mut clients).join().filter(|c| c.registered) { - //FIXME - //client.send_msg(msg.clone()) - } - } - } - - // Handle new chat messages. - for (entity, msg) in new_chat_msgs { - // Handle chat commands. - if msg.message.starts_with("/") { - if let (Some(entity), true) = (entity, msg.message.len() > 1) { - let argv = String::from(&msg.message[1..]); - server_emitter.emit(ServerEvent::ChatCmd(entity, argv)); - } - } else { - // Send chat message - server_emitter.emit(ServerEvent::Chat(msg)); - } - } - - timer.end() - } -} diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index 6dba8a0f3c..7e34ea933b 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,7 +1,6 @@ pub mod entity_sync; pub mod invite_timeout; pub mod msg; -pub mod message; pub mod object; pub mod persistence; pub mod sentinel; @@ -17,7 +16,11 @@ use std::{ }; pub type EntitySyncTimer = SysTimer; -pub type MessageTimer = SysTimer; +pub type GeneralMsgTimer = SysTimer; +pub type PingMsgTimer = SysTimer; +pub type RegisterMsgTimer = SysTimer; +pub type CharacterScreenMsgTimer = SysTimer; +pub type InGameMsgTimer = SysTimer; pub type SentinelTimer = SysTimer; pub type SubscriptionTimer = SysTimer; pub type TerrainTimer = SysTimer; diff --git a/server/src/sys/msg/character_screen.rs b/server/src/sys/msg/character_screen.rs new file mode 100644 index 0000000000..8444886a62 --- /dev/null +++ b/server/src/sys/msg/character_screen.rs @@ -0,0 +1,224 @@ +use super::super::SysTimer; +use crate::{ + alias_validator::AliasValidator, + character_creator, + client::Client, + persistence::character_loader::CharacterLoader, + streams::{CharacterScreenStream, GeneralStream, GetStream}, + EditableSettings, +}; +use common::{ + comp::{ChatType, Player, UnresolvedChatMsg}, + event::{EventBus, ServerEvent}, + msg::{ClientGeneral, ClientInGame, ServerGeneral}, + span, + state::Time, + sync::Uid, +}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use tracing::debug; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_client_character_screen_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, + entity: specs::Entity, + client: &mut Client, + character_screen_stream: &mut CharacterScreenStream, + general_stream: &mut GeneralStream, + character_loader: &ReadExpect<'_, CharacterLoader>, + uids: &ReadStorage<'_, Uid>, + players: &ReadStorage<'_, Player>, + editable_settings: &ReadExpect<'_, EditableSettings>, + alias_validator: &ReadExpect<'_, AliasValidator>, + msg: ClientGeneral, + ) -> Result<(), crate::error::Error> { + match msg { + // Request spectator state + ClientGeneral::Spectate if client.registered => { + client.in_game = Some(ClientInGame::Spectator) + }, + ClientGeneral::Spectate => debug!("dropped Spectate msg from unregistered client"), + ClientGeneral::Character(character_id) + if client.registered && client.in_game.is_none() => + { + if let Some(player) = players.get(entity) { + // Send a request to load the character's component data from the + // DB. Once loaded, persisted components such as stats and inventory + // will be inserted for the entity + character_loader.load_character_data( + entity, + player.uuid().to_string(), + character_id, + ); + + // Start inserting non-persisted/default components for the entity + // while we load the DB data + server_emitter.emit(ServerEvent::InitCharacterData { + entity, + character_id, + }); + + // Give the player a welcome message + if !editable_settings.server_description.is_empty() { + general_stream + .send(ChatType::CommandInfo.server_msg(String::from( + &*editable_settings.server_description, + )))?; + } + + if !client.login_msg_sent { + if let Some(player_uid) = uids.get(entity) { + new_chat_msgs.push((None, UnresolvedChatMsg { + chat_type: ChatType::Online(*player_uid), + message: "".to_string(), + })); + + client.login_msg_sent = true; + } + } + } else { + character_screen_stream.send(ServerGeneral::CharacterDataLoadError( + String::from("Failed to fetch player entity"), + ))? + } + } + ClientGeneral::Character(_) => { + let registered = client.registered; + let in_game = client.in_game; + debug!(?registered, ?in_game, "dropped Character msg from client"); + }, + ClientGeneral::RequestCharacterList => { + if let Some(player) = players.get(entity) { + character_loader.load_character_list(entity, player.uuid().to_string()) + } + }, + ClientGeneral::CreateCharacter { alias, tool, body } => { + if let Err(error) = alias_validator.validate(&alias) { + debug!(?error, ?alias, "denied alias as it contained a banned word"); + character_screen_stream + .send(ServerGeneral::CharacterActionError(error.to_string()))?; + } else if let Some(player) = players.get(entity) { + character_creator::create_character( + entity, + player.uuid().to_string(), + alias, + tool, + body, + character_loader, + ); + } + }, + ClientGeneral::DeleteCharacter(character_id) => { + if let Some(player) = players.get(entity) { + character_loader.delete_character( + entity, + player.uuid().to_string(), + character_id, + ); + } + }, + _ => unreachable!("not a client_character_screen msg"), + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] // TODO: Pending review in #587 + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + Read<'a, Time>, + ReadExpect<'a, CharacterLoader>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + WriteStorage<'a, Client>, + ReadStorage<'a, Player>, + WriteStorage<'a, CharacterScreenStream>, + WriteStorage<'a, GeneralStream>, + ReadExpect<'a, EditableSettings>, + ReadExpect<'a, AliasValidator>, + ); + + #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 + #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 + #[allow(clippy::single_match)] // TODO: Pending review in #587 + fn run( + &mut self, + ( + entities, + server_event_bus, + time, + character_loader, + mut timer, + uids, + mut clients, + players, + mut character_screen_streams, + mut general_streams, + editable_settings, + alias_validator, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::character_screen::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + let mut new_chat_msgs = Vec::new(); + + for (entity, client, character_screen_stream, general_stream) in ( + &entities, + &mut clients, + &mut character_screen_streams, + &mut general_streams, + ) + .join() + { + let res = + super::try_recv_all(character_screen_stream, |character_screen_stream, msg| { + Self::handle_client_character_screen_msg( + &mut server_emitter, + &mut new_chat_msgs, + entity, + client, + character_screen_stream, + general_stream, + &character_loader, + &uids, + &players, + &editable_settings, + &alias_validator, + msg, + ) + }); + + match res { + Ok(1_u64..=u64::MAX) => { + // Update client ping. + client.last_ping = time.0 + }, + _ => (/*handled by ping*/), + } + } + + // Handle new chat messages. + for (entity, msg) in new_chat_msgs { + // Handle chat commands. + if msg.message.starts_with("/") { + if let (Some(entity), true) = (entity, msg.message.len() > 1) { + let argv = String::from(&msg.message[1..]); + server_emitter.emit(ServerEvent::ChatCmd(entity, argv)); + } + } else { + // Send chat message + server_emitter.emit(ServerEvent::Chat(msg)); + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/general.rs b/server/src/sys/msg/general.rs new file mode 100644 index 0000000000..19a2fb2dd6 --- /dev/null +++ b/server/src/sys/msg/general.rs @@ -0,0 +1,153 @@ +use super::super::SysTimer; +use crate::{ + client::Client, + metrics::PlayerMetrics, + streams::{GeneralStream, GetStream}, +}; +use common::{ + comp::{ChatMode, UnresolvedChatMsg}, + event::{EventBus, ServerEvent}, + msg::{ + validate_chat_msg, ChatMsgValidationError, ClientGeneral, DisconnectReason, ServerGeneral, + MAX_BYTES_CHAT_MSG, + }, + span, + state::Time, + sync::Uid, +}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use tracing::{debug, error, warn}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_general_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, + entity: specs::Entity, + client: &mut Client, + general_stream: &mut GeneralStream, + player_metrics: &ReadExpect<'_, PlayerMetrics>, + uids: &ReadStorage<'_, Uid>, + chat_modes: &ReadStorage<'_, ChatMode>, + msg: ClientGeneral, + ) -> Result<(), crate::error::Error> { + match msg { + ClientGeneral::ChatMsg(message) => { + if client.registered { + match validate_chat_msg(&message) { + Ok(()) => { + if let Some(from) = uids.get(entity) { + let mode = chat_modes.get(entity).cloned().unwrap_or_default(); + let msg = mode.new_message(*from, message); + new_chat_msgs.push((Some(entity), msg)); + } else { + error!("Could not send message. Missing player uid"); + } + }, + Err(ChatMsgValidationError::TooLong) => { + let max = MAX_BYTES_CHAT_MSG; + let len = message.len(); + warn!(?len, ?max, "Received a chat message that's too long") + }, + } + } + }, + ClientGeneral::Disconnect => { + general_stream.send(ServerGeneral::Disconnect(DisconnectReason::Requested))?; + }, + ClientGeneral::Terminate => { + debug!(?entity, "Client send message to termitate session"); + player_metrics + .clients_disconnected + .with_label_values(&["gracefully"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + }, + _ => unreachable!("not a client_general msg"), + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] // TODO: Pending review in #587 + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + Read<'a, Time>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + ReadStorage<'a, ChatMode>, + WriteStorage<'a, Client>, + WriteStorage<'a, GeneralStream>, + ); + + #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 + #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 + #[allow(clippy::single_match)] // TODO: Pending review in #587 + fn run( + &mut self, + ( + entities, + server_event_bus, + time, + player_metrics, + mut timer, + uids, + chat_modes, + mut clients, + mut general_streams, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::general::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + let mut new_chat_msgs = Vec::new(); + + for (entity, client, general_stream) in + (&entities, &mut clients, &mut general_streams).join() + { + let res = super::try_recv_all(general_stream, |general_stream, msg| { + Self::handle_general_msg( + &mut server_emitter, + &mut new_chat_msgs, + entity, + client, + general_stream, + &player_metrics, + &uids, + &chat_modes, + msg, + ) + }); + + match res { + Ok(1_u64..=u64::MAX) => { + // Update client ping. + client.last_ping = time.0 + }, + _ => (/*handled by ping*/), + } + } + + // Handle new chat messages. + for (entity, msg) in new_chat_msgs { + // Handle chat commands. + if msg.message.starts_with("/") { + if let (Some(entity), true) = (entity, msg.message.len() > 1) { + let argv = String::from(&msg.message[1..]); + server_emitter.emit(ServerEvent::ChatCmd(entity, argv)); + } + } else { + // Send chat message + server_emitter.emit(ServerEvent::Chat(msg)); + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs index 0090f7603b..ed3356d38a 100644 --- a/server/src/sys/msg/in_game.rs +++ b/server/src/sys/msg/in_game.rs @@ -1,33 +1,20 @@ use super::super::SysTimer; use crate::{ - alias_validator::AliasValidator, - client::{ - Client, InGameStream, - }, - login_provider::LoginProvider, - metrics::{NetworkRequestMetrics, PlayerMetrics}, - persistence::character_loader::CharacterLoader, - EditableSettings, Settings, + client::Client, + metrics::NetworkRequestMetrics, + streams::{GetStream, InGameStream}, + Settings, }; use common::{ - comp::{ - Admin, CanBuild, ChatMode, ControlEvent, Controller, ForceUpdate, Ori, Player, - Pos, Stats, Vel, - }, + comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Ori, Player, Pos, Stats, Vel}, event::{EventBus, ServerEvent}, - msg::{ - ClientGeneral, ClientInGame, - ServerGeneral, - }, + msg::{ClientGeneral, ClientInGame, ServerGeneral}, span, state::{BlockChange, Time}, - sync::Uid, terrain::{TerrainChunkSize, TerrainGrid}, vol::{ReadVol, RectVolSize}, }; -use specs::{ - Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, -}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; use tracing::{debug, trace}; impl Sys { @@ -64,7 +51,7 @@ impl Sys { ClientGeneral::ExitInGame => { client.in_game = None; server_emitter.emit(ServerEvent::ExitIngame { entity }); - in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?; + in_game_stream.send(ServerGeneral::ExitInGameSuccess)?; }, ClientGeneral::SetViewDistance(view_distance) => { players.get_mut(entity).map(|player| { @@ -82,7 +69,7 @@ impl Sys { .map(|max| view_distance > max) .unwrap_or(false) { - in_game_stream.0.send(ServerGeneral::SetViewDistance( + in_game_stream.send(ServerGeneral::SetViewDistance( settings.max_view_distance.unwrap_or(0), ))?; } @@ -152,7 +139,7 @@ impl Sys { match terrain.get_key(key) { Some(chunk) => { network_metrics.chunks_served_from_memory.inc(); - in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { + in_game_stream.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), })? @@ -195,19 +182,13 @@ impl<'a> System<'a> for Sys { Entities<'a>, Read<'a, EventBus>, Read<'a, Time>, - ReadExpect<'a, CharacterLoader>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, NetworkRequestMetrics>, - ReadExpect<'a, PlayerMetrics>, Write<'a, SysTimer>, - ReadStorage<'a, Uid>, ReadStorage<'a, CanBuild>, ReadStorage<'a, ForceUpdate>, WriteStorage<'a, Stats>, - ReadStorage<'a, ChatMode>, - WriteExpect<'a, LoginProvider>, Write<'a, BlockChange>, - WriteStorage<'a, Admin>, WriteStorage<'a, Pos>, WriteStorage<'a, Vel>, WriteStorage<'a, Ori>, @@ -216,8 +197,6 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, InGameStream>, WriteStorage<'a, Controller>, Read<'a, Settings>, - ReadExpect<'a, EditableSettings>, - ReadExpect<'a, AliasValidator>, ); #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 @@ -229,19 +208,13 @@ impl<'a> System<'a> for Sys { entities, server_event_bus, time, - character_loader, terrain, network_metrics, - player_metrics, mut timer, - uids, can_build, force_updates, mut stats, - chat_modes, - mut accounts, mut block_changes, - mut admins, mut positions, mut velocities, mut orientations, @@ -250,8 +223,6 @@ impl<'a> System<'a> for Sys { mut in_game_streams, mut controllers, settings, - editable_settings, - alias_validator, ): Self::SystemData, ) { span!(_guard, "run", "msg::in_game::Sys::run"); @@ -259,43 +230,37 @@ impl<'a> System<'a> for Sys { let mut server_emitter = server_event_bus.emitter(); - for (entity, client, mut in_game_stream) in (&entities, &mut clients, &mut in_game_streams).join() { - let mut cnt = 0; + for (entity, client, in_game_stream) in + (&entities, &mut clients, &mut in_game_streams).join() + { + let res = super::try_recv_all(in_game_stream, |in_game_stream, msg| { + Self::handle_client_in_game_msg( + &mut server_emitter, + entity, + client, + in_game_stream, + &terrain, + &network_metrics, + &can_build, + &force_updates, + &mut stats, + &mut block_changes, + &mut positions, + &mut velocities, + &mut orientations, + &mut players, + &mut controllers, + &settings, + msg, + ) + }); - let network_err: Result<(), crate::error::Error> = { - loop { - let msg = match in_game_stream.0.try_recv() { - Ok(Some(msg)) => msg, - Ok(None) => break Ok(()), - Err(e) => break Err(e.into()), - }; - if let Err(e) = Self::handle_client_in_game_msg( - &mut server_emitter, - entity, - client, - &mut in_game_stream, - &terrain, - &network_metrics, - &can_build, - &force_updates, - &mut stats, - &mut block_changes, - &mut positions, - &mut velocities, - &mut orientations, - &mut players, - &mut controllers, - &settings, - msg, - ) { - break Err(e); - } - cnt += 1; - } - }; - - if cnt > 0 { // Update client ping. - client.last_ping = time.0 + match res { + Ok(1_u64..=u64::MAX) => { + // Update client ping. + client.last_ping = time.0 + }, + _ => (/*handled by ping*/), } } diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs index ee8449ae96..a95ef02a94 100644 --- a/server/src/sys/msg/mod.rs +++ b/server/src/sys/msg/mod.rs @@ -1 +1,28 @@ -mod in_game; \ No newline at end of file +pub mod character_screen; +pub mod general; +pub mod in_game; +pub mod ping; +pub mod register; + +use crate::streams::GetStream; + +/// handles all send msg and calls a handle fn +/// Aborts when a error occurred returns cnt of successful msg otherwise +pub(crate) fn try_recv_all(stream: &mut T, mut f: F) -> Result +where + T: GetStream, + F: FnMut(&mut T, T::RecvMsg) -> Result<(), crate::error::Error>, +{ + let mut cnt = 0u64; + loop { + let msg = match stream.get_mut().try_recv() { + Ok(Some(msg)) => msg, + Ok(None) => break Ok(cnt), + Err(e) => break Err(e.into()), + }; + if let Err(e) = f(stream, msg) { + break Err(e); + } + cnt += 1; + } +} diff --git a/server/src/sys/msg/ping.rs b/server/src/sys/msg/ping.rs new file mode 100644 index 0000000000..b5b93bdc66 --- /dev/null +++ b/server/src/sys/msg/ping.rs @@ -0,0 +1,106 @@ +use super::super::SysTimer; +use crate::{ + client::Client, + metrics::PlayerMetrics, + streams::{GetStream, PingStream}, + Settings, +}; +use common::{ + event::{EventBus, ServerEvent}, + msg::PingMsg, + span, + state::Time, +}; +use specs::{Entities, Join, Read, ReadExpect, System, Write, WriteStorage}; +use tracing::{debug, info}; + +impl Sys { + fn handle_ping_msg( + ping_stream: &mut PingStream, + msg: PingMsg, + ) -> Result<(), crate::error::Error> { + match msg { + PingMsg::Ping => ping_stream.send(PingMsg::Pong)?, + PingMsg::Pong => {}, + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] // TODO: Pending review in #587 + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + Read<'a, Time>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + WriteStorage<'a, Client>, + WriteStorage<'a, PingStream>, + Read<'a, Settings>, + ); + + #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 + #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 + #[allow(clippy::single_match)] // TODO: Pending review in #587 + fn run( + &mut self, + ( + entities, + server_event_bus, + time, + player_metrics, + mut timer, + mut clients, + mut ping_streams, + settings, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::ping::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + + for (entity, client, ping_stream) in (&entities, &mut clients, &mut ping_streams).join() { + let res = super::try_recv_all(ping_stream, |ping_stream, msg| { + Self::handle_ping_msg(ping_stream, msg) + }); + + match res { + Err(e) => { + debug!(?entity, ?e, "network error with client, disconnecting"); + player_metrics + .clients_disconnected + .with_label_values(&["network_error"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + }, + Ok(1_u64..=u64::MAX) => { + // Update client ping. + client.last_ping = time.0 + }, + Ok(0) => { + if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 + // Timeout + { + info!(?entity, "timeout error with client, disconnecting"); + player_metrics + .clients_disconnected + .with_label_values(&["timeout"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + } else if time.0 - client.last_ping + > settings.client_timeout.as_secs() as f64 * 0.5 + { + // Try pinging the client if the timeout is nearing. + ping_stream.send_unchecked(PingMsg::Ping); + } + }, + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/register.rs b/server/src/sys/msg/register.rs new file mode 100644 index 0000000000..437243ab52 --- /dev/null +++ b/server/src/sys/msg/register.rs @@ -0,0 +1,208 @@ +use super::super::SysTimer; +use crate::{ + client::Client, + login_provider::LoginProvider, + metrics::PlayerMetrics, + streams::{GeneralStream, GetStream, RegisterStream}, + EditableSettings, +}; +use common::{ + comp::{Admin, Player, Stats}, + msg::{ + CharacterInfo, ClientRegister, PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral, + ServerRegisterAnswer, + }, + span, + state::Time, + sync::Uid, +}; +use hashbrown::HashMap; +use specs::{ + Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, +}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_register_msg( + player_list: &HashMap, + new_players: &mut Vec, + entity: specs::Entity, + client: &mut Client, + register_stream: &mut RegisterStream, + general_stream: &mut GeneralStream, + player_metrics: &ReadExpect<'_, PlayerMetrics>, + login_provider: &mut WriteExpect<'_, LoginProvider>, + admins: &mut WriteStorage<'_, Admin>, + players: &mut WriteStorage<'_, Player>, + editable_settings: &ReadExpect<'_, EditableSettings>, + msg: ClientRegister, + ) -> Result<(), crate::error::Error> { + let (username, uuid) = match login_provider.try_login( + &msg.token_or_username, + &*editable_settings.admins, + &*editable_settings.whitelist, + &*editable_settings.banlist, + ) { + Err(err) => { + register_stream.send(ServerRegisterAnswer::Err(err))?; + return Ok(()); + }, + Ok((username, uuid)) => (username, uuid), + }; + + const INITIAL_VD: Option = Some(5); //will be changed after login + let player = Player::new(username, None, INITIAL_VD, uuid); + let is_admin = editable_settings.admins.contains(&uuid); + + if !player.is_valid() { + // Invalid player + register_stream.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; + return Ok(()); + } + + if !client.registered && client.in_game.is_none() { + // Add Player component to this client + let _ = players.insert(entity, player); + player_metrics.players_connected.inc(); + + // Give the Admin component to the player if their name exists in + // admin list + if is_admin { + let _ = admins.insert(entity, Admin); + } + + // Tell the client its request was successful. + client.registered = true; + register_stream.send(ServerRegisterAnswer::Ok(()))?; + + // Send initial player list + general_stream.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + )))?; + + // Add to list to notify all clients of the new player + new_players.push(entity); + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] // TODO: Pending review in #587 + type SystemData = ( + Entities<'a>, + Read<'a, Time>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + WriteStorage<'a, Client>, + WriteStorage<'a, Player>, + ReadStorage<'a, Stats>, + WriteExpect<'a, LoginProvider>, + WriteStorage<'a, Admin>, + WriteStorage<'a, RegisterStream>, + WriteStorage<'a, GeneralStream>, + ReadExpect<'a, EditableSettings>, + ); + + #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 + #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 + #[allow(clippy::single_match)] // TODO: Pending review in #587 + fn run( + &mut self, + ( + entities, + time, + player_metrics, + mut timer, + uids, + mut clients, + mut players, + stats, + mut login_provider, + mut admins, + mut register_streams, + mut general_streams, + editable_settings, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::register::Sys::run"); + timer.start(); + + // Player list to send new players. + let player_list = (&uids, &players, stats.maybe(), admins.maybe()) + .join() + .map(|(uid, player, stats, admin)| { + (*uid, PlayerInfo { + is_online: true, + is_admin: admin.is_some(), + player_alias: player.alias.clone(), + character: stats.map(|stats| CharacterInfo { + name: stats.name.clone(), + level: stats.level.level(), + }), + }) + }) + .collect::>(); + // List of new players to update player lists of all clients. + let mut new_players = Vec::new(); + + for (entity, client, register_stream, general_stream) in ( + &entities, + &mut clients, + &mut register_streams, + &mut general_streams, + ) + .join() + { + let res = super::try_recv_all(register_stream, |register_stream, msg| { + Self::handle_register_msg( + &player_list, + &mut new_players, + entity, + client, + register_stream, + general_stream, + &player_metrics, + &mut login_provider, + &mut admins, + &mut players, + &editable_settings, + msg, + ) + }); + + match res { + Ok(1_u64..=u64::MAX) => { + // Update client ping. + client.last_ping = time.0 + }, + _ => (/*handled by ping*/), + } + } + + // Handle new players. + // Tell all clients to add them to the player list. + for entity in new_players { + if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) { + let msg = + ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo { + player_alias: player.alias.clone(), + is_online: true, + is_admin: admins.get(entity).is_some(), + character: None, // new players will be on character select. + })); + for (_, general_stream) in (&mut clients, &mut general_streams) + .join() + .filter(|(c, _)| c.registered) + { + let _ = general_stream.send(msg.clone()); + } + } + } + + timer.end() + } +} diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 5fd1611ae0..ce7c3567b1 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -2,7 +2,10 @@ use super::{ sentinel::{DeletedEntities, TrackedComps}, SysTimer, }; -use crate::client::{self, Client, InGameStream, RegionSubscription}; +use crate::{ + client::{self, Client, RegionSubscription}, + streams::{GeneralStream, GetStream}, +}; use common::{ comp::{Ori, Player, Pos, Vel}, msg::ServerGeneral, @@ -33,7 +36,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Ori>, ReadStorage<'a, Player>, ReadStorage<'a, Client>, - WriteStorage<'a, InGameStream>, + WriteStorage<'a, GeneralStream>, WriteStorage<'a, RegionSubscription>, Write<'a, DeletedEntities>, TrackedComps<'a>, @@ -52,7 +55,7 @@ impl<'a> System<'a> for Sys { orientations, players, clients, - mut in_game_streams, + mut general_streams, mut subscriptions, mut deleted_entities, tracked_comps, @@ -73,13 +76,13 @@ impl<'a> System<'a> for Sys { // 7. Determine list of regions that are in range and iterate through it // - check if in hashset (hash calc) if not add it let mut regions_to_remove = Vec::new(); - for (_, subscription, pos, vd, client_entity, in_game_stream) in ( + for (_, subscription, pos, vd, client_entity, general_stream) in ( &clients, &mut subscriptions, &positions, &players, &entities, - &mut in_game_streams, + &mut general_streams, ) .join() .filter_map(|(client, s, pos, player, e, stream)| { @@ -156,9 +159,8 @@ impl<'a> System<'a> for Sys { .map(|key| subscription.regions.contains(key)) .unwrap_or(false) { - let _ = in_game_stream - .0 - .send(ServerGeneral::DeleteEntity(uid)); + general_stream + .send_unchecked(ServerGeneral::DeleteEntity(uid)); } } }, @@ -166,7 +168,7 @@ impl<'a> System<'a> for Sys { } // Tell client to delete entities in the region for (&uid, _) in (&uids, region.entities()).join() { - let _ = in_game_stream.0.send(ServerGeneral::DeleteEntity(uid)); + let _ = general_stream.send(ServerGeneral::DeleteEntity(uid)); } } // Send deleted entities since they won't be processed for this client in entity @@ -176,9 +178,7 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - let _ = in_game_stream - .0 - .send(ServerGeneral::DeleteEntity(Uid(*uid))); + general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid))); } } @@ -203,7 +203,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - let _ = in_game_stream.0.send(ServerGeneral::CreateEntity( + general_stream.send_unchecked(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -224,14 +224,14 @@ impl<'a> System<'a> for Sys { /// Initialize region subscription pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { - if let (Some(client_pos), Some(client_vd), Some(in_game_stream)) = ( + if let (Some(client_pos), Some(client_vd), Some(general_stream)) = ( world.read_storage::().get(entity), world .read_storage::() .get(entity) .map(|pl| pl.view_distance) .and_then(|v| v), - world.write_storage::().get_mut(entity), + world.write_storage::().get_mut(entity), ) { let fuzzy_chunk = (Vec2::::from(client_pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); @@ -256,7 +256,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - let _ = in_game_stream.0.send(ServerGeneral::CreateEntity( + general_stream.send_unchecked(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 8a359e2423..07f11a19db 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -1,5 +1,9 @@ use super::SysTimer; -use crate::{chunk_generator::ChunkGenerator, client::InGameStream, Tick}; +use crate::{ + chunk_generator::ChunkGenerator, + streams::{GetStream, InGameStream}, + Tick, +}; use common::{ comp::{self, bird_medium, Alignment, Player, Pos}, event::{EventBus, ServerEvent}, @@ -63,7 +67,7 @@ impl<'a> System<'a> for Sys { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { + in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -90,7 +94,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= view_distance.pow(2) { - let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { + in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index e547f4a8e3..fc2f657518 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::InGameStream; +use crate::streams::{GetStream, InGameStream}; use common::{ comp::{Player, Pos}, msg::ServerGeneral, @@ -39,7 +39,7 @@ impl<'a> System<'a> for Sys { .map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd)) .unwrap_or(false) { - let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { + let _ = in_game_stream.send(ServerGeneral::TerrainChunkUpdate { key: *chunk_key, chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { Some(chunk) => chunk.clone(), @@ -55,7 +55,7 @@ impl<'a> System<'a> for Sys { let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone()); for (player, in_game_stream) in (&players, &mut in_game_streams).join() { if player.view_distance.is_some() { - let _ = in_game_stream.0.send(msg.clone()); + in_game_stream.send_unchecked(msg.clone()); } } diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index 3af23149b4..0d2f7426dd 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::GeneralStream; +use crate::streams::{GeneralStream, GetStream}; use common::{ comp::{Player, Pos, Waypoint, WaypointArea}, msg::{Notification, ServerGeneral}, @@ -51,9 +51,9 @@ impl<'a> System<'a> for Sys { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - let _ = general_stream - .0 - .send(ServerGeneral::Notification(Notification::WaypointSaved)); + general_stream.send_unchecked(ServerGeneral::Notification( + Notification::WaypointSaved, + )); } } }