Merge branch 'xMAC94x/netfixB' into 'master'

FIX for hanging participant deletion.

See merge request veloren/veloren!1437
Marcel 2020-10-14 14:16:38 +00:00
commit 15a96f5812
3 changed files with 153 additions and 107 deletions
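The diff replaces the previous fire-and-forget shutdown signals between the bparticipant's manager tasks (a `Vec<oneshot::Sender<()>>`) with an ordered handshake: `participant_shutdown_mgr` hands `stream_close_mgr` and then `send_mgr` a confirmation `oneshot::Sender<()>` and waits for each answer before moving on. Below is a minimal, self-contained sketch of that handshake pattern, not code from this MR — the `manager`/`coordinator` names and panic messages are illustrative; only the `futures` oneshot and `async_std` primitives match what the diff uses.

use futures::channel::oneshot;

// A manager task: runs until it receives a shutdown request carrying a
// confirmation sender, finishes its remaining work, then confirms.
async fn manager(shutdown_r: oneshot::Receiver<oneshot::Sender<()>>) {
    let confirm_s = shutdown_r.await.expect("coordinator dropped");
    // ... flush remaining work here ...
    confirm_s.send(()).expect("coordinator stopped waiting");
}

// The coordinator: creates the confirmation channel, sends its sender through
// the shutdown channel and only continues once the manager has confirmed.
// Chaining several of these enforces a strict shutdown order.
async fn coordinator(shutdown_s: oneshot::Sender<oneshot::Sender<()>>) {
    let (confirm_s, confirm_r) = oneshot::channel();
    assert!(shutdown_s.send(confirm_s).is_ok(), "manager must have crashed before");
    confirm_r.await.expect("manager dropped without confirming");
}

fn main() {
    let (shutdown_s, shutdown_r) = oneshot::channel();
    async_std::task::block_on(async {
        futures::join!(manager(shutdown_r), coordinator(shutdown_s));
    });
}

Chaining two of these handshakes is what lets `participant_shutdown_mgr` in the diff below guarantee that `stream_close_mgr` finishes (and PRIO gets flushed) before `send_mgr` is told to stop ticking.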

View File

@@ -40,4 +40,5 @@ tracing-subscriber = { version = "0.2.3", default-features = false, features = [
uvth = { version = ">= 3.0, <= 4.0", default-features = false }
clap = { version = "2.33", default-features = false }
shellexpand = "2.0.0"
tiny_http = "0.7.0"
tiny_http = "0.7.0"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -955,17 +955,13 @@ impl Drop for Participant
impl Drop for Stream {
fn drop(&mut self) {
// send if closed is unnecessary but doesnt hurt, we must not crash
// send if closed is unnecessary but doesn't hurt, we must not crash
if !self.send_closed.load(Ordering::Relaxed) {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "Shutting down Stream");
if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
warn!(
"Other side got already dropped, probably due to timing, other side will \
handle this gracefully"
);
};
task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid))
.expect("bparticipant part of a gracefully shutdown must have crashed");
} else {
let sid = self.sid;
let pid = self.pid;

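For context, `Stream::drop` above signals the bparticipant over an async channel from synchronous `Drop` code; after this change the send is expected to succeed, because the shutdown handshake guarantees the bparticipant side outlives every `Stream` handle. A rough, self-contained sketch of that pattern under assumed, simplified types — `close_s` stands in for `a2b_close_stream_s`, `u64` for `Sid`, and this is not the veloren `Stream` type:

use async_std::task;
use futures::{channel::mpsc, SinkExt, StreamExt};

struct Stream {
    sid: u64,
    close_s: Option<mpsc::UnboundedSender<u64>>,
}

impl Drop for Stream {
    fn drop(&mut self) {
        // Drop is synchronous, so briefly block on the async send; the receiver
        // side (the bparticipant in veloren) must still be alive at this point.
        let mut close_s = self.close_s.take().expect("close sender already taken");
        task::block_on(close_s.send(self.sid))
            .expect("manager must not shut down before all Streams are dropped");
    }
}

fn main() {
    let (close_s, mut close_r) = mpsc::unbounded();
    drop(Stream { sid: 7, close_s: Some(close_s) });
    // the Drop impl delivered the stream id to the manager side
    assert_eq!(task::block_on(close_r.next()), Some(7));
}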
View File

@@ -143,7 +143,6 @@ impl BParticipant {
let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) =
oneshot::channel();
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<C2pFrame>();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new(
#[cfg(feature = "metrics")]
@@ -166,12 +165,7 @@ impl BParticipant {
a2p_msg_s.clone(),
),
self.create_channel_mgr(run_channels.s2b_create_channel_r, w2b_frames_s),
self.send_mgr(
prios,
shutdown_send_mgr_receiver,
b2b_prios_flushed_s,
b2s_prio_statistic_s
),
self.send_mgr(prios, shutdown_send_mgr_receiver, b2s_prio_statistic_s),
self.stream_close_mgr(
run_channels.a2b_close_stream_r,
shutdown_stream_close_mgr_receiver,
@@ -179,12 +173,9 @@ impl BParticipant {
),
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
b2b_prios_flushed_r,
vec![
shutdown_send_mgr_sender,
shutdown_open_mgr_sender,
shutdown_stream_close_mgr_sender,
],
shutdown_open_mgr_sender,
shutdown_stream_close_mgr_sender,
shutdown_send_mgr_sender,
),
);
}
@@ -192,8 +183,7 @@ impl BParticipant {
async fn send_mgr(
&self,
mut prios: PrioManager,
mut shutdown_send_mgr_receiver: oneshot::Receiver<()>,
b2b_prios_flushed_s: oneshot::Sender<()>,
mut shutdown_send_mgr_receiver: oneshot::Receiver<oneshot::Sender<()>>,
mut b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
@@ -202,7 +192,7 @@ impl BParticipant {
const TICK_TIME: Duration = Duration::from_millis(10);
const FRAMES_PER_TICK: usize = 10005;
self.running_mgr.fetch_add(1, Ordering::Relaxed);
let mut closing_up = false;
let mut b2b_prios_flushed_s = None; //closing up
trace!("Start send_mgr");
#[cfg(feature = "metrics")]
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
@@ -229,20 +219,22 @@ impl BParticipant {
trace!("Did 1000 ticks");
}
//shutdown after all messages are sent!
if closing_up && (len == 0) {
// Make sure this is called after the API is closed, and all stream drops are
// already known to the prio manager
if b2b_prios_flushed_s.is_some() && (len == 0) {
break;
}
//this `if` is placed below the break `if` so that, after the shutdown signal
// arrives, the loop still gets another pass to close all remaining streams
if !closing_up && shutdown_send_mgr_receiver.try_recv().unwrap().is_some() {
closing_up = true;
//FIXME: quickfix for an error that we are WAITING on close confirmation of
// streams from prio manager while prio manager is already shut down.
async_std::task::sleep(TICK_TIME * 10).await;
if b2b_prios_flushed_s.is_none() {
if let Some(prios_flushed_s) = shutdown_send_mgr_receiver.try_recv().unwrap() {
b2b_prios_flushed_s = Some(prios_flushed_s);
}
}
}
trace!("Stop send_mgr");
b2b_prios_flushed_s.send(()).unwrap();
b2b_prios_flushed_s
.expect("b2b_prios_flushed_s not set")
.send(())
.unwrap();
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
@@ -321,6 +313,8 @@ impl BParticipant {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start handle_frames_mgr");
let mut messages = HashMap::new();
#[cfg(feature = "metrics")]
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
let mut dropped_instant = Instant::now();
let mut dropped_cnt = 0u64;
let mut dropped_sid = Sid::new(0);
@@ -371,33 +365,16 @@ impl BParticipant {
}
},
Frame::CloseStream { sid } => {
// Closing is realised by setting a AtomicBool to true, however we also have a
// guarantee that send or recv fails if the other one is destroyed
// However Stream.send() is not async and their receiver isn't dropped if Steam
// is dropped, so i need a way to notify the Stream that it's send messages will
// be dropped... from remote, notify local
trace!(
?sid,
"Got remote request to close a stream, without flushing it, local \
messages are dropped"
);
// no wait for flush here, as the remote wouldn't care anyway.
if let Some(si) = self.streams.write().await.remove(&sid) {
// no need to keep flushing as the remote no longer knows about this stream
// anyway
self.delete_stream(
sid,
None,
true,
#[cfg(feature = "metrics")]
self.metrics
.streams_closed_total
.with_label_values(&[&self.remote_pid_string])
.inc();
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.into_inner().close_channel();
trace!(?sid, "Closed stream from remote");
} else {
warn!(
?sid,
"Couldn't find stream to close, either this is a duplicate message, \
or the local copy of the Stream got closed simultaniously"
);
}
&mut send_cache,
)
.await;
},
Frame::DataHeader { mid, sid, length } => {
let imsg = IncomingMessage {
@@ -434,8 +411,9 @@ impl BParticipant {
} else {
//aggregate errors
let n = Instant::now();
if dropped_sid != imsg.sid
|| n.duration_since(dropped_instant) > Duration::from_secs(1)
if dropped_cnt > 0
&& (dropped_sid != imsg.sid
|| n.duration_since(dropped_instant) > Duration::from_secs(1))
{
warn!(
?dropped_cnt,
@@ -613,8 +591,9 @@ impl BParticipant {
async fn participant_shutdown_mgr(
&self,
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>,
b2b_prios_flushed_r: oneshot::Receiver<()>,
mut mgr_to_shutdown: Vec<oneshot::Sender<()>>,
shutdown_open_mgr_sender: oneshot::Sender<()>,
shutdown_stream_close_mgr_sender: oneshot::Sender<oneshot::Sender<()>>,
shutdown_send_mgr_sender: oneshot::Sender<oneshot::Sender<()>>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start participant_shutdown_mgr");
@@ -626,13 +605,27 @@ impl BParticipant {
self.close_api(None).await;
debug!("Closing all managers");
for sender in mgr_to_shutdown.drain(..) {
if let Err(e) = sender.send(()) {
warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
};
}
shutdown_open_mgr_sender
.send(())
.expect("open_mgr must have crashed before");
let (b2b_stream_close_shutdown_confirmed_s, b2b_stream_close_shutdown_confirmed_r) =
oneshot::channel();
shutdown_stream_close_mgr_sender
.send(b2b_stream_close_shutdown_confirmed_s)
.expect("stream_close_mgr must have crashed before");
// We need to wait for the stream_close_mgr BEFORE the send_mgr, as the
// stream_close_mgr needs to wait on the API to drop `Stream` and be triggered.
// It will then wait for those streams to be flushed in PRIO, and send_mgr is
// responsible for ticking PRIO WHILE this happens, so we can't close it before!
b2b_stream_close_shutdown_confirmed_r.await.unwrap();
//closing send_mgr now:
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
shutdown_send_mgr_sender
.send(b2b_prios_flushed_s)
.expect("stream_close_mgr must have crashed before");
b2b_prios_flushed_r.await.unwrap();
if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error
{
debug!("Sending shutdown frame after flushed all prios");
@@ -701,7 +694,7 @@ impl BParticipant {
async fn stream_close_mgr(
&self,
mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
shutdown_stream_close_mgr_receiver: oneshot::Receiver<oneshot::Sender<()>>,
b2p_notify_empty_stream_s: crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
@@ -709,59 +702,115 @@ impl BParticipant {
#[cfg(feature = "metrics")]
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();
let mut b2b_stream_close_shutdown_confirmed_s = None;
//from api or shutdown signal
while let Some(sid) = select! {
next = a2b_close_stream_r.next().fuse() => next,
_ = shutdown_stream_close_mgr_receiver => None,
sender = shutdown_stream_close_mgr_receiver => {
b2b_stream_close_shutdown_confirmed_s = Some(sender.unwrap());
None
}
} {
//TODO: make this concurrent!
//TODO: Performance, closing is slow!
trace!(?sid, "Got request from api to close steam");
//This needs to first stop clients from sending any more.
//Then it will wait for all pending messages (in prio) to be send to the
// protocol After this happened the stream is closed
//Only after all messages are send to the prococol, we can send the CloseStream
// frame! If we would send it before, all followup messages couldn't
// be handled at the remote side.
trace!(?sid, "Stopping api to use this stream");
match self.streams.read().await.get(&sid) {
Some(si) => {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.lock().await.close_channel();
},
None => warn!("Couldn't find the stream, might be simultaneous close from remote"),
}
//TODO: what happens if RIGHT NOW the remote sends a StreamClose and this
// streams get closed and removed? RACE CONDITION
trace!(?sid, "Wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
b2p_notify_empty_stream_s
.send((sid, s2b_stream_finished_closed_s))
.unwrap();
s2b_stream_finished_closed_r.await.unwrap();
trace!(?sid, "Stream was successfully flushed");
#[cfg(feature = "metrics")]
self.metrics
.streams_closed_total
.with_label_values(&[&self.remote_pid_string])
.inc();
//only now remove the Stream, that means we can still recv on it.
self.streams.write().await.remove(&sid);
self.send_frame(
Frame::CloseStream { sid },
self.delete_stream(
sid,
Some(b2p_notify_empty_stream_s.clone()),
false,
#[cfg(feature = "metrics")]
&mut send_cache,
)
.await;
}
trace!("deleting all leftover streams");
let sids = self
.streams
.read()
.await
.keys()
.cloned()
.collect::<Vec<_>>();
for sid in sids {
//flushing is still important, e.g. when Participant::drop is called (but
// Stream::drop isn't)!
self.delete_stream(
sid,
Some(b2p_notify_empty_stream_s.clone()),
false,
#[cfg(feature = "metrics")]
&mut send_cache,
)
.await;
}
if b2b_stream_close_shutdown_confirmed_s.is_none() {
b2b_stream_close_shutdown_confirmed_s =
Some(shutdown_stream_close_mgr_receiver.await.unwrap());
}
b2b_stream_close_shutdown_confirmed_s
.unwrap()
.send(())
.unwrap();
trace!("Stop stream_close_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn delete_stream(
&self,
sid: Sid,
b2p_notify_empty_stream_s: Option<crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>>,
from_remote: bool,
#[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache,
) {
let span = span!(Level::INFO, "delete_stream", ?sid, ?from_remote);
let _enter = span.enter();
//This needs to first stop clients from sending any more.
//Then it will wait for all pending messages (in prio) to be sent to the
// protocol. After this has happened the stream is closed.
//Only after all messages are sent to the protocol can we send the CloseStream
// frame! If we sent it before, all follow-up messages couldn't
// be handled at the remote side.
trace!("Stopping api to use this stream");
match self.streams.read().await.get(&sid) {
Some(si) => {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.lock().await.close_channel();
},
None => {
trace!("Couldn't find the stream, might be simultaneous close from local/remote")
},
}
if !from_remote {
trace!("Wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
b2p_notify_empty_stream_s
.expect("needs to be set when from_remote is false")
.send((sid, s2b_stream_finished_closed_s))
.unwrap();
s2b_stream_finished_closed_r.await.unwrap();
trace!("Stream was successfully flushed");
}
#[cfg(feature = "metrics")]
self.metrics
.streams_closed_total
.with_label_values(&[&self.remote_pid_string])
.inc();
//only now remove the Stream; up to this point we can still recv on it.
self.streams.write().await.remove(&sid);
if !from_remote {
self.send_frame(
Frame::CloseStream { sid },
#[cfg(feature = "metrics")]
frames_out_total_cache,
)
.await;
}
}
async fn create_stream(
&self,
sid: Sid,
@@ -815,7 +864,7 @@ impl BParticipant {
async fn close_api(&self, reason: Option<ParticipantError>) {
self.close_write_api(reason).await;
debug!("Closing all streams");
for (sid, si) in self.streams.write().await.drain() {
for (sid, si) in self.streams.read().await.iter() {
trace!(?sid, "Shutting down Stream");
si.b2a_msg_recv_s.lock().await.close_channel();
}
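The last hunk is the counterpart of the new `delete_stream` helper: `close_api` no longer drains the stream map, it only closes each stream's receive channel under a read lock, so the entries are still present for `stream_close_mgr`/`delete_stream` to flush and remove afterwards. A minimal, self-contained sketch of just that change, with assumed simplified types (no per-sender `Mutex`, `u64` instead of `Sid`, not the actual veloren structs):

use std::collections::HashMap;
use async_std::sync::RwLock;
use futures::channel::mpsc;

struct StreamInfo {
    b2a_msg_recv_s: mpsc::UnboundedSender<Vec<u8>>,
}

// Stop delivering to the API, but keep the map entries so they can still be
// flushed and removed later by the close path.
async fn close_api(streams: &RwLock<HashMap<u64, StreamInfo>>) {
    for (_sid, si) in streams.read().await.iter() {
        si.b2a_msg_recv_s.close_channel();
    }
}

fn main() {
    async_std::task::block_on(async {
        let streams = RwLock::new(HashMap::new());
        let (b2a_msg_recv_s, _b2a_msg_recv_r) = mpsc::unbounded::<Vec<u8>>();
        streams.write().await.insert(1u64, StreamInfo { b2a_msg_recv_s });
        close_api(&streams).await;
        // the entry survives close_api, unlike with the old drain()
        assert_eq!(streams.read().await.len(), 1);
    });
}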