From aa88f9005692889a726b5a5bee70d204d670e307 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Fri, 26 Feb 2021 10:45:38 +0100
Subject: [PATCH] Shutdown improvements

- Add a timeout to Participant::drop, so it stops eventually.
- Detect a tokio runtime in Participant::drop and no longer use
  std::thread::sleep in that case (blocking could hang the thread that is
  actually doing the shutdown work and deadlock).
- Parallel shutdown in the Scheduler: instead of a slow shutdown locking up
  everything, participants are now shut down in parallel, which should
  dramatically reduce the `WARN part took long for shutdown` messages.
---
 network/src/api.rs       | 60 +++++++++++++++++++++--------------
 network/src/scheduler.rs | 68 ++++++++++++++++++++++------------------
 2 files changed, 75 insertions(+), 53 deletions(-)

diff --git a/network/src/api.rs b/network/src/api.rs
index c71357a5fd..1a718ffc2f 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -482,7 +482,7 @@ impl Participant {
     /// [`Bandwidth`]: network_protocol::Bandwidth
     /// [`Promises`]: network_protocol::Promises
     /// [`Streams`]: crate::api::Stream
-    #[instrument(name="network", skip(self, prio, promises), fields(p = %self.local_pid))]
+    #[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))]
     pub async fn open(
         &self,
         prio: u8,
@@ -1013,6 +1013,9 @@ impl Drop for Participant {
     #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
     #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
     fn drop(&mut self) {
+        const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
+                                    outgoing messages, dropping remaining";
+        const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";
         use tokio::sync::oneshot::error::TryRecvError;
         // ignore closed, as we need to send it even though we disconnected the
         // participant from network
@@ -1031,29 +1034,40 @@ impl Drop for Participant {
                     a2s_disconnect_s
                         .send((self.remote_pid, (Duration::from_secs(120), finished_sender)))
                         .expect("Something is wrong in internal scheduler coding");
-                    loop {
-                        match finished_receiver.try_recv() {
-                            Ok(Ok(())) => {
-                                info!("Participant dropped gracefully");
-                                break;
-                            },
-                            Ok(Err(e)) => {
-                                error!(
-                                    ?e,
-                                    "Error while dropping the participant, couldn't send all outgoing \
-                                     messages, dropping remaining"
-                                );
-                                break;
-                            },
-                            Err(TryRecvError::Closed) => {
-                                panic!("Something is wrong in internal scheduler/participant coding")
-                            },
-                            Err(TryRecvError::Empty) => {
-                                trace!("activly sleeping");
-                                std::thread::sleep(Duration::from_millis(20));
-                            },
+                    if let Ok(handle) = tokio::runtime::Handle::try_current() {
+                        trace!("Participant drop Async");
+                        handle.spawn(async move {
+                            match finished_receiver.await {
+                                Ok(Ok(())) => info!("Participant dropped gracefully"),
+                                Ok(Err(e)) => error!(?e, SHUTDOWN_ERR),
+                                Err(e) => panic!("{}: {}", CHANNEL_ERR, e),
+                            }
+                        });
+                    } else {
+                        let mut cnt = 0;
+                        loop {
+                            match finished_receiver.try_recv() {
+                                Ok(Ok(())) => {
+                                    info!("Participant dropped gracefully");
+                                    break;
+                                },
+                                Ok(Err(e)) => {
+                                    error!(?e, SHUTDOWN_ERR);
+                                    break;
+                                },
+                                Err(TryRecvError::Closed) => panic!("{}", CHANNEL_ERR),
+                                Err(TryRecvError::Empty) => {
+                                    trace!("actively sleeping");
+                                    cnt += 1;
+                                    if cnt > 120 {
+                                        error!("Timeout waiting for participant shutdown, dropping");
+                                        break;
+                                    }
+                                    std::thread::sleep(Duration::from_millis(100) * cnt);
+                                },
+                            }
                         }
-                    }
+                    };
                 },
             }
         }
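The api.rs change above is the core of the patch: when a Participant is dropped from inside a tokio runtime, blocking on the shutdown acknowledgement could stall the very worker thread that performs the shutdown, so the wait is spawned onto the runtime instead; outside a runtime, the old polling loop is kept but given an escalating sleep and an iteration cap. Below is a minimal, self-contained sketch of that control flow, not the crate's actual API: FakeParticipant, the String error type, and the log strings are hypothetical stand-ins, while Handle::try_current, Handle::spawn, and oneshot::Receiver::try_recv are the tokio 1.x calls the patch itself relies on (assumes tokio with the `rt` and `sync` features).

    use std::time::Duration;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::error::TryRecvError;

    /// Hypothetical stand-in for the real Participant; only the drop
    /// strategy from the patch is reproduced here.
    struct FakeParticipant {
        finished_receiver: Option<oneshot::Receiver<Result<(), String>>>,
    }

    impl Drop for FakeParticipant {
        fn drop(&mut self) {
            let mut finished_receiver = match self.finished_receiver.take() {
                Some(r) => r,
                None => return,
            };
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                // Dropped inside a runtime: blocking here could stall the very
                // thread doing the shutdown work, so wait asynchronously instead.
                handle.spawn(async move {
                    match finished_receiver.await {
                        Ok(Ok(())) => println!("participant dropped gracefully"),
                        Ok(Err(e)) => eprintln!("shutdown error: {}", e),
                        Err(e) => panic!("internal channel error: {}", e),
                    }
                });
            } else {
                // Dropped from a plain thread: poll with a growing sleep and a
                // hard iteration cap so drop() terminates eventually.
                let mut cnt: u32 = 0;
                loop {
                    match finished_receiver.try_recv() {
                        Ok(Ok(())) => break println!("participant dropped gracefully"),
                        Ok(Err(e)) => break eprintln!("shutdown error: {}", e),
                        Err(TryRecvError::Closed) => panic!("internal channel error"),
                        Err(TryRecvError::Empty) => {
                            cnt += 1;
                            if cnt > 120 {
                                break eprintln!("timeout waiting for shutdown, dropping");
                            }
                            std::thread::sleep(Duration::from_millis(100) * cnt);
                        },
                    }
                }
            }
        }
    }

    fn main() {
        // No runtime on this thread, so the blocking path runs; the sender
        // side reports success before drop() starts polling.
        let (tx, rx) = oneshot::channel();
        tx.send(Ok(())).unwrap();
        drop(FakeParticipant { finished_receiver: Some(rx) });
    }

With the patch's constants (sleeps of 100 ms * cnt, capped at 120 iterations), the synchronous path waits at most 100 ms * (1 + 2 + ... + 120), roughly 726 s, so even a stuck scheduler can no longer hang drop forever.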
diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs
index 8aed480abf..77baa45c99 100644
--- a/network/src/scheduler.rs
+++ b/network/src/scheduler.rs
@@ -299,37 +299,45 @@ impl Scheduler {
         trace!("Stop connect_mgr");
     }
 
-    async fn disconnect_mgr(&self, mut a2s_disconnect_r: mpsc::UnboundedReceiver) {
+    async fn disconnect_mgr(&self, a2s_disconnect_r: mpsc::UnboundedReceiver) {
         trace!("Start disconnect_mgr");
-        while let Some((pid, (timeout_time, return_once_successful_shutdown))) =
-            a2s_disconnect_r.recv().await
-        {
-            //Closing Participants is done the following way:
-            // 1. We drop our senders and receivers
-            // 2. we need to close BParticipant, this will drop its senderns and receivers
-            // 3. Participant will try to access the BParticipant senders and receivers with
-            //    their next api action, it will fail and be closed then.
-            trace!(?pid, "Got request to close participant");
-            let pi = self.participants.lock().await.remove(&pid);
-            trace!(?pid, "dropped participants lock");
-            if let Some(mut pi) = pi {
-                let (finished_sender, finished_receiver) = oneshot::channel();
-                pi.s2b_shutdown_bparticipant_s
-                    .take()
-                    .unwrap()
-                    .send((timeout_time, finished_sender))
-                    .unwrap();
-                drop(pi);
-                trace!(?pid, "dropped bparticipant, waiting for finish");
-                let e = finished_receiver.await.unwrap();
-                trace!(?pid, "waiting completed");
-                return_once_successful_shutdown.send(e).unwrap();
-            } else {
-                debug!(?pid, "Looks like participant is already dropped");
-                return_once_successful_shutdown.send(Ok(())).unwrap();
-            }
-            trace!(?pid, "Closed participant");
-        }
+
+        let a2s_disconnect_r = UnboundedReceiverStream::new(a2s_disconnect_r);
+        a2s_disconnect_r
+            .for_each_concurrent(
+                None,
+                |(pid, (timeout_time, return_once_successful_shutdown))| {
+                    //Closing Participants is done the following way:
+                    // 1. We drop our senders and receivers
+                    // 2. we need to close BParticipant, this will drop its senders and receivers
+                    // 3. Participant will try to access the BParticipant senders and receivers with
+                    //    their next api action, it will fail and be closed then.
+                    let participants = Arc::clone(&self.participants);
+                    async move {
+                        trace!(?pid, "Got request to close participant");
+                        let pi = participants.lock().await.remove(&pid);
+                        trace!(?pid, "dropped participants lock");
+                        if let Some(mut pi) = pi {
+                            let (finished_sender, finished_receiver) = oneshot::channel();
+                            pi.s2b_shutdown_bparticipant_s
+                                .take()
+                                .unwrap()
+                                .send((timeout_time, finished_sender))
+                                .unwrap();
+                            drop(pi);
+                            trace!(?pid, "dropped bparticipant, waiting for finish");
+                            let e = finished_receiver.await.unwrap();
+                            trace!(?pid, "waiting completed");
+                            return_once_successful_shutdown.send(e).unwrap();
+                        } else {
+                            debug!(?pid, "Looks like participant is already dropped");
+                            return_once_successful_shutdown.send(Ok(())).unwrap();
+                        }
+                        trace!(?pid, "Closed participant");
+                    }
+                },
+            )
+            .await;
         trace!("Stop disconnect_mgr");
     }
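The scheduler half replaces the sequential `while let ... recv().await` loop with for_each_concurrent, so a participant that exhausts its 120 s shutdown budget no longer serializes every other disconnect behind it. Below is a minimal sketch of the same pattern under assumptions not in the patch: a u64 participant id and a oneshot::Sender<()> acknowledgement stand in for the real request payload, and the per-participant shutdown work is a plain sleep (assumes the futures, tokio, and tokio-stream crates).

    use futures::StreamExt;
    use std::time::Duration;
    use tokio::sync::{mpsc, oneshot};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Drains shutdown requests concurrently; u64 stands in for the real
    /// participant id, oneshot::Sender<()> for the result channel.
    async fn disconnect_mgr(a2s_disconnect_r: mpsc::UnboundedReceiver<(u64, oneshot::Sender<()>)>) {
        UnboundedReceiverStream::new(a2s_disconnect_r)
            // `None` = no concurrency limit: one slow participant no longer
            // delays every other shutdown queued behind it.
            .for_each_concurrent(None, |(pid, finished_s)| async move {
                println!("closing participant {}", pid);
                tokio::time::sleep(Duration::from_millis(100)).await; // stand-in work
                let _ = finished_s.send(());
                println!("closed participant {}", pid);
            })
            .await;
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mgr = tokio::spawn(disconnect_mgr(rx));
        let mut acks = Vec::new();
        for pid in 0..3u64 {
            let (finished_s, finished_r) = oneshot::channel();
            tx.send((pid, finished_s)).unwrap();
            acks.push(finished_r);
        }
        drop(tx); // closing the sender ends the stream, and with it disconnect_mgr
        for ack in acks {
            ack.await.unwrap();
        }
        mgr.await.unwrap();
    }

Passing `None` as the limit mirrors the patch and allows unbounded concurrency; `Some(n)` would cap how many participants shut down at once if that ever became a resource concern.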