Merge branch 'xMAC94x/fearless_concurency_terrainmsg' into 'master'

make msg::terrain parallel via rayon, we parallelize over the number of...

See merge request veloren/veloren!2019
This commit is contained in:
Marcel 2021-03-28 20:05:01 +00:00
commit b2f6e3bdc7

View File

@ -5,9 +5,10 @@ use common::{
terrain::{TerrainChunkSize, TerrainGrid},
vol::RectVolSize,
};
use common_ecs::{Job, Origin, Phase, System};
use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage};
use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage};
use tracing::{debug, trace};
/// This system will handle new messages from clients
@ -30,7 +31,7 @@ impl<'a> System<'a> for Sys {
const PHASE: Phase = Phase::Create;
fn run(
_job: &mut Job<Self>,
job: &mut Job<Self>,
(
entities,
server_event_bus,
@ -43,53 +44,65 @@ impl<'a> System<'a> for Sys {
) {
let mut server_emitter = server_event_bus.emitter();
for (entity, client, maybe_presence) in (&entities, &clients, (&presences).maybe()).join() {
let _ = super::try_recv_all(client, 5, |client, msg| {
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
job.cpu_stats.measure(ParMode::Rayon);
let mut events = (&entities, &clients, (&presences).maybe())
.par_join()
.map(|(entity, client, maybe_presence)| {
let mut events = Vec::new();
let _ = super::try_recv_all(client, 5, |client, msg| {
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
_ => tracing::error!("not a client_terrain msg"),
}
Ok(())
});
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
events.push(ServerEvent::ChunkRequest(entity, key));
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
_ => tracing::error!("not a client_terrain msg"),
}
Ok(())
});
events
})
.flatten()
.collect::<Vec<_>>();
job.cpu_stats.measure(ParMode::Single);
for event in events.drain(..) {
server_emitter.emit(event);
}
}
}