Run terrain chunk removal far from players in parallel.

This commit is contained in:
Joshua Yanovski 2022-09-10 13:23:40 -07:00
parent 7fe721247c
commit 59d8266f2a
4 changed files with 41 additions and 30 deletions

View File

@ -29,6 +29,13 @@ impl<V: RectRasterableVol> VolGrid2d<V> {
.map2(V::RECT_SIZE, |e, sz: u32| e.div_euclid(sz as i32))
}
#[inline(always)]
pub fn par_keys(&self) -> hashbrown::hash_map::rayon::ParKeys<Vec2<i32>, Arc<V>>
where V: Send + Sync,
{
self.chunks.par_keys()
}
#[inline(always)]
pub fn chunk_offs(pos: Vec3<i32>) -> Vec3<i32> {
let offs = Vec2::<i32>::from(pos).map2(V::RECT_SIZE, |e, sz| e & (sz - 1) as i32);

View File

@ -6,6 +6,7 @@ use common::{
terrain::TerrainChunk,
};
use hashbrown::{hash_map::Entry, HashMap};
use rayon::iter::ParallelIterator;
use specs::Entity as EcsEntity;
use std::sync::{
atomic::{AtomicBool, Ordering},
@ -82,6 +83,10 @@ impl ChunkGenerator {
self.pending_chunks.keys().copied()
}
pub fn par_pending_chunks(&self) -> impl rayon::iter::ParallelIterator<Item = Vec2<i32>> + '_ {
self.pending_chunks.par_keys().copied()
}
pub fn cancel_if_pending(&mut self, key: Vec2<i32>) {
if let Some(cancel) = self.pending_chunks.remove(&key) {
cancel.store(true, Ordering::Relaxed);

View File

@ -12,6 +12,7 @@ use common::{
resources::TimeOfDay,
uid::{Uid, UidAllocator},
};
use common_base::prof_span;
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{
CharacterInfo, ClientRegister, DisconnectReason, PlayerInfo, PlayerListUpdate, RegisterError,
@ -109,14 +110,16 @@ impl<'a> System<'a> for Sys {
//
// Big enough that we hopefully won't have to reallocate.
//
// Also includes a list of logins to retry, since we happen to update those
// around the same time that we update the new players list.
// Also includes a list of logins to retry and finished_pending, since we
// happen to update those around the same time that we update the new
// players list.
//
// NOTE: stdlib mutex is more than good enough on Linux and (probably) Windows,
// but not Mac.
let new_players = parking_lot::Mutex::new((
HashMap::<_, (_, _, _, _)>::with_capacity(capacity),
Vec::with_capacity(capacity),
Vec::with_capacity(capacity),
));
// defer auth lockup
@ -153,21 +156,20 @@ impl<'a> System<'a> for Sys {
// It will be overwritten in ServerExt::update_character_data.
let battle_mode = read_data.settings.gameplay.battle_mode.default_mode();
let finished_pending = (
(
&read_data.entities,
&read_data.uids,
&clients,
!players.mask(),
&mut pending_logins,
)
.par_join()
.fold_with(
// (Finished pending entity list, emitter)
(vec![], OptionClone(None)),
|(mut finished_pending, mut server_emitter_), (entity, uid, client, _, pending)| {
let server_emitter = server_emitter_
.0
.get_or_insert_with(|| read_data.server_event_bus.emitter());
.join()
// NOTE: Required because Specs has very poor work splitting for sparse joins.
.par_bridge()
.for_each_init(
|| read_data.server_event_bus.emitter(),
|server_emitter, (entity, uid, client, _, pending)| {
prof_span!("msg::register login");
if let Err(e) = || -> Result<(), crate::error::Error> {
// Destructure new_players_guard last so it gets dropped before the other
// three.
@ -229,9 +231,9 @@ impl<'a> System<'a> for Sys {
) {
None => return Ok(()),
Some(r) => {
finished_pending.push(entity);
match r {
Err(e) => {
new_players.lock().2.push(entity);
// NOTE: Done only on error to avoid doing extra work within
// the lock.
trace!(?e, "pending login returned error");
@ -249,7 +251,8 @@ impl<'a> System<'a> for Sys {
},
};
let (new_players_by_uuid, retries) = &mut *new_players_guard;
let (new_players_by_uuid, retries, finished_pending) = &mut *new_players_guard;
finished_pending.push(entity);
// Check if the user logged in before us during this tick (this is why we
// need the lock held).
let uuid = player.uuid();
@ -355,16 +358,13 @@ impl<'a> System<'a> for Sys {
}() {
trace!(?e, "failed to process register");
}
(finished_pending, server_emitter_)
},
)
.map(|(finished_pending, _server_emitter)| finished_pending)
.collect::<Vec<_>>();
finished_pending.into_iter().flatten().for_each(|e| {
);
let (new_players, retries, finished_pending) = new_players.into_inner();
finished_pending.into_iter().for_each(|e| {
// Remove all entities in finished_pending from pending_logins.
pending_logins.remove(e);
});
let (new_players, retries) = new_players.into_inner();
// Insert retry attempts back into pending_logins to be processed next tick
for (entity, pending) in retries {

View File

@ -34,6 +34,7 @@ use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::ServerGeneral;
use common_state::TerrainChanges;
use comp::Behavior;
use rayon::iter::ParallelIterator;
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteExpect, WriteStorage};
use std::sync::Arc;
use vek::*;
@ -310,29 +311,27 @@ impl<'a> System<'a> for Sys {
);
// Remove chunks that are too far from players.
let mut chunks_to_remove = Vec::new();
terrain
.iter()
.map(|(k, _)| k)
let chunks_to_remove = terrain
.par_keys()
.copied()
// Don't check every chunk every tick (spread over 16 ticks)
.filter(|k| k.x.unsigned_abs() % 4 + (k.y.unsigned_abs() % 4) * 4 == (tick.0 % 16) as u32)
// There shouldn't be to many pending chunks so we will just check them all
.chain(chunk_generator.pending_chunks())
.for_each(|chunk_key| {
.chain(chunk_generator.par_pending_chunks())
.filter(|chunk_key| {
let mut should_drop = true;
// For each player with a position, calculate the distance.
for (presence, pos) in (&presences, &positions).join() {
if chunk_in_vd(pos.0, chunk_key, &terrain, presence.view_distance) {
if chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) {
should_drop = false;
break;
}
}
if should_drop {
chunks_to_remove.push(chunk_key);
}
});
!should_drop
})
.collect::<Vec<_>>();
for key in chunks_to_remove {
// Register the unloading of this chunk from terrain persistence