diff --git a/Cargo.lock b/Cargo.lock index e41c084765..c36ba62d0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5463,7 +5463,6 @@ dependencies = [ "approx 0.4.0", "arraygen", "assets_manager", - "bincode", "bitflags", "criterion", "crossbeam-channel", @@ -5471,7 +5470,6 @@ dependencies = [ "csv", "dot_vox", "enum-iterator", - "flate2", "hashbrown", "image", "indexmap", @@ -5535,6 +5533,8 @@ dependencies = [ name = "veloren-common-net" version = "0.9.0" dependencies = [ + "bincode", + "flate2", "hashbrown", "serde", "specs", diff --git a/client/src/lib.rs b/client/src/lib.rs index 7a6ecf3d41..d0108121fc 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1923,15 +1923,17 @@ impl Client { fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> { match msg { ServerGeneral::TerrainChunkUpdate { key, chunk } => { - if let Some(chunk) = chunk.ok().and_then(|c| c.to_chunk()) { + if let Some(chunk) = chunk.ok().and_then(|c| c.decompress()) { self.state.insert_chunk(key, chunk); } self.pending_chunks.remove(&key); }, - ServerGeneral::TerrainBlockUpdates(mut blocks) => { - blocks.drain().for_each(|(pos, block)| { - self.state.set_block(pos, block); - }); + ServerGeneral::TerrainBlockUpdates(blocks) => { + if let Some(mut blocks) = blocks.decompress() { + blocks.drain().for_each(|(pos, block)| { + self.state.set_block(pos, block); + }); + } }, _ => unreachable!("Not a terrain message"), } diff --git a/common/Cargo.toml b/common/Cargo.toml index 5028493a73..8c3ef5d6c9 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,9 +9,8 @@ no-assets = [] tracy = ["common-base/tracy"] simd = ["vek/platform_intrinsics"] bin_csv = ["csv", "structopt"] -compression = ["flate2"] -default = ["simd", "compression"] +default = ["simd"] [dependencies] @@ -24,13 +23,11 @@ serde = { version = "1.0.110", features = ["derive", "rc"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] approx = "0.4.0" arraygen = "0.1.13" -bincode = "1.3.3" crossbeam-utils = "0.8.1" bitflags = "1.2" crossbeam-channel = "0.5" enum-iterator = "0.6" lazy_static = "1.4.0" -flate2 = { version = "1.0.20", optional = true } num-derive = "0.3" num-traits = "0.2" ordered-float = { version = "2.0.1", default-features = false } diff --git a/common/net/Cargo.toml b/common/net/Cargo.toml index 209c2d6a18..0661f79697 100644 --- a/common/net/Cargo.toml +++ b/common/net/Cargo.toml @@ -14,6 +14,8 @@ default = ["simd"] common = {package = "veloren-common", path = "../../common"} #inline_tweak = "1.0.2" +bincode = "1.3.3" +flate2 = "1.0.20" sum_type = "0.2.0" vek = { version = "=0.14.1", features = ["serde"] } tracing = { version = "0.1", default-features = false } diff --git a/common/net/src/msg/mod.rs b/common/net/src/msg/mod.rs index 667038ac78..2d2421f9cc 100644 --- a/common/net/src/msg/mod.rs +++ b/common/net/src/msg/mod.rs @@ -15,6 +15,8 @@ pub use self::{ }; use common::character::CharacterId; use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; +use tracing::trace; #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub enum PresenceKind { @@ -42,3 +44,60 @@ pub fn validate_chat_msg(msg: &str) -> Result<(), ChatMsgValidationError> { Err(ChatMsgValidationError::TooLong) } } + +/// Wrapper for compressed, serialized data (for stuff that doesn't use the +/// default lz4 compression) +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompressedData { + pub data: Vec, + compressed: bool, + _phantom: PhantomData, +} + +impl Deserialize<'a>> CompressedData { + pub fn compress(t: &T, level: u32) -> Self { + use flate2::{write::DeflateEncoder, Compression}; + use std::io::Write; + let uncompressed = bincode::serialize(t) + .expect("bincode serialization can only fail if a byte limit is set"); + + if uncompressed.len() >= 32 { + const EXPECT_MSG: &str = + "compression only fails for fallible Read/Write impls (which Vec is not)"; + + let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level)); + encoder.write_all(&*uncompressed).expect(EXPECT_MSG); + let compressed = encoder.finish().expect(EXPECT_MSG); + trace!( + "compressed {}, uncompressed {}, ratio {}", + compressed.len(), + uncompressed.len(), + compressed.len() as f32 / uncompressed.len() as f32 + ); + CompressedData { + data: compressed, + compressed: true, + _phantom: PhantomData, + } + } else { + CompressedData { + data: uncompressed, + compressed: false, + _phantom: PhantomData, + } + } + } + + pub fn decompress(&self) -> Option { + use std::io::Read; + if self.compressed { + let mut uncompressed = Vec::new(); + flate2::read::DeflateDecoder::new(&*self.data) + .read_to_end(&mut uncompressed) + .ok()?; + bincode::deserialize(&*uncompressed).ok() + } else { + bincode::deserialize(&*self.data).ok() + } + } +} diff --git a/common/net/src/msg/server.rs b/common/net/src/msg/server.rs index 1a656e08c5..5596945f86 100644 --- a/common/net/src/msg/server.rs +++ b/common/net/src/msg/server.rs @@ -1,4 +1,4 @@ -use super::{world_msg::EconomyInfo, ClientType, EcsCompPacket, PingMsg}; +use super::{world_msg::EconomyInfo, ClientType, CompressedData, EcsCompPacket, PingMsg}; use crate::sync; use common::{ character::{self, CharacterItem}, @@ -6,7 +6,7 @@ use common::{ outcome::Outcome, recipe::RecipeBook, resources::TimeOfDay, - terrain::{Block, SerializedTerrainChunk}, + terrain::{Block, TerrainChunk}, trade::{PendingTrade, SitePrices, TradeId, TradeResult}, uid::Uid, }; @@ -106,9 +106,9 @@ pub enum ServerGeneral { // Ingame related AND terrain stream TerrainChunkUpdate { key: Vec2, - chunk: Result, + chunk: Result, ()>, }, - TerrainBlockUpdates(HashMap, Block>), + TerrainBlockUpdates(CompressedData, Block>>), // Always possible PlayerListUpdate(PlayerListUpdate), /// A message to go into the client chat box. The client is responsible for diff --git a/common/src/terrain/mod.rs b/common/src/terrain/mod.rs index f1fb3a4106..5360b6c7c8 100644 --- a/common/src/terrain/mod.rs +++ b/common/src/terrain/mod.rs @@ -17,7 +17,6 @@ pub use self::{ }; use roots::find_roots_cubic; use serde::{Deserialize, Serialize}; -use tracing::trace; use crate::{ vol::{ReadVol, RectVolSize}, @@ -143,56 +142,6 @@ impl TerrainChunkMeta { pub type TerrainChunk = chonk::Chonk; pub type TerrainGrid = VolGrid2d; -/// Wrapper for custom serialization strategies (e.g. compression) for terrain -/// chunks -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SerializedTerrainChunk(pub Vec); - -impl SerializedTerrainChunk { - pub fn from_chunk(chunk: &TerrainChunk) -> Self { - let uncompressed = bincode::serialize(chunk) - .expect("bincode serialization can only fail if a byte limit is set"); - #[cfg(feature = "compression")] - { - use flate2::{write::DeflateEncoder, Compression}; - use std::io::Write; - const EXPECT_MSG: &str = - "compression only fails for fallible Read/Write impls (which Vec is not)"; - - let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(5)); - encoder.write_all(&*uncompressed).expect(EXPECT_MSG); - let compressed = encoder.finish().expect(EXPECT_MSG); - trace!( - "compressed {}, uncompressed {}, ratio {}", - compressed.len(), - uncompressed.len(), - compressed.len() as f32 / uncompressed.len() as f32 - ); - SerializedTerrainChunk(compressed) - } - #[cfg(not(feature = "compression"))] - { - SerializedTerrainChunk(uncompressed) - } - } - - pub fn to_chunk(&self) -> Option { - #[cfg(feature = "compression")] - { - use std::io::Read; - let mut uncompressed = Vec::new(); - flate2::read::DeflateDecoder::new(&*self.0) - .read_to_end(&mut uncompressed) - .ok()?; - bincode::deserialize(&*uncompressed).ok() - } - #[cfg(not(feature = "compression"))] - { - bincode::deserialize(&self.0).ok() - } - } -} - impl TerrainGrid { /// Find a location suitable for spawning an entity near the given /// position (but in the same chunk). diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 7f0f11a198..1f7bb553af 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -105,7 +105,7 @@ impl ConnectionHandler { let mut register_stream = participant.open(3, reliablec, 500).await?; let character_screen_stream = participant.open(3, reliablec, 500).await?; let in_game_stream = participant.open(3, reliablec, 100_000).await?; - let terrain_stream = participant.open(4, reliablec, 20_000).await?; + let terrain_stream = participant.open(4, reliable, 20_000).await?; let server_data = receiver.recv()?; diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 9725910373..009d8dfa6e 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -2,11 +2,11 @@ use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence}; use common::{ comp::Pos, event::{EventBus, ServerEvent}, - terrain::{SerializedTerrainChunk, TerrainChunkSize, TerrainGrid}, + terrain::{TerrainChunkSize, TerrainGrid}, vol::RectVolSize, }; use common_ecs::{Job, Origin, ParMode, Phase, System}; -use common_net::msg::{ClientGeneral, ServerGeneral}; +use common_net::msg::{ClientGeneral, CompressedData, ServerGeneral}; use rayon::iter::ParallelIterator; use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage}; use std::sync::Arc; @@ -80,7 +80,7 @@ impl<'a> System<'a> for Sys { network_metrics.chunks_served_from_memory.inc(); client.send(ServerGeneral::TerrainChunkUpdate { key, - chunk: Ok(SerializedTerrainChunk::from_chunk(&chunk)), + chunk: Ok(CompressedData::compress(&chunk, 5)), })? }, None => { diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 3f7cbb145f..50c9be9cce 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -10,11 +10,11 @@ use common::{ event::{EventBus, ServerEvent}, generation::get_npc_name, npc::NPC_NAMES, - terrain::{SerializedTerrainChunk, TerrainGrid}, + terrain::TerrainGrid, LoadoutBuilder, SkillSetBuilder, }; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::ServerGeneral; +use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; use comp::Behavior; use specs::{Join, Read, ReadStorage, Write, WriteExpect}; @@ -108,7 +108,7 @@ impl<'a> System<'a> for Sys { if lazy_msg.is_none() { lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { key, - chunk: Ok(SerializedTerrainChunk::from_chunk(&*chunk)), + chunk: Ok(CompressedData::compress(&*chunk, 5)), })); } lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 9f203874f9..ebbe7bb95c 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,10 +1,7 @@ use crate::{client::Client, presence::Presence}; -use common::{ - comp::Pos, - terrain::{SerializedTerrainChunk, TerrainGrid}, -}; +use common::{comp::Pos, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::ServerGeneral; +use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; use specs::{Join, Read, ReadExpect, ReadStorage}; use std::sync::Arc; @@ -42,7 +39,7 @@ impl<'a> System<'a> for Sys { lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { key: *chunk_key, chunk: Ok(match terrain.get_key(*chunk_key) { - Some(chunk) => SerializedTerrainChunk::from_chunk(&chunk), + Some(chunk) => CompressedData::compress(&chunk, 5), None => break 'chunk, }), })); @@ -54,14 +51,16 @@ impl<'a> System<'a> for Sys { // TODO: Don't send all changed blocks to all clients // Sync changed blocks - let mut lazy_msg = None; - for (_, client) in (&presences, &clients).join() { - if lazy_msg.is_none() { - lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates( - terrain_changes.modified_blocks.clone(), - ))); + if !terrain_changes.modified_blocks.is_empty() { + let mut lazy_msg = None; + for (_, client) in (&presences, &clients).join() { + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates( + CompressedData::compress(&terrain_changes.modified_blocks, 2), + ))); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } - lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } }