diff --git a/client/src/lib.rs b/client/src/lib.rs index 1c5e28bf6c..187c33306f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1383,8 +1383,10 @@ impl Client { ); }, ServerMsg::Disconnect => { + debug!("finally sendinge ClientMsg::Terminate"); frontend_events.push(Event::Disconnect); self.singleton_stream.send(ClientMsg::Terminate)?; + break Ok(()); }, ServerMsg::CharacterListUpdate(character_list) => { self.character_list.characters = character_list; diff --git a/network/src/channel.rs b/network/src/channel.rs index 25babccd2b..6e9a4c5e43 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,6 +1,7 @@ #[cfg(feature = "metrics")] use crate::metrics::NetworkMetrics; use crate::{ + participant::C2pFrame, protocols::Protocols, types::{ Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, @@ -41,8 +42,8 @@ impl Channel { pub async fn run( mut self, protocol: Protocols, - mut w2c_cid_frame_s: mpsc::UnboundedSender<(Cid, Frame)>, - mut leftover_cid_frame: Vec<(Cid, Frame)>, + mut w2c_cid_frame_s: mpsc::UnboundedSender, + mut leftover_cid_frame: Vec, ) { let c2w_frame_r = self.c2w_frame_r.take().unwrap(); let read_stop_receiver = self.read_stop_receiver.take().unwrap(); @@ -58,13 +59,13 @@ impl Channel { trace!(?self.cid, "Start up channel"); match protocol { Protocols::Tcp(tcp) => { - futures::join!( + join!( tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), tcp.write_to_wire(self.cid, c2w_frame_r), ); }, Protocols::Udp(udp) => { - futures::join!( + join!( udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), udp.write_to_wire(self.cid, c2w_frame_r), ); @@ -113,12 +114,9 @@ impl Handshake { } } - pub async fn setup( - self, - protocol: &Protocols, - ) -> Result<(Pid, Sid, u128, Vec<(Cid, Frame)>), ()> { + pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec), ()> { let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::(); - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let handler_future = @@ -160,7 +158,7 @@ impl Handshake { async fn frame_handler( &self, - w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>, + w2c_cid_frame_r: &mut mpsc::UnboundedReceiver, mut c2w_frame_s: mpsc::UnboundedSender, read_stop_sender: oneshot::Sender<()>, ) -> Result<(Pid, Sid, u128), ()> { @@ -176,7 +174,7 @@ impl Handshake { let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); #[cfg(feature = "metrics")] { - if let Some(ref frame) = frame { + if let Some(Ok(ref frame)) = frame { self.metrics .frames_in_total .with_label_values(&["", &cid_string, &frame.get_string()]) @@ -184,10 +182,10 @@ impl Handshake { } } let r = match frame { - Some(Frame::Handshake { + Some(Ok(Frame::Handshake { magic_number, version, - }) => { + })) => { trace!(?magic_number, ?version, "Recv handshake"); if magic_number != VELOREN_MAGIC_NUMBER { error!(?magic_number, "Connection with invalid magic_number"); @@ -221,18 +219,24 @@ impl Handshake { Ok(()) } }, - Some(Frame::Shutdown) => { - info!("Shutdown signal received"); - Err(()) - }, - Some(Frame::Raw(bytes)) => { - match std::str::from_utf8(bytes.as_slice()) { - Ok(string) => error!(?string, ERR_S), - _ => error!(?bytes, ERR_S), + Some(Ok(frame)) => { + #[cfg(feature = "metrics")] + self.metrics + .frames_in_total + .with_label_values(&["", &cid_string, frame.get_string()]) + .inc(); + if let Frame::Raw(bytes) = frame { + match std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + } } Err(()) }, - Some(_) => Err(()), + Some(Err(())) => { + info!("Protocol got interrupted"); + Err(()) + }, None => Err(()), }; if let Err(()) = r { @@ -248,7 +252,7 @@ impl Handshake { let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); let r = match frame { - Some(Frame::Init { pid, secret }) => { + Some(Ok(Frame::Init { pid, secret })) => { debug!(?pid, "Participant send their ID"); let pid_string = pid.to_string(); #[cfg(feature = "metrics")] @@ -265,22 +269,24 @@ impl Handshake { info!(?pid, "This Handshake is now configured!"); Ok((pid, stream_id_offset, secret)) }, - Some(frame) => { + Some(Ok(frame)) => { #[cfg(feature = "metrics")] self.metrics .frames_in_total .with_label_values(&["", &cid_string, frame.get_string()]) .inc(); - match frame { - Frame::Shutdown => info!("Shutdown signal received"), - Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) { + if let Frame::Raw(bytes) = frame { + match std::str::from_utf8(bytes.as_slice()) { Ok(string) => error!(?string, ERR_S), _ => error!(?bytes, ERR_S), - }, - _ => (), + } } Err(()) }, + Some(Err(())) => { + info!("Protocol got interrupted"); + Err(()) + }, None => Err(()), }; if r.is_err() { diff --git a/network/src/metrics.rs b/network/src/metrics.rs index 4436a8ce7f..02940acd40 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -34,6 +34,8 @@ pub struct NetworkMetrics { pub message_out_total: IntCounterVec, // send(prio) Messages throughput, seperated by STREAM AND PARTICIPANT, pub message_out_throughput: IntCounterVec, + // flushed(prio) stream count, seperated by PARTICIPANT, + pub streams_flushed: IntCounterVec, // TODO: queued Messages, seperated by STREAM (add PART, CHANNEL), // queued Messages, seperated by PARTICIPANT pub queued_count: IntGaugeVec, @@ -167,6 +169,13 @@ impl NetworkMetrics { ), &["participant", "stream"], )?; + let streams_flushed = IntCounterVec::new( + Opts::new( + "stream_flushed", + "Number of flushed streams requested to PrioManager at participant level", + ), + &["participant"], + )?; let queued_count = IntGaugeVec::new( Opts::new( "queued_count", @@ -207,6 +216,7 @@ impl NetworkMetrics { wire_in_throughput, message_out_total, message_out_throughput, + streams_flushed, queued_count, queued_bytes, participants_ping, diff --git a/network/src/participant.rs b/network/src/participant.rs index 085ddfcb77..607b83e925 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -25,9 +25,11 @@ use std::{ time::{Duration, Instant}, }; use tracing::*; +use tracing_futures::Instrument; pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender); -pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>); +pub(crate) type C2pFrame = (Cid, Result); +pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec, oneshot::Sender<()>); pub(crate) type S2bShutdownBparticipant = oneshot::Sender>; pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -69,7 +71,7 @@ pub struct BParticipant { remote_pid: Pid, remote_pid_string: String, //optimisation offset_sid: Sid, - channels: Arc>>, + channels: Arc>>, streams: RwLock>, running_mgr: AtomicUsize, run_channels: Option, @@ -118,7 +120,7 @@ impl BParticipant { remote_pid, remote_pid_string: remote_pid.to_string(), offset_sid, - channels: Arc::new(RwLock::new(vec![])), + channels: Arc::new(RwLock::new(HashMap::new())), streams: RwLock::new(HashMap::new()), running_mgr: AtomicUsize::new(0), run_channels, @@ -142,7 +144,7 @@ impl BParticipant { oneshot::channel(); let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel(); let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel(); - let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::(); let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new( #[cfg(feature = "metrics")] self.metrics.clone(), @@ -205,13 +207,11 @@ impl BParticipant { #[cfg(feature = "metrics")] let mut send_cache = PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + let mut i: u64 = 0; loop { let mut frames = VecDeque::new(); prios.fill_frames(FRAMES_PER_TICK, &mut frames).await; let len = frames.len(); - if len > 0 { - trace!("Tick {}", len); - } for (_, frame) in frames { self.send_frame( frame, @@ -225,6 +225,10 @@ impl BParticipant { .await .unwrap(); async_std::task::sleep(TICK_TIME).await; + i += 1; + if i.rem_euclid(1000) == 0 { + trace!("Did 1000 ticks"); + } //shutdown after all msg are send! if closing_up && (len == 0) { break; @@ -251,34 +255,30 @@ impl BParticipant { // find out ideal channel here //TODO: just take first let mut lock = self.channels.write().await; - if let Some(ci) = lock.get_mut(0) { - //note: this is technically wrong we should only increase when it suceeded, but - // this requiered me to clone `frame` which is a to big performance impact for - // error handling + if let Some(ci) = lock.values_mut().next() { + //we are increasing metrics without checking the result to please + // borrow_checker. otherwise we would need to close `frame` what we + // dont want! #[cfg(feature = "metrics")] frames_out_total_cache .with_label_values(ci.cid, &frame) .inc(); if let Err(e) = ci.b2w_frame_s.send(frame).await { - warn!( - ?e, - "The channel got closed unexpectedly, cleaning it up now." - ); - let ci = lock.remove(0); - if let Err(e) = ci.b2r_read_shutdown.send(()) { - debug!( - ?e, - "Error shutdowning channel, which is prob fine as we detected it to no \ - longer work in the first place" - ); - }; + let cid = ci.cid; + info!(?e, ?cid, "channel no longer available"); + if let Some(ci) = self.channels.write().await.remove(&cid) { + trace!(?cid, "stopping read protocol"); + if let Err(e) = ci.b2r_read_shutdown.send(()) { + trace!(?cid, ?e, "seems like was already shut down"); + } + } //TODO FIXME tags: takeover channel multiple info!( "FIXME: the frame is actually drop. which is fine for now as the participant \ will be closed, but not if we do channel-takeover" ); //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant - self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) + self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) .await; false } else { @@ -291,7 +291,12 @@ impl BParticipant { guard.0 = now; let occurrences = guard.1 + 1; guard.1 = 0; - error!(?occurrences, "Participant has no channel to communicate on"); + let lastframe = frame; + error!( + ?occurrences, + ?lastframe, + "Participant has no channel to communicate on" + ); } else { guard.1 += 1; } @@ -301,7 +306,7 @@ impl BParticipant { async fn handle_frames_mgr( &self, - mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>, + mut w2b_frames_r: mpsc::UnboundedReceiver, mut b2a_stream_opened_s: mpsc::UnboundedSender, a2b_close_stream_s: mpsc::UnboundedSender, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, @@ -313,8 +318,22 @@ impl BParticipant { let mut dropped_cnt = 0u64; let mut dropped_sid = Sid::new(0); - while let Some((cid, frame)) = w2b_frames_r.next().await { - //trace!("handling frame"); + while let Some((cid, result_frame)) = w2b_frames_r.next().await { + //trace!(?result_frame, "handling frame"); + let frame = match result_frame { + Ok(frame) => frame, + Err(()) => { + // The read protocol stopped, i need to make sure that write gets stopped + debug!("read protocol was closed. Stopping write protocol"); + if let Some(ci) = self.channels.write().await.get_mut(&cid) { + ci.b2w_frame_s + .close() + .await + .expect("couldn't stop write protocol"); + } + continue; + }, + }; #[cfg(feature = "metrics")] { let cid_string = cid.to_string(); @@ -323,8 +342,6 @@ impl BParticipant { .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) .inc(); } - #[cfg(not(feature = "metrics"))] - let _cid = cid; match frame { Frame::OpenStream { sid, @@ -395,7 +412,7 @@ impl BParticipant { false }; if finished { - //debug!(?mid, "finished receiving message"); + //trace!(?mid, "finished receiving message"); let imsg = messages.remove(&mid).unwrap(); if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) { if let Err(e) = si.b2a_msg_recv_s.send(imsg).await { @@ -448,7 +465,7 @@ impl BParticipant { async fn create_channel_mgr( &self, s2b_create_channel_r: mpsc::UnboundedReceiver, - w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>, + w2b_frames_s: mpsc::UnboundedSender, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start create_channel_mgr"); @@ -462,7 +479,7 @@ impl BParticipant { let channels = self.channels.clone(); async move { let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid); - channels.write().await.push(ChannelInfo { + channels.write().await.insert(cid, ChannelInfo { cid, cid_string: cid.to_string(), b2w_frame_s, @@ -477,13 +494,20 @@ impl BParticipant { trace!(?cid, "Running channel in participant"); channel .run(protocol, w2b_frames_s, leftover_cid_frame) + .instrument(tracing::info_span!("", ?cid)) .await; #[cfg(feature = "metrics")] self.metrics .channels_disconnected_total .with_label_values(&[&self.remote_pid_string]) .inc(); - trace!(?cid, "Channel got closed"); + info!(?cid, "Channel got closed"); + //maybe channel got already dropped, we don't know. + channels.write().await.remove(&cid); + trace!(?cid, "Channel cleanup completed"); + //TEMP FIX: as we dont have channel takeover yet drop the whole + // bParticipant + self.close_write_api(None).await; } }, ) @@ -562,6 +586,9 @@ impl BParticipant { trace!("Start participant_shutdown_mgr"); let sender = s2b_shutdown_bparticipant_r.await.unwrap(); + let mut send_cache = + PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + self.close_api(None).await; debug!("Closing all managers"); @@ -572,10 +599,30 @@ impl BParticipant { } b2b_prios_flushed_r.await.unwrap(); + if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error + { + debug!("Sending shutdown frame after flushed all prios"); + if !self + .send_frame( + Frame::Shutdown, + #[cfg(feature = "metrics")] + &mut send_cache, + ) + .await + { + warn!("couldn't send shutdown frame, are channels already closed?"); + } + } + debug!("Closing all channels, after flushed prios"); - for ci in self.channels.write().await.drain(..) { + for (cid, ci) in self.channels.write().await.drain() { if let Err(e) = ci.b2r_read_shutdown.send(()) { - debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact"); + debug!( + ?e, + ?cid, + "Seems like this read protocol got already dropped by closing the Stream \ + itself, ignoring" + ); }; } @@ -647,7 +694,7 @@ impl BParticipant { si.send_closed.store(true, Ordering::Relaxed); si.b2a_msg_recv_s.close_channel(); }, - None => warn!("Couldn't find the stream, might be simulanious close from remote"), + None => warn!("Couldn't find the stream, might be simultaneous close from remote"), } //TODO: what happens if RIGHT NOW the remote sends a StreamClose and this @@ -711,21 +758,29 @@ impl BParticipant { ) } - /// close streams and set err - async fn close_api(&self, reason: Option) { - //closing api::Participant is done by closing all channels, exepct for the - // shutdown channel at this point! + async fn close_write_api(&self, reason: Option) { + trace!(?reason, "close_api"); let mut lock = self.shutdown_info.write().await; if let Some(r) = reason { lock.error = Some(r); } lock.b2a_stream_opened_s.close_channel(); + debug!("Closing all streams for write"); + for (sid, si) in self.streams.write().await.iter() { + trace!(?sid, "Shutting down Stream for write"); + si.send_closed.store(true, Ordering::Relaxed); + } + } + + ///closing api::Participant is done by closing all channels, exepct for the + /// shutdown channel at this point! + async fn close_api(&self, reason: Option) { + self.close_write_api(reason).await; debug!("Closing all streams"); for (sid, si) in self.streams.write().await.drain() { trace!(?sid, "Shutting down Stream"); si.b2a_msg_recv_s.close_channel(); - si.send_closed.store(true, Ordering::Relaxed); } } } diff --git a/network/src/prios.rs b/network/src/prios.rs index 4e5971a32e..1026b5f581 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -16,8 +16,6 @@ use futures::channel::oneshot; use std::collections::{HashMap, HashSet, VecDeque}; #[cfg(feature = "metrics")] use std::sync::Arc; -use tracing::*; - const PRIO_MAX: usize = 64; #[derive(Default)] @@ -148,11 +146,8 @@ impl PrioManager { async fn tick(&mut self) { // Check Range - let mut messages = 0; - let mut closed = 0; for (prio, sid, msg) in self.messages_rx.try_iter() { debug_assert!(prio as usize <= PRIO_MAX); - messages += 1; #[cfg(feature = "metrics")] { let sid_string = sid.to_string(); @@ -173,7 +168,11 @@ impl PrioManager { } //this must be AFTER messages for (sid, return_sender) in self.sid_flushed_rx.try_iter() { - closed += 1; + #[cfg(feature = "metrics")] + self.metrics + .streams_flushed + .with_label_values(&[&self.pid]) + .inc(); if let Some(cnt) = self.sid_owned.get_mut(&sid) { // register sender cnt.empty_notify = Some(return_sender); @@ -182,9 +181,6 @@ impl PrioManager { return_sender.send(()).unwrap(); } } - if messages > 0 || closed > 0 { - trace!(?messages, ?closed, "tick"); - } } //if None returned, we are empty! @@ -256,7 +252,6 @@ impl PrioManager { } } } else { - trace!(?msg.mid, "Repush message"); self.messages[prio as usize].push_front((sid, msg)); } }, diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 687e1d8885..910b11d5c6 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -1,6 +1,9 @@ #[cfg(feature = "metrics")] use crate::metrics::{CidFrameCache, NetworkMetrics}; -use crate::types::{Cid, Frame, Mid, Pid, Sid}; +use crate::{ + participant::C2pFrame, + types::{Cid, Frame, Mid, Pid, Sid}, +}; use async_std::{ net::{TcpStream, UdpSocket}, prelude::*, @@ -71,8 +74,8 @@ impl TcpProtocol { cid: Cid, mut stream: &TcpStream, mut bytes: &mut [u8], - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, mut end_receiver: &mut Fuse>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, ) -> bool { match select! { r = stream.read_exact(&mut bytes).fuse() => Some(r), @@ -80,20 +83,18 @@ impl TcpProtocol { } { Some(Ok(_)) => false, Some(Err(e)) => { - debug!( - ?cid, - ?e, - "Closing tcp protocol due to read error, sending close frame to gracefully \ - shutdown" - ); + info!(?e, "Closing tcp protocol due to read error"); + //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every + // channel is holding a sender! thats why Ne need a explicit + // STOP here w2c_cid_frame_s - .send((cid, Frame::Shutdown)) + .send((cid, Err(()))) .await - .expect("Channel or Participant seems no longer to exist to be Shutdown"); + .expect("Channel or Participant seems no longer to exist"); true }, None => { - trace!(?cid, "shutdown requested"); + trace!("shutdown requested"); true }, } @@ -102,7 +103,7 @@ impl TcpProtocol { pub async fn read_from_wire( &self, cid: Cid, - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, end_r: oneshot::Receiver<()>, ) { trace!("Starting up tcp read()"); @@ -118,8 +119,8 @@ impl TcpProtocol { macro_rules! read_or_close { ($x:expr) => { - if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await { - info!("Tcp stream closed, shutting down read"); + if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await { + trace!("read_or_close requested a shutdown"); break; } }; @@ -213,7 +214,7 @@ impl TcpProtocol { #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); w2c_cid_frame_s - .send((cid, frame)) + .send((cid, Ok(frame))) .await .expect("Channel or Participant seems no longer to exist"); } @@ -228,7 +229,7 @@ impl TcpProtocol { ) -> bool { match stream.write_all(&bytes).await { Err(e) => { - debug!( + info!( ?e, "Got an error writing to tcp, going to close this channel" ); @@ -255,7 +256,7 @@ impl TcpProtocol { macro_rules! write_or_close { ($x:expr) => { if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await { - info!("Tcp stream closed, shutting down write"); + trace!("write_or_close requested a shutdown"); break; } }; @@ -342,7 +343,7 @@ impl UdpProtocol { pub async fn read_from_wire( &self, cid: Cid, - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, end_r: oneshot::Receiver<()>, ) { trace!("Starting up udp read()"); @@ -356,7 +357,14 @@ impl UdpProtocol { let mut data_in = self.data_in.lock().await; let mut end_r = end_r.fuse(); while let Some(bytes) = select! { - r = data_in.next().fuse() => r, + r = data_in.next().fuse() => match r { + Some(r) => Some(r), + None => { + info!("Udp read ended"); + w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist"); + None + } + }, _ = end_r => None, } { trace!("Got raw UDP message with len: {}", bytes.len()); @@ -454,7 +462,7 @@ impl UdpProtocol { }; #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); - w2c_cid_frame_s.send((cid, frame)).await.unwrap(); + w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap(); } trace!("Shutting down udp read()"); } @@ -565,10 +573,7 @@ impl UdpProtocol { #[cfg(test)] mod tests { use super::*; - use crate::{ - metrics::NetworkMetrics, - types::{Cid, Pid}, - }; + use crate::{metrics::NetworkMetrics, types::Pid}; use async_std::net; use futures::{executor::block_on, stream::StreamExt}; use std::sync::Arc; @@ -595,7 +600,7 @@ mod tests { client.flush(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { @@ -608,10 +613,10 @@ mod tests { //async_std::task::sleep(std::time::Duration::from_millis(1000)); let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); - if let Frame::Handshake { + if let Ok(Frame::Handshake { magic_number, version, - } = frame + }) = frame { assert_eq!(&magic_number, b"HELLOWO"); assert_eq!(version, [1337, 0, 42]); @@ -644,7 +649,7 @@ mod tests { client.flush(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { @@ -656,7 +661,7 @@ mod tests { // Assert than we get some value back! Its a Raw! let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); - if let Frame::Raw(data) = frame { + if let Ok(Frame::Raw(data)) = frame { assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf"); } else { panic!("wrong frame type"); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 000ccd92e7..a8a0a0a889 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -334,6 +334,13 @@ impl Scheduler { ); }; } + debug!("shutting down protocol listeners"); + for (addr, end_channel_sender) in self.channel_listener.write().await.drain() { + trace!(?addr, "stopping listen on protocol"); + if let Err(e) = end_channel_sender.send(()) { + warn!(?addr, ?e, "listener crashed/disconnected already"); + } + } debug!("Scheduler shut down gracefully"); //removing the possibility to create new participants, needed to close down // some mgr: @@ -512,7 +519,11 @@ impl Scheduler { metrics.clone(), send_handshake, ); - match handshake.setup(&protocol).await { + match handshake + .setup(&protocol) + .instrument(tracing::info_span!("handshake", ?cid)) + .await + { Ok((pid, sid, secret, leftover_cid_frame)) => { trace!( ?cid, @@ -583,14 +594,18 @@ impl Scheduler { } } else { let pi = &participants[&pid]; - trace!("2nd+ channel of participant, going to compare security ids"); + trace!( + ?cid, + "2nd+ channel of participant, going to compare security ids" + ); if pi.secret != secret { warn!( + ?cid, ?pid, ?secret, "Detected incompatible Secret!, this is probably an attack!" ); - error!("Just dropping here, TODO handle this correctly!"); + error!(?cid, "Just dropping here, TODO handle this correctly!"); //TODO if let Some(pid_oneshot) = s2a_return_pid_s { // someone is waiting with `connect`, so give them their Error @@ -604,6 +619,7 @@ impl Scheduler { return; } error!( + ?cid, "Ufff i cant answer the pid_oneshot. as i need to create the SAME \ participant. maybe switch to ARC" ); @@ -612,9 +628,10 @@ impl Scheduler { // move directly to participant! }, Err(()) => { + debug!(?cid, "Handshake from a new connection failed"); if let Some(pid_oneshot) = s2a_return_pid_s { // someone is waiting with `connect`, so give them their Error - trace!("returning the Err to api who requested the connect"); + trace!(?cid, "returning the Err to api who requested the connect"); pid_oneshot .send(Err(std::io::Error::new( std::io::ErrorKind::PermissionDenied, @@ -625,7 +642,7 @@ impl Scheduler { }, } } - .instrument(tracing::trace_span!("")), + .instrument(tracing::info_span!("")), ); /*WORKAROUND FOR SPAN NOT TO GET LOST*/ } } diff --git a/network/src/types.rs b/network/src/types.rs index 3ede7fd302..51fd1843e3 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -146,10 +146,18 @@ impl Pid { /// This will panic if pid i greater than 7, as I do not want you to use /// this in production! #[doc(hidden)] - pub fn fake(pid: u8) -> Self { - assert!(pid < 8); + pub fn fake(pid_offset: u8) -> Self { + assert!(pid_offset < 8); + let o = pid_offset as u128; + const OFF: [u128; 5] = [ + 0x40, + 0x40 * 0x40, + 0x40 * 0x40 * 0x40, + 0x40 * 0x40 * 0x40 * 0x40, + 0x40 * 0x40 * 0x40 * 0x40 * 0x40, + ]; Self { - internal: pid as u128, + internal: o + o * OFF[0] + o * OFF[1] + o * OFF[2] + o * OFF[3] + o * OFF[4], } } @@ -251,7 +259,9 @@ mod tests { #[test] fn frame_creation() { Pid::new(); - assert_eq!(format!("{}", Pid::fake(2)), "CAAAAA"); + assert_eq!(format!("{}", Pid::fake(0)), "AAAAAA"); + assert_eq!(format!("{}", Pid::fake(1)), "BBBBBB"); + assert_eq!(format!("{}", Pid::fake(2)), "CCCCCC"); } #[test] diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 1eb81b1990..0a9a51ed93 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -42,10 +42,11 @@ fn close_participant() { let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); block_on(p1_a.disconnect()).unwrap(); - assert_eq!( - block_on(p1_b.disconnect()), - Err(ParticipantError::ParticipantDisconnected) - ); + //As no more read/write is run disconnect is successful or already disconnected + match block_on(p1_b.disconnect()) { + Ok(_) | Err(ParticipantError::ParticipantDisconnected) => (), + e => panic!("wrong disconnect type {:?}", e), + }; assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!( @@ -284,9 +285,9 @@ fn failed_stream_open_after_remote_part_is_closed() { #[test] fn open_participant_before_remote_part_is_closed() { - let (n_a, f) = Network::new(Pid::fake(1)); + let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(2)); + let (n_b, f) = Network::new(Pid::fake(1)); std::thread::spawn(f); let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); @@ -304,9 +305,9 @@ fn open_participant_before_remote_part_is_closed() { #[test] fn open_participant_after_remote_part_is_closed() { - let (n_a, f) = Network::new(Pid::fake(1)); + let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(2)); + let (n_b, f) = Network::new(Pid::fake(1)); std::thread::spawn(f); let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); @@ -321,3 +322,25 @@ fn open_participant_after_remote_part_is_closed() { let mut s1_a = block_on(p_a.opened()).unwrap(); assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); } + +#[test] +fn close_network_scheduler_completely() { + let (n_a, f) = Network::new(Pid::fake(0)); + let ha = std::thread::spawn(f); + let (n_b, f) = Network::new(Pid::fake(1)); + let hb = std::thread::spawn(f); + let addr = tcp(); + block_on(n_a.listen(addr.clone())).unwrap(); + let p_b = block_on(n_b.connect(addr)).unwrap(); + let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap(); + s1_b.send("HelloWorld").unwrap(); + + let p_a = block_on(n_a.connected()).unwrap(); + let mut s1_a = block_on(p_a.opened()).unwrap(); + assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); + drop(n_a); + drop(n_b); + std::thread::sleep(std::time::Duration::from_millis(1000)); + ha.join().unwrap(); + hb.join().unwrap(); +} diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 53ce8e0c47..02edf8bbff 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -10,10 +10,7 @@ use tracing_subscriber::EnvFilter; use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE}; #[allow(dead_code)] -pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { - if tracing { - sleep += 1000 - } +pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) { if sleep > 0 { thread::sleep(Duration::from_millis(sleep)); } @@ -49,9 +46,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { pub async fn network_participant_stream( addr: ProtocolAddr, ) -> (Network, Participant, Stream, Network, Participant, Stream) { - let (n_a, f_a) = Network::new(Pid::fake(1)); + let (n_a, f_a) = Network::new(Pid::fake(0)); std::thread::spawn(f_a); - let (n_b, f_b) = Network::new(Pid::fake(2)); + let (n_b, f_b) = Network::new(Pid::fake(1)); std::thread::spawn(f_b); n_a.listen(addr.clone()).await.unwrap(); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index 306300605e..8b5ea37dfe 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -79,30 +79,32 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event { if let Some(client) = server.state().read_storage::().get(entity) { - trace!("Closing participant of client"); let participant = match client.participant.try_lock() { Ok(mut p) => p.take().unwrap(), Err(e) => { - error!(?e, "coudln't lock participant for removal"); + error!(?e, ?entity, "coudln't lock participant for removal"); return Event::ClientDisconnected { entity }; }, }; - std::thread::spawn(|| { - let pid = participant.remote_pid(); + let pid = participant.remote_pid(); + std::thread::spawn(move || { + let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity); + let _enter = span.enter(); let now = std::time::Instant::now(); - trace!(?pid, "start disconnect"); + debug!(?pid, ?entity, "Start handle disconnect of client"); if let Err(e) = block_on(participant.disconnect()) { debug!( ?e, + ?pid, "Error when disconnecting client, maybe the pipe already broke" ); }; trace!(?pid, "finished disconnect"); let elapsed = now.elapsed(); if elapsed.as_millis() > 100 { - warn!(?elapsed, "disconecting took quite long"); + warn!(?elapsed, ?pid, "disconecting took quite long"); } else { - debug!(?elapsed, "disconecting took"); + debug!(?elapsed, ?pid, "disconecting took"); } }); } diff --git a/server/src/lib.rs b/server/src/lib.rs index aef6fe1dbb..d93851401c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -274,16 +274,19 @@ impl Server { let mut metrics = ServerMetrics::new(); // register all metrics submodules here - let tick_metrics = TickMetrics::new(metrics.registry(), metrics.tick_clone()) + let tick_metrics = TickMetrics::new(metrics.tick_clone()) .expect("Failed to initialize server tick metrics submodule."); - metrics - .run(settings.metrics_address) - .expect("Failed to initialize server metrics submodule."); + tick_metrics + .register(&metrics.registry()) + .expect("failed to register tick metrics"); let thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".to_string()) .build(); - let (network, f) = Network::new(Pid::new()); + let (network, f) = Network::new_with_registry(Pid::new(), &metrics.registry()); + metrics + .run(settings.metrics_address) + .expect("Failed to initialize server metrics submodule."); thread_pool.execute(f); block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 11913abd7a..a23a77db79 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -33,7 +33,7 @@ pub struct ServerMetrics { } impl TickMetrics { - pub fn new(registry: &Registry, tick: Arc) -> Result> { + pub fn new(tick: Arc) -> Result> { let player_online = IntGauge::with_opts(Opts::new( "player_online", "shows the number of clients connected to the server", @@ -74,15 +74,6 @@ impl TickMetrics { .expect("Time went backwards"); start_time.set(since_the_epoch.as_secs().try_into()?); - registry.register(Box::new(player_online.clone()))?; - registry.register(Box::new(entity_count.clone()))?; - registry.register(Box::new(build_info.clone()))?; - registry.register(Box::new(start_time.clone()))?; - registry.register(Box::new(time_of_day.clone()))?; - registry.register(Box::new(chonks_count.clone()))?; - registry.register(Box::new(chunks_count.clone()))?; - registry.register(Box::new(tick_time.clone()))?; - Ok(Self { chonks_count, chunks_count, @@ -97,6 +88,18 @@ impl TickMetrics { }) } + pub fn register(&self, registry: &Registry) -> Result<(), Box> { + registry.register(Box::new(self.player_online.clone()))?; + registry.register(Box::new(self.entity_count.clone()))?; + registry.register(Box::new(self.build_info.clone()))?; + registry.register(Box::new(self.start_time.clone()))?; + registry.register(Box::new(self.time_of_day.clone()))?; + registry.register(Box::new(self.chonks_count.clone()))?; + registry.register(Box::new(self.chunks_count.clone()))?; + registry.register(Box::new(self.tick_time.clone()))?; + Ok(()) + } + pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } } diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 234f6c0a86..415f9d4727 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -25,6 +25,7 @@ use hashbrown::HashMap; use specs::{ Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, }; +use tracing::{debug, error, info, warn}; impl Sys { ///We needed to move this to a async fn, if we would use a async closures @@ -267,17 +268,13 @@ impl Sys { let msg = mode.new_message(*from, message); new_chat_msgs.push((Some(entity), msg)); } else { - tracing::error!("Could not send message. Missing player uid"); + error!("Could not send message. Missing player uid"); } }, Err(ChatMsgValidationError::TooLong) => { let max = MAX_BYTES_CHAT_MSG; let len = message.len(); - tracing::warn!( - ?len, - ?max, - "Recieved a chat message that's too long" - ) + warn!(?len, ?max, "Recieved a chat message that's too long") }, } }, @@ -342,7 +339,9 @@ impl Sys { client.notify(ServerMsg::Disconnect); }, ClientMsg::Terminate => { + debug!(?entity, "Client send message to termitate session"); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + break Ok(()); }, ClientMsg::RequestCharacterList => { if let Some(player) = players.get(entity) { @@ -351,11 +350,7 @@ impl Sys { }, ClientMsg::CreateCharacter { alias, tool, body } => { if let Err(error) = alias_validator.validate(&alias) { - tracing::debug!( - ?error, - ?alias, - "denied alias as it contained a banned word" - ); + debug!(?error, ?alias, "denied alias as it contained a banned word"); client.notify(ServerMsg::CharacterActionError(error.to_string())); } else if let Some(player) = players.get(entity) { character_loader.create_character( @@ -522,10 +517,15 @@ impl<'a> System<'a> for Sys { // Update client ping. if cnt > 0 { client.last_ping = time.0 - } else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout - || network_err.is_err() + } else if time.0 - client.last_ping > CLIENT_TIMEOUT + // Timeout + { + info!(?entity, "timeout error with client, disconnecting"); + server_emitter.emit(ServerEvent::ClientDisconnect(entity)); + } else if network_err.is_err() // Postbox error { + debug!(?entity, "postbox error with client, disconnecting"); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 { // Try pinging the client if the timeout is nearing.