diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs index 02473e9380..2755ce15bd 100644 --- a/server/src/chunk_serialize.rs +++ b/server/src/chunk_serialize.rs @@ -1,4 +1,5 @@ -use specs::Component; +use crate::client::PreparedMsg; +use specs::{Component, Entity}; use specs_idvs::IdvStorage; use vek::Vec2; @@ -17,3 +18,9 @@ pub struct ChunkSendQueue { impl Component for ChunkSendQueue { type Storage = IdvStorage; } + +pub struct SerializedChunk { + pub(crate) lossy_compression: bool, + pub(crate) msg: PreparedMsg, + pub(crate) recipients: Vec, +} diff --git a/server/src/client.rs b/server/src/client.rs index d357d3d4e1..3187fd0cc4 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -216,6 +216,21 @@ impl Client { } } + pub(crate) fn terrain_params(&self) -> StreamParams { self.terrain_stream_params.clone() } + + pub(crate) fn prepare_terrain( + terrain_chunk_update: ServerGeneral, + params: &StreamParams, + ) -> PreparedMsg { + if !matches!( + terrain_chunk_update, + ServerGeneral::TerrainChunkUpdate { .. } + ) { + unreachable!("You must not call this function without a terrain chunk update!") + } + PreparedMsg::new(5, &terrain_chunk_update, params) + } + pub(crate) fn recv( &self, stream_id: u8, diff --git a/server/src/lib.rs b/server/src/lib.rs index 576eb478d9..b2ed1de77f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -283,13 +283,20 @@ impl Server { compiled with the feature. Terrain modifications will *not* be persisted." ); } - state - .ecs_mut() - .write_resource::() - .configure("CHUNK_GENERATOR", |n| n / 2 + n / 4); + { + let pool = state.ecs_mut().write_resource::(); + pool.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4); + pool.configure("CHUNK_SERIALIZER", |n| n / 2); + } state .ecs_mut() .insert(ChunkGenerator::new(chunk_gen_metrics)); + { + let (sender, receiver) = + crossbeam_channel::bounded::(10_000); + state.ecs_mut().insert(sender); + state.ecs_mut().insert(receiver); + } state.ecs_mut().insert(CharacterUpdater::new( Arc::>::clone(&database_settings), diff --git a/server/src/sys/chunk_send.rs b/server/src/sys/chunk_send.rs new file mode 100644 index 0000000000..eb6d13390d --- /dev/null +++ b/server/src/sys/chunk_send.rs @@ -0,0 +1,37 @@ +use crate::{chunk_serialize::SerializedChunk, client::Client, metrics::NetworkRequestMetrics}; + +use common_ecs::{Job, Origin, Phase, System}; +use specs::{ReadExpect, ReadStorage}; + +/// This system will handle sending terrain to clients by +/// collecting chunks that need to be send for a single generation run and then +/// trigger a SlowJob for serialisation. +#[derive(Default)] +pub struct Sys; +impl<'a> System<'a> for Sys { + type SystemData = ( + ReadStorage<'a, Client>, + ReadExpect<'a, NetworkRequestMetrics>, + ReadExpect<'a, crossbeam_channel::Receiver>, + ); + + const NAME: &'static str = "chunk_send"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run(_job: &mut Job, (clients, network_metrics, chunk_receiver): Self::SystemData) { + for sc in chunk_receiver.try_iter() { + for recipient in sc.recipients { + if let Some(client) = clients.get(recipient) { + if client.send_prepared(&sc.msg).is_err() { + if sc.lossy_compression { + network_metrics.chunks_served_lossy.inc() + } else { + network_metrics.chunks_served_lossless.inc() + } + } + } + } + } + } +} diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index e10d1d216d..33575d64df 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -1,70 +1,16 @@ use crate::{ - chunk_serialize::ChunkSendQueue, client::Client, metrics::NetworkRequestMetrics, - presence::Presence, Tick, + chunk_serialize::{ChunkSendQueue, SerializedChunk}, + client::Client, + presence::Presence, + Tick, }; use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; - use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; use hashbrown::HashMap; -use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteStorage}; -use std::cmp::Ordering; - -pub(crate) struct LazyTerrainMessage { - lazy_msg_lo: Option, - lazy_msg_hi: Option, -} - -pub const SAFE_ZONE_RADIUS: f32 = 200.0; - -impl LazyTerrainMessage { - pub(crate) fn new() -> Self { - Self { - lazy_msg_lo: None, - lazy_msg_hi: None, - } - } - - pub(crate) fn prepare_and_send< - 'a, - A, - F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>, - >( - &mut self, - network_metrics: &NetworkRequestMetrics, - client: &Client, - presence: &Presence, - chunk_key: &vek::Vec2, - generate_chunk: F, - ) -> Result<(), A> { - let lazy_msg = if presence.lossy_terrain_compression { - &mut self.lazy_msg_lo - } else { - &mut self.lazy_msg_hi - }; - if lazy_msg.is_none() { - *lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(match generate_chunk() { - Ok(chunk) => SerializedTerrainChunk::via_heuristic( - chunk, - presence.lossy_terrain_compression, - ), - Err(e) => return Err(e), - }), - })); - } - lazy_msg.as_ref().map(|msg| { - let _ = client.send_prepared(msg); - if presence.lossy_terrain_compression { - network_metrics.chunks_served_lossy.inc(); - } else { - network_metrics.chunks_served_lossless.inc(); - } - }); - Ok(()) - } -} +use network::StreamParams; +use specs::{Entities, Entity, Join, Read, ReadExpect, ReadStorage, WriteStorage}; +use std::{cmp::Ordering, sync::Arc}; /// This system will handle sending terrain to clients by /// collecting chunks that need to be send for a single generation run and then @@ -80,7 +26,7 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, SlowJobPool>, ReadExpect<'a, TerrainGrid>, - ReadExpect<'a, NetworkRequestMetrics>, + ReadExpect<'a, crossbeam_channel::Sender>, ); const NAME: &'static str = "chunk_serialize"; @@ -97,16 +43,15 @@ impl<'a> System<'a> for Sys { mut chunk_send_queues, slow_jobs, terrain, - network_metrics, + chunk_sender, ): Self::SystemData, ) { // Only operate once per second - if tick.0.rem_euclid(60) != 0 { + //TODO: move out of this system and now even spawn this. + if tick.0.rem_euclid(30) != 0 { return; } - let mut chunks = HashMap::<_, Vec<_>>::new(); - for entity in (&entities, &clients, &presences, !&chunk_send_queues) .join() .map(|(e, _, _, _)| e) @@ -115,9 +60,16 @@ impl<'a> System<'a> for Sys { let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default()); } + struct Metadata { + recipients: Vec, + lossy_compression: bool, + params: StreamParams, + } + + let mut chunks = HashMap::<_, Metadata>::new(); // Grab all chunk requests for all clients and sort them - for (entity, _client, chunk_send_queue) in - (&entities, &clients, &mut chunk_send_queues).join() + for (entity, client, presence, chunk_send_queue) in + (&entities, &clients, &presences, &mut chunk_send_queues).join() { let mut chunk_send_queue = std::mem::take(chunk_send_queue); // dedup input @@ -132,29 +84,44 @@ impl<'a> System<'a> for Sys { }); chunk_send_queue.chunks.dedup(); for chunk_key in chunk_send_queue.chunks { - let recipients = chunks.entry(chunk_key).or_default(); - recipients.push(entity); + let meta = chunks.entry(chunk_key).or_insert_with(|| Metadata { + recipients: Vec::default(), + lossy_compression: true, + params: client.terrain_params(), + }); + meta.recipients.push(entity); + // We decide here, to ONLY send lossy compressed data If all clients want those. + // If at least 1 client here does not want lossy we don't compress it twice. + // It would just be too expensive for the server + meta.lossy_compression = + meta.lossy_compression && presence.lossy_terrain_compression; } } - if !chunks.is_empty() { - let len = chunks.len(); - print!("{}", len); - for (chunk_key, entities) in chunks { - let mut lazy_msg = LazyTerrainMessage::new(); - for entity in entities { - let client = clients.get(entity).unwrap(); - let presence = presences.get(entity).unwrap(); - if let Err(e) = lazy_msg.prepare_and_send( - &network_metrics, - client, - presence, - &chunk_key, - || terrain.get_key(chunk_key).ok_or(()), - ) { - tracing::error!(?e, "error sending chunk"); - } - } + // Trigger serialization in a SlowJob + for (chunk_key, meta) in chunks { + if let Some(chunk) = terrain.get_key_arc(chunk_key) { + let chunk = Arc::clone(chunk); + let chunk_sender = chunk_sender.clone(); + slow_jobs.spawn("CHUNK_SERIALIZER", move || { + let msg = Client::prepare_terrain( + ServerGeneral::TerrainChunkUpdate { + key: chunk_key, + chunk: Ok(SerializedTerrainChunk::via_heuristic( + &chunk, + meta.lossy_compression, + )), + }, + &meta.params, + ); + if let Err(e) = chunk_sender.send(SerializedChunk { + lossy_compression: meta.lossy_compression, + msg, + recipients: meta.recipients, + }) { + tracing::warn!(?e, "cannot send serialized chunk to sender") + }; + }); } } } diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index 28db903e55..56ac301431 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,4 +1,5 @@ pub mod agent; +pub mod chunk_send; pub mod chunk_serialize; pub mod entity_sync; pub mod invite_timeout; @@ -34,7 +35,10 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); dispatch::(dispatch_builder, &[]); + // no dependency, as we only work once per sec anyway. dispatch::(dispatch_builder, &[]); + // don't depend on chunk_serialize, as we assume everything is done in a SlowJow + dispatch::(dispatch_builder, &[]); } pub fn run_sync_systems(ecs: &mut specs::World) { diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 43c76b436a..70c453c2ea 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -252,10 +252,10 @@ impl<'a> System<'a> for Sys { } // Send the chunk to all nearby players. - new_chunks.into_iter().for_each(|(key, chunk)| { + new_chunks.into_iter().for_each(|(key, _chunk)| { (&presences, &positions, &clients, &mut chunk_send_queues) .join() - .for_each(|(presence, pos, client, chunk_send_queue)| { + .for_each(|(presence, pos, _client, chunk_send_queue)| { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); // Subtract 2 from the offset before computing squared magnitude // 1 since chunks need neighbors to be meshed