From 50ac07c7526cc7e235dc7ef76ac9d2736653925a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 21 Oct 2020 12:23:34 +0200 Subject: [PATCH] review: - fix wording in error msg - find better name for structs - unify errors and cleanup code with `(|| {foo?; Some(())})()` pattern - fix the negative PlayersOnline, it was caused by having a gracefull shutdown AND a timeout error. we now unregister the client when he issues TERMINATE --- server/src/cmd.rs | 2 +- server/src/connection_handler.rs | 14 +- server/src/events/entity_manipulation.rs | 2 +- server/src/events/group_manip.rs | 42 ++--- server/src/events/interaction.rs | 200 ++++++++++------------- server/src/events/player.rs | 58 +++---- server/src/state_ext.rs | 16 +- server/src/streams.rs | 2 +- server/src/sys/entity_sync.rs | 12 +- server/src/sys/invite_timeout.rs | 2 +- server/src/sys/msg/general.rs | 2 + server/src/sys/msg/mod.rs | 5 +- server/src/sys/msg/ping.rs | 33 ++-- server/src/sys/subscription.rs | 8 +- server/src/sys/terrain.rs | 4 +- server/src/sys/waypoint.rs | 2 +- 16 files changed, 193 insertions(+), 211 deletions(-) diff --git a/server/src/cmd.rs b/server/src/cmd.rs index dfa5c9b064..204d4f399d 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -673,7 +673,7 @@ fn handle_spawn( .map(|g| (g, s)) }) .map(|(g, s)| { - s.send_unchecked(ServerGeneral::GroupUpdate(g)); + s.send_fallible(ServerGeneral::GroupUpdate(g)); }); }, ); diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 6a790f367a..847d6d8693 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -16,7 +16,7 @@ pub(crate) struct ServerInfoPacket { pub time: f64, } -pub(crate) struct ClientPackage { +pub(crate) struct IncomingClient { pub client: Client, pub general: GeneralStream, pub ping: PingStream, @@ -28,7 +28,7 @@ pub(crate) struct ClientPackage { pub(crate) struct ConnectionHandler { _network: Arc, thread_handle: Option>, - pub client_receiver: Receiver, + pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, stop_sender: Option>, } @@ -43,7 +43,7 @@ impl ConnectionHandler { let network_clone = Arc::clone(&network); let (stop_sender, stop_receiver) = oneshot::channel(); - let (client_sender, client_receiver) = unbounded::(); + let (client_sender, client_receiver) = unbounded::(); let (info_requester_sender, info_requester_receiver) = bounded::>(1); @@ -67,7 +67,7 @@ impl ConnectionHandler { async fn work( network: Arc, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, stop_receiver: oneshot::Receiver<()>, ) { @@ -104,7 +104,7 @@ impl ConnectionHandler { async fn init_participant( participant: Participant, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, ) -> Result<(), Box> { debug!("New Participant connected to the server"); @@ -130,7 +130,7 @@ impl ConnectionHandler { t = register_stream.recv::().fuse() => Some(t), ) { None => { - debug!("slow client connection detected, dropping it"); + debug!("Timeout for incoming client elapsed, aborting connection"); return Ok(()); }, Some(client_type) => client_type?, @@ -145,7 +145,7 @@ impl ConnectionHandler { login_msg_sent: false, }; - let package = ClientPackage { + let package = IncomingClient { client, general: GeneralStream(general_stream), ping: PingStream(ping_stream), diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index 39bb1c39e9..c8896e1f8f 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -45,7 +45,7 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) } let mut in_game_streams = state.ecs().write_storage::(); if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - in_game_stream.send_unchecked(ServerGeneral::Knockback(impulse)); + in_game_stream.send_fallible(ServerGeneral::Knockback(impulse)); } } diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index 17729c436b..2c750f86bb 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -34,7 +34,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta .server_msg("Invite failed, target does not exist.".to_owned()), ); @@ -66,7 +66,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if already_in_same_group { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked(ChatType::Meta.server_msg( + general_stream.send_fallible(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); } @@ -96,7 +96,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if group_size_limit_reached { // Inform inviter that they have reached the group size limit if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ the group size limit" @@ -113,7 +113,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta .server_msg("This player already has a pending invite.".to_owned()), ); @@ -160,7 +160,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani (in_game_streams.get_mut(invitee), uids.get(entity).copied()) { if send_invite() { - in_game_stream.send_unchecked(ServerGeneral::GroupInvite { + in_game_stream.send_fallible(ServerGeneral::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); @@ -168,7 +168,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } else if agents.contains(invitee) { send_invite(); } else if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } @@ -176,7 +176,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Notify inviter that the invite is pending if invite_sent { if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - in_game_stream.send_unchecked(ServerGeneral::InvitePending(uid)); + in_game_stream.send_fallible(ServerGeneral::InvitePending(uid)); } } }, @@ -201,7 +201,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if let (Some(in_game_stream), Some(target)) = (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - in_game_stream.send_unchecked(ServerGeneral::InviteComplete { + in_game_stream.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Accepted, }); @@ -249,7 +249,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if let (Some(in_game_stream), Some(target)) = (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - in_game_stream.send_unchecked(ServerGeneral::InviteComplete { + in_game_stream.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Declined, }); @@ -288,7 +288,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -301,7 +301,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); } @@ -310,7 +310,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -348,7 +348,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Tell them the have been kicked if let Some(general_stream) = general_streams.get_mut(target) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); @@ -356,13 +356,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Tell kicker that they were succesful if let Some(general_stream) = general_streams.get_mut(entity) { general_stream - .send_unchecked(ChatType::Meta.server_msg("Player kicked.".to_owned())); + .send_fallible(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked(ChatType::Meta.server_msg( + general_stream.send_fallible(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } @@ -370,7 +370,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform kicker that the target is not in a group if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), ), @@ -387,7 +387,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani None => { // Inform of failure if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked(ChatType::Meta.server_msg( + general_stream.send_fallible(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -423,13 +423,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani ); // Tell them they are the leader if let Some(general_stream) = general_streams.get_mut(target) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful if let Some(general_stream) = general_streams.get_mut(target) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -439,7 +439,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Inform transferer that they are not the leader let mut general_streams = state.ecs().write_storage::(); if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked( + general_stream.send_fallible( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -451,7 +451,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Inform transferer that the target is not in a group let mut general_streams = state.ecs().write_storage::(); if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_unchecked(ChatType::Meta.server_msg( + general_stream.send_fallible(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index cd1da4e31b..79a9073309 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -121,124 +121,96 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { return; } - // You can't possess other players let mut clients = ecs.write_storage::(); let mut general_streams = ecs.write_storage::(); - let mut ping_streams = ecs.write_storage::(); - let mut register_streams = ecs.write_storage::(); - let mut character_screen_streams = ecs.write_storage::(); - let mut in_game_streams = ecs.write_storage::(); - if clients.get_mut(possesse).is_none() { - let client = match clients.remove(possessor) { - Some(c) => c, - None => return, - }; - let mut general_stream = match general_streams.remove(possessor) { - Some(c) => c, - None => return, - }; - let ping_stream = match ping_streams.remove(possessor) { - Some(c) => c, - None => return, - }; - let register_stream = match register_streams.remove(possessor) { - Some(c) => c, - None => return, - }; - let character_screen_stream = match character_screen_streams.remove(possessor) { - Some(c) => c, - None => return, - }; - let in_game_stream = match in_game_streams.remove(possessor) { - Some(c) => c, - None => return, - }; - general_stream.send_unchecked(ServerGeneral::SetPlayerEntity(possesse_uid)); - let err_fn = |c, e: Option| { - e.map(|e| error!(?e, "Error inserting {} component during possession", c)); - }; - err_fn("client", clients.insert(possesse, client).err()); - err_fn( - "general_streams", - general_streams.insert(possesse, general_stream).err(), - ); - err_fn( - "ping_streams", - ping_streams.insert(possesse, ping_stream).err(), - ); - err_fn( - "register_streams", - register_streams.insert(possesse, register_stream).err(), - ); - err_fn( - "character_screen_streams", - character_screen_streams - .insert(possesse, character_screen_stream) - .err(), - ); - err_fn( - "in_game_streams", - in_game_streams.insert(possesse, in_game_stream).err(), - ); - // Put possess item into loadout - let mut loadouts = ecs.write_storage::(); - let loadout = loadouts - .entry(possesse) - .expect("Could not read loadouts component while possessing") - .or_insert(comp::Loadout::default()); - - let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); - if let item::ItemKind::Tool(tool) = item.kind() { - let mut abilities = tool.get_abilities(); - let mut ability_drain = abilities.drain(..); - let debug_item = comp::ItemConfig { - item, - ability1: ability_drain.next(), - ability2: ability_drain.next(), - ability3: ability_drain.next(), - block_ability: None, - dodge_ability: None, - }; - std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); - loadout.active_item = Some(debug_item); - } - - // Move player component - { - let mut players = ecs.write_storage::(); - if let Some(player) = players.remove(possessor) { - err_fn("player", players.insert(possesse, player).err()); - } - } - // Transfer region subscription - { - let mut subscriptions = ecs.write_storage::(); - if let Some(s) = subscriptions.remove(possessor) { - err_fn("subscription", subscriptions.insert(possesse, s).err()); - } - } - // Remove will of the entity - ecs.write_storage::().remove(possesse); - // Reset controller of former shell - ecs.write_storage::() - .get_mut(possessor) - .map(|c| c.reset()); - // Transfer admin powers - { - let mut admins = ecs.write_storage::(); - if let Some(admin) = admins.remove(possessor) { - err_fn("admin", admins.insert(possesse, admin).err()); - } - } - // Transfer waypoint - { - let mut waypoints = ecs.write_storage::(); - if let Some(waypoint) = waypoints.remove(possessor) { - err_fn("waypoints", waypoints.insert(possesse, waypoint).err()); - } - } + if clients.get_mut(possesse).is_some() { + error!("can't possess other players"); + return; } + + match (|| -> Option> { + let mut ping_streams = ecs.write_storage::(); + let mut register_streams = ecs.write_storage::(); + let mut character_screen_streams = ecs.write_storage::(); + let mut in_game_streams = ecs.write_storage::(); + + let c = clients.remove(possessor)?; + clients.insert(possesse, c).ok()?; + let s = general_streams.remove(possessor)?; + general_streams.insert(possesse, s).ok()?; + let s = ping_streams.remove(possessor)?; + ping_streams.insert(possesse, s).ok()?; + let s = register_streams.remove(possessor)?; + register_streams.insert(possesse, s).ok()?; + let s = character_screen_streams.remove(possessor)?; + character_screen_streams.insert(possesse, s).ok()?; + let s = in_game_streams.remove(possessor)?; + in_game_streams.insert(possesse, s).ok()?; + //optional entities + let mut players = ecs.write_storage::(); + let mut subscriptions = ecs.write_storage::(); + let mut admins = ecs.write_storage::(); + let mut waypoints = ecs.write_storage::(); + players + .remove(possessor) + .map(|p| players.insert(possesse, p).ok()?); + subscriptions + .remove(possessor) + .map(|s| subscriptions.insert(possesse, s).ok()?); + admins + .remove(possessor) + .map(|a| admins.insert(possesse, a).ok()?); + waypoints + .remove(possessor) + .map(|w| waypoints.insert(possesse, w).ok()?); + + Some(Ok(())) + })() { + Some(Ok(())) => (), + Some(Err(e)) => { + error!(?e, ?possesse, "Error inserting component during possession"); + return; + }, + None => { + error!(?possessor, "Error removing component during possession"); + return; + }, + } + + general_streams + .get_mut(possesse) + .map(|s| s.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid))); + + // Put possess item into loadout + let mut loadouts = ecs.write_storage::(); + let loadout = loadouts + .entry(possesse) + .expect("Could not read loadouts component while possessing") + .or_insert(comp::Loadout::default()); + + let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); + if let item::ItemKind::Tool(tool) = item.kind() { + let mut abilities = tool.get_abilities(); + let mut ability_drain = abilities.drain(..); + let debug_item = comp::ItemConfig { + item, + ability1: ability_drain.next(), + ability2: ability_drain.next(), + ability3: ability_drain.next(), + block_ability: None, + dodge_ability: None, + }; + std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); + loadout.active_item = Some(debug_item); + } + + // Remove will of the entity + ecs.write_storage::().remove(possesse); + // Reset controller of former shell + ecs.write_storage::() + .get_mut(possessor) + .map(|c| c.reset()); } } diff --git a/server/src/events/player.rs b/server/src/events/player.rs index 0d35b28d07..0e79497073 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -28,46 +28,40 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { // components Easier than checking and removing all other known components // Note: If other `ServerEvent`s are referring to this entity they will be // disrupted - let maybe_client = state.ecs().write_storage::().remove(entity); - let maybe_uid = state.read_component_copied::(entity); - let maybe_player = state.ecs().write_storage::().remove(entity); - let maybe_admin = state.ecs().write_storage::().remove(entity); - let maybe_general_stream = state.ecs().write_storage::().remove(entity); - let maybe_ping_stream = state.ecs().write_storage::().remove(entity); - let maybe_register_stream = state.ecs().write_storage::().remove(entity); - let maybe_character_screen_stream = state - .ecs() - .write_storage::() - .remove(entity); - let maybe_in_game_stream = state.ecs().write_storage::().remove(entity); + let maybe_admin = state.ecs().write_storage::().remove(entity); let maybe_group = state .ecs() .write_storage::() .get(entity) .cloned(); - if let ( - Some(mut client), - Some(uid), - Some(player), - Some(general_stream), - Some(ping_stream), - Some(register_stream), - Some(character_screen_stream), - Some(mut in_game_stream), - ) = ( - maybe_client, - maybe_uid, - maybe_player, - maybe_general_stream, - maybe_ping_stream, - maybe_register_stream, - maybe_character_screen_stream, - maybe_in_game_stream, - ) { + + if let Some(( + mut client, + uid, + player, + general_stream, + ping_stream, + register_stream, + character_screen_stream, + mut in_game_stream, + )) = (|| { + let ecs = state.ecs(); + Some(( + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + ecs.write_storage::().remove(entity)?, + ecs.write_storage::() + .remove(entity)?, + ecs.write_storage::().remove(entity)?, + )) + })() { // Tell client its request was successful client.in_game = None; - in_game_stream.send_unchecked(ServerGeneral::ExitInGameSuccess); + in_game_stream.send_fallible(ServerGeneral::ExitInGameSuccess); let entity_builder = state .ecs_mut() diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index afc8a88f0c..7cd3082955 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -232,7 +232,7 @@ impl StateExt for State { .get_mut(entity) { client.in_game = Some(ClientInGame::Character); - character_screen_stream.send_unchecked(ServerGeneral::CharacterSuccess); + character_screen_stream.send_fallible(ServerGeneral::CharacterSuccess); } } } @@ -298,7 +298,7 @@ impl StateExt for State { .join() { if uid != u { - general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -310,7 +310,7 @@ impl StateExt for State { .join() { if uid == u || uid == t { - general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -325,7 +325,7 @@ impl StateExt for State { { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { general_stream - .send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + .send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -340,7 +340,7 @@ impl StateExt for State { { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { general_stream - .send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + .send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -355,7 +355,7 @@ impl StateExt for State { { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { general_stream - .send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + .send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -369,7 +369,7 @@ impl StateExt for State { .join() { if s == &faction.0 { - general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -381,7 +381,7 @@ impl StateExt for State { .join() { if g == group { - general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone())); + general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, diff --git a/server/src/streams.rs b/server/src/streams.rs index 7c279902d9..d1dfebfd86 100644 --- a/server/src/streams.rs +++ b/server/src/streams.rs @@ -21,7 +21,7 @@ pub(crate) trait GetStream { } } - fn send_unchecked(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); } + fn send_fallible(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); } fn prepare(&mut self, msg: &Self::SendMsg) -> Message { if Self::verify(&msg) { diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index ec1e6aa63a..7431f979fe 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -166,7 +166,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - general_stream.send_unchecked(create_msg.clone()); + general_stream.send_fallible(create_msg.clone()); } } } @@ -180,7 +180,7 @@ impl<'a> System<'a> for Sys { .map(|key| !regions.contains(key)) .unwrap_or(true) { - general_stream.send_unchecked(ServerGeneral::DeleteEntity(uid)); + general_stream.send_fallible(ServerGeneral::DeleteEntity(uid)); } } } @@ -252,7 +252,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - general_stream.send_unchecked(msg.clone()); + general_stream.send_fallible(msg.clone()); } } }; @@ -350,7 +350,7 @@ impl<'a> System<'a> for Sys { }) { for uid in &deleted { - general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid))); + general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } } @@ -361,7 +361,7 @@ impl<'a> System<'a> for Sys { for (inventory, update, in_game_stream) in (&inventories, &inventory_updates, &mut in_game_streams).join() { - in_game_stream.send_unchecked(ServerGeneral::InventoryUpdate( + in_game_stream.send_fallible(ServerGeneral::InventoryUpdate( inventory.clone(), update.event(), )); @@ -384,7 +384,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - in_game_stream.send_unchecked(ServerGeneral::Outcomes(outcomes)); + in_game_stream.send_fallible(ServerGeneral::Outcomes(outcomes)); } } outcomes.clear(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index fdb1d392f9..ea913a1a1c 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -55,7 +55,7 @@ impl<'a> System<'a> for Sys { in_game_streams.get_mut(*inviter), uids.get(invitee).copied(), ) { - in_game_stream.send_unchecked(ServerGeneral::InviteComplete { + in_game_stream.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::TimedOut, }); diff --git a/server/src/sys/msg/general.rs b/server/src/sys/msg/general.rs index a583a76805..9038dac57f 100644 --- a/server/src/sys/msg/general.rs +++ b/server/src/sys/msg/general.rs @@ -61,6 +61,8 @@ impl Sys { .clients_disconnected .with_label_values(&["gracefully"]) .inc(); + client.registered = false; + client.in_game = None; server_emitter.emit(ServerEvent::ClientDisconnect(entity)); }, _ => unreachable!("not a client_general msg"), diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs index a95ef02a94..54aa60a9d4 100644 --- a/server/src/sys/msg/mod.rs +++ b/server/src/sys/msg/mod.rs @@ -8,7 +8,10 @@ use crate::streams::GetStream; /// handles all send msg and calls a handle fn /// Aborts when a error occurred returns cnt of successful msg otherwise -pub(crate) fn try_recv_all(stream: &mut T, mut f: F) -> Result +pub(in crate::sys::msg) fn try_recv_all( + stream: &mut T, + mut f: F, +) -> Result where T: GetStream, F: FnMut(&mut T, T::RecvMsg) -> Result<(), crate::error::Error>, diff --git a/server/src/sys/msg/ping.rs b/server/src/sys/msg/ping.rs index 68be90ffc0..db0308c20a 100644 --- a/server/src/sys/msg/ping.rs +++ b/server/src/sys/msg/ping.rs @@ -67,11 +67,19 @@ impl<'a> System<'a> for Sys { match res { Err(e) => { - debug!(?entity, ?e, "network error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["network_error"]) - .inc(); + let reg = client.registered; + debug!( + ?entity, + ?e, + ?reg, + "network error with client, disconnecting" + ); + if reg { + player_metrics + .clients_disconnected + .with_label_values(&["network_error"]) + .inc(); + } server_emitter.emit(ServerEvent::ClientDisconnect(entity)); }, Ok(1_u64..=u64::MAX) => { @@ -82,17 +90,20 @@ impl<'a> System<'a> for Sys { if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 // Timeout { - info!(?entity, "timeout error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["timeout"]) - .inc(); + let reg = client.registered; + info!(?entity, ?reg, "timeout error with client, disconnecting"); + if reg { + player_metrics + .clients_disconnected + .with_label_values(&["timeout"]) + .inc(); + } server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { // Try pinging the client if the timeout is nearing. - ping_stream.send_unchecked(PingMsg::Ping); + ping_stream.send_fallible(PingMsg::Ping); } }, } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index ce7c3567b1..45327314b5 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -160,7 +160,7 @@ impl<'a> System<'a> for Sys { .unwrap_or(false) { general_stream - .send_unchecked(ServerGeneral::DeleteEntity(uid)); + .send_fallible(ServerGeneral::DeleteEntity(uid)); } } }, @@ -178,7 +178,7 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid))); + general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } @@ -203,7 +203,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - general_stream.send_unchecked(ServerGeneral::CreateEntity( + general_stream.send_fallible(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -256,7 +256,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - general_stream.send_unchecked(ServerGeneral::CreateEntity( + general_stream.send_fallible(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 07f11a19db..cd2826541c 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -67,7 +67,7 @@ impl<'a> System<'a> for Sys { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate { + in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -94,7 +94,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= view_distance.pow(2) { - in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate { + in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index 0d2f7426dd..237391b964 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -51,7 +51,7 @@ impl<'a> System<'a> for Sys { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - general_stream.send_unchecked(ServerGeneral::Notification( + general_stream.send_fallible(ServerGeneral::Notification( Notification::WaypointSaved, )); }