revert Client drop to be correct again and also stop network properly, reduce timeout to 10s

This commit is contained in:
Marcel Märtens 2021-04-14 16:17:06 +02:00
parent fb940ad27a
commit 68d326c817
3 changed files with 18 additions and 12 deletions

View File

@ -168,7 +168,7 @@ pub struct Client {
// The pending trade the client is involved in, and it's id // The pending trade the client is involved in, and it's id
pending_trade: Option<(TradeId, PendingTrade, Option<SitePrices>)>, pending_trade: Option<(TradeId, PendingTrade, Option<SitePrices>)>,
_network: Network, network: Option<Network>,
participant: Option<Participant>, participant: Option<Participant>,
general_stream: Stream, general_stream: Stream,
ping_stream: Stream, ping_stream: Stream,
@ -676,7 +676,7 @@ impl Client {
pending_invites: HashSet::new(), pending_invites: HashSet::new(),
pending_trade: None, pending_trade: None,
_network: network, network: Some(network),
participant: Some(participant), participant: Some(participant),
general_stream: stream, general_stream: stream,
ping_stream, ping_stream,
@ -2421,12 +2421,16 @@ impl Drop for Client {
trace!("no disconnect msg necessary as client wasn't registered") trace!("no disconnect msg necessary as client wasn't registered")
} }
self.runtime.spawn( tokio::task::block_in_place(|| {
self.participant if let Err(e) = self
.take() .runtime
.expect("Only set to None in Drop") .block_on(self.participant.take().unwrap().disconnect())
.disconnect(), {
); warn!(?e, "error when disconnecting, couldn't send all data");
}
});
//explicitly drop the network here while the runtime is still existing
drop(self.network.take());
} }
} }

View File

@ -471,7 +471,7 @@ impl Network {
let (finished_sender, finished_receiver) = oneshot::channel(); let (finished_sender, finished_receiver) = oneshot::channel();
finished_receiver_list.push((remote_pid, finished_receiver)); finished_receiver_list.push((remote_pid, finished_receiver));
a2s_disconnect_s a2s_disconnect_s
.send((remote_pid, (Duration::from_secs(120), finished_sender))) .send((remote_pid, (Duration::from_secs(10), finished_sender)))
.expect("Scheduler is closed, but nobody other should be able to close it"); .expect("Scheduler is closed, but nobody other should be able to close it");
}, },
None => trace!(?remote_pid, "Participant already disconnected gracefully"), None => trace!(?remote_pid, "Participant already disconnected gracefully"),
@ -1087,7 +1087,7 @@ where
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
trace!("activly sleeping"); trace!("activly sleeping");
cnt += 1; cnt += 1;
if cnt > 120 { if cnt > 10 {
error!("Timeout waiting for shutdown, dropping"); error!("Timeout waiting for shutdown, dropping");
break; break;
} }
@ -1137,7 +1137,7 @@ impl Drop for Participant {
debug!("Disconnect from Scheduler"); debug!("Disconnect from Scheduler");
let (finished_sender, finished_receiver) = oneshot::channel(); let (finished_sender, finished_receiver) = oneshot::channel();
match a2s_disconnect_s match a2s_disconnect_s
.send((self.remote_pid, (Duration::from_secs(120), finished_sender))) .send((self.remote_pid, (Duration::from_secs(10), finished_sender)))
{ {
Err(e) => warn!(?e, SCHEDULER_ERR), Err(e) => warn!(?e, SCHEDULER_ERR),
Ok(()) => { Ok(()) => {

View File

@ -681,7 +681,9 @@ impl BParticipant {
trace!("wait again"); trace!("wait again");
wait_for_manager().await; wait_for_manager().await;
sender.send(Ok(())).unwrap(); if sender.send(Ok(())).is_err() {
trace!("couldn't notify sender that participant is dropped");
}
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
self.metrics.participants_disconnected_total.inc(); self.metrics.participants_disconnected_total.inc();