switch to a system where chunk_serialize will start a SlowJob whose output is then consumed by chunk_send

Marcel Märtens 2022-05-10 09:11:11 +02:00
parent 6c756c2440
commit 9b53693783
7 changed files with 132 additions and 95 deletions
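In short, this commit turns chunk delivery into a producer/consumer pipeline: the chunk_serialize system spawns a "CHUNK_SERIALIZER" SlowJob that serializes a terrain chunk off the main thread and pushes the resulting SerializedChunk into a bounded crossbeam channel, and the new chunk_send system drains that channel every tick and forwards the prepared message to each recipient. A minimal, self-contained sketch of that pattern (it assumes the crossbeam-channel crate, uses a plain thread in place of the SlowJobPool, and substitutes placeholder types for PreparedMsg/Entity):

    use crossbeam_channel::{bounded, Receiver, Sender};
    use std::thread;

    // Placeholder stand-ins for the real `PreparedMsg` / `Entity` types.
    struct SerializedChunk {
        lossy_compression: bool,
        msg: Vec<u8>,         // stands in for `PreparedMsg`
        recipients: Vec<u32>, // stands in for `Vec<Entity>`
    }

    fn main() {
        // Mirrors the `crossbeam_channel::bounded::<SerializedChunk>(10_000)` resource in lib.rs.
        let (sender, receiver): (Sender<SerializedChunk>, Receiver<SerializedChunk>) = bounded(10_000);

        // "chunk_serialize": a background job serializes a chunk and pushes the result.
        // In the real code this closure runs on the "CHUNK_SERIALIZER" SlowJob pool.
        let producer = thread::spawn(move || {
            let msg = vec![0u8; 32]; // pretend this is the serialized terrain chunk
            let _ = sender.send(SerializedChunk {
                lossy_compression: true,
                msg,
                recipients: vec![1, 2, 3],
            });
        });
        producer.join().unwrap();

        // "chunk_send": drain whatever has been serialized so far without blocking.
        for sc in receiver.try_iter() {
            for &recipient in &sc.recipients {
                println!(
                    "would send {} bytes (lossy: {}) to entity {}",
                    sc.msg.len(),
                    sc.lossy_compression,
                    recipient
                );
            }
        }
    }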

View File

@ -1,4 +1,5 @@
use specs::Component;
use crate::client::PreparedMsg;
use specs::{Component, Entity};
use specs_idvs::IdvStorage;
use vek::Vec2;
@ -17,3 +18,9 @@ pub struct ChunkSendQueue {
impl Component for ChunkSendQueue {
type Storage = IdvStorage<Self>;
}
pub struct SerializedChunk {
pub(crate) lossy_compression: bool,
pub(crate) msg: PreparedMsg,
pub(crate) recipients: Vec<Entity>,
}

View File

@ -216,6 +216,21 @@ impl Client {
}
}
pub(crate) fn terrain_params(&self) -> StreamParams { self.terrain_stream_params.clone() }
pub(crate) fn prepare_terrain(
terrain_chunk_update: ServerGeneral,
params: &StreamParams,
) -> PreparedMsg {
if !matches!(
terrain_chunk_update,
ServerGeneral::TerrainChunkUpdate { .. }
) {
unreachable!("You must not call this function without a terrain chunk update!")
}
PreparedMsg::new(5, &terrain_chunk_update, params)
}
pub(crate) fn recv<M: DeserializeOwned>(
&self,
stream_id: u8,
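A note on the Client changes above: terrain_params() hands out a clone of the stream parameters on the main thread, and prepare_terrain is an associated function without &self, so serialization only needs the cheap-to-clone StreamParams and can run inside a SlowJob without borrowing the Client across threads. A rough sketch of that pattern with made-up placeholder types (not the real Veloren API):

    use std::thread;

    // Made-up placeholder types; the real `StreamParams` / `PreparedMsg` live in the
    // network / client modules.
    #[derive(Clone)]
    struct StreamParams { promises: u8 }
    struct TerrainChunkUpdate { key: (i32, i32) }
    struct PreparedMsg { bytes: Vec<u8> }

    // Associated-function style: no `&Client` needed, so this can run on any thread.
    fn prepare_terrain(update: TerrainChunkUpdate, params: &StreamParams) -> PreparedMsg {
        // Pretend-serialization, just enough to make the example runnable.
        PreparedMsg { bytes: vec![params.promises, update.key.0 as u8, update.key.1 as u8] }
    }

    fn main() {
        // On the main thread: take a clone of the stream parameters (like `terrain_params()`)...
        let params = StreamParams { promises: 5 };
        // ...and move only that clone into the background job.
        let job = thread::spawn(move || {
            let msg = prepare_terrain(TerrainChunkUpdate { key: (1, 2) }, &params);
            msg.bytes.len()
        });
        println!("prepared {} bytes off the main thread", job.join().unwrap());
    }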

View File

@ -283,13 +283,20 @@ impl Server {
compiled with the feature. Terrain modifications will *not* be persisted."
);
}
state
.ecs_mut()
.write_resource::<SlowJobPool>()
.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4);
{
let pool = state.ecs_mut().write_resource::<SlowJobPool>();
pool.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4);
pool.configure("CHUNK_SERIALIZER", |n| n / 2);
}
state
.ecs_mut()
.insert(ChunkGenerator::new(chunk_gen_metrics));
{
let (sender, receiver) =
crossbeam_channel::bounded::<chunk_serialize::SerializedChunk>(10_000);
state.ecs_mut().insert(sender);
state.ecs_mut().insert(receiver);
}
state.ecs_mut().insert(CharacterUpdater::new(
Arc::<RwLock<DatabaseSettings>>::clone(&database_settings),
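For context on the pool configuration above: the closure passed to configure presumably caps how many jobs of that kind may run concurrently as a function of the pool's thread budget n (this reading of n is an assumption). A tiny worked example of the two limits introduced here:

    fn main() {
        // Assumed: `n` is the number of worker threads available to the SlowJobPool.
        for n in [4usize, 8, 16] {
            let chunk_generator = n / 2 + n / 4; // limit for "CHUNK_GENERATOR"
            let chunk_serializer = n / 2;        // limit for "CHUNK_SERIALIZER"
            println!("n={n}: CHUNK_GENERATOR<={chunk_generator}, CHUNK_SERIALIZER<={chunk_serializer}");
        }
    }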

View File

@ -0,0 +1,37 @@
use crate::{chunk_serialize::SerializedChunk, client::Client, metrics::NetworkRequestMetrics};
use common_ecs::{Job, Origin, Phase, System};
use specs::{ReadExpect, ReadStorage};
/// This system sends serialized terrain chunks to clients by draining the
/// `SerializedChunk`s that the `chunk_serialize` SlowJobs push into the channel.
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
ReadStorage<'a, Client>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, crossbeam_channel::Receiver<SerializedChunk>>,
);
const NAME: &'static str = "chunk_send";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(_job: &mut Job<Self>, (clients, network_metrics, chunk_receiver): Self::SystemData) {
for sc in chunk_receiver.try_iter() {
for recipient in sc.recipients {
if let Some(client) = clients.get(recipient) {
// only count chunks that were actually delivered to the client
if client.send_prepared(&sc.msg).is_ok() {
if sc.lossy_compression {
network_metrics.chunks_served_lossy.inc()
} else {
network_metrics.chunks_served_lossless.inc()
}
}
}
}
}
}
}

View File

@ -1,70 +1,16 @@
use crate::{
chunk_serialize::ChunkSendQueue, client::Client, metrics::NetworkRequestMetrics,
presence::Presence, Tick,
chunk_serialize::{ChunkSendQueue, SerializedChunk},
client::Client,
presence::Presence,
Tick,
};
use common::{slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{SerializedTerrainChunk, ServerGeneral};
use hashbrown::HashMap;
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteStorage};
use std::cmp::Ordering;
pub(crate) struct LazyTerrainMessage {
lazy_msg_lo: Option<crate::client::PreparedMsg>,
lazy_msg_hi: Option<crate::client::PreparedMsg>,
}
pub const SAFE_ZONE_RADIUS: f32 = 200.0;
impl LazyTerrainMessage {
pub(crate) fn new() -> Self {
Self {
lazy_msg_lo: None,
lazy_msg_hi: None,
}
}
pub(crate) fn prepare_and_send<
'a,
A,
F: FnOnce() -> Result<&'a common::terrain::TerrainChunk, A>,
>(
&mut self,
network_metrics: &NetworkRequestMetrics,
client: &Client,
presence: &Presence,
chunk_key: &vek::Vec2<i32>,
generate_chunk: F,
) -> Result<(), A> {
let lazy_msg = if presence.lossy_terrain_compression {
&mut self.lazy_msg_lo
} else {
&mut self.lazy_msg_hi
};
if lazy_msg.is_none() {
*lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(match generate_chunk() {
Ok(chunk) => SerializedTerrainChunk::via_heuristic(
chunk,
presence.lossy_terrain_compression,
),
Err(e) => return Err(e),
}),
}));
}
lazy_msg.as_ref().map(|msg| {
let _ = client.send_prepared(msg);
if presence.lossy_terrain_compression {
network_metrics.chunks_served_lossy.inc();
} else {
network_metrics.chunks_served_lossless.inc();
}
});
Ok(())
}
}
use network::StreamParams;
use specs::{Entities, Entity, Join, Read, ReadExpect, ReadStorage, WriteStorage};
use std::{cmp::Ordering, sync::Arc};
/// This system will handle sending terrain to clients by
/// collecting chunks that need to be sent for a single generation run and then
@ -80,7 +26,7 @@ impl<'a> System<'a> for Sys {
WriteStorage<'a, ChunkSendQueue>,
ReadExpect<'a, SlowJobPool>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, crossbeam_channel::Sender<SerializedChunk>>,
);
const NAME: &'static str = "chunk_serialize";
@ -97,16 +43,15 @@ impl<'a> System<'a> for Sys {
mut chunk_send_queues,
slow_jobs,
terrain,
network_metrics,
chunk_sender,
): Self::SystemData,
) {
// Only operate once per second
if tick.0.rem_euclid(60) != 0 {
//TODO: move this check out of the system so it does not even need to be spawned.
if tick.0.rem_euclid(30) != 0 {
return;
}
let mut chunks = HashMap::<_, Vec<_>>::new();
for entity in (&entities, &clients, &presences, !&chunk_send_queues)
.join()
.map(|(e, _, _, _)| e)
@ -115,9 +60,16 @@ impl<'a> System<'a> for Sys {
let _ = chunk_send_queues.insert(entity, ChunkSendQueue::default());
}
struct Metadata {
recipients: Vec<Entity>,
lossy_compression: bool,
params: StreamParams,
}
let mut chunks = HashMap::<_, Metadata>::new();
// Grab all chunk requests for all clients and sort them
for (entity, _client, chunk_send_queue) in
(&entities, &clients, &mut chunk_send_queues).join()
for (entity, client, presence, chunk_send_queue) in
(&entities, &clients, &presences, &mut chunk_send_queues).join()
{
let mut chunk_send_queue = std::mem::take(chunk_send_queue);
// dedup input
@ -132,29 +84,44 @@ impl<'a> System<'a> for Sys {
});
chunk_send_queue.chunks.dedup();
for chunk_key in chunk_send_queue.chunks {
let recipients = chunks.entry(chunk_key).or_default();
recipients.push(entity);
let meta = chunks.entry(chunk_key).or_insert_with(|| Metadata {
recipients: Vec::default(),
lossy_compression: true,
params: client.terrain_params(),
});
meta.recipients.push(entity);
// We decide here to ONLY send lossy-compressed data if ALL recipients want it.
// If at least one client does not want lossy compression, we fall back to
// lossless for everyone, as compressing the chunk twice would be too expensive
// for the server.
meta.lossy_compression =
meta.lossy_compression && presence.lossy_terrain_compression;
}
}
if !chunks.is_empty() {
let len = chunks.len();
print!("{}", len);
for (chunk_key, entities) in chunks {
let mut lazy_msg = LazyTerrainMessage::new();
for entity in entities {
let client = clients.get(entity).unwrap();
let presence = presences.get(entity).unwrap();
if let Err(e) = lazy_msg.prepare_and_send(
&network_metrics,
client,
presence,
&chunk_key,
|| terrain.get_key(chunk_key).ok_or(()),
) {
tracing::error!(?e, "error sending chunk");
}
}
// Trigger serialization in a SlowJob
for (chunk_key, meta) in chunks {
if let Some(chunk) = terrain.get_key_arc(chunk_key) {
let chunk = Arc::clone(chunk);
let chunk_sender = chunk_sender.clone();
slow_jobs.spawn("CHUNK_SERIALIZER", move || {
let msg = Client::prepare_terrain(
ServerGeneral::TerrainChunkUpdate {
key: chunk_key,
chunk: Ok(SerializedTerrainChunk::via_heuristic(
&chunk,
meta.lossy_compression,
)),
},
&meta.params,
);
if let Err(e) = chunk_sender.send(SerializedChunk {
lossy_compression: meta.lossy_compression,
msg,
recipients: meta.recipients,
}) {
tracing::warn!(?e, "failed to send the serialized chunk over the channel")
};
});
}
}
}
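To make the recipient grouping and the lossy-compression decision in chunk_serialize concrete: requests are bucketed per chunk key, and the lossy flag ends up as the logical AND of every recipient's preference. A self-contained sketch with simplified stand-ins for the ECS types (not the real specs components):

    use std::collections::HashMap;

    // Simplified stand-ins for the real ECS types used in the diff.
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct Entity(u32);
    #[derive(Clone, Copy)]
    struct Presence { lossy_terrain_compression: bool }

    struct Metadata {
        recipients: Vec<Entity>,
        lossy_compression: bool,
    }

    fn main() {
        // (chunk_key, entity, presence) triples as they would come out of the join.
        let requests: [((i32, i32), Entity, Presence); 3] = [
            ((0, 0), Entity(1), Presence { lossy_terrain_compression: true }),
            ((0, 0), Entity(2), Presence { lossy_terrain_compression: false }),
            ((1, 0), Entity(3), Presence { lossy_terrain_compression: true }),
        ];

        let mut chunks: HashMap<(i32, i32), Metadata> = HashMap::new();
        for (chunk_key, entity, presence) in requests {
            let meta = chunks.entry(chunk_key).or_insert_with(|| Metadata {
                recipients: Vec::new(),
                // Start optimistic: lossy stays true only while *every* recipient wants it.
                lossy_compression: true,
            });
            meta.recipients.push(entity);
            meta.lossy_compression &= presence.lossy_terrain_compression;
        }

        for (key, meta) in &chunks {
            println!("chunk {key:?}: recipients={:?} lossy={}", meta.recipients, meta.lossy_compression);
        }
    }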

View File

@ -1,4 +1,5 @@
pub mod agent;
pub mod chunk_send;
pub mod chunk_serialize;
pub mod entity_sync;
pub mod invite_timeout;
@ -34,7 +35,10 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
dispatch::<persistence::Sys>(dispatch_builder, &[]);
dispatch::<object::Sys>(dispatch_builder, &[]);
dispatch::<wiring::Sys>(dispatch_builder, &[]);
// no dependency, as we only do work once per second anyway.
dispatch::<chunk_serialize::Sys>(dispatch_builder, &[]);
// don't depend on chunk_serialize, as we assume everything is done in a SlowJob
dispatch::<chunk_send::Sys>(dispatch_builder, &[]);
}
pub fn run_sync_systems(ecs: &mut specs::World) {

View File

@ -252,10 +252,10 @@ impl<'a> System<'a> for Sys {
}
// Send the chunk to all nearby players.
new_chunks.into_iter().for_each(|(key, chunk)| {
new_chunks.into_iter().for_each(|(key, _chunk)| {
(&presences, &positions, &clients, &mut chunk_send_queues)
.join()
.for_each(|(presence, pos, client, chunk_send_queue)| {
.for_each(|(presence, pos, _client, chunk_send_queue)| {
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
// Subtract 2 from the offset before computing squared magnitude
// 1 since chunks need neighbors to be meshed