switch from a Component to a Ressource with an Eventbus

Set CHUNK_SITE to 10 which results in a mean of 13ms per Slowjob. Its good if it stays under 30ms so it has less influence on ticks.
Some performance values measured with a AMD Ryzen 1700X:
 - voxygen and server and swarm (25 clients, 10 vd) on one machine.
 - total runtime was 240s
 - CHUNK_GENERATOR total time is 486s with a mean of 40ms
 - CHUNK_SERIALIZER total time is 18.19s with a mean of 13ms, so in total its a order of magnitude lower
   Trancy confirms this, the Serialize backlog is usually handled within 1-2 ticks.
 - terrain::sys total time 1.2s, mean 188us
 - msg::terrain::sys total time 812ms, mean 125us
 - terrain::sync total time 12ms, mean 1,85us
 - chunk_serialize::sys total time 69ms, mean 10us
 - chunk_send::sys total time 50ms, mean 7us

so all in all total time for serializsation is 20.33 of which 89% are spend outside of the ECS
This commit is contained in:
Marcel Märtens 2022-05-11 17:52:22 +02:00
parent efe284a673
commit 637e63fbe2
6 changed files with 182 additions and 179 deletions

View File

@ -1,22 +1,22 @@
use crate::client::PreparedMsg; use crate::client::PreparedMsg;
use specs::{Component, Entity}; use specs::Entity;
use specs_idvs::IdvStorage;
use vek::Vec2; use vek::Vec2;
/// Curing the runtime of a tick, multiple systems can request a chunk to be /// Sending a chunk to the user works the following way:
/// synced to a client E.g. msg::terrain will do so, when a client requested a /// A system like `msg::terrain` `terrain` or `terrain_sync` either decide to
/// chunk that already exist terrain will do so when a chunk came back from /// trigger chunk generation, or if the chunk already exists
/// ChunkGeneration. All those sends are deferred by this queue. /// push a `ChunkSendQueue` to the eventbus.
/// The `chunk_serialize` system will coordinate serializing via a SlowJob
/// outside of the tick. On the next tick, the `chunk_send` system will pick up
/// finished chunks.
///
/// Deferring allows us to remove code duplication and maybe serialize ONCE, /// Deferring allows us to remove code duplication and maybe serialize ONCE,
/// send to MULTIPLE clients TODO: store a urgent flag and seperate even more, 5 /// send to MULTIPLE clients
/// ticks vs 5 seconds /// TODO: store a urgent flag and seperate even more, 5 ticks vs 5 seconds
#[derive(Default, Clone, Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct ChunkSendQueue { pub struct ChunkSendQueue {
pub chunks: Vec<Vec2<i32>>, pub(crate) entity: Entity,
} pub(crate) chunk_key: Vec2<i32>,
impl Component for ChunkSendQueue {
type Storage = IdvStorage<Self>;
} }
pub struct SerializedChunk { pub struct SerializedChunk {

View File

@ -247,6 +247,9 @@ impl Server {
}); });
state.ecs_mut().insert(EventBus::<ServerEvent>::default()); state.ecs_mut().insert(EventBus::<ServerEvent>::default());
state.ecs_mut().insert(Vec::<ChunkRequest>::new()); state.ecs_mut().insert(Vec::<ChunkRequest>::new());
state
.ecs_mut()
.insert(EventBus::<chunk_serialize::ChunkSendQueue>::default());
state.ecs_mut().insert(Locations::default()); state.ecs_mut().insert(Locations::default());
state.ecs_mut().insert(LoginProvider::new( state.ecs_mut().insert(LoginProvider::new(
settings.auth_server_address.clone(), settings.auth_server_address.clone(),
@ -329,9 +332,6 @@ impl Server {
state.ecs_mut().register::<comp::Pet>(); state.ecs_mut().register::<comp::Pet>();
state.ecs_mut().register::<login_provider::PendingLogin>(); state.ecs_mut().register::<login_provider::PendingLogin>();
state.ecs_mut().register::<RepositionOnChunkLoad>(); state.ecs_mut().register::<RepositionOnChunkLoad>();
state
.ecs_mut()
.register::<chunk_serialize::ChunkSendQueue>();
//Alias validator //Alias validator
let banned_words_paths = &settings.banned_words_files; let banned_words_paths = &settings.banned_words_files;

View File

@ -5,13 +5,13 @@ use crate::{
presence::Presence, presence::Presence,
Tick, Tick,
}; };
use common::{slowjob::SlowJobPool, terrain::TerrainGrid}; use common::{event::EventBus, slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System}; use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{SerializedTerrainChunk, ServerGeneral}; use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
use hashbrown::HashMap; use hashbrown::{hash_map::Entry, HashMap};
use network::StreamParams; use network::StreamParams;
use specs::{Entities, Entity, Join, Read, ReadExpect, ReadStorage, WriteStorage}; use specs::{Entity, Read, ReadExpect, ReadStorage};
use std::{cmp::Ordering, sync::Arc}; use std::sync::Arc;
/// This system will handle sending terrain to clients by /// This system will handle sending terrain to clients by
/// collecting chunks that need to be send for a single generation run and then /// collecting chunks that need to be send for a single generation run and then
@ -21,10 +21,9 @@ pub struct Sys;
impl<'a> System<'a> for Sys { impl<'a> System<'a> for Sys {
type SystemData = ( type SystemData = (
Read<'a, Tick>, Read<'a, Tick>,
Entities<'a>,
ReadStorage<'a, Client>, ReadStorage<'a, Client>,
ReadStorage<'a, Presence>, ReadStorage<'a, Presence>,
WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, EventBus<ChunkSendQueue>>,
ReadExpect<'a, NetworkRequestMetrics>, ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, SlowJobPool>, ReadExpect<'a, SlowJobPool>,
ReadExpect<'a, TerrainGrid>, ReadExpect<'a, TerrainGrid>,
@ -39,10 +38,9 @@ impl<'a> System<'a> for Sys {
_job: &mut Job<Self>, _job: &mut Job<Self>,
( (
tick, tick,
entities,
clients, clients,
presences, presences,
mut chunk_send_queues, chunk_send_queues_bus,
network_metrics, network_metrics,
slow_jobs, slow_jobs,
terrain, terrain,
@ -55,57 +53,48 @@ impl<'a> System<'a> for Sys {
return; return;
} }
for entity in (&entities, &clients, &presences, !&chunk_send_queues)
.join()
.map(|(e, _, _, _)| e)
.collect::<Vec<_>>()
{
let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default());
}
struct Metadata { struct Metadata {
recipients: Vec<Entity>, recipients: Vec<Entity>,
lossy_compression: bool, lossy_compression: bool,
params: StreamParams, params: StreamParams,
} }
// collect all deduped entities that request a chunk
let mut chunks = HashMap::<_, Metadata>::new(); let mut chunks = HashMap::<_, Metadata>::new();
// Grab all chunk requests for all clients and sort them
let mut requests = 0u64; let mut requests = 0u64;
let mut distinct_requests = 0u64; let mut distinct_requests = 0u64;
for (entity, client, presence, chunk_send_queue) in
(&entities, &clients, &presences, &mut chunk_send_queues).join() for queue_entry in chunk_send_queues_bus.recv_all() {
{ let entry = chunks.entry(queue_entry.chunk_key);
let mut chunk_send_queue = std::mem::take(chunk_send_queue); let meta = match entry {
// dedup input Entry::Vacant(ve) => {
chunk_send_queue.chunks.sort_by(|a, b| { match clients.get(queue_entry.entity).map(|c| c.terrain_params()) {
let zero = a.x.partial_cmp(&b.x).unwrap_or(Ordering::Equal); Some(params) => {
let one = a.y.partial_cmp(&b.y).unwrap_or(Ordering::Equal); distinct_requests += 1;
if matches!(zero, Ordering::Equal) { ve.insert(Metadata {
one recipients: Vec::new(),
} else { lossy_compression: true,
zero params,
} })
}); },
chunk_send_queue.chunks.dedup(); None => continue,
requests += chunk_send_queue.chunks.len() as u64;
for chunk_key in chunk_send_queue.chunks {
let meta = chunks.entry(chunk_key).or_insert_with(|| {
distinct_requests += 1;
Metadata {
recipients: Vec::default(),
lossy_compression: true,
params: client.terrain_params(),
} }
}); },
meta.recipients.push(entity); Entry::Occupied(oe) => oe.into_mut(),
// We decide here, to ONLY send lossy compressed data If all clients want those. };
// If at least 1 client here does not want lossy we don't compress it twice.
// It would just be too expensive for the server // We decide here, to ONLY send lossy compressed data If all clients want those.
meta.lossy_compression = // If at least 1 client here does not want lossy we don't compress it twice.
meta.lossy_compression && presence.lossy_terrain_compression; // It would just be too expensive for the server
} meta.lossy_compression = meta.lossy_compression
&& presences
.get(queue_entry.entity)
.map(|p| p.lossy_terrain_compression)
.unwrap_or(true);
meta.recipients.push(queue_entry.entity);
requests += 1;
} }
network_metrics network_metrics
.chunks_serialisation_requests .chunks_serialisation_requests
.inc_by(requests); .inc_by(requests);
@ -114,7 +103,7 @@ impl<'a> System<'a> for Sys {
.inc_by(distinct_requests); .inc_by(distinct_requests);
// Trigger serialization in a SlowJob // Trigger serialization in a SlowJob
const CHUNK_SIZE: usize = 25; // trigger one job per 25 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this const CHUNK_SIZE: usize = 10; // trigger one job per 10 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this
let mut chunks_iter = chunks let mut chunks_iter = chunks
.into_iter() .into_iter()
.filter_map(|(chunk_key, meta)| { .filter_map(|(chunk_key, meta)| {
@ -129,7 +118,7 @@ impl<'a> System<'a> for Sys {
let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect(); let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect();
let chunk_sender = chunk_sender.clone(); let chunk_sender = chunk_sender.clone();
slow_jobs.spawn("CHUNK_SERIALIZER", move || { slow_jobs.spawn("CHUNK_SERIALIZER", move || {
for (chunk, chunk_key, meta) in chunks { for (chunk, chunk_key, mut meta) in chunks {
let msg = Client::prepare_terrain( let msg = Client::prepare_terrain(
ServerGeneral::TerrainChunkUpdate { ServerGeneral::TerrainChunkUpdate {
key: chunk_key, key: chunk_key,
@ -140,6 +129,8 @@ impl<'a> System<'a> for Sys {
}, },
&meta.params, &meta.params,
); );
meta.recipients.sort();
meta.recipients.dedup();
if let Err(e) = chunk_sender.send(SerializedChunk { if let Err(e) = chunk_sender.send(SerializedChunk {
lossy_compression: meta.lossy_compression, lossy_compression: meta.lossy_compression,
msg, msg,

View File

@ -12,7 +12,7 @@ use common::{
use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral}; use common_net::msg::{ClientGeneral, ServerGeneral};
use rayon::iter::ParallelIterator; use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage}; use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write};
use tracing::{debug, trace}; use tracing::{debug, trace};
/// This system will handle new messages from clients /// This system will handle new messages from clients
@ -22,7 +22,7 @@ impl<'a> System<'a> for Sys {
type SystemData = ( type SystemData = (
Entities<'a>, Entities<'a>,
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, EventBus<ChunkSendQueue>>,
ReadExpect<'a, TerrainGrid>, ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, Lod>, ReadExpect<'a, Lod>,
ReadExpect<'a, NetworkRequestMetrics>, ReadExpect<'a, NetworkRequestMetrics>,
@ -41,7 +41,7 @@ impl<'a> System<'a> for Sys {
( (
entities, entities,
server_event_bus, server_event_bus,
mut chunk_send_queues, chunk_send_bus,
terrain, terrain,
lod, lod,
network_metrics, network_metrics,
@ -52,95 +52,96 @@ impl<'a> System<'a> for Sys {
): Self::SystemData, ): Self::SystemData,
) { ) {
job.cpu_stats.measure(ParMode::Rayon); job.cpu_stats.measure(ParMode::Rayon);
let mut new_chunk_requests = ( let mut new_chunk_requests = (&entities, &clients, (&presences).maybe())
&entities,
&clients,
(&presences).maybe(),
&mut chunk_send_queues,
)
.par_join() .par_join()
.map(|(entity, client, maybe_presence, chunk_send_queue)| { .map_init(
let mut chunk_requests = Vec::new(); || (chunk_send_bus.emitter(), server_event_bus.emitter()),
let _ = super::try_recv_all(client, 5, |_, msg| { |(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| {
// TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056) let mut chunk_requests = Vec::new();
let mut server_emitter = server_event_bus.emitter(); let _ = super::try_recv_all(client, 5, |_, msg| {
let presence = match maybe_presence { let presence = match maybe_presence {
Some(g) => g, Some(g) => g,
None => { None => {
debug!(?entity, "client is not in_game, ignoring msg"); debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content"); trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc(); network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
if terrain.get_key_arc(key).is_some() {
network_metrics.chunks_served_from_memory.inc();
chunk_send_queue.chunks.push(key);
} else {
network_metrics.chunks_generation_triggered.inc();
chunk_requests.push(ChunkRequest { entity, key });
} }
} else { return Ok(());
network_metrics.chunks_request_dropped.inc(); },
} };
}, match msg {
ClientGeneral::LodZoneRequest { key } => { ClientGeneral::TerrainChunkRequest { key } => {
client.send(ServerGeneral::LodZoneUpdate { let in_vd = if let Some(pos) = positions.get(entity) {
key, pos.0.xy().map(|e| e as f64).distance_squared(
zone: lod.zone(key).clone(), key.map(|e| e as f64 + 0.5)
})?; * TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
}, ) < ((presence.view_distance as f64 - 1.0
_ => { + 2.5 * 2.0_f64.sqrt())
debug!( * TerrainChunkSize::RECT_SIZE.x as f64)
"Kicking possibly misbehaving client due to invalud terrain \ .powi(2)
request" } else {
); true
server_emitter.emit(ServerEvent::ClientDisconnect( };
entity, if in_vd {
common::comp::DisconnectReason::NetworkError, if terrain.get_key_arc(key).is_some() {
)); network_metrics.chunks_served_from_memory.inc();
}, chunk_send_emitter.emit(ChunkSendQueue {
} chunk_key: key,
Ok(()) entity,
}); });
} else {
network_metrics.chunks_generation_triggered.inc();
chunk_requests.push(ChunkRequest { entity, key });
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
ClientGeneral::LodZoneRequest { key } => {
client.send(ServerGeneral::LodZoneUpdate {
key,
zone: lod.zone(key).clone(),
})?;
},
_ => {
debug!(
"Kicking possibly misbehaving client due to invalud terrain \
request"
);
server_emitter.emit(ServerEvent::ClientDisconnect(
entity,
common::comp::DisconnectReason::NetworkError,
));
},
}
Ok(())
});
// Load a minimum radius of chunks around each player. // Load a minimum radius of chunks around each player.
// This is used to prevent view distance reloading exploits and make sure that // This is used to prevent view distance reloading exploits and make sure that
// entity simulation occurs within a minimum radius around the // entity simulation occurs within a minimum radius around the
// player. // player.
if let Some(pos) = positions.get(entity) { if let Some(pos) = positions.get(entity) {
let player_chunk = pos let player_chunk = pos
.0 .0
.xy() .xy()
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) { for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) {
let key = player_chunk + rpos; let key = player_chunk + rpos;
if terrain.get_key(key).is_none() { if terrain.get_key(key).is_none() {
// TODO: @zesterer do we want to be sending these chunk to the client // TODO: @zesterer do we want to be sending these chunk to the
// even if they aren't requested? If we don't we could replace the // client even if they aren't
// entity here with Option<Entity> and pass in None. // requested? If we don't we could replace the
chunk_requests.push(ChunkRequest { entity, key }); // entity here with Option<Entity> and pass in None.
chunk_requests.push(ChunkRequest { entity, key });
}
} }
} }
}
chunk_requests chunk_requests
}) },
)
.flatten() .flatten()
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View File

@ -63,6 +63,7 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, SlowJobPool>, ReadExpect<'a, SlowJobPool>,
ReadExpect<'a, IndexOwned>, ReadExpect<'a, IndexOwned>,
ReadExpect<'a, Arc<World>>, ReadExpect<'a, Arc<World>>,
ReadExpect<'a, EventBus<ChunkSendQueue>>,
WriteExpect<'a, ChunkGenerator>, WriteExpect<'a, ChunkGenerator>,
WriteExpect<'a, TerrainGrid>, WriteExpect<'a, TerrainGrid>,
Write<'a, TerrainChanges>, Write<'a, TerrainChanges>,
@ -73,7 +74,6 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Presence>, ReadStorage<'a, Presence>,
ReadStorage<'a, Client>, ReadStorage<'a, Client>,
Entities<'a>, Entities<'a>,
WriteStorage<'a, ChunkSendQueue>,
WriteStorage<'a, RepositionOnChunkLoad>, WriteStorage<'a, RepositionOnChunkLoad>,
WriteStorage<'a, ForceUpdate>, WriteStorage<'a, ForceUpdate>,
WriteStorage<'a, Waypoint>, WriteStorage<'a, Waypoint>,
@ -96,6 +96,7 @@ impl<'a> System<'a> for Sys {
slow_jobs, slow_jobs,
index, index,
world, world,
chunk_send_bus,
mut chunk_generator, mut chunk_generator,
mut terrain, mut terrain,
mut terrain_changes, mut terrain_changes,
@ -106,7 +107,6 @@ impl<'a> System<'a> for Sys {
presences, presences,
clients, clients,
entities, entities,
mut chunk_send_queues,
mut reposition_on_load, mut reposition_on_load,
mut force_update, mut force_update,
mut waypoints, mut waypoints,
@ -252,23 +252,30 @@ impl<'a> System<'a> for Sys {
} }
// Send the chunk to all nearby players. // Send the chunk to all nearby players.
new_chunks.into_iter().for_each(|(key, _chunk)| { use rayon::iter::{IntoParallelIterator, ParallelIterator};
(&presences, &positions, &clients, &mut chunk_send_queues) new_chunks.into_par_iter().for_each_init(
.join() || chunk_send_bus.emitter(),
.for_each(|(presence, pos, _client, chunk_send_queue)| { |chunk_send_emitter, (key, _chunk)| {
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); (&entities, &presences, &positions, &clients)
// Subtract 2 from the offset before computing squared magnitude .join()
// 1 since chunks need neighbors to be meshed .for_each(|(entity, presence, pos, _client)| {
// 1 to act as a buffer if the player moves in that direction let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
let adjusted_dist_sqr = (chunk_pos - key) // Subtract 2 from the offset before computing squared magnitude
.map(|e: i32| (e.unsigned_abs()).saturating_sub(2)) // 1 since chunks need neighbors to be meshed
.magnitude_squared(); // 1 to act as a buffer if the player moves in that direction
let adjusted_dist_sqr = (chunk_pos - key)
.map(|e: i32| (e.unsigned_abs()).saturating_sub(2))
.magnitude_squared();
if adjusted_dist_sqr <= presence.view_distance.pow(2) { if adjusted_dist_sqr <= presence.view_distance.pow(2) {
chunk_send_queue.chunks.push(key); chunk_send_emitter.emit(ChunkSendQueue {
} entity,
}); chunk_key: key,
}); });
}
});
},
);
// Remove chunks that are too far from players. // Remove chunks that are too far from players.
let mut chunks_to_remove = Vec::new(); let mut chunks_to_remove = Vec::new();

View File

@ -1,9 +1,9 @@
use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence}; use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence};
use common::{comp::Pos, terrain::TerrainGrid}; use common::{comp::Pos, event::EventBus, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System}; use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{CompressedData, ServerGeneral}; use common_net::msg::{CompressedData, ServerGeneral};
use common_state::TerrainChanges; use common_state::TerrainChanges;
use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage}; use specs::{Entities, Join, Read, ReadExpect, ReadStorage};
/// This systems sends new chunks to clients as well as changes to existing /// This systems sends new chunks to clients as well as changes to existing
/// chunks /// chunks
@ -11,9 +11,10 @@ use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage};
pub struct Sys; pub struct Sys;
impl<'a> System<'a> for Sys { impl<'a> System<'a> for Sys {
type SystemData = ( type SystemData = (
Entities<'a>,
ReadExpect<'a, TerrainGrid>, ReadExpect<'a, TerrainGrid>,
Read<'a, TerrainChanges>, Read<'a, TerrainChanges>,
WriteStorage<'a, ChunkSendQueue>, ReadExpect<'a, EventBus<ChunkSendQueue>>,
ReadStorage<'a, Pos>, ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>, ReadStorage<'a, Presence>,
ReadStorage<'a, Client>, ReadStorage<'a, Client>,
@ -25,16 +26,19 @@ impl<'a> System<'a> for Sys {
fn run( fn run(
_job: &mut Job<Self>, _job: &mut Job<Self>,
(terrain, terrain_changes, mut chunk_send_queues, positions, presences, clients): Self::SystemData, (entities, terrain, terrain_changes, chunk_send_bus, positions, presences, clients): Self::SystemData,
) { ) {
let mut chunk_send_emitter = chunk_send_bus.emitter();
// Sync changed chunks // Sync changed chunks
for chunk_key in &terrain_changes.modified_chunks { for chunk_key in &terrain_changes.modified_chunks {
for (presence, pos, chunk_send_queue) in for (entity, presence, pos) in (&entities, &presences, &positions).join() {
(&presences, &positions, &mut chunk_send_queues).join()
{
if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance)
{ {
chunk_send_queue.chunks.push(*chunk_key); chunk_send_emitter.emit(ChunkSendQueue {
entity,
chunk_key: *chunk_key,
});
} }
} }
} }