diff --git a/network/src/participant.rs b/network/src/participant.rs index 3882a2f5e7..504086fdd7 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -455,8 +455,10 @@ impl BParticipant { .await; }, f => { - //unreachable!("Frame should never reache participant!: {:?}", f); - error!(?f, ?cid, "Frame should never reache participant!"); + unreachable!( + "Frame should never reach participant!: {:?}, cid: {}", + f, cid + ); }, } } diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 65541d4dc1..f26e2572f7 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -2,12 +2,13 @@ use crate::metrics::{CidFrameCache, NetworkMetrics}; use crate::{ participant::C2pFrame, - types::{Cid, Frame, Mid, Pid, Sid}, + types::{Cid, Frame}, }; use async_std::{ + io::prelude::*, net::{TcpStream, UdpSocket}, - prelude::*, }; + use futures::{ channel::{mpsc, oneshot}, future::{Fuse, FutureExt}, @@ -69,33 +70,85 @@ impl TcpProtocol { } } - /// read_except and if it fails, close the protocol - async fn read_or_close( - cid: Cid, - mut stream: &TcpStream, - mut bytes: &mut [u8], + async fn read_frame( + r: &mut R, mut end_receiver: &mut Fuse>, - w2c_cid_frame_s: &mut mpsc::UnboundedSender, - ) -> bool { + ) -> Result> { + let handle = |read_result| match read_result { + Ok(_) => Ok(()), + Err(e) => Err(Some(e)), + }; + + let mut frame_no = [0u8; 1]; match select! { - r = stream.read_exact(&mut bytes).fuse() => Some(r), + r = r.read_exact(&mut frame_no).fuse() => Some(r), _ = end_receiver => None, } { - Some(Ok(_)) => false, - Some(Err(e)) => { - info!(?e, "Closing tcp protocol due to read error"); - //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every - // channel is holding a sender! thats why Ne need a explicit - // STOP here - w2c_cid_frame_s - .send((cid, Err(()))) - .await - .expect("Channel or Participant seems no longer to exist"); - true - }, + Some(read_result) => handle(read_result)?, None => { trace!("shutdown requested"); - true + return Err(None); + }, + }; + + match frame_no[0] { + FRAME_HANDSHAKE => { + let mut bytes = [0u8; 19]; + handle(r.read_exact(&mut bytes).await)?; + Ok(Frame::gen_handshake(bytes)) + }, + FRAME_INIT => { + let mut bytes = [0u8; 32]; + handle(r.read_exact(&mut bytes).await)?; + Ok(Frame::gen_init(bytes)) + }, + FRAME_SHUTDOWN => Ok(Frame::Shutdown), + FRAME_OPEN_STREAM => { + let mut bytes = [0u8; 10]; + handle(r.read_exact(&mut bytes).await)?; + Ok(Frame::gen_open_stream(bytes)) + }, + FRAME_CLOSE_STREAM => { + let mut bytes = [0u8; 8]; + handle(r.read_exact(&mut bytes).await)?; + Ok(Frame::gen_close_stream(bytes)) + }, + FRAME_DATA_HEADER => { + let mut bytes = [0u8; 24]; + handle(r.read_exact(&mut bytes).await)?; + Ok(Frame::gen_data_header(bytes)) + }, + FRAME_DATA => { + let mut bytes = [0u8; 18]; + handle(r.read_exact(&mut bytes).await)?; + let (mid, start, length) = Frame::gen_data(bytes); + let mut data = vec![0; length as usize]; + handle(r.read_exact(&mut data).await)?; + Ok(Frame::Data { mid, start, data }) + }, + FRAME_RAW => { + let mut bytes = [0u8; 2]; + handle(r.read_exact(&mut bytes).await)?; + let length = Frame::gen_raw(bytes); + let mut data = vec![0; length as usize]; + handle(r.read_exact(&mut data).await)?; + Ok(Frame::Raw(data)) + }, + other => { + // report a RAW frame, but cannot rely on the next 2 bytes to be a size. + // guessing 32 bytes, which might help to sort down issues + let mut data = vec![0; 32]; + //keep the first byte! + match r.read(&mut data[1..]).await { + Ok(n) => { + data.truncate(n + 1); + Ok(()) + }, + Err(e) => Err(Some(e)), + }?; + data[0] = other; + warn!(?data, "got a unexpected RAW msg"); + Ok(Frame::Raw(data)) }, } } @@ -114,131 +167,105 @@ impl TcpProtocol { .metrics .wire_in_throughput .with_label_values(&[&cid.to_string()]); - let stream = self.stream.clone(); + let mut stream = self.stream.clone(); let mut end_r = end_r.fuse(); - macro_rules! read_or_close { - ($x:expr) => { - if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await { - trace!("read_or_close requested a shutdown"); - break; - } - }; - } - loop { - let frame_no = { - let mut bytes = [0u8; 1]; - read_or_close!(&mut bytes); - bytes[0] - }; - let frame = match frame_no { - FRAME_HANDSHAKE => { - let mut bytes = [0u8; 19]; - read_or_close!(&mut bytes); - let magic_number = *<&[u8; 7]>::try_from(&bytes[0..7]).unwrap(); - Frame::Handshake { - magic_number, - version: [ - u32::from_le_bytes(*<&[u8; 4]>::try_from(&bytes[7..11]).unwrap()), - u32::from_le_bytes(*<&[u8; 4]>::try_from(&bytes[11..15]).unwrap()), - u32::from_le_bytes(*<&[u8; 4]>::try_from(&bytes[15..19]).unwrap()), - ], - } - }, - FRAME_INIT => { - let mut bytes = [0u8; 16]; - read_or_close!(&mut bytes); - let pid = Pid::from_le_bytes(bytes); - read_or_close!(&mut bytes); - let secret = u128::from_le_bytes(bytes); - Frame::Init { pid, secret } - }, - FRAME_SHUTDOWN => Frame::Shutdown, - FRAME_OPEN_STREAM => { - let mut bytes = [0u8; 10]; - read_or_close!(&mut bytes); - let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap()); - let prio = bytes[8]; - let promises = bytes[9]; - Frame::OpenStream { - sid, - prio, - promises, - } - }, - FRAME_CLOSE_STREAM => { - let mut bytes = [0u8; 8]; - read_or_close!(&mut bytes); - let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap()); - Frame::CloseStream { sid } - }, - FRAME_DATA_HEADER => { - let mut bytes = [0u8; 24]; - read_or_close!(&mut bytes); - let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap()); - let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap()); - let length = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[16..24]).unwrap()); - Frame::DataHeader { mid, sid, length } - }, - FRAME_DATA => { - let mut bytes = [0u8; 18]; - read_or_close!(&mut bytes); - let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap()); - let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap()); - let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&bytes[16..18]).unwrap()); - let mut data = vec![0; length as usize]; + match Self::read_frame(&mut stream, &mut end_r).await { + Ok(frame) => { #[cfg(feature = "metrics")] - throughput_cache.inc_by(length as i64); - read_or_close!(&mut data); - Frame::Data { mid, start, data } + { + metrics_cache.with_label_values(&frame).inc(); + if let Frame::Data { + mid: _, + start: _, + ref data, + } = frame + { + throughput_cache.inc_by(data.len() as i64); + } + } + w2c_cid_frame_s + .send((cid, Ok(frame))) + .await + .expect("Channel or Participant seems no longer to exist"); }, - FRAME_RAW => { - let mut bytes = [0u8; 2]; - read_or_close!(&mut bytes); - let length = u16::from_le_bytes([bytes[0], bytes[1]]); - let mut data = vec![0; length as usize]; - read_or_close!(&mut data); - Frame::Raw(data) + Err(e_option) => { + if let Some(e) = e_option { + info!(?e, "Closing tcp protocol due to read error"); + //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as + // every channel is holding a sender! thats why Ne + // need a explicit STOP here + w2c_cid_frame_s + .send((cid, Err(()))) + .await + .expect("Channel or Participant seems no longer to exist"); + } + //None is clean shutdown + break; }, - other => { - // report a RAW frame, but cannot rely on the next 2 bytes to be a size. - // guessing 32 bytes, which might help to sort down issues - let mut data = vec![0; 32]; - //keep the first byte! - read_or_close!(&mut data[1..]); - data[0] = other; - warn!(?data, "got a unexpected RAW msg"); - Frame::Raw(data) - }, - }; - #[cfg(feature = "metrics")] - metrics_cache.with_label_values(&frame).inc(); - w2c_cid_frame_s - .send((cid, Ok(frame))) - .await - .expect("Channel or Participant seems no longer to exist"); + } } trace!("Shutting down tcp read()"); } - /// read_except and if it fails, close the protocol - async fn write_or_close( - stream: &mut TcpStream, - bytes: &[u8], - c2w_frame_r: &mut mpsc::UnboundedReceiver, - ) -> bool { - match stream.write_all(&bytes).await { - Err(e) => { - info!( - ?e, - "Got an error writing to tcp, going to close this channel" - ); - c2w_frame_r.close(); - true + pub async fn write_frame( + w: &mut W, + frame: Frame, + ) -> Result<(), std::io::Error> { + match frame { + Frame::Handshake { + magic_number, + version, + } => { + w.write_all(&FRAME_HANDSHAKE.to_be_bytes()).await?; + w.write_all(&magic_number).await?; + w.write_all(&version[0].to_le_bytes()).await?; + w.write_all(&version[1].to_le_bytes()).await?; + w.write_all(&version[2].to_le_bytes()).await?; }, - _ => false, - } + Frame::Init { pid, secret } => { + w.write_all(&FRAME_INIT.to_be_bytes()).await?; + w.write_all(&pid.to_le_bytes()).await?; + w.write_all(&secret.to_le_bytes()).await?; + }, + Frame::Shutdown => { + w.write_all(&FRAME_SHUTDOWN.to_be_bytes()).await?; + }, + Frame::OpenStream { + sid, + prio, + promises, + } => { + w.write_all(&FRAME_OPEN_STREAM.to_be_bytes()).await?; + w.write_all(&sid.to_le_bytes()).await?; + w.write_all(&prio.to_le_bytes()).await?; + w.write_all(&promises.to_le_bytes()).await?; + }, + Frame::CloseStream { sid } => { + w.write_all(&FRAME_CLOSE_STREAM.to_be_bytes()).await?; + w.write_all(&sid.to_le_bytes()).await?; + }, + Frame::DataHeader { mid, sid, length } => { + w.write_all(&FRAME_DATA_HEADER.to_be_bytes()).await?; + w.write_all(&mid.to_le_bytes()).await?; + w.write_all(&sid.to_le_bytes()).await?; + w.write_all(&length.to_le_bytes()).await?; + }, + Frame::Data { mid, start, data } => { + w.write_all(&FRAME_DATA.to_be_bytes()).await?; + w.write_all(&mid.to_le_bytes()).await?; + w.write_all(&start.to_le_bytes()).await?; + w.write_all(&(data.len() as u16).to_le_bytes()).await?; + w.write_all(&data).await?; + }, + Frame::Raw(data) => { + w.write_all(&FRAME_RAW.to_be_bytes()).await?; + w.write_all(&(data.len() as u16).to_le_bytes()).await?; + w.write_all(&data).await?; + }, + }; + Ok(()) } pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver) { @@ -254,72 +281,27 @@ impl TcpProtocol { #[cfg(not(feature = "metrics"))] let _cid = cid; - macro_rules! write_or_close { - ($x:expr) => { - if TcpProtocol::write_or_close(&mut stream, $x, &mut c2w_frame_r).await { - trace!("write_or_close requested a shutdown"); - break; - } - }; - } - while let Some(frame) = c2w_frame_r.next().await { #[cfg(feature = "metrics")] - metrics_cache.with_label_values(&frame).inc(); - match frame { - Frame::Handshake { - magic_number, - version, - } => { - write_or_close!(&FRAME_HANDSHAKE.to_be_bytes()); - write_or_close!(&magic_number); - write_or_close!(&version[0].to_le_bytes()); - write_or_close!(&version[1].to_le_bytes()); - write_or_close!(&version[2].to_le_bytes()); - }, - Frame::Init { pid, secret } => { - write_or_close!(&FRAME_INIT.to_be_bytes()); - write_or_close!(&pid.to_le_bytes()); - write_or_close!(&secret.to_le_bytes()); - }, - Frame::Shutdown => { - write_or_close!(&FRAME_SHUTDOWN.to_be_bytes()); - }, - Frame::OpenStream { - sid, - prio, - promises, - } => { - write_or_close!(&FRAME_OPEN_STREAM.to_be_bytes()); - write_or_close!(&sid.to_le_bytes()); - write_or_close!(&prio.to_le_bytes()); - write_or_close!(&promises.to_le_bytes()); - }, - Frame::CloseStream { sid } => { - write_or_close!(&FRAME_CLOSE_STREAM.to_be_bytes()); - write_or_close!(&sid.to_le_bytes()); - }, - Frame::DataHeader { mid, sid, length } => { - write_or_close!(&FRAME_DATA_HEADER.to_be_bytes()); - write_or_close!(&mid.to_le_bytes()); - write_or_close!(&sid.to_le_bytes()); - write_or_close!(&length.to_le_bytes()); - }, - Frame::Data { mid, start, data } => { - #[cfg(feature = "metrics")] + { + metrics_cache.with_label_values(&frame).inc(); + if let Frame::Data { + mid: _, + start: _, + ref data, + } = frame + { throughput_cache.inc_by(data.len() as i64); - write_or_close!(&FRAME_DATA.to_be_bytes()); - write_or_close!(&mid.to_le_bytes()); - write_or_close!(&start.to_le_bytes()); - write_or_close!(&(data.len() as u16).to_le_bytes()); - write_or_close!(&data); - }, - Frame::Raw(data) => { - write_or_close!(&FRAME_RAW.to_be_bytes()); - write_or_close!(&(data.len() as u16).to_le_bytes()); - write_or_close!(&data); - }, + } } + if let Err(e) = Self::write_frame(&mut stream, frame).await { + info!( + ?e, + "Got an error writing to tcp, going to close this channel" + ); + c2w_frame_r.close(); + break; + }; } trace!("shutting down tcp write()"); } @@ -372,81 +354,22 @@ impl UdpProtocol { let frame_no = bytes[0]; let frame = match frame_no { FRAME_HANDSHAKE => { - let bytes = &bytes[1..20]; - let magic_number = [ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], - ]; - Frame::Handshake { - magic_number, - version: [ - u32::from_le_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]), - u32::from_le_bytes([bytes[11], bytes[12], bytes[13], bytes[14]]), - u32::from_le_bytes([bytes[15], bytes[16], bytes[17], bytes[18]]), - ], - } - }, - FRAME_INIT => { - let pid = Pid::from_le_bytes([ - bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], - bytes[15], bytes[16], - ]); - let secret = u128::from_le_bytes([ - bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], - bytes[23], bytes[24], bytes[25], bytes[26], bytes[27], bytes[28], - bytes[29], bytes[30], bytes[31], bytes[32], - ]); - Frame::Init { pid, secret } + Frame::gen_handshake(*<&[u8; 19]>::try_from(&bytes[1..20]).unwrap()) }, + FRAME_INIT => Frame::gen_init(*<&[u8; 32]>::try_from(&bytes[1..33]).unwrap()), FRAME_SHUTDOWN => Frame::Shutdown, FRAME_OPEN_STREAM => { - let bytes = &bytes[1..11]; - let sid = Sid::from_le_bytes([ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], - bytes[7], - ]); - let prio = bytes[8]; - let promises = bytes[9]; - Frame::OpenStream { - sid, - prio, - promises, - } + Frame::gen_open_stream(*<&[u8; 10]>::try_from(&bytes[1..11]).unwrap()) }, FRAME_CLOSE_STREAM => { - let bytes = &bytes[1..9]; - let sid = Sid::from_le_bytes([ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], - bytes[7], - ]); - Frame::CloseStream { sid } + Frame::gen_close_stream(*<&[u8; 8]>::try_from(&bytes[1..9]).unwrap()) }, FRAME_DATA_HEADER => { - let bytes = &bytes[1..25]; - let mid = Mid::from_le_bytes([ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], - bytes[7], - ]); - let sid = Sid::from_le_bytes([ - bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], - bytes[15], - ]); - let length = u64::from_le_bytes([ - bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], - bytes[22], bytes[23], - ]); - Frame::DataHeader { mid, sid, length } + Frame::gen_data_header(*<&[u8; 24]>::try_from(&bytes[1..25]).unwrap()) }, FRAME_DATA => { - let mid = Mid::from_le_bytes([ - bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - bytes[8], - ]); - let start = u64::from_le_bytes([ - bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15], - bytes[16], - ]); - let length = u16::from_le_bytes([bytes[17], bytes[18]]); + let (mid, start, length) = + Frame::gen_data(*<&[u8; 18]>::try_from(&bytes[1..19]).unwrap()); let mut data = vec![0; length as usize]; #[cfg(feature = "metrics")] throughput_cache.inc_by(length as i64); @@ -454,7 +377,7 @@ impl UdpProtocol { Frame::Data { mid, start, data } }, FRAME_RAW => { - let length = u16::from_le_bytes([bytes[1], bytes[2]]); + let length = Frame::gen_raw(*<&[u8; 2]>::try_from(&bytes[1..3]).unwrap()); let mut data = vec![0; length as usize]; data.copy_from_slice(&bytes[3..]); Frame::Raw(data) @@ -648,7 +571,6 @@ mod tests { .await .unwrap(); client.flush(); - //handle data let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); diff --git a/network/src/types.rs b/network/src/types.rs index 51fd1843e3..527fc368ae 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -1,4 +1,5 @@ use rand::Rng; +use std::convert::TryFrom; pub type Mid = u64; pub type Cid = u64; @@ -124,6 +125,58 @@ impl Frame { #[cfg(feature = "metrics")] pub fn get_string(&self) -> &str { Self::int_to_string(self.get_int()) } + + pub fn gen_handshake(buf: [u8; 19]) -> Self { + let magic_number = *<&[u8; 7]>::try_from(&buf[0..7]).unwrap(); + Frame::Handshake { + magic_number, + version: [ + u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[7..11]).unwrap()), + u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[11..15]).unwrap()), + u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[15..19]).unwrap()), + ], + } + } + + pub fn gen_init(buf: [u8; 32]) -> Self { + Frame::Init { + pid: Pid::from_le_bytes(*<&[u8; 16]>::try_from(&buf[0..16]).unwrap()), + secret: u128::from_le_bytes(*<&[u8; 16]>::try_from(&buf[16..32]).unwrap()), + } + } + + pub fn gen_open_stream(buf: [u8; 10]) -> Self { + Frame::OpenStream { + sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()), + prio: buf[8], + promises: buf[9], + } + } + + pub fn gen_close_stream(buf: [u8; 8]) -> Self { + Frame::CloseStream { + sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()), + } + } + + pub fn gen_data_header(buf: [u8; 24]) -> Self { + Frame::DataHeader { + mid: Mid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()), + sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[8..16]).unwrap()), + length: u64::from_le_bytes(*<&[u8; 8]>::try_from(&buf[16..24]).unwrap()), + } + } + + pub fn gen_data(buf: [u8; 18]) -> (Mid, u64, u16) { + let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()); + let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&buf[8..16]).unwrap()); + let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&buf[16..18]).unwrap()); + (mid, start, length) + } + + pub fn gen_raw(buf: [u8; 2]) -> u16 { + u16::from_le_bytes(*<&[u8; 2]>::try_from(&buf[0..2]).unwrap()) + } } impl Pid { diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 0a9a51ed93..cfd81ac29f 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -285,6 +285,7 @@ fn failed_stream_open_after_remote_part_is_closed() { #[test] fn open_participant_before_remote_part_is_closed() { + let (_, _) = helper::setup(false, 0); let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); let (n_b, f) = Network::new(Pid::fake(1)); @@ -305,6 +306,7 @@ fn open_participant_before_remote_part_is_closed() { #[test] fn open_participant_after_remote_part_is_closed() { + let (_, _) = helper::setup(false, 0); let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); let (n_b, f) = Network::new(Pid::fake(1)); @@ -325,6 +327,7 @@ fn open_participant_after_remote_part_is_closed() { #[test] fn close_network_scheduler_completely() { + let (_, _) = helper::setup(false, 0); let (n_a, f) = Network::new(Pid::fake(0)); let ha = std::thread::spawn(f); let (n_b, f) = Network::new(Pid::fake(1));