diff --git a/network/Cargo.toml b/network/Cargo.toml
index 2a56ae7f9c..553ec532ab 100644
--- a/network/Cargo.toml
+++ b/network/Cargo.toml
@@ -40,4 +40,5 @@ tracing-subscriber = { version = "0.2.3", default-features = false, features = [
 uvth = { version = ">= 3.0, <= 4.0", default-features = false }
 clap = { version = "2.33", default-features = false }
 shellexpand = "2.0.0"
-tiny_http = "0.7.0"
\ No newline at end of file
+tiny_http = "0.7.0"
+serde = { version = "1.0", features = ["derive"] }
\ No newline at end of file
diff --git a/network/src/api.rs b/network/src/api.rs
index af171b7e1b..972c71b400 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -955,17 +955,13 @@ impl Drop for Participant {
 
 impl Drop for Stream {
     fn drop(&mut self) {
-        // send if closed is unnecessary but doesnt hurt, we must not crash
+        // send if closed is unnecessary but doesn't hurt, we must not crash
         if !self.send_closed.load(Ordering::Relaxed) {
             let sid = self.sid;
             let pid = self.pid;
             debug!(?pid, ?sid, "Shutting down Stream");
-            if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
-                warn!(
-                    "Other side got already dropped, probably due to timing, other side will \
-                     handle this gracefully"
-                );
-            };
+            task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid))
+                .expect("bparticipant part of a graceful shutdown must have crashed");
         } else {
             let sid = self.sid;
             let pid = self.pid;
diff --git a/network/src/participant.rs b/network/src/participant.rs
index 4734040760..1595659e9b 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -143,7 +143,6 @@ impl BParticipant {
         let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) =
             oneshot::channel();
         let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
-        let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
         let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::();
         let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new(
             #[cfg(feature = "metrics")]
@@ -166,12 +165,7 @@ impl BParticipant {
                 a2p_msg_s.clone(),
             ),
             self.create_channel_mgr(run_channels.s2b_create_channel_r, w2b_frames_s),
-            self.send_mgr(
-                prios,
-                shutdown_send_mgr_receiver,
-                b2b_prios_flushed_s,
-                b2s_prio_statistic_s
-            ),
+            self.send_mgr(prios, shutdown_send_mgr_receiver, b2s_prio_statistic_s),
             self.stream_close_mgr(
                 run_channels.a2b_close_stream_r,
                 shutdown_stream_close_mgr_receiver,
@@ -179,12 +173,9 @@ impl BParticipant {
             ),
             self.participant_shutdown_mgr(
                 run_channels.s2b_shutdown_bparticipant_r,
-                b2b_prios_flushed_r,
-                vec![
-                    shutdown_send_mgr_sender,
-                    shutdown_open_mgr_sender,
-                    shutdown_stream_close_mgr_sender,
-                ],
+                shutdown_open_mgr_sender,
+                shutdown_stream_close_mgr_sender,
+                shutdown_send_mgr_sender,
             ),
         );
     }
 }
@@ -192,8 +183,7 @@
     async fn send_mgr(
         &self,
         mut prios: PrioManager,
-        mut shutdown_send_mgr_receiver: oneshot::Receiver<()>,
-        b2b_prios_flushed_s: oneshot::Sender<()>,
+        mut shutdown_send_mgr_receiver: oneshot::Receiver<oneshot::Sender<()>>,
         mut b2s_prio_statistic_s: mpsc::UnboundedSender,
     ) {
         //This time equals the MINIMUM Latency in average, so keep it down and //Todo:
@@ -202,7 +192,7 @@ impl BParticipant {
         const TICK_TIME: Duration = Duration::from_millis(10);
         const FRAMES_PER_TICK: usize = 10005;
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
-        let mut closing_up = false;
+        let mut b2b_prios_flushed_s = None; //closing up
         trace!("Start send_mgr");
         #[cfg(feature = "metrics")]
         let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
@@ -229,20 +219,22 @@ impl BParticipant {
                 trace!("Did 1000 ticks");
             }
             //shutdown after all msg are send!
-            if closing_up && (len == 0) {
+            // Make sure this is called after the API is closed, and all streams are known
+            // to be dropped to the priomgr
+            if b2b_prios_flushed_s.is_some() && (len == 0) {
                 break;
             }
-            //this IF below the break IF to give it another chance to close all streams
-            // closed
-            if !closing_up && shutdown_send_mgr_receiver.try_recv().unwrap().is_some() {
-                closing_up = true;
-                //FIXME: quickfix for an error that we are WAITING on close confirmation of
-                // streams from prio manager while prio manager is already shut down.
-                async_std::task::sleep(TICK_TIME * 10).await;
+            if b2b_prios_flushed_s.is_none() {
+                if let Some(prios_flushed_s) = shutdown_send_mgr_receiver.try_recv().unwrap() {
+                    b2b_prios_flushed_s = Some(prios_flushed_s);
+                }
             }
         }
         trace!("Stop send_mgr");
-        b2b_prios_flushed_s.send(()).unwrap();
+        b2b_prios_flushed_s
+            .expect("b2b_prios_flushed_s not set")
+            .send(())
+            .unwrap();
         self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
@@ -321,6 +313,8 @@ impl BParticipant {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("Start handle_frames_mgr");
         let mut messages = HashMap::new();
+        #[cfg(feature = "metrics")]
+        let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
         let mut dropped_instant = Instant::now();
         let mut dropped_cnt = 0u64;
         let mut dropped_sid = Sid::new(0);
@@ -371,33 +365,16 @@ impl BParticipant {
                     }
                 },
                 Frame::CloseStream { sid } => {
-                    // Closing is realised by setting a AtomicBool to true, however we also have a
-                    // guarantee that send or recv fails if the other one is destroyed
-                    // However Stream.send() is not async and their receiver isn't dropped if Steam
-                    // is dropped, so i need a way to notify the Stream that it's send messages will
-                    // be dropped... from remote, notify local
-                    trace!(
-                        ?sid,
-                        "Got remote request to close a stream, without flushing it, local \
-                         messages are dropped"
-                    );
-                    // no wait for flush here, as the remote wouldn't care anyway.
-                    if let Some(si) = self.streams.write().await.remove(&sid) {
+                    // no need to keep flushing as the remote no longer knows about this stream
+                    // anyway
+                    self.delete_stream(
+                        sid,
+                        None,
+                        true,
                         #[cfg(feature = "metrics")]
-                        self.metrics
-                            .streams_closed_total
-                            .with_label_values(&[&self.remote_pid_string])
-                            .inc();
-                        si.send_closed.store(true, Ordering::Relaxed);
-                        si.b2a_msg_recv_s.into_inner().close_channel();
-                        trace!(?sid, "Closed stream from remote");
-                    } else {
-                        warn!(
-                            ?sid,
-                            "Couldn't find stream to close, either this is a duplicate message, \
-                             or the local copy of the Stream got closed simultaniously"
-                        );
-                    }
+                        &mut send_cache,
+                    )
+                    .await;
                 },
                 Frame::DataHeader { mid, sid, length } => {
                     let imsg = IncomingMessage {
@@ -434,8 +411,9 @@ impl BParticipant {
                     } else {
                         //aggregate errors
                         let n = Instant::now();
-                        if dropped_sid != imsg.sid
-                            || n.duration_since(dropped_instant) > Duration::from_secs(1)
+                        if dropped_cnt > 0
+                            && (dropped_sid != imsg.sid
+                                || n.duration_since(dropped_instant) > Duration::from_secs(1))
                         {
                             warn!(
                                 ?dropped_cnt,
@@ -613,8 +591,9 @@ impl BParticipant {
     async fn participant_shutdown_mgr(
         &self,
         s2b_shutdown_bparticipant_r: oneshot::Receiver,
-        b2b_prios_flushed_r: oneshot::Receiver<()>,
-        mut mgr_to_shutdown: Vec<oneshot::Sender<()>>,
+        shutdown_open_mgr_sender: oneshot::Sender<()>,
+        shutdown_stream_close_mgr_sender: oneshot::Sender<oneshot::Sender<()>>,
+        shutdown_send_mgr_sender: oneshot::Sender<oneshot::Sender<()>>,
     ) {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("Start participant_shutdown_mgr");
@@ -626,13 +605,27 @@ impl BParticipant {
         self.close_api(None).await;
 
         debug!("Closing all managers");
-        for sender in mgr_to_shutdown.drain(..) {
-            if let Err(e) = sender.send(()) {
-                warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
-            };
-        }
+        shutdown_open_mgr_sender
+            .send(())
+            .expect("open_mgr must have crashed before");
+        let (b2b_stream_close_shutdown_confirmed_s, b2b_stream_close_shutdown_confirmed_r) =
+            oneshot::channel();
+        shutdown_stream_close_mgr_sender
+            .send(b2b_stream_close_shutdown_confirmed_s)
+            .expect("stream_close_mgr must have crashed before");
+        // We need to wait for the stream_close_mgr BEFORE send_mgr, as the
+        // stream_close_mgr needs to wait on the API to drop `Stream` and be triggered.
+        // It will then sleep for streams to be flushed in PRIO, and send_mgr is
+        // responsible for ticking PRIO WHILE this happens, so we can't close it before!
+        b2b_stream_close_shutdown_confirmed_r.await.unwrap();
+        //closing send_mgr now:
+        let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
+        shutdown_send_mgr_sender
+            .send(b2b_prios_flushed_s)
+            .expect("send_mgr must have crashed before");
         b2b_prios_flushed_r.await.unwrap();
+
         if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error
         {
             debug!("Sending shutdown frame after flushed all prios");
@@ -701,7 +694,7 @@ impl BParticipant {
     async fn stream_close_mgr(
         &self,
         mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
-        shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
+        shutdown_stream_close_mgr_receiver: oneshot::Receiver<oneshot::Sender<()>>,
         b2p_notify_empty_stream_s: crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>,
     ) {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
@@ -709,59 +702,115 @@ impl BParticipant {
         #[cfg(feature = "metrics")]
         let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
         let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();
+        let mut b2b_stream_close_shutdown_confirmed_s = None;
         //from api or shutdown signal
         while let Some(sid) = select! {
             next = a2b_close_stream_r.next().fuse() => next,
-            _ = shutdown_stream_close_mgr_receiver => None,
+            sender = shutdown_stream_close_mgr_receiver => {
+                b2b_stream_close_shutdown_confirmed_s = Some(sender.unwrap());
+                None
+            }
         } {
             //TODO: make this concurrent!
             //TODO: Performance, closing is slow!
-            trace!(?sid, "Got request from api to close steam");
-            //This needs to first stop clients from sending any more.
-            //Then it will wait for all pending messages (in prio) to be send to the
-            // protocol After this happened the stream is closed
-            //Only after all messages are send to the prococol, we can send the CloseStream
-            // frame! If we would send it before, all followup messages couldn't
-            // be handled at the remote side.
-
-            trace!(?sid, "Stopping api to use this stream");
-            match self.streams.read().await.get(&sid) {
-                Some(si) => {
-                    si.send_closed.store(true, Ordering::Relaxed);
-                    si.b2a_msg_recv_s.lock().await.close_channel();
-                },
-                None => warn!("Couldn't find the stream, might be simultaneous close from remote"),
-            }
-
-            //TODO: what happens if RIGHT NOW the remote sends a StreamClose and this
-            // streams get closed and removed? RACE CONDITION
-            trace!(?sid, "Wait for stream to be flushed");
-            let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
-            b2p_notify_empty_stream_s
-                .send((sid, s2b_stream_finished_closed_s))
-                .unwrap();
-            s2b_stream_finished_closed_r.await.unwrap();
-
-            trace!(?sid, "Stream was successfully flushed");
-            #[cfg(feature = "metrics")]
-            self.metrics
-                .streams_closed_total
-                .with_label_values(&[&self.remote_pid_string])
-                .inc();
-            //only now remove the Stream, that means we can still recv on it.
-            self.streams.write().await.remove(&sid);
-            self.send_frame(
-                Frame::CloseStream { sid },
+            self.delete_stream(
+                sid,
+                Some(b2p_notify_empty_stream_s.clone()),
+                false,
                 #[cfg(feature = "metrics")]
                 &mut send_cache,
             )
             .await;
         }
+        trace!("deleting all leftover streams");
+        let sids = self
+            .streams
+            .read()
+            .await
+            .keys()
+            .cloned()
+            .collect::<Vec<_>>();
+        for sid in sids {
+            //flushing is still important, e.g. when Participant::drop is called (but
+            // Stream::drop isn't)!
+            self.delete_stream(
+                sid,
+                Some(b2p_notify_empty_stream_s.clone()),
+                false,
+                #[cfg(feature = "metrics")]
+                &mut send_cache,
+            )
+            .await;
+        }
+        if b2b_stream_close_shutdown_confirmed_s.is_none() {
+            b2b_stream_close_shutdown_confirmed_s =
+                Some(shutdown_stream_close_mgr_receiver.await.unwrap());
+        }
+        b2b_stream_close_shutdown_confirmed_s
+            .unwrap()
+            .send(())
+            .unwrap();
         trace!("Stop stream_close_mgr");
         self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
+    async fn delete_stream(
+        &self,
+        sid: Sid,
+        b2p_notify_empty_stream_s: Option<crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>>,
+        from_remote: bool,
+        #[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache,
+    ) {
+        let span = span!(Level::INFO, "delete_stream", ?sid, ?from_remote);
+        let _enter = span.enter();
+        //This needs to first stop clients from sending any more.
+        //Then it will wait for all pending messages (in prio) to be sent to the
+        // protocol. After this has happened, the stream is closed.
+        //Only after all messages are sent to the protocol can we send the CloseStream
+        // frame! If we sent it before, all follow-up messages couldn't
+        // be handled at the remote side.
+        trace!("Stopping api to use this stream");
+        match self.streams.read().await.get(&sid) {
+            Some(si) => {
+                si.send_closed.store(true, Ordering::Relaxed);
+                si.b2a_msg_recv_s.lock().await.close_channel();
+            },
+            None => {
+                trace!("Couldn't find the stream, might be simultaneous close from local/remote")
+            },
+        }
+
+        if !from_remote {
+            trace!("Wait for stream to be flushed");
+            let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
+            b2p_notify_empty_stream_s
+                .expect("needs to be set when from_remote is false")
+                .send((sid, s2b_stream_finished_closed_s))
+                .unwrap();
+            s2b_stream_finished_closed_r.await.unwrap();
+
+            trace!("Stream was successfully flushed");
+        }
+
+        #[cfg(feature = "metrics")]
+        self.metrics
+            .streams_closed_total
+            .with_label_values(&[&self.remote_pid_string])
+            .inc();
+        //only now remove the Stream, that means we can still recv on it.
+        self.streams.write().await.remove(&sid);
+
+        if !from_remote {
+            self.send_frame(
+                Frame::CloseStream { sid },
+                #[cfg(feature = "metrics")]
+                frames_out_total_cache,
+            )
+            .await;
+        }
+    }
+
     async fn create_stream(
         &self,
         sid: Sid,
@@ -815,7 +864,7 @@ impl BParticipant {
     async fn close_api(&self, reason: Option<ParticipantError>) {
         self.close_write_api(reason).await;
         debug!("Closing all streams");
-        for (sid, si) in self.streams.write().await.drain() {
+        for (sid, si) in self.streams.read().await.iter() {
             trace!(?sid, "Shutting down Stream");
             si.b2a_msg_recv_s.lock().await.close_channel();
         }
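
Note on the new shutdown handshake (not part of the diff): the managers above no longer receive a bare `oneshot::Sender<()>`; participant_shutdown_mgr now sends each of them a confirmation sender (an `oneshot::Sender<oneshot::Sender<()>>`) and waits for the reply before starting the next stage. The standalone sketch below illustrates that pattern in isolation using the futures crate; `manager` and `initiator` are illustrative names, not functions from this codebase.

    // Minimal sketch (an assumed simplification, not code from this PR) of the
    // "send a confirmation sender through the shutdown channel" handshake.
    use futures::channel::oneshot;
    use futures::executor::block_on;
    use futures::join;

    async fn manager(shutdown_r: oneshot::Receiver<oneshot::Sender<()>>) {
        // The real managers poll their work and this receiver together via select!;
        // here we only wait for the shutdown request carrying a confirmation sender.
        let confirm_s = shutdown_r.await.expect("initiator dropped");
        // ... flushing of any remaining state would happen here ...
        confirm_s.send(()).expect("initiator stopped waiting");
    }

    async fn initiator(shutdown_s: oneshot::Sender<oneshot::Sender<()>>) {
        let (confirm_s, confirm_r) = oneshot::channel();
        shutdown_s
            .send(confirm_s)
            .expect("manager must have crashed before");
        // Proceed to the next shutdown stage only after the manager confirmed.
        confirm_r.await.expect("manager dropped the confirmation sender");
    }

    fn main() {
        let (shutdown_s, shutdown_r) = oneshot::channel();
        block_on(async { join!(manager(shutdown_r), initiator(shutdown_s)) });
    }

This mirrors why stream_close_mgr is confirmed before send_mgr in the diff: the initiator hands out and awaits one confirmation at a time, which fixes the ordering of the shutdown stages.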