Merge branch 'xMAC94x/terrain_sending' into 'master'

xmac94x/terrain sending

See merge request veloren/veloren!3368
This commit is contained in:
Marcel 2022-05-11 20:16:07 +00:00
commit 8c72ed9012
11 changed files with 398 additions and 197 deletions

View File

@ -1662,6 +1662,7 @@ impl Client {
self.state.remove_chunk(key);
}
let mut current_tick_send_chunk_requests = 0;
// Request chunks from the server.
self.loaded_distance = ((view_distance * TerrainChunkSize::RECT_SIZE.x) as f32).powi(2);
// +1 so we can find a chunk that's outside the vd for better fog
@ -1695,10 +1696,16 @@ impl Client {
for key in keys.iter() {
if self.state.terrain().get_key(*key).is_none() {
if !skip_mode && !self.pending_chunks.contains_key(key) {
if self.pending_chunks.len() < 4 {
const TOTAL_PENDING_CHUNKS_LIMIT: usize = 12;
const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = 2;
if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT
&& current_tick_send_chunk_requests
< CURRENT_TICK_PENDING_CHUNKS_LIMIT
{
self.send_msg_err(ClientGeneral::TerrainChunkRequest {
key: *key,
})?;
current_tick_send_chunk_requests += 1;
self.pending_chunks.insert(*key, Instant::now());
} else {
skip_mode = true;

View File

@ -0,0 +1,26 @@
use crate::client::PreparedMsg;
use specs::Entity;
use vek::Vec2;
/// Sending a chunk to the user works the following way:
/// A system like `msg::terrain` `terrain` or `terrain_sync` either decide to
/// trigger chunk generation, or if the chunk already exists
/// push a `ChunkSendQueue` to the eventbus.
/// The `chunk_serialize` system will coordinate serializing via a SlowJob
/// outside of the tick. On the next tick, the `chunk_send` system will pick up
/// finished chunks.
///
/// Deferring allows us to remove code duplication and maybe serialize ONCE,
/// send to MULTIPLE clients
/// TODO: store a urgent flag and seperate even more, 5 ticks vs 5 seconds
#[derive(Debug, PartialEq)]
pub struct ChunkSendEntry {
pub(crate) entity: Entity,
pub(crate) chunk_key: Vec2<i32>,
}
pub struct SerializedChunk {
pub(crate) lossy_compression: bool,
pub(crate) msg: PreparedMsg,
pub(crate) recipients: Vec<Entity>,
}

View File

@ -216,6 +216,23 @@ impl Client {
}
}
pub(crate) fn terrain_params(&self) -> StreamParams { self.terrain_stream_params.clone() }
/// Only used for Serialize Chunks in a SlowJob.
/// TODO: find a more elegant version for this invariant
pub(crate) fn prepare_chunk_update_msg(
terrain_chunk_update: ServerGeneral,
params: &StreamParams,
) -> PreparedMsg {
if !matches!(
terrain_chunk_update,
ServerGeneral::TerrainChunkUpdate { .. }
) {
unreachable!("You must not call this function without a terrain chunk update!")
}
PreparedMsg::new(5, &terrain_chunk_update, params)
}
pub(crate) fn recv<M: DeserializeOwned>(
&self,
stream_id: u8,

View File

@ -16,6 +16,7 @@
pub mod alias_validator;
mod character_creator;
pub mod chunk_generator;
mod chunk_serialize;
pub mod client;
pub mod cmd;
pub mod connection_handler;
@ -246,6 +247,9 @@ impl Server {
});
state.ecs_mut().insert(EventBus::<ServerEvent>::default());
state.ecs_mut().insert(Vec::<ChunkRequest>::new());
state
.ecs_mut()
.insert(EventBus::<chunk_serialize::ChunkSendEntry>::default());
state.ecs_mut().insert(Locations::default());
state.ecs_mut().insert(LoginProvider::new(
settings.auth_server_address.clone(),
@ -282,13 +286,20 @@ impl Server {
compiled with the feature. Terrain modifications will *not* be persisted."
);
}
state
.ecs_mut()
.write_resource::<SlowJobPool>()
.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4);
{
let pool = state.ecs_mut().write_resource::<SlowJobPool>();
pool.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4);
pool.configure("CHUNK_SERIALIZER", |n| n / 2);
}
state
.ecs_mut()
.insert(ChunkGenerator::new(chunk_gen_metrics));
{
let (sender, receiver) =
crossbeam_channel::bounded::<chunk_serialize::SerializedChunk>(10_000);
state.ecs_mut().insert(sender);
state.ecs_mut().insert(receiver);
}
state.ecs_mut().insert(CharacterUpdater::new(
Arc::<RwLock<DatabaseSettings>>::clone(&database_settings),

View File

@ -36,6 +36,8 @@ pub struct NetworkRequestMetrics {
pub chunks_generation_triggered: IntCounter,
pub chunks_served_lossy: IntCounter,
pub chunks_served_lossless: IntCounter,
pub chunks_serialisation_requests: IntCounter,
pub chunks_distinct_serialisation_requests: IntCounter,
}
pub struct ChunkGenMetrics {
@ -202,12 +204,22 @@ impl NetworkRequestMetrics {
"chunks_served_lossless",
"number of chunks that were sent with lossless compression requested",
))?;
let chunks_serialisation_requests = IntCounter::with_opts(Opts::new(
"chunks_serialisation_requests",
"number of requests for the sys chunk_serialisation",
))?;
let chunks_distinct_serialisation_requests = IntCounter::with_opts(Opts::new(
"chunks_distinct_serialisation_requests",
"number of distinct chunks in requests for the sys chunk_serialisation",
))?;
registry.register(Box::new(chunks_request_dropped.clone()))?;
registry.register(Box::new(chunks_served_from_memory.clone()))?;
registry.register(Box::new(chunks_generation_triggered.clone()))?;
registry.register(Box::new(chunks_served_lossy.clone()))?;
registry.register(Box::new(chunks_served_lossless.clone()))?;
registry.register(Box::new(chunks_serialisation_requests.clone()))?;
registry.register(Box::new(chunks_distinct_serialisation_requests.clone()))?;
Ok(Self {
chunks_request_dropped,
@ -215,6 +227,8 @@ impl NetworkRequestMetrics {
chunks_generation_triggered,
chunks_served_lossy,
chunks_served_lossless,
chunks_serialisation_requests,
chunks_distinct_serialisation_requests,
})
}
}

View File

@ -0,0 +1,41 @@
use crate::{chunk_serialize::SerializedChunk, client::Client, metrics::NetworkRequestMetrics};
use common_ecs::{Job, Origin, Phase, System};
use specs::{ReadExpect, ReadStorage};
/// This system will handle sending terrain to clients by
/// collecting chunks that need to be send for a single generation run and then
/// trigger a SlowJob for serialisation.
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
ReadStorage<'a, Client>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, crossbeam_channel::Receiver<SerializedChunk>>,
);
const NAME: &'static str = "chunk_send";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(_job: &mut Job<Self>, (clients, network_metrics, chunk_receiver): Self::SystemData) {
let mut lossy = 0u64;
let mut lossless = 0u64;
for sc in chunk_receiver.try_iter() {
for recipient in sc.recipients {
if let Some(client) = clients.get(recipient) {
if client.send_prepared(&sc.msg).is_err() {
if sc.lossy_compression {
lossy += 1;
} else {
lossless += 1;
}
}
}
}
}
network_metrics.chunks_served_lossy.inc_by(lossy);
network_metrics.chunks_served_lossless.inc_by(lossless);
}
}

View File

@ -0,0 +1,146 @@
use crate::{
chunk_serialize::{ChunkSendEntry, SerializedChunk},
client::Client,
metrics::NetworkRequestMetrics,
presence::Presence,
Tick,
};
use common::{event::EventBus, slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
use hashbrown::{hash_map::Entry, HashMap};
use network::StreamParams;
use specs::{Entity, Read, ReadExpect, ReadStorage};
use std::sync::Arc;
/// This system will handle sending terrain to clients by
/// collecting chunks that need to be send for a single generation run and then
/// trigger a SlowJob for serialisation.
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
Read<'a, Tick>,
ReadStorage<'a, Client>,
ReadStorage<'a, Presence>,
ReadExpect<'a, EventBus<ChunkSendEntry>>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, SlowJobPool>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, crossbeam_channel::Sender<SerializedChunk>>,
);
const NAME: &'static str = "chunk_serialize";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(
_job: &mut Job<Self>,
(
tick,
clients,
presences,
chunk_send_queues_bus,
network_metrics,
slow_jobs,
terrain,
chunk_sender,
): Self::SystemData,
) {
// Only operate twice per second
//TODO: move out of this system and now even spawn this.
if tick.0.rem_euclid(15) != 0 {
return;
}
struct Metadata {
recipients: Vec<Entity>,
lossy_compression: bool,
params: StreamParams,
}
// collect all deduped entities that request a chunk
let mut chunks = HashMap::<_, Metadata>::new();
let mut requests = 0u64;
let mut distinct_requests = 0u64;
for queue_entry in chunk_send_queues_bus.recv_all() {
let entry = chunks.entry(queue_entry.chunk_key);
let meta = match entry {
Entry::Vacant(ve) => {
match clients.get(queue_entry.entity).map(|c| c.terrain_params()) {
Some(params) => {
distinct_requests += 1;
ve.insert(Metadata {
recipients: Vec::new(),
lossy_compression: true,
params,
})
},
None => continue,
}
},
Entry::Occupied(oe) => oe.into_mut(),
};
// We decide here, to ONLY send lossy compressed data If all clients want those.
// If at least 1 client here does not want lossy we don't compress it twice.
// It would just be too expensive for the server
meta.lossy_compression = meta.lossy_compression
&& presences
.get(queue_entry.entity)
.map(|p| p.lossy_terrain_compression)
.unwrap_or(true);
meta.recipients.push(queue_entry.entity);
requests += 1;
}
network_metrics
.chunks_serialisation_requests
.inc_by(requests);
network_metrics
.chunks_distinct_serialisation_requests
.inc_by(distinct_requests);
// Trigger serialization in a SlowJob
const CHUNK_SIZE: usize = 10; // trigger one job per 10 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this
let mut chunks_iter = chunks
.into_iter()
.filter_map(|(chunk_key, meta)| {
terrain
.get_key_arc(chunk_key)
.map(|chunk| (Arc::clone(chunk), chunk_key, meta))
})
.into_iter()
.peekable();
while chunks_iter.peek().is_some() {
let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect();
let chunk_sender = chunk_sender.clone();
slow_jobs.spawn("CHUNK_SERIALIZER", move || {
for (chunk, chunk_key, mut meta) in chunks {
let msg = Client::prepare_chunk_update_msg(
ServerGeneral::TerrainChunkUpdate {
key: chunk_key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(
&chunk,
meta.lossy_compression,
)),
},
&meta.params,
);
meta.recipients.sort_unstable();
meta.recipients.dedup();
if let Err(e) = chunk_sender.send(SerializedChunk {
lossy_compression: meta.lossy_compression,
msg,
recipients: meta.recipients,
}) {
tracing::warn!(?e, "cannot send serialized chunk to sender");
break;
};
}
});
}
}
}

View File

@ -1,4 +1,6 @@
pub mod agent;
pub mod chunk_send;
pub mod chunk_serialize;
pub mod entity_sync;
pub mod invite_timeout;
pub mod metrics;
@ -33,6 +35,10 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
dispatch::<persistence::Sys>(dispatch_builder, &[]);
dispatch::<object::Sys>(dispatch_builder, &[]);
dispatch::<wiring::Sys>(dispatch_builder, &[]);
// no dependency, as we only work once per sec anyway.
dispatch::<chunk_serialize::Sys>(dispatch_builder, &[]);
// don't depend on chunk_serialize, as we assume everything is done in a SlowJow
dispatch::<chunk_send::Sys>(dispatch_builder, &[]);
}
pub fn run_sync_systems(ecs: &mut specs::World) {

View File

@ -1,5 +1,6 @@
use crate::{
client::Client, lod::Lod, metrics::NetworkRequestMetrics, presence::Presence, ChunkRequest,
chunk_serialize::ChunkSendEntry, client::Client, lod::Lod, metrics::NetworkRequestMetrics,
presence::Presence, ChunkRequest,
};
use common::{
comp::Pos,
@ -9,7 +10,7 @@ use common::{
vol::RectVolSize,
};
use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, SerializedTerrainChunk, ServerGeneral};
use common_net::msg::{ClientGeneral, ServerGeneral};
use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write};
use tracing::{debug, trace};
@ -21,6 +22,7 @@ impl<'a> System<'a> for Sys {
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, EventBus<ChunkSendEntry>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, Lod>,
ReadExpect<'a, NetworkRequestMetrics>,
@ -39,6 +41,7 @@ impl<'a> System<'a> for Sys {
(
entities,
server_event_bus,
chunk_send_bus,
terrain,
lod,
network_metrics,
@ -51,102 +54,94 @@ impl<'a> System<'a> for Sys {
job.cpu_stats.measure(ParMode::Rayon);
let mut new_chunk_requests = (&entities, &clients, (&presences).maybe())
.par_join()
.map(|(entity, client, maybe_presence)| {
let mut chunk_requests = Vec::new();
let _ = super::try_recv_all(client, 5, |client, msg| {
// TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056)
let mut server_emitter = server_event_bus.emitter();
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
match terrain.get_key_arc(key) {
Some(chunk) => {
.map_init(
|| (chunk_send_bus.emitter(), server_event_bus.emitter()),
|(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| {
let mut chunk_requests = Vec::new();
let _ = super::try_recv_all(client, 5, |_, msg| {
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0
+ 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
if terrain.get_key_arc(key).is_some() {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(
chunk,
presence.lossy_terrain_compression,
)),
})?;
if presence.lossy_terrain_compression {
network_metrics.chunks_served_lossy.inc();
} else {
network_metrics.chunks_served_lossless.inc();
}
},
None => {
chunk_send_emitter.emit(ChunkSendEntry {
chunk_key: key,
entity,
});
} else {
network_metrics.chunks_generation_triggered.inc();
chunk_requests.push(ChunkRequest { entity, key });
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
ClientGeneral::LodZoneRequest { key } => {
client.send(ServerGeneral::LodZoneUpdate {
key,
zone: lod.zone(key).clone(),
})?;
},
_ => {
debug!(
"Kicking possibly misbehaving client due to invalud terrain \
request"
);
server_emitter.emit(ServerEvent::ClientDisconnect(
entity,
common::comp::DisconnectReason::NetworkError,
));
},
}
Ok(())
});
},
ClientGeneral::LodZoneRequest { key } => {
client.send(ServerGeneral::LodZoneUpdate {
key,
zone: lod.zone(key).clone(),
})?;
},
_ => {
debug!(
"Kicking possibly misbehaving client due to invalud terrain \
request"
);
server_emitter.emit(ServerEvent::ClientDisconnect(
entity,
common::comp::DisconnectReason::NetworkError,
));
},
}
Ok(())
});
// Load a minimum radius of chunks around each player.
// This is used to prevent view distance reloading exploits and make sure that
// entity simulation occurs within a minimum radius around the
// player.
if let Some(pos) = positions.get(entity) {
let player_chunk = pos
.0
.xy()
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) {
let key = player_chunk + rpos;
if terrain.get_key(key).is_none() {
// TODO: @zesterer do we want to be sending these chunk to the client
// even if they aren't requested? If we don't we could replace the
// entity here with Option<Entity> and pass in None.
chunk_requests.push(ChunkRequest { entity, key });
// Load a minimum radius of chunks around each player.
// This is used to prevent view distance reloading exploits and make sure that
// entity simulation occurs within a minimum radius around the
// player.
if let Some(pos) = positions.get(entity) {
let player_chunk = pos
.0
.xy()
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
for rpos in Spiral2d::new().take((crate::MIN_VD as usize + 1).pow(2)) {
let key = player_chunk + rpos;
if terrain.get_key(key).is_none() {
// TODO: @zesterer do we want to be sending these chunk to the
// client even if they aren't
// requested? If we don't we could replace the
// entity here with Option<Entity> and pass in None.
chunk_requests.push(ChunkRequest { entity, key });
}
}
}
}
chunk_requests
})
chunk_requests
},
)
.flatten()
.collect::<Vec<_>>();

View File

@ -7,8 +7,8 @@ use world::{IndexOwned, World};
use crate::{
chunk_generator::ChunkGenerator,
chunk_serialize::ChunkSendEntry,
client::Client,
metrics::NetworkRequestMetrics,
presence::{Presence, RepositionOnChunkLoad},
rtsim::RtSim,
settings::Settings,
@ -29,7 +29,7 @@ use common::{
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
use common_net::msg::ServerGeneral;
use common_state::TerrainChanges;
use comp::Behavior;
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteExpect, WriteStorage};
@ -41,62 +41,8 @@ pub type TerrainPersistenceData<'a> = Option<Write<'a, TerrainPersistence>>;
#[cfg(not(feature = "persistent_world"))]
pub type TerrainPersistenceData<'a> = ();
pub(crate) struct LazyTerrainMessage {
lazy_msg_lo: Option<crate::client::PreparedMsg>,
lazy_msg_hi: Option<crate::client::PreparedMsg>,
}
pub const SAFE_ZONE_RADIUS: f32 = 200.0;
impl LazyTerrainMessage {
pub(crate) fn new() -> Self {
Self {
lazy_msg_lo: None,
lazy_msg_hi: None,
}
}
pub(crate) fn prepare_and_send<
'a,
A,
F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>,
>(
&mut self,
network_metrics: &NetworkRequestMetrics,
client: &Client,
presence: &Presence,
chunk_key: &vek::Vec2<i32>,
generate_chunk: F,
) -> Result<(), A> {
let lazy_msg = if presence.lossy_terrain_compression {
&mut self.lazy_msg_lo
} else {
&mut self.lazy_msg_hi
};
if lazy_msg.is_none() {
*lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(match generate_chunk() {
Ok(chunk) => SerializedTerrainChunk::via_heuristic(
chunk,
presence.lossy_terrain_compression,
),
Err(e) => return Err(e),
}),
}));
}
lazy_msg.as_ref().map(|msg| {
let _ = client.send_prepared(msg);
if presence.lossy_terrain_compression {
network_metrics.chunks_served_lossy.inc();
} else {
network_metrics.chunks_served_lossless.inc();
}
});
Ok(())
}
}
/// This system will handle loading generated chunks and unloading
/// unneeded chunks.
/// 1. Inserts newly generated chunks into the TerrainGrid
@ -117,7 +63,7 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, SlowJobPool>,
ReadExpect<'a, IndexOwned>,
ReadExpect<'a, Arc<World>>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, EventBus<ChunkSendEntry>>,
WriteExpect<'a, ChunkGenerator>,
WriteExpect<'a, TerrainGrid>,
Write<'a, TerrainChanges>,
@ -150,7 +96,7 @@ impl<'a> System<'a> for Sys {
slow_jobs,
index,
world,
network_metrics,
chunk_send_bus,
mut chunk_generator,
mut terrain,
mut terrain_changes,
@ -307,33 +253,29 @@ impl<'a> System<'a> for Sys {
// Send the chunk to all nearby players.
use rayon::iter::{IntoParallelIterator, ParallelIterator};
new_chunks.into_par_iter().for_each(|(key, chunk)| {
let mut lazy_msg = LazyTerrainMessage::new();
new_chunks.into_par_iter().for_each_init(
|| chunk_send_bus.emitter(),
|chunk_send_emitter, (key, _chunk)| {
(&entities, &presences, &positions, &clients)
.join()
.for_each(|(entity, presence, pos, _client)| {
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
// Subtract 2 from the offset before computing squared magnitude
// 1 since chunks need neighbors to be meshed
// 1 to act as a buffer if the player moves in that direction
let adjusted_dist_sqr = (chunk_pos - key)
.map(|e: i32| (e.unsigned_abs()).saturating_sub(2))
.magnitude_squared();
(&presences, &positions, &clients)
.join()
.for_each(|(presence, pos, client)| {
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
// Subtract 2 from the offset before computing squared magnitude
// 1 since chunks need neighbors to be meshed
// 1 to act as a buffer if the player moves in that direction
let adjusted_dist_sqr = (chunk_pos - key)
.map(|e: i32| (e.unsigned_abs()).saturating_sub(2))
.magnitude_squared();
if adjusted_dist_sqr <= presence.view_distance.pow(2) {
lazy_msg
.prepare_and_send::<!, _>(
&network_metrics,
client,
presence,
&key,
|| Ok(&*chunk),
)
.into_ok();
}
});
});
if adjusted_dist_sqr <= presence.view_distance.pow(2) {
chunk_send_emitter.emit(ChunkSendEntry {
entity,
chunk_key: key,
});
}
});
},
);
// Remove chunks that are too far from players.
let mut chunks_to_remove = Vec::new();

View File

@ -1,10 +1,9 @@
use super::terrain::LazyTerrainMessage;
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence};
use common::{comp::Pos, terrain::TerrainGrid};
use crate::{chunk_serialize::ChunkSendEntry, client::Client, presence::Presence};
use common::{comp::Pos, event::EventBus, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{CompressedData, ServerGeneral};
use common_state::TerrainChanges;
use specs::{Join, Read, ReadExpect, ReadStorage};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage};
/// This systems sends new chunks to clients as well as changes to existing
/// chunks
@ -12,12 +11,13 @@ use specs::{Join, Read, ReadExpect, ReadStorage};
pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
Entities<'a>,
ReadExpect<'a, TerrainGrid>,
Read<'a, TerrainChanges>,
ReadExpect<'a, EventBus<ChunkSendEntry>>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
ReadExpect<'a, NetworkRequestMetrics>,
);
const NAME: &'static str = "terrain_sync";
@ -26,23 +26,19 @@ impl<'a> System<'a> for Sys {
fn run(
_job: &mut Job<Self>,
(terrain, terrain_changes, positions, presences, clients, network_metrics): Self::SystemData,
(entities, terrain, terrain_changes, chunk_send_bus, positions, presences, clients): Self::SystemData,
) {
let mut chunk_send_emitter = chunk_send_bus.emitter();
// Sync changed chunks
'chunk: for chunk_key in &terrain_changes.modified_chunks {
let mut lazy_msg = LazyTerrainMessage::new();
for (presence, pos, client) in (&presences, &positions, &clients).join() {
for chunk_key in &terrain_changes.modified_chunks {
for (entity, presence, pos) in (&entities, &presences, &positions).join() {
if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance)
{
if let Err(()) = lazy_msg.prepare_and_send(
&network_metrics,
client,
presence,
chunk_key,
|| terrain.get_key(*chunk_key).ok_or(()),
) {
break 'chunk;
}
chunk_send_emitter.emit(ChunkSendEntry {
entity,
chunk_key: *chunk_key,
});
}
}
}