diff --git a/client/src/lib.rs b/client/src/lib.rs index accc589818..9e939c91c8 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -613,8 +613,7 @@ impl Client { ); } self.singleton_stream - .send(ClientMsg::ControllerInputs(inputs)) - .unwrap(); + .send(ClientMsg::ControllerInputs(inputs))?; } // 2) Build up a list of events for this frame, to be passed to the frontend. diff --git a/network/src/api.rs b/network/src/api.rs index 31dcae775c..d31de577b2 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -905,17 +905,16 @@ impl Drop for Participant { .send((self.remote_pid, finished_sender)) .await .expect("Something is wrong in internal scheduler coding"); - match finished_receiver + if let Err(e) = finished_receiver .await .expect("Something is wrong in internal scheduler/participant coding") { - Err(e) => error!( + error!( ?pid, ?e, "Error while dropping the participant, couldn't send all outgoing \ messages, dropping remaining" - ), - _ => (), + ); }; }); }, diff --git a/network/src/message.rs b/network/src/message.rs index baf6ef6ce1..03626f22d3 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -45,7 +45,7 @@ pub(crate) fn serialize(message: &M) -> MessageBuffer { // std::Result> { pub(crate) fn deserialize(buffer: MessageBuffer) -> bincode::Result { let span = lz4_compress::decompress(&buffer.data) - .expect("lz4_compression error, failed to deserialze"); + .expect("lz4 decompression failed, failed to deserialze"); //this might fail if you choose the wrong type for M. in that case probably X // got transfered while you assume Y. probably this means your application // logic is wrong. E.g. You expect a String, but just get a u8. diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 1b5b74fb2a..9b79f9c88a 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -1,3 +1,23 @@ +//! How to read those tests: +//! - in the first line we call the helper, this is only debug code. in case +//! you want to have tracing for a special test you set set the bool = true +//! and the sleep to 10000 and your test will start 10 sec delayed with +//! tracing. You need a delay as otherwise the other tests polute your trace +//! - the second line is to simulate a client and a server +//! `network_participant_stream` will return +//! - 2 networks +//! - 2 participants +//! - 2 streams +//! each one `linked` to their counterpart. +//! You see a cryptic use of rust `_` this is because we are testing the +//! `drop` behavior here. +//! - A `_` means this is directly dropped after the line executes, thus +//! immediately executing its `Drop` impl. +//! - A `_p1_a` e.g. means we don't use that Participant yet, but we must +//! not `drop` it yet as we might want to use the Streams. +//! - You sometimes see sleep(1000ms) this is used when we rely on the +//! underlying TCP functionality, as this simulates client and server + use async_std::task; use task::block_on; use veloren_network::StreamError; @@ -22,9 +42,13 @@ fn close_participant() { let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); block_on(p1_a.disconnect()).unwrap(); - //We dont know of if the disconect is done YET, so the next command will either - // return already closed or fail a gracefully close as it will discover that the - // remote site closed right now + // The following will `Err`, but we don't know the exact error message. + // Why? because of the TCP layer we have no guarantee if the TCP messages send + // one line above already reached `p1_b`. If they reached them it would fail + // with a `ParticipantDisconnected` as a clean disconnect was performed. + // If they haven't reached them yet but will reach them during the execution it + // will return a unclean shutdown was detected. Nevertheless, if it returns + // Ok(()) then something is wrong! assert!(block_on(p1_b.disconnect()).is_err()); assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); diff --git a/server/src/client.rs b/server/src/client.rs index 345d674381..95e6d96b91 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,15 +1,21 @@ -use common::msg::{ClientState, RequestStateError, ServerMsg}; +use crate::error::Error; +use common::msg::{ClientMsg, ClientState, RequestStateError, ServerMsg}; use hashbrown::HashSet; use network::{Participant, Stream}; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; -use std::sync::Mutex; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Mutex, +}; +use tracing::debug; use vek::*; pub struct Client { pub client_state: ClientState, pub participant: Mutex>, pub singleton_stream: Stream, + pub network_error: AtomicBool, pub last_ping: f64, pub login_msg_sent: bool, } @@ -19,7 +25,29 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { let _ = self.singleton_stream.send(msg); } + pub fn notify(&mut self, msg: ServerMsg) { + if !self.network_error.load(Ordering::Relaxed) { + if let Err(e) = self.singleton_stream.send(msg) { + debug!(?e, "got a network error with client"); + self.network_error.store(true, Ordering::Relaxed); + } + } + } + + pub async fn recv(&mut self) -> Result { + if !self.network_error.load(Ordering::Relaxed) { + match self.singleton_stream.recv().await { + Ok(r) => Ok(r), + Err(e) => { + debug!(?e, "got a network error with client while recv"); + self.network_error.store(true, Ordering::Relaxed); + Err(Error::StreamErr(e)) + }, + } + } else { + Err(Error::StreamErr(network::StreamError::StreamClosed)) + } + } pub fn is_registered(&self) -> bool { match self.client_state { @@ -43,9 +71,7 @@ impl Client { } pub fn error_state(&mut self, error: RequestStateError) { - let _ = self - .singleton_stream - .send(ServerMsg::StateAnswer(Err((error, self.client_state)))); + let _ = self.notify(ServerMsg::StateAnswer(Err((error, self.client_state)))); } } diff --git a/server/src/lib.rs b/server/src/lib.rs index d7fab8912c..11d7d95093 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -604,6 +604,7 @@ impl Server { client_state: ClientState::Connected, participant: std::sync::Mutex::new(Some(participant)), singleton_stream, + network_error: std::sync::atomic::AtomicBool::new(false), last_ping: self.state.get_time(), login_msg_sent: false, }; diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index de7b10c233..a6b6eb5f0c 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -27,8 +27,8 @@ use specs::{ }; impl Sys { - ///We need to move this to a async fn, otherwise the compiler generates to - /// much recursive fn, and async closures dont work yet + ///We needed to move this to a async fn, if we would use a async closures + /// the compiler generates to much recursion and fails to compile this #[allow(clippy::too_many_arguments)] async fn handle_client_msg( server_emitter: &mut common::event::Emitter<'_, ServerEvent>, @@ -57,7 +57,7 @@ impl Sys { settings: &Read<'_, ServerSettings>, ) -> Result<(), crate::error::Error> { loop { - let msg = client.singleton_stream.recv().await?; + let msg = client.recv().await?; *cnt += 1; match msg { // Go back to registered state (char selection screen)