From 5f902b5eabbdab479df7ec9ed733b491e11ae79c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Sun, 5 Jul 2020 22:51:07 +0200
Subject: [PATCH] doing a clean shutdown of the BParticipant once the TCP
 connection is suddenly interrupted

---
 network/src/participant.rs | 131 ++++++++++++++++++++++---------------
 1 file changed, 77 insertions(+), 54 deletions(-)

diff --git a/network/src/participant.rs b/network/src/participant.rs
index 28647b234d..07b908f5eb 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -11,6 +11,7 @@ use async_std::sync::RwLock;
 use futures::{
     channel::{mpsc, oneshot},
     future::FutureExt,
+    lock::Mutex,
     select,
     sink::SinkExt,
     stream::StreamExt,
@@ -53,6 +54,12 @@ struct ControlChannels {
     s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
 }
 
+//This is needed in case of a shutdown
+struct BParticipantShutdown {
+    b2b_prios_flushed_r: oneshot::Receiver<()>,
+    mgr_to_shutdown: Vec<oneshot::Sender<()>>,
+}
+
 #[derive(Debug)]
 pub struct BParticipant {
     remote_pid: Pid,
@@ -63,6 +70,7 @@ pub struct BParticipant {
     running_mgr: AtomicUsize,
     run_channels: Option<ControlChannels>,
     metrics: Arc<NetworkMetrics>,
+    shutdown_info: Mutex<Option<BParticipantShutdown>>,
     no_channel_error_info: RwLock<(Instant, u64)>,
 }
 
@@ -106,6 +114,7 @@ impl BParticipant {
                 run_channels,
                 metrics,
                 no_channel_error_info: RwLock::new((Instant::now(), 0)),
+                shutdown_info: Mutex::new(None),
             },
             a2b_steam_open_s,
             b2a_stream_opened_r,
@@ -125,6 +134,14 @@ impl BParticipant {
         let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
         let (prios, a2p_msg_s, b2p_notify_empty_stream_s) =
             PrioManager::new(self.metrics.clone(), self.remote_pid_string.clone());
+        *self.shutdown_info.lock().await = Some(BParticipantShutdown {
+            b2b_prios_flushed_r,
+            mgr_to_shutdown: vec![
+                shutdown_send_mgr_sender,
+                shutdown_open_mgr_sender,
+                shutdown_stream_close_mgr_sender,
+            ],
+        });
 
         let run_channels = self.run_channels.take().unwrap();
         futures::join!(
@@ -152,15 +169,7 @@ impl BParticipant {
                 shutdown_stream_close_mgr_receiver,
                 b2p_notify_empty_stream_s,
             ),
-            self.participant_shutdown_mgr(
-                run_channels.s2b_shutdown_bparticipant_r,
-                b2b_prios_flushed_r,
-                vec!(
-                    shutdown_send_mgr_sender,
-                    shutdown_open_mgr_sender,
-                    shutdown_stream_close_mgr_sender
-                )
-            ),
+            self.participant_shutdown_mgr(run_channels.s2b_shutdown_bparticipant_r,),
         );
     }
 
@@ -248,6 +257,8 @@ impl BParticipant {
                 "FIXME: the frame is actually drop. which is fine for now as the participant \
                  will be closed, but not if we do channel-takeover"
             );
+            //TEMP FIX: as we don't have channel takeover yet, drop the whole BParticipant
+            self.close_participant(2).await;
             false
         } else {
             true
@@ -380,10 +391,10 @@ impl BParticipant {
                     }
                 }
             },
-            Frame::Shutdown => error!(
-                "Somehow this Shutdown signal got here, i should probably handle it. To not \
-                 crash let me just put this message here"
-            ),
+            Frame::Shutdown => {
+                debug!("Shutdown received from remote side");
+                self.close_participant(2).await;
+            },
             f => unreachable!("never reaches frame!: {:?}", f),
         }
     }
@@ -502,47 +513,11 @@ impl BParticipant {
     async fn participant_shutdown_mgr(
         &self,
         s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
-        b2b_prios_flushed_r: oneshot::Receiver<()>,
-        mut to_shutdown: Vec<oneshot::Sender<()>>,
     ) {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("start participant_shutdown_mgr");
         let sender = s2b_shutdown_bparticipant_r.await.unwrap();
-        debug!("closing all managers");
-        for sender in to_shutdown.drain(..) {
-            if let Err(e) = sender.send(()) {
-                warn!(?e, "manager seems to be closed already, weird, maybe a bug");
-            };
-        }
-        debug!("closing all streams");
-        for (sid, si) in self.streams.write().await.drain() {
-            trace!(?sid, "shutting down Stream");
-            si.closed.store(true, Ordering::Relaxed);
-        }
-        debug!("waiting for prios to be flushed");
-        b2b_prios_flushed_r.await.unwrap();
-        debug!("closing all channels");
-        for ci in self.channels.write().await.drain(..) {
-            if let Err(e) = ci.b2r_read_shutdown.send(()) {
-                debug!(?e, ?ci.cid, "seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
-            };
-        }
-        //Wait for other bparticipants mgr to close via AtomicUsize
-        const SLEEP_TIME: Duration = Duration::from_millis(5);
-        async_std::task::sleep(SLEEP_TIME).await;
-        let mut i: u32 = 1;
-        while self.running_mgr.load(Ordering::Relaxed) > 1 {
-            i += 1;
-            if i.rem_euclid(10) == 1 {
-                trace!(
-                    "waiting for bparticipant mgr to shut down, remaining {}",
-                    self.running_mgr.load(Ordering::Relaxed) - 1
-                );
-            }
-            async_std::task::sleep(SLEEP_TIME * i).await;
-        }
-        trace!("all bparticipant mgr (except me) are shut down now");
-        self.metrics.participants_disconnected_total.inc();
+        self.close_participant(1).await;
         sender.send(Ok(())).unwrap();
         trace!("stop participant_shutdown_mgr");
         self.running_mgr.fetch_sub(1, Ordering::Relaxed);
@@ -638,9 +613,57 @@ impl BParticipant {
         )
     }
 
-    /*
-    async fn close_participant(&self) {
-
+    /// This will gracefully shut down the BParticipant.
+    /// allowed_managers: the number of open managers to sleep on. Must be 1 for
+    /// shutdown_mgr and 2 if it comes from a send error.
+    async fn close_participant(&self, allowed_managers: usize) {
+        trace!("participant shutdown triggered");
+        let mut info = match self.shutdown_info.lock().await.take() {
+            Some(info) => info,
+            None => {
+                error!(
+                    "Close of participant seemed to be called twice, that's bad, ignoring the 2nd \
+                     close"
+                );
+                return;
+            },
+        };
+        debug!("closing all managers");
+        for sender in info.mgr_to_shutdown.drain(..) {
+            if let Err(e) = sender.send(()) {
+                warn!(?e, "manager seems to be closed already, weird, maybe a bug");
+            };
+        }
+        debug!("closing all streams");
+        for (sid, si) in self.streams.write().await.drain() {
+            trace!(?sid, "shutting down Stream");
+            si.closed.store(true, Ordering::Relaxed);
+        }
+        debug!("waiting for prios to be flushed");
+        info.b2b_prios_flushed_r.await.unwrap();
+        debug!("closing all channels");
+        for ci in self.channels.write().await.drain(..) {
+            if let Err(e) = ci.b2r_read_shutdown.send(()) {
+                debug!(?e, ?ci.cid, "seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
+            };
+        }
+        //Wait for the other BParticipant managers to close via AtomicUsize
+        const SLEEP_TIME: Duration = Duration::from_millis(5);
+        async_std::task::sleep(SLEEP_TIME).await;
+        let mut i: u32 = 1;
+        while self.running_mgr.load(Ordering::Relaxed) > allowed_managers {
+            i += 1;
+            if i.rem_euclid(10) == 1 {
+                trace!(
+                    ?allowed_managers,
+                    "waiting for bparticipant mgr to shut down, remaining {}",
+                    self.running_mgr.load(Ordering::Relaxed) - allowed_managers
+                );
+            }
+            async_std::task::sleep(SLEEP_TIME * i).await;
+        }
+        trace!("all bparticipant mgr (except me) are shut down now");
+        self.metrics.participants_disconnected_total.inc();
+        debug!("bparticipant close done");
     }
-    */
 }
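
Note (not part of the patch): the core idea above is that the shutdown state now lives in a Mutex<Option<BParticipantShutdown>>, so close_participant() can be reached from the shutdown manager, from a failed send, or from a remote Frame::Shutdown, and only the first caller actually runs the teardown. Below is a minimal standalone sketch of that take-once guard, assuming the futures and async-std crates the file already uses; the names Participant and ShutdownInfo are illustrative stand-ins, not the real types.

// Standalone sketch, not part of the patch: the take-once shutdown guard that
// close_participant relies on. The first caller takes the state out of the
// Mutex<Option<..>>; every later caller finds None and returns early instead of
// tearing things down twice.
use futures::lock::Mutex;

// Illustrative stand-in for BParticipantShutdown (the real struct carries
// b2b_prios_flushed_r and the manager shutdown senders).
struct ShutdownInfo {
    label: &'static str,
}

struct Participant {
    shutdown_info: Mutex<Option<ShutdownInfo>>,
}

impl Participant {
    async fn close_participant(&self) {
        let info = match self.shutdown_info.lock().await.take() {
            Some(info) => info,
            None => {
                println!("close_participant called twice, ignoring the 2nd close");
                return;
            },
        };
        // only the first caller gets here and performs the actual teardown
        println!("closing once, using {}", info.label);
    }
}

fn main() {
    let p = Participant {
        shutdown_info: Mutex::new(Some(ShutdownInfo { label: "shutdown state" })),
    };
    async_std::task::block_on(async {
        p.close_participant().await; // performs the shutdown
        p.close_participant().await; // no-op: the state was already taken
    });
}

In the patched code the first caller additionally drains the manager shutdown senders, marks all streams closed, awaits b2b_prios_flushed_r, and shuts down the channel readers before counting down the remaining managers.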
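
A second detail is the allowed_managers parameter: every manager task increments running_mgr when it starts and decrements it when it returns, and close_participant polls that counter with a growing sleep until only the expected callers remain (1 when called from participant_shutdown_mgr, 2 when called from inside another manager). The following is a rough standalone sketch of that waiting scheme under the same crate assumptions, not the actual BParticipant code.

// Standalone sketch, not part of the patch: the counting scheme behind allowed_managers.
// Managers bump running_mgr on start and decrement it on exit; the closing side polls
// the counter with an increasing sleep until only `allowed_managers` are still running.
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

async fn wait_for_managers(running_mgr: Arc<AtomicUsize>, allowed_managers: usize) {
    const SLEEP_TIME: Duration = Duration::from_millis(5);
    let mut i: u32 = 1;
    while running_mgr.load(Ordering::Relaxed) > allowed_managers {
        i += 1;
        if i.rem_euclid(10) == 1 {
            println!(
                "waiting for managers to shut down, remaining {}",
                running_mgr.load(Ordering::Relaxed) - allowed_managers
            );
        }
        // back off a little more on every iteration, like the patched code does
        async_std::task::sleep(SLEEP_TIME * i).await;
    }
}

fn main() {
    let running_mgr = Arc::new(AtomicUsize::new(2));
    let handle = Arc::clone(&running_mgr);
    async_std::task::block_on(async move {
        // a mock manager that finishes after a short while and deregisters itself
        async_std::task::spawn(async move {
            async_std::task::sleep(Duration::from_millis(20)).await;
            handle.fetch_sub(1, Ordering::Relaxed);
        });
        // returns once only the caller itself (1 manager) is left running
        wait_for_managers(running_mgr, 1).await;
    });
}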