From fb0838e2c2ba918d437468f838389b51eab849c5 Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Thu, 25 Mar 2021 19:51:50 -0400 Subject: [PATCH] Add LZ Dictionaries to the initial Participant stream negotiation. --- network/protocol/src/event.rs | 3 +++ network/protocol/src/frame.rs | 27 +++++++++++++++++++++++++-- network/protocol/src/prio.rs | 3 +++ network/protocol/src/tcp.rs | 15 ++++++++++++--- network/src/api.rs | 5 +++++ network/src/participant.rs | 20 ++++++++++++++++---- server/src/connection_handler.rs | 12 ++++++------ 7 files changed, 70 insertions(+), 15 deletions(-) diff --git a/network/protocol/src/event.rs b/network/protocol/src/event.rs index f0f333ed8a..70809678a0 100644 --- a/network/protocol/src/event.rs +++ b/network/protocol/src/event.rs @@ -17,6 +17,7 @@ pub enum ProtocolEvent { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, }, CloseStream { sid: Sid, @@ -36,11 +37,13 @@ impl ProtocolEvent { prio, promises, guaranteed_bandwidth, + lz_dictionary, } => OTFrame::OpenStream { sid: *sid, prio: *prio, promises: *promises, guaranteed_bandwidth: *guaranteed_bandwidth, + lz_dictionary: lz_dictionary.clone(), }, ProtocolEvent::CloseStream { sid } => OTFrame::CloseStream { sid: *sid }, ProtocolEvent::Message { .. } => { diff --git a/network/protocol/src/frame.rs b/network/protocol/src/frame.rs index 20a0439953..54923cd94b 100644 --- a/network/protocol/src/frame.rs +++ b/network/protocol/src/frame.rs @@ -39,6 +39,7 @@ pub enum OTFrame { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, }, CloseStream { sid: Sid, @@ -64,6 +65,7 @@ pub enum ITFrame { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, }, CloseStream { sid: Sid, @@ -163,7 +165,7 @@ pub(crate) const TCP_CLOSE_STREAM_CNS: usize = 8; /// const part of the DATA frame, actual size is variable pub(crate) const TCP_DATA_CNS: usize = 10; pub(crate) const TCP_DATA_HEADER_CNS: usize = 24; -pub(crate) const TCP_OPEN_STREAM_CNS: usize = 18; +pub(crate) const TCP_OPEN_STREAM_CNS: usize = 20; // Size WITHOUT the 1rst indicating byte pub(crate) const TCP_SHUTDOWN_CNS: usize = 0; @@ -178,12 +180,17 @@ impl OTFrame { prio, promises, guaranteed_bandwidth, + lz_dictionary, } => { bytes.put_u8(FRAME_OPEN_STREAM); sid.to_bytes(bytes); bytes.put_u8(prio); bytes.put_u8(promises.to_le_bytes()[0]); bytes.put_u64_le(guaranteed_bandwidth); + let lz_wire_length = lz_dictionary.len().min(0xffff); + bytes.put_u16_le(lz_wire_length as u16); + bytes.reserve(lz_wire_length); + bytes.extend_from_slice(&lz_dictionary[0..lz_wire_length]); }, Self::CloseStream { sid } => { bytes.put_u8(FRAME_CLOSE_STREAM); @@ -213,7 +220,12 @@ impl ITFrame { }; let size = match frame_no { FRAME_SHUTDOWN => TCP_SHUTDOWN_CNS, - FRAME_OPEN_STREAM => TCP_OPEN_STREAM_CNS, + FRAME_OPEN_STREAM => { + if bytes.len() < TCP_OPEN_STREAM_CNS { + return None; + } + u16::from_le_bytes([bytes[18], bytes[19]]) as usize + TCP_OPEN_STREAM_CNS + }, FRAME_CLOSE_STREAM => TCP_CLOSE_STREAM_CNS, FRAME_DATA_HEADER => TCP_DATA_HEADER_CNS, FRAME_DATA => { @@ -242,6 +254,12 @@ impl ITFrame { prio: bytes.get_u8(), promises: Promises::from_bits_truncate(bytes.get_u8()), guaranteed_bandwidth: bytes.get_u64_le(), + lz_dictionary: { + let lz_wire_length = bytes.get_u16_le(); + let mut lz_dictionary = Vec::with_capacity(lz_wire_length as usize); + bytes.copy_to_slice(&mut lz_dictionary[..]); + lz_dictionary + }, } }, FRAME_CLOSE_STREAM => { @@ -284,11 +302,13 @@ impl PartialEq for OTFrame { prio, promises, guaranteed_bandwidth, + lz_dictionary, } => matches!(other, ITFrame::OpenStream { sid, prio, promises, guaranteed_bandwidth, + lz_dictionary, }), Self::CloseStream { sid } => matches!(other, ITFrame::CloseStream { sid }), Self::DataHeader { mid, sid, length } => { @@ -325,6 +345,7 @@ mod tests { prio: 14, promises: Promises::GUARANTEED_DELIVERY, guaranteed_bandwidth: 1_000_000, + lz_dictionary: Vec::new(), }, OTFrame::DataHeader { sid: Sid::new(1337), @@ -499,6 +520,7 @@ mod tests { promises: Promises::ENCRYPTED, prio: 88, guaranteed_bandwidth: 1_000_000, + lz_dictionary: Vec::new(), }; OTFrame::write_bytes(frame1, &mut buffer); } @@ -512,6 +534,7 @@ mod tests { promises: Promises::ENCRYPTED, prio: 88, guaranteed_bandwidth: 1_000_000, + lz_dictionary: Vec::new(), }; OTFrame::write_bytes(frame1, &mut buffer); buffer.truncate(6); // simulate partial retrieve diff --git a/network/protocol/src/prio.rs b/network/protocol/src/prio.rs index 374a1ac216..b087dfcb12 100644 --- a/network/protocol/src/prio.rs +++ b/network/protocol/src/prio.rs @@ -15,6 +15,7 @@ struct StreamInfo { pub(crate) guaranteed_bandwidth: Bandwidth, pub(crate) prio: Prio, pub(crate) promises: Promises, + pub(crate) lz_dictionary: Vec, pub(crate) messages: VecDeque, } @@ -44,11 +45,13 @@ impl PrioManager { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, ) { self.streams.insert(sid, StreamInfo { guaranteed_bandwidth, prio, promises, + lz_dictionary, messages: VecDeque::new(), }); } diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index c944674ad6..f01780f7b4 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -99,9 +99,10 @@ where prio, promises, guaranteed_bandwidth, + lz_dictionary, } => { self.store - .open_stream(sid, prio, promises, guaranteed_bandwidth); + .open_stream(sid, prio, promises, guaranteed_bandwidth, lz_dictionary); }, ProtocolEvent::CloseStream { sid } => { if !self.store.try_close_stream(sid) { @@ -123,9 +124,15 @@ where prio, promises, guaranteed_bandwidth, + ref lz_dictionary, } => { - self.store - .open_stream(sid, prio, promises, guaranteed_bandwidth); + self.store.open_stream( + sid, + prio, + promises, + guaranteed_bandwidth, + lz_dictionary.clone(), + ); event.to_frame().write_bytes(&mut self.buffer); self.drain.send(self.buffer.split()).await?; }, @@ -228,12 +235,14 @@ where prio, promises, guaranteed_bandwidth, + lz_dictionary, } => { break 'outer Ok(ProtocolEvent::OpenStream { sid, prio: prio.min(crate::types::HIGHEST_PRIO), promises, guaranteed_bandwidth, + lz_dictionary, }); }, ITFrame::CloseStream { sid } => { diff --git a/network/src/api.rs b/network/src/api.rs index f8ab01602c..0446a7b709 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -72,6 +72,7 @@ pub struct Stream { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, send_closed: Arc, a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, b2a_msg_recv_r: Option>, @@ -551,6 +552,7 @@ impl Participant { prio: u8, promises: Promises, bandwidth: Bandwidth, + lz_dictionary: Vec, ) -> Result { debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio"); let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::(); @@ -559,6 +561,7 @@ impl Participant { promises, bandwidth, p2a_return_stream_s, + lz_dictionary, )) { debug!(?e, "bParticipant is already closed, notifying"); return Err(ParticipantError::ParticipantDisconnected); @@ -728,6 +731,7 @@ impl Stream { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, send_closed: Arc, a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, b2a_msg_recv_r: async_channel::Receiver, @@ -740,6 +744,7 @@ impl Stream { prio, promises, guaranteed_bandwidth, + lz_dictionary, send_closed, a2b_msg_s, b2a_msg_recv_r: Some(b2a_msg_recv_r), diff --git a/network/src/participant.rs b/network/src/participant.rs index c0276a4ad2..b91c7ff382 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -25,7 +25,7 @@ use tokio::{ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; -pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender); +pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender, Vec); pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, oneshot::Sender<()>); pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender>); pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -225,12 +225,19 @@ impl BParticipant { }; let active_err = async { - if let Some((prio, promises, guaranteed_bandwidth, return_s)) = open { + if let Some((prio, promises, guaranteed_bandwidth, return_s, lz_dictionary)) = open + { let sid = stream_ids; trace!(?sid, "open stream"); stream_ids += Sid::from(1); let stream = self - .create_stream(sid, prio, promises, guaranteed_bandwidth) + .create_stream( + sid, + prio, + promises, + guaranteed_bandwidth, + lz_dictionary.clone(), + ) .await; let event = ProtocolEvent::OpenStream { @@ -238,6 +245,7 @@ impl BParticipant { prio, promises, guaranteed_bandwidth, + lz_dictionary, }; return_s.send(stream).unwrap(); @@ -388,13 +396,15 @@ impl BParticipant { prio, promises, guaranteed_bandwidth, + ref lz_dictionary, }) => { trace!(?sid, "open stream"); + let lz_dictionary = lz_dictionary.clone(); let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); // waiting for receiving is not necessary, because the send_mgr will first // process this before process messages! let stream = self - .create_stream(sid, prio, promises, guaranteed_bandwidth) + .create_stream(sid, prio, promises, guaranteed_bandwidth, lz_dictionary) .await; b2a_stream_opened_s.send(stream).unwrap(); retrigger(cid, p, &mut recv_protocols); @@ -616,6 +626,7 @@ impl BParticipant { prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, + lz_dictionary: Vec, ) -> Stream { let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::(); let send_closed = Arc::new(AtomicBool::new(false)); @@ -651,6 +662,7 @@ impl BParticipant { prio, promises, guaranteed_bandwidth, + lz_dictionary, send_closed, a2b_msg_s, b2a_msg_recv_r, diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 44b975ec04..f7758e94c2 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -100,12 +100,12 @@ impl ConnectionHandler { let reliable = Promises::ORDERED | Promises::CONSISTENCY; let reliablec = reliable | Promises::COMPRESSED; - let general_stream = participant.open(3, reliablec, 500).await?; - let ping_stream = participant.open(2, reliable, 500).await?; - let mut register_stream = participant.open(3, reliablec, 500).await?; - let character_screen_stream = participant.open(3, reliablec, 500).await?; - let in_game_stream = participant.open(3, reliablec, 100_000).await?; - let terrain_stream = participant.open(4, reliablec, 20_000).await?; + let general_stream = participant.open(3, reliablec, 500, Vec::new()).await?; + let ping_stream = participant.open(2, reliable, 500, Vec::new()).await?; + let mut register_stream = participant.open(3, reliablec, 500, Vec::new()).await?; + let character_screen_stream = participant.open(3, reliablec, 500, Vec::new()).await?; + let in_game_stream = participant.open(3, reliablec, 100_000, Vec::new()).await?; + let terrain_stream = participant.open(4, reliablec, 20_000, Vec::new()).await?; let server_data = receiver.recv()?;