diff --git a/server/src/client.rs b/server/src/client.rs index f3db6ee99e..9b33e6f5f8 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,17 +1,10 @@ use crate::error::Error; -use common::msg::{ - ClientCharacterScreen, ClientGeneral, ClientInGame, ClientIngame, ClientType, PingMsg, - ServerMsg, -}; +use common::msg::{ClientIngame, ClientType, ServerMsg}; use hashbrown::HashSet; use network::{Participant, Stream}; use serde::{de::DeserializeOwned, Serialize}; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Mutex, -}; use tracing::debug; use vek::*; @@ -19,13 +12,13 @@ pub struct Client { pub registered: bool, pub client_type: ClientType, pub in_game: Option, - pub participant: Mutex>, + pub participant: Option, pub general_stream: Stream, pub ping_stream: Stream, pub register_stream: Stream, pub character_screen_stream: Stream, pub in_game_stream: Stream, - pub network_error: AtomicBool, + pub network_error: bool, pub last_ping: f64, pub login_msg_sent: bool, } @@ -35,11 +28,11 @@ impl Component for Client { } impl Client { - fn internal_send(b: &AtomicBool, s: &mut Stream, msg: M) { - if !b.load(Ordering::Relaxed) { + fn internal_send(err: &mut bool, s: &mut Stream, msg: M) { + if !*err { if let Err(e) = s.send(msg) { debug!(?e, "got a network error with client"); - b.store(true, Ordering::Relaxed); + *err = true; } } } @@ -64,33 +57,35 @@ impl Client { ServerMsg::Info(_) => panic!(ERR), ServerMsg::Init(_) => panic!(ERR), ServerMsg::RegisterAnswer(msg) => { - Self::internal_send(&self.network_error, &mut self.register_stream, &msg) - }, - ServerMsg::CharacterScreen(msg) => { - Self::internal_send(&self.network_error, &mut self.character_screen_stream, &msg) + Self::internal_send(&mut self.network_error, &mut self.register_stream, &msg) }, + ServerMsg::CharacterScreen(msg) => Self::internal_send( + &mut self.network_error, + &mut self.character_screen_stream, + &msg, + ), ServerMsg::InGame(msg) => { - Self::internal_send(&self.network_error, &mut self.in_game_stream, &msg) + Self::internal_send(&mut self.network_error, &mut self.in_game_stream, &msg) }, ServerMsg::General(msg) => { - Self::internal_send(&self.network_error, &mut self.general_stream, &msg) + Self::internal_send(&mut self.network_error, &mut self.general_stream, &msg) }, ServerMsg::Ping(msg) => { - Self::internal_send(&self.network_error, &mut self.ping_stream, &msg) + Self::internal_send(&mut self.network_error, &mut self.ping_stream, &msg) }, }; } pub async fn internal_recv( - b: &AtomicBool, + err: &mut bool, s: &mut Stream, ) -> Result { - if !b.load(Ordering::Relaxed) { + if !*err { match s.recv().await { Ok(r) => Ok(r), Err(e) => { debug!(?e, "got a network error with client while recv"); - b.store(true, Ordering::Relaxed); + *err = true; Err(Error::StreamErr(e)) }, } @@ -98,22 +93,6 @@ impl Client { Err(Error::StreamErr(network::StreamError::StreamClosed)) } } - - pub async fn recv_msg(&mut self) -> Result { - Self::internal_recv(&self.network_error, &mut self.general_stream).await - } - - pub async fn recv_in_game_msg(&mut self) -> Result { - Self::internal_recv(&self.network_error, &mut self.in_game_stream).await - } - - pub async fn recv_character_screen_msg(&mut self) -> Result { - Self::internal_recv(&self.network_error, &mut self.character_screen_stream).await - } - - pub async fn recv_ping_msg(&mut self) -> Result { - Self::internal_recv(&self.network_error, &mut self.ping_stream).await - } } // Distance from fuzzy_chunk before snapping to current chunk diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index a2c7145f58..6617a78214 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -133,13 +133,13 @@ impl ConnectionHandler { registered: false, client_type, in_game: None, - participant: std::sync::Mutex::new(Some(participant)), + participant: Some(participant), general_stream, ping_stream, register_stream, in_game_stream, character_screen_stream, - network_error: std::sync::atomic::AtomicBool::new(false), + network_error: false, last_ping: server_data.time, login_msg_sent: false, }; diff --git a/server/src/events/player.rs b/server/src/events/player.rs index db6a164a14..50380a8349 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -89,14 +89,13 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event { span!(_guard, "handle_client_disconnect"); - if let Some(client) = server.state().read_storage::().get(entity) { - let participant = match client.participant.try_lock() { - Ok(mut p) => p.take().unwrap(), - Err(e) => { - error!(?e, ?entity, "couldn't lock participant for removal"); - return Event::ClientDisconnected { entity }; - }, - }; + if let Some(client) = server + .state() + .ecs() + .write_storage::() + .get_mut(entity) + { + let participant = client.participant.take().unwrap(); let pid = participant.remote_pid(); std::thread::spawn(move || { let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity); diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 70a6afcddb..07badca7cc 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -453,23 +453,30 @@ impl Sys { editable_settings: &ReadExpect<'_, EditableSettings>, alias_validator: &ReadExpect<'_, AliasValidator>, ) -> Result<(), crate::error::Error> { + let (mut b1, mut b2, mut b3, mut b4, mut b5) = ( + client.network_error, + client.network_error, + client.network_error, + client.network_error, + client.network_error, + ); loop { - let q1 = Client::internal_recv(&client.network_error, &mut client.general_stream); - let q2 = Client::internal_recv(&client.network_error, &mut client.in_game_stream); - let q3 = - Client::internal_recv(&client.network_error, &mut client.character_screen_stream); - let q4 = Client::internal_recv(&client.network_error, &mut client.ping_stream); - let q5 = Client::internal_recv(&client.network_error, &mut client.register_stream); + let q1 = Client::internal_recv(&mut b1, &mut client.general_stream); + let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream); + let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream); + let q4 = Client::internal_recv(&mut b4, &mut client.ping_stream); + let q5 = Client::internal_recv(&mut b5, &mut client.register_stream); let (m1, m2, m3, m4, m5) = select!( - msg = q1.fuse() => (Some(msg?), None, None, None, None), - msg = q2.fuse() => (None, Some(msg?), None, None, None), - msg = q3.fuse() => (None, None, Some(msg?), None, None), - msg = q4.fuse() => (None, None, None, Some(msg?), None), - msg = q5.fuse() => (None, None, None, None,Some(msg?)), + msg = q1.fuse() => (Some(msg), None, None, None, None), + msg = q2.fuse() => (None, Some(msg), None, None, None), + msg = q3.fuse() => (None, None, Some(msg), None, None), + msg = q4.fuse() => (None, None, None, Some(msg), None), + msg = q5.fuse() => (None, None, None, None,Some(msg)), ); *cnt += 1; if let Some(msg) = m1 { + client.network_error |= b1; Self::handle_client_msg( server_emitter, new_chat_msgs, @@ -478,10 +485,11 @@ impl Sys { player_metrics, uids, chat_modes, - msg, + msg?, )?; } if let Some(msg) = m2 { + client.network_error |= b2; Self::handle_client_in_game_msg( server_emitter, entity, @@ -498,10 +506,11 @@ impl Sys { players, controllers, settings, - msg, + msg?, )?; } if let Some(msg) = m3 { + client.network_error |= b3; Self::handle_client_character_screen_msg( server_emitter, new_chat_msgs, @@ -512,13 +521,15 @@ impl Sys { players, editable_settings, alias_validator, - msg, + msg?, )?; } if let Some(msg) = m4 { - Self::handle_ping_msg(client, msg)?; + client.network_error |= b4; + Self::handle_ping_msg(client, msg?)?; } if let Some(msg) = m5 { + client.network_error |= b5; Self::handle_register_msg( player_list, new_players, @@ -529,7 +540,7 @@ impl Sys { admins, players, editable_settings, - msg, + msg?, )?; } } @@ -666,8 +677,16 @@ impl<'a> System<'a> for Sys { ) }); - // Update client ping. - if cnt > 0 { + // Postbox error + if network_err.is_err() { + debug!(?entity, "postbox error with client, disconnecting"); + player_metrics + .clients_disconnected + .with_label_values(&["network_error"]) + .inc(); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + } else if cnt > 0 { + // Update client ping. client.last_ping = time.0 } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 // Timeout @@ -678,15 +697,6 @@ impl<'a> System<'a> for Sys { .with_label_values(&["timeout"]) .inc(); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); - } else if network_err.is_err() - // Postbox error - { - debug!(?entity, "postbox error with client, disconnecting"); - player_metrics - .clients_disconnected - .with_label_values(&["network_error"]) - .inc(); - server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { // Try pinging the client if the timeout is nearing. client.send_msg(PingMsg::Ping);