Rather than letting the api/Participant handle the cleanup, we had to move it over to bParticipant's participant_shutdown_mgr.

The old code didn't account for the case where the Scheduler was dropped but the Participant was kept around.
Shutdown behavior is quite simple now: bParticipant sends a oneshot; when that arrives we stop. We also stop when the Participant gets dropped, as there would be no sense in keeping the report_channels_mgr running ...
Marcel Märtens 2022-06-20 09:47:46 +02:00
parent 5b63035506
commit f3e4f022cb
2 changed files with 56 additions and 37 deletions
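To illustrate the shutdown protocol described in the commit message, here is a minimal, self-contained sketch of the same pattern: a manager task that serves requests from an mpsc channel and stops either when a shutdown oneshot fires (the participant_shutdown_mgr path) or when every request sender has been dropped (the Participant-dropped path). This is a hypothetical stand-in built on plain tokio primitives (tokio::select! rather than the futures select!/fuse used in the diff), not the Veloren code; report_mgr, Report, and all channel names below are illustrative only.

// Requires: tokio = { version = "1", features = ["full"] }
use tokio::sync::{mpsc, oneshot};

// Hypothetical stand-in for A2bReportChannel: the api side hands the manager a
// oneshot Sender and awaits the reply on the matching Receiver.
type Report = oneshot::Sender<Vec<String>>;

// Manager loop in the spirit of report_channels_mgr: it stops when the shutdown
// oneshot fires (participant_shutdown_mgr path) or when every request sender is
// gone (Participant dropped path).
async fn report_mgr(
    mut request_r: mpsc::UnboundedReceiver<Report>,
    mut close_r: oneshot::Receiver<()>,
) {
    loop {
        tokio::select! {
            // Shutdown signal: stop even though the api side may still hold its sender.
            _ = &mut close_r => break,
            req = request_r.recv() => match req {
                // All senders dropped: no point in keeping the manager running.
                None => break,
                // Serve the request; the payload here is placeholder data.
                Some(reply_s) => {
                    let _ = reply_s.send(vec!["tcp://127.0.0.1:14004".to_string()]);
                },
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (request_s, request_r) = mpsc::unbounded_channel();
    let (close_s, close_r) = oneshot::channel();
    let mgr = tokio::spawn(report_mgr(request_r, close_r));

    // Normal round trip, analogous to report_current_connect_addr.
    let (reply_s, reply_r) = oneshot::channel();
    let _ = request_s.send(reply_s);
    println!("report: {:?}", reply_r.await);

    // Shutdown path: the oneshot stops the manager even while request_s is alive.
    let _ = close_s.send(());
    let _ = mgr.await;
}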


@@ -61,7 +61,7 @@ pub struct Participant {
     remote_pid: Pid,
     a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
     b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
-    a2b_report_channel_s: Option<Mutex<Option<mpsc::UnboundedSender<A2bReportChannel>>>>,
+    a2b_report_channel_s: Mutex<mpsc::UnboundedSender<A2bReportChannel>>,
     b2a_bandwidth_stats_r: watch::Receiver<f32>,
     a2s_disconnect_s: A2sDisconnect,
 }
@@ -530,7 +530,7 @@ impl Participant {
             remote_pid,
             a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
             b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
-            a2b_report_channel_s: Some(Mutex::new(Some(a2b_report_channel_s))),
+            a2b_report_channel_s: Mutex::new(a2b_report_channel_s),
             b2a_bandwidth_stats_r,
             a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
         }
@@ -712,9 +712,6 @@ impl Participant {
     pub async fn disconnect(self) -> Result<(), ParticipantError> {
         // Remove, Close and try_unwrap error when unwrap fails!
         debug!("Closing participant from network");
-        if let Some(s) = &self.a2b_report_channel_s {
-            let _ = s.lock().await.take(); // drop so that bparticipant can close
-        }

         //Streams will be closed by BParticipant
         match self.a2s_disconnect_s.lock().await.take() {
@@ -769,26 +766,25 @@ impl Participant {
     /// [`ConnectAddr`]: ConnectAddr
     pub async fn report_current_connect_addr(&self) -> Result<Vec<ConnectAddr>, ParticipantError> {
         let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::<Vec<ConnectAddr>>();
-        if let Some(s) = &self.a2b_report_channel_s {
-            if let Some(s) = &*s.lock().await {
-                match s.send(p2a_return_report_s) {
-                    Ok(()) => match p2a_return_report_r.await {
-                        Ok(list) => return Ok(list),
-                        Err(_) => {
-                            debug!("p2a_return_report_r failed, closing participant");
-                            return Err(ParticipantError::ParticipantDisconnected);
-                        },
-                    },
-                    Err(e) => {
-                        debug!(?e, "bParticipant is already closed, notifying");
-                        return Err(ParticipantError::ParticipantDisconnected);
-                    },
-                }
-            }
-        }
-        debug!("Participant is already closed, notifying");
-        Err(ParticipantError::ParticipantDisconnected)
-    }
+        match self
+            .a2b_report_channel_s
+            .lock()
+            .await
+            .send(p2a_return_report_s)
+        {
+            Ok(()) => match p2a_return_report_r.await {
+                Ok(list) => Ok(list),
+                Err(_) => {
+                    debug!("p2a_return_report_r failed, closing participant");
+                    Err(ParticipantError::ParticipantDisconnected)
+                },
+            },
+            Err(e) => {
+                debug!(?e, "bParticipant is already closed, notifying");
+                Err(ParticipantError::ParticipantDisconnected)
+            },
+        }
+    }

     /// Returns the current approximation on the maximum bandwidth available.
     /// This WILL fluctuate based on the amount/size of send messages.
@@ -1186,7 +1182,6 @@ impl Drop for Participant {
         // ignore closed, as we need to send it even though we disconnected the
         // participant from network
         debug!("Shutting down Participant");
-        let _ = self.a2b_report_channel_s.take(); // drop so that bparticipant can close
         match self.a2s_disconnect_s.try_lock() {
             Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"),


@@ -153,6 +153,7 @@ impl BParticipant {
             async_channel::unbounded::<Cid>();
         let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
             async_channel::unbounded::<Cid>();
+        let (b2b_close_report_channel_s, b2b_close_report_channel_r) = oneshot::channel();
         let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) =
             crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>();
         let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) =
@@ -194,11 +195,15 @@ impl BParticipant {
                 b2b_add_send_protocol_s,
                 b2b_add_recv_protocol_s,
             ),
-            self.report_channels_mgr(run_channels.a2b_report_channel_r),
+            self.report_channels_mgr(
+                run_channels.a2b_report_channel_r,
+                b2b_close_report_channel_r
+            ),
             self.participant_shutdown_mgr(
                 run_channels.s2b_shutdown_bparticipant_r,
                 b2b_close_send_protocol_s.clone(),
                 b2b_force_close_recv_protocol_s,
+                b2b_close_report_channel_s,
             ),
         );
     }
@@ -612,8 +617,16 @@ impl BParticipant {
     async fn report_channels_mgr(
         &self,
         mut a2b_report_channel_r: mpsc::UnboundedReceiver<A2bReportChannel>,
+        b2b_close_report_channel_r: oneshot::Receiver<()>,
     ) {
-        while let Some(b2a_report_channel_s) = a2b_report_channel_r.recv().await {
+        let mut b2b_close_report_channel_r = b2b_close_report_channel_r.fuse();
+        loop {
+            let report = select!(
+                n = a2b_report_channel_r.recv().fuse() => n,
+                _ = &mut b2b_close_report_channel_r => break,
+            );
+            match report {
+                Some(b2a_report_channel_s) => {
                     let data = {
                         let lock = self.channels.read().await;
                         let mut data = Vec::new();
@@ -625,6 +638,9 @@ impl BParticipant {
                     if b2a_report_channel_s.send(data).is_err() {
                         warn!("couldn't report back connect_addrs");
                     };
+                },
+                None => break,
+            }
         }
         trace!("Stop report_channels_mgr");
         self.shutdown_barrier
@@ -658,6 +674,7 @@ impl BParticipant {
         s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>,
         b2b_close_send_protocol_s: async_channel::Sender<Cid>,
         b2b_force_close_recv_protocol_s: async_channel::Sender<Cid>,
+        b2b_close_report_channel_s: oneshot::Sender<()>,
     ) {
         let wait_for_manager = || async {
             let mut sleep = 0.01f64;
@@ -699,6 +716,13 @@ impl BParticipant {
                 }
             }
             drop(lock);
+            trace!("close report_channel_mgr");
+            if let Err(e) = b2b_close_report_channel_s.send(()) {
+                debug!(
+                    ?e,
+                    "report_channel_mgr could be closed if Participant was dropped already here"
+                );
+            }
             trace!("wait for other managers");
             let timeout = tokio::time::sleep(