Add LZ Dictionaries to the initial Participant stream negotiation.

This commit is contained in:
Avi Weinstock 2021-03-25 19:51:50 -04:00
parent 64e4aabd9f
commit fb0838e2c2
7 changed files with 70 additions and 15 deletions

View File

@ -17,6 +17,7 @@ pub enum ProtocolEvent {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
},
CloseStream {
sid: Sid,
@ -36,11 +37,13 @@ impl ProtocolEvent {
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
} => OTFrame::OpenStream {
sid: *sid,
prio: *prio,
promises: *promises,
guaranteed_bandwidth: *guaranteed_bandwidth,
lz_dictionary: lz_dictionary.clone(),
},
ProtocolEvent::CloseStream { sid } => OTFrame::CloseStream { sid: *sid },
ProtocolEvent::Message { .. } => {

View File

@ -39,6 +39,7 @@ pub enum OTFrame {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
},
CloseStream {
sid: Sid,
@ -64,6 +65,7 @@ pub enum ITFrame {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
},
CloseStream {
sid: Sid,
@ -163,7 +165,7 @@ pub(crate) const TCP_CLOSE_STREAM_CNS: usize = 8;
/// const part of the DATA frame, actual size is variable
pub(crate) const TCP_DATA_CNS: usize = 10;
pub(crate) const TCP_DATA_HEADER_CNS: usize = 24;
pub(crate) const TCP_OPEN_STREAM_CNS: usize = 18;
pub(crate) const TCP_OPEN_STREAM_CNS: usize = 20;
// Size WITHOUT the 1rst indicating byte
pub(crate) const TCP_SHUTDOWN_CNS: usize = 0;
@ -178,12 +180,17 @@ impl OTFrame {
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
} => {
bytes.put_u8(FRAME_OPEN_STREAM);
sid.to_bytes(bytes);
bytes.put_u8(prio);
bytes.put_u8(promises.to_le_bytes()[0]);
bytes.put_u64_le(guaranteed_bandwidth);
let lz_wire_length = lz_dictionary.len().min(0xffff);
bytes.put_u16_le(lz_wire_length as u16);
bytes.reserve(lz_wire_length);
bytes.extend_from_slice(&lz_dictionary[0..lz_wire_length]);
},
Self::CloseStream { sid } => {
bytes.put_u8(FRAME_CLOSE_STREAM);
@ -213,7 +220,12 @@ impl ITFrame {
};
let size = match frame_no {
FRAME_SHUTDOWN => TCP_SHUTDOWN_CNS,
FRAME_OPEN_STREAM => TCP_OPEN_STREAM_CNS,
FRAME_OPEN_STREAM => {
if bytes.len() < TCP_OPEN_STREAM_CNS {
return None;
}
u16::from_le_bytes([bytes[18], bytes[19]]) as usize + TCP_OPEN_STREAM_CNS
},
FRAME_CLOSE_STREAM => TCP_CLOSE_STREAM_CNS,
FRAME_DATA_HEADER => TCP_DATA_HEADER_CNS,
FRAME_DATA => {
@ -242,6 +254,12 @@ impl ITFrame {
prio: bytes.get_u8(),
promises: Promises::from_bits_truncate(bytes.get_u8()),
guaranteed_bandwidth: bytes.get_u64_le(),
lz_dictionary: {
let lz_wire_length = bytes.get_u16_le();
let mut lz_dictionary = Vec::with_capacity(lz_wire_length as usize);
bytes.copy_to_slice(&mut lz_dictionary[..]);
lz_dictionary
},
}
},
FRAME_CLOSE_STREAM => {
@ -284,11 +302,13 @@ impl PartialEq<ITFrame> for OTFrame {
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
} => matches!(other, ITFrame::OpenStream {
sid,
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
}),
Self::CloseStream { sid } => matches!(other, ITFrame::CloseStream { sid }),
Self::DataHeader { mid, sid, length } => {
@ -325,6 +345,7 @@ mod tests {
prio: 14,
promises: Promises::GUARANTEED_DELIVERY,
guaranteed_bandwidth: 1_000_000,
lz_dictionary: Vec::new(),
},
OTFrame::DataHeader {
sid: Sid::new(1337),
@ -499,6 +520,7 @@ mod tests {
promises: Promises::ENCRYPTED,
prio: 88,
guaranteed_bandwidth: 1_000_000,
lz_dictionary: Vec::new(),
};
OTFrame::write_bytes(frame1, &mut buffer);
}
@ -512,6 +534,7 @@ mod tests {
promises: Promises::ENCRYPTED,
prio: 88,
guaranteed_bandwidth: 1_000_000,
lz_dictionary: Vec::new(),
};
OTFrame::write_bytes(frame1, &mut buffer);
buffer.truncate(6); // simulate partial retrieve

View File

@ -15,6 +15,7 @@ struct StreamInfo {
pub(crate) guaranteed_bandwidth: Bandwidth,
pub(crate) prio: Prio,
pub(crate) promises: Promises,
pub(crate) lz_dictionary: Vec<u8>,
pub(crate) messages: VecDeque<OTMessage>,
}
@ -44,11 +45,13 @@ impl PrioManager {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
) {
self.streams.insert(sid, StreamInfo {
guaranteed_bandwidth,
prio,
promises,
lz_dictionary,
messages: VecDeque::new(),
});
}

View File

@ -99,9 +99,10 @@ where
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
} => {
self.store
.open_stream(sid, prio, promises, guaranteed_bandwidth);
.open_stream(sid, prio, promises, guaranteed_bandwidth, lz_dictionary);
},
ProtocolEvent::CloseStream { sid } => {
if !self.store.try_close_stream(sid) {
@ -123,9 +124,15 @@ where
prio,
promises,
guaranteed_bandwidth,
ref lz_dictionary,
} => {
self.store
.open_stream(sid, prio, promises, guaranteed_bandwidth);
self.store.open_stream(
sid,
prio,
promises,
guaranteed_bandwidth,
lz_dictionary.clone(),
);
event.to_frame().write_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
},
@ -228,12 +235,14 @@ where
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
} => {
break 'outer Ok(ProtocolEvent::OpenStream {
sid,
prio: prio.min(crate::types::HIGHEST_PRIO),
promises,
guaranteed_bandwidth,
lz_dictionary,
});
},
ITFrame::CloseStream { sid } => {

View File

@ -72,6 +72,7 @@ pub struct Stream {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
send_closed: Arc<AtomicBool>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
b2a_msg_recv_r: Option<async_channel::Receiver<Bytes>>,
@ -551,6 +552,7 @@ impl Participant {
prio: u8,
promises: Promises,
bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
) -> Result<Stream, ParticipantError> {
debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
@ -559,6 +561,7 @@ impl Participant {
promises,
bandwidth,
p2a_return_stream_s,
lz_dictionary,
)) {
debug!(?e, "bParticipant is already closed, notifying");
return Err(ParticipantError::ParticipantDisconnected);
@ -728,6 +731,7 @@ impl Stream {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
send_closed: Arc<AtomicBool>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
b2a_msg_recv_r: async_channel::Receiver<Bytes>,
@ -740,6 +744,7 @@ impl Stream {
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
send_closed,
a2b_msg_s,
b2a_msg_recv_r: Some(b2a_msg_recv_r),

View File

@ -25,7 +25,7 @@ use tokio::{
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender<Stream>);
pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender<Stream>, Vec<u8>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, oneshot::Sender<()>);
pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender<Result<(), ParticipantError>>);
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
@ -225,12 +225,19 @@ impl BParticipant {
};
let active_err = async {
if let Some((prio, promises, guaranteed_bandwidth, return_s)) = open {
if let Some((prio, promises, guaranteed_bandwidth, return_s, lz_dictionary)) = open
{
let sid = stream_ids;
trace!(?sid, "open stream");
stream_ids += Sid::from(1);
let stream = self
.create_stream(sid, prio, promises, guaranteed_bandwidth)
.create_stream(
sid,
prio,
promises,
guaranteed_bandwidth,
lz_dictionary.clone(),
)
.await;
let event = ProtocolEvent::OpenStream {
@ -238,6 +245,7 @@ impl BParticipant {
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
};
return_s.send(stream).unwrap();
@ -388,13 +396,15 @@ impl BParticipant {
prio,
promises,
guaranteed_bandwidth,
ref lz_dictionary,
}) => {
trace!(?sid, "open stream");
let lz_dictionary = lz_dictionary.clone();
let _ = b2b_notify_send_of_recv_s.send(r.unwrap());
// waiting for receiving is not necessary, because the send_mgr will first
// process this before process messages!
let stream = self
.create_stream(sid, prio, promises, guaranteed_bandwidth)
.create_stream(sid, prio, promises, guaranteed_bandwidth, lz_dictionary)
.await;
b2a_stream_opened_s.send(stream).unwrap();
retrigger(cid, p, &mut recv_protocols);
@ -616,6 +626,7 @@ impl BParticipant {
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
lz_dictionary: Vec<u8>,
) -> Stream {
let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::<Bytes>();
let send_closed = Arc::new(AtomicBool::new(false));
@ -651,6 +662,7 @@ impl BParticipant {
prio,
promises,
guaranteed_bandwidth,
lz_dictionary,
send_closed,
a2b_msg_s,
b2a_msg_recv_r,

View File

@ -100,12 +100,12 @@ impl ConnectionHandler {
let reliable = Promises::ORDERED | Promises::CONSISTENCY;
let reliablec = reliable | Promises::COMPRESSED;
let general_stream = participant.open(3, reliablec, 500).await?;
let ping_stream = participant.open(2, reliable, 500).await?;
let mut register_stream = participant.open(3, reliablec, 500).await?;
let character_screen_stream = participant.open(3, reliablec, 500).await?;
let in_game_stream = participant.open(3, reliablec, 100_000).await?;
let terrain_stream = participant.open(4, reliablec, 20_000).await?;
let general_stream = participant.open(3, reliablec, 500, Vec::new()).await?;
let ping_stream = participant.open(2, reliable, 500, Vec::new()).await?;
let mut register_stream = participant.open(3, reliablec, 500, Vec::new()).await?;
let character_screen_stream = participant.open(3, reliablec, 500, Vec::new()).await?;
let in_game_stream = participant.open(3, reliablec, 100_000, Vec::new()).await?;
let terrain_stream = participant.open(4, reliablec, 20_000, Vec::new()).await?;
let server_data = receiver.recv()?;