diff --git a/network/src/channel.rs b/network/src/channel.rs index 5312fef280..3f03120fd9 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -21,8 +21,8 @@ pub(crate) trait ChannelProtocol { type Handle: ?Sized + mio::Evented; /// Execute when ready to read fn read(&mut self) -> Vec; - /// Execute when ready to write - fn write(&mut self, frame: Frame); + /// Execute when ready to write, return Err when would block + fn write(&mut self, frame: Frame) -> Result<(), ()>; /// used for mio fn get_handle(&self) -> &Self::Handle; } @@ -142,17 +142,23 @@ impl Channel { match &mut self.protocol { ChannelProtocols::Tcp(c) => { while let Some(frame) = self.send_queue.pop_front() { - c.write(frame) + if c.write(frame).is_err() { + break; + } } }, ChannelProtocols::Udp(c) => { while let Some(frame) = self.send_queue.pop_front() { - c.write(frame) + if c.write(frame).is_err() { + break; + } } }, ChannelProtocols::Mpsc(c) => { while let Some(frame) = self.send_queue.pop_front() { - c.write(frame) + if c.write(frame).is_err() { + break; + } } }, } @@ -165,11 +171,14 @@ impl Channel { version, } => { if magic_number != VELOREN_MAGIC_NUMBER { - error!("tcp connection with invalid handshake, closing connection"); + error!( + ?magic_number, + "connection with invalid magic_number, closing connection" + ); self.wrong_shutdown(Self::WRONG_NUMBER); } if version != VELOREN_NETWORK_VERSION { - error!("tcp connection with wrong network version"); + error!(?version, "tcp connection with wrong network version"); self.wrong_shutdown( format!( "{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection", diff --git a/network/src/mpsc.rs b/network/src/mpsc.rs index e782421744..2793007e28 100644 --- a/network/src/mpsc.rs +++ b/network/src/mpsc.rs @@ -34,7 +34,7 @@ impl ChannelProtocol for MpscChannel { } /// Execute when ready to write - fn write(&mut self, frame: Frame) { + fn write(&mut self, frame: Frame) -> Result<(), ()> { match self.endpoint_sender.send(frame) { Ok(n) => { trace!("semded"); @@ -43,12 +43,13 @@ impl ChannelProtocol for MpscChannel { if e.kind() == std::io::ErrorKind::WouldBlock => { debug!("would block"); - return; + return Err(()); } Err(e) => { panic!("{}", e); }, }; + Ok(()) } fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } diff --git a/network/src/tcp.rs b/network/src/tcp.rs index 4d3a060aca..530e88e1d1 100644 --- a/network/src/tcp.rs +++ b/network/src/tcp.rs @@ -1,7 +1,10 @@ use crate::{channel::ChannelProtocol, types::Frame}; use bincode; use mio::net::TcpStream; -use std::io::{Read, Write}; +use std::{ + io::{Read, Write}, + ops::Range, +}; use tracing::*; pub(crate) struct TcpChannel { @@ -9,79 +12,141 @@ pub(crate) struct TcpChannel { //these buffers only ever contain 1 FRAME ! read_buffer: Vec, write_buffer: Vec, + filled_data: usize, + serialized_data: usize, + need_to_send_till: usize, } impl TcpChannel { pub fn new(endpoint: TcpStream) -> Self { - let mut b = vec![0; 1600000]; + //let mut b = vec![0; 1048576]; // 1 MB + let mut b = vec![0; 2048]; // 1 MB Self { endpoint, read_buffer: b.clone(), write_buffer: b, + filled_data: 0, + serialized_data: 0, + need_to_send_till: 0, } } } +fn move_in_vec(vec: &mut Vec, src: Range, dest: Range) { + debug_assert_eq!(src.end - src.start, dest.end - dest.start); + let mut i2 = dest.start; + for i in src { + vec[i2] = vec[i]; + i2 += 1; + } +} + impl ChannelProtocol for TcpChannel { type Handle = TcpStream; /// Execute when ready to read fn read(&mut self) -> Vec { let mut result = Vec::new(); - match self.endpoint.read(self.read_buffer.as_mut_slice()) { - Ok(n) => { - trace!("incomming message with len: {}", n); - let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); - while cur.position() < n as u64 { - let round_start = cur.position(); - let r: Result = bincode::deserialize_from(&mut cur); - match r { - Ok(frame) => result.push(frame), - Err(e) => { - let newlen = self.read_buffer.len() * 2; - let debug_part = &self.read_buffer[(round_start as usize) - ..std::cmp::min(n as usize, (round_start + 10) as usize)]; - warn!( - ?self, - ?e, - ?round_start, - "message cant be parsed, probably because buffer isn't large \ - enough, starting with: {:?}, increase to {}", - debug_part, - newlen - ); - error!( - "please please please find a solution, either we need to keep the \ - buffer hight 1500 and hope for the other part to coorporate or \ - we need a way to keep some data in read_buffer till next call or \ - have a loop around it ... etc... which is error prone, so i dont \ - want to do it!" - ); - if newlen > 204800000 { - error!( - "something is seriossly broken with our messages, skipp the \ - resize" - ); - } else { - self.read_buffer.resize(newlen as usize, 0); - } - break; - }, + loop { + match self + .endpoint + .read(&mut self.read_buffer[self.filled_data..]) + { + Ok(n) => { + trace!(?self.filled_data, "incomming message with len: {}", n); + self.filled_data += n; + let cursor_start = self.serialized_data; + let mut cur = std::io::Cursor::new( + &self.read_buffer[self.serialized_data..self.filled_data], + ); + while cur.position() < n as u64 { + let round_start = cur.position() as usize; + let r: Result = bincode::deserialize_from(&mut cur); + match r { + Ok(frame) => { + self.serialized_data = cursor_start + cur.position() as usize; + result.push(frame); + }, + Err(e) => { + /* Probably we have to wait for moare data! + * Our strategy is as follows: If there is space in our buffer, + * we just set a flag to the failed start, and the point it's + * filled to, On the next run, we + * continue filling and retry to convert from the last point. + * This way no memory needs to be copied, but we need a larger + * buffer. Once either the + * following will happen + * a) We sucessfully deserialized everything we send -> So we can + * safe reset to 0! b) Our buffer + * is full => 1) We started at + * != 0 => we copy the memory to start, and set both variables to + * 0 2) We need to increase + * the buffer (this will never happenTM) */ + let first_bytes_of_msg = &self.read_buffer[(round_start as usize) + ..std::cmp::min(n as usize, (round_start + 16) as usize)]; + debug!(?self, ?self.serialized_data, ?self.filled_data, ?e, ?n, ?round_start, ?first_bytes_of_msg, "message cant be parsed, probably because we need to wait for more data"); + warn!("aa {:?}", self.read_buffer); + break; + }, + } } - } - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - }, - Err(e) => { - panic!("{}", e); - }, - }; + if self.serialized_data == self.filled_data { + // reset the buffer as everything received was handled! + self.filled_data = 0; + self.serialized_data = 0; + } else { + // TODO: Checks for memory movement! + if self.filled_data == self.read_buffer.len() { + let move_src = self.serialized_data..self.filled_data; + trace!(?move_src, "readbuffer was full, moving memory to front"); + warn!(?self.filled_data, ?self.serialized_data, "bb {:?}", self.read_buffer); + let move_dest = 0..self.filled_data - self.serialized_data; + move_in_vec(&mut self.read_buffer, move_src, move_dest.clone()); + self.filled_data = move_dest.end; + self.serialized_data = 0; + warn!(?self.filled_data, ?self.serialized_data, "cc {:?}", self.read_buffer); + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + break; + }, + Err(e) => { + panic!("{}", e); + }, + }; + } result } /// Execute when ready to write - fn write(&mut self, frame: Frame) { + fn write(&mut self, frame: Frame) -> Result<(), ()> { + if self.need_to_send_till != 0 { + //send buffer first + match self + .endpoint + .write(&self.write_buffer[..self.need_to_send_till]) + { + Ok(n) if n == self.need_to_send_till => { + trace!("cleared buffer {}", n); + self.need_to_send_till = 0; + }, + Ok(n) => { + debug!("could only send part of buffer, this is going bad if happens often! "); + let move_src = n..self.need_to_send_till; + let move_dest = 0..self.need_to_send_till - n; + move_in_vec(&mut self.read_buffer, move_src, move_dest.clone()); + self.need_to_send_till = move_dest.end; + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + }, + Err(e) => { + panic!("{}", e); + }, + }; + }; if let Ok(mut data) = bincode::serialize(&frame) { let total = data.len(); match self.endpoint.write(&data) { @@ -90,19 +155,24 @@ impl ChannelProtocol for TcpChannel { }, Ok(n) => { error!("could only send part"); - //let data = data.drain(n..).collect(); //TODO: - // validate n.. is correct - // to_send.push_front(data); + self.write_buffer[self.need_to_send_till..self.need_to_send_till + total - n] + .clone_from_slice(&data[n..]); + self.need_to_send_till += total - n; + return Err(()); }, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { debug!("would block"); - return; + self.write_buffer[self.need_to_send_till..self.need_to_send_till + total] + .clone_from_slice(&data[..]); + self.need_to_send_till += total; + return Err(()); }, Err(e) => { panic!("{}", e); }, }; }; + Ok(()) } fn get_handle(&self) -> &Self::Handle { &self.endpoint } diff --git a/network/src/udp.rs b/network/src/udp.rs index 009338d031..e5c32daf81 100644 --- a/network/src/udp.rs +++ b/network/src/udp.rs @@ -57,7 +57,7 @@ impl ChannelProtocol for UdpChannel { } /// Execute when ready to write - fn write(&mut self, frame: Frame) { + fn write(&mut self, frame: Frame) -> Result<(), ()> { if let Ok(mut data) = bincode::serialize(&frame) { let total = data.len(); match self.endpoint.send(&data) { @@ -72,13 +72,14 @@ impl ChannelProtocol for UdpChannel { }, Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { debug!("would block"); - return; + return Err(()); }, Err(e) => { panic!("{}", e); }, }; }; + Ok(()) } fn get_handle(&self) -> &Self::Handle { &self.endpoint } diff --git a/network/tools/network-speed/src/main.rs b/network/tools/network-speed/src/main.rs index 88d117e24b..c8e5809a02 100644 --- a/network/tools/network-speed/src/main.rs +++ b/network/tools/network-speed/src/main.rs @@ -59,7 +59,7 @@ fn main() { ) .get_matches(); - let filter = EnvFilter::from_default_env().add_directive("warn".parse().unwrap()); + let filter = EnvFilter::from_default_env().add_directive("trace".parse().unwrap()); //.add_directive("veloren_network::tests=trace".parse().unwrap()); tracing_subscriber::FmtSubscriber::builder() @@ -121,18 +121,18 @@ fn client() { let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 let mut last = Instant::now(); + let mut id = 0u64; loop { - let mut id = 0u64; s1.send(Msg::Ping { id, - data: vec![0; 100], + data: vec![0; 1000], }); id += 1; - if id.rem_euclid(10000) == 0 { + if id.rem_euclid(1000000) == 0 { let new = Instant::now(); let diff = new.duration_since(last); last = new; - println!("10.000 took {}", diff.as_millis()); + println!("1.000.000 took {}", diff.as_millis()); } let _: Result, _> = s1.recv(); }