diff --git a/server/src/chunk_serialize.rs b/server/src/chunk_serialize.rs index 2755ce15bd..9929f056fa 100644 --- a/server/src/chunk_serialize.rs +++ b/server/src/chunk_serialize.rs @@ -1,22 +1,22 @@ use crate::client::PreparedMsg; -use specs::{Component, Entity}; -use specs_idvs::IdvStorage; +use specs::Entity; use vek::Vec2; -/// Curing the runtime of a tick, multiple systems can request a chunk to be -/// synced to a client E.g. msg::terrain will do so, when a client requested a -/// chunk that already exist terrain will do so when a chunk came back from -/// ChunkGeneration. All those sends are deferred by this queue. +/// Sending a chunk to the user works the following way: +/// A system like `msg::terrain` `terrain` or `terrain_sync` either decide to +/// trigger chunk generation, or if the chunk already exists +/// push a `ChunkSendQueue` to the eventbus. +/// The `chunk_serialize` system will coordinate serializing via a SlowJob +/// outside of the tick. On the next tick, the `chunk_send` system will pick up +/// finished chunks. +/// /// Deferring allows us to remove code duplication and maybe serialize ONCE, -/// send to MULTIPLE clients TODO: store a urgent flag and seperate even more, 5 -/// ticks vs 5 seconds -#[derive(Default, Clone, Debug, PartialEq)] +/// send to MULTIPLE clients +/// TODO: store a urgent flag and seperate even more, 5 ticks vs 5 seconds +#[derive(Debug, PartialEq)] pub struct ChunkSendQueue { - pub chunks: Vec>, -} - -impl Component for ChunkSendQueue { - type Storage = IdvStorage; + pub(crate) entity: Entity, + pub(crate) chunk_key: Vec2, } pub struct SerializedChunk { diff --git a/server/src/lib.rs b/server/src/lib.rs index b2ed1de77f..852767f29a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -247,6 +247,9 @@ impl Server { }); state.ecs_mut().insert(EventBus::::default()); state.ecs_mut().insert(Vec::::new()); + state + .ecs_mut() + .insert(EventBus::::default()); state.ecs_mut().insert(Locations::default()); state.ecs_mut().insert(LoginProvider::new( settings.auth_server_address.clone(), @@ -329,9 +332,6 @@ impl Server { state.ecs_mut().register::(); state.ecs_mut().register::(); state.ecs_mut().register::(); - state - .ecs_mut() - .register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; diff --git a/server/src/sys/chunk_serialize.rs b/server/src/sys/chunk_serialize.rs index 724e757309..2325c31b3a 100644 --- a/server/src/sys/chunk_serialize.rs +++ b/server/src/sys/chunk_serialize.rs @@ -5,13 +5,13 @@ use crate::{ presence::Presence, Tick, }; -use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; +use common::{event::EventBus, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; -use hashbrown::HashMap; +use hashbrown::{hash_map::Entry, HashMap}; use network::StreamParams; -use specs::{Entities, Entity, Join, Read, ReadExpect, ReadStorage, WriteStorage}; -use std::{cmp::Ordering, sync::Arc}; +use specs::{Entity, Read, ReadExpect, ReadStorage}; +use std::sync::Arc; /// This system will handle sending terrain to clients by /// collecting chunks that need to be send for a single generation run and then @@ -21,10 +21,9 @@ pub struct Sys; impl<'a> System<'a> for Sys { type SystemData = ( Read<'a, Tick>, - Entities<'a>, ReadStorage<'a, Client>, ReadStorage<'a, Presence>, - WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, EventBus>, ReadExpect<'a, NetworkRequestMetrics>, ReadExpect<'a, SlowJobPool>, ReadExpect<'a, TerrainGrid>, @@ -39,10 +38,9 @@ impl<'a> System<'a> for Sys { _job: &mut Job, ( tick, - entities, clients, presences, - mut chunk_send_queues, + chunk_send_queues_bus, network_metrics, slow_jobs, terrain, @@ -55,57 +53,48 @@ impl<'a> System<'a> for Sys { return; } - for entity in (&entities, &clients, &presences, !&chunk_send_queues) - .join() - .map(|(e, _, _, _)| e) - .collect::>() - { - let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default()); - } - struct Metadata { recipients: Vec, lossy_compression: bool, params: StreamParams, } + // collect all deduped entities that request a chunk let mut chunks = HashMap::<_, Metadata>::new(); - // Grab all chunk requests for all clients and sort them let mut requests = 0u64; let mut distinct_requests = 0u64; - for (entity, client, presence, chunk_send_queue) in - (&entities, &clients, &presences, &mut chunk_send_queues).join() - { - let mut chunk_send_queue = std::mem::take(chunk_send_queue); - // dedup input - chunk_send_queue.chunks.sort_by(|a, b| { - let zero = a.x.partial_cmp(&b.x).unwrap_or(Ordering::Equal); - let one = a.y.partial_cmp(&b.y).unwrap_or(Ordering::Equal); - if matches!(zero, Ordering::Equal) { - one - } else { - zero - } - }); - chunk_send_queue.chunks.dedup(); - requests += chunk_send_queue.chunks.len() as u64; - for chunk_key in chunk_send_queue.chunks { - let meta = chunks.entry(chunk_key).or_insert_with(|| { - distinct_requests += 1; - Metadata { - recipients: Vec::default(), - lossy_compression: true, - params: client.terrain_params(), + + for queue_entry in chunk_send_queues_bus.recv_all() { + let entry = chunks.entry(queue_entry.chunk_key); + let meta = match entry { + Entry::Vacant(ve) => { + match clients.get(queue_entry.entity).map(|c| c.terrain_params()) { + Some(params) => { + distinct_requests += 1; + ve.insert(Metadata { + recipients: Vec::new(), + lossy_compression: true, + params, + }) + }, + None => continue, } - }); - meta.recipients.push(entity); - // We decide here, to ONLY send lossy compressed data If all clients want those. - // If at least 1 client here does not want lossy we don't compress it twice. - // It would just be too expensive for the server - meta.lossy_compression = - meta.lossy_compression && presence.lossy_terrain_compression; - } + }, + Entry::Occupied(oe) => oe.into_mut(), + }; + + // We decide here, to ONLY send lossy compressed data If all clients want those. + // If at least 1 client here does not want lossy we don't compress it twice. + // It would just be too expensive for the server + meta.lossy_compression = meta.lossy_compression + && presences + .get(queue_entry.entity) + .map(|p| p.lossy_terrain_compression) + .unwrap_or(true); + meta.recipients.push(queue_entry.entity); + requests += 1; } + network_metrics .chunks_serialisation_requests .inc_by(requests); @@ -114,7 +103,7 @@ impl<'a> System<'a> for Sys { .inc_by(distinct_requests); // Trigger serialization in a SlowJob - const CHUNK_SIZE: usize = 25; // trigger one job per 25 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this + const CHUNK_SIZE: usize = 10; // trigger one job per 10 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this let mut chunks_iter = chunks .into_iter() .filter_map(|(chunk_key, meta)| { @@ -129,7 +118,7 @@ impl<'a> System<'a> for Sys { let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect(); let chunk_sender = chunk_sender.clone(); slow_jobs.spawn("CHUNK_SERIALIZER", move || { - for (chunk, chunk_key, meta) in chunks { + for (chunk, chunk_key, mut meta) in chunks { let msg = Client::prepare_terrain( ServerGeneral::TerrainChunkUpdate { key: chunk_key, @@ -140,6 +129,8 @@ impl<'a> System<'a> for Sys { }, &meta.params, ); + meta.recipients.sort(); + meta.recipients.dedup(); if let Err(e) = chunk_sender.send(SerializedChunk { lossy_compression: meta.lossy_compression, msg, diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index 6b492394ab..3fef026079 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -12,7 +12,7 @@ use common::{ use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; use rayon::iter::ParallelIterator; -use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage}; +use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write}; use tracing::{debug, trace}; /// This system will handle new messages from clients @@ -22,7 +22,7 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, - WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, EventBus>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, Lod>, ReadExpect<'a, NetworkRequestMetrics>, @@ -41,7 +41,7 @@ impl<'a> System<'a> for Sys { ( entities, server_event_bus, - mut chunk_send_queues, + chunk_send_bus, terrain, lod, network_metrics, @@ -52,95 +52,96 @@ impl<'a> System<'a> for Sys { ): Self::SystemData, ) { job.cpu_stats.measure(ParMode::Rayon); - let mut new_chunk_requests = ( - &entities, - &clients, - (&presences).maybe(), - &mut chunk_send_queues, - ) + let mut new_chunk_requests = (&entities, &clients, (&presences).maybe()) .par_join() - .map(|(entity, client, maybe_presence, chunk_send_queue)| { - let mut chunk_requests = Vec::new(); - let _ = super::try_recv_all(client, 5, |_, msg| { - // TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056) - let mut server_emitter = server_event_bus.emitter(); - let presence = match maybe_presence { - Some(g) => g, - None => { - debug!(?entity, "client is not in_game, ignoring msg"); - trace!(?msg, "ignored msg content"); - if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { - network_metrics.chunks_request_dropped.inc(); - } - return Ok(()); - }, - }; - match msg { - ClientGeneral::TerrainChunkRequest { key } => { - let in_vd = if let Some(pos) = positions.get(entity) { - pos.0.xy().map(|e| e as f64).distance_squared( - key.map(|e| e as f64 + 0.5) - * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64) - .powi(2) - } else { - true - }; - if in_vd { - if terrain.get_key_arc(key).is_some() { - network_metrics.chunks_served_from_memory.inc(); - chunk_send_queue.chunks.push(key); - } else { - network_metrics.chunks_generation_triggered.inc(); - chunk_requests.push(ChunkRequest { entity, key }); + .map_init( + || (chunk_send_bus.emitter(), server_event_bus.emitter()), + |(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| { + let mut chunk_requests = Vec::new(); + let _ = super::try_recv_all(client, 5, |_, msg| { + let presence = match maybe_presence { + Some(g) => g, + None => { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { + network_metrics.chunks_request_dropped.inc(); } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, - ClientGeneral::LodZoneRequest { key } => { - client.send(ServerGeneral::LodZoneUpdate { - key, - zone: lod.zone(key).clone(), - })?; - }, - _ => { - debug!( - "Kicking possibly misbehaving client due to invalud terrain \ - request" - ); - server_emitter.emit(ServerEvent::ClientDisconnect( - entity, - common::comp::DisconnectReason::NetworkError, - )); - }, - } - Ok(()) - }); + return Ok(()); + }, + }; + match msg { + ClientGeneral::TerrainChunkRequest { key } => { + let in_vd = if let Some(pos) = positions.get(entity) { + pos.0.xy().map(|e| e as f64).distance_squared( + key.map(|e| e as f64 + 0.5) + * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < ((presence.view_distance as f64 - 1.0 + + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64) + .powi(2) + } else { + true + }; + if in_vd { + if terrain.get_key_arc(key).is_some() { + network_metrics.chunks_served_from_memory.inc(); + chunk_send_emitter.emit(ChunkSendQueue { + chunk_key: key, + entity, + }); + } else { + network_metrics.chunks_generation_triggered.inc(); + chunk_requests.push(ChunkRequest { entity, key }); + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + ClientGeneral::LodZoneRequest { key } => { + client.send(ServerGeneral::LodZoneUpdate { + key, + zone: lod.zone(key).clone(), + })?; + }, + _ => { + debug!( + "Kicking possibly misbehaving client due to invalud terrain \ + request" + ); + server_emitter.emit(ServerEvent::ClientDisconnect( + entity, + common::comp::DisconnectReason::NetworkError, + )); + }, + } + Ok(()) + }); - // Load a minimum radius of chunks around each player. - // This is used to prevent view distance reloading exploits and make sure that - // entity simulation occurs within a minimum radius around the - // player. - if let Some(pos) = positions.get(entity) { - let player_chunk = pos - .0 - .xy() - .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); - for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) { - let key = player_chunk + rpos; - if terrain.get_key(key).is_none() { - // TODO: @zesterer do we want to be sending these chunk to the client - // even if they aren't requested? If we don't we could replace the - // entity here with Option and pass in None. - chunk_requests.push(ChunkRequest { entity, key }); + // Load a minimum radius of chunks around each player. + // This is used to prevent view distance reloading exploits and make sure that + // entity simulation occurs within a minimum radius around the + // player. + if let Some(pos) = positions.get(entity) { + let player_chunk = pos + .0 + .xy() + .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); + for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) { + let key = player_chunk + rpos; + if terrain.get_key(key).is_none() { + // TODO: @zesterer do we want to be sending these chunk to the + // client even if they aren't + // requested? If we don't we could replace the + // entity here with Option and pass in None. + chunk_requests.push(ChunkRequest { entity, key }); + } } } - } - chunk_requests - }) + chunk_requests + }, + ) .flatten() .collect::>(); diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 70c453c2ea..c53cc4159c 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -63,6 +63,7 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, SlowJobPool>, ReadExpect<'a, IndexOwned>, ReadExpect<'a, Arc>, + ReadExpect<'a, EventBus>, WriteExpect<'a, ChunkGenerator>, WriteExpect<'a, TerrainGrid>, Write<'a, TerrainChanges>, @@ -73,7 +74,6 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Presence>, ReadStorage<'a, Client>, Entities<'a>, - WriteStorage<'a, ChunkSendQueue>, WriteStorage<'a, RepositionOnChunkLoad>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, Waypoint>, @@ -96,6 +96,7 @@ impl<'a> System<'a> for Sys { slow_jobs, index, world, + chunk_send_bus, mut chunk_generator, mut terrain, mut terrain_changes, @@ -106,7 +107,6 @@ impl<'a> System<'a> for Sys { presences, clients, entities, - mut chunk_send_queues, mut reposition_on_load, mut force_update, mut waypoints, @@ -252,23 +252,30 @@ impl<'a> System<'a> for Sys { } // Send the chunk to all nearby players. - new_chunks.into_iter().for_each(|(key, _chunk)| { - (&presences, &positions, &clients, &mut chunk_send_queues) - .join() - .for_each(|(presence, pos, _client, chunk_send_queue)| { - let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); - // Subtract 2 from the offset before computing squared magnitude - // 1 since chunks need neighbors to be meshed - // 1 to act as a buffer if the player moves in that direction - let adjusted_dist_sqr = (chunk_pos - key) - .map(|e: i32| (e.unsigned_abs()).saturating_sub(2)) - .magnitude_squared(); + use rayon::iter::{IntoParallelIterator, ParallelIterator}; + new_chunks.into_par_iter().for_each_init( + || chunk_send_bus.emitter(), + |chunk_send_emitter, (key, _chunk)| { + (&entities, &presences, &positions, &clients) + .join() + .for_each(|(entity, presence, pos, _client)| { + let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); + // Subtract 2 from the offset before computing squared magnitude + // 1 since chunks need neighbors to be meshed + // 1 to act as a buffer if the player moves in that direction + let adjusted_dist_sqr = (chunk_pos - key) + .map(|e: i32| (e.unsigned_abs()).saturating_sub(2)) + .magnitude_squared(); - if adjusted_dist_sqr <= presence.view_distance.pow(2) { - chunk_send_queue.chunks.push(key); - } - }); - }); + if adjusted_dist_sqr <= presence.view_distance.pow(2) { + chunk_send_emitter.emit(ChunkSendQueue { + entity, + chunk_key: key, + }); + } + }); + }, + ); // Remove chunks that are too far from players. let mut chunks_to_remove = Vec::new(); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index 119cffd048..ad1bd95659 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,9 +1,9 @@ use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence}; -use common::{comp::Pos, terrain::TerrainGrid}; +use common::{comp::Pos, event::EventBus, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{CompressedData, ServerGeneral}; use common_state::TerrainChanges; -use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage}; /// This systems sends new chunks to clients as well as changes to existing /// chunks @@ -11,9 +11,10 @@ use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; pub struct Sys; impl<'a> System<'a> for Sys { type SystemData = ( + Entities<'a>, ReadExpect<'a, TerrainGrid>, Read<'a, TerrainChanges>, - WriteStorage<'a, ChunkSendQueue>, + ReadExpect<'a, EventBus>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, ReadStorage<'a, Client>, @@ -25,16 +26,19 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - (terrain, terrain_changes, mut chunk_send_queues, positions, presences, clients): Self::SystemData, + (entities, terrain, terrain_changes, chunk_send_bus, positions, presences, clients): Self::SystemData, ) { + let mut chunk_send_emitter = chunk_send_bus.emitter(); + // Sync changed chunks for chunk_key in &terrain_changes.modified_chunks { - for (presence, pos, chunk_send_queue) in - (&presences, &positions, &mut chunk_send_queues).join() - { + for (entity, presence, pos) in (&entities, &presences, &positions).join() { if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { - chunk_send_queue.chunks.push(*chunk_key); + chunk_send_emitter.emit(ChunkSendQueue { + entity, + chunk_key: *chunk_key, + }); } } }