fix a bug that some closes could get lost

This commit is contained in:
Marcel Märtens 2021-03-25 16:46:40 +01:00
parent 8dccc21125
commit 034d0f0c5d

View File

@ -136,8 +136,10 @@ impl BParticipant {
async_channel::unbounded::<Cid>();
let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
async_channel::unbounded::<Cid>();
let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) =
crossbeam_channel::unbounded::<(Cid, ProtocolEvent)>();
let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) =
crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>();
let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) =
crossbeam_channel::unbounded::<(Cid, Sid)>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>();
let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>();
@ -155,7 +157,8 @@ impl BParticipant {
a2b_msg_r,
b2b_add_send_protocol_r,
b2b_close_send_protocol_r,
b2b_notify_send_of_recv_r,
b2b_notify_send_of_recv_open_r,
b2b_notify_send_of_recv_close_r,
b2s_prio_statistic_s,
)
.instrument(tracing::info_span!("send")),
@ -164,7 +167,8 @@ impl BParticipant {
b2b_add_recv_protocol_r,
b2b_force_close_recv_protocol_r,
b2b_close_send_protocol_s.clone(),
b2b_notify_send_of_recv_s,
b2b_notify_send_of_recv_open_s,
b2b_notify_send_of_recv_close_s,
)
.instrument(tracing::info_span!("recv")),
self.create_channel_mgr(
@ -211,7 +215,14 @@ impl BParticipant {
a2b_msg_r: crossbeam_channel::Receiver<(Sid, Bytes)>,
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>,
b2b_close_send_protocol_r: async_channel::Receiver<Cid>,
b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<(Cid, ProtocolEvent)>,
b2b_notify_send_of_recv_open_r: crossbeam_channel::Receiver<(
Cid,
Sid,
Prio,
Promises,
Bandwidth,
)>,
b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>,
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
) {
let mut sorted_send_protocols = SortedVec::<Cid, SendProtocols>::default();
@ -276,24 +287,22 @@ impl BParticipant {
}
// process recv content first
let mut closeevents = b2b_notify_send_of_recv_r
.try_iter()
.map(|(cid, e)| match e {
ProtocolEvent::OpenStream { sid, .. } => {
match sorted_send_protocols.get_mut(&cid) {
Some(p) => {
sorted_stream_protocols.insert(sid, cid);
p.notify_from_recv(e);
},
None => {
warn!(?cid, "couldn't notify create protocol, doesn't exist")
},
};
None
for (cid, sid, prio, promises, guaranteed_bandwidth) in
b2b_notify_send_of_recv_open_r.try_iter()
{
match sorted_send_protocols.get_mut(&cid) {
Some(p) => {
sorted_stream_protocols.insert(sid, cid);
p.notify_from_recv(ProtocolEvent::OpenStream {
sid,
prio,
promises,
guaranteed_bandwidth,
});
},
e => Some((cid, e)),
})
.collect::<Vec<_>>();
None => warn!(?cid, "couldn't notify create protocol, doesn't exist"),
};
}
// get all messages and assign it to a channel
for (sid, buffer) in a2b_msg_r.try_iter() {
@ -307,36 +316,30 @@ impl BParticipant {
}
// process recv content afterwards
//TODO: this might get skipped when a send msg fails on another channel in the
// previous line
let _ = closeevents.drain(..).map(|e| {
if let Some((cid, e)) = e {
match sorted_send_protocols.get_mut(&cid) {
Some(p) => {
if let ProtocolEvent::OpenStream { sid, .. } = e {
let _ = sorted_stream_protocols.delete(&sid);
p.notify_from_recv(e);
} else {
unreachable!("we dont send other over this channel");
}
},
None => warn!(?cid, "couldn't notify close protocol, doesn't exist"),
};
}
});
for (cid, sid) in b2b_notify_send_of_recv_close_r.try_iter() {
match sorted_send_protocols.get_mut(&cid) {
Some(p) => {
let _ = sorted_stream_protocols.delete(&sid);
p.notify_from_recv(ProtocolEvent::CloseStream { sid });
},
None => warn!(?cid, "couldn't notify close protocol, doesn't exist"),
};
}
if let Some(sid) = close {
trace!(?stream_ids, "delete stream");
self.delete_stream(sid).await;
// Fire&Forget the protocol will take care to verify that this Frame is delayed
// till the last msg was received!
cid = sorted_stream_protocols.delete(&sid).unwrap();
let event = ProtocolEvent::CloseStream { sid };
sorted_send_protocols
.get_mut(&cid)
.unwrap()
.send(event)
.await?;
if let Some(c) = sorted_stream_protocols.delete(&sid) {
cid = c;
let event = ProtocolEvent::CloseStream { sid };
sorted_send_protocols
.get_mut(&c)
.unwrap()
.send(event)
.await?;
}
}
let send_time = Instant::now();
@ -392,7 +395,14 @@ impl BParticipant {
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>,
b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>,
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
b2b_notify_send_of_recv_s: crossbeam_channel::Sender<(Cid, ProtocolEvent)>,
b2b_notify_send_of_recv_open_r: crossbeam_channel::Sender<(
Cid,
Sid,
Prio,
Promises,
Bandwidth,
)>,
b2b_notify_send_of_recv_close_s: crossbeam_channel::Sender<(Cid, Sid)>,
) {
let mut recv_protocols: HashMap<Cid, JoinHandle<()>> = HashMap::new();
// we should be able to directly await futures imo
@ -452,7 +462,13 @@ impl BParticipant {
guaranteed_bandwidth,
}) => {
trace!(?sid, "open stream");
let _ = b2b_notify_send_of_recv_s.send((cid, r.unwrap()));
let _ = b2b_notify_send_of_recv_open_r.send((
cid,
sid,
prio,
promises,
guaranteed_bandwidth,
));
// waiting for receiving is not necessary, because the send_mgr will first
// process this before process messages!
let stream = self
@ -463,7 +479,7 @@ impl BParticipant {
},
Ok(ProtocolEvent::CloseStream { sid }) => {
trace!(?sid, "close stream");
let _ = b2b_notify_send_of_recv_s.send((cid, r.unwrap()));
let _ = b2b_notify_send_of_recv_close_s.send((cid, sid));
self.delete_stream(sid).await;
retrigger(cid, p, &mut recv_protocols);
},