From 67ae3494c405bfc68afe6f250d002c00c2c9c39a Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Tue, 20 Apr 2021 17:33:46 -0400 Subject: [PATCH 1/5] Compress terrain chunks with deflate. Includes a benchmark showing that this makes them around 70% smaller, and is the same speed as LZ4. --- CHANGELOG.md | 1 + Cargo.lock | 17 +- client/src/lib.rs | 2 +- common/Cargo.toml | 5 +- common/net/src/msg/server.rs | 4 +- common/src/terrain/mod.rs | 51 ++++ server/src/sys/msg/terrain.rs | 4 +- server/src/sys/terrain.rs | 24 +- server/src/sys/terrain_sync.rs | 9 +- world/Cargo.toml | 11 + world/src/bin/chunk_compression_benchmarks.rs | 246 ++++++++++++++++++ 11 files changed, 363 insertions(+), 11 deletions(-) create mode 100644 world/src/bin/chunk_compression_benchmarks.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index cb78c2f892..65067d5942 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Entities now have density - Buoyancy is calculated from the difference in density between an entity and surrounding fluid - Drag is now calculated based on physical properties +- Terrain chunks are now deflate-compressed when sent over the network. ### Changed diff --git a/Cargo.lock b/Cargo.lock index ef8a66c119..e41c084765 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1308,6 +1308,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "deflate" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f95bf05dffba6e6cce8dfbb30def788154949ccd9aed761b472119c21e01c70" +dependencies = [ + "adler32", +] + [[package]] name = "derivative" version = "2.2.0" @@ -3765,7 +3774,7 @@ checksum = "3c3287920cb847dee3de33d301c463fba14dda99db24214ddf93f83d3021f4c6" dependencies = [ "bitflags", "crc32fast", - "deflate", + "deflate 0.8.6", "miniz_oxide 0.3.7", ] @@ -5454,6 +5463,7 @@ dependencies = [ "approx 0.4.0", "arraygen", "assets_manager", + "bincode", "bitflags", "criterion", "crossbeam-channel", @@ -5461,6 +5471,7 @@ dependencies = [ "csv", "dot_vox", "enum-iterator", + "flate2", "hashbrown", "image", "indexmap", @@ -5809,12 +5820,15 @@ dependencies = [ "bincode", "bitvec", "criterion", + "deflate 0.9.1", "enum-iterator", + "flate2", "fxhash", "hashbrown", "image", "itertools 0.10.0", "lazy_static", + "lz-fear", "minifb", "noise", "num 0.4.0", @@ -5831,6 +5845,7 @@ dependencies = [ "tracing-subscriber", "vek", "veloren-common", + "veloren-common-frontend", "veloren-common-net", ] diff --git a/client/src/lib.rs b/client/src/lib.rs index 39ce8d113e..7a6ecf3d41 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1923,7 +1923,7 @@ impl Client { fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> { match msg { ServerGeneral::TerrainChunkUpdate { key, chunk } => { - if let Ok(chunk) = chunk { + if let Some(chunk) = chunk.ok().and_then(|c| c.to_chunk()) { self.state.insert_chunk(key, chunk); } self.pending_chunks.remove(&key); diff --git a/common/Cargo.toml b/common/Cargo.toml index 8c3ef5d6c9..5028493a73 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,8 +9,9 @@ no-assets = [] tracy = ["common-base/tracy"] simd = ["vek/platform_intrinsics"] bin_csv = ["csv", "structopt"] +compression = ["flate2"] -default = ["simd"] +default = ["simd", "compression"] [dependencies] @@ -23,11 +24,13 @@ serde = { version = "1.0.110", features = ["derive", "rc"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] approx = "0.4.0" arraygen = "0.1.13" +bincode = "1.3.3" crossbeam-utils = "0.8.1" bitflags = "1.2" crossbeam-channel = "0.5" enum-iterator = "0.6" lazy_static = "1.4.0" +flate2 = { version = "1.0.20", optional = true } num-derive = "0.3" num-traits = "0.2" ordered-float = { version = "2.0.1", default-features = false } diff --git a/common/net/src/msg/server.rs b/common/net/src/msg/server.rs index 239d3e10d0..1a656e08c5 100644 --- a/common/net/src/msg/server.rs +++ b/common/net/src/msg/server.rs @@ -6,7 +6,7 @@ use common::{ outcome::Outcome, recipe::RecipeBook, resources::TimeOfDay, - terrain::{Block, TerrainChunk}, + terrain::{Block, SerializedTerrainChunk}, trade::{PendingTrade, SitePrices, TradeId, TradeResult}, uid::Uid, }; @@ -106,7 +106,7 @@ pub enum ServerGeneral { // Ingame related AND terrain stream TerrainChunkUpdate { key: Vec2, - chunk: Result, ()>, + chunk: Result, }, TerrainBlockUpdates(HashMap, Block>), // Always possible diff --git a/common/src/terrain/mod.rs b/common/src/terrain/mod.rs index 5360b6c7c8..f1fb3a4106 100644 --- a/common/src/terrain/mod.rs +++ b/common/src/terrain/mod.rs @@ -17,6 +17,7 @@ pub use self::{ }; use roots::find_roots_cubic; use serde::{Deserialize, Serialize}; +use tracing::trace; use crate::{ vol::{ReadVol, RectVolSize}, @@ -142,6 +143,56 @@ impl TerrainChunkMeta { pub type TerrainChunk = chonk::Chonk; pub type TerrainGrid = VolGrid2d; +/// Wrapper for custom serialization strategies (e.g. compression) for terrain +/// chunks +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SerializedTerrainChunk(pub Vec); + +impl SerializedTerrainChunk { + pub fn from_chunk(chunk: &TerrainChunk) -> Self { + let uncompressed = bincode::serialize(chunk) + .expect("bincode serialization can only fail if a byte limit is set"); + #[cfg(feature = "compression")] + { + use flate2::{write::DeflateEncoder, Compression}; + use std::io::Write; + const EXPECT_MSG: &str = + "compression only fails for fallible Read/Write impls (which Vec is not)"; + + let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(5)); + encoder.write_all(&*uncompressed).expect(EXPECT_MSG); + let compressed = encoder.finish().expect(EXPECT_MSG); + trace!( + "compressed {}, uncompressed {}, ratio {}", + compressed.len(), + uncompressed.len(), + compressed.len() as f32 / uncompressed.len() as f32 + ); + SerializedTerrainChunk(compressed) + } + #[cfg(not(feature = "compression"))] + { + SerializedTerrainChunk(uncompressed) + } + } + + pub fn to_chunk(&self) -> Option { + #[cfg(feature = "compression")] + { + use std::io::Read; + let mut uncompressed = Vec::new(); + flate2::read::DeflateDecoder::new(&*self.0) + .read_to_end(&mut uncompressed) + .ok()?; + bincode::deserialize(&*uncompressed).ok() + } + #[cfg(not(feature = "compression"))] + { + bincode::deserialize(&self.0).ok() + } + } +} + impl TerrainGrid { /// Find a location suitable for spawning an entity near the given /// position (but in the same chunk). diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 1dd5f82ecb..9725910373 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -2,7 +2,7 @@ use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence}; use common::{ comp::Pos, event::{EventBus, ServerEvent}, - terrain::{TerrainChunkSize, TerrainGrid}, + terrain::{SerializedTerrainChunk, TerrainChunkSize, TerrainGrid}, vol::RectVolSize, }; use common_ecs::{Job, Origin, ParMode, Phase, System}; @@ -80,7 +80,7 @@ impl<'a> System<'a> for Sys { network_metrics.chunks_served_from_memory.inc(); client.send(ServerGeneral::TerrainChunkUpdate { key, - chunk: Ok(Arc::clone(chunk)), + chunk: Ok(SerializedTerrainChunk::from_chunk(&chunk)), })? }, None => { diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 1d3f224828..3f7cbb145f 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -10,7 +10,7 @@ use common::{ event::{EventBus, ServerEvent}, generation::get_npc_name, npc::NPC_NAMES, - terrain::TerrainGrid, + terrain::{SerializedTerrainChunk, TerrainGrid}, LoadoutBuilder, SkillSetBuilder, }; use common_ecs::{Job, Origin, Phase, System}; @@ -93,6 +93,28 @@ impl<'a> System<'a> for Sys { // Add to list of chunks to send to nearby players. new_chunks.push((key, Arc::clone(&chunk))); + // Send the chunk to all nearby players. + let mut lazy_msg = None; + for (presence, pos, client) in (&presences, &positions, &clients).join() { + let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); + // Subtract 2 from the offset before computing squared magnitude + // 1 since chunks need neighbors to be meshed + // 1 to act as a buffer if the player moves in that direction + let adjusted_dist_sqr = (chunk_pos - key) + .map(|e: i32| (e.abs() as u32).saturating_sub(2)) + .magnitude_squared(); + + if adjusted_dist_sqr <= presence.view_distance.pow(2) { + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(SerializedTerrainChunk::from_chunk(&*chunk)), + })); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); + } + } + // TODO: code duplication for chunk insertion between here and state.rs // Insert the chunk into terrain changes if terrain.insert(key, chunk).is_some() { diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 77f7cd1a86..9f203874f9 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,5 +1,8 @@ use crate::{client::Client, presence::Presence}; -use common::{comp::Pos, terrain::TerrainGrid}; +use common::{ + comp::Pos, + terrain::{SerializedTerrainChunk, TerrainGrid}, +}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::ServerGeneral; use common_state::TerrainChanges; @@ -38,8 +41,8 @@ impl<'a> System<'a> for Sys { if lazy_msg.is_none() { lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { key: *chunk_key, - chunk: Ok(match terrain.get_key_arc(*chunk_key) { - Some(chunk) => Arc::clone(chunk), + chunk: Ok(match terrain.get_key(*chunk_key) { + Some(chunk) => SerializedTerrainChunk::from_chunk(&chunk), None => break 'chunk, }), })); diff --git a/world/Cargo.toml b/world/Cargo.toml index bc82bd6e45..e08f498f08 100644 --- a/world/Cargo.toml +++ b/world/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [features] tracy = ["common/tracy", "common-net/tracy"] simd = ["vek/platform_intrinsics"] +bin_compression = ["lz-fear", "deflate", "flate2", "common-frontend"] default = ["simd"] @@ -37,6 +38,12 @@ ron = { version = "0.6", default-features = false } assets_manager = {version = "0.4.3", features = ["ron"]} #inline_tweak = "1.0.2" +# compression benchmarks +lz-fear = { version = "0.1.1", optional = true } +deflate = { version = "0.9.1", optional = true } +flate2 = { version = "1.0.20", optional = true } +common-frontend = { package = "veloren-common-frontend", path = "../common/frontend", optional = true } + [dev-dependencies] criterion = "0.3" @@ -48,3 +55,7 @@ structopt = "0.3" [[bench]] harness = false name = "tree" + +[[bin]] +name = "chunk_compression_benchmarks" +required-features = ["bin_compression"] diff --git a/world/src/bin/chunk_compression_benchmarks.rs b/world/src/bin/chunk_compression_benchmarks.rs new file mode 100644 index 0000000000..a20cb03e53 --- /dev/null +++ b/world/src/bin/chunk_compression_benchmarks.rs @@ -0,0 +1,246 @@ +use common::{ + terrain::{chonk::Chonk, Block, BlockKind, SpriteKind}, + vol::{IntoVolIterator, RectVolSize, SizedVol, WriteVol}, + volumes::dyna::{Access, ColumnAccess, Dyna}, +}; +use hashbrown::HashMap; +use std::{ + io::{Read, Write}, + time::Instant, +}; +use tracing::{debug, trace}; +use vek::*; +use veloren_world::{ + sim::{FileOpts, WorldOpts, DEFAULT_WORLD_MAP}, + World, +}; + +fn lz4_with_dictionary(data: &[u8], dictionary: &[u8]) -> Vec { + let mut compressed = Vec::new(); + lz_fear::CompressionSettings::default() + .dictionary(0, &dictionary) + .compress(data, &mut compressed) + .unwrap(); + compressed +} + +#[allow(dead_code)] +fn unlz4_with_dictionary(data: &[u8], dictionary: &[u8]) -> Option> { + lz_fear::LZ4FrameReader::new(data).ok().and_then(|r| { + let mut uncompressed = Vec::new(); + r.into_read_with_dictionary(dictionary) + .read_to_end(&mut uncompressed) + .ok()?; + bincode::deserialize(&*uncompressed).ok() + }) +} + +#[allow(dead_code)] +fn do_deflate(data: &[u8]) -> Vec { + use deflate::{write::DeflateEncoder, Compression}; + + let mut encoder = DeflateEncoder::new(Vec::new(), Compression::Fast); + encoder.write_all(data).expect("Write error!"); + let compressed_data = encoder.finish().expect("Failed to finish compression!"); + compressed_data +} + +fn do_deflate_flate2(data: &[u8]) -> Vec { + use flate2::{write::DeflateEncoder, Compression}; + + let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(5)); + encoder.write_all(data).expect("Write error!"); + let compressed_data = encoder.finish().expect("Failed to finish compression!"); + compressed_data +} + +fn chonk_to_dyna( + chonk: &Chonk, + block: V, +) -> Dyna { + let mut dyna = Dyna::::filled( + Vec3::new( + S::RECT_SIZE.x, + S::RECT_SIZE.y, + (chonk.get_max_z() - chonk.get_min_z()) as u32, + ), + block, + chonk.meta().clone(), + ); + for (pos, block) in chonk.vol_iter( + Vec3::new(0, 0, chonk.get_min_z()), + Vec3::new(S::RECT_SIZE.x as _, S::RECT_SIZE.y as _, chonk.get_max_z()), + ) { + dyna.set(pos - chonk.get_min_z() * Vec3::unit_z(), block.clone()) + .expect("a bug here represents the arithmetic being wrong"); + } + dyna +} + +fn channelize_dyna( + dyna: &Dyna, +) -> ( + Dyna, + Vec, + Vec, + Vec, + Vec, +) { + let mut blocks = Dyna::filled(dyna.sz, BlockKind::Air, dyna.metadata().clone()); + let (mut r, mut g, mut b, mut sprites) = (Vec::new(), Vec::new(), Vec::new(), Vec::new()); + for (pos, block) in dyna.vol_iter(dyna.lower_bound(), dyna.upper_bound()) { + blocks.set(pos, **block).unwrap(); + match (block.get_color(), block.get_sprite()) { + (Some(rgb), None) => { + r.push(rgb.r); + g.push(rgb.g); + b.push(rgb.b); + }, + (None, Some(spritekind)) => { + sprites.push(spritekind); + }, + _ => panic!( + "attr being used for color vs sprite is mutually exclusive (and that's required \ + for this translation to be lossless), but there's no way to guarantee that at \ + the type level with Block's public API" + ), + } + } + (blocks, r, g, b, sprites) +} + +fn histogram_to_dictionary(histogram: &HashMap, usize>, dictionary: &mut Vec) { + let mut tmp: Vec<(Vec, usize)> = histogram.iter().map(|(k, v)| (k.clone(), *v)).collect(); + tmp.sort_by_key(|(_, count)| *count); + debug!("{:?}", tmp.last()); + let mut i = 0; + let mut j = tmp.len() - 1; + while i < dictionary.len() && j > 0 { + let (k, v) = &tmp[j]; + let dlen = dictionary.len(); + let n = (i + k.len()).min(dlen); + dictionary[i..n].copy_from_slice(&k[0..k.len().min(dlen - i)]); + debug!("{}: {}: {:?}", tmp.len() - j, v, k); + j -= 1; + i = n; + } +} + +fn main() { + common_frontend::init_stdout(None); + println!("Loading world"); + let (world, index) = World::generate(59686, WorldOpts { + seed_elements: true, + world_file: FileOpts::LoadAsset(DEFAULT_WORLD_MAP.into()), + ..WorldOpts::default() + }); + println!("Loaded world"); + let mut histogram: HashMap, usize> = HashMap::new(); + let mut histogram2: HashMap, usize> = HashMap::new(); + let mut dictionary = vec![0xffu8; 1 << 16]; + let mut dictionary2 = vec![0xffu8; 1 << 16]; + let k = 32; + let sz = world.sim().get_size(); + let mut totals = [0.0; 5]; + let mut total_timings = [0.0; 2]; + let mut count = 0; + for y in 1..sz.y { + for x in 1..sz.x { + let chunk = + world.generate_chunk(index.as_index_ref(), Vec2::new(x as _, y as _), || false); + if let Ok((chunk, _)) = chunk { + let uncompressed = bincode::serialize(&chunk).unwrap(); + for w in uncompressed.windows(k) { + *histogram.entry(w.to_vec()).or_default() += 1; + } + if x % 128 == 0 { + histogram_to_dictionary(&histogram, &mut dictionary); + } + let lz4chonk_pre = Instant::now(); + let lz4_chonk = lz4_with_dictionary(&bincode::serialize(&chunk).unwrap(), &[]); + let lz4chonk_post = Instant::now(); + //let lz4_dict_chonk = SerializedTerrainChunk::from_chunk(&chunk, + // &*dictionary); + + let deflatechonk_pre = Instant::now(); + let deflate_chonk = do_deflate_flate2(&bincode::serialize(&chunk).unwrap()); + let deflatechonk_post = Instant::now(); + + let dyna: Dyna<_, _, ColumnAccess> = chonk_to_dyna(&chunk, Block::empty()); + let ser_dyna = bincode::serialize(&dyna).unwrap(); + for w in ser_dyna.windows(k) { + *histogram2.entry(w.to_vec()).or_default() += 1; + } + if x % 128 == 0 { + histogram_to_dictionary(&histogram2, &mut dictionary2); + } + let lz4_dyna = lz4_with_dictionary(&*ser_dyna, &[]); + //let lz4_dict_dyna = lz4_with_dictionary(&*ser_dyna, &dictionary2); + let deflate_dyna = do_deflate(&*ser_dyna); + let deflate_channeled_dyna = + do_deflate_flate2(&bincode::serialize(&channelize_dyna(&dyna)).unwrap()); + let n = uncompressed.len(); + let sizes = [ + lz4_chonk.len() as f32 / n as f32, + deflate_chonk.len() as f32 / n as f32, + lz4_dyna.len() as f32 / n as f32, + deflate_dyna.len() as f32 / n as f32, + deflate_channeled_dyna.len() as f32 / n as f32, + ]; + let i = sizes + .iter() + .enumerate() + .fold((1.0, 0), |(best, i), (j, ratio)| { + if ratio < &best { + (*ratio, j) + } else { + (best, i) + } + }) + .1; + let timings = [ + (lz4chonk_post - lz4chonk_pre).subsec_nanos(), + (deflatechonk_post - deflatechonk_pre).subsec_nanos(), + ]; + trace!( + "{} {}: uncompressed: {}, {:?} {} {:?}", + x, + y, + n, + sizes, + i, + timings + ); + for i in 0..5 { + totals[i] += sizes[i]; + } + for i in 0..2 { + total_timings[i] += timings[i] as f32; + } + count += 1; + } + if x % 64 == 0 { + println!("Chunks processed: {}\n", count); + println!("Average lz4_chonk: {}", totals[0] / count as f32); + println!("Average deflate_chonk: {}", totals[1] / count as f32); + println!("Average lz4_dyna: {}", totals[2] / count as f32); + println!("Average deflate_dyna: {}", totals[3] / count as f32); + println!( + "Average deflate_channeled_dyna: {}", + totals[4] / count as f32 + ); + println!(""); + println!( + "Average lz4_chonk nanos : {:02}", + total_timings[0] / count as f32 + ); + println!( + "Average deflate_chonk nanos: {:02}", + total_timings[1] / count as f32 + ); + println!("-----"); + } + } + histogram.clear(); + } +} From c1c7f748ef739e8e5173f0b6675a1b7bf5c18d09 Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Tue, 20 Apr 2021 19:33:42 -0400 Subject: [PATCH 2/5] Move terrain compression code to common_net and disable redundant LZ4 compression on the terrain stream. --- Cargo.lock | 4 +-- client/src/lib.rs | 12 ++++--- common/Cargo.toml | 5 +-- common/net/Cargo.toml | 2 ++ common/net/src/msg/mod.rs | 59 ++++++++++++++++++++++++++++++++ common/net/src/msg/server.rs | 8 ++--- common/src/terrain/mod.rs | 51 --------------------------- server/src/connection_handler.rs | 2 +- server/src/sys/msg/terrain.rs | 6 ++-- server/src/sys/terrain.rs | 6 ++-- server/src/sys/terrain_sync.rs | 25 +++++++------- 11 files changed, 94 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e41c084765..c36ba62d0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5463,7 +5463,6 @@ dependencies = [ "approx 0.4.0", "arraygen", "assets_manager", - "bincode", "bitflags", "criterion", "crossbeam-channel", @@ -5471,7 +5470,6 @@ dependencies = [ "csv", "dot_vox", "enum-iterator", - "flate2", "hashbrown", "image", "indexmap", @@ -5535,6 +5533,8 @@ dependencies = [ name = "veloren-common-net" version = "0.9.0" dependencies = [ + "bincode", + "flate2", "hashbrown", "serde", "specs", diff --git a/client/src/lib.rs b/client/src/lib.rs index 7a6ecf3d41..d0108121fc 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1923,15 +1923,17 @@ impl Client { fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> { match msg { ServerGeneral::TerrainChunkUpdate { key, chunk } => { - if let Some(chunk) = chunk.ok().and_then(|c| c.to_chunk()) { + if let Some(chunk) = chunk.ok().and_then(|c| c.decompress()) { self.state.insert_chunk(key, chunk); } self.pending_chunks.remove(&key); }, - ServerGeneral::TerrainBlockUpdates(mut blocks) => { - blocks.drain().for_each(|(pos, block)| { - self.state.set_block(pos, block); - }); + ServerGeneral::TerrainBlockUpdates(blocks) => { + if let Some(mut blocks) = blocks.decompress() { + blocks.drain().for_each(|(pos, block)| { + self.state.set_block(pos, block); + }); + } }, _ => unreachable!("Not a terrain message"), } diff --git a/common/Cargo.toml b/common/Cargo.toml index 5028493a73..8c3ef5d6c9 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,9 +9,8 @@ no-assets = [] tracy = ["common-base/tracy"] simd = ["vek/platform_intrinsics"] bin_csv = ["csv", "structopt"] -compression = ["flate2"] -default = ["simd", "compression"] +default = ["simd"] [dependencies] @@ -24,13 +23,11 @@ serde = { version = "1.0.110", features = ["derive", "rc"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] approx = "0.4.0" arraygen = "0.1.13" -bincode = "1.3.3" crossbeam-utils = "0.8.1" bitflags = "1.2" crossbeam-channel = "0.5" enum-iterator = "0.6" lazy_static = "1.4.0" -flate2 = { version = "1.0.20", optional = true } num-derive = "0.3" num-traits = "0.2" ordered-float = { version = "2.0.1", default-features = false } diff --git a/common/net/Cargo.toml b/common/net/Cargo.toml index 209c2d6a18..0661f79697 100644 --- a/common/net/Cargo.toml +++ b/common/net/Cargo.toml @@ -14,6 +14,8 @@ default = ["simd"] common = {package = "veloren-common", path = "../../common"} #inline_tweak = "1.0.2" +bincode = "1.3.3" +flate2 = "1.0.20" sum_type = "0.2.0" vek = { version = "=0.14.1", features = ["serde"] } tracing = { version = "0.1", default-features = false } diff --git a/common/net/src/msg/mod.rs b/common/net/src/msg/mod.rs index 667038ac78..2d2421f9cc 100644 --- a/common/net/src/msg/mod.rs +++ b/common/net/src/msg/mod.rs @@ -15,6 +15,8 @@ pub use self::{ }; use common::character::CharacterId; use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; +use tracing::trace; #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub enum PresenceKind { @@ -42,3 +44,60 @@ pub fn validate_chat_msg(msg: &str) -> Result<(), ChatMsgValidationError> { Err(ChatMsgValidationError::TooLong) } } + +/// Wrapper for compressed, serialized data (for stuff that doesn't use the +/// default lz4 compression) +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompressedData { + pub data: Vec, + compressed: bool, + _phantom: PhantomData, +} + +impl Deserialize<'a>> CompressedData { + pub fn compress(t: &T, level: u32) -> Self { + use flate2::{write::DeflateEncoder, Compression}; + use std::io::Write; + let uncompressed = bincode::serialize(t) + .expect("bincode serialization can only fail if a byte limit is set"); + + if uncompressed.len() >= 32 { + const EXPECT_MSG: &str = + "compression only fails for fallible Read/Write impls (which Vec is not)"; + + let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level)); + encoder.write_all(&*uncompressed).expect(EXPECT_MSG); + let compressed = encoder.finish().expect(EXPECT_MSG); + trace!( + "compressed {}, uncompressed {}, ratio {}", + compressed.len(), + uncompressed.len(), + compressed.len() as f32 / uncompressed.len() as f32 + ); + CompressedData { + data: compressed, + compressed: true, + _phantom: PhantomData, + } + } else { + CompressedData { + data: uncompressed, + compressed: false, + _phantom: PhantomData, + } + } + } + + pub fn decompress(&self) -> Option { + use std::io::Read; + if self.compressed { + let mut uncompressed = Vec::new(); + flate2::read::DeflateDecoder::new(&*self.data) + .read_to_end(&mut uncompressed) + .ok()?; + bincode::deserialize(&*uncompressed).ok() + } else { + bincode::deserialize(&*self.data).ok() + } + } +} diff --git a/common/net/src/msg/server.rs b/common/net/src/msg/server.rs index 1a656e08c5..5596945f86 100644 --- a/common/net/src/msg/server.rs +++ b/common/net/src/msg/server.rs @@ -1,4 +1,4 @@ -use super::{world_msg::EconomyInfo, ClientType, EcsCompPacket, PingMsg}; +use super::{world_msg::EconomyInfo, ClientType, CompressedData, EcsCompPacket, PingMsg}; use crate::sync; use common::{ character::{self, CharacterItem}, @@ -6,7 +6,7 @@ use common::{ outcome::Outcome, recipe::RecipeBook, resources::TimeOfDay, - terrain::{Block, SerializedTerrainChunk}, + terrain::{Block, TerrainChunk}, trade::{PendingTrade, SitePrices, TradeId, TradeResult}, uid::Uid, }; @@ -106,9 +106,9 @@ pub enum ServerGeneral { // Ingame related AND terrain stream TerrainChunkUpdate { key: Vec2, - chunk: Result, + chunk: Result, ()>, }, - TerrainBlockUpdates(HashMap, Block>), + TerrainBlockUpdates(CompressedData, Block>>), // Always possible PlayerListUpdate(PlayerListUpdate), /// A message to go into the client chat box. The client is responsible for diff --git a/common/src/terrain/mod.rs b/common/src/terrain/mod.rs index f1fb3a4106..5360b6c7c8 100644 --- a/common/src/terrain/mod.rs +++ b/common/src/terrain/mod.rs @@ -17,7 +17,6 @@ pub use self::{ }; use roots::find_roots_cubic; use serde::{Deserialize, Serialize}; -use tracing::trace; use crate::{ vol::{ReadVol, RectVolSize}, @@ -143,56 +142,6 @@ impl TerrainChunkMeta { pub type TerrainChunk = chonk::Chonk; pub type TerrainGrid = VolGrid2d; -/// Wrapper for custom serialization strategies (e.g. compression) for terrain -/// chunks -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SerializedTerrainChunk(pub Vec); - -impl SerializedTerrainChunk { - pub fn from_chunk(chunk: &TerrainChunk) -> Self { - let uncompressed = bincode::serialize(chunk) - .expect("bincode serialization can only fail if a byte limit is set"); - #[cfg(feature = "compression")] - { - use flate2::{write::DeflateEncoder, Compression}; - use std::io::Write; - const EXPECT_MSG: &str = - "compression only fails for fallible Read/Write impls (which Vec is not)"; - - let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(5)); - encoder.write_all(&*uncompressed).expect(EXPECT_MSG); - let compressed = encoder.finish().expect(EXPECT_MSG); - trace!( - "compressed {}, uncompressed {}, ratio {}", - compressed.len(), - uncompressed.len(), - compressed.len() as f32 / uncompressed.len() as f32 - ); - SerializedTerrainChunk(compressed) - } - #[cfg(not(feature = "compression"))] - { - SerializedTerrainChunk(uncompressed) - } - } - - pub fn to_chunk(&self) -> Option { - #[cfg(feature = "compression")] - { - use std::io::Read; - let mut uncompressed = Vec::new(); - flate2::read::DeflateDecoder::new(&*self.0) - .read_to_end(&mut uncompressed) - .ok()?; - bincode::deserialize(&*uncompressed).ok() - } - #[cfg(not(feature = "compression"))] - { - bincode::deserialize(&self.0).ok() - } - } -} - impl TerrainGrid { /// Find a location suitable for spawning an entity near the given /// position (but in the same chunk). diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 7f0f11a198..1f7bb553af 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -105,7 +105,7 @@ impl ConnectionHandler { let mut register_stream = participant.open(3, reliablec, 500).await?; let character_screen_stream = participant.open(3, reliablec, 500).await?; let in_game_stream = participant.open(3, reliablec, 100_000).await?; - let terrain_stream = participant.open(4, reliablec, 20_000).await?; + let terrain_stream = participant.open(4, reliable, 20_000).await?; let server_data = receiver.recv()?; diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 9725910373..009d8dfa6e 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -2,11 +2,11 @@ use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence}; use common::{ comp::Pos, event::{EventBus, ServerEvent}, - terrain::{SerializedTerrainChunk, TerrainChunkSize, TerrainGrid}, + terrain::{TerrainChunkSize, TerrainGrid}, vol::RectVolSize, }; use common_ecs::{Job, Origin, ParMode, Phase, System}; -use common_net::msg::{ClientGeneral, ServerGeneral}; +use common_net::msg::{ClientGeneral, CompressedData, ServerGeneral}; use rayon::iter::ParallelIterator; use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage}; use std::sync::Arc; @@ -80,7 +80,7 @@ impl<'a> System<'a> for Sys { network_metrics.chunks_served_from_memory.inc(); client.send(ServerGeneral::TerrainChunkUpdate { key, - chunk: Ok(SerializedTerrainChunk::from_chunk(&chunk)), + chunk: Ok(CompressedData::compress(&chunk, 5)), })? }, None => { diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 3f7cbb145f..50c9be9cce 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -10,11 +10,11 @@ use common::{ event::{EventBus, ServerEvent}, generation::get_npc_name, npc::NPC_NAMES, - terrain::{SerializedTerrainChunk, TerrainGrid}, + terrain::TerrainGrid, LoadoutBuilder, SkillSetBuilder, }; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::ServerGeneral; +use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; use comp::Behavior; use specs::{Join, Read, ReadStorage, Write, WriteExpect}; @@ -108,7 +108,7 @@ impl<'a> System<'a> for Sys { if lazy_msg.is_none() { lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { key, - chunk: Ok(SerializedTerrainChunk::from_chunk(&*chunk)), + chunk: Ok(CompressedData::compress(&*chunk, 5)), })); } lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 9f203874f9..ebbe7bb95c 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,10 +1,7 @@ use crate::{client::Client, presence::Presence}; -use common::{ - comp::Pos, - terrain::{SerializedTerrainChunk, TerrainGrid}, -}; +use common::{comp::Pos, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::ServerGeneral; +use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; use specs::{Join, Read, ReadExpect, ReadStorage}; use std::sync::Arc; @@ -42,7 +39,7 @@ impl<'a> System<'a> for Sys { lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { key: *chunk_key, chunk: Ok(match terrain.get_key(*chunk_key) { - Some(chunk) => SerializedTerrainChunk::from_chunk(&chunk), + Some(chunk) => CompressedData::compress(&chunk, 5), None => break 'chunk, }), })); @@ -54,14 +51,16 @@ impl<'a> System<'a> for Sys { // TODO: Don't send all changed blocks to all clients // Sync changed blocks - let mut lazy_msg = None; - for (_, client) in (&presences, &clients).join() { - if lazy_msg.is_none() { - lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates( - terrain_changes.modified_blocks.clone(), - ))); + if !terrain_changes.modified_blocks.is_empty() { + let mut lazy_msg = None; + for (_, client) in (&presences, &clients).join() { + if lazy_msg.is_none() { + lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates( + CompressedData::compress(&terrain_changes.modified_blocks, 2), + ))); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } - lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } } From b76ca76d8db50d9f78a5ae1b296d20ab05ecb4c1 Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Tue, 20 Apr 2021 20:05:45 -0400 Subject: [PATCH 3/5] Switch the `chunk_compression_benchmarks` to using a spiral around the origin instead of the top-left corner to get a more representative sample. --- world/src/bin/chunk_compression_benchmarks.rs | 194 +++++++++--------- 1 file changed, 99 insertions(+), 95 deletions(-) diff --git a/world/src/bin/chunk_compression_benchmarks.rs b/world/src/bin/chunk_compression_benchmarks.rs index a20cb03e53..84c4038dcc 100644 --- a/world/src/bin/chunk_compression_benchmarks.rs +++ b/world/src/bin/chunk_compression_benchmarks.rs @@ -1,4 +1,5 @@ use common::{ + spiral::Spiral2d, terrain::{chonk::Chonk, Block, BlockKind, SpriteKind}, vol::{IntoVolIterator, RectVolSize, SizedVol, WriteVol}, volumes::dyna::{Access, ColumnAccess, Dyna}, @@ -144,103 +145,106 @@ fn main() { let mut totals = [0.0; 5]; let mut total_timings = [0.0; 2]; let mut count = 0; - for y in 1..sz.y { - for x in 1..sz.x { - let chunk = - world.generate_chunk(index.as_index_ref(), Vec2::new(x as _, y as _), || false); - if let Ok((chunk, _)) = chunk { - let uncompressed = bincode::serialize(&chunk).unwrap(); - for w in uncompressed.windows(k) { - *histogram.entry(w.to_vec()).or_default() += 1; - } - if x % 128 == 0 { - histogram_to_dictionary(&histogram, &mut dictionary); - } - let lz4chonk_pre = Instant::now(); - let lz4_chonk = lz4_with_dictionary(&bincode::serialize(&chunk).unwrap(), &[]); - let lz4chonk_post = Instant::now(); - //let lz4_dict_chonk = SerializedTerrainChunk::from_chunk(&chunk, - // &*dictionary); - - let deflatechonk_pre = Instant::now(); - let deflate_chonk = do_deflate_flate2(&bincode::serialize(&chunk).unwrap()); - let deflatechonk_post = Instant::now(); - - let dyna: Dyna<_, _, ColumnAccess> = chonk_to_dyna(&chunk, Block::empty()); - let ser_dyna = bincode::serialize(&dyna).unwrap(); - for w in ser_dyna.windows(k) { - *histogram2.entry(w.to_vec()).or_default() += 1; - } - if x % 128 == 0 { - histogram_to_dictionary(&histogram2, &mut dictionary2); - } - let lz4_dyna = lz4_with_dictionary(&*ser_dyna, &[]); - //let lz4_dict_dyna = lz4_with_dictionary(&*ser_dyna, &dictionary2); - let deflate_dyna = do_deflate(&*ser_dyna); - let deflate_channeled_dyna = - do_deflate_flate2(&bincode::serialize(&channelize_dyna(&dyna)).unwrap()); - let n = uncompressed.len(); - let sizes = [ - lz4_chonk.len() as f32 / n as f32, - deflate_chonk.len() as f32 / n as f32, - lz4_dyna.len() as f32 / n as f32, - deflate_dyna.len() as f32 / n as f32, - deflate_channeled_dyna.len() as f32 / n as f32, - ]; - let i = sizes - .iter() - .enumerate() - .fold((1.0, 0), |(best, i), (j, ratio)| { - if ratio < &best { - (*ratio, j) - } else { - (best, i) - } - }) - .1; - let timings = [ - (lz4chonk_post - lz4chonk_pre).subsec_nanos(), - (deflatechonk_post - deflatechonk_pre).subsec_nanos(), - ]; - trace!( - "{} {}: uncompressed: {}, {:?} {} {:?}", - x, - y, - n, - sizes, - i, - timings - ); - for i in 0..5 { - totals[i] += sizes[i]; - } - for i in 0..2 { - total_timings[i] += timings[i] as f32; - } - count += 1; + for (i, (x, y)) in Spiral2d::new() + .radius(20) + .map(|v| (v.x + sz.x as i32 / 2, v.y + sz.y as i32 / 2)) + .enumerate() + { + let chunk = world.generate_chunk(index.as_index_ref(), Vec2::new(x as _, y as _), || false); + if let Ok((chunk, _)) = chunk { + let uncompressed = bincode::serialize(&chunk).unwrap(); + for w in uncompressed.windows(k) { + *histogram.entry(w.to_vec()).or_default() += 1; } - if x % 64 == 0 { - println!("Chunks processed: {}\n", count); - println!("Average lz4_chonk: {}", totals[0] / count as f32); - println!("Average deflate_chonk: {}", totals[1] / count as f32); - println!("Average lz4_dyna: {}", totals[2] / count as f32); - println!("Average deflate_dyna: {}", totals[3] / count as f32); - println!( - "Average deflate_channeled_dyna: {}", - totals[4] / count as f32 - ); - println!(""); - println!( - "Average lz4_chonk nanos : {:02}", - total_timings[0] / count as f32 - ); - println!( - "Average deflate_chonk nanos: {:02}", - total_timings[1] / count as f32 - ); - println!("-----"); + if i % 128 == 0 { + histogram_to_dictionary(&histogram, &mut dictionary); } + let lz4chonk_pre = Instant::now(); + let lz4_chonk = lz4_with_dictionary(&bincode::serialize(&chunk).unwrap(), &[]); + let lz4chonk_post = Instant::now(); + //let lz4_dict_chonk = SerializedTerrainChunk::from_chunk(&chunk, + // &*dictionary); + + let deflatechonk_pre = Instant::now(); + let deflate_chonk = do_deflate_flate2(&bincode::serialize(&chunk).unwrap()); + let deflatechonk_post = Instant::now(); + + let dyna: Dyna<_, _, ColumnAccess> = chonk_to_dyna(&chunk, Block::empty()); + let ser_dyna = bincode::serialize(&dyna).unwrap(); + for w in ser_dyna.windows(k) { + *histogram2.entry(w.to_vec()).or_default() += 1; + } + if i % 128 == 0 { + histogram_to_dictionary(&histogram2, &mut dictionary2); + } + let lz4_dyna = lz4_with_dictionary(&*ser_dyna, &[]); + //let lz4_dict_dyna = lz4_with_dictionary(&*ser_dyna, &dictionary2); + let deflate_dyna = do_deflate(&*ser_dyna); + let deflate_channeled_dyna = + do_deflate_flate2(&bincode::serialize(&channelize_dyna(&dyna)).unwrap()); + let n = uncompressed.len(); + let sizes = [ + lz4_chonk.len() as f32 / n as f32, + deflate_chonk.len() as f32 / n as f32, + lz4_dyna.len() as f32 / n as f32, + deflate_dyna.len() as f32 / n as f32, + deflate_channeled_dyna.len() as f32 / n as f32, + ]; + let best_idx = sizes + .iter() + .enumerate() + .fold((1.0, 0), |(best, i), (j, ratio)| { + if ratio < &best { + (*ratio, j) + } else { + (best, i) + } + }) + .1; + let timings = [ + (lz4chonk_post - lz4chonk_pre).subsec_nanos(), + (deflatechonk_post - deflatechonk_pre).subsec_nanos(), + ]; + trace!( + "{} {}: uncompressed: {}, {:?} {} {:?}", + x, + y, + n, + sizes, + best_idx, + timings + ); + for j in 0..5 { + totals[j] += sizes[j]; + } + for j in 0..2 { + total_timings[j] += timings[j] as f32; + } + count += 1; + } + if i % 64 == 0 { + println!("Chunks processed: {}\n", count); + println!("Average lz4_chonk: {}", totals[0] / count as f32); + println!("Average deflate_chonk: {}", totals[1] / count as f32); + println!("Average lz4_dyna: {}", totals[2] / count as f32); + println!("Average deflate_dyna: {}", totals[3] / count as f32); + println!( + "Average deflate_channeled_dyna: {}", + totals[4] / count as f32 + ); + println!(""); + println!( + "Average lz4_chonk nanos : {:02}", + total_timings[0] / count as f32 + ); + println!( + "Average deflate_chonk nanos: {:02}", + total_timings[1] / count as f32 + ); + println!("-----"); + } + if i % 256 == 0 { + histogram.clear(); } - histogram.clear(); } } From 0ae259f35965b06a85c0da04ab116c6b42c3b041 Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Tue, 20 Apr 2021 23:27:43 -0400 Subject: [PATCH 4/5] Cleanup errors introduced in rebase. --- client/src/lib.rs | 2 +- common/net/src/msg/server.rs | 2 +- server/src/sys/msg/terrain.rs | 1 - server/src/sys/terrain.rs | 2 +- server/src/sys/terrain_sync.rs | 1 - 5 files changed, 3 insertions(+), 5 deletions(-) diff --git a/client/src/lib.rs b/client/src/lib.rs index d0108121fc..63168d9337 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1924,7 +1924,7 @@ impl Client { match msg { ServerGeneral::TerrainChunkUpdate { key, chunk } => { if let Some(chunk) = chunk.ok().and_then(|c| c.decompress()) { - self.state.insert_chunk(key, chunk); + self.state.insert_chunk(key, Arc::new(chunk)); } self.pending_chunks.remove(&key); }, diff --git a/common/net/src/msg/server.rs b/common/net/src/msg/server.rs index 5596945f86..033ed0892e 100644 --- a/common/net/src/msg/server.rs +++ b/common/net/src/msg/server.rs @@ -12,7 +12,7 @@ use common::{ }; use hashbrown::HashMap; use serde::{Deserialize, Serialize}; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use vek::*; ///This struct contains all messages the server might send (on different diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 009d8dfa6e..78209ef68b 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -9,7 +9,6 @@ use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_net::msg::{ClientGeneral, CompressedData, ServerGeneral}; use rayon::iter::ParallelIterator; use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage}; -use std::sync::Arc; use tracing::{debug, trace}; /// This system will handle new messages from clients diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 50c9be9cce..3294c13059 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -246,7 +246,7 @@ impl<'a> System<'a> for Sys { new_chunks.into_par_iter().for_each(|(key, chunk)| { let mut msg = Some(ServerGeneral::TerrainChunkUpdate { key, - chunk: Ok(chunk), + chunk: Ok(CompressedData::compress(&*chunk, 5)), }); let mut lazy_msg = None; diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index ebbe7bb95c..305427457b 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -4,7 +4,6 @@ use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; use specs::{Join, Read, ReadExpect, ReadStorage}; -use std::sync::Arc; /// This systems sends new chunks to clients as well as changes to existing /// chunks From 03de863328fccdac04f8460d1fb6b97181562572 Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Tue, 20 Apr 2021 23:48:15 -0400 Subject: [PATCH 5/5] Remove redundant terrain message per MR 2166 comment. --- server/src/sys/terrain.rs | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 3294c13059..42778912e9 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -93,28 +93,6 @@ impl<'a> System<'a> for Sys { // Add to list of chunks to send to nearby players. new_chunks.push((key, Arc::clone(&chunk))); - // Send the chunk to all nearby players. - let mut lazy_msg = None; - for (presence, pos, client) in (&presences, &positions, &clients).join() { - let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); - // Subtract 2 from the offset before computing squared magnitude - // 1 since chunks need neighbors to be meshed - // 1 to act as a buffer if the player moves in that direction - let adjusted_dist_sqr = (chunk_pos - key) - .map(|e: i32| (e.abs() as u32).saturating_sub(2)) - .magnitude_squared(); - - if adjusted_dist_sqr <= presence.view_distance.pow(2) { - if lazy_msg.is_none() { - lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(CompressedData::compress(&*chunk, 5)), - })); - } - lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); - } - } - // TODO: code duplication for chunk insertion between here and state.rs // Insert the chunk into terrain changes if terrain.insert(key, chunk).is_some() {