Merge branch 'aweinstock/deflate-terrain' into 'master'

Compress terrain chunks with deflate. Includes a benchmark showing that this...

See merge request veloren/veloren!2166
Marcel 2021-04-22 17:12:53 +00:00
commit e14f8b9745
12 changed files with 368 additions and 28 deletions
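
For orientation, here is a minimal, self-contained sketch (not part of the diff) of the DEFLATE round-trip this merge relies on, using the flate2 crate it adds as a dependency; the zeroed payload is just a stand-in for a bincode-serialized terrain chunk:

```rust
// Minimal sketch (not project code): a DEFLATE round-trip with flate2.
use std::io::{Read, Write};

use flate2::{read::DeflateDecoder, write::DeflateEncoder, Compression};

fn deflate(bytes: &[u8], level: u32) -> Vec<u8> {
    let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level));
    encoder
        .write_all(bytes)
        .expect("writing to an in-memory Vec cannot fail");
    encoder
        .finish()
        .expect("finishing an in-memory encoder cannot fail")
}

fn inflate(bytes: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    DeflateDecoder::new(bytes).read_to_end(&mut out)?;
    Ok(out)
}

fn main() {
    let payload = vec![0u8; 4096]; // highly compressible stand-in data
    let compressed = deflate(&payload, 5);
    assert_eq!(inflate(&compressed).unwrap(), payload);
    println!("{} bytes -> {} bytes", payload.len(), compressed.len());
}
```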


@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Entities now have density
- Buoyancy is calculated from the difference in density between an entity and surrounding fluid
- Drag is now calculated based on physical properties
- Terrain chunks are now deflate-compressed when sent over the network.
### Changed

Cargo.lock (generated)

@ -1308,6 +1308,15 @@ dependencies = [
"byteorder",
]
[[package]]
name = "deflate"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f95bf05dffba6e6cce8dfbb30def788154949ccd9aed761b472119c21e01c70"
dependencies = [
"adler32",
]
[[package]]
name = "derivative"
version = "2.2.0"
@ -3765,7 +3774,7 @@ checksum = "3c3287920cb847dee3de33d301c463fba14dda99db24214ddf93f83d3021f4c6"
dependencies = [
"bitflags",
"crc32fast",
"deflate",
"deflate 0.8.6",
"miniz_oxide 0.3.7",
]
@ -5524,6 +5533,8 @@ dependencies = [
name = "veloren-common-net"
version = "0.9.0"
dependencies = [
"bincode",
"flate2",
"hashbrown",
"serde",
"specs",
@ -5809,12 +5820,15 @@ dependencies = [
"bincode",
"bitvec",
"criterion",
"deflate 0.9.1",
"enum-iterator",
"flate2",
"fxhash",
"hashbrown",
"image",
"itertools 0.10.0",
"lazy_static",
"lz-fear",
"minifb",
"noise",
"num 0.4.0",
@ -5831,6 +5845,7 @@ dependencies = [
"tracing-subscriber",
"vek",
"veloren-common",
"veloren-common-frontend",
"veloren-common-net",
]


@ -1923,15 +1923,17 @@ impl Client {
fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> {
match msg {
ServerGeneral::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, chunk);
if let Some(chunk) = chunk.ok().and_then(|c| c.decompress()) {
self.state.insert_chunk(key, Arc::new(chunk));
}
self.pending_chunks.remove(&key);
},
ServerGeneral::TerrainBlockUpdates(mut blocks) => {
ServerGeneral::TerrainBlockUpdates(blocks) => {
if let Some(mut blocks) = blocks.decompress() {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
}
},
_ => unreachable!("Not a terrain message"),
}


@ -14,6 +14,8 @@ default = ["simd"]
common = {package = "veloren-common", path = "../../common"}
#inline_tweak = "1.0.2"
bincode = "1.3.3"
flate2 = "1.0.20"
sum_type = "0.2.0"
vek = { version = "=0.14.1", features = ["serde"] }
tracing = { version = "0.1", default-features = false }


@ -15,6 +15,8 @@ pub use self::{
};
use common::character::CharacterId;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tracing::trace;
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum PresenceKind {
@ -42,3 +44,60 @@ pub fn validate_chat_msg(msg: &str) -> Result<(), ChatMsgValidationError> {
Err(ChatMsgValidationError::TooLong)
}
}
/// Wrapper for compressed, serialized data (for stuff that doesn't use the
/// default lz4 compression)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompressedData<T> {
pub data: Vec<u8>,
compressed: bool,
_phantom: PhantomData<T>,
}
impl<T: Serialize + for<'a> Deserialize<'a>> CompressedData<T> {
pub fn compress(t: &T, level: u32) -> Self {
use flate2::{write::DeflateEncoder, Compression};
use std::io::Write;
let uncompressed = bincode::serialize(t)
.expect("bincode serialization can only fail if a byte limit is set");
if uncompressed.len() >= 32 {
const EXPECT_MSG: &str =
"compression only fails for fallible Read/Write impls (which Vec<u8> is not)";
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(level));
encoder.write_all(&*uncompressed).expect(EXPECT_MSG);
let compressed = encoder.finish().expect(EXPECT_MSG);
trace!(
"compressed {}, uncompressed {}, ratio {}",
compressed.len(),
uncompressed.len(),
compressed.len() as f32 / uncompressed.len() as f32
);
CompressedData {
data: compressed,
compressed: true,
_phantom: PhantomData,
}
} else {
CompressedData {
data: uncompressed,
compressed: false,
_phantom: PhantomData,
}
}
}
pub fn decompress(&self) -> Option<T> {
use std::io::Read;
if self.compressed {
let mut uncompressed = Vec::new();
flate2::read::DeflateDecoder::new(&*self.data)
.read_to_end(&mut uncompressed)
.ok()?;
bincode::deserialize(&*uncompressed).ok()
} else {
bincode::deserialize(&*self.data).ok()
}
}
}
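
A minimal usage sketch of the CompressedData wrapper above. The payload type here is hypothetical and only for illustration (the real payloads in this merge are TerrainChunk and HashMap<Vec3<i32>, Block>), and it assumes the serde imports already present in this module:

```rust
// Hypothetical payload type standing in for the real ones used in this merge.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct BlockUpdate {
    pos: [i32; 3],
    block_id: u32,
}

fn compressed_data_roundtrip() {
    let updates = vec![BlockUpdate { pos: [1, 2, 3], block_id: 42 }; 128];
    // Level 5 is what the server systems below pass for chunk updates;
    // payloads under 32 bytes are stored uncompressed by compress().
    let wire: CompressedData<Vec<BlockUpdate>> = CompressedData::compress(&updates, 5);
    // decompress() returns None on any decoding failure, so the receiving
    // side can simply drop malformed messages.
    let received = wire.decompress().expect("round-trip should succeed");
    assert_eq!(received, updates);
}
```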


@ -1,4 +1,4 @@
use super::{world_msg::EconomyInfo, ClientType, EcsCompPacket, PingMsg};
use super::{world_msg::EconomyInfo, ClientType, CompressedData, EcsCompPacket, PingMsg};
use crate::sync;
use common::{
character::{self, CharacterItem},
@ -12,7 +12,7 @@ use common::{
};
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use std::time::Duration;
use vek::*;
///This struct contains all messages the server might send (on different
@ -106,9 +106,9 @@ pub enum ServerGeneral {
// Ingame related AND terrain stream
TerrainChunkUpdate {
key: Vec2<i32>,
chunk: Result<Arc<TerrainChunk>, ()>,
chunk: Result<CompressedData<TerrainChunk>, ()>,
},
TerrainBlockUpdates(HashMap<Vec3<i32>, Block>),
TerrainBlockUpdates(CompressedData<HashMap<Vec3<i32>, Block>>),
// Always possible
PlayerListUpdate(PlayerListUpdate),
/// A message to go into the client chat box. The client is responsible for


@ -105,7 +105,7 @@ impl ConnectionHandler {
let mut register_stream = participant.open(3, reliablec, 500).await?;
let character_screen_stream = participant.open(3, reliablec, 500).await?;
let in_game_stream = participant.open(3, reliablec, 100_000).await?;
let terrain_stream = participant.open(4, reliablec, 20_000).await?;
let terrain_stream = participant.open(4, reliable, 20_000).await?;
let server_data = receiver.recv()?;


@ -6,10 +6,9 @@ use common::{
vol::RectVolSize,
};
use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral};
use common_net::msg::{ClientGeneral, CompressedData, ServerGeneral};
use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage};
use std::sync::Arc;
use tracing::{debug, trace};
/// This system will handle new messages from clients
@ -80,7 +79,7 @@ impl<'a> System<'a> for Sys {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Arc::clone(chunk)),
chunk: Ok(CompressedData::compress(&chunk, 5)),
})?
},
None => {


@ -14,7 +14,7 @@ use common::{
LoadoutBuilder, SkillSetBuilder,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::ServerGeneral;
use common_net::msg::{CompressedData, ServerGeneral};
use common_state::TerrainChanges;
use comp::Behavior;
use specs::{Join, Read, ReadStorage, Write, WriteExpect};
@ -224,7 +224,7 @@ impl<'a> System<'a> for Sys {
new_chunks.into_par_iter().for_each(|(key, chunk)| {
let mut msg = Some(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(chunk),
chunk: Ok(CompressedData::compress(&*chunk, 5)),
});
let mut lazy_msg = None;


@ -1,10 +1,9 @@
use crate::{client::Client, presence::Presence};
use common::{comp::Pos, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::ServerGeneral;
use common_net::msg::{CompressedData, ServerGeneral};
use common_state::TerrainChanges;
use specs::{Join, Read, ReadExpect, ReadStorage};
use std::sync::Arc;
/// This systems sends new chunks to clients as well as changes to existing
/// chunks
@ -38,8 +37,8 @@ impl<'a> System<'a> for Sys {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(match terrain.get_key_arc(*chunk_key) {
Some(chunk) => Arc::clone(chunk),
chunk: Ok(match terrain.get_key(*chunk_key) {
Some(chunk) => CompressedData::compress(&chunk, 5),
None => break 'chunk,
}),
}));
@ -51,14 +50,16 @@ impl<'a> System<'a> for Sys {
// TODO: Don't send all changed blocks to all clients
// Sync changed blocks
if !terrain_changes.modified_blocks.is_empty() {
let mut lazy_msg = None;
for (_, client) in (&presences, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates(
terrain_changes.modified_blocks.clone(),
CompressedData::compress(&terrain_changes.modified_blocks, 2),
)));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}
}


@ -7,6 +7,7 @@ edition = "2018"
[features]
tracy = ["common/tracy", "common-net/tracy"]
simd = ["vek/platform_intrinsics"]
bin_compression = ["lz-fear", "deflate", "flate2", "common-frontend"]
default = ["simd"]
@ -37,6 +38,12 @@ ron = { version = "0.6", default-features = false }
assets_manager = {version = "0.4.3", features = ["ron"]}
#inline_tweak = "1.0.2"
# compression benchmarks
lz-fear = { version = "0.1.1", optional = true }
deflate = { version = "0.9.1", optional = true }
flate2 = { version = "1.0.20", optional = true }
common-frontend = { package = "veloren-common-frontend", path = "../common/frontend", optional = true }
[dev-dependencies]
criterion = "0.3"
@ -48,3 +55,7 @@ structopt = "0.3"
[[bench]]
harness = false
name = "tree"
[[bin]]
name = "chunk_compression_benchmarks"
required-features = ["bin_compression"]


@ -0,0 +1,250 @@
use common::{
spiral::Spiral2d,
terrain::{chonk::Chonk, Block, BlockKind, SpriteKind},
vol::{IntoVolIterator, RectVolSize, SizedVol, WriteVol},
volumes::dyna::{Access, ColumnAccess, Dyna},
};
use hashbrown::HashMap;
use std::{
io::{Read, Write},
time::Instant,
};
use tracing::{debug, trace};
use vek::*;
use veloren_world::{
sim::{FileOpts, WorldOpts, DEFAULT_WORLD_MAP},
World,
};
fn lz4_with_dictionary(data: &[u8], dictionary: &[u8]) -> Vec<u8> {
let mut compressed = Vec::new();
lz_fear::CompressionSettings::default()
.dictionary(0, &dictionary)
.compress(data, &mut compressed)
.unwrap();
compressed
}
#[allow(dead_code)]
fn unlz4_with_dictionary(data: &[u8], dictionary: &[u8]) -> Option<Vec<u8>> {
lz_fear::LZ4FrameReader::new(data).ok().and_then(|r| {
let mut uncompressed = Vec::new();
r.into_read_with_dictionary(dictionary)
.read_to_end(&mut uncompressed)
.ok()?;
bincode::deserialize(&*uncompressed).ok()
})
}
#[allow(dead_code)]
fn do_deflate(data: &[u8]) -> Vec<u8> {
use deflate::{write::DeflateEncoder, Compression};
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::Fast);
encoder.write_all(data).expect("Write error!");
let compressed_data = encoder.finish().expect("Failed to finish compression!");
compressed_data
}
fn do_deflate_flate2(data: &[u8]) -> Vec<u8> {
use flate2::{write::DeflateEncoder, Compression};
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::new(5));
encoder.write_all(data).expect("Write error!");
let compressed_data = encoder.finish().expect("Failed to finish compression!");
compressed_data
}
fn chonk_to_dyna<V: Clone, S: RectVolSize, M: Clone, A: Access>(
chonk: &Chonk<V, S, M>,
block: V,
) -> Dyna<V, M, A> {
let mut dyna = Dyna::<V, M, A>::filled(
Vec3::new(
S::RECT_SIZE.x,
S::RECT_SIZE.y,
(chonk.get_max_z() - chonk.get_min_z()) as u32,
),
block,
chonk.meta().clone(),
);
for (pos, block) in chonk.vol_iter(
Vec3::new(0, 0, chonk.get_min_z()),
Vec3::new(S::RECT_SIZE.x as _, S::RECT_SIZE.y as _, chonk.get_max_z()),
) {
dyna.set(pos - chonk.get_min_z() * Vec3::unit_z(), block.clone())
.expect("a bug here represents the arithmetic being wrong");
}
dyna
}
fn channelize_dyna<M: Clone, A: Access>(
dyna: &Dyna<Block, M, A>,
) -> (
Dyna<BlockKind, M, A>,
Vec<u8>,
Vec<u8>,
Vec<u8>,
Vec<SpriteKind>,
) {
let mut blocks = Dyna::filled(dyna.sz, BlockKind::Air, dyna.metadata().clone());
let (mut r, mut g, mut b, mut sprites) = (Vec::new(), Vec::new(), Vec::new(), Vec::new());
for (pos, block) in dyna.vol_iter(dyna.lower_bound(), dyna.upper_bound()) {
blocks.set(pos, **block).unwrap();
match (block.get_color(), block.get_sprite()) {
(Some(rgb), None) => {
r.push(rgb.r);
g.push(rgb.g);
b.push(rgb.b);
},
(None, Some(spritekind)) => {
sprites.push(spritekind);
},
_ => panic!(
"attr being used for color vs sprite is mutually exclusive (and that's required \
for this translation to be lossless), but there's no way to guarantee that at \
the type level with Block's public API"
),
}
}
(blocks, r, g, b, sprites)
}
fn histogram_to_dictionary(histogram: &HashMap<Vec<u8>, usize>, dictionary: &mut Vec<u8>) {
let mut tmp: Vec<(Vec<u8>, usize)> = histogram.iter().map(|(k, v)| (k.clone(), *v)).collect();
tmp.sort_by_key(|(_, count)| *count);
debug!("{:?}", tmp.last());
let mut i = 0;
let mut j = tmp.len() - 1;
while i < dictionary.len() && j > 0 {
let (k, v) = &tmp[j];
let dlen = dictionary.len();
let n = (i + k.len()).min(dlen);
dictionary[i..n].copy_from_slice(&k[0..k.len().min(dlen - i)]);
debug!("{}: {}: {:?}", tmp.len() - j, v, k);
j -= 1;
i = n;
}
}
fn main() {
common_frontend::init_stdout(None);
println!("Loading world");
let (world, index) = World::generate(59686, WorldOpts {
seed_elements: true,
world_file: FileOpts::LoadAsset(DEFAULT_WORLD_MAP.into()),
..WorldOpts::default()
});
println!("Loaded world");
let mut histogram: HashMap<Vec<u8>, usize> = HashMap::new();
let mut histogram2: HashMap<Vec<u8>, usize> = HashMap::new();
let mut dictionary = vec![0xffu8; 1 << 16];
let mut dictionary2 = vec![0xffu8; 1 << 16];
let k = 32;
let sz = world.sim().get_size();
let mut totals = [0.0; 5];
let mut total_timings = [0.0; 2];
let mut count = 0;
for (i, (x, y)) in Spiral2d::new()
.radius(20)
.map(|v| (v.x + sz.x as i32 / 2, v.y + sz.y as i32 / 2))
.enumerate()
{
let chunk = world.generate_chunk(index.as_index_ref(), Vec2::new(x as _, y as _), || false);
if let Ok((chunk, _)) = chunk {
let uncompressed = bincode::serialize(&chunk).unwrap();
for w in uncompressed.windows(k) {
*histogram.entry(w.to_vec()).or_default() += 1;
}
if i % 128 == 0 {
histogram_to_dictionary(&histogram, &mut dictionary);
}
let lz4chonk_pre = Instant::now();
let lz4_chonk = lz4_with_dictionary(&bincode::serialize(&chunk).unwrap(), &[]);
let lz4chonk_post = Instant::now();
//let lz4_dict_chonk = SerializedTerrainChunk::from_chunk(&chunk,
// &*dictionary);
let deflatechonk_pre = Instant::now();
let deflate_chonk = do_deflate_flate2(&bincode::serialize(&chunk).unwrap());
let deflatechonk_post = Instant::now();
let dyna: Dyna<_, _, ColumnAccess> = chonk_to_dyna(&chunk, Block::empty());
let ser_dyna = bincode::serialize(&dyna).unwrap();
for w in ser_dyna.windows(k) {
*histogram2.entry(w.to_vec()).or_default() += 1;
}
if i % 128 == 0 {
histogram_to_dictionary(&histogram2, &mut dictionary2);
}
let lz4_dyna = lz4_with_dictionary(&*ser_dyna, &[]);
//let lz4_dict_dyna = lz4_with_dictionary(&*ser_dyna, &dictionary2);
let deflate_dyna = do_deflate(&*ser_dyna);
let deflate_channeled_dyna =
do_deflate_flate2(&bincode::serialize(&channelize_dyna(&dyna)).unwrap());
let n = uncompressed.len();
let sizes = [
lz4_chonk.len() as f32 / n as f32,
deflate_chonk.len() as f32 / n as f32,
lz4_dyna.len() as f32 / n as f32,
deflate_dyna.len() as f32 / n as f32,
deflate_channeled_dyna.len() as f32 / n as f32,
];
let best_idx = sizes
.iter()
.enumerate()
.fold((1.0, 0), |(best, i), (j, ratio)| {
if ratio < &best {
(*ratio, j)
} else {
(best, i)
}
})
.1;
let timings = [
(lz4chonk_post - lz4chonk_pre).subsec_nanos(),
(deflatechonk_post - deflatechonk_pre).subsec_nanos(),
];
trace!(
"{} {}: uncompressed: {}, {:?} {} {:?}",
x,
y,
n,
sizes,
best_idx,
timings
);
for j in 0..5 {
totals[j] += sizes[j];
}
for j in 0..2 {
total_timings[j] += timings[j] as f32;
}
count += 1;
}
if i % 64 == 0 {
println!("Chunks processed: {}\n", count);
println!("Average lz4_chonk: {}", totals[0] / count as f32);
println!("Average deflate_chonk: {}", totals[1] / count as f32);
println!("Average lz4_dyna: {}", totals[2] / count as f32);
println!("Average deflate_dyna: {}", totals[3] / count as f32);
println!(
"Average deflate_channeled_dyna: {}",
totals[4] / count as f32
);
println!("");
println!(
"Average lz4_chonk nanos : {:02}",
total_timings[0] / count as f32
);
println!(
"Average deflate_chonk nanos: {:02}",
total_timings[1] / count as f32
);
println!("-----");
}
if i % 256 == 0 {
histogram.clear();
}
}
}