From 59d8266f2a04b9456ae226778e7127a92503de80 Mon Sep 17 00:00:00 2001
From: Joshua Yanovski
Date: Sat, 10 Sep 2022 13:23:40 -0700
Subject: [PATCH] Run terrain chunk removal far from players in parallel.

---
 common/src/volumes/vol_grid_2d.rs |  7 ++++++
 server/src/chunk_generator.rs     |  5 ++++
 server/src/sys/msg/register.rs    | 38 +++++++++++++++----------------
 server/src/sys/terrain.rs         | 21 ++++++++---------
 4 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/common/src/volumes/vol_grid_2d.rs b/common/src/volumes/vol_grid_2d.rs
index 7cb026188d..59537eda60 100644
--- a/common/src/volumes/vol_grid_2d.rs
+++ b/common/src/volumes/vol_grid_2d.rs
@@ -29,6 +29,13 @@ impl<V: RectRasterableVol> VolGrid2d<V> {
             .map2(V::RECT_SIZE, |e, sz: u32| e.div_euclid(sz as i32))
     }
 
+    #[inline(always)]
+    pub fn par_keys(&self) -> hashbrown::hash_map::rayon::ParKeys<Vec2<i32>, Arc<V>>
+    where V: Send + Sync,
+    {
+        self.chunks.par_keys()
+    }
+
     #[inline(always)]
     pub fn chunk_offs(pos: Vec3<i32>) -> Vec3<i32> {
         let offs = Vec2::<i32>::from(pos).map2(V::RECT_SIZE, |e, sz| e & (sz - 1) as i32);
diff --git a/server/src/chunk_generator.rs b/server/src/chunk_generator.rs
index 5788c798f0..d83031138c 100644
--- a/server/src/chunk_generator.rs
+++ b/server/src/chunk_generator.rs
@@ -6,6 +6,7 @@ use common::{
     terrain::TerrainChunk,
 };
 use hashbrown::{hash_map::Entry, HashMap};
+use rayon::iter::ParallelIterator;
 use specs::Entity as EcsEntity;
 use std::sync::{
     atomic::{AtomicBool, Ordering},
@@ -82,6 +83,10 @@ impl ChunkGenerator {
         self.pending_chunks.keys().copied()
     }
 
+    pub fn par_pending_chunks(&self) -> impl rayon::iter::ParallelIterator<Item = Vec2<i32>> + '_ {
+        self.pending_chunks.par_keys().copied()
+    }
+
     pub fn cancel_if_pending(&mut self, key: Vec2<i32>) {
         if let Some(cancel) = self.pending_chunks.remove(&key) {
             cancel.store(true, Ordering::Relaxed);
diff --git a/server/src/sys/msg/register.rs b/server/src/sys/msg/register.rs
index b93b3bb23f..44459859f1 100644
--- a/server/src/sys/msg/register.rs
+++ b/server/src/sys/msg/register.rs
@@ -12,6 +12,7 @@ use common::{
     resources::TimeOfDay,
     uid::{Uid, UidAllocator},
 };
+use common_base::prof_span;
 use common_ecs::{Job, Origin, Phase, System};
 use common_net::msg::{
     CharacterInfo, ClientRegister, DisconnectReason, PlayerInfo, PlayerListUpdate, RegisterError,
@@ -109,14 +110,16 @@ impl<'a> System<'a> for Sys {
         //
         // Big enough that we hopefully won't have to reallocate.
         //
-        // Also includes a list of logins to retry, since we happen to update those
-        // around the same time that we update the new players list.
+        // Also includes a list of logins to retry and finished_pending, since we
+        // happen to update those around the same time that we update the new
+        // players list.
         //
         // NOTE: stdlib mutex is more than good enough on Linux and (probably) Windows,
         // but not Mac.
         let new_players = parking_lot::Mutex::new((
             HashMap::<_, (_, _, _, _)>::with_capacity(capacity),
             Vec::with_capacity(capacity),
+            Vec::with_capacity(capacity),
         ));
 
         // defer auth lockup
@@ -153,21 +156,20 @@ impl<'a> System<'a> for Sys {
             // It will be overwritten in ServerExt::update_character_data.
             let battle_mode = read_data.settings.gameplay.battle_mode.default_mode();
-        let finished_pending = (
+        (
             &read_data.entities,
             &read_data.uids,
             &clients,
             !players.mask(),
             &mut pending_logins,
         )
-            .par_join()
-            .fold_with(
-                // (Finished pending entity list, emitter)
-                (vec![], OptionClone(None)),
-                |(mut finished_pending, mut server_emitter_), (entity, uid, client, _, pending)| {
-                    let server_emitter = server_emitter_
-                        .0
-                        .get_or_insert_with(|| read_data.server_event_bus.emitter());
+            .join()
+            // NOTE: Required because Specs has very poor work splitting for sparse joins.
+            .par_bridge()
+            .for_each_init(
+                || read_data.server_event_bus.emitter(),
+                |server_emitter, (entity, uid, client, _, pending)| {
+                    prof_span!("msg::register login");
                     if let Err(e) = || -> Result<(), crate::error::Error> {
                         // Destructure new_players_guard last so it gets dropped before the other
                         // three.
@@ -229,9 +231,9 @@ impl<'a> System<'a> for Sys {
                         ) {
                             None => return Ok(()),
                             Some(r) => {
-                                finished_pending.push(entity);
                                 match r {
                                     Err(e) => {
+                                        new_players.lock().2.push(entity);
                                         // NOTE: Done only on error to avoid doing extra work within
                                         // the lock.
                                         trace!(?e, "pending login returned error");
@@ -249,7 +251,8 @@ impl<'a> System<'a> for Sys {
                             },
                         };
 
-                        let (new_players_by_uuid, retries) = &mut *new_players_guard;
+                        let (new_players_by_uuid, retries, finished_pending) = &mut *new_players_guard;
+                        finished_pending.push(entity);
                         // Check if the user logged in before us during this tick (this is why we
                         // need the lock held).
                         let uuid = player.uuid();
@@ -355,16 +358,13 @@ impl<'a> System<'a> for Sys {
                     }() {
                         trace!(?e, "failed to process register");
                     }
-                    (finished_pending, server_emitter_)
                 },
-            )
-            .map(|(finished_pending, _server_emitter)| finished_pending)
-            .collect::<Vec<_>>();
-        finished_pending.into_iter().flatten().for_each(|e| {
+            );
+        let (new_players, retries, finished_pending) = new_players.into_inner();
+        finished_pending.into_iter().for_each(|e| {
             // Remove all entities in finished_pending from pending_logins.
             pending_logins.remove(e);
         });
-        let (new_players, retries) = new_players.into_inner();
 
         // Insert retry attempts back into pending_logins to be processed next tick
         for (entity, pending) in retries {
diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs
index 5d5f9e9850..cf4a8667bd 100644
--- a/server/src/sys/terrain.rs
+++ b/server/src/sys/terrain.rs
@@ -34,6 +34,7 @@ use common_ecs::{Job, Origin, Phase, System};
 use common_net::msg::ServerGeneral;
 use common_state::TerrainChanges;
 use comp::Behavior;
+use rayon::iter::ParallelIterator;
 use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteExpect, WriteStorage};
 use std::sync::Arc;
 use vek::*;
@@ -310,29 +311,27 @@ impl<'a> System<'a> for Sys {
         );
 
         // Remove chunks that are too far from players.
-        let mut chunks_to_remove = Vec::new();
-        terrain
-            .iter()
-            .map(|(k, _)| k)
+        let chunks_to_remove = terrain
+            .par_keys()
+            .copied()
             // Don't check every chunk every tick (spread over 16 ticks)
            .filter(|k| k.x.unsigned_abs() % 4 + (k.y.unsigned_abs() % 4) * 4 == (tick.0 % 16) as u32)
             // There shouldn't be to many pending chunks so we will just check them all
-            .chain(chunk_generator.pending_chunks())
-            .for_each(|chunk_key| {
+            .chain(chunk_generator.par_pending_chunks())
+            .filter(|chunk_key| {
                 let mut should_drop = true;
 
                 // For each player with a position, calculate the distance.
                 for (presence, pos) in (&presences, &positions).join() {
-                    if chunk_in_vd(pos.0, chunk_key, &terrain, presence.view_distance) {
+                    if chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) {
                         should_drop = false;
                         break;
                     }
                 }
 
-                if should_drop {
-                    chunks_to_remove.push(chunk_key);
-                }
-            });
+                !should_drop
+            })
+            .collect::<Vec<_>>();
 
         for key in chunks_to_remove {
             // Register the unloading of this chunk from terrain persistence
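
The terrain.rs hunks split chunk removal into two phases: a read-only parallel pass over
VolGrid2d::par_keys() (plus the pending-chunk keys) that collects the keys of chunks no
player can see, followed by the pre-existing sequential loop that actually unloads them.
The sketch below shows that same two-phase pattern in isolation. It is a minimal,
illustrative stand-in, not Veloren code: ChunkKey, remove_far_chunks, the simplified
chunk_in_vd, and the 16x16 grid with one player are all made up for the example, and it
assumes hashbrown is built with its "rayon" feature so that par_keys() is available.

    // Assumed deps (illustrative): hashbrown with its "rayon" feature, plus rayon.
    use hashbrown::HashMap;
    use rayon::iter::ParallelIterator;

    // Stand-in for the real key/value types: the server keys chunks by Vec2<i32>
    // and stores Arc<TerrainChunk> values; tuples and unit values suffice here.
    type ChunkKey = (i32, i32);

    // Simplified distance test; the real chunk_in_vd also consults the terrain
    // grid and converts a world-space player position into chunk coordinates.
    fn chunk_in_vd(player: ChunkKey, chunk: ChunkKey, view_distance: i32) -> bool {
        (player.0 - chunk.0).abs() <= view_distance && (player.1 - chunk.1).abs() <= view_distance
    }

    fn remove_far_chunks(terrain: &mut HashMap<ChunkKey, ()>, players: &[(ChunkKey, i32)]) {
        // Phase 1 (parallel, read-only): decide which chunks to drop. par_keys()
        // mirrors the new VolGrid2d::par_keys and needs hashbrown's rayon feature.
        let chunks_to_remove: Vec<ChunkKey> = terrain
            .par_keys()
            .copied()
            .filter(|&chunk| !players.iter().any(|&(pos, vd)| chunk_in_vd(pos, chunk, vd)))
            .collect();

        // Phase 2 (sequential, mutating): unload them. Mutation stays on one
        // thread, so the map itself never needs locking.
        for key in chunks_to_remove {
            terrain.remove(&key);
        }
    }

    fn main() {
        // 16x16 chunk grid, one player at chunk (0, 0) with view distance 2.
        let mut terrain: HashMap<ChunkKey, ()> = (-8..8)
            .flat_map(|x| (-8..8).map(move |y| ((x, y), ())))
            .collect();
        remove_far_chunks(&mut terrain, &[((0, 0), 2)]);
        assert_eq!(terrain.len(), 25); // only the 5x5 block around the player survives
    }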
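
The register.rs hunks use a second pattern: instead of Specs' par_join() (which, per the
NOTE in the diff, splits sparse joins poorly), the join stays sequential and is handed to
rayon through par_bridge(), and for_each_init() builds one piece of per-split state (the
server-event emitter in the real system) rather than one per entity. The sketch below
shows only that rayon pattern and is an assumption-laden stand-in: the integer iterator
plays the role of the sparse join, and the scratch Vec plays the role of the emitter.

    use rayon::iter::{ParallelBridge, ParallelIterator};
    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        // Stand-in for a sparse Specs join: a plain sequential iterator whose length
        // rayon cannot see up front. par_bridge() still spreads its items across the
        // rayon pool, which is what the register system relies on for work splitting.
        let pending_logins = (0..1_000).filter(|n| n % 7 == 0);

        let processed = AtomicUsize::new(0);
        pending_logins.par_bridge().for_each_init(
            // Built once per rayon work split, not once per item; the real system
            // allocates its server-event emitter here.
            || Vec::<i32>::with_capacity(16),
            |scratch, login| {
                scratch.push(login); // reuse the per-split allocation
                processed.fetch_add(1, Ordering::Relaxed);
            },
        );

        assert_eq!(processed.into_inner(), (0..1_000).filter(|n| n % 7 == 0).count());
    }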