From 5a48bffcb05738815cacc3ce844927c8b454650e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 18 Feb 2021 19:33:20 +0100 Subject: [PATCH] fix main thread blocking which was a bad combination of - a channel was stale and wasn't shut down propertly AS WELL AS - the msg ingoing pipe was bounded, so it could fill up To mitigate this we a) unbounded the pipe b) stoped spam the log in no channel case c) instead of ever reaching "no channel" state we actually shutdown participant d) when send_mgr is closed it will no longer be able to SEND on streams --- network/src/participant.rs | 72 ++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/network/src/participant.rs b/network/src/participant.rs index c7d6a9b64d..bcbc219c5c 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -57,6 +57,12 @@ struct ShutdownInfo { error: Option, } +#[derive(Debug)] +struct OpenStreamInfo { + a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, + a2b_close_stream_s: mpsc::UnboundedSender, +} + #[derive(Debug)] pub struct BParticipant { local_pid: Pid, //tracing @@ -69,6 +75,7 @@ pub struct BParticipant { shutdown_barrier: AtomicI32, metrics: Arc, no_channel_error_info: RwLock<(Instant, u64)>, + open_stream_channels: Arc>>, } impl BParticipant { @@ -118,6 +125,7 @@ impl BParticipant { run_channels, metrics, no_channel_error_info: RwLock::new((Instant::now(), 0)), + open_stream_channels: Arc::new(Mutex::new(None)), }, a2b_open_stream_s, b2a_stream_opened_r, @@ -139,9 +147,12 @@ impl BParticipant { crossbeam_channel::unbounded::(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::(); - const STREAM_BOUND: usize = 10_000; - let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::bounded::<(Sid, Bytes)>(STREAM_BOUND); + let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>(); + *self.open_stream_channels.lock().await = Some(OpenStreamInfo { + a2b_msg_s, + a2b_close_stream_s, + }); let run_channels = self.run_channels.take().unwrap(); trace!("start all managers"); tokio::join!( @@ -153,8 +164,6 @@ impl BParticipant { b2b_close_send_protocol_r, b2b_notify_send_of_recv_r, b2s_prio_statistic_s, - a2b_msg_s.clone(), //self - a2b_close_stream_s.clone(), //self ) .instrument(tracing::info_span!("send")), self.recv_mgr( @@ -163,8 +172,6 @@ impl BParticipant { b2b_force_close_recv_protocol_r, b2b_close_send_protocol_s.clone(), b2b_notify_send_of_recv_s, - a2b_msg_s.clone(), //self - a2b_close_stream_s.clone(), //self ) .instrument(tracing::info_span!("recv")), self.create_channel_mgr( @@ -191,8 +198,6 @@ impl BParticipant { b2b_close_send_protocol_r: async_channel::Receiver, b2b_notify_send_of_recv_r: crossbeam_channel::Receiver, _b2s_prio_statistic_s: mpsc::UnboundedSender, - a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, - a2b_close_stream_s: mpsc::UnboundedSender, ) { let mut send_protocols: HashMap = HashMap::new(); let mut interval = tokio::time::interval(Self::TICK_TIME); @@ -222,6 +227,7 @@ impl BParticipant { Some((cid, a)) => (*cid, a), None => { warn!("no channel"); + tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover continue; }, }; @@ -232,14 +238,7 @@ impl BParticipant { trace!(?sid, "open stream"); stream_ids += Sid::from(1); let stream = self - .create_stream( - sid, - prio, - promises, - guaranteed_bandwidth, - &a2b_msg_s, - &a2b_close_stream_s, - ) + .create_stream(sid, prio, promises, guaranteed_bandwidth) .await; let event = ProtocolEvent::OpenStream { @@ -305,8 +304,12 @@ impl BParticipant { info!(?cid, ?e, "protocol failed, shutting down channel"); // remote recv will now fail, which will trigger remote send which will trigger // recv + trace!("TODO: for now decide to FAIL this participant and not wait for a failover"); send_protocols.remove(&cid).unwrap(); self.metrics.channels_disconnected(&self.remote_pid_string); + if send_protocols.is_empty() { + break; + } } if let Some(cid) = remp { @@ -326,6 +329,8 @@ impl BParticipant { } } } + trace!("stop sending in api!"); + self.open_stream_channels.lock().await.take(); trace!("Stop send_mgr"); self.shutdown_barrier .fetch_sub(Self::BARR_SEND, Ordering::Relaxed); @@ -339,8 +344,6 @@ impl BParticipant { b2b_force_close_recv_protocol_r: async_channel::Receiver, b2b_close_send_protocol_s: async_channel::Sender, b2b_notify_send_of_recv_s: crossbeam_channel::Sender, - a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, - a2b_close_stream_s: mpsc::UnboundedSender, ) { let mut recv_protocols: HashMap> = HashMap::new(); // we should be able to directly await futures imo @@ -402,14 +405,7 @@ impl BParticipant { // waiting for receiving is not necessary, because the send_mgr will first // process this before process messages! let stream = self - .create_stream( - sid, - prio, - promises, - guaranteed_bandwidth, - &a2b_msg_s, - &a2b_close_stream_s, - ) + .create_stream(sid, prio, promises, guaranteed_bandwidth) .await; b2a_stream_opened_s.send(stream).unwrap(); retrigger(cid, p, &mut recv_protocols); @@ -625,8 +621,6 @@ impl BParticipant { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, - a2b_msg_s: &crossbeam_channel::Sender<(Sid, Bytes)>, - a2b_close_stream_s: &mpsc::UnboundedSender, ) -> Stream { let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::(); let send_closed = Arc::new(AtomicBool::new(false)); @@ -637,6 +631,24 @@ impl BParticipant { b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s), }); self.metrics.streams_opened(&self.remote_pid_string); + + let (a2b_msg_s, a2b_close_stream_s) = { + let lock = self.open_stream_channels.lock().await; + match &*lock { + Some(osi) => (osi.a2b_msg_s.clone(), osi.a2b_close_stream_s.clone()), + None => { + // This Stream will not be able to send. feed it some "Dummy" Channels. + debug!( + "It seems that a stream was requested to open, while the send_mgr is \ + already closed" + ); + let (a2b_msg_s, _) = crossbeam_channel::unbounded(); + let (a2b_close_stream_s, _) = mpsc::unbounded_channel(); + (a2b_msg_s, a2b_close_stream_s) + }, + } + }; + Stream::new( self.local_pid, self.remote_pid, @@ -645,9 +657,9 @@ impl BParticipant { promises, guaranteed_bandwidth, send_closed, - a2b_msg_s.clone(), + a2b_msg_s, b2a_msg_recv_r, - a2b_close_stream_s.clone(), + a2b_close_stream_s, ) } }