diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index abac7af6d0..cf00b83229 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -5,9 +5,10 @@ use common::{ terrain::{TerrainChunkSize, TerrainGrid}, vol::RectVolSize, }; -use common_ecs::{Job, Origin, Phase, System}; +use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; -use specs::{Entities, Join, Read, ReadExpect, ReadStorage}; +use rayon::iter::ParallelIterator; +use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage}; use tracing::{debug, trace}; /// This system will handle new messages from clients @@ -30,7 +31,7 @@ impl<'a> System<'a> for Sys { const PHASE: Phase = Phase::Create; fn run( - _job: &mut Job, + job: &mut Job, ( entities, server_event_bus, @@ -43,53 +44,65 @@ impl<'a> System<'a> for Sys { ) { let mut server_emitter = server_event_bus.emitter(); - for (entity, client, maybe_presence) in (&entities, &clients, (&presences).maybe()).join() { - let _ = super::try_recv_all(client, 5, |client, msg| { - let presence = match maybe_presence { - Some(g) => g, - None => { - debug!(?entity, "client is not in_game, ignoring msg"); - trace!(?msg, "ignored msg content"); - if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { - network_metrics.chunks_request_dropped.inc(); - } - return Ok(()); - }, - }; - match msg { - ClientGeneral::TerrainChunkRequest { key } => { - let in_vd = if let Some(pos) = positions.get(entity) { - pos.0.xy().map(|e| e as f64).distance_squared( - key.map(|e| e as f64 + 0.5) - * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64) - .powi(2) - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - client.send(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - })? - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, + job.cpu_stats.measure(ParMode::Rayon); + let mut events = (&entities, &clients, (&presences).maybe()) + .par_join() + .map(|(entity, client, maybe_presence)| { + let mut events = Vec::new(); + let _ = super::try_recv_all(client, 5, |client, msg| { + let presence = match maybe_presence { + Some(g) => g, + None => { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { + network_metrics.chunks_request_dropped.inc(); } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, - _ => tracing::error!("not a client_terrain msg"), - } - Ok(()) - }); + return Ok(()); + }, + }; + match msg { + ClientGeneral::TerrainChunkRequest { key } => { + let in_vd = if let Some(pos) = positions.get(entity) { + pos.0.xy().map(|e| e as f64).distance_squared( + key.map(|e| e as f64 + 0.5) + * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64) + .powi(2) + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => { + network_metrics.chunks_served_from_memory.inc(); + client.send(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + })? + }, + None => { + network_metrics.chunks_generation_triggered.inc(); + events.push(ServerEvent::ChunkRequest(entity, key)); + }, + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + _ => tracing::error!("not a client_terrain msg"), + } + Ok(()) + }); + events + }) + .flatten() + .collect::>(); + + job.cpu_stats.measure(ParMode::Single); + for event in events.drain(..) { + server_emitter.emit(event); } } }