diff --git a/network/src/channel.rs b/network/src/channel.rs index 3f03120fd9..d07826c7e1 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -21,8 +21,8 @@ pub(crate) trait ChannelProtocol { type Handle: ?Sized + mio::Evented; /// Execute when ready to read fn read(&mut self) -> Vec<Frame>; - /// Execute when ready to write, return Err when would block - fn write(&mut self, frame: Frame) -> Result<(), ()>; + /// Execute when ready to write + fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I); /// used for mio fn get_handle(&self) -> &Self::Handle; } @@ -141,25 +141,13 @@ impl Channel { self.tick_streams(); match &mut self.protocol { ChannelProtocols::Tcp(c) => { - while let Some(frame) = self.send_queue.pop_front() { - if c.write(frame).is_err() { - break; - } - } + c.write(&mut self.send_queue.drain(..)); }, ChannelProtocols::Udp(c) => { - while let Some(frame) = self.send_queue.pop_front() { - if c.write(frame).is_err() { - break; - } - } + c.write(&mut self.send_queue.drain(..)); }, ChannelProtocols::Mpsc(c) => { - while let Some(frame) = self.send_queue.pop_front() { - if c.write(frame).is_err() { - break; - } - } + c.write(&mut self.send_queue.drain(..)); }, } } diff --git a/network/src/mpsc.rs b/network/src/mpsc.rs index 2793007e28..08939ac2f9 100644 --- a/network/src/mpsc.rs +++ b/network/src/mpsc.rs @@ -33,23 +33,23 @@ impl ChannelProtocol for MpscChannel { result } - /// Execute when ready to write - fn write(&mut self, frame: Frame) -> Result<(), ()> { - match self.endpoint_sender.send(frame) { - Ok(n) => { - trace!("semded"); - }, - Err(mio_extras::channel::SendError::Io(e)) - if e.kind() == std::io::ErrorKind::WouldBlock => - { - debug!("would block"); - return Err(()); - } - Err(e) => { - panic!("{}", e); - }, - }; - Ok(()) + fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) { + for frame in frames { + match self.endpoint_sender.send(frame) { + Ok(n) => { + trace!("sended"); + }, + Err(mio_extras::channel::SendError::Io(e)) + if e.kind() == std::io::ErrorKind::WouldBlock => + { + debug!("would block"); + return; + } + Err(e) => { + panic!("{}", e); + }, + }; + } } fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } diff --git a/network/src/tcp.rs b/network/src/tcp.rs index 8e9a42da22..781e189ae5 100644 --- a/network/src/tcp.rs +++ b/network/src/tcp.rs @@ -59,10 +59,7 @@ impl NetworkBuffer { fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; } - fn get_read_slice(&self) -> &[u8] { - trace!(?self, "get_read_slice"); - &self.data[self.read_idx..self.write_idx] - } + fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] } fn actually_read(&mut self, cnt: usize) { self.read_idx += cnt; @@ -162,30 +159,42 @@ impl ChannelProtocol for TcpChannel { } /// Execute when ready to write - fn write(&mut self, frame: Frame) -> Result<(), ()> { - if let Ok(mut size) = bincode::serialized_size(&frame) { - let slice = self.write_buffer.get_write_slice(size as usize); - if let Err(e) = bincode::serialize_into(slice, &frame) { - error!( - "serialising frame was unsuccessful, this should never happen! dropping frame!" - ) + fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) { + loop { + //serialize when len < MTU 1500, then write + if self.write_buffer.get_read_slice().len() < 1500 { + match frames.next() { + Some(frame) => { + if let Ok(mut size) = bincode::serialized_size(&frame) { + let slice = self.write_buffer.get_write_slice(size as usize); + if let Err(e) = bincode::serialize_into(slice, &frame) { + error!( + "serialising frame was unsuccessful, this should never \ + happen! dropping frame!" + ) + } + self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent! + } else { + error!( + "getting size of frame was unsuccessful, this should never \ + happen! dropping frame!" + ) + }; + }, + None => break, + } + } + + match self.endpoint.write(self.write_buffer.get_read_slice()) { + Ok(n) => { + self.write_buffer.actually_read(n); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("can't send tcp yet, would block"); + return; + }, + Err(e) => panic!("{}", e), } - self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent! - } else { - error!( - "getting size of frame was unsuccessful, this should never happen! dropping frame!" - ) - }; - match self.endpoint.write(self.write_buffer.get_read_slice()) { - Ok(n) => { - self.write_buffer.actually_read(n); - Ok(()) - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("can't send tcp yet, would block"); - Err(()) - }, - Err(e) => panic!("{}", e), } } diff --git a/network/src/udp.rs b/network/src/udp.rs index e5c32daf81..7c53604755 100644 --- a/network/src/udp.rs +++ b/network/src/udp.rs @@ -57,29 +57,27 @@ impl ChannelProtocol for UdpChannel { } /// Execute when ready to write - fn write(&mut self, frame: Frame) -> Result<(), ()> { - if let Ok(mut data) = bincode::serialize(&frame) { - let total = data.len(); - match self.endpoint.send(&data) { - Ok(n) if n == total => { - trace!("send {} bytes", n); - }, - Ok(n) => { - error!("could only send part"); - //let data = data.drain(n..).collect(); //TODO: - // validate n.. is correct - // to_send.push_front(data); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - return Err(()); - }, - Err(e) => { - panic!("{}", e); - }, + fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) { + for frame in frames { + if let Ok(mut data) = bincode::serialize(&frame) { + let total = data.len(); + match self.endpoint.send(&data) { + Ok(n) if n == total => { + trace!("send {} bytes", n); + }, + Ok(n) => { + error!("could only send part"); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + return; + }, + Err(e) => { + panic!("{}", e); + }, + }; }; - }; - Ok(()) + } } fn get_handle(&self) -> &Self::Handle { &self.endpoint }