From 80c8c389dea90e66d50f4a58d4941611e7610203 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 18 Aug 2020 13:48:26 +0200 Subject: [PATCH 1/6] improve tracing and spans in network crate --- network/src/participant.rs | 2 ++ network/src/protocols.rs | 15 +++++++-------- network/src/scheduler.rs | 19 ++++++++++++++----- server/src/events/player.rs | 5 +++-- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/network/src/participant.rs b/network/src/participant.rs index 085ddfcb77..0363d08310 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -25,6 +25,7 @@ use std::{ time::{Duration, Instant}, }; use tracing::*; +use tracing_futures::Instrument; pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender); pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>); @@ -477,6 +478,7 @@ impl BParticipant { trace!(?cid, "Running channel in participant"); channel .run(protocol, w2b_frames_s, leftover_cid_frame) + .instrument(tracing::info_span!("", ?cid)) .await; #[cfg(feature = "metrics")] self.metrics diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 687e1d8885..a40e218e2d 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -80,11 +80,10 @@ impl TcpProtocol { } { Some(Ok(_)) => false, Some(Err(e)) => { - debug!( - ?cid, + info!( ?e, - "Closing tcp protocol due to read error, sending close frame to gracefully \ - shutdown" + "Closing tcp protocol due to read error, pushing close frame (to own \ + Channel/Participant) to gracefully shutdown" ); w2c_cid_frame_s .send((cid, Frame::Shutdown)) @@ -93,7 +92,7 @@ impl TcpProtocol { true }, None => { - trace!(?cid, "shutdown requested"); + trace!("shutdown requested"); true }, } @@ -119,7 +118,7 @@ impl TcpProtocol { macro_rules! read_or_close { ($x:expr) => { if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await { - info!("Tcp stream closed, shutting down read"); + trace!("read_or_close requested a shutdown"); break; } }; @@ -228,7 +227,7 @@ impl TcpProtocol { ) -> bool { match stream.write_all(&bytes).await { Err(e) => { - debug!( + info!( ?e, "Got an error writing to tcp, going to close this channel" ); @@ -255,7 +254,7 @@ impl TcpProtocol { macro_rules! write_or_close { ($x:expr) => { if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await { - info!("Tcp stream closed, shutting down write"); + trace!("write_or_close requested a shutdown"); break; } }; diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 000ccd92e7..8aa6dcd6e9 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -512,7 +512,11 @@ impl Scheduler { metrics.clone(), send_handshake, ); - match handshake.setup(&protocol).await { + match handshake + .setup(&protocol) + .instrument(tracing::info_span!("handshake", ?cid)) + .await + { Ok((pid, sid, secret, leftover_cid_frame)) => { trace!( ?cid, @@ -583,14 +587,18 @@ impl Scheduler { } } else { let pi = &participants[&pid]; - trace!("2nd+ channel of participant, going to compare security ids"); + trace!( + ?cid, + "2nd+ channel of participant, going to compare security ids" + ); if pi.secret != secret { warn!( + ?cid, ?pid, ?secret, "Detected incompatible Secret!, this is probably an attack!" 
                         );
-                        error!("Just dropping here, TODO handle this correctly!");
+                        error!(?cid, "Just dropping here, TODO handle this correctly!");
                         //TODO
                         if let Some(pid_oneshot) = s2a_return_pid_s {
                             // someone is waiting with `connect`, so give them their Error
@@ -604,6 +612,7 @@ impl Scheduler {
                             return;
                         }
                         error!(
+                            ?cid,
                             "Ufff i cant answer the pid_oneshot. as i need to create the SAME \
                              participant. maybe switch to ARC"
                         );
@@ -614,7 +623,7 @@ impl Scheduler {
                 Err(()) => {
                     if let Some(pid_oneshot) = s2a_return_pid_s {
                         // someone is waiting with `connect`, so give them their Error
-                        trace!("returning the Err to api who requested the connect");
+                        trace!(?cid, "returning the Err to api who requested the connect");
                         pid_oneshot
                             .send(Err(std::io::Error::new(
                                 std::io::ErrorKind::PermissionDenied,
@@ -625,7 +634,7 @@ impl Scheduler {
                 },
             }
         }
-        .instrument(tracing::trace_span!("")),
+        .instrument(tracing::info_span!("")),
         );
         /*WORKAROUND FOR SPAN NOT TO GET LOST*/
     }
 }
diff --git a/server/src/events/player.rs b/server/src/events/player.rs
index 306300605e..bc05196f66 100644
--- a/server/src/events/player.rs
+++ b/server/src/events/player.rs
@@ -94,15 +94,16 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
         if let Err(e) = block_on(participant.disconnect()) {
             debug!(
                 ?e,
+                ?pid,
                 "Error when disconnecting client, maybe the pipe already broke"
             );
         };
         trace!(?pid, "finished disconnect");
         let elapsed = now.elapsed();
         if elapsed.as_millis() > 100 {
-            warn!(?elapsed, "disconecting took quite long");
+            warn!(?elapsed, ?pid, "disconnecting took quite long");
         } else {
-            debug!(?elapsed, "disconecting took");
+            debug!(?elapsed, ?pid, "disconnecting took");
         }
     });
 }

From effecd7a04afedecc7c1a336a4d41c2938900e96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Tue, 18 Aug 2020 17:52:19 +0200
Subject: [PATCH 2/6] protocols no longer send a Close Frame in case the read
 fails. They just fail and let the participant handle it!

The Participant now handles a channel close in `create_channel_mgr` rather
than in the `send` fn. It's the better place for it, and it makes a HashMap
the better container for the delete lookup.

Since tcp_read could break while tcp_write didn't, and the Participant
wasn't updated until both were broken, we changed the CHANNEL's tcp_read
and tcp_write in protocols to be a `select` rather than a `join`.
However, we only do this in the CHANNEL, not in the HANDSHAKE; it fails if
you try to. Also, the handshake takes care of any failed read or write
manually and handles a clean teardown in this case.
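To make the `select` vs `join` change easier to follow, here is a minimal,
self-contained sketch of the difference (illustrative task names, not the
actual channel code): `join!` only resolves once BOTH halves are done, so a
broken write half would not stop the channel while the read half keeps
running; `select!` resolves as soon as EITHER half finishes and drops the
other one.

    use futures::{executor::block_on, future::FutureExt, pin_mut, select};

    async fn read_half() -> &'static str {
        // imagine: loops on stream.read_exact(..) until error or shutdown
        "read finished"
    }

    async fn write_half() -> &'static str {
        // imagine: loops on c2w_frame_r.next() until the sender is dropped
        "write finished"
    }

    fn main() {
        block_on(async {
            let read = read_half().fuse();
            let write = write_half().fuse();
            pin_mut!(read, write);
            // resolves with whichever half completes first; the other future
            // is simply dropped, which is exactly the desired teardown here
            let first = select! {
                r = read => r,
                w = write => w,
            };
            println!("channel can shut down: {}", first);
        });
    }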
--- network/src/channel.rs | 14 ++++----- network/src/participant.rs | 59 ++++++++++++++++++++++++-------------- network/src/protocols.rs | 14 ++------- network/src/scheduler.rs | 1 + network/tests/closing.rs | 15 +++++++--- 5 files changed, 59 insertions(+), 44 deletions(-) diff --git a/network/src/channel.rs b/network/src/channel.rs index 25babccd2b..63062ef113 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -9,7 +9,7 @@ use crate::{ }; use futures::{ channel::{mpsc, oneshot}, - join, + join, select, sink::SinkExt, stream::StreamExt, FutureExt, @@ -58,15 +58,15 @@ impl Channel { trace!(?self.cid, "Start up channel"); match protocol { Protocols::Tcp(tcp) => { - futures::join!( - tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), - tcp.write_to_wire(self.cid, c2w_frame_r), + select!( + _ = tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (), + _ = tcp.write_to_wire(self.cid, c2w_frame_r).fuse() => (), ); }, Protocols::Udp(udp) => { - futures::join!( - udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), - udp.write_to_wire(self.cid, c2w_frame_r), + select!( + _ = udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (), + _ = udp.write_to_wire(self.cid, c2w_frame_r).fuse() => (), ); }, } diff --git a/network/src/participant.rs b/network/src/participant.rs index 0363d08310..b05701ff50 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -70,7 +70,7 @@ pub struct BParticipant { remote_pid: Pid, remote_pid_string: String, //optimisation offset_sid: Sid, - channels: Arc>>, + channels: Arc>>, streams: RwLock>, running_mgr: AtomicUsize, run_channels: Option, @@ -119,7 +119,7 @@ impl BParticipant { remote_pid, remote_pid_string: remote_pid.to_string(), offset_sid, - channels: Arc::new(RwLock::new(vec![])), + channels: Arc::new(RwLock::new(HashMap::new())), streams: RwLock::new(HashMap::new()), running_mgr: AtomicUsize::new(0), run_channels, @@ -252,27 +252,21 @@ impl BParticipant { // find out ideal channel here //TODO: just take first let mut lock = self.channels.write().await; - if let Some(ci) = lock.get_mut(0) { - //note: this is technically wrong we should only increase when it suceeded, but - // this requiered me to clone `frame` which is a to big performance impact for - // error handling + if let Some(ci) = lock.values_mut().next() { + //note: this is technically wrong we should only increase when it succeeded, + // but this requiered me to clone `frame` which is a to big + // performance impact for error handling #[cfg(feature = "metrics")] frames_out_total_cache .with_label_values(ci.cid, &frame) .inc(); if let Err(e) = ci.b2w_frame_s.send(frame).await { + let cid = ci.cid; warn!( ?e, - "The channel got closed unexpectedly, cleaning it up now." + ?cid, + "channel no longer available, maybe cleanup in process?" ); - let ci = lock.remove(0); - if let Err(e) = ci.b2r_read_shutdown.send(()) { - debug!( - ?e, - "Error shutdowning channel, which is prob fine as we detected it to no \ - longer work in the first place" - ); - }; //TODO FIXME tags: takeover channel multiple info!( "FIXME: the frame is actually drop. 
which is fine for now as the participant \ @@ -292,10 +286,18 @@ impl BParticipant { guard.0 = now; let occurrences = guard.1 + 1; guard.1 = 0; - error!(?occurrences, "Participant has no channel to communicate on"); + let lastframe = frame; + error!( + ?occurrences, + ?lastframe, + "Participant has no channel to communicate on" + ); } else { guard.1 += 1; } + //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant + self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) + .await; false } } @@ -463,7 +465,7 @@ impl BParticipant { let channels = self.channels.clone(); async move { let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid); - channels.write().await.push(ChannelInfo { + channels.write().await.insert(cid, ChannelInfo { cid, cid_string: cid.to_string(), b2w_frame_s, @@ -485,7 +487,17 @@ impl BParticipant { .channels_disconnected_total .with_label_values(&[&self.remote_pid_string]) .inc(); - trace!(?cid, "Channel got closed"); + info!(?cid, "Channel got closed, cleaning up"); + //stopping read in case write triggered the failure + if let Some(ci) = channels.write().await.remove(&cid) { + if let Err(e) = ci.b2r_read_shutdown.send(()) { + debug!(?e, "read channel was already shut down"); + }; + } //None means it prob got closed by closing the participant + trace!(?cid, "Channel cleanup completed"); + //TEMP FIX: as we dont have channel takeover yet drop the whole + // bParticipant + self.close_api(None).await; } }, ) @@ -575,9 +587,14 @@ impl BParticipant { b2b_prios_flushed_r.await.unwrap(); debug!("Closing all channels, after flushed prios"); - for ci in self.channels.write().await.drain(..) { + for (cid, ci) in self.channels.write().await.drain() { if let Err(e) = ci.b2r_read_shutdown.send(()) { - debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact"); + debug!( + ?e, + ?cid, + "Seems like this read protocol got already dropped by closing the Stream \ + itself, ignoring" + ); }; } @@ -649,7 +666,7 @@ impl BParticipant { si.send_closed.store(true, Ordering::Relaxed); si.b2a_msg_recv_s.close_channel(); }, - None => warn!("Couldn't find the stream, might be simulanious close from remote"), + None => warn!("Couldn't find the stream, might be simultaneous close from remote"), } //TODO: what happens if RIGHT NOW the remote sends a StreamClose and this diff --git a/network/src/protocols.rs b/network/src/protocols.rs index a40e218e2d..0178bd805d 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -68,10 +68,8 @@ impl TcpProtocol { /// read_except and if it fails, close the protocol async fn read_or_close( - cid: Cid, mut stream: &TcpStream, mut bytes: &mut [u8], - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, mut end_receiver: &mut Fuse>, ) -> bool { match select! { @@ -80,15 +78,7 @@ impl TcpProtocol { } { Some(Ok(_)) => false, Some(Err(e)) => { - info!( - ?e, - "Closing tcp protocol due to read error, pushing close frame (to own \ - Channel/Participant) to gracefully shutdown" - ); - w2c_cid_frame_s - .send((cid, Frame::Shutdown)) - .await - .expect("Channel or Participant seems no longer to exist to be Shutdown"); + info!(?e, "Closing tcp protocol due to read error"); true }, None => { @@ -117,7 +107,7 @@ impl TcpProtocol { macro_rules! 
read_or_close {
             ($x:expr) => {
-                if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await {
+                if TcpProtocol::read_or_close(&stream, $x, &mut end_r).await {
                     trace!("read_or_close requested a shutdown");
                     break;
                 }
diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs
index 8aa6dcd6e9..b7792f811f 100644
--- a/network/src/scheduler.rs
+++ b/network/src/scheduler.rs
@@ -621,6 +621,7 @@ impl Scheduler {
                     // move directly to participant!
                 },
                 Err(()) => {
+                    debug!(?cid, "Handshake from a new connection failed");
                     if let Some(pid_oneshot) = s2a_return_pid_s {
                         // someone is waiting with `connect`, so give them their Error
                         trace!(?cid, "returning the Err to api who requested the connect");
diff --git a/network/tests/closing.rs b/network/tests/closing.rs
index 1eb81b1990..af0674460a 100644
--- a/network/tests/closing.rs
+++ b/network/tests/closing.rs
@@ -42,10 +42,8 @@ fn close_participant() {
     let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));

     block_on(p1_a.disconnect()).unwrap();
-    assert_eq!(
-        block_on(p1_b.disconnect()),
-        Err(ParticipantError::ParticipantDisconnected)
-    );
+    //As no more read/write is done on p1_b the disconnect is successful
+    block_on(p1_b.disconnect()).unwrap();

     assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
     assert_eq!(
@@ -227,6 +225,15 @@ fn close_network_then_disconnect_part() {
 }

 #[test]
+/*
+FLANKY:
+---- opened_stream_before_remote_part_is_closed stdout ----
+thread 'opened_stream_before_remote_part_is_closed' panicked at 'assertion failed: `(left == right)`
+  left: `Err(StreamClosed)`,
+ right: `Ok("HelloWorld")`', network/tests/closing.rs:236:5
+note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
+
+*/
 fn opened_stream_before_remote_part_is_closed() {
     let (_, _) = helper::setup(false, 0);
     let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));

From 410a4a99b7147c89488029283ccc3a9162ad6627 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Tue, 18 Aug 2020 18:42:02 +0200
Subject: [PATCH 3/6] remove some `trace!` in network which a) were only spam
 and b) are way better replaced by a metric.

Added a span for disconnecting on the gameserver side, and more debug!
tracing there.

We just keep one trace! alive every 10000ms to have a keep-alive feeling.
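To show where the removed `trace!` spam goes instead, here is a minimal,
self-contained sketch of the counter approach with the prometheus crate; it
mirrors the `stream_flushed` counter added below, and the label value is
illustrative:

    use prometheus::{IntCounterVec, Opts, Registry};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let registry = Registry::new();
        let streams_flushed = IntCounterVec::new(
            Opts::new(
                "stream_flushed",
                "Number of flushed streams requested to PrioManager at participant level",
            ),
            &["participant"],
        )?;
        registry.register(Box::new(streams_flushed.clone()))?;

        // hot path: instead of a `trace!` per flush, just bump the counter
        streams_flushed.with_label_values(&["AAAAAA"]).inc();

        // a scraper (or a debug dump like this) reads the aggregate later
        for family in registry.gather() {
            let value = family.get_metric()[0].get_counter().get_value();
            println!("{} = {}", family.get_name(), value);
        }
        Ok(())
    }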
--- network/src/metrics.rs | 10 ++++++++++ network/src/participant.rs | 8 +++++--- network/src/prios.rs | 15 +++++---------- server/src/events/player.rs | 11 ++++++----- server/src/sys/message.rs | 26 +++++++++++++------------- 5 files changed, 39 insertions(+), 31 deletions(-) diff --git a/network/src/metrics.rs b/network/src/metrics.rs index 4436a8ce7f..02940acd40 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -34,6 +34,8 @@ pub struct NetworkMetrics { pub message_out_total: IntCounterVec, // send(prio) Messages throughput, seperated by STREAM AND PARTICIPANT, pub message_out_throughput: IntCounterVec, + // flushed(prio) stream count, seperated by PARTICIPANT, + pub streams_flushed: IntCounterVec, // TODO: queued Messages, seperated by STREAM (add PART, CHANNEL), // queued Messages, seperated by PARTICIPANT pub queued_count: IntGaugeVec, @@ -167,6 +169,13 @@ impl NetworkMetrics { ), &["participant", "stream"], )?; + let streams_flushed = IntCounterVec::new( + Opts::new( + "stream_flushed", + "Number of flushed streams requested to PrioManager at participant level", + ), + &["participant"], + )?; let queued_count = IntGaugeVec::new( Opts::new( "queued_count", @@ -207,6 +216,7 @@ impl NetworkMetrics { wire_in_throughput, message_out_total, message_out_throughput, + streams_flushed, queued_count, queued_bytes, participants_ping, diff --git a/network/src/participant.rs b/network/src/participant.rs index b05701ff50..eca9945931 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -206,13 +206,11 @@ impl BParticipant { #[cfg(feature = "metrics")] let mut send_cache = PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + let mut i: u64 = 0; loop { let mut frames = VecDeque::new(); prios.fill_frames(FRAMES_PER_TICK, &mut frames).await; let len = frames.len(); - if len > 0 { - trace!("Tick {}", len); - } for (_, frame) in frames { self.send_frame( frame, @@ -226,6 +224,10 @@ impl BParticipant { .await .unwrap(); async_std::task::sleep(TICK_TIME).await; + i += 1; + if i.rem_euclid(1000) == 0 { + trace!("Did 1000 ticks"); + } //shutdown after all msg are send! if closing_up && (len == 0) { break; diff --git a/network/src/prios.rs b/network/src/prios.rs index 4e5971a32e..1026b5f581 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -16,8 +16,6 @@ use futures::channel::oneshot; use std::collections::{HashMap, HashSet, VecDeque}; #[cfg(feature = "metrics")] use std::sync::Arc; -use tracing::*; - const PRIO_MAX: usize = 64; #[derive(Default)] @@ -148,11 +146,8 @@ impl PrioManager { async fn tick(&mut self) { // Check Range - let mut messages = 0; - let mut closed = 0; for (prio, sid, msg) in self.messages_rx.try_iter() { debug_assert!(prio as usize <= PRIO_MAX); - messages += 1; #[cfg(feature = "metrics")] { let sid_string = sid.to_string(); @@ -173,7 +168,11 @@ impl PrioManager { } //this must be AFTER messages for (sid, return_sender) in self.sid_flushed_rx.try_iter() { - closed += 1; + #[cfg(feature = "metrics")] + self.metrics + .streams_flushed + .with_label_values(&[&self.pid]) + .inc(); if let Some(cnt) = self.sid_owned.get_mut(&sid) { // register sender cnt.empty_notify = Some(return_sender); @@ -182,9 +181,6 @@ impl PrioManager { return_sender.send(()).unwrap(); } } - if messages > 0 || closed > 0 { - trace!(?messages, ?closed, "tick"); - } } //if None returned, we are empty! 
@@ -256,7 +252,6 @@ impl PrioManager { } } } else { - trace!(?msg.mid, "Repush message"); self.messages[prio as usize].push_front((sid, msg)); } }, diff --git a/server/src/events/player.rs b/server/src/events/player.rs index bc05196f66..8b5ea37dfe 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -79,18 +79,19 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event { if let Some(client) = server.state().read_storage::().get(entity) { - trace!("Closing participant of client"); let participant = match client.participant.try_lock() { Ok(mut p) => p.take().unwrap(), Err(e) => { - error!(?e, "coudln't lock participant for removal"); + error!(?e, ?entity, "coudln't lock participant for removal"); return Event::ClientDisconnected { entity }; }, }; - std::thread::spawn(|| { - let pid = participant.remote_pid(); + let pid = participant.remote_pid(); + std::thread::spawn(move || { + let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity); + let _enter = span.enter(); let now = std::time::Instant::now(); - trace!(?pid, "start disconnect"); + debug!(?pid, ?entity, "Start handle disconnect of client"); if let Err(e) = block_on(participant.disconnect()) { debug!( ?e, diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 234f6c0a86..415f9d4727 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -25,6 +25,7 @@ use hashbrown::HashMap; use specs::{ Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, }; +use tracing::{debug, error, info, warn}; impl Sys { ///We needed to move this to a async fn, if we would use a async closures @@ -267,17 +268,13 @@ impl Sys { let msg = mode.new_message(*from, message); new_chat_msgs.push((Some(entity), msg)); } else { - tracing::error!("Could not send message. Missing player uid"); + error!("Could not send message. Missing player uid"); } }, Err(ChatMsgValidationError::TooLong) => { let max = MAX_BYTES_CHAT_MSG; let len = message.len(); - tracing::warn!( - ?len, - ?max, - "Recieved a chat message that's too long" - ) + warn!(?len, ?max, "Recieved a chat message that's too long") }, } }, @@ -342,7 +339,9 @@ impl Sys { client.notify(ServerMsg::Disconnect); }, ClientMsg::Terminate => { + debug!(?entity, "Client send message to termitate session"); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + break Ok(()); }, ClientMsg::RequestCharacterList => { if let Some(player) = players.get(entity) { @@ -351,11 +350,7 @@ impl Sys { }, ClientMsg::CreateCharacter { alias, tool, body } => { if let Err(error) = alias_validator.validate(&alias) { - tracing::debug!( - ?error, - ?alias, - "denied alias as it contained a banned word" - ); + debug!(?error, ?alias, "denied alias as it contained a banned word"); client.notify(ServerMsg::CharacterActionError(error.to_string())); } else if let Some(player) = players.get(entity) { character_loader.create_character( @@ -522,10 +517,15 @@ impl<'a> System<'a> for Sys { // Update client ping. 
if cnt > 0 { client.last_ping = time.0 - } else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout - || network_err.is_err() + } else if time.0 - client.last_ping > CLIENT_TIMEOUT + // Timeout + { + info!(?entity, "timeout error with client, disconnecting"); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + } else if network_err.is_err() // Postbox error { + debug!(?entity, "postbox error with client, disconnecting"); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 { // Try pinging the client if the timeout is nearing. From ecb443d1cdf1d25816b3a90f1ef29ffac373a128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 18 Aug 2020 19:03:43 +0200 Subject: [PATCH 4/6] enable `veloren_network` metrics --- server/src/lib.rs | 13 ++++++++----- server/src/metrics.rs | 23 +++++++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index aef6fe1dbb..d93851401c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -274,16 +274,19 @@ impl Server { let mut metrics = ServerMetrics::new(); // register all metrics submodules here - let tick_metrics = TickMetrics::new(metrics.registry(), metrics.tick_clone()) + let tick_metrics = TickMetrics::new(metrics.tick_clone()) .expect("Failed to initialize server tick metrics submodule."); - metrics - .run(settings.metrics_address) - .expect("Failed to initialize server metrics submodule."); + tick_metrics + .register(&metrics.registry()) + .expect("failed to register tick metrics"); let thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".to_string()) .build(); - let (network, f) = Network::new(Pid::new()); + let (network, f) = Network::new_with_registry(Pid::new(), &metrics.registry()); + metrics + .run(settings.metrics_address) + .expect("Failed to initialize server metrics submodule."); thread_pool.execute(f); block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 11913abd7a..a23a77db79 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -33,7 +33,7 @@ pub struct ServerMetrics { } impl TickMetrics { - pub fn new(registry: &Registry, tick: Arc) -> Result> { + pub fn new(tick: Arc) -> Result> { let player_online = IntGauge::with_opts(Opts::new( "player_online", "shows the number of clients connected to the server", @@ -74,15 +74,6 @@ impl TickMetrics { .expect("Time went backwards"); start_time.set(since_the_epoch.as_secs().try_into()?); - registry.register(Box::new(player_online.clone()))?; - registry.register(Box::new(entity_count.clone()))?; - registry.register(Box::new(build_info.clone()))?; - registry.register(Box::new(start_time.clone()))?; - registry.register(Box::new(time_of_day.clone()))?; - registry.register(Box::new(chonks_count.clone()))?; - registry.register(Box::new(chunks_count.clone()))?; - registry.register(Box::new(tick_time.clone()))?; - Ok(Self { chonks_count, chunks_count, @@ -97,6 +88,18 @@ impl TickMetrics { }) } + pub fn register(&self, registry: &Registry) -> Result<(), Box> { + registry.register(Box::new(self.player_online.clone()))?; + registry.register(Box::new(self.entity_count.clone()))?; + registry.register(Box::new(self.build_info.clone()))?; + registry.register(Box::new(self.start_time.clone()))?; + registry.register(Box::new(self.time_of_day.clone()))?; + registry.register(Box::new(self.chonks_count.clone()))?; + 
registry.register(Box::new(self.chunks_count.clone()))?;
+        registry.register(Box::new(self.tick_time.clone()))?;
+        Ok(())
+    }
+
     pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
 }

From a62b1e3b7c80872d342576095ff8c35efef80808 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Fri, 21 Aug 2020 14:01:49 +0200
Subject: [PATCH 5/6] Fixed the unclean disconnecting of participants.

Till now, we just dropped the TCP connection and registered this as a clean
shutdown. The protocol reader interpreted this and sent a Frame::Shutdown
frame to its local processor. This is of course wrong.

So now the protocol reader only forwards a Frame::Shutdown when it actually
detects one. If the TCP connection gets closed, it will return an Error up
instead. The processor will then pick up this error, request an unclean
shutdown and notify the user. Also, when doing a clean shutdown, we now
send a Frame::Shutdown to the remote side to trigger this behavior.

Before, we wrongly added the feature of only using a `select` in channel.
This is WRONG, as it could mean that the write fails while the read still
has some Frames buffered, which would then get dropped. It's fixed now by
the clean shutdown mechanism defined above.

Also, when a channel is closed inside a participant, we now close the whole
participant as a protection. However, we must not close the recv channel,
as the `handle_frames_mgr` might still be working on it, so we only stop
writing/sending.

Debugging this also let me introduce some smaller fixes:
 - PIDs in tests are now 0 and 1+1*64+1*64*64+..., which makes the traces
   appear as AAAAAA and BBBBBB instead of ABAAAA and ACAAAA
 - the veloren client now better separates between a clean shutdown and an
   unclean shutdown
 - added a new type: C2pFrame for `(cid, Result<Frame, ()>)`
 - wrong frames inside the handshake are not counted in metrics
---
 client/src/lib.rs          |   2 +
 network/src/channel.rs     |  74 ++++++++++++++------------
 network/src/participant.rs | 102 ++++++++++++++++++++++++-------------
 network/src/protocols.rs   |  48 +++++++++++------
 network/src/types.rs       |  18 +++++--
 network/tests/closing.rs   |  24 ++++-----
 network/tests/helper.rs    |   4 +-
 7 files changed, 167 insertions(+), 105 deletions(-)

diff --git a/client/src/lib.rs b/client/src/lib.rs
index 1c5e28bf6c..187c33306f 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -1383,8 +1383,10 @@ impl Client {
                     );
                 },
                 ServerMsg::Disconnect => {
+                    debug!("finally sending ClientMsg::Terminate");
                     frontend_events.push(Event::Disconnect);
                     self.singleton_stream.send(ClientMsg::Terminate)?;
+                    break Ok(());
                 },
                 ServerMsg::CharacterListUpdate(character_list) => {
                     self.character_list.characters = character_list;
diff --git a/network/src/channel.rs b/network/src/channel.rs
index 63062ef113..6e9a4c5e43 100644
--- a/network/src/channel.rs
+++ b/network/src/channel.rs
@@ -1,6 +1,7 @@
 #[cfg(feature = "metrics")]
 use crate::metrics::NetworkMetrics;
 use crate::{
+    participant::C2pFrame,
     protocols::Protocols,
     types::{
         Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
@@ -9,7 +10,7 @@ use crate::{
 };
 use futures::{
     channel::{mpsc, oneshot},
-    join, select,
+    join,
     sink::SinkExt,
     stream::StreamExt,
     FutureExt,
@@ -41,8 +42,8 @@ impl Channel {
     pub async fn run(
         mut self,
         protocol: Protocols,
-        mut w2c_cid_frame_s: mpsc::UnboundedSender<(Cid, Frame)>,
-        mut leftover_cid_frame: Vec<(Cid, Frame)>,
+        mut w2c_cid_frame_s: mpsc::UnboundedSender<C2pFrame>,
+        mut leftover_cid_frame: Vec<C2pFrame>,
     ) {
         let c2w_frame_r =
self.c2w_frame_r.take().unwrap(); let read_stop_receiver = self.read_stop_receiver.take().unwrap(); @@ -58,15 +59,15 @@ impl Channel { trace!(?self.cid, "Start up channel"); match protocol { Protocols::Tcp(tcp) => { - select!( - _ = tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (), - _ = tcp.write_to_wire(self.cid, c2w_frame_r).fuse() => (), + join!( + tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), + tcp.write_to_wire(self.cid, c2w_frame_r), ); }, Protocols::Udp(udp) => { - select!( - _ = udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (), - _ = udp.write_to_wire(self.cid, c2w_frame_r).fuse() => (), + join!( + udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), + udp.write_to_wire(self.cid, c2w_frame_r), ); }, } @@ -113,12 +114,9 @@ impl Handshake { } } - pub async fn setup( - self, - protocol: &Protocols, - ) -> Result<(Pid, Sid, u128, Vec<(Cid, Frame)>), ()> { + pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec), ()> { let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::(); - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let handler_future = @@ -160,7 +158,7 @@ impl Handshake { async fn frame_handler( &self, - w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>, + w2c_cid_frame_r: &mut mpsc::UnboundedReceiver, mut c2w_frame_s: mpsc::UnboundedSender, read_stop_sender: oneshot::Sender<()>, ) -> Result<(Pid, Sid, u128), ()> { @@ -176,7 +174,7 @@ impl Handshake { let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); #[cfg(feature = "metrics")] { - if let Some(ref frame) = frame { + if let Some(Ok(ref frame)) = frame { self.metrics .frames_in_total .with_label_values(&["", &cid_string, &frame.get_string()]) @@ -184,10 +182,10 @@ impl Handshake { } } let r = match frame { - Some(Frame::Handshake { + Some(Ok(Frame::Handshake { magic_number, version, - }) => { + })) => { trace!(?magic_number, ?version, "Recv handshake"); if magic_number != VELOREN_MAGIC_NUMBER { error!(?magic_number, "Connection with invalid magic_number"); @@ -221,18 +219,24 @@ impl Handshake { Ok(()) } }, - Some(Frame::Shutdown) => { - info!("Shutdown signal received"); - Err(()) - }, - Some(Frame::Raw(bytes)) => { - match std::str::from_utf8(bytes.as_slice()) { - Ok(string) => error!(?string, ERR_S), - _ => error!(?bytes, ERR_S), + Some(Ok(frame)) => { + #[cfg(feature = "metrics")] + self.metrics + .frames_in_total + .with_label_values(&["", &cid_string, frame.get_string()]) + .inc(); + if let Frame::Raw(bytes) = frame { + match std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + } } Err(()) }, - Some(_) => Err(()), + Some(Err(())) => { + info!("Protocol got interrupted"); + Err(()) + }, None => Err(()), }; if let Err(()) = r { @@ -248,7 +252,7 @@ impl Handshake { let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); let r = match frame { - Some(Frame::Init { pid, secret }) => { + Some(Ok(Frame::Init { pid, secret })) => { debug!(?pid, "Participant send their ID"); let pid_string = pid.to_string(); #[cfg(feature = "metrics")] @@ -265,22 +269,24 @@ impl Handshake { info!(?pid, "This Handshake is now configured!"); Ok((pid, stream_id_offset, secret)) }, - Some(frame) => { + Some(Ok(frame)) => { #[cfg(feature = "metrics")] 
self.metrics .frames_in_total .with_label_values(&["", &cid_string, frame.get_string()]) .inc(); - match frame { - Frame::Shutdown => info!("Shutdown signal received"), - Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) { + if let Frame::Raw(bytes) = frame { + match std::str::from_utf8(bytes.as_slice()) { Ok(string) => error!(?string, ERR_S), _ => error!(?bytes, ERR_S), - }, - _ => (), + } } Err(()) }, + Some(Err(())) => { + info!("Protocol got interrupted"); + Err(()) + }, None => Err(()), }; if r.is_err() { diff --git a/network/src/participant.rs b/network/src/participant.rs index eca9945931..607b83e925 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -28,7 +28,8 @@ use tracing::*; use tracing_futures::Instrument; pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender); -pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>); +pub(crate) type C2pFrame = (Cid, Result); +pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec, oneshot::Sender<()>); pub(crate) type S2bShutdownBparticipant = oneshot::Sender>; pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -143,7 +144,7 @@ impl BParticipant { oneshot::channel(); let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel(); let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel(); - let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::(); let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new( #[cfg(feature = "metrics")] self.metrics.clone(), @@ -255,27 +256,29 @@ impl BParticipant { //TODO: just take first let mut lock = self.channels.write().await; if let Some(ci) = lock.values_mut().next() { - //note: this is technically wrong we should only increase when it succeeded, - // but this requiered me to clone `frame` which is a to big - // performance impact for error handling + //we are increasing metrics without checking the result to please + // borrow_checker. otherwise we would need to close `frame` what we + // dont want! #[cfg(feature = "metrics")] frames_out_total_cache .with_label_values(ci.cid, &frame) .inc(); if let Err(e) = ci.b2w_frame_s.send(frame).await { let cid = ci.cid; - warn!( - ?e, - ?cid, - "channel no longer available, maybe cleanup in process?" - ); + info!(?e, ?cid, "channel no longer available"); + if let Some(ci) = self.channels.write().await.remove(&cid) { + trace!(?cid, "stopping read protocol"); + if let Err(e) = ci.b2r_read_shutdown.send(()) { + trace!(?cid, ?e, "seems like was already shut down"); + } + } //TODO FIXME tags: takeover channel multiple info!( "FIXME: the frame is actually drop. 
which is fine for now as the participant \ will be closed, but not if we do channel-takeover" ); //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant - self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) + self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) .await; false } else { @@ -297,16 +300,13 @@ impl BParticipant { } else { guard.1 += 1; } - //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant - self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) - .await; false } } async fn handle_frames_mgr( &self, - mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>, + mut w2b_frames_r: mpsc::UnboundedReceiver, mut b2a_stream_opened_s: mpsc::UnboundedSender, a2b_close_stream_s: mpsc::UnboundedSender, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, @@ -318,8 +318,22 @@ impl BParticipant { let mut dropped_cnt = 0u64; let mut dropped_sid = Sid::new(0); - while let Some((cid, frame)) = w2b_frames_r.next().await { - //trace!("handling frame"); + while let Some((cid, result_frame)) = w2b_frames_r.next().await { + //trace!(?result_frame, "handling frame"); + let frame = match result_frame { + Ok(frame) => frame, + Err(()) => { + // The read protocol stopped, i need to make sure that write gets stopped + debug!("read protocol was closed. Stopping write protocol"); + if let Some(ci) = self.channels.write().await.get_mut(&cid) { + ci.b2w_frame_s + .close() + .await + .expect("couldn't stop write protocol"); + } + continue; + }, + }; #[cfg(feature = "metrics")] { let cid_string = cid.to_string(); @@ -328,8 +342,6 @@ impl BParticipant { .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) .inc(); } - #[cfg(not(feature = "metrics"))] - let _cid = cid; match frame { Frame::OpenStream { sid, @@ -400,7 +412,7 @@ impl BParticipant { false }; if finished { - //debug!(?mid, "finished receiving message"); + //trace!(?mid, "finished receiving message"); let imsg = messages.remove(&mid).unwrap(); if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) { if let Err(e) = si.b2a_msg_recv_s.send(imsg).await { @@ -453,7 +465,7 @@ impl BParticipant { async fn create_channel_mgr( &self, s2b_create_channel_r: mpsc::UnboundedReceiver, - w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>, + w2b_frames_s: mpsc::UnboundedSender, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start create_channel_mgr"); @@ -489,17 +501,13 @@ impl BParticipant { .channels_disconnected_total .with_label_values(&[&self.remote_pid_string]) .inc(); - info!(?cid, "Channel got closed, cleaning up"); - //stopping read in case write triggered the failure - if let Some(ci) = channels.write().await.remove(&cid) { - if let Err(e) = ci.b2r_read_shutdown.send(()) { - debug!(?e, "read channel was already shut down"); - }; - } //None means it prob got closed by closing the participant + info!(?cid, "Channel got closed"); + //maybe channel got already dropped, we don't know. 
+ channels.write().await.remove(&cid); trace!(?cid, "Channel cleanup completed"); //TEMP FIX: as we dont have channel takeover yet drop the whole // bParticipant - self.close_api(None).await; + self.close_write_api(None).await; } }, ) @@ -578,6 +586,9 @@ impl BParticipant { trace!("Start participant_shutdown_mgr"); let sender = s2b_shutdown_bparticipant_r.await.unwrap(); + let mut send_cache = + PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + self.close_api(None).await; debug!("Closing all managers"); @@ -588,6 +599,21 @@ impl BParticipant { } b2b_prios_flushed_r.await.unwrap(); + if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error + { + debug!("Sending shutdown frame after flushed all prios"); + if !self + .send_frame( + Frame::Shutdown, + #[cfg(feature = "metrics")] + &mut send_cache, + ) + .await + { + warn!("couldn't send shutdown frame, are channels already closed?"); + } + } + debug!("Closing all channels, after flushed prios"); for (cid, ci) in self.channels.write().await.drain() { if let Err(e) = ci.b2r_read_shutdown.send(()) { @@ -732,21 +758,29 @@ impl BParticipant { ) } - /// close streams and set err - async fn close_api(&self, reason: Option) { - //closing api::Participant is done by closing all channels, exepct for the - // shutdown channel at this point! + async fn close_write_api(&self, reason: Option) { + trace!(?reason, "close_api"); let mut lock = self.shutdown_info.write().await; if let Some(r) = reason { lock.error = Some(r); } lock.b2a_stream_opened_s.close_channel(); + debug!("Closing all streams for write"); + for (sid, si) in self.streams.write().await.iter() { + trace!(?sid, "Shutting down Stream for write"); + si.send_closed.store(true, Ordering::Relaxed); + } + } + + ///closing api::Participant is done by closing all channels, exepct for the + /// shutdown channel at this point! + async fn close_api(&self, reason: Option) { + self.close_write_api(reason).await; debug!("Closing all streams"); for (sid, si) in self.streams.write().await.drain() { trace!(?sid, "Shutting down Stream"); si.b2a_msg_recv_s.close_channel(); - si.send_closed.store(true, Ordering::Relaxed); } } } diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 0178bd805d..910b11d5c6 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -1,6 +1,9 @@ #[cfg(feature = "metrics")] use crate::metrics::{CidFrameCache, NetworkMetrics}; -use crate::types::{Cid, Frame, Mid, Pid, Sid}; +use crate::{ + participant::C2pFrame, + types::{Cid, Frame, Mid, Pid, Sid}, +}; use async_std::{ net::{TcpStream, UdpSocket}, prelude::*, @@ -68,9 +71,11 @@ impl TcpProtocol { /// read_except and if it fails, close the protocol async fn read_or_close( + cid: Cid, mut stream: &TcpStream, mut bytes: &mut [u8], mut end_receiver: &mut Fuse>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, ) -> bool { match select! { r = stream.read_exact(&mut bytes).fuse() => Some(r), @@ -79,6 +84,13 @@ impl TcpProtocol { Some(Ok(_)) => false, Some(Err(e)) => { info!(?e, "Closing tcp protocol due to read error"); + //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every + // channel is holding a sender! 
thats why Ne need a explicit + // STOP here + w2c_cid_frame_s + .send((cid, Err(()))) + .await + .expect("Channel or Participant seems no longer to exist"); true }, None => { @@ -91,7 +103,7 @@ impl TcpProtocol { pub async fn read_from_wire( &self, cid: Cid, - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, end_r: oneshot::Receiver<()>, ) { trace!("Starting up tcp read()"); @@ -107,7 +119,7 @@ impl TcpProtocol { macro_rules! read_or_close { ($x:expr) => { - if TcpProtocol::read_or_close(&stream, $x, &mut end_r).await { + if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await { trace!("read_or_close requested a shutdown"); break; } @@ -202,7 +214,7 @@ impl TcpProtocol { #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); w2c_cid_frame_s - .send((cid, frame)) + .send((cid, Ok(frame))) .await .expect("Channel or Participant seems no longer to exist"); } @@ -331,7 +343,7 @@ impl UdpProtocol { pub async fn read_from_wire( &self, cid: Cid, - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, end_r: oneshot::Receiver<()>, ) { trace!("Starting up udp read()"); @@ -345,7 +357,14 @@ impl UdpProtocol { let mut data_in = self.data_in.lock().await; let mut end_r = end_r.fuse(); while let Some(bytes) = select! { - r = data_in.next().fuse() => r, + r = data_in.next().fuse() => match r { + Some(r) => Some(r), + None => { + info!("Udp read ended"); + w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist"); + None + } + }, _ = end_r => None, } { trace!("Got raw UDP message with len: {}", bytes.len()); @@ -443,7 +462,7 @@ impl UdpProtocol { }; #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); - w2c_cid_frame_s.send((cid, frame)).await.unwrap(); + w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap(); } trace!("Shutting down udp read()"); } @@ -554,10 +573,7 @@ impl UdpProtocol { #[cfg(test)] mod tests { use super::*; - use crate::{ - metrics::NetworkMetrics, - types::{Cid, Pid}, - }; + use crate::{metrics::NetworkMetrics, types::Pid}; use async_std::net; use futures::{executor::block_on, stream::StreamExt}; use std::sync::Arc; @@ -584,7 +600,7 @@ mod tests { client.flush(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { @@ -597,10 +613,10 @@ mod tests { //async_std::task::sleep(std::time::Duration::from_millis(1000)); let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); - if let Frame::Handshake { + if let Ok(Frame::Handshake { magic_number, version, - } = frame + }) = frame { assert_eq!(&magic_number, b"HELLOWO"); assert_eq!(version, [1337, 0, 42]); @@ -633,7 +649,7 @@ mod tests { client.flush(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { @@ -645,7 +661,7 @@ mod tests { // Assert than we get some value back! Its a Raw! 
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); - if let Frame::Raw(data) = frame { + if let Ok(Frame::Raw(data)) = frame { assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf"); } else { panic!("wrong frame type"); diff --git a/network/src/types.rs b/network/src/types.rs index 3ede7fd302..51fd1843e3 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -146,10 +146,18 @@ impl Pid { /// This will panic if pid i greater than 7, as I do not want you to use /// this in production! #[doc(hidden)] - pub fn fake(pid: u8) -> Self { - assert!(pid < 8); + pub fn fake(pid_offset: u8) -> Self { + assert!(pid_offset < 8); + let o = pid_offset as u128; + const OFF: [u128; 5] = [ + 0x40, + 0x40 * 0x40, + 0x40 * 0x40 * 0x40, + 0x40 * 0x40 * 0x40 * 0x40, + 0x40 * 0x40 * 0x40 * 0x40 * 0x40, + ]; Self { - internal: pid as u128, + internal: o + o * OFF[0] + o * OFF[1] + o * OFF[2] + o * OFF[3] + o * OFF[4], } } @@ -251,7 +259,9 @@ mod tests { #[test] fn frame_creation() { Pid::new(); - assert_eq!(format!("{}", Pid::fake(2)), "CAAAAA"); + assert_eq!(format!("{}", Pid::fake(0)), "AAAAAA"); + assert_eq!(format!("{}", Pid::fake(1)), "BBBBBB"); + assert_eq!(format!("{}", Pid::fake(2)), "CCCCCC"); } #[test] diff --git a/network/tests/closing.rs b/network/tests/closing.rs index af0674460a..9c26a3ac43 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -42,8 +42,11 @@ fn close_participant() { let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); block_on(p1_a.disconnect()).unwrap(); - //As no more read/write is done on p1_b the disconnect is successful - block_on(p1_b.disconnect()).unwrap(); + //As no more read/write is run disconnect is successful or already disconnected + match block_on(p1_b.disconnect()) { + Ok(_) | Err(ParticipantError::ParticipantDisconnected) => (), + e => panic!("wrong disconnect type {:?}", e), + }; assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!( @@ -225,15 +228,6 @@ fn close_network_then_disconnect_part() { } #[test] -/* -FLANKY: ----- opened_stream_before_remote_part_is_closed stdout ---- -thread 'opened_stream_before_remote_part_is_closed' panicked at 'assertion failed: `(left == right)` - left: `Err(StreamClosed)`, - right: `Ok("HelloWorld")`', network/tests/closing.rs:236:5 -note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace - -*/ fn opened_stream_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); @@ -291,9 +285,9 @@ fn failed_stream_open_after_remote_part_is_closed() { #[test] fn open_participant_before_remote_part_is_closed() { - let (n_a, f) = Network::new(Pid::fake(1)); + let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(2)); + let (n_b, f) = Network::new(Pid::fake(1)); std::thread::spawn(f); let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); @@ -311,9 +305,9 @@ fn open_participant_before_remote_part_is_closed() { #[test] fn open_participant_after_remote_part_is_closed() { - let (n_a, f) = Network::new(Pid::fake(1)); + let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(2)); + let (n_b, f) = Network::new(Pid::fake(1)); std::thread::spawn(f); let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 
53ce8e0c47..91987cbbc5 100644
--- a/network/tests/helper.rs
+++ b/network/tests/helper.rs
@@ -49,9 +49,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
 pub async fn network_participant_stream(
     addr: ProtocolAddr,
 ) -> (Network, Participant, Stream, Network, Participant, Stream) {
-    let (n_a, f_a) = Network::new(Pid::fake(1));
+    let (n_a, f_a) = Network::new(Pid::fake(0));
     std::thread::spawn(f_a);
-    let (n_b, f_b) = Network::new(Pid::fake(2));
+    let (n_b, f_b) = Network::new(Pid::fake(1));
     std::thread::spawn(f_b);

     n_a.listen(addr.clone()).await.unwrap();

From d9a6938d12a5d5f200c750470671985ee4436a72 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Fri, 21 Aug 2020 16:21:00 +0200
Subject: [PATCH 6/6] Fix the scheduler not really shutting down while it was
 still listening on a port. Add a separate test for this.

- a 1000ms sleep isn't enough when tracing anyway, so remove it
---
 network/src/scheduler.rs |  7 +++++++
 network/tests/closing.rs | 22 ++++++++++++++++++++++
 network/tests/helper.rs  |  5 +----
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs
index b7792f811f..a8a0a0a889 100644
--- a/network/src/scheduler.rs
+++ b/network/src/scheduler.rs
@@ -334,6 +334,13 @@ impl Scheduler {
                 );
             };
         }
+        debug!("shutting down protocol listeners");
+        for (addr, end_channel_sender) in self.channel_listener.write().await.drain() {
+            trace!(?addr, "stopping listen on protocol");
+            if let Err(e) = end_channel_sender.send(()) {
+                warn!(?addr, ?e, "listener crashed/disconnected already");
+            }
+        }
         debug!("Scheduler shut down gracefully");
         //removing the possibility to create new participants, needed to close down
         // some mgr:
diff --git a/network/tests/closing.rs b/network/tests/closing.rs
index 9c26a3ac43..0a9a51ed93 100644
--- a/network/tests/closing.rs
+++ b/network/tests/closing.rs
@@ -322,3 +322,25 @@ fn open_participant_after_remote_part_is_closed() {
     let mut s1_a = block_on(p_a.opened()).unwrap();
     assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
 }
+
+#[test]
+fn close_network_scheduler_completely() {
+    let (n_a, f) = Network::new(Pid::fake(0));
+    let ha = std::thread::spawn(f);
+    let (n_b, f) = Network::new(Pid::fake(1));
+    let hb = std::thread::spawn(f);
+    let addr = tcp();
+    block_on(n_a.listen(addr.clone())).unwrap();
+    let p_b = block_on(n_b.connect(addr)).unwrap();
+    let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap();
+    s1_b.send("HelloWorld").unwrap();
+
+    let p_a = block_on(n_a.connected()).unwrap();
+    let mut s1_a = block_on(p_a.opened()).unwrap();
+    assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
+    drop(n_a);
+    drop(n_b);
+    std::thread::sleep(std::time::Duration::from_millis(1000));
+    ha.join().unwrap();
+    hb.join().unwrap();
+}
diff --git a/network/tests/helper.rs b/network/tests/helper.rs
index 91987cbbc5..02edf8bbff 100644
--- a/network/tests/helper.rs
+++ b/network/tests/helper.rs
@@ -10,10 +10,7 @@ use tracing_subscriber::EnvFilter;
 use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE};

 #[allow(dead_code)]
-pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
-    if tracing {
-        sleep += 1000
-    }
+pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
     if sleep > 0 {
         thread::sleep(Duration::from_millis(sleep));
     }
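As a closing illustration of the listener-shutdown pattern the last patch
relies on, here is a standalone sketch (names like `listen_until` and
`shutdown_all` are made up for illustration; this is not the real scheduler
code): every listen task parks on a oneshot receiver, and shutdown drains
the listener map and fires each sender, so no listen task can outlive the
scheduler.

    use futures::{channel::oneshot, executor::block_on};
    use std::collections::HashMap;

    async fn listen_until(addr: String, end_receiver: oneshot::Receiver<()>) {
        // imagine: an accept loop on `addr` raced against this end signal
        let _ = end_receiver.await;
        println!("stopped listening on {}", addr);
    }

    fn shutdown_all(channel_listener: &mut HashMap<String, oneshot::Sender<()>>) {
        for (addr, end_channel_sender) in channel_listener.drain() {
            // an Err here just means the listener is already gone
            if end_channel_sender.send(()).is_err() {
                println!("listener on {} was already stopped", addr);
            }
        }
    }

    fn main() {
        let mut channel_listener = HashMap::new();
        let (end_sender, end_receiver) = oneshot::channel();
        let addr = "127.0.0.1:14004".to_string();
        channel_listener.insert(addr.clone(), end_sender);
        let handle = std::thread::spawn(move || block_on(listen_until(addr, end_receiver)));

        shutdown_all(&mut channel_listener);
        handle.join().unwrap();
    }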