From 144f88f811cb580731615fd7228f8534f71e59b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 25 Aug 2020 15:32:42 +0200 Subject: [PATCH] Propper Compression support of network. - Compression is no longer enabled always but can now be enabled per Stream. If a Stream is Compression enabled it will compress and decompress all msg (except for `raw` access) before handling them internally. You need to handle compression yourself for `raw` fn. - added a new feature to the network crate to enable or disable the compression - switched to `lz-fear` instead of `lz4-compression` - use `bitflags` to represent the `Promises` struct --- Cargo.lock | 99 ++++++++++++----------- client/Cargo.toml | 2 +- client/src/lib.rs | 9 ++- network/Cargo.toml | 10 ++- network/src/api.rs | 86 +++++++++++++------- network/src/lib.rs | 9 +-- network/src/message.rs | 148 ++++++++++++++++++++++++++++++----- network/src/types.rs | 61 ++++++++------- network/tests/closing.rs | 16 ++-- network/tests/helper.rs | 4 +- network/tests/integration.rs | 8 +- server/Cargo.toml | 2 +- 12 files changed, 301 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 259543ab2c..26a8ddaba5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -292,7 +292,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" dependencies = [ - "byteorder 1.3.4", + "byteorder", "serde", ] @@ -387,12 +387,6 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db7a1029718df60331e557c9e83a55523c955e5dd2a7bfeffad6bbd50b538ae9" -[[package]] -name = "byteorder" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" - [[package]] name = "byteorder" version = "1.3.4" @@ -405,7 +399,7 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" dependencies = [ - "byteorder 1.3.4", + "byteorder", "either", "iovec", ] @@ -1056,7 +1050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73770f8e1fe7d64df17ca66ad28994a0a623ea497fa69486e14984e715c5d174" dependencies = [ "adler32", - "byteorder 1.3.4", + "byteorder", ] [[package]] @@ -1082,7 +1076,7 @@ version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2de9deab977a153492a1468d1b1c0662c1cf39e5ea87d0c060ecd59ef18d8c" dependencies = [ - "byteorder 1.3.4", + "byteorder", "diesel_derives", "libsqlite3-sys", ] @@ -1162,7 +1156,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83c18405ef54de0398b77a3ec8394d3a1639e7bf060e1385201e8db40c44ab41" dependencies = [ - "byteorder 1.3.4", + "byteorder", "lazy_static", "log", "nom 4.2.3", @@ -1264,6 +1258,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "fehler" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5729fe49ba028cd550747b6e62cd3d841beccab5390aa398538c31a2d983635" +dependencies = [ + "fehler-macros", +] + +[[package]] +name = "fehler-macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccb5acb1045ebbfa222e2c50679e392a71dd77030b78fb0189f2d9c5974400f9" +dependencies = [ + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.33", +] + [[package]] name = "filetime" version = "0.2.10" @@ -1476,7 +1490,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" dependencies = [ - "byteorder 1.3.4", + "byteorder", ] [[package]] @@ -1886,7 +1900,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462" dependencies = [ - "byteorder 1.3.4", + "byteorder", "bytes", "fnv", "futures 0.1.29", @@ -2075,7 +2089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "543904170510c1b5fb65140485d84de4a57fddb2ed685481e9020ce3d2c9f64c" dependencies = [ "bytemuck", - "byteorder 1.3.4", + "byteorder", "num-iter", "num-rational 0.3.0", "num-traits", @@ -2218,7 +2232,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be42bea7971f4ba0ea1e215730c29bc1ff9bd2a9c10013912f42a8dcf8d77c0d" dependencies = [ - "byteorder 1.3.4", + "byteorder", "ogg", "tinyvec", ] @@ -2343,13 +2357,16 @@ dependencies = [ ] [[package]] -name = "lz4-compress" +name = "lz-fear" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91" +checksum = "06aad1ce45e4ccf7a8d7d43e0c3ad38dc5d2255174a5f29a3c39d961fbc6181d" dependencies = [ - "byteorder 0.5.3", - "quick-error", + "bitflags", + "byteorder", + "fehler", + "thiserror", + "twox-hash", ] [[package]] @@ -2833,7 +2850,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d79f1db9148be9d0e174bb3ac890f6030fcb1ed947267c5a91ee4c91b5a91e15" dependencies = [ - "byteorder 1.3.4", + "byteorder", ] [[package]] @@ -3233,12 +3250,6 @@ dependencies = [ "percent-encoding 2.1.0", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quote" version = "0.6.13" @@ -3510,7 +3521,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ - "byteorder 1.3.4", + "byteorder", "regex-syntax", ] @@ -3948,7 +3959,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f77b6b07e862c66a9f3e62a07588fee67cd90a9135a2b942409f195507b4fb51" dependencies = [ - "byteorder 1.3.4", + "byteorder", ] [[package]] @@ -4473,6 +4484,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44834418e2c5b16f47bedf35c28e148db099187dd5feee6367fb2525863af4f1" +[[package]] +name = "twox-hash" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bfd5b7557925ce778ff9b9ef90e3ade34c524b5ff10e239c69a42d546d2af56" + [[package]] name = "tynm" version = "0.1.4" @@ -4589,17 +4606,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "uvth" -version = "4.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e5910f9106b96334c6cae1f1d77a764bda66ac4ca9f507f73259f184fe1bb6b" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - [[package]] name = "vcpkg" version = "0.2.10" @@ -4654,7 +4660,7 @@ name = "veloren-client" version = "0.7.0" dependencies = [ "authc", - "byteorder 1.3.4", + "byteorder", "futures-executor", "futures-timer", "futures-util", @@ -4665,7 +4671,7 @@ dependencies = [ "rayon", "specs", "tracing", - "uvth 3.1.1", + "uvth", "vek 0.12.0", "veloren-common", "veloren_network", @@ -4730,7 +4736,7 @@ dependencies = [ "specs-idvs", "tiny_http", "tracing", - "uvth 3.1.1", + "uvth", "vek 0.12.0", "veloren-common", "veloren-world", @@ -4792,7 +4798,7 @@ dependencies = [ "tracing-log", "tracing-subscriber", "treeculler", - "uvth 3.1.1", + "uvth", "vek 0.12.0", "veloren-client", "veloren-common", @@ -4847,21 +4853,22 @@ dependencies = [ [[package]] name = "veloren_network" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-std", "bincode", + "bitflags", "crossbeam-channel 0.4.2", "futures 0.3.5", "lazy_static", - "lz4-compress", + "lz-fear", "prometheus", "rand 0.7.3", "serde", "tracing", "tracing-futures", "tracing-subscriber", - "uvth 4.0.1", + "uvth", ] [[package]] diff --git a/client/Cargo.toml b/client/Cargo.toml index 70733d28dd..0c0ebc0a42 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] common = { package = "veloren-common", path = "../common", features = ["no-assets"] } -network = { package = "veloren_network", path = "../network", default-features = false } +network = { package = "veloren_network", path = "../network", features = ["compression"], default-features = false } byteorder = "1.3.2" uvth = "3.1.1" diff --git a/client/src/lib.rs b/client/src/lib.rs index 4b5bea63e0..53a7b5d22a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -37,9 +37,7 @@ use futures_timer::Delay; use futures_util::{select, FutureExt}; use hashbrown::{HashMap, HashSet}; use image::DynamicImage; -use network::{ - Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED, -}; +use network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; use num::traits::FloatConst; use rayon::prelude::*; use std::{ @@ -157,7 +155,10 @@ impl Client { thread_pool.execute(scheduler); let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?; - let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?; + let mut stream = block_on(participant.open( + 10, + Promises::ORDERED | Promises::CONSISTENCY | Promises::COMPRESSED, + ))?; // Wait for initial sync let ( diff --git a/network/Cargo.toml b/network/Cargo.toml index d6c271ee48..c28661dd07 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -1,20 +1,19 @@ [package] name = "veloren_network" -version = "0.1.0" +version = "0.2.0" authors = ["Marcel Märtens "] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [features] metrics = ["prometheus"] +compression = ["lz-fear"] -default = ["metrics"] +default = ["metrics","compression"] [dependencies] -lz4-compress = "0.1.1" #serialisation bincode = "1.2" serde = { version = "1.0" } @@ -31,6 +30,9 @@ futures = { version = "0.3", features = ["thread-pool"] } #mpsc channel registry lazy_static = { version = "1.4", default-features = false } rand = { version = "0.7" } +#stream flags +bitflags = "1.2.1" +lz-fear = { version = "0.1.1", optional = true } [dev-dependencies] tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } diff --git a/network/src/api.rs b/network/src/api.rs index 33cfac8015..c77003a9a6 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -14,6 +14,8 @@ use futures::{ sink::SinkExt, stream::StreamExt, }; +#[cfg(feature = "compression")] +use lz_fear::raw::DecodeError; #[cfg(feature = "metrics")] use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; @@ -100,10 +102,15 @@ pub enum ParticipantError { } /// Error type thrown by [`Streams`](Stream) methods +/// A Compression Error should only happen if a client sends malicious code. +/// A Deserialize Error probably means you are expecting Type X while you +/// actually got send type Y. #[derive(Debug)] pub enum StreamError { StreamClosed, - DeserializeError(Box), + #[cfg(feature = "compression")] + Compression(DecodeError), + Deserialize(bincode::Error), } /// Use the `Network` to create connections to other [`Participants`] @@ -442,7 +449,7 @@ impl Participant { /// etc... /// * `promises` - use a combination of you prefered [`Promises`], see the /// link for further documentation. You can combine them, e.g. - /// `PROMISES_ORDERED | PROMISES_CONSISTENCY` The Stream will then + /// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then /// guarantee that those promises are met. /// /// A [`ParticipantError`] might be thrown if the `Participant` is already @@ -452,7 +459,7 @@ impl Participant { /// # Examples /// ```rust /// use futures::executor::block_on; - /// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED}; + /// use veloren_network::{Network, Pid, Promises, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream @@ -465,7 +472,9 @@ impl Participant { /// let p1 = network /// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())) /// .await?; - /// let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; + /// let _s1 = p1 + /// .open(16, Promises::ORDERED | Promises::CONSISTENCY) + /// .await?; /// # Ok(()) /// }) /// # } @@ -506,7 +515,7 @@ impl Participant { /// /// # Examples /// ```rust - /// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_ORDERED, PROMISES_CONSISTENCY}; + /// use veloren_network::{Network, Pid, ProtocolAddr, Promises}; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { @@ -520,7 +529,7 @@ impl Participant { /// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2110".parse().unwrap())).await?; /// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # let p2 = remote.connected().await?; - /// # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; + /// # p2.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?; /// let _s1 = p1.opened().await?; /// # Ok(()) /// }) @@ -690,7 +699,7 @@ impl Stream { /// # Example /// ``` /// use veloren_network::{Network, ProtocolAddr, Pid}; - /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; + /// # use veloren_network::Promises; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { @@ -703,7 +712,7 @@ impl Stream { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # // keep it alive - /// # let _stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; + /// # let _stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?; /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// //Send Message @@ -718,18 +727,22 @@ impl Stream { /// [`Serialized`]: Serialize #[inline] pub fn send(&mut self, msg: M) -> Result<(), StreamError> { - self.send_raw(Arc::new(message::serialize(&msg))) + self.send_raw(Arc::new(message::serialize( + &msg, + #[cfg(feature = "compression")] + self.promises.contains(Promises::COMPRESSED), + ))) } - /// This methods give the option to skip multiple calls of [`bincode`], e.g. - /// in case the same Message needs to send on multiple `Streams` to multiple - /// [`Participants`]. Other then that, the same rules apply than for - /// [`send`] + /// This methods give the option to skip multiple calls of [`bincode`] and + /// [`compress`], e.g. in case the same Message needs to send on + /// multiple `Streams` to multiple [`Participants`]. Other then that, + /// the same rules apply than for [`send`] /// /// # Example /// ```rust /// use veloren_network::{Network, ProtocolAddr, Pid, MessageBuffer}; - /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; + /// # use veloren_network::Promises; /// use futures::executor::block_on; /// use bincode; /// use std::sync::Arc; @@ -746,8 +759,8 @@ impl Stream { /// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid()); - /// # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; - /// # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; + /// # remote1_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # remote2_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?; /// let participant_a = network.connected().await?; /// let participant_b = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; @@ -768,6 +781,7 @@ impl Stream { /// /// [`send`]: Stream::send /// [`Participants`]: crate::api::Participant + /// [`compress`]: lz_fear::raw::compress2 pub fn send_raw(&mut self, messagebuffer: Arc) -> Result<(), StreamError> { if self.send_closed.load(Ordering::Relaxed) { return Err(StreamError::StreamClosed); @@ -794,7 +808,7 @@ impl Stream { /// # Example /// ``` /// use veloren_network::{Network, ProtocolAddr, Pid}; - /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; + /// # use veloren_network::Promises; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { @@ -806,7 +820,7 @@ impl Stream { /// block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; - /// # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; + /// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; @@ -818,14 +832,19 @@ impl Stream { /// ``` #[inline] pub async fn recv(&mut self) -> Result { - Ok(message::deserialize(self.recv_raw().await?)?) + message::deserialize( + self.recv_raw().await?, + #[cfg(feature = "compression")] + self.promises.contains(Promises::COMPRESSED), + ) } - /// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] is - /// executed for performance reasons. + /// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] or + /// [`decompress`] is executed for performance reasons. /// /// [`send_raw`]: Stream::send_raw /// [`recv`]: Stream::recv + /// [`decompress`]: lz_fear::raw::decompress_raw pub async fn recv_raw(&mut self) -> Result { let msg = self.b2a_msg_recv_r.next().await?; Ok(msg.buffer) @@ -991,16 +1010,16 @@ impl From for NetworkError { } impl From> for StreamError { - fn from(err: Box) -> Self { StreamError::DeserializeError(err) } + fn from(err: Box) -> Self { StreamError::Deserialize(err) } } impl core::fmt::Display for StreamError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { StreamError::StreamClosed => write!(f, "stream closed"), - StreamError::DeserializeError(err) => { - write!(f, "deserialize error on message: {}", err) - }, + #[cfg(feature = "compression")] + StreamError::Compression(err) => write!(f, "compression error on message: {}", err), + StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err), } } } @@ -1032,11 +1051,22 @@ impl core::cmp::PartialEq for StreamError { match self { StreamError::StreamClosed => match other { StreamError::StreamClosed => true, - StreamError::DeserializeError(_) => false, + #[cfg(feature = "compression")] + StreamError::Compression(_) => false, + StreamError::Deserialize(_) => false, }, - StreamError::DeserializeError(err) => match other { + #[cfg(feature = "compression")] + StreamError::Compression(err) => match other { StreamError::StreamClosed => false, - StreamError::DeserializeError(other_err) => partial_eq_bincode(err, other_err), + #[cfg(feature = "compression")] + StreamError::Compression(other_err) => err == other_err, + StreamError::Deserialize(_) => false, + }, + StreamError::Deserialize(err) => match other { + StreamError::StreamClosed => false, + #[cfg(feature = "compression")] + StreamError::Compression(_) => false, + StreamError::Deserialize(other_err) => partial_eq_bincode(err, other_err), }, } } diff --git a/network/src/lib.rs b/network/src/lib.rs index f63eb9abb4..f6fcc4ecf2 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -40,7 +40,7 @@ //! ```rust //! use async_std::task::sleep; //! use futures::{executor::block_on, join}; -//! use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED}; +//! use veloren_network::{Network, Pid, Promises, ProtocolAddr}; //! //! // Client //! async fn client() -> std::result::Result<(), Box> { @@ -51,7 +51,7 @@ //! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) //! .await?; //! let mut stream = server -//! .open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY) +//! .open(10, Promises::ORDERED | Promises::CONSISTENCY) //! .await?; //! stream.send("Hello World")?; //! Ok(()) @@ -113,7 +113,4 @@ pub use api::{ Network, NetworkError, Participant, ParticipantError, ProtocolAddr, Stream, StreamError, }; pub use message::MessageBuffer; -pub use types::{ - Pid, Promises, PROMISES_COMPRESSED, PROMISES_CONSISTENCY, PROMISES_ENCRYPTED, - PROMISES_GUARANTEED_DELIVERY, PROMISES_NONE, PROMISES_ORDERED, -}; +pub use types::{Pid, Promises}; diff --git a/network/src/message.rs b/network/src/message.rs index 9b2255e890..f711168481 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,6 +1,9 @@ use serde::{de::DeserializeOwned, Serialize}; //use std::collections::VecDeque; -use crate::types::{Frame, Mid, Sid}; +use crate::{ + api::StreamError, + types::{Frame, Mid, Sid}, +}; use std::{io, sync::Arc}; //Todo: Evaluate switching to VecDeque for quickly adding and removing data @@ -33,23 +36,64 @@ pub(crate) struct IncomingMessage { pub sid: Sid, } -pub(crate) fn serialize(message: &M) -> MessageBuffer { +pub(crate) fn serialize( + message: &M, + #[cfg(feature = "compression")] compress: bool, +) -> MessageBuffer { //this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html - let writer = bincode::serialize(message).unwrap(); + let serialized_data = bincode::serialize(message).unwrap(); + + #[cfg(not(feature = "compression"))] + let compress = false; + MessageBuffer { - data: lz4_compress::compress(&writer), + data: if compress { + #[cfg(feature = "compression")] + { + let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10); + let mut table = lz_fear::raw::U32Table::default(); + lz_fear::raw::compress2(&serialized_data, 0, &mut table, &mut compressed_data) + .unwrap(); + compressed_data + } + #[cfg(not(feature = "compression"))] + unreachable!("compression isn't enabled as a feature"); + } else { + serialized_data + }, } } -//pub(crate) fn deserialize(buffer: MessageBuffer) -> -// std::Result> { -pub(crate) fn deserialize(buffer: MessageBuffer) -> bincode::Result { - let span = lz4_compress::decompress(&buffer.data) - .expect("lz4 decompression failed, failed to deserialze"); - //this might fail if you choose the wrong type for M. in that case probably X - // got transferred while you assume Y. probably this means your application - // logic is wrong. E.g. You expect a String, but just get a u8. - bincode::deserialize(span.as_slice()) +pub(crate) fn deserialize( + buffer: MessageBuffer, + #[cfg(feature = "compression")] compress: bool, +) -> Result { + #[cfg(not(feature = "compression"))] + let compress = false; + + let uncompressed_data = if compress { + #[cfg(feature = "compression")] + { + let mut uncompressed_data = Vec::with_capacity(buffer.data.len() * 2); + if let Err(e) = lz_fear::raw::decompress_raw( + &buffer.data, + &[0; 0], + &mut uncompressed_data, + usize::MAX, + ) { + return Err(StreamError::Compression(e)); + } + uncompressed_data + } + #[cfg(not(feature = "compression"))] + unreachable!("compression isn't enabled as a feature"); + } else { + buffer.data + }; + match bincode::deserialize(uncompressed_data.as_slice()) { + Ok(m) => Ok(m), + Err(e) => Err(StreamError::Deserialize(e)), + } } impl OutgoingMessage { @@ -142,12 +186,78 @@ mod tests { #[test] fn serialize_test() { let msg = "abc"; - let mb = serialize(&msg); - assert_eq!(mb.data.len(), 9); - assert_eq!(mb.data[0], 34); + let mb = serialize( + &msg, + #[cfg(feature = "compression")] + false, + ); + assert_eq!(mb.data.len(), 11); + assert_eq!(mb.data[0], 3); + assert_eq!(mb.data[1..7], [0, 0, 0, 0, 0, 0]); + assert_eq!(mb.data[8], b'a'); + assert_eq!(mb.data[9], b'b'); + assert_eq!(mb.data[10], b'c'); + } + + #[cfg(feature = "compression")] + #[test] + fn serialize_compress_small() { + let msg = "abc"; + let mb = serialize(&msg, true); + assert_eq!(mb.data.len(), 12); + assert_eq!(mb.data[0], 176); assert_eq!(mb.data[1], 3); - assert_eq!(mb.data[6], b'a'); - assert_eq!(mb.data[7], b'b'); - assert_eq!(mb.data[8], b'c'); + assert_eq!(mb.data[2..8], [0, 0, 0, 0, 0, 0]); + assert_eq!(mb.data[9], b'a'); + assert_eq!(mb.data[10], b'b'); + assert_eq!(mb.data[11], b'c'); + } + + #[cfg(feature = "compression")] + #[test] + fn serialize_compress_medium() { + let msg = ( + "abccc", + 100u32, + 80u32, + "DATA", + 4, + 0, + 0, + 0, + "assets/data/plants/flowers/greenrose.ron", + ); + let mb = serialize(&msg, true); + assert_eq!(mb.data.len(), 79); + assert_eq!(mb.data[0], 34); + assert_eq!(mb.data[1], 5); + assert_eq!(mb.data[2], 0); + assert_eq!(mb.data[3], 1); + assert_eq!(mb.data[20], 20); + assert_eq!(mb.data[40], 115); + assert_eq!(mb.data[60], 111); + } + + #[cfg(feature = "compression")] + #[test] + fn serialize_compress_large() { + use rand::{Rng, SeedableRng}; + let mut seed = [0u8; 32]; + seed[8] = 13; + seed[9] = 37; + let mut rnd = rand::rngs::StdRng::from_seed(seed); + let mut msg = vec![0u8; 10000]; + for (i, s) in msg.iter_mut().enumerate() { + match i.rem_euclid(32) { + 2 => *s = 128, + 3 => *s = 128 + 16, + 4 => *s = 150, + 11 => *s = 64, + 12 => *s = rnd.gen::() / 32, + _ => {}, + } + } + let mb = serialize(&msg, true); + assert_eq!(mb.data.len(), 1296); } } diff --git a/network/src/types.rs b/network/src/types.rs index 267d539f03..d257ed808f 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -1,41 +1,42 @@ +use bitflags::bitflags; use rand::Rng; use std::convert::TryFrom; pub type Mid = u64; pub type Cid = u64; pub type Prio = u8; -/// use promises to modify the behavior of [`Streams`]. -/// available promises are: -/// * [`PROMISES_NONE`] -/// * [`PROMISES_ORDERED`] -/// * [`PROMISES_CONSISTENCY`] -/// * [`PROMISES_GUARANTEED_DELIVERY`] -/// * [`PROMISES_COMPRESSED`] -/// * [`PROMISES_ENCRYPTED`] -/// -/// [`Streams`]: crate::api::Stream -pub type Promises = u8; -/// use for no special promises on this [`Stream`](crate::api::Stream). -pub const PROMISES_NONE: Promises = 0; -/// this will guarantee that the order of messages which are send on one side, -/// is the same when received on the other. -pub const PROMISES_ORDERED: Promises = 1; -/// this will guarantee that messages received haven't been altered by errors, -/// like bit flips, this is done with a checksum. -pub const PROMISES_CONSISTENCY: Promises = 2; -/// this will guarantee that the other side will receive every message exactly -/// once no messages are dropped -pub const PROMISES_GUARANTEED_DELIVERY: Promises = 4; -/// this will enable the internal compression on this -/// [`Stream`](crate::api::Stream) -pub const PROMISES_COMPRESSED: Promises = 8; -/// this will enable the internal encryption on this -/// [`Stream`](crate::api::Stream) -pub const PROMISES_ENCRYPTED: Promises = 16; +bitflags! { + /// use promises to modify the behavior of [`Streams`]. + /// see the consts in this `struct` for + /// + /// [`Streams`]: crate::api::Stream + pub struct Promises: u8 { + /// this will guarantee that the order of messages which are send on one side, + /// is the same when received on the other. + const ORDERED = 0b00000001; + /// this will guarantee that messages received haven't been altered by errors, + /// like bit flips, this is done with a checksum. + const CONSISTENCY = 0b00000010; + /// this will guarantee that the other side will receive every message exactly + /// once no messages are dropped + const GUARANTEED_DELIVERY = 0b00000100; + /// this will enable the internal compression on this + /// [`Stream`](crate::api::Stream) + #[cfg(feature = "compression")] + const COMPRESSED = 0b00001000; + /// this will enable the internal encryption on this + /// [`Stream`](crate::api::Stream) + const ENCRYPTED = 0b00010000; + } +} + +impl Promises { + pub const fn to_le_bytes(self) -> [u8; 1] { self.bits.to_le_bytes() } +} pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = [86, 69, 76, 79, 82, 69, 78]; //VELOREN -pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 4, 0]; +pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 5, 0]; pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0); pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2); @@ -149,7 +150,7 @@ impl Frame { Frame::OpenStream { sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()), prio: buf[8], - promises: buf[9], + promises: Promises::from_bits_truncate(buf[9]), } } diff --git a/network/tests/closing.rs b/network/tests/closing.rs index dd6cd310a8..2c714277d3 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -20,7 +20,7 @@ use async_std::task; use task::block_on; -use veloren_network::{Network, ParticipantError, Pid, StreamError, PROMISES_NONE}; +use veloren_network::{Network, ParticipantError, Pid, Promises, StreamError}; mod helper; use helper::{network_participant_stream, tcp}; @@ -227,7 +227,7 @@ fn close_network_then_disconnect_part() { fn opened_stream_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); - let mut s2_a = block_on(p_a.open(10, PROMISES_NONE)).unwrap(); + let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap(); s2_a.send("HelloWorld").unwrap(); let mut s2_b = block_on(p_b.opened()).unwrap(); drop(p_a); @@ -239,7 +239,7 @@ fn opened_stream_before_remote_part_is_closed() { fn opened_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); - let mut s2_a = block_on(p_a.open(10, PROMISES_NONE)).unwrap(); + let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); @@ -255,14 +255,14 @@ fn opened_stream_after_remote_part_is_closed() { fn open_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); - let mut s2_a = block_on(p_a.open(10, PROMISES_NONE)).unwrap(); + let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); let mut s2_b = block_on(p_b.opened()).unwrap(); assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string())); assert_eq!( - block_on(p_b.open(20, PROMISES_NONE)).unwrap_err(), + block_on(p_b.open(20, Promises::empty())).unwrap_err(), ParticipantError::ParticipantDisconnected ); } @@ -289,7 +289,7 @@ fn open_participant_before_remote_part_is_closed() { let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); let p_b = block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap(); + let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap(); s1_b.send("HelloWorld").unwrap(); let p_a = block_on(n_a.connected()).unwrap(); drop(s1_b); @@ -310,7 +310,7 @@ fn open_participant_after_remote_part_is_closed() { let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); let p_b = block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap(); + let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap(); s1_b.send("HelloWorld").unwrap(); drop(s1_b); drop(p_b); @@ -331,7 +331,7 @@ fn close_network_scheduler_completely() { let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); let p_b = block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap(); + let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap(); s1_b.send("HelloWorld").unwrap(); let p_a = block_on(n_a.connected()).unwrap(); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 02edf8bbff..93ee64a1e6 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -7,7 +7,7 @@ use std::{ }; use tracing::*; use tracing_subscriber::EnvFilter; -use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE}; +use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; #[allow(dead_code)] pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) { @@ -55,7 +55,7 @@ pub async fn network_participant_stream( let p1_b = n_b.connect(addr).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); - let s1_a = p1_a.open(10, PROMISES_NONE).await.unwrap(); + let s1_a = p1_a.open(10, Promises::empty()).await.unwrap(); let s1_b = p1_b.opened().await.unwrap(); (n_a, p1_a, s1_a, n_b, p1_b, s1_b) diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 7f37d5e78e..30e1f79fd6 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -4,7 +4,7 @@ use veloren_network::{NetworkError, StreamError}; mod helper; use helper::{network_participant_stream, tcp, udp}; use std::io::ErrorKind; -use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED}; +use veloren_network::{Network, Pid, Promises, ProtocolAddr}; #[test] #[ignore] @@ -133,7 +133,7 @@ fn api_stream_send_main() -> std::result::Result<(), Box> .await?; // keep it alive let _stream_p = remote_p - .open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY) + .open(16, Promises::ORDERED | Promises::CONSISTENCY) .await?; let participant_a = network.connected().await?; let mut stream_a = participant_a.opened().await?; @@ -160,7 +160,7 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> .connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .await?; let mut stream_p = remote_p - .open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY) + .open(16, Promises::ORDERED | Promises::CONSISTENCY) .await?; stream_p.send("Hello World")?; let participant_a = network.connected().await?; @@ -178,7 +178,7 @@ fn wrong_parse() { s1_a.send(1337).unwrap(); match block_on(s1_b.recv::()) { - Err(StreamError::DeserializeError(_)) => (), + Err(StreamError::Deserialize(_)) => (), _ => panic!("this should fail, but it doesnt!"), } } diff --git a/server/Cargo.toml b/server/Cargo.toml index e0a6493d08..e47125e94a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,7 +11,7 @@ default = ["worldgen"] [dependencies] common = { package = "veloren-common", path = "../common" } world = { package = "veloren-world", path = "../world" } -network = { package = "veloren_network", path = "../network", features = ["metrics"], default-features = false } +network = { package = "veloren_network", path = "../network", features = ["metrics", "compression"], default-features = false } specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git", branch = "specs-git" }