Shutdown improvements

- Timeout for Participant::drop: it will stop waiting eventually
- Detect a tokio runtime in Participant::drop and no longer use std::thread::sleep in that case (it could block the very thread that is doing the shutdown work and deadlock); see the sketch below
- Parallel shutdown in the Scheduler: instead of one slow shutdown locking up everything, participants are now shut down in parallel; this should dramatically reduce the `WARN` "part took long for shutdown" messages (see the sketch after the Scheduler diff)
Marcel Märtens 2021-02-26 10:45:38 +01:00
parent 914266c705
commit 44817870ee
2 changed files with 75 additions and 53 deletions
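
The runtime-detection strategy from the first two bullets can be summarized outside the veloren codebase. The following is a minimal sketch, assuming tokio 1.x and a oneshot channel carrying the shutdown result; the function name wait_for_shutdown and the String error type are placeholders, not part of this commit:

// Minimal sketch of the drop strategy described above (names and the String
// error type are placeholders, not the veloren API): if a tokio runtime is
// reachable, await the shutdown result on a spawned task; otherwise poll with
// a growing sleep and give up after a bounded number of attempts.
use std::time::Duration;
use tokio::sync::oneshot::{self, error::TryRecvError};

fn wait_for_shutdown(mut finished_receiver: oneshot::Receiver<Result<(), String>>) {
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        // Inside a runtime: std::thread::sleep would block a worker thread that
        // may be driving the shutdown itself, so await asynchronously instead.
        handle.spawn(async move {
            match finished_receiver.await {
                Ok(Ok(())) => println!("participant dropped gracefully"),
                Ok(Err(e)) => eprintln!("error during shutdown: {}", e),
                Err(e) => panic!("internal channel error: {}", e),
            }
        });
    } else {
        // Outside a runtime: poll with an increasing back-off, but stop eventually
        // so a stuck shutdown cannot hang the dropping thread forever.
        let mut attempts: u32 = 0;
        loop {
            match finished_receiver.try_recv() {
                Ok(Ok(())) => break,
                Ok(Err(e)) => {
                    eprintln!("error during shutdown: {}", e);
                    break;
                },
                Err(TryRecvError::Closed) => panic!("internal channel closed"),
                Err(TryRecvError::Empty) => {
                    attempts += 1;
                    if attempts > 120 {
                        eprintln!("timeout waiting for shutdown, dropping anyway");
                        break;
                    }
                    std::thread::sleep(Duration::from_millis(100) * attempts);
                },
            }
        }
    }
}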


@@ -482,7 +482,7 @@ impl Participant {
/// [`Bandwidth`]: network_protocol::Bandwidth
/// [`Promises`]: network_protocol::Promises
/// [`Streams`]: crate::api::Stream
#[instrument(name="network", skip(self, prio, promises), fields(p = %self.local_pid))]
#[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))]
pub async fn open(
&self,
prio: u8,
@@ -1013,6 +1013,9 @@ impl Drop for Participant {
#[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
fn drop(&mut self) {
const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
outgoing messages, dropping remaining";
const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";
use tokio::sync::oneshot::error::TryRecvError;
// ignore closed, as we need to send it even though we disconnected the
// participant from network
@@ -1031,29 +1034,40 @@
a2s_disconnect_s
.send((self.remote_pid, (Duration::from_secs(120), finished_sender)))
.expect("Something is wrong in internal scheduler coding");
loop {
match finished_receiver.try_recv() {
Ok(Ok(())) => {
info!("Participant dropped gracefully");
break;
},
Ok(Err(e)) => {
error!(
?e,
"Error while dropping the participant, couldn't send all outgoing \
messages, dropping remaining"
);
break;
},
Err(TryRecvError::Closed) => {
panic!("Something is wrong in internal scheduler/participant coding")
},
Err(TryRecvError::Empty) => {
trace!("activly sleeping");
std::thread::sleep(Duration::from_millis(20));
},
if let Ok(handle) = tokio::runtime::Handle::try_current() {
trace!("Participant drop Async");
handle.spawn(async move {
match finished_receiver.await {
Ok(Ok(())) => info!("Participant dropped gracefully"),
Ok(Err(e)) => error!(?e, SHUTDOWN_ERR),
Err(e) => panic!("{}: {}", CHANNEL_ERR, e),
}
});
} else {
let mut cnt = 0;
loop {
match finished_receiver.try_recv() {
Ok(Ok(())) => {
info!("Participant dropped gracefully");
break;
},
Ok(Err(e)) => {
error!(?e, SHUTDOWN_ERR);
break;
},
Err(TryRecvError::Closed) => panic!(CHANNEL_ERR),
Err(TryRecvError::Empty) => {
trace!("activly sleeping");
cnt += 1;
if cnt > 120 {
error!("Timeout waiting for participant shutdown, droping");
break;
}
std::thread::sleep(Duration::from_millis(100) * cnt);
},
}
}
}
};
},
}
}


@@ -299,37 +299,45 @@ impl Scheduler {
trace!("Stop connect_mgr");
}
async fn disconnect_mgr(&self, mut a2s_disconnect_r: mpsc::UnboundedReceiver<A2sDisconnect>) {
async fn disconnect_mgr(&self, a2s_disconnect_r: mpsc::UnboundedReceiver<A2sDisconnect>) {
trace!("Start disconnect_mgr");
while let Some((pid, (timeout_time, return_once_successful_shutdown))) =
a2s_disconnect_r.recv().await
{
//Closing Participants is done the following way:
// 1. We drop our senders and receivers
// 2. We need to close BParticipant, this will drop its senders and receivers
// 3. Participant will try to access the BParticipant senders and receivers with
// their next api action, it will fail and be closed then.
trace!(?pid, "Got request to close participant");
let pi = self.participants.lock().await.remove(&pid);
trace!(?pid, "dropped participants lock");
if let Some(mut pi) = pi {
let (finished_sender, finished_receiver) = oneshot::channel();
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send((timeout_time, finished_sender))
.unwrap();
drop(pi);
trace!(?pid, "dropped bparticipant, waiting for finish");
let e = finished_receiver.await.unwrap();
trace!(?pid, "waiting completed");
return_once_successful_shutdown.send(e).unwrap();
} else {
debug!(?pid, "Looks like participant is already dropped");
return_once_successful_shutdown.send(Ok(())).unwrap();
}
trace!(?pid, "Closed participant");
}
let a2s_disconnect_r = UnboundedReceiverStream::new(a2s_disconnect_r);
a2s_disconnect_r
.for_each_concurrent(
None,
|(pid, (timeout_time, return_once_successful_shutdown))| {
//Closing Participants is done the following way:
// 1. We drop our senders and receivers
// 2. We need to close BParticipant, this will drop its senders and receivers
// 3. Participant will try to access the BParticipant senders and receivers with
// their next api action, it will fail and be closed then.
let participants = Arc::clone(&self.participants);
async move {
trace!(?pid, "Got request to close participant");
let pi = participants.lock().await.remove(&pid);
trace!(?pid, "dropped participants lock");
if let Some(mut pi) = pi {
let (finished_sender, finished_receiver) = oneshot::channel();
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send((timeout_time, finished_sender))
.unwrap();
drop(pi);
trace!(?pid, "dropped bparticipant, waiting for finish");
let e = finished_receiver.await.unwrap();
trace!(?pid, "waiting completed");
return_once_successful_shutdown.send(e).unwrap();
} else {
debug!(?pid, "Looks like participant is already dropped");
return_once_successful_shutdown.send(Ok(())).unwrap();
}
trace!(?pid, "Closed participant");
}
},
)
.await;
trace!("Stop disconnect_mgr");
}
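
For reference, a minimal sketch of the concurrent pattern used in disconnect_mgr above, assuming tokio, tokio-stream, and futures-util; disconnect_loop and the request payload type are illustrative placeholders, not the veloren types:

// Minimal sketch of the parallel-shutdown pattern: wrap the mpsc receiver in a
// Stream and handle every disconnect request concurrently instead of one after
// another, so a single slow participant no longer blocks the rest.
use futures_util::StreamExt;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

// Hypothetical request type: participant id plus a channel to report the result.
type DisconnectRequest = (u64, oneshot::Sender<Result<(), String>>);

async fn disconnect_loop(requests: mpsc::UnboundedReceiver<DisconnectRequest>) {
    // Wrap the mpsc receiver in a Stream so the StreamExt combinators apply.
    UnboundedReceiverStream::new(requests)
        // `None` = no concurrency limit: all pending disconnects are driven at once.
        .for_each_concurrent(None, |(pid, finished)| async move {
            // ... the actual shutdown work for this participant would go here ...
            let _ = finished.send(Ok(()));
            println!("closed participant {}", pid);
        })
        .await;
}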