diff --git a/network/src/participant.rs b/network/src/participant.rs index c7d6a9b64d..bcbc219c5c 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -57,6 +57,12 @@ struct ShutdownInfo { error: Option, } +#[derive(Debug)] +struct OpenStreamInfo { + a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, + a2b_close_stream_s: mpsc::UnboundedSender, +} + #[derive(Debug)] pub struct BParticipant { local_pid: Pid, //tracing @@ -69,6 +75,7 @@ pub struct BParticipant { shutdown_barrier: AtomicI32, metrics: Arc, no_channel_error_info: RwLock<(Instant, u64)>, + open_stream_channels: Arc>>, } impl BParticipant { @@ -118,6 +125,7 @@ impl BParticipant { run_channels, metrics, no_channel_error_info: RwLock::new((Instant::now(), 0)), + open_stream_channels: Arc::new(Mutex::new(None)), }, a2b_open_stream_s, b2a_stream_opened_r, @@ -139,9 +147,12 @@ impl BParticipant { crossbeam_channel::unbounded::(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::(); - const STREAM_BOUND: usize = 10_000; - let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::bounded::<(Sid, Bytes)>(STREAM_BOUND); + let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>(); + *self.open_stream_channels.lock().await = Some(OpenStreamInfo { + a2b_msg_s, + a2b_close_stream_s, + }); let run_channels = self.run_channels.take().unwrap(); trace!("start all managers"); tokio::join!( @@ -153,8 +164,6 @@ impl BParticipant { b2b_close_send_protocol_r, b2b_notify_send_of_recv_r, b2s_prio_statistic_s, - a2b_msg_s.clone(), //self - a2b_close_stream_s.clone(), //self ) .instrument(tracing::info_span!("send")), self.recv_mgr( @@ -163,8 +172,6 @@ impl BParticipant { b2b_force_close_recv_protocol_r, b2b_close_send_protocol_s.clone(), b2b_notify_send_of_recv_s, - a2b_msg_s.clone(), //self - a2b_close_stream_s.clone(), //self ) .instrument(tracing::info_span!("recv")), self.create_channel_mgr( @@ -191,8 +198,6 @@ impl BParticipant { b2b_close_send_protocol_r: async_channel::Receiver, b2b_notify_send_of_recv_r: crossbeam_channel::Receiver, _b2s_prio_statistic_s: mpsc::UnboundedSender, - a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, - a2b_close_stream_s: mpsc::UnboundedSender, ) { let mut send_protocols: HashMap = HashMap::new(); let mut interval = tokio::time::interval(Self::TICK_TIME); @@ -222,6 +227,7 @@ impl BParticipant { Some((cid, a)) => (*cid, a), None => { warn!("no channel"); + tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover continue; }, }; @@ -232,14 +238,7 @@ impl BParticipant { trace!(?sid, "open stream"); stream_ids += Sid::from(1); let stream = self - .create_stream( - sid, - prio, - promises, - guaranteed_bandwidth, - &a2b_msg_s, - &a2b_close_stream_s, - ) + .create_stream(sid, prio, promises, guaranteed_bandwidth) .await; let event = ProtocolEvent::OpenStream { @@ -305,8 +304,12 @@ impl BParticipant { info!(?cid, ?e, "protocol failed, shutting down channel"); // remote recv will now fail, which will trigger remote send which will trigger // recv + trace!("TODO: for now decide to FAIL this participant and not wait for a failover"); send_protocols.remove(&cid).unwrap(); self.metrics.channels_disconnected(&self.remote_pid_string); + if send_protocols.is_empty() { + break; + } } if let Some(cid) = remp { @@ -326,6 +329,8 @@ impl BParticipant { } } } + trace!("stop sending in api!"); + self.open_stream_channels.lock().await.take(); trace!("Stop send_mgr"); self.shutdown_barrier .fetch_sub(Self::BARR_SEND, Ordering::Relaxed); @@ -339,8 +344,6 @@ impl BParticipant { b2b_force_close_recv_protocol_r: async_channel::Receiver, b2b_close_send_protocol_s: async_channel::Sender, b2b_notify_send_of_recv_s: crossbeam_channel::Sender, - a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, - a2b_close_stream_s: mpsc::UnboundedSender, ) { let mut recv_protocols: HashMap> = HashMap::new(); // we should be able to directly await futures imo @@ -402,14 +405,7 @@ impl BParticipant { // waiting for receiving is not necessary, because the send_mgr will first // process this before process messages! let stream = self - .create_stream( - sid, - prio, - promises, - guaranteed_bandwidth, - &a2b_msg_s, - &a2b_close_stream_s, - ) + .create_stream(sid, prio, promises, guaranteed_bandwidth) .await; b2a_stream_opened_s.send(stream).unwrap(); retrigger(cid, p, &mut recv_protocols); @@ -625,8 +621,6 @@ impl BParticipant { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, - a2b_msg_s: &crossbeam_channel::Sender<(Sid, Bytes)>, - a2b_close_stream_s: &mpsc::UnboundedSender, ) -> Stream { let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::(); let send_closed = Arc::new(AtomicBool::new(false)); @@ -637,6 +631,24 @@ impl BParticipant { b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s), }); self.metrics.streams_opened(&self.remote_pid_string); + + let (a2b_msg_s, a2b_close_stream_s) = { + let lock = self.open_stream_channels.lock().await; + match &*lock { + Some(osi) => (osi.a2b_msg_s.clone(), osi.a2b_close_stream_s.clone()), + None => { + // This Stream will not be able to send. feed it some "Dummy" Channels. + debug!( + "It seems that a stream was requested to open, while the send_mgr is \ + already closed" + ); + let (a2b_msg_s, _) = crossbeam_channel::unbounded(); + let (a2b_close_stream_s, _) = mpsc::unbounded_channel(); + (a2b_msg_s, a2b_close_stream_s) + }, + } + }; + Stream::new( self.local_pid, self.remote_pid, @@ -645,9 +657,9 @@ impl BParticipant { promises, guaranteed_bandwidth, send_closed, - a2b_msg_s.clone(), + a2b_msg_s, b2a_msg_recv_r, - a2b_close_stream_s.clone(), + a2b_close_stream_s, ) } }