From 17f9dda87b99ffced7f97c285f6e989d0ee21ba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 2 Mar 2020 16:50:19 +0100 Subject: [PATCH] Fix TCP buffering with a NetworkBuffer struct --- network/src/tcp.rs | 245 +++++++++++++----------- network/tools/network-speed/src/main.rs | 2 +- 2 files changed, 137 insertions(+), 110 deletions(-) diff --git a/network/src/tcp.rs b/network/src/tcp.rs index 530e88e1d1..8e9a42da22 100644 --- a/network/src/tcp.rs +++ b/network/src/tcp.rs @@ -10,28 +10,97 @@ use tracing::*; pub(crate) struct TcpChannel { endpoint: TcpStream, //these buffers only ever contain 1 FRAME ! - read_buffer: Vec, - write_buffer: Vec, - filled_data: usize, - serialized_data: usize, + read_buffer: NetworkBuffer, + write_buffer: NetworkBuffer, need_to_send_till: usize, } +struct NetworkBuffer { + data: Vec, + read_idx: usize, + write_idx: usize, +} + impl TcpChannel { pub fn new(endpoint: TcpStream) -> Self { - //let mut b = vec![0; 1048576]; // 1 MB - let mut b = vec![0; 2048]; // 1 MB Self { endpoint, - read_buffer: b.clone(), - write_buffer: b, - filled_data: 0, - serialized_data: 0, + read_buffer: NetworkBuffer::new(), + write_buffer: NetworkBuffer::new(), need_to_send_till: 0, } } } +/// NetworkBuffer to use for streamed access +/// valid data is between read_idx and write_idx! +/// everything before read_idx is already processed and no longer important +/// everything after write_idx is either 0 or random data buffered +impl NetworkBuffer { + fn new() -> Self { + NetworkBuffer { + data: vec![0; 2048], + read_idx: 0, + write_idx: 0, + } + } + + fn get_write_slice(&mut self, min_size: usize) -> &mut [u8] { + if self.data.len() < self.write_idx + min_size { + trace!( + ?self, + ?min_size, + "need to resize because buffer is to small" + ); + self.data.resize(self.write_idx + min_size, 0); + } + &mut self.data[self.write_idx..] + } + + fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; } + + fn get_read_slice(&self) -> &[u8] { + trace!(?self, "get_read_slice"); + &self.data[self.read_idx..self.write_idx] + } + + fn actually_read(&mut self, cnt: usize) { + self.read_idx += cnt; + if self.read_idx == self.write_idx { + if self.read_idx > 10485760 { + trace!(?self, "buffer empty, resetting indices"); + } + self.read_idx = 0; + self.write_idx = 0; + } + if self.write_idx > 10485760 { + if self.write_idx - self.read_idx < 65536 { + debug!( + ?self, + "This buffer is filled over 10 MB, but the actual data diff is less then \ + 65kB, which is a sign of stressing this connection much as always new data \ + comes in - nevertheless, in order to handle this we will remove some data \ + now so that this buffer doesn't grow endlessly" + ); + let mut i2 = 0; + for i in self.read_idx..self.write_idx { + self.data[i2] = self.data[i]; + i2 += 1; + } + self.read_idx = 0; + self.write_idx = i2; + } + if self.data.len() > 67108864 { + warn!( + ?self, + "over 64Mbyte used, something seems fishy, len: {}", + self.data.len() + ); + } + } + } +} + fn move_in_vec(vec: &mut Vec, src: Range, dest: Range) { debug_assert_eq!(src.end - src.start, dest.end - dest.start); let mut i2 = dest.start; @@ -48,73 +117,45 @@ impl ChannelProtocol for TcpChannel { fn read(&mut self) -> Vec { let mut result = Vec::new(); loop { - match self - .endpoint - .read(&mut self.read_buffer[self.filled_data..]) - { + match self.endpoint.read(self.read_buffer.get_write_slice(2048)) { Ok(n) => { - trace!(?self.filled_data, "incomming message with len: {}", n); - self.filled_data += n; - let cursor_start = self.serialized_data; - let mut cur = std::io::Cursor::new( - &self.read_buffer[self.serialized_data..self.filled_data], - ); + self.read_buffer.actually_written(n); + trace!("incomming message with len: {}", n); + let slice = self.read_buffer.get_read_slice(); + let mut cur = std::io::Cursor::new(slice); + let mut read_ok = 0; while cur.position() < n as u64 { let round_start = cur.position() as usize; let r: Result = bincode::deserialize_from(&mut cur); match r { Ok(frame) => { - self.serialized_data = cursor_start + cur.position() as usize; result.push(frame); + read_ok = cur.position() as usize; }, Err(e) => { - /* Probably we have to wait for moare data! - * Our strategy is as follows: If there is space in our buffer, - * we just set a flag to the failed start, and the point it's - * filled to, On the next run, we - * continue filling and retry to convert from the last point. - * This way no memory needs to be copied, but we need a larger - * buffer. Once either the - * following will happen - * a) We sucessfully deserialized everything we send -> So we can - * safe reset to 0! b) Our buffer - * is full => 1) We started at - * != 0 => we copy the memory to start, and set both variables to - * 0 2) We need to increase - * the buffer (this will never happenTM) */ - let first_bytes_of_msg = &self.read_buffer[(round_start as usize) - ..std::cmp::min(n as usize, (round_start + 16) as usize)]; - debug!(?self, ?self.serialized_data, ?self.filled_data, ?e, ?n, ?round_start, ?first_bytes_of_msg, "message cant be parsed, probably because we need to wait for more data"); - warn!("aa {:?}", self.read_buffer); + // Probably we have to wait for moare data! + let first_bytes_of_msg = + &slice[round_start..std::cmp::min(n, round_start + 16)]; + debug!( + ?self, + ?e, + ?n, + ?round_start, + ?first_bytes_of_msg, + "message cant be parsed, probably because we need to wait for \ + more data" + ); break; }, } } - if self.serialized_data == self.filled_data { - // reset the buffer as everything received was handled! - self.filled_data = 0; - self.serialized_data = 0; - } else { - // TODO: Checks for memory movement! - if self.filled_data == self.read_buffer.len() { - let move_src = self.serialized_data..self.filled_data; - trace!(?move_src, "readbuffer was full, moving memory to front"); - warn!(?self.filled_data, ?self.serialized_data, "bb {:?}", self.read_buffer); - let move_dest = 0..self.filled_data - self.serialized_data; - move_in_vec(&mut self.read_buffer, move_src, move_dest.clone()); - self.filled_data = move_dest.end; - self.serialized_data = 0; - warn!(?self.filled_data, ?self.serialized_data, "cc {:?}", self.read_buffer); - } - } + self.read_buffer.actually_read(read_ok); }, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { debug!("would block"); break; }, - Err(e) => { - panic!("{}", e); - }, + Err(e) => panic!("{}", e), }; } result @@ -122,57 +163,30 @@ impl ChannelProtocol for TcpChannel { /// Execute when ready to write fn write(&mut self, frame: Frame) -> Result<(), ()> { - if self.need_to_send_till != 0 { - //send buffer first - match self - .endpoint - .write(&self.write_buffer[..self.need_to_send_till]) - { - Ok(n) if n == self.need_to_send_till => { - trace!("cleared buffer {}", n); - self.need_to_send_till = 0; - }, - Ok(n) => { - debug!("could only send part of buffer, this is going bad if happens often! "); - let move_src = n..self.need_to_send_till; - let move_dest = 0..self.need_to_send_till - n; - move_in_vec(&mut self.read_buffer, move_src, move_dest.clone()); - self.need_to_send_till = move_dest.end; - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - }, - Err(e) => { - panic!("{}", e); - }, - }; + if let Ok(mut size) = bincode::serialized_size(&frame) { + let slice = self.write_buffer.get_write_slice(size as usize); + if let Err(e) = bincode::serialize_into(slice, &frame) { + error!( + "serialising frame was unsuccessful, this should never happen! dropping frame!" + ) + } + self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent! + } else { + error!( + "getting size of frame was unsuccessful, this should never happen! dropping frame!" + ) }; - if let Ok(mut data) = bincode::serialize(&frame) { - let total = data.len(); - match self.endpoint.write(&data) { - Ok(n) if n == total => { - trace!("send {} bytes", n); - }, - Ok(n) => { - error!("could only send part"); - self.write_buffer[self.need_to_send_till..self.need_to_send_till + total - n] - .clone_from_slice(&data[n..]); - self.need_to_send_till += total - n; - return Err(()); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - self.write_buffer[self.need_to_send_till..self.need_to_send_till + total] - .clone_from_slice(&data[..]); - self.need_to_send_till += total; - return Err(()); - }, - Err(e) => { - panic!("{}", e); - }, - }; - }; - Ok(()) + match self.endpoint.write(self.write_buffer.get_read_slice()) { + Ok(n) => { + self.write_buffer.actually_read(n); + Ok(()) + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("can't send tcp yet, would block"); + Err(()) + }, + Err(e) => panic!("{}", e), + } } fn get_handle(&self) -> &Self::Handle { &self.endpoint } @@ -184,3 +198,16 @@ impl std::fmt::Debug for TcpChannel { write!(f, "{:?}", self.endpoint) } } + +impl std::fmt::Debug for NetworkBuffer { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "NetworkBuffer(len: {}, read: {}, write: {})", + self.data.len(), + self.read_idx, + self.write_idx + ) + } +} diff --git a/network/tools/network-speed/src/main.rs b/network/tools/network-speed/src/main.rs index c8e5809a02..ac6346bc02 100644 --- a/network/tools/network-speed/src/main.rs +++ b/network/tools/network-speed/src/main.rs @@ -59,7 +59,7 @@ fn main() { ) .get_matches(); - let filter = EnvFilter::from_default_env().add_directive("trace".parse().unwrap()); + let filter = EnvFilter::from_default_env().add_directive("error".parse().unwrap()); //.add_directive("veloren_network::tests=trace".parse().unwrap()); tracing_subscriber::FmtSubscriber::builder()