Propper Compression support of network.

- Compression is no longer enabled always but can now be enabled per Stream.
   If a Stream is Compression enabled it will compress and decompress all msg (except for `raw` access) before handling them internally.
   You need to handle compression yourself for `raw` fn.
 - added a new feature to the network crate to enable or disable the compression
 - switched to `lz-fear` instead of `lz4-compression`
 - use `bitflags` to represent the `Promises` struct
This commit is contained in:
Marcel Märtens 2020-08-25 15:32:42 +02:00
parent ae208d9345
commit 144f88f811
12 changed files with 301 additions and 153 deletions

99
Cargo.lock generated
View File

@ -292,7 +292,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"serde",
]
@ -387,12 +387,6 @@ version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db7a1029718df60331e557c9e83a55523c955e5dd2a7bfeffad6bbd50b538ae9"
[[package]]
name = "byteorder"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
[[package]]
name = "byteorder"
version = "1.3.4"
@ -405,7 +399,7 @@ version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"either",
"iovec",
]
@ -1056,7 +1050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73770f8e1fe7d64df17ca66ad28994a0a623ea497fa69486e14984e715c5d174"
dependencies = [
"adler32",
"byteorder 1.3.4",
"byteorder",
]
[[package]]
@ -1082,7 +1076,7 @@ version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2de9deab977a153492a1468d1b1c0662c1cf39e5ea87d0c060ecd59ef18d8c"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"diesel_derives",
"libsqlite3-sys",
]
@ -1162,7 +1156,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83c18405ef54de0398b77a3ec8394d3a1639e7bf060e1385201e8db40c44ab41"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"lazy_static",
"log",
"nom 4.2.3",
@ -1264,6 +1258,26 @@ dependencies = [
"synstructure",
]
[[package]]
name = "fehler"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5729fe49ba028cd550747b6e62cd3d841beccab5390aa398538c31a2d983635"
dependencies = [
"fehler-macros",
]
[[package]]
name = "fehler-macros"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccb5acb1045ebbfa222e2c50679e392a71dd77030b78fb0189f2d9c5974400f9"
dependencies = [
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.33",
]
[[package]]
name = "filetime"
version = "0.2.10"
@ -1476,7 +1490,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder 1.3.4",
"byteorder",
]
[[package]]
@ -1886,7 +1900,7 @@ version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"bytes",
"fnv",
"futures 0.1.29",
@ -2075,7 +2089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "543904170510c1b5fb65140485d84de4a57fddb2ed685481e9020ce3d2c9f64c"
dependencies = [
"bytemuck",
"byteorder 1.3.4",
"byteorder",
"num-iter",
"num-rational 0.3.0",
"num-traits",
@ -2218,7 +2232,7 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be42bea7971f4ba0ea1e215730c29bc1ff9bd2a9c10013912f42a8dcf8d77c0d"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"ogg",
"tinyvec",
]
@ -2343,13 +2357,16 @@ dependencies = [
]
[[package]]
name = "lz4-compress"
name = "lz-fear"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91"
checksum = "06aad1ce45e4ccf7a8d7d43e0c3ad38dc5d2255174a5f29a3c39d961fbc6181d"
dependencies = [
"byteorder 0.5.3",
"quick-error",
"bitflags",
"byteorder",
"fehler",
"thiserror",
"twox-hash",
]
[[package]]
@ -2833,7 +2850,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d79f1db9148be9d0e174bb3ac890f6030fcb1ed947267c5a91ee4c91b5a91e15"
dependencies = [
"byteorder 1.3.4",
"byteorder",
]
[[package]]
@ -3233,12 +3250,6 @@ dependencies = [
"percent-encoding 2.1.0",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "0.6.13"
@ -3510,7 +3521,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder 1.3.4",
"byteorder",
"regex-syntax",
]
@ -3948,7 +3959,7 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f77b6b07e862c66a9f3e62a07588fee67cd90a9135a2b942409f195507b4fb51"
dependencies = [
"byteorder 1.3.4",
"byteorder",
]
[[package]]
@ -4473,6 +4484,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44834418e2c5b16f47bedf35c28e148db099187dd5feee6367fb2525863af4f1"
[[package]]
name = "twox-hash"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bfd5b7557925ce778ff9b9ef90e3ade34c524b5ff10e239c69a42d546d2af56"
[[package]]
name = "tynm"
version = "0.1.4"
@ -4589,17 +4606,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "uvth"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e5910f9106b96334c6cae1f1d77a764bda66ac4ca9f507f73259f184fe1bb6b"
dependencies = [
"crossbeam-channel 0.3.9",
"log",
"num_cpus",
]
[[package]]
name = "vcpkg"
version = "0.2.10"
@ -4654,7 +4660,7 @@ name = "veloren-client"
version = "0.7.0"
dependencies = [
"authc",
"byteorder 1.3.4",
"byteorder",
"futures-executor",
"futures-timer",
"futures-util",
@ -4665,7 +4671,7 @@ dependencies = [
"rayon",
"specs",
"tracing",
"uvth 3.1.1",
"uvth",
"vek 0.12.0",
"veloren-common",
"veloren_network",
@ -4730,7 +4736,7 @@ dependencies = [
"specs-idvs",
"tiny_http",
"tracing",
"uvth 3.1.1",
"uvth",
"vek 0.12.0",
"veloren-common",
"veloren-world",
@ -4792,7 +4798,7 @@ dependencies = [
"tracing-log",
"tracing-subscriber",
"treeculler",
"uvth 3.1.1",
"uvth",
"vek 0.12.0",
"veloren-client",
"veloren-common",
@ -4847,21 +4853,22 @@ dependencies = [
[[package]]
name = "veloren_network"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"async-std",
"bincode",
"bitflags",
"crossbeam-channel 0.4.2",
"futures 0.3.5",
"lazy_static",
"lz4-compress",
"lz-fear",
"prometheus",
"rand 0.7.3",
"serde",
"tracing",
"tracing-futures",
"tracing-subscriber",
"uvth 4.0.1",
"uvth",
]
[[package]]

View File

@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
common = { package = "veloren-common", path = "../common", features = ["no-assets"] }
network = { package = "veloren_network", path = "../network", default-features = false }
network = { package = "veloren_network", path = "../network", features = ["compression"], default-features = false }
byteorder = "1.3.2"
uvth = "3.1.1"

View File

@ -37,9 +37,7 @@ use futures_timer::Delay;
use futures_util::{select, FutureExt};
use hashbrown::{HashMap, HashSet};
use image::DynamicImage;
use network::{
Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED,
};
use network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
use num::traits::FloatConst;
use rayon::prelude::*;
use std::{
@ -157,7 +155,10 @@ impl Client {
thread_pool.execute(scheduler);
let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?;
let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?;
let mut stream = block_on(participant.open(
10,
Promises::ORDERED | Promises::CONSISTENCY | Promises::COMPRESSED,
))?;
// Wait for initial sync
let (

View File

@ -1,20 +1,19 @@
[package]
name = "veloren_network"
version = "0.1.0"
version = "0.2.0"
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
metrics = ["prometheus"]
compression = ["lz-fear"]
default = ["metrics"]
default = ["metrics","compression"]
[dependencies]
lz4-compress = "0.1.1"
#serialisation
bincode = "1.2"
serde = { version = "1.0" }
@ -31,6 +30,9 @@ futures = { version = "0.3", features = ["thread-pool"] }
#mpsc channel registry
lazy_static = { version = "1.4", default-features = false }
rand = { version = "0.7" }
#stream flags
bitflags = "1.2.1"
lz-fear = { version = "0.1.1", optional = true }
[dev-dependencies]
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }

View File

@ -14,6 +14,8 @@ use futures::{
sink::SinkExt,
stream::StreamExt,
};
#[cfg(feature = "compression")]
use lz_fear::raw::DecodeError;
#[cfg(feature = "metrics")]
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
@ -100,10 +102,15 @@ pub enum ParticipantError {
}
/// Error type thrown by [`Streams`](Stream) methods
/// A Compression Error should only happen if a client sends malicious code.
/// A Deserialize Error probably means you are expecting Type X while you
/// actually got send type Y.
#[derive(Debug)]
pub enum StreamError {
StreamClosed,
DeserializeError(Box<bincode::ErrorKind>),
#[cfg(feature = "compression")]
Compression(DecodeError),
Deserialize(bincode::Error),
}
/// Use the `Network` to create connections to other [`Participants`]
@ -442,7 +449,7 @@ impl Participant {
/// etc...
/// * `promises` - use a combination of you prefered [`Promises`], see the
/// link for further documentation. You can combine them, e.g.
/// `PROMISES_ORDERED | PROMISES_CONSISTENCY` The Stream will then
/// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
/// guarantee that those promises are met.
///
/// A [`ParticipantError`] might be thrown if the `Participant` is already
@ -452,7 +459,7 @@ impl Participant {
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
/// use veloren_network::{Network, Pid, Promises, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2100 and open a stream
@ -465,7 +472,9 @@ impl Participant {
/// let p1 = network
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
/// .await?;
/// let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// let _s1 = p1
/// .open(16, Promises::ORDERED | Promises::CONSISTENCY)
/// .await?;
/// # Ok(())
/// })
/// # }
@ -506,7 +515,7 @@ impl Participant {
///
/// # Examples
/// ```rust
/// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// use veloren_network::{Network, Pid, ProtocolAddr, Promises};
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
@ -520,7 +529,7 @@ impl Participant {
/// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2110".parse().unwrap())).await?;
/// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// # let p2 = remote.connected().await?;
/// # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # p2.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// let _s1 = p1.opened().await?;
/// # Ok(())
/// })
@ -690,7 +699,7 @@ impl Stream {
/// # Example
/// ```
/// use veloren_network::{Network, ProtocolAddr, Pid};
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// # use veloren_network::Promises;
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
@ -703,7 +712,7 @@ impl Stream {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # // keep it alive
/// # let _stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # let _stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Send Message
@ -718,18 +727,22 @@ impl Stream {
/// [`Serialized`]: Serialize
#[inline]
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
self.send_raw(Arc::new(message::serialize(&msg)))
self.send_raw(Arc::new(message::serialize(
&msg,
#[cfg(feature = "compression")]
self.promises.contains(Promises::COMPRESSED),
)))
}
/// This methods give the option to skip multiple calls of [`bincode`], e.g.
/// in case the same Message needs to send on multiple `Streams` to multiple
/// [`Participants`]. Other then that, the same rules apply than for
/// [`send`]
/// This methods give the option to skip multiple calls of [`bincode`] and
/// [`compress`], e.g. in case the same Message needs to send on
/// multiple `Streams` to multiple [`Participants`]. Other then that,
/// the same rules apply than for [`send`]
///
/// # Example
/// ```rust
/// use veloren_network::{Network, ProtocolAddr, Pid, MessageBuffer};
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// # use veloren_network::Promises;
/// use futures::executor::block_on;
/// use bincode;
/// use std::sync::Arc;
@ -746,8 +759,8 @@ impl Stream {
/// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
/// # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # remote1_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # remote2_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// let participant_a = network.connected().await?;
/// let participant_b = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
@ -768,6 +781,7 @@ impl Stream {
///
/// [`send`]: Stream::send
/// [`Participants`]: crate::api::Participant
/// [`compress`]: lz_fear::raw::compress2
pub fn send_raw(&mut self, messagebuffer: Arc<MessageBuffer>) -> Result<(), StreamError> {
if self.send_closed.load(Ordering::Relaxed) {
return Err(StreamError::StreamClosed);
@ -794,7 +808,7 @@ impl Stream {
/// # Example
/// ```
/// use veloren_network::{Network, ProtocolAddr, Pid};
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// # use veloren_network::Promises;
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
@ -806,7 +820,7 @@ impl Stream {
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
@ -818,14 +832,19 @@ impl Stream {
/// ```
#[inline]
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
Ok(message::deserialize(self.recv_raw().await?)?)
message::deserialize(
self.recv_raw().await?,
#[cfg(feature = "compression")]
self.promises.contains(Promises::COMPRESSED),
)
}
/// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] is
/// executed for performance reasons.
/// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] or
/// [`decompress`] is executed for performance reasons.
///
/// [`send_raw`]: Stream::send_raw
/// [`recv`]: Stream::recv
/// [`decompress`]: lz_fear::raw::decompress_raw
pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
let msg = self.b2a_msg_recv_r.next().await?;
Ok(msg.buffer)
@ -991,16 +1010,16 @@ impl From<oneshot::Canceled> for NetworkError {
}
impl From<Box<bincode::ErrorKind>> for StreamError {
fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::DeserializeError(err) }
fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::Deserialize(err) }
}
impl core::fmt::Display for StreamError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
StreamError::StreamClosed => write!(f, "stream closed"),
StreamError::DeserializeError(err) => {
write!(f, "deserialize error on message: {}", err)
},
#[cfg(feature = "compression")]
StreamError::Compression(err) => write!(f, "compression error on message: {}", err),
StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err),
}
}
}
@ -1032,11 +1051,22 @@ impl core::cmp::PartialEq for StreamError {
match self {
StreamError::StreamClosed => match other {
StreamError::StreamClosed => true,
StreamError::DeserializeError(_) => false,
#[cfg(feature = "compression")]
StreamError::Compression(_) => false,
StreamError::Deserialize(_) => false,
},
StreamError::DeserializeError(err) => match other {
#[cfg(feature = "compression")]
StreamError::Compression(err) => match other {
StreamError::StreamClosed => false,
StreamError::DeserializeError(other_err) => partial_eq_bincode(err, other_err),
#[cfg(feature = "compression")]
StreamError::Compression(other_err) => err == other_err,
StreamError::Deserialize(_) => false,
},
StreamError::Deserialize(err) => match other {
StreamError::StreamClosed => false,
#[cfg(feature = "compression")]
StreamError::Compression(_) => false,
StreamError::Deserialize(other_err) => partial_eq_bincode(err, other_err),
},
}
}

View File

@ -40,7 +40,7 @@
//! ```rust
//! use async_std::task::sleep;
//! use futures::{executor::block_on, join};
//! use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
//! use veloren_network::{Network, Pid, Promises, ProtocolAddr};
//!
//! // Client
//! async fn client() -> std::result::Result<(), Box<dyn std::error::Error>> {
@ -51,7 +51,7 @@
//! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
//! .await?;
//! let mut stream = server
//! .open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY)
//! .open(10, Promises::ORDERED | Promises::CONSISTENCY)
//! .await?;
//! stream.send("Hello World")?;
//! Ok(())
@ -113,7 +113,4 @@ pub use api::{
Network, NetworkError, Participant, ParticipantError, ProtocolAddr, Stream, StreamError,
};
pub use message::MessageBuffer;
pub use types::{
Pid, Promises, PROMISES_COMPRESSED, PROMISES_CONSISTENCY, PROMISES_ENCRYPTED,
PROMISES_GUARANTEED_DELIVERY, PROMISES_NONE, PROMISES_ORDERED,
};
pub use types::{Pid, Promises};

View File

@ -1,6 +1,9 @@
use serde::{de::DeserializeOwned, Serialize};
//use std::collections::VecDeque;
use crate::types::{Frame, Mid, Sid};
use crate::{
api::StreamError,
types::{Frame, Mid, Sid},
};
use std::{io, sync::Arc};
//Todo: Evaluate switching to VecDeque for quickly adding and removing data
@ -33,23 +36,64 @@ pub(crate) struct IncomingMessage {
pub sid: Sid,
}
pub(crate) fn serialize<M: Serialize>(message: &M) -> MessageBuffer {
pub(crate) fn serialize<M: Serialize>(
message: &M,
#[cfg(feature = "compression")] compress: bool,
) -> MessageBuffer {
//this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html
let writer = bincode::serialize(message).unwrap();
let serialized_data = bincode::serialize(message).unwrap();
#[cfg(not(feature = "compression"))]
let compress = false;
MessageBuffer {
data: lz4_compress::compress(&writer),
data: if compress {
#[cfg(feature = "compression")]
{
let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10);
let mut table = lz_fear::raw::U32Table::default();
lz_fear::raw::compress2(&serialized_data, 0, &mut table, &mut compressed_data)
.unwrap();
compressed_data
}
#[cfg(not(feature = "compression"))]
unreachable!("compression isn't enabled as a feature");
} else {
serialized_data
},
}
}
//pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) ->
// std::Result<M, std::Box<bincode::error::bincode::ErrorKind>> {
pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) -> bincode::Result<M> {
let span = lz4_compress::decompress(&buffer.data)
.expect("lz4 decompression failed, failed to deserialze");
//this might fail if you choose the wrong type for M. in that case probably X
// got transferred while you assume Y. probably this means your application
// logic is wrong. E.g. You expect a String, but just get a u8.
bincode::deserialize(span.as_slice())
pub(crate) fn deserialize<M: DeserializeOwned>(
buffer: MessageBuffer,
#[cfg(feature = "compression")] compress: bool,
) -> Result<M, StreamError> {
#[cfg(not(feature = "compression"))]
let compress = false;
let uncompressed_data = if compress {
#[cfg(feature = "compression")]
{
let mut uncompressed_data = Vec::with_capacity(buffer.data.len() * 2);
if let Err(e) = lz_fear::raw::decompress_raw(
&buffer.data,
&[0; 0],
&mut uncompressed_data,
usize::MAX,
) {
return Err(StreamError::Compression(e));
}
uncompressed_data
}
#[cfg(not(feature = "compression"))]
unreachable!("compression isn't enabled as a feature");
} else {
buffer.data
};
match bincode::deserialize(uncompressed_data.as_slice()) {
Ok(m) => Ok(m),
Err(e) => Err(StreamError::Deserialize(e)),
}
}
impl OutgoingMessage {
@ -142,12 +186,78 @@ mod tests {
#[test]
fn serialize_test() {
let msg = "abc";
let mb = serialize(&msg);
assert_eq!(mb.data.len(), 9);
assert_eq!(mb.data[0], 34);
let mb = serialize(
&msg,
#[cfg(feature = "compression")]
false,
);
assert_eq!(mb.data.len(), 11);
assert_eq!(mb.data[0], 3);
assert_eq!(mb.data[1..7], [0, 0, 0, 0, 0, 0]);
assert_eq!(mb.data[8], b'a');
assert_eq!(mb.data[9], b'b');
assert_eq!(mb.data[10], b'c');
}
#[cfg(feature = "compression")]
#[test]
fn serialize_compress_small() {
let msg = "abc";
let mb = serialize(&msg, true);
assert_eq!(mb.data.len(), 12);
assert_eq!(mb.data[0], 176);
assert_eq!(mb.data[1], 3);
assert_eq!(mb.data[6], b'a');
assert_eq!(mb.data[7], b'b');
assert_eq!(mb.data[8], b'c');
assert_eq!(mb.data[2..8], [0, 0, 0, 0, 0, 0]);
assert_eq!(mb.data[9], b'a');
assert_eq!(mb.data[10], b'b');
assert_eq!(mb.data[11], b'c');
}
#[cfg(feature = "compression")]
#[test]
fn serialize_compress_medium() {
let msg = (
"abccc",
100u32,
80u32,
"DATA",
4,
0,
0,
0,
"assets/data/plants/flowers/greenrose.ron",
);
let mb = serialize(&msg, true);
assert_eq!(mb.data.len(), 79);
assert_eq!(mb.data[0], 34);
assert_eq!(mb.data[1], 5);
assert_eq!(mb.data[2], 0);
assert_eq!(mb.data[3], 1);
assert_eq!(mb.data[20], 20);
assert_eq!(mb.data[40], 115);
assert_eq!(mb.data[60], 111);
}
#[cfg(feature = "compression")]
#[test]
fn serialize_compress_large() {
use rand::{Rng, SeedableRng};
let mut seed = [0u8; 32];
seed[8] = 13;
seed[9] = 37;
let mut rnd = rand::rngs::StdRng::from_seed(seed);
let mut msg = vec![0u8; 10000];
for (i, s) in msg.iter_mut().enumerate() {
match i.rem_euclid(32) {
2 => *s = 128,
3 => *s = 128 + 16,
4 => *s = 150,
11 => *s = 64,
12 => *s = rnd.gen::<u8>() / 32,
_ => {},
}
}
let mb = serialize(&msg, true);
assert_eq!(mb.data.len(), 1296);
}
}

View File

@ -1,41 +1,42 @@
use bitflags::bitflags;
use rand::Rng;
use std::convert::TryFrom;
pub type Mid = u64;
pub type Cid = u64;
pub type Prio = u8;
/// use promises to modify the behavior of [`Streams`].
/// available promises are:
/// * [`PROMISES_NONE`]
/// * [`PROMISES_ORDERED`]
/// * [`PROMISES_CONSISTENCY`]
/// * [`PROMISES_GUARANTEED_DELIVERY`]
/// * [`PROMISES_COMPRESSED`]
/// * [`PROMISES_ENCRYPTED`]
///
/// [`Streams`]: crate::api::Stream
pub type Promises = u8;
/// use for no special promises on this [`Stream`](crate::api::Stream).
pub const PROMISES_NONE: Promises = 0;
/// this will guarantee that the order of messages which are send on one side,
/// is the same when received on the other.
pub const PROMISES_ORDERED: Promises = 1;
/// this will guarantee that messages received haven't been altered by errors,
/// like bit flips, this is done with a checksum.
pub const PROMISES_CONSISTENCY: Promises = 2;
/// this will guarantee that the other side will receive every message exactly
/// once no messages are dropped
pub const PROMISES_GUARANTEED_DELIVERY: Promises = 4;
/// this will enable the internal compression on this
/// [`Stream`](crate::api::Stream)
pub const PROMISES_COMPRESSED: Promises = 8;
/// this will enable the internal encryption on this
/// [`Stream`](crate::api::Stream)
pub const PROMISES_ENCRYPTED: Promises = 16;
bitflags! {
/// use promises to modify the behavior of [`Streams`].
/// see the consts in this `struct` for
///
/// [`Streams`]: crate::api::Stream
pub struct Promises: u8 {
/// this will guarantee that the order of messages which are send on one side,
/// is the same when received on the other.
const ORDERED = 0b00000001;
/// this will guarantee that messages received haven't been altered by errors,
/// like bit flips, this is done with a checksum.
const CONSISTENCY = 0b00000010;
/// this will guarantee that the other side will receive every message exactly
/// once no messages are dropped
const GUARANTEED_DELIVERY = 0b00000100;
/// this will enable the internal compression on this
/// [`Stream`](crate::api::Stream)
#[cfg(feature = "compression")]
const COMPRESSED = 0b00001000;
/// this will enable the internal encryption on this
/// [`Stream`](crate::api::Stream)
const ENCRYPTED = 0b00010000;
}
}
impl Promises {
pub const fn to_le_bytes(self) -> [u8; 1] { self.bits.to_le_bytes() }
}
pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = [86, 69, 76, 79, 82, 69, 78]; //VELOREN
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 4, 0];
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 5, 0];
pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0);
pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2);
@ -149,7 +150,7 @@ impl Frame {
Frame::OpenStream {
sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()),
prio: buf[8],
promises: buf[9],
promises: Promises::from_bits_truncate(buf[9]),
}
}

View File

@ -20,7 +20,7 @@
use async_std::task;
use task::block_on;
use veloren_network::{Network, ParticipantError, Pid, StreamError, PROMISES_NONE};
use veloren_network::{Network, ParticipantError, Pid, Promises, StreamError};
mod helper;
use helper::{network_participant_stream, tcp};
@ -227,7 +227,7 @@ fn close_network_then_disconnect_part() {
fn opened_stream_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let mut s2_a = block_on(p_a.open(10, PROMISES_NONE)).unwrap();
let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap();
s2_a.send("HelloWorld").unwrap();
let mut s2_b = block_on(p_b.opened()).unwrap();
drop(p_a);
@ -239,7 +239,7 @@ fn opened_stream_before_remote_part_is_closed() {
fn opened_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let mut s2_a = block_on(p_a.open(10, PROMISES_NONE)).unwrap();
let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap();
s2_a.send("HelloWorld").unwrap();
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
@ -255,14 +255,14 @@ fn opened_stream_after_remote_part_is_closed() {
fn open_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let mut s2_a = block_on(p_a.open(10, PROMISES_NONE)).unwrap();
let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap();
s2_a.send("HelloWorld").unwrap();
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
let mut s2_b = block_on(p_b.opened()).unwrap();
assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
assert_eq!(
block_on(p_b.open(20, PROMISES_NONE)).unwrap_err(),
block_on(p_b.open(20, Promises::empty())).unwrap_err(),
ParticipantError::ParticipantDisconnected
);
}
@ -289,7 +289,7 @@ fn open_participant_before_remote_part_is_closed() {
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap();
let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap();
s1_b.send("HelloWorld").unwrap();
let p_a = block_on(n_a.connected()).unwrap();
drop(s1_b);
@ -310,7 +310,7 @@ fn open_participant_after_remote_part_is_closed() {
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap();
let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap();
s1_b.send("HelloWorld").unwrap();
drop(s1_b);
drop(p_b);
@ -331,7 +331,7 @@ fn close_network_scheduler_completely() {
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap();
let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap();
s1_b.send("HelloWorld").unwrap();
let p_a = block_on(n_a.connected()).unwrap();

View File

@ -7,7 +7,7 @@ use std::{
};
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE};
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
#[allow(dead_code)]
pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
@ -55,7 +55,7 @@ pub async fn network_participant_stream(
let p1_b = n_b.connect(addr).await.unwrap();
let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(10, PROMISES_NONE).await.unwrap();
let s1_a = p1_a.open(10, Promises::empty()).await.unwrap();
let s1_b = p1_b.opened().await.unwrap();
(n_a, p1_a, s1_a, n_b, p1_b, s1_b)

View File

@ -4,7 +4,7 @@ use veloren_network::{NetworkError, StreamError};
mod helper;
use helper::{network_participant_stream, tcp, udp};
use std::io::ErrorKind;
use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
use veloren_network::{Network, Pid, Promises, ProtocolAddr};
#[test]
#[ignore]
@ -133,7 +133,7 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
.await?;
// keep it alive
let _stream_p = remote_p
.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY)
.open(16, Promises::ORDERED | Promises::CONSISTENCY)
.await?;
let participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?;
@ -160,7 +160,7 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
.connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
.await?;
let mut stream_p = remote_p
.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY)
.open(16, Promises::ORDERED | Promises::CONSISTENCY)
.await?;
stream_p.send("Hello World")?;
let participant_a = network.connected().await?;
@ -178,7 +178,7 @@ fn wrong_parse() {
s1_a.send(1337).unwrap();
match block_on(s1_b.recv::<String>()) {
Err(StreamError::DeserializeError(_)) => (),
Err(StreamError::Deserialize(_)) => (),
_ => panic!("this should fail, but it doesnt!"),
}
}

View File

@ -11,7 +11,7 @@ default = ["worldgen"]
[dependencies]
common = { package = "veloren-common", path = "../common" }
world = { package = "veloren-world", path = "../world" }
network = { package = "veloren_network", path = "../network", features = ["metrics"], default-features = false }
network = { package = "veloren_network", path = "../network", features = ["metrics", "compression"], default-features = false }
specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git", branch = "specs-git" }