Experiment with TCP buffering

This commit is contained in:
Marcel Märtens 2020-02-28 16:57:39 +01:00
parent a6f1e3f176
commit 19fb1d3be4
5 changed files with 152 additions and 71 deletions

View File

@@ -21,8 +21,8 @@ pub(crate) trait ChannelProtocol {
type Handle: ?Sized + mio::Evented;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame>;
/// Execute when ready to write
fn write(&mut self, frame: Frame);
/// Execute when ready to write, return Err when would block
fn write(&mut self, frame: Frame) -> Result<(), ()>;
/// used for mio
fn get_handle(&self) -> &Self::Handle;
}
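// Editor's sketch (not part of the commit): an implementation is expected to
// map a non-blocking WouldBlock condition onto Err(()), roughly like this
// (the endpoint field and the bincode serialization are stand-ins for the
// concrete protocols further down):
//
//     fn write(&mut self, frame: Frame) -> Result<(), ()> {
//         let data = bincode::serialize(&frame).map_err(|_| ())?;
//         match self.endpoint.write(&data) {
//             Ok(_) => Ok(()),
//             Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Err(()),
//             Err(e) => panic!("{}", e),
//         }
//     }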
@@ -142,17 +142,23 @@ impl Channel {
match &mut self.protocol {
ChannelProtocols::Tcp(c) => {
while let Some(frame) = self.send_queue.pop_front() {
c.write(frame)
if c.write(frame).is_err() {
break;
}
}
},
ChannelProtocols::Udp(c) => {
while let Some(frame) = self.send_queue.pop_front() {
c.write(frame)
if c.write(frame).is_err() {
break;
}
}
},
ChannelProtocols::Mpsc(c) => {
while let Some(frame) = self.send_queue.pop_front() {
c.write(frame)
if c.write(frame).is_err() {
break;
}
}
},
}
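// Editor's note (not part of the commit): an Err(()) from write() signals that
// the underlying endpoint would block. The drain loop stops and the remaining
// frames simply stay in send_queue until the next writable event; the TCP
// implementation further down additionally keeps the unsent bytes of the
// failed frame in its own write_buffer. A hypothetical variant that pushes the
// whole frame back instead (assuming Frame: Clone) would look like:
//
//     while let Some(frame) = self.send_queue.pop_front() {
//         if c.write(frame.clone()).is_err() {
//             self.send_queue.push_front(frame);
//             break;
//         }
//     }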
@@ -165,11 +171,14 @@ impl Channel {
version,
} => {
if magic_number != VELOREN_MAGIC_NUMBER {
error!("tcp connection with invalid handshake, closing connection");
error!(
?magic_number,
"connection with invalid magic_number, closing connection"
);
self.wrong_shutdown(Self::WRONG_NUMBER);
}
if version != VELOREN_NETWORK_VERSION {
error!("tcp connection with wrong network version");
error!(?version, "tcp connection with wrong network version");
self.wrong_shutdown(
format!(
"{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection",

View File

@@ -34,7 +34,7 @@ impl ChannelProtocol for MpscChannel {
}
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
fn write(&mut self, frame: Frame) -> Result<(), ()> {
match self.endpoint_sender.send(frame) {
Ok(n) => {
trace!("semded");
@@ -43,12 +43,13 @@ impl ChannelProtocol for MpscChannel {
if e.kind() == std::io::ErrorKind::WouldBlock =>
{
debug!("would block");
return;
return Err(());
}
Err(e) => {
panic!("{}", e);
},
};
Ok(())
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver }

View File

@@ -1,7 +1,10 @@
use crate::{channel::ChannelProtocol, types::Frame};
use bincode;
use mio::net::TcpStream;
use std::io::{Read, Write};
use std::{
io::{Read, Write},
ops::Range,
};
use tracing::*;
pub(crate) struct TcpChannel {
@@ -9,79 +12,141 @@ pub(crate) struct TcpChannel {
//these buffers only ever contain 1 FRAME !
read_buffer: Vec<u8>,
write_buffer: Vec<u8>,
filled_data: usize,
serialized_data: usize,
need_to_send_till: usize,
}
impl TcpChannel {
pub fn new(endpoint: TcpStream) -> Self {
let mut b = vec![0; 1600000];
//let mut b = vec![0; 1048576]; // 1 MB
let mut b = vec![0; 2048]; // 2 KB
Self {
endpoint,
read_buffer: b.clone(),
write_buffer: b,
filled_data: 0,
serialized_data: 0,
need_to_send_till: 0,
}
}
}
fn move_in_vec(vec: &mut Vec<u8>, src: Range<usize>, dest: Range<usize>) {
debug_assert_eq!(src.end - src.start, dest.end - dest.start);
let mut i2 = dest.start;
for i in src {
vec[i2] = vec[i];
i2 += 1;
}
}
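// Editor's note (not part of the commit): the standard library provides an
// overlap-safe memmove for slices (stable since Rust 1.37), so the helper
// above could equivalently be written as:
//
//     fn move_in_vec(vec: &mut Vec<u8>, src: Range<usize>, dest: Range<usize>) {
//         debug_assert_eq!(src.end - src.start, dest.end - dest.start);
//         let dest_start = dest.start;
//         vec.copy_within(src, dest_start);
//     }
//
// The manual forward loop is only correct while the destination starts at or
// before the source, which holds for both call sites in this file (data is
// always moved towards the front of the buffer).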
impl ChannelProtocol for TcpChannel {
type Handle = TcpStream;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
match self.endpoint.read(self.read_buffer.as_mut_slice()) {
Ok(n) => {
trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 {
let round_start = cur.position();
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => result.push(frame),
Err(e) => {
let newlen = self.read_buffer.len() * 2;
let debug_part = &self.read_buffer[(round_start as usize)
..std::cmp::min(n as usize, (round_start + 10) as usize)];
warn!(
?self,
?e,
?round_start,
"message cant be parsed, probably because buffer isn't large \
enough, starting with: {:?}, increase to {}",
debug_part,
newlen
);
error!(
"please please please find a solution, either we need to keep the \
buffer hight 1500 and hope for the other part to coorporate or \
we need a way to keep some data in read_buffer till next call or \
have a loop around it ... etc... which is error prone, so i dont \
want to do it!"
);
if newlen > 204800000 {
error!(
"something is seriossly broken with our messages, skipp the \
resize"
);
} else {
self.read_buffer.resize(newlen as usize, 0);
}
break;
},
loop {
match self
.endpoint
.read(&mut self.read_buffer[self.filled_data..])
{
Ok(n) => {
trace!(?self.filled_data, "incoming message with len: {}", n);
self.filled_data += n;
let cursor_start = self.serialized_data;
let mut cur = std::io::Cursor::new(
&self.read_buffer[self.serialized_data..self.filled_data],
);
while cur.position() < n as u64 {
let round_start = cur.position() as usize;
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => {
self.serialized_data = cursor_start + cur.position() as usize;
result.push(frame);
},
Err(e) => {
/* We probably have to wait for more data!
* Our strategy is as follows: if there is space left in our buffer,
* we just remember where the failed frame starts and how far the
* buffer is filled. On the next run we continue filling and retry
* deserializing from that point. This way no memory needs to be
* copied, but we need a larger buffer. Eventually one of the
* following will happen:
* a) We successfully deserialized everything that was sent -> we can
*    safely reset both counters to 0.
* b) Our buffer is full =>
*    1) We started at != 0 => we copy the memory to the front and
*       adjust both counters.
*    2) We need to increase the buffer (this will never happen(TM)). */
let first_bytes_of_msg = &self.read_buffer[(round_start as usize)
..std::cmp::min(n as usize, (round_start + 16) as usize)];
debug!(?self, ?self.serialized_data, ?self.filled_data, ?e, ?n, ?round_start, ?first_bytes_of_msg, "message can't be parsed, probably because we need to wait for more data");
warn!("aa {:?}", self.read_buffer);
break;
},
}
}
}
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
},
Err(e) => {
panic!("{}", e);
},
};
if self.serialized_data == self.filled_data {
// reset the buffer as everything received was handled!
self.filled_data = 0;
self.serialized_data = 0;
} else {
// TODO: Checks for memory movement!
if self.filled_data == self.read_buffer.len() {
let move_src = self.serialized_data..self.filled_data;
trace!(?move_src, "readbuffer was full, moving memory to front");
warn!(?self.filled_data, ?self.serialized_data, "bb {:?}", self.read_buffer);
let move_dest = 0..self.filled_data - self.serialized_data;
move_in_vec(&mut self.read_buffer, move_src, move_dest.clone());
self.filled_data = move_dest.end;
self.serialized_data = 0;
warn!(?self.filled_data, ?self.serialized_data, "cc {:?}", self.read_buffer);
}
}
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
break;
},
Err(e) => {
panic!("{}", e);
},
};
}
result
}
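// Editor's sketch (not part of the commit): the bookkeeping above, with
// hypothetical numbers and read_buffer.len() == 2048:
//
//     filled_data = 2048, serialized_data = 1900  // partial frame at the end
//     -> buffer full: move bytes 1900..2048 to 0..148 via move_in_vec
//     -> filled_data = 148, serialized_data = 0   // next read() appends at 148
//
//     filled_data == serialized_data              // every frame was parsed
//     -> both counters reset to 0, the whole buffer is free again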
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
fn write(&mut self, frame: Frame) -> Result<(), ()> {
if self.need_to_send_till != 0 {
//send buffer first
match self
.endpoint
.write(&self.write_buffer[..self.need_to_send_till])
{
Ok(n) if n == self.need_to_send_till => {
trace!("cleared buffer {}", n);
self.need_to_send_till = 0;
},
Ok(n) => {
debug!("could only send part of buffer, this is going bad if happens often! ");
let move_src = n..self.need_to_send_till;
let move_dest = 0..self.need_to_send_till - n;
move_in_vec(&mut self.read_buffer, move_src, move_dest.clone());
self.need_to_send_till = move_dest.end;
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
},
Err(e) => {
panic!("{}", e);
},
};
};
if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len();
match self.endpoint.write(&data) {
@@ -90,19 +155,24 @@ impl ChannelProtocol for TcpChannel {
},
Ok(n) => {
error!("could only send part");
//let data = data.drain(n..).collect(); //TODO:
// validate n.. is correct
// to_send.push_front(data);
self.write_buffer[self.need_to_send_till..self.need_to_send_till + total - n]
.clone_from_slice(&data[n..]);
self.need_to_send_till += total - n;
return Err(());
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
return;
self.write_buffer[self.need_to_send_till..self.need_to_send_till + total]
.clone_from_slice(&data[..]);
self.need_to_send_till += total;
return Err(());
},
Err(e) => {
panic!("{}", e);
},
};
};
Ok(())
}
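// Editor's sketch (not part of the commit): the partial-send path above with
// hypothetical numbers. Serializing a frame yields total = 300 bytes, but
// endpoint.write() only accepts n = 120 of them:
//
//     write_buffer[0..180].clone_from_slice(&data[120..]);
//     need_to_send_till = 180;
//     return Err(());                  // caller stops draining its send_queue
//
// The next write() call flushes write_buffer[..need_to_send_till] before a
// new frame is sent, so the byte stream stays in order as long as that flush
// completes.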
fn get_handle(&self) -> &Self::Handle { &self.endpoint }

View File

@@ -57,7 +57,7 @@ impl ChannelProtocol for UdpChannel {
}
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
fn write(&mut self, frame: Frame) -> Result<(), ()> {
if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len();
match self.endpoint.send(&data) {
@@ -72,13 +72,14 @@ impl ChannelProtocol for UdpChannel {
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
return;
return Err(());
},
Err(e) => {
panic!("{}", e);
},
};
};
Ok(())
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint }

View File

@@ -59,7 +59,7 @@ fn main() {
)
.get_matches();
let filter = EnvFilter::from_default_env().add_directive("warn".parse().unwrap());
let filter = EnvFilter::from_default_env().add_directive("trace".parse().unwrap());
//.add_directive("veloren_network::tests=trace".parse().unwrap());
tracing_subscriber::FmtSubscriber::builder()
@@ -121,18 +121,18 @@ fn client() {
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let mut last = Instant::now();
let mut id = 0u64;
loop {
let mut id = 0u64;
s1.send(Msg::Ping {
id,
data: vec![0; 100],
data: vec![0; 1000],
});
id += 1;
if id.rem_euclid(10000) == 0 {
if id.rem_euclid(1000000) == 0 {
let new = Instant::now();
let diff = new.duration_since(last);
last = new;
println!("10.000 took {}", diff.as_millis());
println!("1.000.000 took {}", diff.as_millis());
}
let _: Result<Option<Msg>, _> = s1.recv();
}
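// Editor's sketch (not part of the commit): each printed interval covers
// 1_000_000 Ping messages with a 1000-byte payload, i.e. roughly 1 GB of
// payload (ignoring framing overhead), so the printed duration can be turned
// into an approximate throughput:
//
//     let secs = diff.as_millis() as f64 / 1000.0;
//     println!("~{:.0} MB/s payload", 1_000_000.0 * 1000.0 / 1e6 / secs);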