protocols no longer send a Close Frame in case the read fails. They just fail, let participant handle this!

Participant will now handle a close in the `create_channel_mgr` rather then the `send` fn. Its the better place, which makes a HashMap better for delete lookup
Since tcp_read now broke but tcp_write didn't and the Participant wasnt updated till both were broke, we changed CHANNEL tcp_read and tcp_write in protocols to be a `select` rather than a `join`
However only do this in the CHANNEL, but in the HANDSHAKE. it fails if you try to. Also the handshake will take care of any failed read or write manually and will handle a clear teardown in this case.
This commit is contained in:
Marcel Märtens 2020-08-18 17:52:19 +02:00
parent b59fc2ff0c
commit 12b46250f5
5 changed files with 59 additions and 44 deletions

View File

@ -9,7 +9,7 @@ use crate::{
};
use futures::{
channel::{mpsc, oneshot},
join,
join, select,
sink::SinkExt,
stream::StreamExt,
FutureExt,
@ -58,15 +58,15 @@ impl Channel {
trace!(?self.cid, "Start up channel");
match protocol {
Protocols::Tcp(tcp) => {
futures::join!(
tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
tcp.write_to_wire(self.cid, c2w_frame_r),
select!(
_ = tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (),
_ = tcp.write_to_wire(self.cid, c2w_frame_r).fuse() => (),
);
},
Protocols::Udp(udp) => {
futures::join!(
udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
udp.write_to_wire(self.cid, c2w_frame_r),
select!(
_ = udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (),
_ = udp.write_to_wire(self.cid, c2w_frame_r).fuse() => (),
);
},
}

View File

@ -70,7 +70,7 @@ pub struct BParticipant {
remote_pid: Pid,
remote_pid_string: String, //optimisation
offset_sid: Sid,
channels: Arc<RwLock<Vec<ChannelInfo>>>,
channels: Arc<RwLock<HashMap<Cid, ChannelInfo>>>,
streams: RwLock<HashMap<Sid, StreamInfo>>,
running_mgr: AtomicUsize,
run_channels: Option<ControlChannels>,
@ -119,7 +119,7 @@ impl BParticipant {
remote_pid,
remote_pid_string: remote_pid.to_string(),
offset_sid,
channels: Arc::new(RwLock::new(vec![])),
channels: Arc::new(RwLock::new(HashMap::new())),
streams: RwLock::new(HashMap::new()),
running_mgr: AtomicUsize::new(0),
run_channels,
@ -252,27 +252,21 @@ impl BParticipant {
// find out ideal channel here
//TODO: just take first
let mut lock = self.channels.write().await;
if let Some(ci) = lock.get_mut(0) {
//note: this is technically wrong we should only increase when it suceeded, but
// this requiered me to clone `frame` which is a to big performance impact for
// error handling
if let Some(ci) = lock.values_mut().next() {
//note: this is technically wrong we should only increase when it succeeded,
// but this requiered me to clone `frame` which is a to big
// performance impact for error handling
#[cfg(feature = "metrics")]
frames_out_total_cache
.with_label_values(ci.cid, &frame)
.inc();
if let Err(e) = ci.b2w_frame_s.send(frame).await {
let cid = ci.cid;
warn!(
?e,
"The channel got closed unexpectedly, cleaning it up now."
?cid,
"channel no longer available, maybe cleanup in process?"
);
let ci = lock.remove(0);
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(
?e,
"Error shutdowning channel, which is prob fine as we detected it to no \
longer work in the first place"
);
};
//TODO FIXME tags: takeover channel multiple
info!(
"FIXME: the frame is actually drop. which is fine for now as the participant \
@ -292,10 +286,18 @@ impl BParticipant {
guard.0 = now;
let occurrences = guard.1 + 1;
guard.1 = 0;
error!(?occurrences, "Participant has no channel to communicate on");
let lastframe = frame;
error!(
?occurrences,
?lastframe,
"Participant has no channel to communicate on"
);
} else {
guard.1 += 1;
}
//TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
.await;
false
}
}
@ -463,7 +465,7 @@ impl BParticipant {
let channels = self.channels.clone();
async move {
let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid);
channels.write().await.push(ChannelInfo {
channels.write().await.insert(cid, ChannelInfo {
cid,
cid_string: cid.to_string(),
b2w_frame_s,
@ -485,7 +487,17 @@ impl BParticipant {
.channels_disconnected_total
.with_label_values(&[&self.remote_pid_string])
.inc();
trace!(?cid, "Channel got closed");
info!(?cid, "Channel got closed, cleaning up");
//stopping read in case write triggered the failure
if let Some(ci) = channels.write().await.remove(&cid) {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, "read channel was already shut down");
};
} //None means it prob got closed by closing the participant
trace!(?cid, "Channel cleanup completed");
//TEMP FIX: as we dont have channel takeover yet drop the whole
// bParticipant
self.close_api(None).await;
}
},
)
@ -575,9 +587,14 @@ impl BParticipant {
b2b_prios_flushed_r.await.unwrap();
debug!("Closing all channels, after flushed prios");
for ci in self.channels.write().await.drain(..) {
for (cid, ci) in self.channels.write().await.drain() {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
debug!(
?e,
?cid,
"Seems like this read protocol got already dropped by closing the Stream \
itself, ignoring"
);
};
}
@ -649,7 +666,7 @@ impl BParticipant {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.close_channel();
},
None => warn!("Couldn't find the stream, might be simulanious close from remote"),
None => warn!("Couldn't find the stream, might be simultaneous close from remote"),
}
//TODO: what happens if RIGHT NOW the remote sends a StreamClose and this

View File

@ -68,10 +68,8 @@ impl TcpProtocol {
/// read_except and if it fails, close the protocol
async fn read_or_close(
cid: Cid,
mut stream: &TcpStream,
mut bytes: &mut [u8],
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
) -> bool {
match select! {
@ -80,15 +78,7 @@ impl TcpProtocol {
} {
Some(Ok(_)) => false,
Some(Err(e)) => {
info!(
?e,
"Closing tcp protocol due to read error, pushing close frame (to own \
Channel/Participant) to gracefully shutdown"
);
w2c_cid_frame_s
.send((cid, Frame::Shutdown))
.await
.expect("Channel or Participant seems no longer to exist to be Shutdown");
info!(?e, "Closing tcp protocol due to read error");
true
},
None => {
@ -117,7 +107,7 @@ impl TcpProtocol {
macro_rules! read_or_close {
($x:expr) => {
if TcpProtocol::read_or_close(cid, &stream, $x, w2c_cid_frame_s, &mut end_r).await {
if TcpProtocol::read_or_close(&stream, $x, &mut end_r).await {
trace!("read_or_close requested a shutdown");
break;
}

View File

@ -621,6 +621,7 @@ impl Scheduler {
// move directly to participant!
},
Err(()) => {
debug!(?cid, "Handshake from a new connection failed");
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error
trace!(?cid, "returning the Err to api who requested the connect");

View File

@ -42,10 +42,8 @@ fn close_participant() {
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(p1_a.disconnect()).unwrap();
assert_eq!(
block_on(p1_b.disconnect()),
Err(ParticipantError::ParticipantDisconnected)
);
//As no more read/write is done on p1_b the disconnect is successful
block_on(p1_b.disconnect()).unwrap();
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(
@ -227,6 +225,15 @@ fn close_network_then_disconnect_part() {
}
#[test]
/*
FLANKY:
---- opened_stream_before_remote_part_is_closed stdout ----
thread 'opened_stream_before_remote_part_is_closed' panicked at 'assertion failed: `(left == right)`
left: `Err(StreamClosed)`,
right: `Ok("HelloWorld")`', network/tests/closing.rs:236:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
*/
fn opened_stream_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));