fix a follow-up bug: after a protocol failure the Participant is now closed, including all its streams, so we get the stream errors.

We MUST handle them, and we are not allowed to act on a stream after it has failed. As I am too lazy to change the structure to ensure the client is dropped immediately, I added an AtomicBool to it.
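In other words, the server-side Client latches the first stream error in an AtomicBool and then never touches the stream again. Below is a minimal, self-contained sketch of that guard pattern; Stream, StreamError and the plain-string message are stand-ins for illustration, not the real veloren_network / common::msg types used in the diff further down.

use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in types, for illustration only.
#[derive(Debug)]
struct StreamError;
struct Stream;

impl Stream {
    fn send(&mut self, _msg: &str) -> Result<(), StreamError> {
        // Pretend the underlying protocol already failed.
        Err(StreamError)
    }
}

struct Client {
    singleton_stream: Stream,
    network_error: AtomicBool, // latched on the first send/recv failure
}

impl Client {
    fn notify(&mut self, msg: &str) {
        // Never act on a stream again once it has failed.
        if !self.network_error.load(Ordering::Relaxed) {
            if let Err(e) = self.singleton_stream.send(msg) {
                eprintln!("got a network error with client: {:?}", e);
                self.network_error.store(true, Ordering::Relaxed);
            }
        }
    }
}

fn main() {
    let mut client = Client {
        singleton_stream: Stream,
        network_error: AtomicBool::new(false),
    };
    client.notify("Hello"); // first call hits the error and latches the flag
    client.notify("World"); // skipped: the failed stream is never touched again
}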
Marcel Märtens 2020-07-11 16:08:25 +02:00
parent 187ec42aa2
commit 6db9c6f91b
7 changed files with 68 additions and 19 deletions

View File

@ -613,8 +613,7 @@ impl Client {
);
}
self.singleton_stream
-     .send(ClientMsg::ControllerInputs(inputs))
-     .unwrap();
+     .send(ClientMsg::ControllerInputs(inputs))?;
}
// 2) Build up a list of events for this frame, to be passed to the frontend.

View File

@ -905,17 +905,16 @@ impl Drop for Participant {
.send((self.remote_pid, finished_sender))
.await
.expect("Something is wrong in internal scheduler coding");
- match finished_receiver
+ if let Err(e) = finished_receiver
.await
.expect("Something is wrong in internal scheduler/participant coding")
{
-     Err(e) => error!(
+     error!(
?pid,
?e,
"Error while dropping the participant, couldn't send all outgoing \
messages, dropping remaining"
-     ),
-     _ => (),
+     );
};
});
},

View File

@ -45,7 +45,7 @@ pub(crate) fn serialize<M: Serialize>(message: &M) -> MessageBuffer {
// std::Result<M, std::Box<bincode::error::bincode::ErrorKind>> {
pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) -> bincode::Result<M> {
let span = lz4_compress::decompress(&buffer.data)
.expect("lz4_compression error, failed to deserialze");
.expect("lz4 decompression failed, failed to deserialze");
//this might fail if you choose the wrong type for M. in that case probably X
// got transfered while you assume Y. probably this means your application
// logic is wrong. E.g. You expect a String, but just get a u8.

View File

@ -1,3 +1,23 @@
+ //! How to read those tests:
+ //! - in the first line we call the helper, this is only debug code. in case
+ //! you want to have tracing for a special test you set set the bool = true
+ //! and the sleep to 10000 and your test will start 10 sec delayed with
+ //! tracing. You need a delay as otherwise the other tests polute your trace
+ //! - the second line is to simulate a client and a server
+ //! `network_participant_stream` will return
+ //! - 2 networks
+ //! - 2 participants
+ //! - 2 streams
+ //! each one `linked` to their counterpart.
+ //! You see a cryptic use of rust `_` this is because we are testing the
+ //! `drop` behavior here.
+ //! - A `_` means this is directly dropped after the line executes, thus
+ //! immediately executing its `Drop` impl.
+ //! - A `_p1_a` e.g. means we don't use that Participant yet, but we must
+ //! not `drop` it yet as we might want to use the Streams.
+ //! - You sometimes see sleep(1000ms) this is used when we rely on the
+ //! underlying TCP functionality, as this simulates client and server
use async_std::task;
use task::block_on;
use veloren_network::StreamError;
@ -22,9 +42,13 @@ fn close_participant() {
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(p1_a.disconnect()).unwrap();
- //We dont know of if the disconect is done YET, so the next command will either
- // return already closed or fail a gracefully close as it will discover that the
- // remote site closed right now
+ // The following will `Err`, but we don't know the exact error message.
+ // Why? because of the TCP layer we have no guarantee if the TCP messages send
+ // one line above already reached `p1_b`. If they reached them it would fail
+ // with a `ParticipantDisconnected` as a clean disconnect was performed.
+ // If they haven't reached them yet but will reach them during the execution it
+ // will return a unclean shutdown was detected. Nevertheless, if it returns
+ // Ok(()) then something is wrong!
assert!(block_on(p1_b.disconnect()).is_err());
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
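For reference, here is a hedged sketch of a test skeleton that follows the naming conventions described in the module doc comment above; the test name is hypothetical and it is not part of this commit, it only exercises the happy path with everything kept alive.

// Hypothetical skeleton: named bindings with a leading underscore
// (`_n_a`, `_p1_a`, ...) keep networks and participants alive while the
// streams are used; a bare `_` would drop them immediately.
#[test]
fn drop_conventions_sketch() {
    let (_n_a, _p1_a, mut s1_a, _n_b, _p1_b, _s1_b) =
        block_on(network_participant_stream(tcp()));
    // With both sides still alive, sending on a stream is expected to succeed.
    s1_a.send("Hello World").unwrap();
}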

View File

@ -1,15 +1,21 @@
- use common::msg::{ClientState, RequestStateError, ServerMsg};
+ use crate::error::Error;
+ use common::msg::{ClientMsg, ClientState, RequestStateError, ServerMsg};
use hashbrown::HashSet;
use network::{Participant, Stream};
use specs::{Component, FlaggedStorage};
use specs_idvs::IdvStorage;
- use std::sync::Mutex;
+ use std::sync::{
+     atomic::{AtomicBool, Ordering},
+     Mutex,
+ };
+ use tracing::debug;
use vek::*;
pub struct Client {
pub client_state: ClientState,
pub participant: Mutex<Option<Participant>>,
pub singleton_stream: Stream,
+ pub network_error: AtomicBool,
pub last_ping: f64,
pub login_msg_sent: bool,
}
@ -19,7 +25,29 @@ impl Component for Client {
}
impl Client {
- pub fn notify(&mut self, msg: ServerMsg) { let _ = self.singleton_stream.send(msg); }
+ pub fn notify(&mut self, msg: ServerMsg) {
+     if !self.network_error.load(Ordering::Relaxed) {
+         if let Err(e) = self.singleton_stream.send(msg) {
+             debug!(?e, "got a network error with client");
+             self.network_error.store(true, Ordering::Relaxed);
+         }
+     }
+ }
+ pub async fn recv(&mut self) -> Result<ClientMsg, Error> {
+     if !self.network_error.load(Ordering::Relaxed) {
+         match self.singleton_stream.recv().await {
+             Ok(r) => Ok(r),
+             Err(e) => {
+                 debug!(?e, "got a network error with client while recv");
+                 self.network_error.store(true, Ordering::Relaxed);
+                 Err(Error::StreamErr(e))
+             },
+         }
+     } else {
+         Err(Error::StreamErr(network::StreamError::StreamClosed))
+     }
+ }
pub fn is_registered(&self) -> bool {
match self.client_state {
@ -43,9 +71,7 @@ impl Client {
}
pub fn error_state(&mut self, error: RequestStateError) {
- let _ = self
-     .singleton_stream
-     .send(ServerMsg::StateAnswer(Err((error, self.client_state))));
+ let _ = self.notify(ServerMsg::StateAnswer(Err((error, self.client_state))));
}
}

View File

@ -604,6 +604,7 @@ impl Server {
client_state: ClientState::Connected,
participant: std::sync::Mutex::new(Some(participant)),
singleton_stream,
+ network_error: std::sync::atomic::AtomicBool::new(false),
last_ping: self.state.get_time(),
login_msg_sent: false,
};

View File

@ -27,8 +27,8 @@ use specs::{
};
impl Sys {
- ///We need to move this to a async fn, otherwise the compiler generates to
- /// much recursive fn, and async closures dont work yet
+ ///We needed to move this to a async fn, if we would use a async closures
+ /// the compiler generates to much recursion and fails to compile this
#[allow(clippy::too_many_arguments)]
async fn handle_client_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
@ -57,7 +57,7 @@ impl Sys {
settings: &Read<'_, ServerSettings>,
) -> Result<(), crate::error::Error> {
loop {
- let msg = client.singleton_stream.recv().await?;
+ let msg = client.recv().await?;
*cnt += 1;
match msg {
// Go back to registered state (char selection screen)
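The last hunk shows why the recv-side guard matters: the per-client message loop simply propagates the first error with `?`, and after this commit that also covers the "error already latched" case without touching the failed stream again. A rough, self-contained sketch of that loop shape follows; the types are stand-ins, not the real server/client types from the diff.

// Stand-in types for illustration; the real loop lives in the server's
// message-handling system and uses the Client/Error types from the diff above.
#[derive(Debug)]
enum Error {
    StreamErr,
}

struct Client {
    failed: bool,
}

impl Client {
    async fn recv(&mut self) -> Result<String, Error> {
        // Simulate the latched error: once failed, always return Err without
        // touching any underlying stream.
        if self.failed {
            Err(Error::StreamErr)
        } else {
            self.failed = true;
            Ok("hello".to_string())
        }
    }
}

async fn handle_client_msgs(client: &mut Client, cnt: &mut u64) -> Result<(), Error> {
    loop {
        // `?` ends the loop on the first recv error, mirroring the last hunk.
        let msg = client.recv().await?;
        *cnt += 1;
        println!("got message {}: {}", *cnt, msg);
    }
}

fn main() {
    let mut client = Client { failed: false };
    let mut cnt = 0;
    let res = async_std::task::block_on(handle_client_msgs(&mut client, &mut cnt));
    println!("loop ended after {} message(s) with {:?}", cnt, res);
}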