Switch to iterator based ChannelProtocols

This commit is contained in:
Marcel Märtens
2020-03-03 12:33:56 +01:00
parent 17f9dda87b
commit f0e137c20e
4 changed files with 78 additions and 83 deletions

View File

@ -21,8 +21,8 @@ pub(crate) trait ChannelProtocol {
type Handle: ?Sized + mio::Evented; type Handle: ?Sized + mio::Evented;
/// Execute when ready to read /// Execute when ready to read
fn read(&mut self) -> Vec<Frame>; fn read(&mut self) -> Vec<Frame>;
/// Execute when ready to write, return Err when would block /// Execute when ready to write
fn write(&mut self, frame: Frame) -> Result<(), ()>; fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I);
/// used for mio /// used for mio
fn get_handle(&self) -> &Self::Handle; fn get_handle(&self) -> &Self::Handle;
} }
@ -141,25 +141,13 @@ impl Channel {
self.tick_streams(); self.tick_streams();
match &mut self.protocol { match &mut self.protocol {
ChannelProtocols::Tcp(c) => { ChannelProtocols::Tcp(c) => {
while let Some(frame) = self.send_queue.pop_front() { c.write(&mut self.send_queue.drain(..));
if c.write(frame).is_err() {
break;
}
}
}, },
ChannelProtocols::Udp(c) => { ChannelProtocols::Udp(c) => {
while let Some(frame) = self.send_queue.pop_front() { c.write(&mut self.send_queue.drain(..));
if c.write(frame).is_err() {
break;
}
}
}, },
ChannelProtocols::Mpsc(c) => { ChannelProtocols::Mpsc(c) => {
while let Some(frame) = self.send_queue.pop_front() { c.write(&mut self.send_queue.drain(..));
if c.write(frame).is_err() {
break;
}
}
}, },
} }
} }

View File

@ -33,23 +33,23 @@ impl ChannelProtocol for MpscChannel {
result result
} }
/// Execute when ready to write fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
fn write(&mut self, frame: Frame) -> Result<(), ()> { for frame in frames {
match self.endpoint_sender.send(frame) { match self.endpoint_sender.send(frame) {
Ok(n) => { Ok(n) => {
trace!("semded"); trace!("sended");
}, },
Err(mio_extras::channel::SendError::Io(e)) Err(mio_extras::channel::SendError::Io(e))
if e.kind() == std::io::ErrorKind::WouldBlock => if e.kind() == std::io::ErrorKind::WouldBlock =>
{ {
debug!("would block"); debug!("would block");
return Err(()); return;
} }
Err(e) => { Err(e) => {
panic!("{}", e); panic!("{}", e);
}, },
}; };
Ok(()) }
} }
fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver }

View File

@ -59,10 +59,7 @@ impl NetworkBuffer {
fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; } fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; }
fn get_read_slice(&self) -> &[u8] { fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] }
trace!(?self, "get_read_slice");
&self.data[self.read_idx..self.write_idx]
}
fn actually_read(&mut self, cnt: usize) { fn actually_read(&mut self, cnt: usize) {
self.read_idx += cnt; self.read_idx += cnt;
@ -162,32 +159,44 @@ impl ChannelProtocol for TcpChannel {
} }
/// Execute when ready to write /// Execute when ready to write
fn write(&mut self, frame: Frame) -> Result<(), ()> { fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
loop {
//serialize when len < MTU 1500, then write
if self.write_buffer.get_read_slice().len() < 1500 {
match frames.next() {
Some(frame) => {
if let Ok(mut size) = bincode::serialized_size(&frame) { if let Ok(mut size) = bincode::serialized_size(&frame) {
let slice = self.write_buffer.get_write_slice(size as usize); let slice = self.write_buffer.get_write_slice(size as usize);
if let Err(e) = bincode::serialize_into(slice, &frame) { if let Err(e) = bincode::serialize_into(slice, &frame) {
error!( error!(
"serialising frame was unsuccessful, this should never happen! dropping frame!" "serialising frame was unsuccessful, this should never \
happen! dropping frame!"
) )
} }
self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent! self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent!
} else { } else {
error!( error!(
"getting size of frame was unsuccessful, this should never happen! dropping frame!" "getting size of frame was unsuccessful, this should never \
happen! dropping frame!"
) )
}; };
},
None => break,
}
}
match self.endpoint.write(self.write_buffer.get_read_slice()) { match self.endpoint.write(self.write_buffer.get_read_slice()) {
Ok(n) => { Ok(n) => {
self.write_buffer.actually_read(n); self.write_buffer.actually_read(n);
Ok(())
}, },
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("can't send tcp yet, would block"); debug!("can't send tcp yet, would block");
Err(()) return;
}, },
Err(e) => panic!("{}", e), Err(e) => panic!("{}", e),
} }
} }
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint } fn get_handle(&self) -> &Self::Handle { &self.endpoint }
} }

View File

@ -57,7 +57,8 @@ impl ChannelProtocol for UdpChannel {
} }
/// Execute when ready to write /// Execute when ready to write
fn write(&mut self, frame: Frame) -> Result<(), ()> { fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
for frame in frames {
if let Ok(mut data) = bincode::serialize(&frame) { if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len(); let total = data.len();
match self.endpoint.send(&data) { match self.endpoint.send(&data) {
@ -66,20 +67,17 @@ impl ChannelProtocol for UdpChannel {
}, },
Ok(n) => { Ok(n) => {
error!("could only send part"); error!("could only send part");
//let data = data.drain(n..).collect(); //TODO:
// validate n.. is correct
// to_send.push_front(data);
}, },
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block"); debug!("would block");
return Err(()); return;
}, },
Err(e) => { Err(e) => {
panic!("{}", e); panic!("{}", e);
}, },
}; };
}; };
Ok(()) }
} }
fn get_handle(&self) -> &Self::Handle { &self.endpoint } fn get_handle(&self) -> &Self::Handle { &self.endpoint }