From 30cae40b82d527c8758c926919e1feffa0bd0f40 Mon Sep 17 00:00:00 2001 From: Avi Weinstock Date: Tue, 27 Apr 2021 19:30:42 -0400 Subject: [PATCH] Add a bandwidth-based heuristic for chunk compression. --- common/net/Cargo.toml | 1 - common/net/src/msg/compression.rs | 134 +++++++++++++----- common/net/src/msg/mod.rs | 2 +- common/net/src/msg/server.rs | 12 +- server/src/sys/msg/terrain.rs | 21 ++- server/src/sys/terrain.rs | 32 +++-- server/src/sys/terrain_sync.rs | 43 ++++-- world/src/bin/chunk_compression_benchmarks.rs | 36 +++-- 8 files changed, 198 insertions(+), 83 deletions(-) diff --git a/common/net/Cargo.toml b/common/net/Cargo.toml index 099aca7835..8ba5ac3f20 100644 --- a/common/net/Cargo.toml +++ b/common/net/Cargo.toml @@ -21,7 +21,6 @@ num-traits = "0.2" sum_type = "0.2.0" vek = { version = "=0.14.1", features = ["serde"] } tracing = { version = "0.1", default-features = false } -inline_tweak = "1.0.2" # Data structures hashbrown = { version = "0.9", features = ["rayon", "serde", "nightly"] } diff --git a/common/net/src/msg/compression.rs b/common/net/src/msg/compression.rs index 9dd6c114ab..b80f04f033 100644 --- a/common/net/src/msg/compression.rs +++ b/common/net/src/msg/compression.rs @@ -12,7 +12,7 @@ use std::{ io::{Read, Write}, marker::PhantomData, }; -use tracing::{trace, warn}; +use tracing::warn; use vek::*; /// Wrapper for compressed, serialized data (for stuff that doesn't use the @@ -37,12 +37,6 @@ impl CompressedData { let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level)); encoder.write_all(&*uncompressed).expect(EXPECT_MSG); let compressed = encoder.finish().expect(EXPECT_MSG); - trace!( - "compressed {}, uncompressed {}, ratio {}", - compressed.len(), - uncompressed.len(), - compressed.len() as f32 / uncompressed.len() as f32 - ); CompressedData { data: compressed, compressed: true, @@ -106,6 +100,10 @@ impl PackingFormula for TallPacking { } } +/// A wide, short image. Shares the advantage of not wasting space with +/// TallPacking, but faster to compress and smaller since PNG compresses each +/// row indepedently, so a wide image has fewer calls to the compressor. FLIP_X +/// has the same spatial locality preserving behavior as with TallPacking. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct WidePacking(); @@ -602,9 +600,9 @@ impl VoxelImageDecoding for QuadPngEncoding { } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct TriPngEncoding; +pub struct TriPngEncoding(); -impl VoxelImageEncoding for TriPngEncoding { +impl VoxelImageEncoding for TriPngEncoding { #[allow(clippy::type_complexity)] type Output = CompressedData<(Vec, Vec>, [usize; 3])>; #[allow(clippy::type_complexity)] @@ -628,7 +626,9 @@ impl VoxelImageEncoding for TriPngEncoding { ws.0.put_pixel(x, y, image::Luma([kind as u8])); ws.1.put_pixel(x, y, image::Luma([0])); ws.2.put_pixel(x, y, image::Luma([0])); - *ws.3.entry(kind).or_default().entry(rgb).or_insert(0) += 1; + if AVERAGE_PALETTE { + *ws.3.entry(kind).or_default().entry(rgb).or_insert(0) += 1; + } } fn put_sprite( @@ -663,29 +663,34 @@ impl VoxelImageEncoding for TriPngEncoding { f(&ws.1, 1)?; f(&ws.2, 2)?; - let mut palette = vec![Rgb { r: 0, g: 0, b: 0 }; 256]; - for (block, hist) in ws.3.iter() { - let (mut r, mut g, mut b) = (0.0, 0.0, 0.0); - let mut total = 0; - for (color, count) in hist.iter() { - r += color.r as f64 * *count as f64; - g += color.g as f64 * *count as f64; - b += color.b as f64 * *count as f64; - total += *count; + let palette = if AVERAGE_PALETTE { + let mut palette = vec![Rgb { r: 0, g: 0, b: 0 }; 256]; + for (block, hist) in ws.3.iter() { + let (mut r, mut g, mut b) = (0.0, 0.0, 0.0); + let mut total = 0; + for (color, count) in hist.iter() { + r += color.r as f64 * *count as f64; + g += color.g as f64 * *count as f64; + b += color.b as f64 * *count as f64; + total += *count; + } + r /= total as f64; + g /= total as f64; + b /= total as f64; + palette[*block as u8 as usize].r = r as u8; + palette[*block as u8 as usize].g = g as u8; + palette[*block as u8 as usize].b = b as u8; } - r /= total as f64; - g /= total as f64; - b /= total as f64; - palette[*block as u8 as usize].r = r as u8; - palette[*block as u8 as usize].g = g as u8; - palette[*block as u8 as usize].b = b as u8; - } + palette + } else { + Vec::new() + }; Some(CompressedData::compress(&(buf, palette, indices), 4)) } } -impl VoxelImageDecoding for TriPngEncoding { +impl VoxelImageDecoding for TriPngEncoding { fn start(data: &Self::Output) -> Option { use image::codecs::png::PngDecoder; let (quad, palette, indices) = data.decompress()?; @@ -698,12 +703,14 @@ impl VoxelImageDecoding for TriPngEncoding { let b = image_from_bytes(PngDecoder::new(&quad[ranges[1].clone()]).ok()?)?; let c = image_from_bytes(PngDecoder::new(&quad[ranges[2].clone()]).ok()?)?; let mut d: HashMap<_, HashMap<_, _>> = HashMap::new(); - for i in 0..=255 { - if let Some(block) = BlockKind::from_u8(i) { - d.entry(block) - .or_default() - .entry(palette[i as usize]) - .insert(1); + if AVERAGE_PALETTE { + for i in 0..=255 { + if let Some(block) = BlockKind::from_u8(i) { + d.entry(block) + .or_default() + .entry(palette[i as usize]) + .insert(1); + } } } @@ -713,11 +720,62 @@ impl VoxelImageDecoding for TriPngEncoding { fn get_block(ws: &Self::Workspace, x: u32, y: u32, _: bool) -> Block { if let Some(kind) = BlockKind::from_u8(ws.0.get_pixel(x, y).0[0]) { if kind.is_filled() { - let rgb = *ws - .3 - .get(&kind) - .and_then(|h| h.keys().next()) - .unwrap_or(&Rgb::default()); + let rgb = if AVERAGE_PALETTE { + *ws.3 + .get(&kind) + .and_then(|h| h.keys().next()) + .unwrap_or(&Rgb::default()) + } else { + use BlockKind::*; + match kind { + Air | Water => Rgb { r: 0, g: 0, b: 0 }, + Rock => Rgb { + r: 93, + g: 110, + b: 145, + }, + WeakRock => Rgb { + r: 93, + g: 132, + b: 145, + }, + Grass => Rgb { + r: 51, + g: 160, + b: 94, + }, + Snow => Rgb { + r: 192, + g: 255, + b: 255, + }, + Earth => Rgb { + r: 200, + g: 140, + b: 93, + }, + Sand => Rgb { + r: 241, + g: 177, + b: 128, + }, + Wood => Rgb { + r: 128, + g: 77, + b: 51, + }, + Leaves => Rgb { + r: 93, + g: 206, + b: 64, + }, + Misc => Rgb { + r: 255, + g: 0, + b: 255, + }, + } + }; Block::new(kind, rgb) } else { let mut block = Block::new(kind, Rgb { r: 0, g: 0, b: 0 }); diff --git a/common/net/src/msg/mod.rs b/common/net/src/msg/mod.rs index 65fea7d8cb..57a11e1806 100644 --- a/common/net/src/msg/mod.rs +++ b/common/net/src/msg/mod.rs @@ -15,7 +15,7 @@ pub use self::{ server::{ CharacterInfo, DisconnectReason, InviteAnswer, Notification, PlayerInfo, PlayerListUpdate, RegisterError, SerializedTerrainChunk, ServerGeneral, ServerInfo, ServerInit, ServerMsg, - ServerRegisterAnswer, + ServerRegisterAnswer, TERRAIN_LOW_BANDWIDTH, }, world_msg::WorldMapMsg, }; diff --git a/common/net/src/msg/server.rs b/common/net/src/msg/server.rs index 25cef048ee..31e66b5299 100644 --- a/common/net/src/msg/server.rs +++ b/common/net/src/msg/server.rs @@ -70,12 +70,16 @@ pub type ServerRegisterAnswer = Result<(), RegisterError>; pub enum SerializedTerrainChunk { DeflatedChonk(CompressedData), QuadPng(WireChonk, WidePacking, TerrainChunkMeta, TerrainChunkSize>), - TriPng(WireChonk, TerrainChunkMeta, TerrainChunkSize>), + TriPng(WireChonk, WidePacking, TerrainChunkMeta, TerrainChunkSize>), } +/// If someone has less than this number of bytes per second of bandwidth, spend +/// more CPU generating a smaller encoding of terrain data. +pub const TERRAIN_LOW_BANDWIDTH: f32 = 5_000_000.0; + impl SerializedTerrainChunk { - pub fn via_heuristic(chunk: &TerrainChunk) -> Self { - if chunk.get_max_z() - chunk.get_min_z() < 128 { + pub fn via_heuristic(chunk: &TerrainChunk, low_bandwidth: bool) -> Self { + if low_bandwidth && (chunk.get_max_z() - chunk.get_min_z() <= 128) { Self::quadpng(chunk) } else { Self::deflate(chunk) @@ -96,7 +100,7 @@ impl SerializedTerrainChunk { } pub fn tripng(chunk: &TerrainChunk) -> Self { - if let Some(wc) = WireChonk::from_chonk(TriPngEncoding, WidePacking(), chunk) { + if let Some(wc) = WireChonk::from_chonk(TriPngEncoding(), WidePacking(), chunk) { Self::TriPng(wc) } else { warn!("Image encoding failure occurred, falling back to deflate"); diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index e3902af7cf..57c0684730 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -6,7 +6,9 @@ use common::{ vol::RectVolSize, }; use common_ecs::{Job, Origin, ParMode, Phase, System}; -use common_net::msg::{ClientGeneral, SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::{ + ClientGeneral, SerializedTerrainChunk, ServerGeneral, TERRAIN_LOW_BANDWIDTH, +}; use rayon::iter::ParallelIterator; use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage}; use tracing::{debug, trace}; @@ -77,12 +79,17 @@ impl<'a> System<'a> for Sys { match terrain.get_key_arc(key) { Some(chunk) => { network_metrics.chunks_served_from_memory.inc(); - client.send(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(SerializedTerrainChunk::via_heuristic( - &chunk, - )), - })? + if let Some(participant) = &client.participant { + let low_bandwidth = + participant.bandwidth() < TERRAIN_LOW_BANDWIDTH; + client.send(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(SerializedTerrainChunk::via_heuristic( + &chunk, + low_bandwidth, + )), + })? + } }, None => { network_metrics.chunks_generation_triggered.inc(); diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 03aeedba99..c9595f7cf7 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -14,7 +14,7 @@ use common::{ LoadoutBuilder, SkillSetBuilder, }; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::{SerializedTerrainChunk, ServerGeneral, TERRAIN_LOW_BANDWIDTH}; use common_state::TerrainChanges; use comp::Behavior; use specs::{Join, Read, ReadStorage, Write, WriteExpect}; @@ -222,11 +222,8 @@ impl<'a> System<'a> for Sys { // Send the chunk to all nearby players. use rayon::iter::{IntoParallelIterator, ParallelIterator}; new_chunks.into_par_iter().for_each(|(key, chunk)| { - let mut msg = Some(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(SerializedTerrainChunk::via_heuristic(&*chunk)), - }); - let mut lazy_msg = None; + let mut lazy_msg_lo = None; + let mut lazy_msg_hi = None; (&presences, &positions, &clients) .join() @@ -240,11 +237,26 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= presence.view_distance.pow(2) { - if let Some(msg) = msg.take() { - lazy_msg = Some(client.prepare(msg)); - }; + if let Some(participant) = &client.participant { + let low_bandwidth = participant.bandwidth() < TERRAIN_LOW_BANDWIDTH; + let lazy_msg = if low_bandwidth { + &mut lazy_msg_lo + } else { + &mut lazy_msg_hi + }; + if lazy_msg.is_none() { + *lazy_msg = + Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(SerializedTerrainChunk::via_heuristic( + &*chunk, + low_bandwidth, + )), + })); + }; - lazy_msg.as_ref().map(|msg| client.send_prepared(msg)); + lazy_msg.as_ref().map(|msg| client.send_prepared(msg)); + } } }); }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 178fc0f604..6fb86443f2 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,7 +1,9 @@ use crate::{client::Client, presence::Presence}; use common::{comp::Pos, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::{CompressedData, SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::{ + CompressedData, SerializedTerrainChunk, ServerGeneral, TERRAIN_LOW_BANDWIDTH, +}; use common_state::TerrainChanges; use specs::{Join, Read, ReadExpect, ReadStorage}; @@ -29,21 +31,36 @@ impl<'a> System<'a> for Sys { ) { // Sync changed chunks 'chunk: for chunk_key in &terrain_changes.modified_chunks { - let mut lazy_msg = None; + let mut lazy_msg_hi = None; + let mut lazy_msg_lo = None; for (presence, pos, client) in (&presences, &positions, &clients).join() { - if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) - { - if lazy_msg.is_none() { - lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(match terrain.get_key(*chunk_key) { - Some(chunk) => SerializedTerrainChunk::via_heuristic(&chunk), - None => break 'chunk, - }), - })); + if let Some(participant) = &client.participant { + let low_bandwidth = participant.bandwidth() < TERRAIN_LOW_BANDWIDTH; + let lazy_msg = if low_bandwidth { + &mut lazy_msg_lo + } else { + &mut lazy_msg_hi + }; + if super::terrain::chunk_in_vd( + pos.0, + *chunk_key, + &terrain, + presence.view_distance, + ) { + if lazy_msg.is_none() { + *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key: *chunk_key, + chunk: Ok(match terrain.get_key(*chunk_key) { + Some(chunk) => { + SerializedTerrainChunk::via_heuristic(&chunk, low_bandwidth) + }, + None => break 'chunk, + }), + })); + } + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } - lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } } diff --git a/world/src/bin/chunk_compression_benchmarks.rs b/world/src/bin/chunk_compression_benchmarks.rs index 75a61415c8..2c6196e375 100644 --- a/world/src/bin/chunk_compression_benchmarks.rs +++ b/world/src/bin/chunk_compression_benchmarks.rs @@ -620,18 +620,26 @@ fn main() { .unwrap(); let quadpngquartwide_post = Instant::now(); - let tripng_pre = Instant::now(); - let tripng = - image_terrain_chonk(TriPngEncoding, TallPacking { flip_y: true }, &chunk) + let tripngaverage_pre = Instant::now(); + let tripngaverage = + image_terrain_chonk(TriPngEncoding::(), WidePacking::(), &chunk) .unwrap(); - let tripng_post = Instant::now(); + let tripngaverage_post = Instant::now(); + + let tripngconst_pre = Instant::now(); + let tripngconst = + image_terrain_chonk(TriPngEncoding::(), WidePacking::(), &chunk) + .unwrap(); + let tripngconst_post = Instant::now(); + #[rustfmt::skip] sizes.extend_from_slice(&[ ("quadpngfull", quadpngfull.data.len() as f32 / n as f32), ("quadpnghalf", quadpnghalf.data.len() as f32 / n as f32), ("quadpngquarttall", quadpngquarttall.data.len() as f32 / n as f32), ("quadpngquartwide", quadpngquartwide.data.len() as f32 / n as f32), - ("tripng", tripng.data.len() as f32 / n as f32), + ("tripngaverage", tripngaverage.data.len() as f32 / n as f32), + ("tripngconst", tripngconst.data.len() as f32 / n as f32), ]); let best_idx = sizes .iter() @@ -650,7 +658,8 @@ fn main() { ("quadpnghalf", (quadpnghalf_post - quadpnghalf_pre).subsec_nanos()), ("quadpngquarttall", (quadpngquarttall_post - quadpngquarttall_pre).subsec_nanos()), ("quadpngquartwide", (quadpngquartwide_post - quadpngquartwide_pre).subsec_nanos()), - ("tripng", (tripng_post - tripng_pre).subsec_nanos()), + ("tripngaverage", (tripngaverage_post - tripngaverage_pre).subsec_nanos()), + ("tripngconst", (tripngconst_post - tripngconst_pre).subsec_nanos()), ]); { let bucket = z_buckets @@ -672,14 +681,23 @@ fn main() { bucket.1 += (quadpngquartwide_post - quadpngquartwide_pre).subsec_nanos() as f32; } - if false { + if true { let bucket = z_buckets - .entry("tripng") + .entry("tripngaverage") .or_default() .entry(chunk.get_max_z() - chunk.get_min_z()) .or_insert((0, 0.0)); bucket.0 += 1; - bucket.1 += (tripng_post - tripng_pre).subsec_nanos() as f32; + bucket.1 += (tripngaverage_post - tripngaverage_pre).subsec_nanos() as f32; + } + if true { + let bucket = z_buckets + .entry("tripngconst") + .or_default() + .entry(chunk.get_max_z() - chunk.get_min_z()) + .or_insert((0, 0.0)); + bucket.0 += 1; + bucket.1 += (tripngconst_post - tripngconst_pre).subsec_nanos() as f32; } trace!( "{} {}: uncompressed: {}, {:?} {} {:?}",