add metrics and trigger slowjobs in chunks of 25

Marcel Märtens 2022-05-10 14:38:43 +02:00
parent 9b53693783
commit 8e47d02f8d
3 changed files with 59 additions and 14 deletions


@@ -36,6 +36,8 @@ pub struct NetworkRequestMetrics {
     pub chunks_generation_triggered: IntCounter,
     pub chunks_served_lossy: IntCounter,
     pub chunks_served_lossless: IntCounter,
+    pub chunks_serialisation_requests: IntCounter,
+    pub chunks_distinct_serialisation_requests: IntCounter,
 }
 
 pub struct ChunkGenMetrics {
@@ -202,12 +204,22 @@ impl NetworkRequestMetrics {
             "chunks_served_lossless",
             "number of chunks that were sent with lossless compression requested",
         ))?;
+        let chunks_serialisation_requests = IntCounter::with_opts(Opts::new(
+            "chunks_serialisation_requests",
+            "number of requests for the sys chunk_serialisation",
+        ))?;
+        let chunks_distinct_serialisation_requests = IntCounter::with_opts(Opts::new(
+            "chunks_distinct_serialisation_requests",
+            "number of distinct chunks in requests for the sys chunk_serialisation",
+        ))?;
 
         registry.register(Box::new(chunks_request_dropped.clone()))?;
         registry.register(Box::new(chunks_served_from_memory.clone()))?;
         registry.register(Box::new(chunks_generation_triggered.clone()))?;
         registry.register(Box::new(chunks_served_lossy.clone()))?;
         registry.register(Box::new(chunks_served_lossless.clone()))?;
+        registry.register(Box::new(chunks_serialisation_requests.clone()))?;
+        registry.register(Box::new(chunks_distinct_serialisation_requests.clone()))?;
 
         Ok(Self {
             chunks_request_dropped,
@@ -215,6 +227,8 @@ impl NetworkRequestMetrics {
             chunks_generation_triggered,
             chunks_served_lossy,
             chunks_served_lossless,
+            chunks_serialisation_requests,
+            chunks_distinct_serialisation_requests,
         })
     }
 }
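
The counters added here follow the file's existing pattern: build an IntCounter from Opts (metric name plus help text), register a clone with the Prometheus registry, and keep the original handle in the metrics struct for updates. A minimal standalone sketch of that pattern, assuming the prometheus crate; the metric name below is illustrative, not from the commit:

    use prometheus::{IntCounter, Opts, Registry};

    fn main() -> Result<(), prometheus::Error> {
        let registry = Registry::new();

        // Build a counter from a name and a help string.
        let requests = IntCounter::with_opts(Opts::new(
            "example_requests",
            "number of example requests",
        ))?;

        // Register a clone; the original handle stays usable for updates,
        // which is why the commit stores the counters in the struct above.
        registry.register(Box::new(requests.clone()))?;

        requests.inc_by(25);
        assert_eq!(requests.get(), 25);
        Ok(())
    }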


@@ -20,18 +20,22 @@ impl<'a> System<'a> for Sys {
     const PHASE: Phase = Phase::Create;
 
     fn run(_job: &mut Job<Self>, (clients, network_metrics, chunk_receiver): Self::SystemData) {
+        let mut lossy = 0u64;
+        let mut lossless = 0u64;
         for sc in chunk_receiver.try_iter() {
             for recipient in sc.recipients {
                 if let Some(client) = clients.get(recipient) {
                     if client.send_prepared(&sc.msg).is_err() {
                         if sc.lossy_compression {
-                            network_metrics.chunks_served_lossy.inc()
+                            lossy += 1;
                         } else {
-                            network_metrics.chunks_served_lossless.inc()
+                            lossless += 1;
                         }
                     }
                 }
             }
         }
+        network_metrics.chunks_served_lossy.inc_by(lossy);
+        network_metrics.chunks_served_lossless.inc_by(lossless);
     }
 }
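
This rewrite replaces per-chunk inc() calls on the shared Prometheus counters with plain local integers that are folded in via one inc_by() per counter per tick, so the hot loop no longer performs an atomic operation for every recipient. A rough sketch of the idea in isolation; the function and its arguments are hypothetical stand-ins for the system's real data:

    use prometheus::IntCounter;

    fn tally(events: &[bool], lossy: &IntCounter, lossless: &IntCounter) {
        // Count in cheap locals inside the hot loop...
        let (mut n_lossy, mut n_lossless) = (0u64, 0u64);
        for &is_lossy in events {
            if is_lossy { n_lossy += 1 } else { n_lossless += 1 }
        }
        // ...then touch each shared atomic counter exactly once.
        lossy.inc_by(n_lossy);
        lossless.inc_by(n_lossless);
    }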


@@ -1,6 +1,7 @@
 use crate::{
     chunk_serialize::{ChunkSendQueue, SerializedChunk},
     client::Client,
+    metrics::NetworkRequestMetrics,
     presence::Presence,
     Tick,
 };
@@ -24,6 +25,7 @@ impl<'a> System<'a> for Sys {
         ReadStorage<'a, Client>,
         ReadStorage<'a, Presence>,
         WriteStorage<'a, ChunkSendQueue>,
+        ReadExpect<'a, NetworkRequestMetrics>,
         ReadExpect<'a, SlowJobPool>,
         ReadExpect<'a, TerrainGrid>,
         ReadExpect<'a, crossbeam_channel::Sender<SerializedChunk>>,
@@ -41,6 +43,7 @@ impl<'a> System<'a> for Sys {
             clients,
             presences,
             mut chunk_send_queues,
+            network_metrics,
             slow_jobs,
             terrain,
             chunk_sender,
@@ -68,6 +71,8 @@ impl<'a> System<'a> for Sys {
         let mut chunks = HashMap::<_, Metadata>::new();
 
         // Grab all chunk requests for all clients and sort them
+        let mut requests = 0u64;
+        let mut distinct_requests = 0u64;
         for (entity, client, presence, chunk_send_queue) in
             (&entities, &clients, &presences, &mut chunk_send_queues).join()
         {
@@ -83,11 +88,15 @@ impl<'a> System<'a> for Sys {
                 }
             });
             chunk_send_queue.chunks.dedup();
+            requests += chunk_send_queue.chunks.len() as u64;
             for chunk_key in chunk_send_queue.chunks {
-                let meta = chunks.entry(chunk_key).or_insert_with(|| Metadata {
-                    recipients: Vec::default(),
-                    lossy_compression: true,
-                    params: client.terrain_params(),
+                let meta = chunks.entry(chunk_key).or_insert_with(|| {
+                    distinct_requests += 1;
+                    Metadata {
+                        recipients: Vec::default(),
+                        lossy_compression: true,
+                        params: client.terrain_params(),
+                    }
                 });
                 meta.recipients.push(entity);
                 // We decide here, to ONLY send lossy compressed data If all clients want those.
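
Placing distinct_requests += 1 inside the or_insert_with closure is what makes the two new metrics differ: the closure only runs when the chunk key is not yet in the map, so distinct_requests counts each chunk once while requests counts every queued entry across all clients. A self-contained sketch of that trick, with toy types standing in for the chunk metadata:

    use std::collections::HashMap;

    /// Returns (total entries, distinct keys).
    fn count(keys: &[u32]) -> (u64, u64) {
        let mut distinct = 0u64;
        let mut seen = HashMap::<u32, Vec<usize>>::new();
        for (i, &key) in keys.iter().enumerate() {
            // The closure runs only on first insertion of a key,
            // so `distinct` is bumped exactly once per key.
            seen.entry(key)
                .or_insert_with(|| {
                    distinct += 1;
                    Vec::new()
                })
                .push(i);
        }
        (keys.len() as u64, distinct)
    }

For example, count(&[7, 7, 9]) returns (3, 2).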
@@ -97,13 +106,30 @@ impl<'a> System<'a> for Sys {
                     meta.lossy_compression && presence.lossy_terrain_compression;
             }
         }
+        network_metrics
+            .chunks_serialisation_requests
+            .inc_by(requests);
+        network_metrics
+            .chunks_distinct_serialisation_requests
+            .inc_by(distinct_requests);
 
         // Trigger serialization in a SlowJob
-        for (chunk_key, meta) in chunks {
-            if let Some(chunk) = terrain.get_key_arc(chunk_key) {
-                let chunk = Arc::clone(chunk);
-                let chunk_sender = chunk_sender.clone();
-                slow_jobs.spawn("CHUNK_SERIALIZER", move || {
+        const CHUNK_SIZE: usize = 25; // trigger one job per 25 chunks to reduce SlowJob overhead. as we use a channel, there is no disadvantage to this
+        let mut chunks_iter = chunks
+            .into_iter()
+            .filter_map(|(chunk_key, meta)| {
+                terrain
+                    .get_key_arc(chunk_key)
+                    .map(|chunk| (Arc::clone(chunk), chunk_key, meta))
+            })
+            .into_iter()
+            .peekable();
+        while chunks_iter.peek().is_some() {
+            let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect();
+            let chunk_sender = chunk_sender.clone();
+            slow_jobs.spawn("CHUNK_SERIALIZER", move || {
+                for (chunk, chunk_key, meta) in chunks {
                     let msg = Client::prepare_terrain(
                         ServerGeneral::TerrainChunkUpdate {
                             key: chunk_key,
@@ -119,10 +145,11 @@ impl<'a> System<'a> for Sys {
                             msg,
                             recipients: meta.recipients,
                         }) {
-                            tracing::warn!(?e, "cannot send serialized chunk to sender")
+                            tracing::warn!(?e, "cannot send serialized chunk to sender");
+                            break;
                         };
-                });
-            }
+                    }
+                });
         }
     }
 }
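
Previously each chunk got its own SlowJob; the new loop drains a peekable iterator in batches of CHUNK_SIZE = 25 and spawns one job per batch, cutting job-pool overhead up to 25-fold. Because results go through a channel and are consumed by the chunk-send system regardless of order, batching costs nothing in correctness, as the inline comment notes; the added break also stops serializing the rest of a batch once the channel is gone. A standalone sketch of the batching loop, with std::thread::spawn standing in for slow_jobs.spawn:

    const CHUNK_SIZE: usize = 25;

    fn spawn_batched(work: Vec<u32>) {
        let mut iter = work.into_iter().peekable();
        // peek() avoids spawning a job for an empty trailing batch.
        while iter.peek().is_some() {
            // by_ref() lets take() pull up to CHUNK_SIZE items without
            // consuming the iterator; the next pass picks up the rest.
            let batch: Vec<_> = iter.by_ref().take(CHUNK_SIZE).collect();
            std::thread::spawn(move || {
                for item in batch {
                    // serialize and send `item` here
                    let _ = item;
                }
            });
        }
    }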