From 6c756c2440eface87d70fb9209f4dc3f98fb5ddb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 9 May 2022 01:11:46 +0200 Subject: [PATCH] first implementation of defering serialisation --- server/src/chunk_serialize.rs | 19 ++++ server/src/lib.rs | 4 + server/src/sys/chunk_serialize.rs | 161 ++++++++++++++++++++++++++++++ server/src/sys/mod.rs | 2 + server/src/sys/msg/terrain.rs | 46 ++++----- server/src/sys/terrain.rs | 81 ++------------- server/src/sys/terrain_sync.rs | 26 ++--- 7 files changed, 223 insertions(+), 116 deletions(-) create mode 100644 server/src/chunk_serialize.rs create mode 100644 server/src/sys/chunk_serialize.rs diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs new file mode 100644 index 0000000000..02473e9380 --- /dev/null +++ b/server/src/chunk_serialize.rs @@ -0,0 +1,19 @@ +use specs::Component; +use specs_idvs::IdvStorage; +use vek::Vec2; + +/// Curing the runtime of a tick, multiple systems can request a chunk to be +/// synced to a client E.g. msg::terrain will do so, when a client requested a +/// chunk that already exist terrain will do so when a chunk came back from +/// ChunkGeneration. All those sends are deferred by this queue. +/// Deferring allows us to remove code duplication and maybe serialize ONCE, +/// send to MULTIPLE clients TODO: store a urgent flag and seperate even more, 5 +/// ticks vs 5 seconds +#[derive(Default, Clone, Debug, PartialEq)] +pub struct ChunkSendQueue { + pub chunks: Vec>, +} + +impl Component for ChunkSendQueue { + type Storage = IdvStorage; +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 7fb2073d81..576eb478d9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -16,6 +16,7 @@ pub mod alias_validator; mod character_creator; pub mod chunk_generator; +mod chunk_serialize; pub mod client; pub mod cmd; pub mod connection_handler; @@ -321,6 +322,9 @@ impl Server { state.ecs_mut().register::(); state.ecs_mut().register::(); state.ecs_mut().register::(); + state + .ecs_mut() + .register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs new file mode 100644 index 0000000000..e10d1d216d --- /dev/null +++ b/server/src/sys/chunk_serialize.rs @@ -0,0 +1,161 @@ +use crate::{ + chunk_serialize::ChunkSendQueue, client::Client, metrics::NetworkRequestMetrics, + presence::Presence, Tick, +}; +use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; + +use common_ecs::{Job, Origin, Phase, System}; +use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; +use hashbrown::HashMap; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteStorage}; +use std::cmp::Ordering; + +pub(crate) struct LazyTerrainMessage { + lazy_msg_lo: Option, + lazy_msg_hi: Option, +} + +pub const SAFE_ZONE_RADIUS: f32 = 200.0; + +impl LazyTerrainMessage { + pub(crate) fn new() -> Self { + Self { + lazy_msg_lo: None, + lazy_msg_hi: None, + } + } + + pub(crate) fn prepare_and_send< + 'a, + A, + F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>, + >( + &mut self, + network_metrics: &NetworkRequestMetrics, + client: &Client, + presence: &Presence, + chunk_key: &vek::Vec2, + generate_chunk: F, + ) -> Result<(), A> { + let lazy_msg = if presence.lossy_terrain_compression { + &mut self.lazy_msg_lo + } else { + &mut self.lazy_msg_hi + }; + if lazy_msg.is_none() { + *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key: *chunk_key, + chunk: Ok(match generate_chunk() { + Ok(chunk) => SerializedTerrainChunk::via_heuristic( + chunk, + presence.lossy_terrain_compression, + ), + Err(e) => return Err(e), + }), + })); + } + lazy_msg.as_ref().map(|msg| { + let _ = client.send_prepared(msg); + if presence.lossy_terrain_compression { + network_metrics.chunks_served_lossy.inc(); + } else { + network_metrics.chunks_served_lossless.inc(); + } + }); + Ok(()) + } +} + +/// This system will handle sending terrain to clients by +/// collecting chunks that need to be send for a single generation run and then +/// trigger a SlowJob for serialisation. +#[derive(Default)] +pub struct Sys; +impl<'a> System<'a> for Sys { + type SystemData = ( + Read<'a, Tick>, + Entities<'a>, + ReadStorage<'a, Client>, + ReadStorage<'a, Presence>, + WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, SlowJobPool>, + ReadExpect<'a, TerrainGrid>, + ReadExpect<'a, NetworkRequestMetrics>, + ); + + const NAME: &'static str = "chunk_serialize"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run( + _job: &mut Job, + ( + tick, + entities, + clients, + presences, + mut chunk_send_queues, + slow_jobs, + terrain, + network_metrics, + ): Self::SystemData, + ) { + // Only operate once per second + if tick.0.rem_euclid(60) != 0 { + return; + } + + let mut chunks = HashMap::<_, Vec<_>>::new(); + + for entity in (&entities, &clients, &presences, !&chunk_send_queues) + .join() + .map(|(e, _, _, _)| e) + .collect::>() + { + let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default()); + } + + // Grab all chunk requests for all clients and sort them + for (entity, _client, chunk_send_queue) in + (&entities, &clients, &mut chunk_send_queues).join() + { + let mut chunk_send_queue = std::mem::take(chunk_send_queue); + // dedup input + chunk_send_queue.chunks.sort_by(|a, b| { + let zero = a.x.partial_cmp(&b.x).unwrap_or(Ordering::Equal); + let one = a.y.partial_cmp(&b.y).unwrap_or(Ordering::Equal); + if matches!(zero, Ordering::Equal) { + one + } else { + zero + } + }); + chunk_send_queue.chunks.dedup(); + for chunk_key in chunk_send_queue.chunks { + let recipients = chunks.entry(chunk_key).or_default(); + recipients.push(entity); + } + } + + if !chunks.is_empty() { + let len = chunks.len(); + print!("{}", len); + for (chunk_key, entities) in chunks { + let mut lazy_msg = LazyTerrainMessage::new(); + for entity in entities { + let client = clients.get(entity).unwrap(); + let presence = presences.get(entity).unwrap(); + if let Err(e) = lazy_msg.prepare_and_send( + &network_metrics, + client, + presence, + &chunk_key, + || terrain.get_key(chunk_key).ok_or(()), + ) { + tracing::error!(?e, "error sending chunk"); + } + } + } + } + } +} diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index d0f76023c6..28db903e55 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,4 +1,5 @@ pub mod agent; +pub mod chunk_serialize; pub mod entity_sync; pub mod invite_timeout; pub mod metrics; @@ -33,6 +34,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); + dispatch::(dispatch_builder, &[]); } pub fn run_sync_systems(ecs: &mut specs::World) { diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index ab1ffec448..6b492394ab 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -1,5 +1,6 @@ use crate::{ - client::Client, lod::Lod, metrics::NetworkRequestMetrics, presence::Presence, ChunkRequest, + chunk_serialize::ChunkSendQueue, client::Client, lod::Lod, metrics::NetworkRequestMetrics, + presence::Presence, ChunkRequest, }; use common::{ comp::Pos, @@ -9,9 +10,9 @@ use common::{ vol::RectVolSize, }; use common_ecs::{Job, Origin, ParMode, Phase, System}; -use common_net::msg::{ClientGeneral, SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::{ClientGeneral, ServerGeneral}; use rayon::iter::ParallelIterator; -use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write}; +use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage}; use tracing::{debug, trace}; /// This system will handle new messages from clients @@ -21,6 +22,7 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, + WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, Lod>, ReadExpect<'a, NetworkRequestMetrics>, @@ -39,6 +41,7 @@ impl<'a> System<'a> for Sys { ( entities, server_event_bus, + mut chunk_send_queues, terrain, lod, network_metrics, @@ -49,11 +52,16 @@ impl<'a> System<'a> for Sys { ): Self::SystemData, ) { job.cpu_stats.measure(ParMode::Rayon); - let mut new_chunk_requests = (&entities, &clients, (&presences).maybe()) + let mut new_chunk_requests = ( + &entities, + &clients, + (&presences).maybe(), + &mut chunk_send_queues, + ) .par_join() - .map(|(entity, client, maybe_presence)| { + .map(|(entity, client, maybe_presence, chunk_send_queue)| { let mut chunk_requests = Vec::new(); - let _ = super::try_recv_all(client, 5, |client, msg| { + let _ = super::try_recv_all(client, 5, |_, msg| { // TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056) let mut server_emitter = server_event_bus.emitter(); let presence = match maybe_presence { @@ -80,26 +88,12 @@ impl<'a> System<'a> for Sys { true }; if in_vd { - match terrain.get_key_arc(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - client.send(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(SerializedTerrainChunk::via_heuristic( - chunk, - presence.lossy_terrain_compression, - )), - })?; - if presence.lossy_terrain_compression { - network_metrics.chunks_served_lossy.inc(); - } else { - network_metrics.chunks_served_lossless.inc(); - } - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - chunk_requests.push(ChunkRequest { entity, key }); - }, + if terrain.get_key_arc(key).is_some() { + network_metrics.chunks_served_from_memory.inc(); + chunk_send_queue.chunks.push(key); + } else { + network_metrics.chunks_generation_triggered.inc(); + chunk_requests.push(ChunkRequest { entity, key }); } } else { network_metrics.chunks_request_dropped.inc(); diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 543830d242..43c76b436a 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -7,8 +7,8 @@ use world::{IndexOwned, World}; use crate::{ chunk_generator::ChunkGenerator, + chunk_serialize::ChunkSendQueue, client::Client, - metrics::NetworkRequestMetrics, presence::{Presence, RepositionOnChunkLoad}, rtsim::RtSim, settings::Settings, @@ -29,7 +29,7 @@ use common::{ }; use common_ecs::{Job, Origin, Phase, System}; -use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; +use common_net::msg::ServerGeneral; use common_state::TerrainChanges; use comp::Behavior; use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteExpect, WriteStorage}; @@ -41,62 +41,8 @@ pub type TerrainPersistenceData<'a> = Option>; #[cfg(not(feature = "persistent_world"))] pub type TerrainPersistenceData<'a> = (); -pub(crate) struct LazyTerrainMessage { - lazy_msg_lo: Option, - lazy_msg_hi: Option, -} - pub const SAFE_ZONE_RADIUS: f32 = 200.0; -impl LazyTerrainMessage { - pub(crate) fn new() -> Self { - Self { - lazy_msg_lo: None, - lazy_msg_hi: None, - } - } - - pub(crate) fn prepare_and_send< - 'a, - A, - F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>, - >( - &mut self, - network_metrics: &NetworkRequestMetrics, - client: &Client, - presence: &Presence, - chunk_key: &vek::Vec2, - generate_chunk: F, - ) -> Result<(), A> { - let lazy_msg = if presence.lossy_terrain_compression { - &mut self.lazy_msg_lo - } else { - &mut self.lazy_msg_hi - }; - if lazy_msg.is_none() { - *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(match generate_chunk() { - Ok(chunk) => SerializedTerrainChunk::via_heuristic( - chunk, - presence.lossy_terrain_compression, - ), - Err(e) => return Err(e), - }), - })); - } - lazy_msg.as_ref().map(|msg| { - let _ = client.send_prepared(msg); - if presence.lossy_terrain_compression { - network_metrics.chunks_served_lossy.inc(); - } else { - network_metrics.chunks_served_lossless.inc(); - } - }); - Ok(()) - } -} - /// This system will handle loading generated chunks and unloading /// unneeded chunks. /// 1. Inserts newly generated chunks into the TerrainGrid @@ -117,7 +63,6 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, SlowJobPool>, ReadExpect<'a, IndexOwned>, ReadExpect<'a, Arc>, - ReadExpect<'a, NetworkRequestMetrics>, WriteExpect<'a, ChunkGenerator>, WriteExpect<'a, TerrainGrid>, Write<'a, TerrainChanges>, @@ -128,6 +73,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Presence>, ReadStorage<'a, Client>, Entities<'a>, + WriteStorage<'a, ChunkSendQueue>, WriteStorage<'a, RepositionOnChunkLoad>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, Waypoint>, @@ -150,7 +96,6 @@ impl<'a> System<'a> for Sys { slow_jobs, index, world, - network_metrics, mut chunk_generator, mut terrain, mut terrain_changes, @@ -161,6 +106,7 @@ impl<'a> System<'a> for Sys { presences, clients, entities, + mut chunk_send_queues, mut reposition_on_load, mut force_update, mut waypoints, @@ -306,13 +252,10 @@ impl<'a> System<'a> for Sys { } // Send the chunk to all nearby players. - use rayon::iter::{IntoParallelIterator, ParallelIterator}; - new_chunks.into_par_iter().for_each(|(key, chunk)| { - let mut lazy_msg = LazyTerrainMessage::new(); - - (&presences, &positions, &clients) + new_chunks.into_iter().for_each(|(key, chunk)| { + (&presences, &positions, &clients, &mut chunk_send_queues) .join() - .for_each(|(presence, pos, client)| { + .for_each(|(presence, pos, client, chunk_send_queue)| { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); // Subtract 2 from the offset before computing squared magnitude // 1 since chunks need neighbors to be meshed @@ -322,15 +265,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= presence.view_distance.pow(2) { - lazy_msg - .prepare_and_send::( - &network_metrics, - client, - presence, - &key, - || Ok(&*chunk), - ) - .into_ok(); + chunk_send_queue.chunks.push(key); } }); }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index e737c1c582..119cffd048 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,10 +1,9 @@ -use super::terrain::LazyTerrainMessage; -use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence}; +use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence}; use common::{comp::Pos, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; -use specs::{Join, Read, ReadExpect, ReadStorage}; +use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; /// This systems sends new chunks to clients as well as changes to existing /// chunks @@ -14,10 +13,10 @@ impl<'a> System<'a> for Sys { type SystemData = ( ReadExpect<'a, TerrainGrid>, Read<'a, TerrainChanges>, + WriteStorage<'a, ChunkSendQueue>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, ReadStorage<'a, Client>, - ReadExpect<'a, NetworkRequestMetrics>, ); const NAME: &'static str = "terrain_sync"; @@ -26,23 +25,16 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - (terrain, terrain_changes, positions, presences, clients, network_metrics): Self::SystemData, + (terrain, terrain_changes, mut chunk_send_queues, positions, presences, clients): Self::SystemData, ) { // Sync changed chunks - 'chunk: for chunk_key in &terrain_changes.modified_chunks { - let mut lazy_msg = LazyTerrainMessage::new(); - for (presence, pos, client) in (&presences, &positions, &clients).join() { + for chunk_key in &terrain_changes.modified_chunks { + for (presence, pos, chunk_send_queue) in + (&presences, &positions, &mut chunk_send_queues).join() + { if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { - if let Err(()) = lazy_msg.prepare_and_send( - &network_metrics, - client, - presence, - chunk_key, - || terrain.get_key(*chunk_key).ok_or(()), - ) { - break 'chunk; - } + chunk_send_queue.chunks.push(*chunk_key); } } }