mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
first implementation of defering serialisation
This commit is contained in:
parent
07239c3056
commit
6c756c2440
19
server/src/chunk_serialize.rs
Normal file
19
server/src/chunk_serialize.rs
Normal file
@ -0,0 +1,19 @@
|
||||
use specs::Component;
|
||||
use specs_idvs::IdvStorage;
|
||||
use vek::Vec2;
|
||||
|
||||
/// Curing the runtime of a tick, multiple systems can request a chunk to be
|
||||
/// synced to a client E.g. msg::terrain will do so, when a client requested a
|
||||
/// chunk that already exist terrain will do so when a chunk came back from
|
||||
/// ChunkGeneration. All those sends are deferred by this queue.
|
||||
/// Deferring allows us to remove code duplication and maybe serialize ONCE,
|
||||
/// send to MULTIPLE clients TODO: store a urgent flag and seperate even more, 5
|
||||
/// ticks vs 5 seconds
|
||||
#[derive(Default, Clone, Debug, PartialEq)]
|
||||
pub struct ChunkSendQueue {
|
||||
pub chunks: Vec<Vec2<i32>>,
|
||||
}
|
||||
|
||||
impl Component for ChunkSendQueue {
|
||||
type Storage = IdvStorage<Self>;
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
pub mod alias_validator;
|
||||
mod character_creator;
|
||||
pub mod chunk_generator;
|
||||
mod chunk_serialize;
|
||||
pub mod client;
|
||||
pub mod cmd;
|
||||
pub mod connection_handler;
|
||||
@ -321,6 +322,9 @@ impl Server {
|
||||
state.ecs_mut().register::<comp::Pet>();
|
||||
state.ecs_mut().register::<login_provider::PendingLogin>();
|
||||
state.ecs_mut().register::<RepositionOnChunkLoad>();
|
||||
state
|
||||
.ecs_mut()
|
||||
.register::<chunk_serialize::ChunkSendQueue>();
|
||||
|
||||
//Alias validator
|
||||
let banned_words_paths = &settings.banned_words_files;
|
||||
|
161
server/src/sys/chunk_serialize.rs
Normal file
161
server/src/sys/chunk_serialize.rs
Normal file
@ -0,0 +1,161 @@
|
||||
use crate::{
|
||||
chunk_serialize::ChunkSendQueue, client::Client, metrics::NetworkRequestMetrics,
|
||||
presence::Presence, Tick,
|
||||
};
|
||||
use common::{slowjob::SlowJobPool, terrain::TerrainGrid};
|
||||
|
||||
use common_ecs::{Job, Origin, Phase, System};
|
||||
use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
|
||||
use hashbrown::HashMap;
|
||||
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteStorage};
|
||||
use std::cmp::Ordering;
|
||||
|
||||
pub(crate) struct LazyTerrainMessage {
|
||||
lazy_msg_lo: Option<crate::client::PreparedMsg>,
|
||||
lazy_msg_hi: Option<crate::client::PreparedMsg>,
|
||||
}
|
||||
|
||||
pub const SAFE_ZONE_RADIUS: f32 = 200.0;
|
||||
|
||||
impl LazyTerrainMessage {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
lazy_msg_lo: None,
|
||||
lazy_msg_hi: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_and_send<
|
||||
'a,
|
||||
A,
|
||||
F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>,
|
||||
>(
|
||||
&mut self,
|
||||
network_metrics: &NetworkRequestMetrics,
|
||||
client: &Client,
|
||||
presence: &Presence,
|
||||
chunk_key: &vek::Vec2<i32>,
|
||||
generate_chunk: F,
|
||||
) -> Result<(), A> {
|
||||
let lazy_msg = if presence.lossy_terrain_compression {
|
||||
&mut self.lazy_msg_lo
|
||||
} else {
|
||||
&mut self.lazy_msg_hi
|
||||
};
|
||||
if lazy_msg.is_none() {
|
||||
*lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
|
||||
key: *chunk_key,
|
||||
chunk: Ok(match generate_chunk() {
|
||||
Ok(chunk) => SerializedTerrainChunk::via_heuristic(
|
||||
chunk,
|
||||
presence.lossy_terrain_compression,
|
||||
),
|
||||
Err(e) => return Err(e),
|
||||
}),
|
||||
}));
|
||||
}
|
||||
lazy_msg.as_ref().map(|msg| {
|
||||
let _ = client.send_prepared(msg);
|
||||
if presence.lossy_terrain_compression {
|
||||
network_metrics.chunks_served_lossy.inc();
|
||||
} else {
|
||||
network_metrics.chunks_served_lossless.inc();
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle sending terrain to clients by
|
||||
/// collecting chunks that need to be send for a single generation run and then
|
||||
/// trigger a SlowJob for serialisation.
|
||||
#[derive(Default)]
|
||||
pub struct Sys;
|
||||
impl<'a> System<'a> for Sys {
|
||||
type SystemData = (
|
||||
Read<'a, Tick>,
|
||||
Entities<'a>,
|
||||
ReadStorage<'a, Client>,
|
||||
ReadStorage<'a, Presence>,
|
||||
WriteStorage<'a, ChunkSendQueue>,
|
||||
ReadExpect<'a, SlowJobPool>,
|
||||
ReadExpect<'a, TerrainGrid>,
|
||||
ReadExpect<'a, NetworkRequestMetrics>,
|
||||
);
|
||||
|
||||
const NAME: &'static str = "chunk_serialize";
|
||||
const ORIGIN: Origin = Origin::Server;
|
||||
const PHASE: Phase = Phase::Create;
|
||||
|
||||
fn run(
|
||||
_job: &mut Job<Self>,
|
||||
(
|
||||
tick,
|
||||
entities,
|
||||
clients,
|
||||
presences,
|
||||
mut chunk_send_queues,
|
||||
slow_jobs,
|
||||
terrain,
|
||||
network_metrics,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
// Only operate once per second
|
||||
if tick.0.rem_euclid(60) != 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut chunks = HashMap::<_, Vec<_>>::new();
|
||||
|
||||
for entity in (&entities, &clients, &presences, !&chunk_send_queues)
|
||||
.join()
|
||||
.map(|(e, _, _, _)| e)
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default());
|
||||
}
|
||||
|
||||
// Grab all chunk requests for all clients and sort them
|
||||
for (entity, _client, chunk_send_queue) in
|
||||
(&entities, &clients, &mut chunk_send_queues).join()
|
||||
{
|
||||
let mut chunk_send_queue = std::mem::take(chunk_send_queue);
|
||||
// dedup input
|
||||
chunk_send_queue.chunks.sort_by(|a, b| {
|
||||
let zero = a.x.partial_cmp(&b.x).unwrap_or(Ordering::Equal);
|
||||
let one = a.y.partial_cmp(&b.y).unwrap_or(Ordering::Equal);
|
||||
if matches!(zero, Ordering::Equal) {
|
||||
one
|
||||
} else {
|
||||
zero
|
||||
}
|
||||
});
|
||||
chunk_send_queue.chunks.dedup();
|
||||
for chunk_key in chunk_send_queue.chunks {
|
||||
let recipients = chunks.entry(chunk_key).or_default();
|
||||
recipients.push(entity);
|
||||
}
|
||||
}
|
||||
|
||||
if !chunks.is_empty() {
|
||||
let len = chunks.len();
|
||||
print!("{}", len);
|
||||
for (chunk_key, entities) in chunks {
|
||||
let mut lazy_msg = LazyTerrainMessage::new();
|
||||
for entity in entities {
|
||||
let client = clients.get(entity).unwrap();
|
||||
let presence = presences.get(entity).unwrap();
|
||||
if let Err(e) = lazy_msg.prepare_and_send(
|
||||
&network_metrics,
|
||||
client,
|
||||
presence,
|
||||
&chunk_key,
|
||||
|| terrain.get_key(chunk_key).ok_or(()),
|
||||
) {
|
||||
tracing::error!(?e, "error sending chunk");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
pub mod agent;
|
||||
pub mod chunk_serialize;
|
||||
pub mod entity_sync;
|
||||
pub mod invite_timeout;
|
||||
pub mod metrics;
|
||||
@ -33,6 +34,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
|
||||
dispatch::<persistence::Sys>(dispatch_builder, &[]);
|
||||
dispatch::<object::Sys>(dispatch_builder, &[]);
|
||||
dispatch::<wiring::Sys>(dispatch_builder, &[]);
|
||||
dispatch::<chunk_serialize::Sys>(dispatch_builder, &[]);
|
||||
}
|
||||
|
||||
pub fn run_sync_systems(ecs: &mut specs::World) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::{
|
||||
client::Client, lod::Lod, metrics::NetworkRequestMetrics, presence::Presence, ChunkRequest,
|
||||
chunk_serialize::ChunkSendQueue, client::Client, lod::Lod, metrics::NetworkRequestMetrics,
|
||||
presence::Presence, ChunkRequest,
|
||||
};
|
||||
use common::{
|
||||
comp::Pos,
|
||||
@ -9,9 +10,9 @@ use common::{
|
||||
vol::RectVolSize,
|
||||
};
|
||||
use common_ecs::{Job, Origin, ParMode, Phase, System};
|
||||
use common_net::msg::{ClientGeneral, SerializedTerrainChunk, ServerGeneral};
|
||||
use common_net::msg::{ClientGeneral, ServerGeneral};
|
||||
use rayon::iter::ParallelIterator;
|
||||
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write};
|
||||
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
/// This system will handle new messages from clients
|
||||
@ -21,6 +22,7 @@ impl<'a> System<'a> for Sys {
|
||||
type SystemData = (
|
||||
Entities<'a>,
|
||||
Read<'a, EventBus<ServerEvent>>,
|
||||
WriteStorage<'a, ChunkSendQueue>,
|
||||
ReadExpect<'a, TerrainGrid>,
|
||||
ReadExpect<'a, Lod>,
|
||||
ReadExpect<'a, NetworkRequestMetrics>,
|
||||
@ -39,6 +41,7 @@ impl<'a> System<'a> for Sys {
|
||||
(
|
||||
entities,
|
||||
server_event_bus,
|
||||
mut chunk_send_queues,
|
||||
terrain,
|
||||
lod,
|
||||
network_metrics,
|
||||
@ -49,11 +52,16 @@ impl<'a> System<'a> for Sys {
|
||||
): Self::SystemData,
|
||||
) {
|
||||
job.cpu_stats.measure(ParMode::Rayon);
|
||||
let mut new_chunk_requests = (&entities, &clients, (&presences).maybe())
|
||||
let mut new_chunk_requests = (
|
||||
&entities,
|
||||
&clients,
|
||||
(&presences).maybe(),
|
||||
&mut chunk_send_queues,
|
||||
)
|
||||
.par_join()
|
||||
.map(|(entity, client, maybe_presence)| {
|
||||
.map(|(entity, client, maybe_presence, chunk_send_queue)| {
|
||||
let mut chunk_requests = Vec::new();
|
||||
let _ = super::try_recv_all(client, 5, |client, msg| {
|
||||
let _ = super::try_recv_all(client, 5, |_, msg| {
|
||||
// TODO: Refactor things (https://gitlab.com/veloren/veloren/-/merge_requests/3245#note_856538056)
|
||||
let mut server_emitter = server_event_bus.emitter();
|
||||
let presence = match maybe_presence {
|
||||
@ -80,26 +88,12 @@ impl<'a> System<'a> for Sys {
|
||||
true
|
||||
};
|
||||
if in_vd {
|
||||
match terrain.get_key_arc(key) {
|
||||
Some(chunk) => {
|
||||
network_metrics.chunks_served_from_memory.inc();
|
||||
client.send(ServerGeneral::TerrainChunkUpdate {
|
||||
key,
|
||||
chunk: Ok(SerializedTerrainChunk::via_heuristic(
|
||||
chunk,
|
||||
presence.lossy_terrain_compression,
|
||||
)),
|
||||
})?;
|
||||
if presence.lossy_terrain_compression {
|
||||
network_metrics.chunks_served_lossy.inc();
|
||||
} else {
|
||||
network_metrics.chunks_served_lossless.inc();
|
||||
}
|
||||
},
|
||||
None => {
|
||||
network_metrics.chunks_generation_triggered.inc();
|
||||
chunk_requests.push(ChunkRequest { entity, key });
|
||||
},
|
||||
if terrain.get_key_arc(key).is_some() {
|
||||
network_metrics.chunks_served_from_memory.inc();
|
||||
chunk_send_queue.chunks.push(key);
|
||||
} else {
|
||||
network_metrics.chunks_generation_triggered.inc();
|
||||
chunk_requests.push(ChunkRequest { entity, key });
|
||||
}
|
||||
} else {
|
||||
network_metrics.chunks_request_dropped.inc();
|
||||
|
@ -7,8 +7,8 @@ use world::{IndexOwned, World};
|
||||
|
||||
use crate::{
|
||||
chunk_generator::ChunkGenerator,
|
||||
chunk_serialize::ChunkSendQueue,
|
||||
client::Client,
|
||||
metrics::NetworkRequestMetrics,
|
||||
presence::{Presence, RepositionOnChunkLoad},
|
||||
rtsim::RtSim,
|
||||
settings::Settings,
|
||||
@ -29,7 +29,7 @@ use common::{
|
||||
};
|
||||
|
||||
use common_ecs::{Job, Origin, Phase, System};
|
||||
use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
|
||||
use common_net::msg::ServerGeneral;
|
||||
use common_state::TerrainChanges;
|
||||
use comp::Behavior;
|
||||
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteExpect, WriteStorage};
|
||||
@ -41,62 +41,8 @@ pub type TerrainPersistenceData<'a> = Option<Write<'a, TerrainPersistence>>;
|
||||
#[cfg(not(feature = "persistent_world"))]
|
||||
pub type TerrainPersistenceData<'a> = ();
|
||||
|
||||
pub(crate) struct LazyTerrainMessage {
|
||||
lazy_msg_lo: Option<crate::client::PreparedMsg>,
|
||||
lazy_msg_hi: Option<crate::client::PreparedMsg>,
|
||||
}
|
||||
|
||||
pub const SAFE_ZONE_RADIUS: f32 = 200.0;
|
||||
|
||||
impl LazyTerrainMessage {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
lazy_msg_lo: None,
|
||||
lazy_msg_hi: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_and_send<
|
||||
'a,
|
||||
A,
|
||||
F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>,
|
||||
>(
|
||||
&mut self,
|
||||
network_metrics: &NetworkRequestMetrics,
|
||||
client: &Client,
|
||||
presence: &Presence,
|
||||
chunk_key: &vek::Vec2<i32>,
|
||||
generate_chunk: F,
|
||||
) -> Result<(), A> {
|
||||
let lazy_msg = if presence.lossy_terrain_compression {
|
||||
&mut self.lazy_msg_lo
|
||||
} else {
|
||||
&mut self.lazy_msg_hi
|
||||
};
|
||||
if lazy_msg.is_none() {
|
||||
*lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
|
||||
key: *chunk_key,
|
||||
chunk: Ok(match generate_chunk() {
|
||||
Ok(chunk) => SerializedTerrainChunk::via_heuristic(
|
||||
chunk,
|
||||
presence.lossy_terrain_compression,
|
||||
),
|
||||
Err(e) => return Err(e),
|
||||
}),
|
||||
}));
|
||||
}
|
||||
lazy_msg.as_ref().map(|msg| {
|
||||
let _ = client.send_prepared(msg);
|
||||
if presence.lossy_terrain_compression {
|
||||
network_metrics.chunks_served_lossy.inc();
|
||||
} else {
|
||||
network_metrics.chunks_served_lossless.inc();
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle loading generated chunks and unloading
|
||||
/// unneeded chunks.
|
||||
/// 1. Inserts newly generated chunks into the TerrainGrid
|
||||
@ -117,7 +63,6 @@ impl<'a> System<'a> for Sys {
|
||||
ReadExpect<'a, SlowJobPool>,
|
||||
ReadExpect<'a, IndexOwned>,
|
||||
ReadExpect<'a, Arc<World>>,
|
||||
ReadExpect<'a, NetworkRequestMetrics>,
|
||||
WriteExpect<'a, ChunkGenerator>,
|
||||
WriteExpect<'a, TerrainGrid>,
|
||||
Write<'a, TerrainChanges>,
|
||||
@ -128,6 +73,7 @@ impl<'a> System<'a> for Sys {
|
||||
ReadStorage<'a, Presence>,
|
||||
ReadStorage<'a, Client>,
|
||||
Entities<'a>,
|
||||
WriteStorage<'a, ChunkSendQueue>,
|
||||
WriteStorage<'a, RepositionOnChunkLoad>,
|
||||
WriteStorage<'a, ForceUpdate>,
|
||||
WriteStorage<'a, Waypoint>,
|
||||
@ -150,7 +96,6 @@ impl<'a> System<'a> for Sys {
|
||||
slow_jobs,
|
||||
index,
|
||||
world,
|
||||
network_metrics,
|
||||
mut chunk_generator,
|
||||
mut terrain,
|
||||
mut terrain_changes,
|
||||
@ -161,6 +106,7 @@ impl<'a> System<'a> for Sys {
|
||||
presences,
|
||||
clients,
|
||||
entities,
|
||||
mut chunk_send_queues,
|
||||
mut reposition_on_load,
|
||||
mut force_update,
|
||||
mut waypoints,
|
||||
@ -306,13 +252,10 @@ impl<'a> System<'a> for Sys {
|
||||
}
|
||||
|
||||
// Send the chunk to all nearby players.
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
new_chunks.into_par_iter().for_each(|(key, chunk)| {
|
||||
let mut lazy_msg = LazyTerrainMessage::new();
|
||||
|
||||
(&presences, &positions, &clients)
|
||||
new_chunks.into_iter().for_each(|(key, chunk)| {
|
||||
(&presences, &positions, &clients, &mut chunk_send_queues)
|
||||
.join()
|
||||
.for_each(|(presence, pos, client)| {
|
||||
.for_each(|(presence, pos, client, chunk_send_queue)| {
|
||||
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
|
||||
// Subtract 2 from the offset before computing squared magnitude
|
||||
// 1 since chunks need neighbors to be meshed
|
||||
@ -322,15 +265,7 @@ impl<'a> System<'a> for Sys {
|
||||
.magnitude_squared();
|
||||
|
||||
if adjusted_dist_sqr <= presence.view_distance.pow(2) {
|
||||
lazy_msg
|
||||
.prepare_and_send::<!, _>(
|
||||
&network_metrics,
|
||||
client,
|
||||
presence,
|
||||
&key,
|
||||
|| Ok(&*chunk),
|
||||
)
|
||||
.into_ok();
|
||||
chunk_send_queue.chunks.push(key);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
@ -1,10 +1,9 @@
|
||||
use super::terrain::LazyTerrainMessage;
|
||||
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence};
|
||||
use crate::{chunk_serialize::ChunkSendQueue, client::Client, presence::Presence};
|
||||
use common::{comp::Pos, terrain::TerrainGrid};
|
||||
use common_ecs::{Job, Origin, Phase, System};
|
||||
use common_net::msg::{CompressedData, ServerGeneral};
|
||||
use common_state::TerrainChanges;
|
||||
use specs::{Join, Read, ReadExpect, ReadStorage};
|
||||
use specs::{Join, Read, ReadExpect, ReadStorage, WriteStorage};
|
||||
|
||||
/// This systems sends new chunks to clients as well as changes to existing
|
||||
/// chunks
|
||||
@ -14,10 +13,10 @@ impl<'a> System<'a> for Sys {
|
||||
type SystemData = (
|
||||
ReadExpect<'a, TerrainGrid>,
|
||||
Read<'a, TerrainChanges>,
|
||||
WriteStorage<'a, ChunkSendQueue>,
|
||||
ReadStorage<'a, Pos>,
|
||||
ReadStorage<'a, Presence>,
|
||||
ReadStorage<'a, Client>,
|
||||
ReadExpect<'a, NetworkRequestMetrics>,
|
||||
);
|
||||
|
||||
const NAME: &'static str = "terrain_sync";
|
||||
@ -26,23 +25,16 @@ impl<'a> System<'a> for Sys {
|
||||
|
||||
fn run(
|
||||
_job: &mut Job<Self>,
|
||||
(terrain, terrain_changes, positions, presences, clients, network_metrics): Self::SystemData,
|
||||
(terrain, terrain_changes, mut chunk_send_queues, positions, presences, clients): Self::SystemData,
|
||||
) {
|
||||
// Sync changed chunks
|
||||
'chunk: for chunk_key in &terrain_changes.modified_chunks {
|
||||
let mut lazy_msg = LazyTerrainMessage::new();
|
||||
for (presence, pos, client) in (&presences, &positions, &clients).join() {
|
||||
for chunk_key in &terrain_changes.modified_chunks {
|
||||
for (presence, pos, chunk_send_queue) in
|
||||
(&presences, &positions, &mut chunk_send_queues).join()
|
||||
{
|
||||
if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance)
|
||||
{
|
||||
if let Err(()) = lazy_msg.prepare_and_send(
|
||||
&network_metrics,
|
||||
client,
|
||||
presence,
|
||||
chunk_key,
|
||||
|| terrain.get_key(*chunk_key).ok_or(()),
|
||||
) {
|
||||
break 'chunk;
|
||||
}
|
||||
chunk_send_queue.chunks.push(*chunk_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user