From 6c756c2440eface87d70fb9209f4dc3f98fb5ddb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 9 May 2022 01:11:46 +0200 Subject: [PATCH 1/6] first implementation of defering serialisation --- server/src/chunk_serialize.rs | 19 ++++ server/src/lib.rs | 4 + server/src/sys/chunk_serialize.rs | 161 ++++++++++++++++++++++++++++++ server/src/sys/mod.rs | 2 + server/src/sys/msg/terrain.rs | 46 ++++----- server/src/sys/terrain.rs | 81 ++------------- server/src/sys/terrain_sync.rs | 26 ++--- 7 files changed, 223 insertions(+), 116 deletions(-) create mode 100644 server/src/chunk_serialize.rs create mode 100644 server/src/sys/chunk_serialize.rs diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs new file mode 100644 index 0000000000..02473e9380 --- /dev/null +++ b/server/src/chunk_serialize.rs @@ -0,0 +1,19 @@ +use specs::Component; +use specs_idvs::IdvStorage; +use vek::Vec2; + +/// Curing the runtime of a tick, multiple systems can request a chunk to be +/// synced to a client E.g. msg::terrain will do so, when a client requested a +/// chunk that already exist terrain will do so when a chunk came back from +/// ChunkGeneration. All those sends are deferred by this queue. +/// Deferring allows us to remove code duplication and maybe serialize ONCE, +/// send to MULTIPLE clients TODO: store a urgent flag and seperate even more, 5 +/// ticks vs 5 seconds +#[derive(Default, Clone, Debug, PartialEq)] +pub struct ChunkSendQueue { + pub chunks: Vec>, +} + +impl Component for ChunkSendQueue { + type Storage = IdvStorage; +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 7fb2073d81..576eb478d9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -16,6 +16,7 @@ pub mod alias_validator; mod character_creator; pub mod chunk_generator; +mod chunk_serialize; pub mod client; pub mod cmd; pub mod connection_handler; @@ -321,6 +322,9 @@ impl Server { state.ecs_mut().register::(); state.ecs_mut().register::(); state.ecs_mut().register::(); + state + .ecs_mut() + .register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs new file mode 100644 index 0000000000..e10d1d216d --- /dev/null +++ b/server/src/sys/chunk_serialize.rs @@ -0,0 +1,161 @@ +use crate::{ + chunk_serialize::ChunkSendQueue, client::Client, metrics::NetworkRequestMetrics, + presence::Presence, Tick, +}; +use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; + +use common_ecs::{Job, Origin, Phase, System}; +use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; +use hashbrown::HashMap; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteStorage}; +use std::cmp::Ordering; + +pub(crate) struct LazyTerrainMessage { + lazy_msg_lo: Option, + lazy_msg_hi: Option, +} + +pub const SAFE_ZONE_RADIUS: f32 = 200.0; + +impl LazyTerrainMessage { + pub(crate) fn new() -> Self { + Self { + lazy_msg_lo: None, + lazy_msg_hi: None, + } + } + + pub(crate) fn prepare_and_send< + 'a, + A, + F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>, + >( + &mut self, + network_metrics: &NetworkRequestMetrics, + client: &Client, + presence: &Presence, + chunk_key: &vek::Vec2, + generate_chunk: F, + ) -> Result<(), A> { + let lazy_msg = if presence.lossy_terrain_compression { + &mut self.lazy_msg_lo + } else { + &mut self.lazy_msg_hi + }; + if lazy_msg.is_none() { + *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key: *chunk_key, + chunk: Ok(match generate_chunk() { + Ok(chunk) => SerializedTerrainChunk::via_heuristic( + chunk, + presence.lossy_terrain_compression, + ), + Err(e) => return Err(e), + }), + })); + } + lazy_msg.as_ref().map(|msg| { + let _ = client.send_prepared(msg); + if presence.lossy_terrain_compression { + network_metrics.chunks_served_lossy.inc(); + } else { + network_metrics.chunks_served_lossless.inc(); + } + }); + Ok(()) + } +} + +/// This system will handle sending terrain to clients by +/// collecting chunks that need to be send for a single generation run and then +/// trigger a SlowJob for serialisation. +#[derive(Default)] +pub struct Sys; +impl<'a> System<'a> for Sys { + type SystemData = ( + Read<'a, Tick>, + Entities<'a>, + ReadStorage<'a, Client>, + ReadStorage<'a, Presence>, + WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, SlowJobPool>, + ReadExpect<'a, TerrainGrid>, + ReadExpect<'a, NetworkRequestMetrics>, + ); + + const NAME: &'static str = "chunk_serialize"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run( + _job: &mut Job, + ( + tick, + entities, + clients, + presences, + mut chunk_send_queues, + slow_jobs, + terrain, + network_metrics, + ): Self::SystemData, + ) { + // Only operate once per second + if tick.0.rem_euclid(60) != 0 { + return; + } + + let mut chunks = HashMap::<_, Vec<_>>::new(); + + for entity in (&entities, &clients, &presences, !&chunk_send_queues) + .join() + .map(|(e, _, _, _)| e) + .collect::>() + { + let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default()); + } + + // Grab all chunk requests for all clients and sort them + for (entity, _client, chunk_send_queue) in + (&entities, &clients, &mut chunk_send_queues).join() + { + let mut chunk_send_queue = std::mem::take(chunk_send_queue); + // dedup input + chunk_send_queue.chunks.sort_by(|a, b| { + let zero = a.x.partial_cmp(&b.x).unwrap_or(Ordering::Equal); + let one = a.y.partial_cmp(&b.y).unwrap_or(Ordering::Equal); + if matches!(zero, Ordering::Equal) { + one + } else { + zero + } + }); + chunk_send_queue.chunks.dedup(); + for chunk_key in chunk_send_queue.chunks { + let recipients = chunks.entry(chunk_key).or_default(); + recipients.push(entity); + } + } + + if !chunks.is_empty() { + let len = chunks.len(); + print!("{}", len); + for (chunk_key, entities) in chunks { + let mut lazy_msg = LazyTerrainMessage::new(); + for entity in entities { + let client = clients.get(entity).unwrap(); + let presence = presences.get(entity).unwrap(); + if let Err(e) = lazy_msg.prepare_and_send( + &network_metrics, + client, + presence, + &chunk_key, + || terrain.get_key(chunk_key).ok_or(()), + ) { + tracing::error!(?e, "error sending chunk"); + } + } + } + } + } +} diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index d0f76023c6..28db903e55 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,4 +1,5 @@ pub mod agent; +pub mod chunk_serialize; pub mod entity_sync; pub mod invite_timeout; pub mod metrics; @@ -33,6 +34,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); + dispatch::(dispatch_builder, &[]); } pub fn run_sync_systems(ecs: &mut specs::World) { diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index ab1ffec448..6b492394ab 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -1,5 +1,6 @@ use crate::{ - client::Client, lod::Lod, metrics::NetworkRequestMetrics, presence::Presence, ChunkRequest, + chunk_serialize::ChunkSendQueue, client::Client, lod::Lod, metrics::NetworkRequestMetrics, + presence::Presence, ChunkRequest, }; use common::{ comp::Pos, @@ -9,9 +10,9 @@ use common::{ vol::RectVolSize, }; use common_ecs::{Job, Origin, ParMode, Phase, System}; -use common_net::msg::{ClientGeneral, SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::{ClientGeneral, ServerGeneral}; use rayon::iter::ParallelIterator; -use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write}; +use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage}; use tracing::{debug, trace}; /// This system will handle new messages from clients @@ -21,6 +22,7 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, + WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, Lod>, ReadExpect<'a, NetworkRequestMetrics>, @@ -39,6 +41,7 @@ impl<'a> System<'a> for Sys { ( entities, server_event_bus, + mut chunk_send_queues, terrain, lod, network_metrics, @@ -49,11 +52,16 @@ impl<'a> System<'a> for Sys { ): Self::SystemData, ) { job.cpu_stats.measure(ParMode::Rayon); - let mut new_chunk_requests = (&entities, &clients, (&presences).maybe()) + let mut new_chunk_requests = ( + &entities, + &clients, + (&presences).maybe(), + &mut chunk_send_queues, + ) .par_join() - .map(|(entity, client, maybe_presence)| { + .map(|(entity, client, maybe_presence, chunk_send_queue)| { let mut chunk_requests = Vec::new(); - let _ = super::try_recv_all(client, 5, |client, msg| { + let _ = super::try_recv_all(client, 5, |_, msg| { // TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056) let mut server_emitter = server_event_bus.emitter(); let presence = match maybe_presence { @@ -80,26 +88,12 @@ impl<'a> System<'a> for Sys { true }; if in_vd { - match terrain.get_key_arc(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - client.send(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(SerializedTerrainChunk::via_heuristic( - chunk, - presence.lossy_terrain_compression, - )), - })?; - if presence.lossy_terrain_compression { - network_metrics.chunks_served_lossy.inc(); - } else { - network_metrics.chunks_served_lossless.inc(); - } - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - chunk_requests.push(ChunkRequest { entity, key }); - }, + if terrain.get_key_arc(key).is_some() { + network_metrics.chunks_served_from_memory.inc(); + chunk_send_queue.chunks.push(key); + } else { + network_metrics.chunks_generation_triggered.inc(); + chunk_requests.push(ChunkRequest { entity, key }); } } else { network_metrics.chunks_request_dropped.inc(); diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 543830d242..43c76b436a 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -7,8 +7,8 @@ use world::{IndexOwned, World}; use crate::{ chunk_generator::ChunkGenerator, + chunk_serialize::ChunkSendQueue, client::Client, - metrics::NetworkRequestMetrics, presence::{Presence, RepositionOnChunkLoad}, rtsim::RtSim, settings::Settings, @@ -29,7 +29,7 @@ use common::{ }; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::ServerGeneral; use common_state::TerrainChanges; use comp::Behavior; use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteExpect, WriteStorage}; @@ -41,62 +41,8 @@ pub type TerrainPersistenceData<'a> = Option>; #[cfg(not(feature = "persistent_world"))] pub type TerrainPersistenceData<'a> = (); -pub(crate) struct LazyTerrainMessage { - lazy_msg_lo: Option, - lazy_msg_hi: Option, -} - pub const SAFE_ZONE_RADIUS: f32 = 200.0; -impl LazyTerrainMessage { - pub(crate) fn new() -> Self { - Self { - lazy_msg_lo: None, - lazy_msg_hi: None, - } - } - - pub(crate) fn prepare_and_send< - 'a, - A, - F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>, - >( - &mut self, - network_metrics: &NetworkRequestMetrics, - client: &Client, - presence: &Presence, - chunk_key: &vek::Vec2, - generate_chunk: F, - ) -> Result<(), A> { - let lazy_msg = if presence.lossy_terrain_compression { - &mut self.lazy_msg_lo - } else { - &mut self.lazy_msg_hi - }; - if lazy_msg.is_none() { - *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(match generate_chunk() { - Ok(chunk) => SerializedTerrainChunk::via_heuristic( - chunk, - presence.lossy_terrain_compression, - ), - Err(e) => return Err(e), - }), - })); - } - lazy_msg.as_ref().map(|msg| { - let _ = client.send_prepared(msg); - if presence.lossy_terrain_compression { - network_metrics.chunks_served_lossy.inc(); - } else { - network_metrics.chunks_served_lossless.inc(); - } - }); - Ok(()) - } -} - /// This system will handle loading generated chunks and unloading /// unneeded chunks. /// 1. Inserts newly generated chunks into the TerrainGrid @@ -117,7 +63,6 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, SlowJobPool>, ReadExpect<'a, IndexOwned>, ReadExpect<'a, Arc>, - ReadExpect<'a, NetworkRequestMetrics>, WriteExpect<'a, ChunkGenerator>, WriteExpect<'a, TerrainGrid>, Write<'a, TerrainChanges>, @@ -128,6 +73,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Presence>, ReadStorage<'a, Client>, Entities<'a>, + WriteStorage<'a, ChunkSendQueue>, WriteStorage<'a, RepositionOnChunkLoad>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, Waypoint>, @@ -150,7 +96,6 @@ impl<'a> System<'a> for Sys { slow_jobs, index, world, - network_metrics, mut chunk_generator, mut terrain, mut terrain_changes, @@ -161,6 +106,7 @@ impl<'a> System<'a> for Sys { presences, clients, entities, + mut chunk_send_queues, mut reposition_on_load, mut force_update, mut waypoints, @@ -306,13 +252,10 @@ impl<'a> System<'a> for Sys { } // Send the chunk to all nearby players. - use rayon::iter::{IntoParallelIterator, ParallelIterator}; - new_chunks.into_par_iter().for_each(|(key, chunk)| { - let mut lazy_msg = LazyTerrainMessage::new(); - - (&presences, &positions, &clients) + new_chunks.into_iter().for_each(|(key, chunk)| { + (&presences, &positions, &clients, &mut chunk_send_queues) .join() - .for_each(|(presence, pos, client)| { + .for_each(|(presence, pos, client, chunk_send_queue)| { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); // Subtract 2 from the offset before computing squared magnitude // 1 since chunks need neighbors to be meshed @@ -322,15 +265,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= presence.view_distance.pow(2) { - lazy_msg - .prepare_and_send::( - &network_metrics, - client, - presence, - &key, - || Ok(&*chunk), - ) - .into_ok(); + chunk_send_queue.chunks.push(key); } }); }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index e737c1c582..119cffd048 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,10 +1,9 @@ -use super::terrain::LazyTerrainMessage; -use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence}; +use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence}; use common::{comp::Pos, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; -use specs::{Join, Read, ReadExpect, ReadStorage}; +use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; /// This systems sends new chunks to clients as well as changes to existing /// chunks @@ -14,10 +13,10 @@ impl<'a> System<'a> for Sys { type SystemData = ( ReadExpect<'a, TerrainGrid>, Read<'a, TerrainChanges>, + WriteStorage<'a, ChunkSendQueue>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, ReadStorage<'a, Client>, - ReadExpect<'a, NetworkRequestMetrics>, ); const NAME: &'static str = "terrain_sync"; @@ -26,23 +25,16 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - (terrain, terrain_changes, positions, presences, clients, network_metrics): Self::SystemData, + (terrain, terrain_changes, mut chunk_send_queues, positions, presences, clients): Self::SystemData, ) { // Sync changed chunks - 'chunk: for chunk_key in &terrain_changes.modified_chunks { - let mut lazy_msg = LazyTerrainMessage::new(); - for (presence, pos, client) in (&presences, &positions, &clients).join() { + for chunk_key in &terrain_changes.modified_chunks { + for (presence, pos, chunk_send_queue) in + (&presences, &positions, &mut chunk_send_queues).join() + { if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { - if let Err(()) = lazy_msg.prepare_and_send( - &network_metrics, - client, - presence, - chunk_key, - || terrain.get_key(*chunk_key).ok_or(()), - ) { - break 'chunk; - } + chunk_send_queue.chunks.push(*chunk_key); } } } From 9b536937833542460f5bc737c2f5108d9023423a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 10 May 2022 09:11:11 +0200 Subject: [PATCH 2/6] switch to a system where chunk_serialize will start a SlowJow that is then consumed by chunk_send --- server/src/chunk_serialize.rs | 9 +- server/src/client.rs | 15 ++++ server/src/lib.rs | 15 +++- server/src/sys/chunk_send.rs | 37 ++++++++ server/src/sys/chunk_serialize.rs | 143 ++++++++++++------------------ server/src/sys/mod.rs | 4 + server/src/sys/terrain.rs | 4 +- 7 files changed, 132 insertions(+), 95 deletions(-) create mode 100644 server/src/sys/chunk_send.rs diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs index 02473e9380..2755ce15bd 100644 --- a/server/src/chunk_serialize.rs +++ b/server/src/chunk_serialize.rs @@ -1,4 +1,5 @@ -use specs::Component; +use crate::client::PreparedMsg; +use specs::{Component, Entity}; use specs_idvs::IdvStorage; use vek::Vec2; @@ -17,3 +18,9 @@ pub struct ChunkSendQueue { impl Component for ChunkSendQueue { type Storage = IdvStorage; } + +pub struct SerializedChunk { + pub(crate) lossy_compression: bool, + pub(crate) msg: PreparedMsg, + pub(crate) recipients: Vec, +} diff --git a/server/src/client.rs b/server/src/client.rs index d357d3d4e1..3187fd0cc4 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -216,6 +216,21 @@ impl Client { } } + pub(crate) fn terrain_params(&self) -> StreamParams { self.terrain_stream_params.clone() } + + pub(crate) fn prepare_terrain( + terrain_chunk_update: ServerGeneral, + params: &StreamParams, + ) -> PreparedMsg { + if !matches!( + terrain_chunk_update, + ServerGeneral::TerrainChunkUpdate { .. } + ) { + unreachable!("You must not call this function without a terrain chunk update!") + } + PreparedMsg::new(5, &terrain_chunk_update, params) + } + pub(crate) fn recv( &self, stream_id: u8, diff --git a/server/src/lib.rs b/server/src/lib.rs index 576eb478d9..b2ed1de77f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -283,13 +283,20 @@ impl Server { compiled with the feature. Terrain modifications will *not* be persisted." ); } - state - .ecs_mut() - .write_resource::() - .configure("CHUNK_GENERATOR", |n| n / 2 + n / 4); + { + let pool = state.ecs_mut().write_resource::(); + pool.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4); + pool.configure("CHUNK_SERIALIZER", |n| n / 2); + } state .ecs_mut() .insert(ChunkGenerator::new(chunk_gen_metrics)); + { + let (sender, receiver) = + crossbeam_channel::bounded::(10_000); + state.ecs_mut().insert(sender); + state.ecs_mut().insert(receiver); + } state.ecs_mut().insert(CharacterUpdater::new( Arc::>::clone(&database_settings), diff --git a/server/src/sys/chunk_send.rs b/server/src/sys/chunk_send.rs new file mode 100644 index 0000000000..eb6d13390d --- /dev/null +++ b/server/src/sys/chunk_send.rs @@ -0,0 +1,37 @@ +use crate::{chunk_serialize::SerializedChunk, client::Client, metrics::NetworkRequestMetrics}; + +use common_ecs::{Job, Origin, Phase, System}; +use specs::{ReadExpect, ReadStorage}; + +/// This system will handle sending terrain to clients by +/// collecting chunks that need to be send for a single generation run and then +/// trigger a SlowJob for serialisation. +#[derive(Default)] +pub struct Sys; +impl<'a> System<'a> for Sys { + type SystemData = ( + ReadStorage<'a, Client>, + ReadExpect<'a, NetworkRequestMetrics>, + ReadExpect<'a, crossbeam_channel::Receiver>, + ); + + const NAME: &'static str = "chunk_send"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run(_job: &mut Job, (clients, network_metrics, chunk_receiver): Self::SystemData) { + for sc in chunk_receiver.try_iter() { + for recipient in sc.recipients { + if let Some(client) = clients.get(recipient) { + if client.send_prepared(&sc.msg).is_err() { + if sc.lossy_compression { + network_metrics.chunks_served_lossy.inc() + } else { + network_metrics.chunks_served_lossless.inc() + } + } + } + } + } + } +} diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index e10d1d216d..33575d64df 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -1,70 +1,16 @@ use crate::{ - chunk_serialize::ChunkSendQueue, client::Client, metrics::NetworkRequestMetrics, - presence::Presence, Tick, + chunk_serialize::{ChunkSendQueue, SerializedChunk}, + client::Client, + presence::Presence, + Tick, }; use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; - use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; use hashbrown::HashMap; -use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteStorage}; -use std::cmp::Ordering; - -pub(crate) struct LazyTerrainMessage { - lazy_msg_lo: Option, - lazy_msg_hi: Option, -} - -pub const SAFE_ZONE_RADIUS: f32 = 200.0; - -impl LazyTerrainMessage { - pub(crate) fn new() -> Self { - Self { - lazy_msg_lo: None, - lazy_msg_hi: None, - } - } - - pub(crate) fn prepare_and_send< - 'a, - A, - F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>, - >( - &mut self, - network_metrics: &NetworkRequestMetrics, - client: &Client, - presence: &Presence, - chunk_key: &vek::Vec2, - generate_chunk: F, - ) -> Result<(), A> { - let lazy_msg = if presence.lossy_terrain_compression { - &mut self.lazy_msg_lo - } else { - &mut self.lazy_msg_hi - }; - if lazy_msg.is_none() { - *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(match generate_chunk() { - Ok(chunk) => SerializedTerrainChunk::via_heuristic( - chunk, - presence.lossy_terrain_compression, - ), - Err(e) => return Err(e), - }), - })); - } - lazy_msg.as_ref().map(|msg| { - let _ = client.send_prepared(msg); - if presence.lossy_terrain_compression { - network_metrics.chunks_served_lossy.inc(); - } else { - network_metrics.chunks_served_lossless.inc(); - } - }); - Ok(()) - } -} +use network::StreamParams; +use specs::{Entities, Entity, Join, Read, ReadExpect, ReadStorage, WriteStorage}; +use std::{cmp::Ordering, sync::Arc}; /// This system will handle sending terrain to clients by /// collecting chunks that need to be send for a single generation run and then @@ -80,7 +26,7 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, SlowJobPool>, ReadExpect<'a, TerrainGrid>, - ReadExpect<'a, NetworkRequestMetrics>, + ReadExpect<'a, crossbeam_channel::Sender>, ); const NAME: &'static str = "chunk_serialize"; @@ -97,16 +43,15 @@ impl<'a> System<'a> for Sys { mut chunk_send_queues, slow_jobs, terrain, - network_metrics, + chunk_sender, ): Self::SystemData, ) { // Only operate once per second - if tick.0.rem_euclid(60) != 0 { + //TODO: move out of this system and now even spawn this. + if tick.0.rem_euclid(30) != 0 { return; } - let mut chunks = HashMap::<_, Vec<_>>::new(); - for entity in (&entities, &clients, &presences, !&chunk_send_queues) .join() .map(|(e, _, _, _)| e) @@ -115,9 +60,16 @@ impl<'a> System<'a> for Sys { let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default()); } + struct Metadata { + recipients: Vec, + lossy_compression: bool, + params: StreamParams, + } + + let mut chunks = HashMap::<_, Metadata>::new(); // Grab all chunk requests for all clients and sort them - for (entity, _client, chunk_send_queue) in - (&entities, &clients, &mut chunk_send_queues).join() + for (entity, client, presence, chunk_send_queue) in + (&entities, &clients, &presences, &mut chunk_send_queues).join() { let mut chunk_send_queue = std::mem::take(chunk_send_queue); // dedup input @@ -132,29 +84,44 @@ impl<'a> System<'a> for Sys { }); chunk_send_queue.chunks.dedup(); for chunk_key in chunk_send_queue.chunks { - let recipients = chunks.entry(chunk_key).or_default(); - recipients.push(entity); + let meta = chunks.entry(chunk_key).or_insert_with(|| Metadata { + recipients: Vec::default(), + lossy_compression: true, + params: client.terrain_params(), + }); + meta.recipients.push(entity); + // We decide here, to ONLY send lossy compressed data If all clients want those. + // If at least 1 client here does not want lossy we don't compress it twice. + // It would just be too expensive for the server + meta.lossy_compression = + meta.lossy_compression && presence.lossy_terrain_compression; } } - if !chunks.is_empty() { - let len = chunks.len(); - print!("{}", len); - for (chunk_key, entities) in chunks { - let mut lazy_msg = LazyTerrainMessage::new(); - for entity in entities { - let client = clients.get(entity).unwrap(); - let presence = presences.get(entity).unwrap(); - if let Err(e) = lazy_msg.prepare_and_send( - &network_metrics, - client, - presence, - &chunk_key, - || terrain.get_key(chunk_key).ok_or(()), - ) { - tracing::error!(?e, "error sending chunk"); - } - } + // Trigger serialization in a SlowJob + for (chunk_key, meta) in chunks { + if let Some(chunk) = terrain.get_key_arc(chunk_key) { + let chunk = Arc::clone(chunk); + let chunk_sender = chunk_sender.clone(); + slow_jobs.spawn("CHUNK_SERIALIZER", move || { + let msg = Client::prepare_terrain( + ServerGeneral::TerrainChunkUpdate { + key: chunk_key, + chunk: Ok(SerializedTerrainChunk::via_heuristic( + &chunk, + meta.lossy_compression, + )), + }, + &meta.params, + ); + if let Err(e) = chunk_sender.send(SerializedChunk { + lossy_compression: meta.lossy_compression, + msg, + recipients: meta.recipients, + }) { + tracing::warn!(?e, "cannot send serialized chunk to sender") + }; + }); } } } diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index 28db903e55..56ac301431 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,4 +1,5 @@ pub mod agent; +pub mod chunk_send; pub mod chunk_serialize; pub mod entity_sync; pub mod invite_timeout; @@ -34,7 +35,10 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); + // no dependency, as we only work once per sec anyway. dispatch::(dispatch_builder, &[]); + // don't depend on chunk_serialize, as we assume everything is done in a SlowJow + dispatch::(dispatch_builder, &[]); } pub fn run_sync_systems(ecs: &mut specs::World) { diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 43c76b436a..70c453c2ea 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -252,10 +252,10 @@ impl<'a> System<'a> for Sys { } // Send the chunk to all nearby players. - new_chunks.into_iter().for_each(|(key, chunk)| { + new_chunks.into_iter().for_each(|(key, _chunk)| { (&presences, &positions, &clients, &mut chunk_send_queues) .join() - .for_each(|(presence, pos, client, chunk_send_queue)| { + .for_each(|(presence, pos, _client, chunk_send_queue)| { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); // Subtract 2 from the offset before computing squared magnitude // 1 since chunks need neighbors to be meshed From 8e47d02f8dcc9d71b9f32d339f3ddffac3e4e8bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 10 May 2022 14:38:43 +0200 Subject: [PATCH 3/6] add metrics and trigger slowjobs in chunks of 25 --- server/src/metrics.rs | 14 +++++++++ server/src/sys/chunk_send.rs | 8 +++-- server/src/sys/chunk_serialize.rs | 51 +++++++++++++++++++++++-------- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 76c06d422c..9bacdbe880 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -36,6 +36,8 @@ pub struct NetworkRequestMetrics { pub chunks_generation_triggered: IntCounter, pub chunks_served_lossy: IntCounter, pub chunks_served_lossless: IntCounter, + pub chunks_serialisation_requests: IntCounter, + pub chunks_distinct_serialisation_requests: IntCounter, } pub struct ChunkGenMetrics { @@ -202,12 +204,22 @@ impl NetworkRequestMetrics { "chunks_served_lossless", "number of chunks that were sent with lossless compression requested", ))?; + let chunks_serialisation_requests = IntCounter::with_opts(Opts::new( + "chunks_serialisation_requests", + "number of requests for the sys chunk_serialisation", + ))?; + let chunks_distinct_serialisation_requests = IntCounter::with_opts(Opts::new( + "chunks_distinct_serialisation_requests", + "number of distinct chunks in requests for the sys chunk_serialisation", + ))?; registry.register(Box::new(chunks_request_dropped.clone()))?; registry.register(Box::new(chunks_served_from_memory.clone()))?; registry.register(Box::new(chunks_generation_triggered.clone()))?; registry.register(Box::new(chunks_served_lossy.clone()))?; registry.register(Box::new(chunks_served_lossless.clone()))?; + registry.register(Box::new(chunks_serialisation_requests.clone()))?; + registry.register(Box::new(chunks_distinct_serialisation_requests.clone()))?; Ok(Self { chunks_request_dropped, @@ -215,6 +227,8 @@ impl NetworkRequestMetrics { chunks_generation_triggered, chunks_served_lossy, chunks_served_lossless, + chunks_serialisation_requests, + chunks_distinct_serialisation_requests, }) } } diff --git a/server/src/sys/chunk_send.rs b/server/src/sys/chunk_send.rs index eb6d13390d..6aa3f3fb07 100644 --- a/server/src/sys/chunk_send.rs +++ b/server/src/sys/chunk_send.rs @@ -20,18 +20,22 @@ impl<'a> System<'a> for Sys { const PHASE: Phase = Phase::Create; fn run(_job: &mut Job, (clients, network_metrics, chunk_receiver): Self::SystemData) { + let mut lossy = 0u64; + let mut lossless = 0u64; for sc in chunk_receiver.try_iter() { for recipient in sc.recipients { if let Some(client) = clients.get(recipient) { if client.send_prepared(&sc.msg).is_err() { if sc.lossy_compression { - network_metrics.chunks_served_lossy.inc() + lossy += 1; } else { - network_metrics.chunks_served_lossless.inc() + lossless += 1; } } } } } + network_metrics.chunks_served_lossy.inc_by(lossy); + network_metrics.chunks_served_lossless.inc_by(lossless); } } diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index 33575d64df..6cb3bd35d1 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -1,6 +1,7 @@ use crate::{ chunk_serialize::{ChunkSendQueue, SerializedChunk}, client::Client, + metrics::NetworkRequestMetrics, presence::Presence, Tick, }; @@ -24,6 +25,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Client>, ReadStorage<'a, Presence>, WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, NetworkRequestMetrics>, ReadExpect<'a, SlowJobPool>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, crossbeam_channel::Sender>, @@ -41,6 +43,7 @@ impl<'a> System<'a> for Sys { clients, presences, mut chunk_send_queues, + network_metrics, slow_jobs, terrain, chunk_sender, @@ -68,6 +71,8 @@ impl<'a> System<'a> for Sys { let mut chunks = HashMap::<_, Metadata>::new(); // Grab all chunk requests for all clients and sort them + let mut requests = 0u64; + let mut distinct_requests = 0u64; for (entity, client, presence, chunk_send_queue) in (&entities, &clients, &presences, &mut chunk_send_queues).join() { @@ -83,11 +88,15 @@ impl<'a> System<'a> for Sys { } }); chunk_send_queue.chunks.dedup(); + requests += chunk_send_queue.chunks.len() as u64; for chunk_key in chunk_send_queue.chunks { - let meta = chunks.entry(chunk_key).or_insert_with(|| Metadata { - recipients: Vec::default(), - lossy_compression: true, - params: client.terrain_params(), + let meta = chunks.entry(chunk_key).or_insert_with(|| { + distinct_requests += 1; + Metadata { + recipients: Vec::default(), + lossy_compression: true, + params: client.terrain_params(), + } }); meta.recipients.push(entity); // We decide here, to ONLY send lossy compressed data If all clients want those. @@ -97,13 +106,30 @@ impl<'a> System<'a> for Sys { meta.lossy_compression && presence.lossy_terrain_compression; } } + network_metrics + .chunks_serialisation_requests + .inc_by(requests); + network_metrics + .chunks_distinct_serialisation_requests + .inc_by(distinct_requests); // Trigger serialization in a SlowJob - for (chunk_key, meta) in chunks { - if let Some(chunk) = terrain.get_key_arc(chunk_key) { - let chunk = Arc::clone(chunk); - let chunk_sender = chunk_sender.clone(); - slow_jobs.spawn("CHUNK_SERIALIZER", move || { + const CHUNK_SIZE: usize = 25; // trigger one job per 25 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this + let mut chunks_iter = chunks + .into_iter() + .filter_map(|(chunk_key, meta)| { + terrain + .get_key_arc(chunk_key) + .map(|chunk| (Arc::clone(chunk), chunk_key, meta)) + }) + .into_iter() + .peekable(); + + while chunks_iter.peek().is_some() { + let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect(); + let chunk_sender = chunk_sender.clone(); + slow_jobs.spawn("CHUNK_SERIALIZER", move || { + for (chunk, chunk_key, meta) in chunks { let msg = Client::prepare_terrain( ServerGeneral::TerrainChunkUpdate { key: chunk_key, @@ -119,10 +145,11 @@ impl<'a> System<'a> for Sys { msg, recipients: meta.recipients, }) { - tracing::warn!(?e, "cannot send serialized chunk to sender") + tracing::warn!(?e, "cannot send serialized chunk to sender"); + break; }; - }); - } + } + }); } } } From efe284a6730b61403e5ea5512dc9a345e3ffba6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 11 May 2022 15:44:48 +0200 Subject: [PATCH 4/6] make the client request a few more chunks to compensate for the delay in serialisation --- client/src/lib.rs | 9 ++++++++- server/src/sys/chunk_serialize.rs | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/client/src/lib.rs b/client/src/lib.rs index 7a1c311dfa..51d47faa2d 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1662,6 +1662,7 @@ impl Client { self.state.remove_chunk(key); } + let mut current_tick_send_chunk_requests = 0; // Request chunks from the server. self.loaded_distance = ((view_distance * TerrainChunkSize::RECT_SIZE.x) as f32).powi(2); // +1 so we can find a chunk that's outside the vd for better fog @@ -1695,10 +1696,16 @@ impl Client { for key in keys.iter() { if self.state.terrain().get_key(*key).is_none() { if !skip_mode && !self.pending_chunks.contains_key(key) { - if self.pending_chunks.len() < 4 { + const TOTAL_PENDING_CHUNKS_LIMIT: usize = 12; + const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = 2; + if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT + && current_tick_send_chunk_requests + < CURRENT_TICK_PENDING_CHUNKS_LIMIT + { self.send_msg_err(ClientGeneral::TerrainChunkRequest { key: *key, })?; + current_tick_send_chunk_requests += 1; self.pending_chunks.insert(*key, Instant::now()); } else { skip_mode = true; diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index 6cb3bd35d1..724e757309 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -49,9 +49,9 @@ impl<'a> System<'a> for Sys { chunk_sender, ): Self::SystemData, ) { - // Only operate once per second + // Only operate twice per second //TODO: move out of this system and now even spawn this. - if tick.0.rem_euclid(30) != 0 { + if tick.0.rem_euclid(15) != 0 { return; } From 637e63fbe20daf1483e0f454e95281b485e27518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 11 May 2022 17:52:22 +0200 Subject: [PATCH 5/6] switch from a Component to a Ressource with an Eventbus Set CHUNK_SITE to 10 which results in a mean of 13ms per Slowjob. Its good if it stays under 30ms so it has less influence on ticks. Some performance values measured with a AMD Ryzen 1700X: - voxygen and server and swarm (25 clients, 10 vd) on one machine. - total runtime was 240s - CHUNK_GENERATOR total time is 486s with a mean of 40ms - CHUNK_SERIALIZER total time is 18.19s with a mean of 13ms, so in total its a order of magnitude lower Trancy confirms this, the Serialize backlog is usually handled within 1-2 ticks. - terrain::sys total time 1.2s, mean 188us - msg::terrain::sys total time 812ms, mean 125us - terrain::sync total time 12ms, mean 1,85us - chunk_serialize::sys total time 69ms, mean 10us - chunk_send::sys total time 50ms, mean 7us so all in all total time for serializsation is 20.33 of which 89% are spend outside of the ECS --- server/src/chunk_serialize.rs | 28 ++--- server/src/lib.rs | 6 +- server/src/sys/chunk_serialize.rs | 91 +++++++--------- server/src/sys/msg/terrain.rs | 173 +++++++++++++++--------------- server/src/sys/terrain.rs | 43 ++++---- server/src/sys/terrain_sync.rs | 20 ++-- 6 files changed, 182 insertions(+), 179 deletions(-) diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs index 2755ce15bd..9929f056fa 100644 --- a/server/src/chunk_serialize.rs +++ b/server/src/chunk_serialize.rs @@ -1,22 +1,22 @@ use crate::client::PreparedMsg; -use specs::{Component, Entity}; -use specs_idvs::IdvStorage; +use specs::Entity; use vek::Vec2; -/// Curing the runtime of a tick, multiple systems can request a chunk to be -/// synced to a client E.g. msg::terrain will do so, when a client requested a -/// chunk that already exist terrain will do so when a chunk came back from -/// ChunkGeneration. All those sends are deferred by this queue. +/// Sending a chunk to the user works the following way: +/// A system like `msg::terrain` `terrain` or `terrain_sync` either decide to +/// trigger chunk generation, or if the chunk already exists +/// push a `ChunkSendQueue` to the eventbus. +/// The `chunk_serialize` system will coordinate serializing via a SlowJob +/// outside of the tick. On the next tick, the `chunk_send` system will pick up +/// finished chunks. +/// /// Deferring allows us to remove code duplication and maybe serialize ONCE, -/// send to MULTIPLE clients TODO: store a urgent flag and seperate even more, 5 -/// ticks vs 5 seconds -#[derive(Default, Clone, Debug, PartialEq)] +/// send to MULTIPLE clients +/// TODO: store a urgent flag and seperate even more, 5 ticks vs 5 seconds +#[derive(Debug, PartialEq)] pub struct ChunkSendQueue { - pub chunks: Vec>, -} - -impl Component for ChunkSendQueue { - type Storage = IdvStorage; + pub(crate) entity: Entity, + pub(crate) chunk_key: Vec2, } pub struct SerializedChunk { diff --git a/server/src/lib.rs b/server/src/lib.rs index b2ed1de77f..852767f29a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -247,6 +247,9 @@ impl Server { }); state.ecs_mut().insert(EventBus::::default()); state.ecs_mut().insert(Vec::::new()); + state + .ecs_mut() + .insert(EventBus::::default()); state.ecs_mut().insert(Locations::default()); state.ecs_mut().insert(LoginProvider::new( settings.auth_server_address.clone(), @@ -329,9 +332,6 @@ impl Server { state.ecs_mut().register::(); state.ecs_mut().register::(); state.ecs_mut().register::(); - state - .ecs_mut() - .register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index 724e757309..2325c31b3a 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -5,13 +5,13 @@ use crate::{ presence::Presence, Tick, }; -use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; +use common::{event::EventBus, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; -use hashbrown::HashMap; +use hashbrown::{hash_map::Entry, HashMap}; use network::StreamParams; -use specs::{Entities, Entity, Join, Read, ReadExpect, ReadStorage, WriteStorage}; -use std::{cmp::Ordering, sync::Arc}; +use specs::{Entity, Read, ReadExpect, ReadStorage}; +use std::sync::Arc; /// This system will handle sending terrain to clients by /// collecting chunks that need to be send for a single generation run and then @@ -21,10 +21,9 @@ pub struct Sys; impl<'a> System<'a> for Sys { type SystemData = ( Read<'a, Tick>, - Entities<'a>, ReadStorage<'a, Client>, ReadStorage<'a, Presence>, - WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, EventBus>, ReadExpect<'a, NetworkRequestMetrics>, ReadExpect<'a, SlowJobPool>, ReadExpect<'a, TerrainGrid>, @@ -39,10 +38,9 @@ impl<'a> System<'a> for Sys { _job: &mut Job, ( tick, - entities, clients, presences, - mut chunk_send_queues, + chunk_send_queues_bus, network_metrics, slow_jobs, terrain, @@ -55,57 +53,48 @@ impl<'a> System<'a> for Sys { return; } - for entity in (&entities, &clients, &presences, !&chunk_send_queues) - .join() - .map(|(e, _, _, _)| e) - .collect::>() - { - let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default()); - } - struct Metadata { recipients: Vec, lossy_compression: bool, params: StreamParams, } + // collect all deduped entities that request a chunk let mut chunks = HashMap::<_, Metadata>::new(); - // Grab all chunk requests for all clients and sort them let mut requests = 0u64; let mut distinct_requests = 0u64; - for (entity, client, presence, chunk_send_queue) in - (&entities, &clients, &presences, &mut chunk_send_queues).join() - { - let mut chunk_send_queue = std::mem::take(chunk_send_queue); - // dedup input - chunk_send_queue.chunks.sort_by(|a, b| { - let zero = a.x.partial_cmp(&b.x).unwrap_or(Ordering::Equal); - let one = a.y.partial_cmp(&b.y).unwrap_or(Ordering::Equal); - if matches!(zero, Ordering::Equal) { - one - } else { - zero - } - }); - chunk_send_queue.chunks.dedup(); - requests += chunk_send_queue.chunks.len() as u64; - for chunk_key in chunk_send_queue.chunks { - let meta = chunks.entry(chunk_key).or_insert_with(|| { - distinct_requests += 1; - Metadata { - recipients: Vec::default(), - lossy_compression: true, - params: client.terrain_params(), + + for queue_entry in chunk_send_queues_bus.recv_all() { + let entry = chunks.entry(queue_entry.chunk_key); + let meta = match entry { + Entry::Vacant(ve) => { + match clients.get(queue_entry.entity).map(|c| c.terrain_params()) { + Some(params) => { + distinct_requests += 1; + ve.insert(Metadata { + recipients: Vec::new(), + lossy_compression: true, + params, + }) + }, + None => continue, } - }); - meta.recipients.push(entity); - // We decide here, to ONLY send lossy compressed data If all clients want those. - // If at least 1 client here does not want lossy we don't compress it twice. - // It would just be too expensive for the server - meta.lossy_compression = - meta.lossy_compression && presence.lossy_terrain_compression; - } + }, + Entry::Occupied(oe) => oe.into_mut(), + }; + + // We decide here, to ONLY send lossy compressed data If all clients want those. + // If at least 1 client here does not want lossy we don't compress it twice. + // It would just be too expensive for the server + meta.lossy_compression = meta.lossy_compression + && presences + .get(queue_entry.entity) + .map(|p| p.lossy_terrain_compression) + .unwrap_or(true); + meta.recipients.push(queue_entry.entity); + requests += 1; } + network_metrics .chunks_serialisation_requests .inc_by(requests); @@ -114,7 +103,7 @@ impl<'a> System<'a> for Sys { .inc_by(distinct_requests); // Trigger serialization in a SlowJob - const CHUNK_SIZE: usize = 25; // trigger one job per 25 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this + const CHUNK_SIZE: usize = 10; // trigger one job per 10 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this let mut chunks_iter = chunks .into_iter() .filter_map(|(chunk_key, meta)| { @@ -129,7 +118,7 @@ impl<'a> System<'a> for Sys { let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect(); let chunk_sender = chunk_sender.clone(); slow_jobs.spawn("CHUNK_SERIALIZER", move || { - for (chunk, chunk_key, meta) in chunks { + for (chunk, chunk_key, mut meta) in chunks { let msg = Client::prepare_terrain( ServerGeneral::TerrainChunkUpdate { key: chunk_key, @@ -140,6 +129,8 @@ impl<'a> System<'a> for Sys { }, &meta.params, ); + meta.recipients.sort(); + meta.recipients.dedup(); if let Err(e) = chunk_sender.send(SerializedChunk { lossy_compression: meta.lossy_compression, msg, diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 6b492394ab..3fef026079 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -12,7 +12,7 @@ use common::{ use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; use rayon::iter::ParallelIterator; -use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage}; +use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write}; use tracing::{debug, trace}; /// This system will handle new messages from clients @@ -22,7 +22,7 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, - WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, EventBus>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, Lod>, ReadExpect<'a, NetworkRequestMetrics>, @@ -41,7 +41,7 @@ impl<'a> System<'a> for Sys { ( entities, server_event_bus, - mut chunk_send_queues, + chunk_send_bus, terrain, lod, network_metrics, @@ -52,95 +52,96 @@ impl<'a> System<'a> for Sys { ): Self::SystemData, ) { job.cpu_stats.measure(ParMode::Rayon); - let mut new_chunk_requests = ( - &entities, - &clients, - (&presences).maybe(), - &mut chunk_send_queues, - ) + let mut new_chunk_requests = (&entities, &clients, (&presences).maybe()) .par_join() - .map(|(entity, client, maybe_presence, chunk_send_queue)| { - let mut chunk_requests = Vec::new(); - let _ = super::try_recv_all(client, 5, |_, msg| { - // TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056) - let mut server_emitter = server_event_bus.emitter(); - let presence = match maybe_presence { - Some(g) => g, - None => { - debug!(?entity, "client is not in_game, ignoring msg"); - trace!(?msg, "ignored msg content"); - if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { - network_metrics.chunks_request_dropped.inc(); - } - return Ok(()); - }, - }; - match msg { - ClientGeneral::TerrainChunkRequest { key } => { - let in_vd = if let Some(pos) = positions.get(entity) { - pos.0.xy().map(|e| e as f64).distance_squared( - key.map(|e| e as f64 + 0.5) - * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64) - .powi(2) - } else { - true - }; - if in_vd { - if terrain.get_key_arc(key).is_some() { - network_metrics.chunks_served_from_memory.inc(); - chunk_send_queue.chunks.push(key); - } else { - network_metrics.chunks_generation_triggered.inc(); - chunk_requests.push(ChunkRequest { entity, key }); + .map_init( + || (chunk_send_bus.emitter(), server_event_bus.emitter()), + |(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| { + let mut chunk_requests = Vec::new(); + let _ = super::try_recv_all(client, 5, |_, msg| { + let presence = match maybe_presence { + Some(g) => g, + None => { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { + network_metrics.chunks_request_dropped.inc(); } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, - ClientGeneral::LodZoneRequest { key } => { - client.send(ServerGeneral::LodZoneUpdate { - key, - zone: lod.zone(key).clone(), - })?; - }, - _ => { - debug!( - "Kicking possibly misbehaving client due to invalud terrain \ - request" - ); - server_emitter.emit(ServerEvent::ClientDisconnect( - entity, - common::comp::DisconnectReason::NetworkError, - )); - }, - } - Ok(()) - }); + return Ok(()); + }, + }; + match msg { + ClientGeneral::TerrainChunkRequest { key } => { + let in_vd = if let Some(pos) = positions.get(entity) { + pos.0.xy().map(|e| e as f64).distance_squared( + key.map(|e| e as f64 + 0.5) + * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < ((presence.view_distance as f64 - 1.0 + + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64) + .powi(2) + } else { + true + }; + if in_vd { + if terrain.get_key_arc(key).is_some() { + network_metrics.chunks_served_from_memory.inc(); + chunk_send_emitter.emit(ChunkSendQueue { + chunk_key: key, + entity, + }); + } else { + network_metrics.chunks_generation_triggered.inc(); + chunk_requests.push(ChunkRequest { entity, key }); + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + ClientGeneral::LodZoneRequest { key } => { + client.send(ServerGeneral::LodZoneUpdate { + key, + zone: lod.zone(key).clone(), + })?; + }, + _ => { + debug!( + "Kicking possibly misbehaving client due to invalud terrain \ + request" + ); + server_emitter.emit(ServerEvent::ClientDisconnect( + entity, + common::comp::DisconnectReason::NetworkError, + )); + }, + } + Ok(()) + }); - // Load a minimum radius of chunks around each player. - // This is used to prevent view distance reloading exploits and make sure that - // entity simulation occurs within a minimum radius around the - // player. - if let Some(pos) = positions.get(entity) { - let player_chunk = pos - .0 - .xy() - .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); - for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) { - let key = player_chunk + rpos; - if terrain.get_key(key).is_none() { - // TODO: @zesterer do we want to be sending these chunk to the client - // even if they aren't requested? If we don't we could replace the - // entity here with Option and pass in None. - chunk_requests.push(ChunkRequest { entity, key }); + // Load a minimum radius of chunks around each player. + // This is used to prevent view distance reloading exploits and make sure that + // entity simulation occurs within a minimum radius around the + // player. + if let Some(pos) = positions.get(entity) { + let player_chunk = pos + .0 + .xy() + .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); + for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) { + let key = player_chunk + rpos; + if terrain.get_key(key).is_none() { + // TODO: @zesterer do we want to be sending these chunk to the + // client even if they aren't + // requested? If we don't we could replace the + // entity here with Option and pass in None. + chunk_requests.push(ChunkRequest { entity, key }); + } } } - } - chunk_requests - }) + chunk_requests + }, + ) .flatten() .collect::>(); diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 70c453c2ea..c53cc4159c 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -63,6 +63,7 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, SlowJobPool>, ReadExpect<'a, IndexOwned>, ReadExpect<'a, Arc>, + ReadExpect<'a, EventBus>, WriteExpect<'a, ChunkGenerator>, WriteExpect<'a, TerrainGrid>, Write<'a, TerrainChanges>, @@ -73,7 +74,6 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Presence>, ReadStorage<'a, Client>, Entities<'a>, - WriteStorage<'a, ChunkSendQueue>, WriteStorage<'a, RepositionOnChunkLoad>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, Waypoint>, @@ -96,6 +96,7 @@ impl<'a> System<'a> for Sys { slow_jobs, index, world, + chunk_send_bus, mut chunk_generator, mut terrain, mut terrain_changes, @@ -106,7 +107,6 @@ impl<'a> System<'a> for Sys { presences, clients, entities, - mut chunk_send_queues, mut reposition_on_load, mut force_update, mut waypoints, @@ -252,23 +252,30 @@ impl<'a> System<'a> for Sys { } // Send the chunk to all nearby players. - new_chunks.into_iter().for_each(|(key, _chunk)| { - (&presences, &positions, &clients, &mut chunk_send_queues) - .join() - .for_each(|(presence, pos, _client, chunk_send_queue)| { - let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); - // Subtract 2 from the offset before computing squared magnitude - // 1 since chunks need neighbors to be meshed - // 1 to act as a buffer if the player moves in that direction - let adjusted_dist_sqr = (chunk_pos - key) - .map(|e: i32| (e.unsigned_abs()).saturating_sub(2)) - .magnitude_squared(); + use rayon::iter::{IntoParallelIterator, ParallelIterator}; + new_chunks.into_par_iter().for_each_init( + || chunk_send_bus.emitter(), + |chunk_send_emitter, (key, _chunk)| { + (&entities, &presences, &positions, &clients) + .join() + .for_each(|(entity, presence, pos, _client)| { + let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); + // Subtract 2 from the offset before computing squared magnitude + // 1 since chunks need neighbors to be meshed + // 1 to act as a buffer if the player moves in that direction + let adjusted_dist_sqr = (chunk_pos - key) + .map(|e: i32| (e.unsigned_abs()).saturating_sub(2)) + .magnitude_squared(); - if adjusted_dist_sqr <= presence.view_distance.pow(2) { - chunk_send_queue.chunks.push(key); - } - }); - }); + if adjusted_dist_sqr <= presence.view_distance.pow(2) { + chunk_send_emitter.emit(ChunkSendQueue { + entity, + chunk_key: key, + }); + } + }); + }, + ); // Remove chunks that are too far from players. let mut chunks_to_remove = Vec::new(); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 119cffd048..ad1bd95659 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,9 +1,9 @@ use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence}; -use common::{comp::Pos, terrain::TerrainGrid}; +use common::{comp::Pos, event::EventBus, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; -use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage}; /// This systems sends new chunks to clients as well as changes to existing /// chunks @@ -11,9 +11,10 @@ use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; pub struct Sys; impl<'a> System<'a> for Sys { type SystemData = ( + Entities<'a>, ReadExpect<'a, TerrainGrid>, Read<'a, TerrainChanges>, - WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, EventBus>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, ReadStorage<'a, Client>, @@ -25,16 +26,19 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - (terrain, terrain_changes, mut chunk_send_queues, positions, presences, clients): Self::SystemData, + (entities, terrain, terrain_changes, chunk_send_bus, positions, presences, clients): Self::SystemData, ) { + let mut chunk_send_emitter = chunk_send_bus.emitter(); + // Sync changed chunks for chunk_key in &terrain_changes.modified_chunks { - for (presence, pos, chunk_send_queue) in - (&presences, &positions, &mut chunk_send_queues).join() - { + for (entity, presence, pos) in (&entities, &presences, &positions).join() { if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { - chunk_send_queue.chunks.push(*chunk_key); + chunk_send_emitter.emit(ChunkSendQueue { + entity, + chunk_key: *chunk_key, + }); } } } From 1b768743426b1466db579bd5355687e679285caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 11 May 2022 21:50:14 +0200 Subject: [PATCH 6/6] renamings and using unstable sort according to a code review --- server/src/chunk_serialize.rs | 2 +- server/src/client.rs | 4 +++- server/src/lib.rs | 2 +- server/src/sys/chunk_serialize.rs | 8 ++++---- server/src/sys/msg/terrain.rs | 6 +++--- server/src/sys/terrain.rs | 6 +++--- server/src/sys/terrain_sync.rs | 6 +++--- 7 files changed, 18 insertions(+), 16 deletions(-) diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs index 9929f056fa..4986bd0f49 100644 --- a/server/src/chunk_serialize.rs +++ b/server/src/chunk_serialize.rs @@ -14,7 +14,7 @@ use vek::Vec2; /// send to MULTIPLE clients /// TODO: store a urgent flag and seperate even more, 5 ticks vs 5 seconds #[derive(Debug, PartialEq)] -pub struct ChunkSendQueue { +pub struct ChunkSendEntry { pub(crate) entity: Entity, pub(crate) chunk_key: Vec2, } diff --git a/server/src/client.rs b/server/src/client.rs index 3187fd0cc4..202790ad08 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -218,7 +218,9 @@ impl Client { pub(crate) fn terrain_params(&self) -> StreamParams { self.terrain_stream_params.clone() } - pub(crate) fn prepare_terrain( + /// Only used for Serialize Chunks in a SlowJob. + /// TODO: find a more elegant version for this invariant + pub(crate) fn prepare_chunk_update_msg( terrain_chunk_update: ServerGeneral, params: &StreamParams, ) -> PreparedMsg { diff --git a/server/src/lib.rs b/server/src/lib.rs index 852767f29a..aa702a788c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -249,7 +249,7 @@ impl Server { state.ecs_mut().insert(Vec::::new()); state .ecs_mut() - .insert(EventBus::::default()); + .insert(EventBus::::default()); state.ecs_mut().insert(Locations::default()); state.ecs_mut().insert(LoginProvider::new( settings.auth_server_address.clone(), diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index 2325c31b3a..aa97fcb5b2 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -1,5 +1,5 @@ use crate::{ - chunk_serialize::{ChunkSendQueue, SerializedChunk}, + chunk_serialize::{ChunkSendEntry, SerializedChunk}, client::Client, metrics::NetworkRequestMetrics, presence::Presence, @@ -23,7 +23,7 @@ impl<'a> System<'a> for Sys { Read<'a, Tick>, ReadStorage<'a, Client>, ReadStorage<'a, Presence>, - ReadExpect<'a, EventBus>, + ReadExpect<'a, EventBus>, ReadExpect<'a, NetworkRequestMetrics>, ReadExpect<'a, SlowJobPool>, ReadExpect<'a, TerrainGrid>, @@ -119,7 +119,7 @@ impl<'a> System<'a> for Sys { let chunk_sender = chunk_sender.clone(); slow_jobs.spawn("CHUNK_SERIALIZER", move || { for (chunk, chunk_key, mut meta) in chunks { - let msg = Client::prepare_terrain( + let msg = Client::prepare_chunk_update_msg( ServerGeneral::TerrainChunkUpdate { key: chunk_key, chunk: Ok(SerializedTerrainChunk::via_heuristic( @@ -129,7 +129,7 @@ impl<'a> System<'a> for Sys { }, &meta.params, ); - meta.recipients.sort(); + meta.recipients.sort_unstable(); meta.recipients.dedup(); if let Err(e) = chunk_sender.send(SerializedChunk { lossy_compression: meta.lossy_compression, diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 3fef026079..af91135db6 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -1,5 +1,5 @@ use crate::{ - chunk_serialize::ChunkSendQueue, client::Client, lod::Lod, metrics::NetworkRequestMetrics, + chunk_serialize::ChunkSendEntry, client::Client, lod::Lod, metrics::NetworkRequestMetrics, presence::Presence, ChunkRequest, }; use common::{ @@ -22,7 +22,7 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, - ReadExpect<'a, EventBus>, + ReadExpect<'a, EventBus>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, Lod>, ReadExpect<'a, NetworkRequestMetrics>, @@ -86,7 +86,7 @@ impl<'a> System<'a> for Sys { if in_vd { if terrain.get_key_arc(key).is_some() { network_metrics.chunks_served_from_memory.inc(); - chunk_send_emitter.emit(ChunkSendQueue { + chunk_send_emitter.emit(ChunkSendEntry { chunk_key: key, entity, }); diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index c53cc4159c..967c20c2e5 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -7,7 +7,7 @@ use world::{IndexOwned, World}; use crate::{ chunk_generator::ChunkGenerator, - chunk_serialize::ChunkSendQueue, + chunk_serialize::ChunkSendEntry, client::Client, presence::{Presence, RepositionOnChunkLoad}, rtsim::RtSim, @@ -63,7 +63,7 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, SlowJobPool>, ReadExpect<'a, IndexOwned>, ReadExpect<'a, Arc>, - ReadExpect<'a, EventBus>, + ReadExpect<'a, EventBus>, WriteExpect<'a, ChunkGenerator>, WriteExpect<'a, TerrainGrid>, Write<'a, TerrainChanges>, @@ -268,7 +268,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= presence.view_distance.pow(2) { - chunk_send_emitter.emit(ChunkSendQueue { + chunk_send_emitter.emit(ChunkSendEntry { entity, chunk_key: key, }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index ad1bd95659..a12302b393 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,4 +1,4 @@ -use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence}; +use crate::{chunk_serialize::ChunkSendEntry, client::Client, presence::Presence}; use common::{comp::Pos, event::EventBus, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{CompressedData, ServerGeneral}; @@ -14,7 +14,7 @@ impl<'a> System<'a> for Sys { Entities<'a>, ReadExpect<'a, TerrainGrid>, Read<'a, TerrainChanges>, - ReadExpect<'a, EventBus>, + ReadExpect<'a, EventBus>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, ReadStorage<'a, Client>, @@ -35,7 +35,7 @@ impl<'a> System<'a> for Sys { for (entity, presence, pos) in (&entities, &presences, &positions).join() { if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { - chunk_send_emitter.emit(ChunkSendQueue { + chunk_send_emitter.emit(ChunkSendEntry { entity, chunk_key: *chunk_key, });