Add a bandwidth-based heuristic for chunk compression.

This commit is contained in:
Avi Weinstock 2021-04-27 19:30:42 -04:00
parent cdc2eccda8
commit 30cae40b82
8 changed files with 198 additions and 83 deletions

View File

@ -21,7 +21,6 @@ num-traits = "0.2"
sum_type = "0.2.0"
vek = { version = "=0.14.1", features = ["serde"] }
tracing = { version = "0.1", default-features = false }
inline_tweak = "1.0.2"
# Data structures
hashbrown = { version = "0.9", features = ["rayon", "serde", "nightly"] }

View File

@ -12,7 +12,7 @@ use std::{
io::{Read, Write},
marker::PhantomData,
};
use tracing::{trace, warn};
use tracing::warn;
use vek::*;
/// Wrapper for compressed, serialized data (for stuff that doesn't use the
@ -37,12 +37,6 @@ impl<T: Serialize> CompressedData<T> {
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level));
encoder.write_all(&*uncompressed).expect(EXPECT_MSG);
let compressed = encoder.finish().expect(EXPECT_MSG);
trace!(
"compressed {}, uncompressed {}, ratio {}",
compressed.len(),
uncompressed.len(),
compressed.len() as f32 / uncompressed.len() as f32
);
CompressedData {
data: compressed,
compressed: true,
@ -106,6 +100,10 @@ impl PackingFormula for TallPacking {
}
}
/// A wide, short image. Shares the advantage of not wasting space with
/// TallPacking, but faster to compress and smaller since PNG compresses each
/// row independently, so a wide image has fewer calls to the compressor. FLIP_X
/// has the same spatial locality preserving behavior as with TallPacking.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct WidePacking<const FLIP_X: bool>();
@ -602,9 +600,9 @@ impl<const N: u32> VoxelImageDecoding for QuadPngEncoding<N> {
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TriPngEncoding;
pub struct TriPngEncoding<const AVERAGE_PALETTE: bool>();
impl VoxelImageEncoding for TriPngEncoding {
impl<const AVERAGE_PALETTE: bool> VoxelImageEncoding for TriPngEncoding<AVERAGE_PALETTE> {
#[allow(clippy::type_complexity)]
type Output = CompressedData<(Vec<u8>, Vec<Rgb<u8>>, [usize; 3])>;
#[allow(clippy::type_complexity)]
@ -628,7 +626,9 @@ impl VoxelImageEncoding for TriPngEncoding {
ws.0.put_pixel(x, y, image::Luma([kind as u8]));
ws.1.put_pixel(x, y, image::Luma([0]));
ws.2.put_pixel(x, y, image::Luma([0]));
*ws.3.entry(kind).or_default().entry(rgb).or_insert(0) += 1;
if AVERAGE_PALETTE {
*ws.3.entry(kind).or_default().entry(rgb).or_insert(0) += 1;
}
}
fn put_sprite(
@ -663,29 +663,34 @@ impl VoxelImageEncoding for TriPngEncoding {
f(&ws.1, 1)?;
f(&ws.2, 2)?;
let mut palette = vec![Rgb { r: 0, g: 0, b: 0 }; 256];
for (block, hist) in ws.3.iter() {
let (mut r, mut g, mut b) = (0.0, 0.0, 0.0);
let mut total = 0;
for (color, count) in hist.iter() {
r += color.r as f64 * *count as f64;
g += color.g as f64 * *count as f64;
b += color.b as f64 * *count as f64;
total += *count;
let palette = if AVERAGE_PALETTE {
let mut palette = vec![Rgb { r: 0, g: 0, b: 0 }; 256];
for (block, hist) in ws.3.iter() {
let (mut r, mut g, mut b) = (0.0, 0.0, 0.0);
let mut total = 0;
for (color, count) in hist.iter() {
r += color.r as f64 * *count as f64;
g += color.g as f64 * *count as f64;
b += color.b as f64 * *count as f64;
total += *count;
}
r /= total as f64;
g /= total as f64;
b /= total as f64;
palette[*block as u8 as usize].r = r as u8;
palette[*block as u8 as usize].g = g as u8;
palette[*block as u8 as usize].b = b as u8;
}
r /= total as f64;
g /= total as f64;
b /= total as f64;
palette[*block as u8 as usize].r = r as u8;
palette[*block as u8 as usize].g = g as u8;
palette[*block as u8 as usize].b = b as u8;
}
palette
} else {
Vec::new()
};
Some(CompressedData::compress(&(buf, palette, indices), 4))
}
}
impl VoxelImageDecoding for TriPngEncoding {
impl<const AVERAGE_PALETTE: bool> VoxelImageDecoding for TriPngEncoding<AVERAGE_PALETTE> {
fn start(data: &Self::Output) -> Option<Self::Workspace> {
use image::codecs::png::PngDecoder;
let (quad, palette, indices) = data.decompress()?;
@ -698,12 +703,14 @@ impl VoxelImageDecoding for TriPngEncoding {
let b = image_from_bytes(PngDecoder::new(&quad[ranges[1].clone()]).ok()?)?;
let c = image_from_bytes(PngDecoder::new(&quad[ranges[2].clone()]).ok()?)?;
let mut d: HashMap<_, HashMap<_, _>> = HashMap::new();
for i in 0..=255 {
if let Some(block) = BlockKind::from_u8(i) {
d.entry(block)
.or_default()
.entry(palette[i as usize])
.insert(1);
if AVERAGE_PALETTE {
for i in 0..=255 {
if let Some(block) = BlockKind::from_u8(i) {
d.entry(block)
.or_default()
.entry(palette[i as usize])
.insert(1);
}
}
}
@ -713,11 +720,62 @@ impl VoxelImageDecoding for TriPngEncoding {
fn get_block(ws: &Self::Workspace, x: u32, y: u32, _: bool) -> Block {
if let Some(kind) = BlockKind::from_u8(ws.0.get_pixel(x, y).0[0]) {
if kind.is_filled() {
let rgb = *ws
.3
.get(&kind)
.and_then(|h| h.keys().next())
.unwrap_or(&Rgb::default());
let rgb = if AVERAGE_PALETTE {
*ws.3
.get(&kind)
.and_then(|h| h.keys().next())
.unwrap_or(&Rgb::default())
} else {
use BlockKind::*;
match kind {
Air | Water => Rgb { r: 0, g: 0, b: 0 },
Rock => Rgb {
r: 93,
g: 110,
b: 145,
},
WeakRock => Rgb {
r: 93,
g: 132,
b: 145,
},
Grass => Rgb {
r: 51,
g: 160,
b: 94,
},
Snow => Rgb {
r: 192,
g: 255,
b: 255,
},
Earth => Rgb {
r: 200,
g: 140,
b: 93,
},
Sand => Rgb {
r: 241,
g: 177,
b: 128,
},
Wood => Rgb {
r: 128,
g: 77,
b: 51,
},
Leaves => Rgb {
r: 93,
g: 206,
b: 64,
},
Misc => Rgb {
r: 255,
g: 0,
b: 255,
},
}
};
Block::new(kind, rgb)
} else {
let mut block = Block::new(kind, Rgb { r: 0, g: 0, b: 0 });

View File

@ -15,7 +15,7 @@ pub use self::{
server::{
CharacterInfo, DisconnectReason, InviteAnswer, Notification, PlayerInfo, PlayerListUpdate,
RegisterError, SerializedTerrainChunk, ServerGeneral, ServerInfo, ServerInit, ServerMsg,
ServerRegisterAnswer,
ServerRegisterAnswer, TERRAIN_LOW_BANDWIDTH,
},
world_msg::WorldMapMsg,
};

View File

@ -70,12 +70,16 @@ pub type ServerRegisterAnswer = Result<(), RegisterError>;
pub enum SerializedTerrainChunk {
DeflatedChonk(CompressedData<TerrainChunk>),
QuadPng(WireChonk<QuadPngEncoding<4>, WidePacking<true>, TerrainChunkMeta, TerrainChunkSize>),
TriPng(WireChonk<TriPngEncoding, WidePacking<true>, TerrainChunkMeta, TerrainChunkSize>),
TriPng(WireChonk<TriPngEncoding<false>, WidePacking<true>, TerrainChunkMeta, TerrainChunkSize>),
}
/// If someone has less than this number of bytes per second of bandwidth, spend
/// more CPU generating a smaller encoding of terrain data.
pub const TERRAIN_LOW_BANDWIDTH: f32 = 5_000_000.0;
impl SerializedTerrainChunk {
pub fn via_heuristic(chunk: &TerrainChunk) -> Self {
if chunk.get_max_z() - chunk.get_min_z() < 128 {
pub fn via_heuristic(chunk: &TerrainChunk, low_bandwidth: bool) -> Self {
if low_bandwidth && (chunk.get_max_z() - chunk.get_min_z() <= 128) {
Self::quadpng(chunk)
} else {
Self::deflate(chunk)
@ -96,7 +100,7 @@ impl SerializedTerrainChunk {
}
pub fn tripng(chunk: &TerrainChunk) -> Self {
if let Some(wc) = WireChonk::from_chonk(TriPngEncoding, WidePacking(), chunk) {
if let Some(wc) = WireChonk::from_chonk(TriPngEncoding(), WidePacking(), chunk) {
Self::TriPng(wc)
} else {
warn!("Image encoding failure occurred, falling back to deflate");

View File

@ -6,7 +6,9 @@ use common::{
vol::RectVolSize,
};
use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, SerializedTerrainChunk, ServerGeneral};
use common_net::msg::{
ClientGeneral, SerializedTerrainChunk, ServerGeneral, TERRAIN_LOW_BANDWIDTH,
};
use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage};
use tracing::{debug, trace};
@ -77,12 +79,17 @@ impl<'a> System<'a> for Sys {
match terrain.get_key_arc(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(
&chunk,
)),
})?
if let Some(participant) = &client.participant {
let low_bandwidth =
participant.bandwidth() < TERRAIN_LOW_BANDWIDTH;
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(
&chunk,
low_bandwidth,
)),
})?
}
},
None => {
network_metrics.chunks_generation_triggered.inc();

View File

@ -14,7 +14,7 @@ use common::{
LoadoutBuilder, SkillSetBuilder,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
use common_net::msg::{SerializedTerrainChunk, ServerGeneral, TERRAIN_LOW_BANDWIDTH};
use common_state::TerrainChanges;
use comp::Behavior;
use specs::{Join, Read, ReadStorage, Write, WriteExpect};
@ -222,11 +222,8 @@ impl<'a> System<'a> for Sys {
// Send the chunk to all nearby players.
use rayon::iter::{IntoParallelIterator, ParallelIterator};
new_chunks.into_par_iter().for_each(|(key, chunk)| {
let mut msg = Some(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(&*chunk)),
});
let mut lazy_msg = None;
let mut lazy_msg_lo = None;
let mut lazy_msg_hi = None;
(&presences, &positions, &clients)
.join()
@ -240,11 +237,26 @@ impl<'a> System<'a> for Sys {
.magnitude_squared();
if adjusted_dist_sqr <= presence.view_distance.pow(2) {
if let Some(msg) = msg.take() {
lazy_msg = Some(client.prepare(msg));
};
if let Some(participant) = &client.participant {
let low_bandwidth = participant.bandwidth() < TERRAIN_LOW_BANDWIDTH;
let lazy_msg = if low_bandwidth {
&mut lazy_msg_lo
} else {
&mut lazy_msg_hi
};
if lazy_msg.is_none() {
*lazy_msg =
Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(
&*chunk,
low_bandwidth,
)),
}));
};
lazy_msg.as_ref().map(|msg| client.send_prepared(msg));
lazy_msg.as_ref().map(|msg| client.send_prepared(msg));
}
}
});
});

View File

@ -1,7 +1,9 @@
use crate::{client::Client, presence::Presence};
use common::{comp::Pos, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{CompressedData, SerializedTerrainChunk, ServerGeneral};
use common_net::msg::{
CompressedData, SerializedTerrainChunk, ServerGeneral, TERRAIN_LOW_BANDWIDTH,
};
use common_state::TerrainChanges;
use specs::{Join, Read, ReadExpect, ReadStorage};
@ -29,21 +31,36 @@ impl<'a> System<'a> for Sys {
) {
// Sync changed chunks
'chunk: for chunk_key in &terrain_changes.modified_chunks {
let mut lazy_msg = None;
let mut lazy_msg_hi = None;
let mut lazy_msg_lo = None;
for (presence, pos, client) in (&presences, &positions, &clients).join() {
if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance)
{
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(match terrain.get_key(*chunk_key) {
Some(chunk) => SerializedTerrainChunk::via_heuristic(&chunk),
None => break 'chunk,
}),
}));
if let Some(participant) = &client.participant {
let low_bandwidth = participant.bandwidth() < TERRAIN_LOW_BANDWIDTH;
let lazy_msg = if low_bandwidth {
&mut lazy_msg_lo
} else {
&mut lazy_msg_hi
};
if super::terrain::chunk_in_vd(
pos.0,
*chunk_key,
&terrain,
presence.view_distance,
) {
if lazy_msg.is_none() {
*lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(match terrain.get_key(*chunk_key) {
Some(chunk) => {
SerializedTerrainChunk::via_heuristic(&chunk, low_bandwidth)
},
None => break 'chunk,
}),
}));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}

View File

@ -620,18 +620,26 @@ fn main() {
.unwrap();
let quadpngquartwide_post = Instant::now();
let tripng_pre = Instant::now();
let tripng =
image_terrain_chonk(TriPngEncoding, TallPacking { flip_y: true }, &chunk)
let tripngaverage_pre = Instant::now();
let tripngaverage =
image_terrain_chonk(TriPngEncoding::<true>(), WidePacking::<true>(), &chunk)
.unwrap();
let tripng_post = Instant::now();
let tripngaverage_post = Instant::now();
let tripngconst_pre = Instant::now();
let tripngconst =
image_terrain_chonk(TriPngEncoding::<false>(), WidePacking::<true>(), &chunk)
.unwrap();
let tripngconst_post = Instant::now();
#[rustfmt::skip]
sizes.extend_from_slice(&[
("quadpngfull", quadpngfull.data.len() as f32 / n as f32),
("quadpnghalf", quadpnghalf.data.len() as f32 / n as f32),
("quadpngquarttall", quadpngquarttall.data.len() as f32 / n as f32),
("quadpngquartwide", quadpngquartwide.data.len() as f32 / n as f32),
("tripng", tripng.data.len() as f32 / n as f32),
("tripngaverage", tripngaverage.data.len() as f32 / n as f32),
("tripngconst", tripngconst.data.len() as f32 / n as f32),
]);
let best_idx = sizes
.iter()
@ -650,7 +658,8 @@ fn main() {
("quadpnghalf", (quadpnghalf_post - quadpnghalf_pre).subsec_nanos()),
("quadpngquarttall", (quadpngquarttall_post - quadpngquarttall_pre).subsec_nanos()),
("quadpngquartwide", (quadpngquartwide_post - quadpngquartwide_pre).subsec_nanos()),
("tripng", (tripng_post - tripng_pre).subsec_nanos()),
("tripngaverage", (tripngaverage_post - tripngaverage_pre).subsec_nanos()),
("tripngconst", (tripngconst_post - tripngconst_pre).subsec_nanos()),
]);
{
let bucket = z_buckets
@ -672,14 +681,23 @@ fn main() {
bucket.1 +=
(quadpngquartwide_post - quadpngquartwide_pre).subsec_nanos() as f32;
}
if false {
if true {
let bucket = z_buckets
.entry("tripng")
.entry("tripngaverage")
.or_default()
.entry(chunk.get_max_z() - chunk.get_min_z())
.or_insert((0, 0.0));
bucket.0 += 1;
bucket.1 += (tripng_post - tripng_pre).subsec_nanos() as f32;
bucket.1 += (tripngaverage_post - tripngaverage_pre).subsec_nanos() as f32;
}
if true {
let bucket = z_buckets
.entry("tripngconst")
.or_default()
.entry(chunk.get_max_z() - chunk.get_min_z())
.or_insert((0, 0.0));
bucket.0 += 1;
bucket.1 += (tripngconst_post - tripngconst_pre).subsec_nanos() as f32;
}
trace!(
"{} {}: uncompressed: {}, {:?} {} {:?}",