From f3e4f022cb55bf7e749d42a17f777871e63f5bca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Mon, 20 Jun 2022 09:47:46 +0200
Subject: [PATCH] move report_channels_mgr cleanup from api/Participant to
 BParticipant's shutdown manager

Rather than letting api/Participant handle the cleanup, we had to move it
over to BParticipant's participant_shutdown_mgr, because the old approach
didn't account for the case where the Scheduler got dropped but the
Participant was kept around.

Shutdown behavior is quite simple now: BParticipant sends a oneshot; when
that fires, we stop. When the Participant gets dropped we also stop, as
there would be no sense in keeping report_channels_mgr running.
---
 network/src/api.rs         | 43 +++++++++++++++++++------------------------
 network/src/participant.rs | 50 +++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 56 insertions(+), 37 deletions(-)

diff --git a/network/src/api.rs b/network/src/api.rs
index c5972af28d..dd31c482ca 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -61,7 +61,7 @@ pub struct Participant {
     remote_pid: Pid,
     a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
     b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
-    a2b_report_channel_s: Option<Mutex<Option<mpsc::UnboundedSender<A2bReportChannel>>>>,
+    a2b_report_channel_s: Mutex<mpsc::UnboundedSender<A2bReportChannel>>,
     b2a_bandwidth_stats_r: watch::Receiver<f32>,
     a2s_disconnect_s: A2sDisconnect,
 }
@@ -530,7 +530,7 @@ impl Participant {
             remote_pid,
             a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
             b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
-            a2b_report_channel_s: Some(Mutex::new(Some(a2b_report_channel_s))),
+            a2b_report_channel_s: Mutex::new(a2b_report_channel_s),
             b2a_bandwidth_stats_r,
             a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
         }
@@ -712,9 +712,6 @@ impl Participant {
     pub async fn disconnect(self) -> Result<(), ParticipantError> {
         // Remove, Close and try_unwrap error when unwrap fails!
         debug!("Closing participant from network");
-        if let Some(s) = &self.a2b_report_channel_s {
-            let _ = s.lock().await.take(); // drop so that bparticipant can close
-        }
 
         //Streams will be closed by BParticipant
         match self.a2s_disconnect_s.lock().await.take() {
@@ -769,25 +766,24 @@ impl Participant {
     /// [`ConnectAddr`]: ConnectAddr
     pub async fn report_current_connect_addr(&self) -> Result<Vec<ConnectAddr>, ParticipantError> {
         let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::<Vec<ConnectAddr>>();
-        if let Some(s) = &self.a2b_report_channel_s {
-            if let Some(s) = &*s.lock().await {
-                match s.send(p2a_return_report_s) {
-                    Ok(()) => match p2a_return_report_r.await {
-                        Ok(list) => return Ok(list),
-                        Err(_) => {
-                            debug!("p2a_return_report_r failed, closing participant");
-                            return Err(ParticipantError::ParticipantDisconnected);
-                        },
-                    },
-                    Err(e) => {
-                        debug!(?e, "bParticipant is already closed, notifying");
-                        return Err(ParticipantError::ParticipantDisconnected);
-                    },
-                }
-            }
+        match self
+            .a2b_report_channel_s
+            .lock()
+            .await
+            .send(p2a_return_report_s)
+        {
+            Ok(()) => match p2a_return_report_r.await {
+                Ok(list) => Ok(list),
+                Err(_) => {
+                    debug!("p2a_return_report_r failed, closing participant");
+                    Err(ParticipantError::ParticipantDisconnected)
+                },
+            },
+            Err(e) => {
+                debug!(?e, "bParticipant is already closed, notifying");
+                Err(ParticipantError::ParticipantDisconnected)
+            },
+        }
-        debug!("Participant is already closed, notifying");
-        Err(ParticipantError::ParticipantDisconnected)
     }
 
     /// Returns the current approximation on the maximum bandwidth available.
@@ -1186,7 +1182,6 @@ impl Drop for Participant {
         // ignore closed, as we need to send it even though we disconnected the
         // participant from network
         debug!("Shutting down Participant");
-        let _ = self.a2b_report_channel_s.take(); // drop so that bparticipant can close
 
         match self.a2s_disconnect_s.try_lock() {
             Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"),
diff --git a/network/src/participant.rs b/network/src/participant.rs
index ab3a50912a..54c3b1f1dc 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -153,6 +153,7 @@
             async_channel::unbounded::<Cid>();
         let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
             async_channel::unbounded::<Cid>();
+        let (b2b_close_report_channel_s, b2b_close_report_channel_r) = oneshot::channel();
         let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) =
             crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>();
         let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) =
@@ -194,11 +195,15 @@
                 b2b_add_send_protocol_s,
                 b2b_add_recv_protocol_s,
             ),
-            self.report_channels_mgr(run_channels.a2b_report_channel_r),
+            self.report_channels_mgr(
+                run_channels.a2b_report_channel_r,
+                b2b_close_report_channel_r
+            ),
             self.participant_shutdown_mgr(
                 run_channels.s2b_shutdown_bparticipant_r,
                 b2b_close_send_protocol_s.clone(),
                 b2b_force_close_recv_protocol_s,
+                b2b_close_report_channel_s,
             ),
         );
     }
@@ -612,19 +617,30 @@
     async fn report_channels_mgr(
         &self,
         mut a2b_report_channel_r: mpsc::UnboundedReceiver<A2bReportChannel>,
+        b2b_close_report_channel_r: oneshot::Receiver<()>,
     ) {
-        while let Some(b2a_report_channel_s) = a2b_report_channel_r.recv().await {
-            let data = {
-                let lock = self.channels.read().await;
-                let mut data = Vec::new();
-                for (_, c) in lock.iter() {
-                    data.push(c.lock().await.remote_con_addr.clone());
-                }
-                data
-            };
-            if b2a_report_channel_s.send(data).is_err() {
-                warn!("couldn't report back connect_addrs");
-            };
+        let mut b2b_close_report_channel_r = b2b_close_report_channel_r.fuse();
+        loop {
+            let report = select!(
+                n = a2b_report_channel_r.recv().fuse() => n,
+                _ = &mut b2b_close_report_channel_r => break,
+            );
+            match report {
+                Some(b2a_report_channel_s) => {
+                    let data = {
+                        let lock = self.channels.read().await;
+                        let mut data = Vec::new();
+                        for (_, c) in lock.iter() {
+                            data.push(c.lock().await.remote_con_addr.clone());
+                        }
+                        data
+                    };
+                    if b2a_report_channel_s.send(data).is_err() {
+                        warn!("couldn't report back connect_addrs");
+                    };
+                },
+                None => break,
+            }
         }
         trace!("Stop report_channels_mgr");
         self.shutdown_barrier
@@ -658,6 +674,7 @@
         s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>,
         b2b_close_send_protocol_s: async_channel::Sender<Cid>,
         b2b_force_close_recv_protocol_s: async_channel::Sender<Cid>,
+        b2b_close_report_channel_s: oneshot::Sender<()>,
     ) {
         let wait_for_manager = || async {
             let mut sleep = 0.01f64;
@@ -699,6 +716,13 @@
             }
         }
         drop(lock);
+        trace!("close report_channel_mgr");
+        if let Err(e) = b2b_close_report_channel_s.send(()) {
+            debug!(
+                ?e,
+                "report_channel_mgr could be closed if Participant was dropped already here"
+            );
+        }
         trace!("wait for other managers");
         let timeout = tokio::time::sleep(
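
For illustration, here is a minimal, self-contained sketch of the shutdown
pattern this patch gives report_channels_mgr: a manager loop that answers
report requests until either the shutdown manager fires a close-oneshot or
every request sender has been dropped. This is not Veloren's actual code;
the names (report_mgr, request_s, close_s, ...) are invented for the
example, and it assumes tokio = { version = "1", features = ["full"] } and
futures-util = "0.3" as dependencies.

use futures_util::{select, FutureExt};
use tokio::sync::{mpsc, oneshot};

/// Serves report requests until EITHER `close_r` fires (the shutdown
/// manager wants us gone) OR `recv()` returns None (every request sender,
/// i.e. the api-side handle, was dropped).
async fn report_mgr(
    mut request_r: mpsc::UnboundedReceiver<oneshot::Sender<Vec<String>>>,
    close_r: oneshot::Receiver<()>,
) {
    // fuse() lets select! poll the same oneshot across loop iterations.
    let mut close_r = close_r.fuse();
    loop {
        let request = select!(
            n = request_r.recv().fuse() => n,
            _ = &mut close_r => break, // explicit shutdown signal
        );
        match request {
            // Answer a request with (dummy) report data.
            Some(answer_s) => {
                let _ = answer_s.send(vec!["127.0.0.1:1234".to_string()]);
            },
            // All senders gone: no sense in keeping the manager running.
            None => break,
        }
    }
    println!("report_mgr stopped");
}

#[tokio::main]
async fn main() {
    let (request_s, request_r) = mpsc::unbounded_channel();
    let (close_s, close_r) = oneshot::channel();
    let mgr = tokio::spawn(report_mgr(request_r, close_r));

    // A client requests a report and awaits the answer.
    let (answer_s, answer_r) = oneshot::channel();
    request_s.send(answer_s).unwrap();
    println!("report: {:?}", answer_r.await.unwrap());

    // Later the shutdown manager stops the worker explicitly.
    let _ = close_s.send(());
    mgr.await.unwrap();
}

Covering both exit paths in one select! loop is what removes the need for
the old Option<Mutex<Option<...>>> dance in api::Participant: the sender
stays plain, and dropping it is itself a valid shutdown path.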