fix main thread blocking, which was caused by a bad combination of

 - a channel was stale and wasn't shut down properly, AS WELL AS
 - the incoming message pipe was bounded, so it could fill up

To mitigate this we
 a) made the pipe unbounded (a bounded pipe can block the sender; see the sketch after the diff)
 b) stopped spamming the log in the "no channel" case
 c) instead of ever reaching a "no channel" state, we actually shut down the participant
 d) once send_mgr is closed it is no longer possible to SEND on its Streams (sketched below)
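
The core of (c) and (d) in the diff below: the senders that Streams use now live behind an Arc<Mutex<Option<OpenStreamInfo>>>; send_mgr publishes them while it runs and takes them back on shutdown, so a Stream created afterwards only gets dummy channels and its sends fail instead of queueing data nobody reads. A minimal self-contained sketch of that pattern (not the actual veloren-network code; it borrows names from the diff but simplifies Sid to u64 and Bytes to Vec<u8>, and assumes the tokio and crossbeam-channel crates used by the code):

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

type Sid = u64;

#[derive(Debug)]
struct OpenStreamInfo {
    a2b_msg_s: crossbeam_channel::Sender<(Sid, Vec<u8>)>,
    a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
}

#[tokio::main]
async fn main() {
    let open_stream_channels: Arc<Mutex<Option<OpenStreamInfo>>> = Arc::new(Mutex::new(None));

    // While send_mgr runs, it publishes the real senders ...
    let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded();
    let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel();
    *open_stream_channels.lock().await = Some(OpenStreamInfo { a2b_msg_s, a2b_close_stream_s });

    // ... and takes them back when it shuts down.
    open_stream_channels.lock().await.take();

    // create_stream after shutdown: there is nothing left to clone, so the new
    // Stream gets dummy senders whose receivers are dropped immediately.
    let (msg_s, close_s) = match &*open_stream_channels.lock().await {
        Some(osi) => (osi.a2b_msg_s.clone(), osi.a2b_close_stream_s.clone()),
        None => {
            let (msg_s, _) = crossbeam_channel::unbounded();
            let (close_s, _) = mpsc::unbounded_channel();
            (msg_s, close_s)
        },
    };
    // Sending on the dummy channels errors out instead of blocking or piling up data.
    assert!(msg_s.send((0, b"hello".to_vec())).is_err());
    assert!(close_s.send(0).is_err());
}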
Marcel Märtens 2021-02-18 19:33:20 +01:00
parent c6d69d1196
commit 5a48bffcb0

@@ -57,6 +57,12 @@ struct ShutdownInfo {
error: Option<ParticipantError>,
}
#[derive(Debug)]
struct OpenStreamInfo {
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
}
#[derive(Debug)]
pub struct BParticipant {
local_pid: Pid, //tracing
@@ -69,6 +75,7 @@ pub struct BParticipant {
shutdown_barrier: AtomicI32,
metrics: Arc<NetworkMetrics>,
no_channel_error_info: RwLock<(Instant, u64)>,
open_stream_channels: Arc<Mutex<Option<OpenStreamInfo>>>,
}
impl BParticipant {
@@ -118,6 +125,7 @@ impl BParticipant {
run_channels,
metrics,
no_channel_error_info: RwLock::new((Instant::now(), 0)),
open_stream_channels: Arc::new(Mutex::new(None)),
},
a2b_open_stream_s,
b2a_stream_opened_r,
@@ -139,9 +147,12 @@
crossbeam_channel::unbounded::<ProtocolEvent>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>();
const STREAM_BOUND: usize = 10_000;
let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::bounded::<(Sid, Bytes)>(STREAM_BOUND);
let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>();
*self.open_stream_channels.lock().await = Some(OpenStreamInfo {
a2b_msg_s,
a2b_close_stream_s,
});
let run_channels = self.run_channels.take().unwrap();
trace!("start all managers");
tokio::join!(
@@ -153,8 +164,6 @@
b2b_close_send_protocol_r,
b2b_notify_send_of_recv_r,
b2s_prio_statistic_s,
a2b_msg_s.clone(), //self
a2b_close_stream_s.clone(), //self
)
.instrument(tracing::info_span!("send")),
self.recv_mgr(
@@ -163,8 +172,6 @@
b2b_force_close_recv_protocol_r,
b2b_close_send_protocol_s.clone(),
b2b_notify_send_of_recv_s,
a2b_msg_s.clone(), //self
a2b_close_stream_s.clone(), //self
)
.instrument(tracing::info_span!("recv")),
self.create_channel_mgr(
@@ -191,8 +198,6 @@
b2b_close_send_protocol_r: async_channel::Receiver<Cid>,
b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<ProtocolEvent>,
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) {
let mut send_protocols: HashMap<Cid, SendProtocols> = HashMap::new();
let mut interval = tokio::time::interval(Self::TICK_TIME);
@@ -222,6 +227,7 @@
Some((cid, a)) => (*cid, a),
None => {
warn!("no channel");
tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover
continue;
},
};
@@ -232,14 +238,7 @@
trace!(?sid, "open stream");
stream_ids += Sid::from(1);
let stream = self
.create_stream(
sid,
prio,
promises,
guaranteed_bandwidth,
&a2b_msg_s,
&a2b_close_stream_s,
)
.create_stream(sid, prio, promises, guaranteed_bandwidth)
.await;
let event = ProtocolEvent::OpenStream {
@@ -305,8 +304,12 @@
info!(?cid, ?e, "protocol failed, shutting down channel");
// remote recv will now fail, which will trigger remote send which will trigger
// recv
trace!("TODO: for now decide to FAIL this participant and not wait for a failover");
send_protocols.remove(&cid).unwrap();
self.metrics.channels_disconnected(&self.remote_pid_string);
if send_protocols.is_empty() {
break;
}
}
if let Some(cid) = remp {
@@ -326,6 +329,8 @@
}
}
}
trace!("stop sending in api!");
self.open_stream_channels.lock().await.take();
trace!("Stop send_mgr");
self.shutdown_barrier
.fetch_sub(Self::BARR_SEND, Ordering::Relaxed);
@@ -339,8 +344,6 @@
b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>,
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
b2b_notify_send_of_recv_s: crossbeam_channel::Sender<ProtocolEvent>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) {
let mut recv_protocols: HashMap<Cid, JoinHandle<()>> = HashMap::new();
// we should be able to directly await futures imo
@@ -402,14 +405,7 @@
// waiting for receiving is not necessary, because the send_mgr will first
// process this before process messages!
let stream = self
.create_stream(
sid,
prio,
promises,
guaranteed_bandwidth,
&a2b_msg_s,
&a2b_close_stream_s,
)
.create_stream(sid, prio, promises, guaranteed_bandwidth)
.await;
b2a_stream_opened_s.send(stream).unwrap();
retrigger(cid, p, &mut recv_protocols);
@@ -625,8 +621,6 @@
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
a2b_msg_s: &crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
) -> Stream {
let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::<Bytes>();
let send_closed = Arc::new(AtomicBool::new(false));
@@ -637,6 +631,24 @@
b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s),
});
self.metrics.streams_opened(&self.remote_pid_string);
let (a2b_msg_s, a2b_close_stream_s) = {
let lock = self.open_stream_channels.lock().await;
match &*lock {
Some(osi) => (osi.a2b_msg_s.clone(), osi.a2b_close_stream_s.clone()),
None => {
// This Stream will not be able to send. feed it some "Dummy" Channels.
debug!(
"It seems that a stream was requested to open, while the send_mgr is \
already closed"
);
let (a2b_msg_s, _) = crossbeam_channel::unbounded();
let (a2b_close_stream_s, _) = mpsc::unbounded_channel();
(a2b_msg_s, a2b_close_stream_s)
},
}
};
Stream::new(
self.local_pid,
self.remote_pid,
@@ -645,9 +657,9 @@
promises,
guaranteed_bandwidth,
send_closed,
a2b_msg_s.clone(),
a2b_msg_s,
b2a_msg_recv_r,
a2b_close_stream_s.clone(),
a2b_close_stream_s,
)
}
}
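
As background for mitigation (a): a bounded crossbeam channel blocks its sender once it is full and nothing drains the receiver, which is how the stale channel could stall the caller, while an unbounded channel never blocks on send and only errors once the receiver is gone. A tiny standalone sketch of that difference (illustration only, not part of this commit):

use crossbeam_channel::{bounded, unbounded, TrySendError};

fn main() {
    // Bounded: once the buffer is full, a further send() would block until
    // something reads from _r, which never happens here; try_send makes the
    // "full" state visible without blocking.
    let (s, _r) = bounded::<u32>(1);
    s.send(1).unwrap();
    assert!(matches!(s.try_send(2), Err(TrySendError::Full(2))));

    // Unbounded: send() never blocks; it only fails if the receiver is dropped.
    let (s, r) = unbounded::<u32>();
    for i in 0..100_000 {
        s.send(i).unwrap();
    }
    assert_eq!(r.len(), 100_000);
}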