Move terrain compression code to common_net and disable redundant LZ4 compression on the terrain stream.

This commit is contained in:
Avi Weinstock 2021-04-20 19:33:42 -04:00
parent 6d9de520f3
commit 15e32e5655
11 changed files with 94 additions and 86 deletions

4
Cargo.lock generated
View File

@ -5463,7 +5463,6 @@ dependencies = [
"approx 0.4.0",
"arraygen",
"assets_manager",
"bincode",
"bitflags",
"criterion",
"crossbeam-channel",
@ -5471,7 +5470,6 @@ dependencies = [
"csv",
"dot_vox",
"enum-iterator",
"flate2",
"hashbrown",
"image",
"indexmap",
@ -5535,6 +5533,8 @@ dependencies = [
name = "veloren-common-net"
version = "0.9.0"
dependencies = [
"bincode",
"flate2",
"hashbrown",
"serde",
"specs",

View File

@ -1923,15 +1923,17 @@ impl Client {
fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> {
match msg {
ServerGeneral::TerrainChunkUpdate { key, chunk } => {
if let Some(chunk) = chunk.ok().and_then(|c| c.to_chunk()) {
if let Some(chunk) = chunk.ok().and_then(|c| c.decompress()) {
self.state.insert_chunk(key, chunk);
}
self.pending_chunks.remove(&key);
},
ServerGeneral::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
ServerGeneral::TerrainBlockUpdates(blocks) => {
if let Some(mut blocks) = blocks.decompress() {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
}
},
_ => unreachable!("Not a terrain message"),
}

View File

@ -9,9 +9,8 @@ no-assets = []
tracy = ["common-base/tracy"]
simd = ["vek/platform_intrinsics"]
bin_csv = ["csv", "structopt"]
compression = ["flate2"]
default = ["simd", "compression"]
default = ["simd"]
[dependencies]
@ -24,13 +23,11 @@ serde = { version = "1.0.110", features = ["derive", "rc"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
approx = "0.4.0"
arraygen = "0.1.13"
bincode = "1.3.3"
crossbeam-utils = "0.8.1"
bitflags = "1.2"
crossbeam-channel = "0.5"
enum-iterator = "0.6"
lazy_static = "1.4.0"
flate2 = { version = "1.0.20", optional = true }
num-derive = "0.3"
num-traits = "0.2"
ordered-float = { version = "2.0.1", default-features = false }

View File

@ -14,6 +14,8 @@ default = ["simd"]
common = {package = "veloren-common", path = "../../common"}
#inline_tweak = "1.0.2"
bincode = "1.3.3"
flate2 = "1.0.20"
sum_type = "0.2.0"
vek = { version = "=0.14.1", features = ["serde"] }
tracing = { version = "0.1", default-features = false }

View File

@ -15,6 +15,8 @@ pub use self::{
};
use common::character::CharacterId;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tracing::trace;
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum PresenceKind {
@ -42,3 +44,60 @@ pub fn validate_chat_msg(msg: &str) -> Result<(), ChatMsgValidationError> {
Err(ChatMsgValidationError::TooLong)
}
}
/// Wrapper for compressed, serialized data (for stuff that doesn't use the
/// default lz4 compression)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompressedData<T> {
pub data: Vec<u8>,
compressed: bool,
_phantom: PhantomData<T>,
}
impl<T: Serialize + for<'a> Deserialize<'a>> CompressedData<T> {
pub fn compress(t: &T, level: u32) -> Self {
use flate2::{write::DeflateEncoder, Compression};
use std::io::Write;
let uncompressed = bincode::serialize(t)
.expect("bincode serialization can only fail if a byte limit is set");
if uncompressed.len() >= 32 {
const EXPECT_MSG: &str =
"compression only fails for fallible Read/Write impls (which Vec<u8> is not)";
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level));
encoder.write_all(&*uncompressed).expect(EXPECT_MSG);
let compressed = encoder.finish().expect(EXPECT_MSG);
trace!(
"compressed {}, uncompressed {}, ratio {}",
compressed.len(),
uncompressed.len(),
compressed.len() as f32 / uncompressed.len() as f32
);
CompressedData {
data: compressed,
compressed: true,
_phantom: PhantomData,
}
} else {
CompressedData {
data: uncompressed,
compressed: false,
_phantom: PhantomData,
}
}
}
pub fn decompress(&self) -> Option<T> {
use std::io::Read;
if self.compressed {
let mut uncompressed = Vec::new();
flate2::read::DeflateDecoder::new(&*self.data)
.read_to_end(&mut uncompressed)
.ok()?;
bincode::deserialize(&*uncompressed).ok()
} else {
bincode::deserialize(&*self.data).ok()
}
}
}

View File

@ -1,4 +1,4 @@
use super::{world_msg::EconomyInfo, ClientType, EcsCompPacket, PingMsg};
use super::{world_msg::EconomyInfo, ClientType, CompressedData, EcsCompPacket, PingMsg};
use crate::sync;
use common::{
character::{self, CharacterItem},
@ -6,7 +6,7 @@ use common::{
outcome::Outcome,
recipe::RecipeBook,
resources::TimeOfDay,
terrain::{Block, SerializedTerrainChunk},
terrain::{Block, TerrainChunk},
trade::{PendingTrade, SitePrices, TradeId, TradeResult},
uid::Uid,
};
@ -106,9 +106,9 @@ pub enum ServerGeneral {
// Ingame related AND terrain stream
TerrainChunkUpdate {
key: Vec2<i32>,
chunk: Result<SerializedTerrainChunk, ()>,
chunk: Result<CompressedData<TerrainChunk>, ()>,
},
TerrainBlockUpdates(HashMap<Vec3<i32>, Block>),
TerrainBlockUpdates(CompressedData<HashMap<Vec3<i32>, Block>>),
// Always possible
PlayerListUpdate(PlayerListUpdate),
/// A message to go into the client chat box. The client is responsible for

View File

@ -17,7 +17,6 @@ pub use self::{
};
use roots::find_roots_cubic;
use serde::{Deserialize, Serialize};
use tracing::trace;
use crate::{
vol::{ReadVol, RectVolSize},
@ -143,56 +142,6 @@ impl TerrainChunkMeta {
pub type TerrainChunk = chonk::Chonk<Block, TerrainChunkSize, TerrainChunkMeta>;
pub type TerrainGrid = VolGrid2d<TerrainChunk>;
/// Wrapper for custom serialization strategies (e.g. compression) for terrain
/// chunks
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SerializedTerrainChunk(pub Vec<u8>);
impl SerializedTerrainChunk {
pub fn from_chunk(chunk: &TerrainChunk) -> Self {
let uncompressed = bincode::serialize(chunk)
.expect("bincode serialization can only fail if a byte limit is set");
#[cfg(feature = "compression")]
{
use flate2::{write::DeflateEncoder, Compression};
use std::io::Write;
const EXPECT_MSG: &str =
"compression only fails for fallible Read/Write impls (which Vec<u8> is not)";
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(5));
encoder.write_all(&*uncompressed).expect(EXPECT_MSG);
let compressed = encoder.finish().expect(EXPECT_MSG);
trace!(
"compressed {}, uncompressed {}, ratio {}",
compressed.len(),
uncompressed.len(),
compressed.len() as f32 / uncompressed.len() as f32
);
SerializedTerrainChunk(compressed)
}
#[cfg(not(feature = "compression"))]
{
SerializedTerrainChunk(uncompressed)
}
}
pub fn to_chunk(&self) -> Option<TerrainChunk> {
#[cfg(feature = "compression")]
{
use std::io::Read;
let mut uncompressed = Vec::new();
flate2::read::DeflateDecoder::new(&*self.0)
.read_to_end(&mut uncompressed)
.ok()?;
bincode::deserialize(&*uncompressed).ok()
}
#[cfg(not(feature = "compression"))]
{
bincode::deserialize(&self.0).ok()
}
}
}
impl TerrainGrid {
/// Find a location suitable for spawning an entity near the given
/// position (but in the same chunk).

View File

@ -105,7 +105,7 @@ impl ConnectionHandler {
let mut register_stream = participant.open(3, reliablec, 500).await?;
let character_screen_stream = participant.open(3, reliablec, 500).await?;
let in_game_stream = participant.open(3, reliablec, 100_000).await?;
let terrain_stream = participant.open(4, reliablec, 20_000).await?;
let terrain_stream = participant.open(4, reliable, 20_000).await?;
let server_data = receiver.recv()?;

View File

@ -2,11 +2,11 @@ use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence};
use common::{
comp::Pos,
event::{EventBus, ServerEvent},
terrain::{SerializedTerrainChunk, TerrainChunkSize, TerrainGrid},
terrain::{TerrainChunkSize, TerrainGrid},
vol::RectVolSize,
};
use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral};
use common_net::msg::{ClientGeneral, CompressedData, ServerGeneral};
use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage};
use std::sync::Arc;
@ -80,7 +80,7 @@ impl<'a> System<'a> for Sys {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::from_chunk(&chunk)),
chunk: Ok(CompressedData::compress(&chunk, 5)),
})?
},
None => {

View File

@ -10,11 +10,11 @@ use common::{
event::{EventBus, ServerEvent},
generation::get_npc_name,
npc::NPC_NAMES,
terrain::{SerializedTerrainChunk, TerrainGrid},
terrain::TerrainGrid,
LoadoutBuilder, SkillSetBuilder,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::ServerGeneral;
use common_net::msg::{CompressedData, ServerGeneral};
use common_state::TerrainChanges;
use comp::Behavior;
use specs::{Join, Read, ReadStorage, Write, WriteExpect};
@ -108,7 +108,7 @@ impl<'a> System<'a> for Sys {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::from_chunk(&*chunk)),
chunk: Ok(CompressedData::compress(&*chunk, 5)),
}));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));

View File

@ -1,10 +1,7 @@
use crate::{client::Client, presence::Presence};
use common::{
comp::Pos,
terrain::{SerializedTerrainChunk, TerrainGrid},
};
use common::{comp::Pos, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::ServerGeneral;
use common_net::msg::{CompressedData, ServerGeneral};
use common_state::TerrainChanges;
use specs::{Join, Read, ReadExpect, ReadStorage};
use std::sync::Arc;
@ -42,7 +39,7 @@ impl<'a> System<'a> for Sys {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(match terrain.get_key(*chunk_key) {
Some(chunk) => SerializedTerrainChunk::from_chunk(&chunk),
Some(chunk) => CompressedData::compress(&chunk, 5),
None => break 'chunk,
}),
}));
@ -54,14 +51,16 @@ impl<'a> System<'a> for Sys {
// TODO: Don't send all changed blocks to all clients
// Sync changed blocks
let mut lazy_msg = None;
for (_, client) in (&presences, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates(
terrain_changes.modified_blocks.clone(),
)));
if !terrain_changes.modified_blocks.is_empty() {
let mut lazy_msg = None;
for (_, client) in (&presences, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates(
CompressedData::compress(&terrain_changes.modified_blocks, 2),
)));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}