From 6547a6bf5e7dd0b7d3375973d9e7305a26ba7ad1 Mon Sep 17 00:00:00 2001 From: Imbris Date: Sat, 25 Dec 2021 18:06:55 -0500 Subject: [PATCH] Parallelize entity sync loop over regions. --- Cargo.lock | 1 + common/Cargo.toml | 2 +- common/src/region.rs | 14 +- server/src/client.rs | 7 +- server/src/sys/entity_sync.rs | 382 ++++++++++++++++++--------------- server/src/sys/sentinel.rs | 4 +- server/src/sys/subscription.rs | 6 +- 7 files changed, 226 insertions(+), 190 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c435d5ae65..ffd6f77b0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,6 +2620,7 @@ checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" dependencies = [ "autocfg", "hashbrown 0.11.2", + "rayon", "serde", ] diff --git a/common/Cargo.toml b/common/Cargo.toml index f935d53c0e..8e5977d4bf 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -73,7 +73,7 @@ kiddo = { version = "0.1", optional = true } # Data structures hashbrown = { version = "0.11", features = ["rayon", "serde", "nightly"] } slotmap = { version = "1.0", features = ["serde"] } -indexmap = "1.3.0" +indexmap = { version = "1.3.0", features = ["rayon"] } slab = "0.4.2" # ECS diff --git a/common/src/region.rs b/common/src/region.rs index 6949f63a7d..4437384f43 100644 --- a/common/src/region.rs +++ b/common/src/region.rs @@ -342,16 +342,24 @@ impl RegionMap { } } - // Returns a region given a key + /// Returns a region given a key pub fn get(&self, key: Vec2) -> Option<&Region> { self.regions.get(&key) } - // Returns an iterator of (Position, Region) + /// Returns an iterator of (Position, Region) pub fn iter(&self) -> impl Iterator, &Region)> { self.regions.iter().map(|(key, r)| (*key, r)) } + + /// Returns a parallel iterator of (Position, Regions) + pub fn par_iter( + &self, + ) -> impl rayon::iter::IndexedParallelIterator, &Region)> { + use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; + self.regions.par_iter().map(|(key, r)| (*key, r)) + } } -// Note vd is in blocks in this case +/// Note vd is in blocks in this case pub fn region_in_vd(key: Vec2, pos: Vec3, vd: f32) -> bool { let vd_extended = vd + TETHER_LENGTH as f32 * 2.0f32.sqrt(); diff --git a/server/src/client.rs b/server/src/client.rs index ca69bb0421..f03e44d566 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -136,7 +136,12 @@ impl Client { } } - pub(crate) fn send_fallible>(&self, msg: M) { let _ = self.send(msg); } + pub(crate) fn send_fallible>(&self, msg: M) { + // TODO: hack to avoid locking stream mutex while serializing the message, + // remove this when the mutexes on the Streams are removed + let prepared = self.prepare(msg); + let _ = self.send_prepared(&prepared); + } pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> { match msg.stream_id { diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index ad6742dcce..2b76f73bb6 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -23,6 +23,7 @@ use vek::*; /// This system will send physics updates to the client #[derive(Default)] pub struct Sys; + impl<'a> System<'a> for Sys { #[allow(clippy::type_complexity)] type SystemData = ( @@ -105,201 +106,224 @@ impl<'a> System<'a> for Sys { // 5. Inform clients of the component changes for that entity // - Throttle update rate base on distance to each client - // Sync physics - // via iterating through regions - for (key, region) in region_map.iter() { - // Assemble subscriber list for this region by iterating through clients and - // checking if they are subscribed to this region - let mut subscribers = ( - &clients, - &entities, - presences.maybe(), - &subscriptions, - &positions, - ) - .join() - .filter_map(|(client, entity, presence, subscription, pos)| { - if presence.is_some() && subscription.regions.contains(&key) { - Some((client, &subscription.regions, entity, *pos)) - } else { - None - } - }) - .collect::>(); - - for event in region.events() { - match event { - RegionEvent::Entered(id, maybe_key) => { - // Don't process newly created entities here (redundant network messages) - if trackers.uid.inserted().contains(*id) { - continue; + // Sync physics and other components + // via iterating through regions (in parallel) + use rayon::iter::ParallelIterator; + common_base::prof_span!(guard, "regions"); + region_map.par_iter().for_each_init( + || { + common_base::prof_span!(guard, "entity sync rayon job"); + guard + }, + |_guard, (key, region)| { + // Assemble subscriber list for this region by iterating through clients and + // checking if they are subscribed to this region + let mut subscribers = ( + &clients, + &entities, + presences.maybe(), + &subscriptions, + &positions, + ) + .join() + .filter_map(|(client, entity, presence, subscription, pos)| { + if presence.is_some() && subscription.regions.contains(&key) { + Some((client, &subscription.regions, entity, *pos)) + } else { + None } - let entity = entities.entity(*id); - if let Some(pkg) = positions - .get(entity) - .map(|pos| (pos, velocities.get(entity), orientations.get(entity))) - .and_then(|(pos, vel, ori)| { - tracked_comps.create_entity_package( - entity, - Some(*pos), - vel.copied(), - ori.copied(), - ) - }) - { - let create_msg = ServerGeneral::CreateEntity(pkg); - for (client, regions, client_entity, _) in &mut subscribers { - if maybe_key + }) + .collect::>(); + + for event in region.events() { + match event { + RegionEvent::Entered(id, maybe_key) => { + // Don't process newly created entities here (redundant network + // messages) + if trackers.uid.inserted().contains(*id) { + continue; + } + let entity = entities.entity(*id); + if let Some(pkg) = positions + .get(entity) + .map(|pos| (pos, velocities.get(entity), orientations.get(entity))) + .and_then(|(pos, vel, ori)| { + tracked_comps.create_entity_package( + entity, + Some(*pos), + vel.copied(), + ori.copied(), + ) + }) + { + let create_msg = ServerGeneral::CreateEntity(pkg); + for (client, regions, client_entity, _) in &mut subscribers { + if maybe_key .as_ref() .map(|key| !regions.contains(key)) .unwrap_or(true) // Client doesn't need to know about itself && *client_entity != entity - { - client.send_fallible(create_msg.clone()); + { + client.send_fallible(create_msg.clone()); + } } } - } - }, - RegionEvent::Left(id, maybe_key) => { - // Lookup UID for entity - if let Some(&uid) = uids.get(entities.entity(*id)) { - for (client, regions, _, _) in &mut subscribers { - if maybe_key - .as_ref() - .map(|key| !regions.contains(key)) - .unwrap_or(true) - { - client.send_fallible(ServerGeneral::DeleteEntity(uid)); + }, + RegionEvent::Left(id, maybe_key) => { + // Lookup UID for entity + if let Some(&uid) = uids.get(entities.entity(*id)) { + for (client, regions, _, _) in &mut subscribers { + if maybe_key + .as_ref() + .map(|key| !regions.contains(key)) + .unwrap_or(true) + { + client.send_fallible(ServerGeneral::DeleteEntity(uid)); + } } } - } - }, + }, + } } - } - // Sync tracked components - // Get deleted entities in this region from DeletedEntities - let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages( - &tracked_comps, - region.entities(), - deleted_entities - .take_deleted_in_region(key) - .unwrap_or_default(), - ); - // We lazily initializethe the synchronization messages in case there are no - // clients. - let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package)); - for (client, _, _, _) in &mut subscribers { - let msg = - entity_comp_sync.right_or_else(|(entity_sync_package, comp_sync_package)| { - ( - client.prepare(ServerGeneral::EntitySync(entity_sync_package)), - client.prepare(ServerGeneral::CompSync(comp_sync_package)), - ) - }); - // We don't care much about stream errors here since they could just represent - // network disconnection, which is handled elsewhere. - let _ = client.send_prepared(&msg.0); - let _ = client.send_prepared(&msg.1); - entity_comp_sync = Either::Right(msg); - } - - for (client, _, client_entity, client_pos) in &mut subscribers { - let mut comp_sync_package = CompSyncPackage::new(); - - for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in ( + // Sync tracked components + // Get deleted entities in this region from DeletedEntities + let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages( + &tracked_comps, region.entities(), - &entities, - &uids, - (&positions, last_pos.mask().maybe()), - (&velocities, last_vel.mask().maybe()).maybe(), - (&orientations, last_vel.mask().maybe()).maybe(), - force_updates.mask().maybe(), - colliders.maybe(), - ) - .join() - { - // Decide how regularly to send physics updates. - let send_now = if client_entity == &entity { - let player_physics_setting = players - .get(entity) - .and_then(|p| player_physics_settings.settings.get(&p.uuid()).copied()) - .unwrap_or_default(); - // Don't send client physics updates about itself unless force update is set - // or the client is subject to server-authoritative physics - force_update.is_some() || player_physics_setting.server_authoritative() - } else if matches!(collider, Some(Collider::Voxel { .. })) { - // Things with a voxel collider (airships, etc.) need to have very stable - // physics so we always send updated for these where - // we can. - true - } else { - // Throttle update rates for all other entities based on distance to client - let distance_sq = client_pos.0.distance_squared(pos.0); - let id_staggered_tick = tick + entity.id() as u64; - - // More entities farther away so checks start there - if distance_sq > 500.0f32.powi(2) { - id_staggered_tick % 32 == 0 - } else if distance_sq > 300.0f32.powi(2) { - id_staggered_tick % 16 == 0 - } else if distance_sq > 200.0f32.powi(2) { - id_staggered_tick % 8 == 0 - } else if distance_sq > 120.0f32.powi(2) { - id_staggered_tick % 6 == 0 - } else if distance_sq > 64.0f32.powi(2) { - id_staggered_tick % 3 == 0 - } else if distance_sq > 24.0f32.powi(2) { - id_staggered_tick % 2 == 0 - } else { - true - } - }; - - if last_pos.is_none() { - comp_sync_package.comp_inserted(uid, pos); - } else if send_now { - comp_sync_package.comp_modified(uid, pos); - } - - if let Some((v, last_vel)) = vel { - if last_vel.is_none() { - comp_sync_package.comp_inserted(uid, *v); - } else if send_now { - comp_sync_package.comp_modified(uid, *v); - } - } - - if let Some((o, last_ori)) = ori { - if last_ori.is_none() { - comp_sync_package.comp_inserted(uid, *o); - } else if send_now { - comp_sync_package.comp_modified(uid, *o); - } - } + deleted_entities + .get_deleted_in_region(key) + .cloned() // TODO: quick hack to make this parallel, we can avoid this + .unwrap_or_default(), + ); + // We lazily initialize the the synchronization messages in case there are no + // clients. + let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package)); + for (client, _, _, _) in &mut subscribers { + let msg = entity_comp_sync.right_or_else( + |(entity_sync_package, comp_sync_package)| { + ( + client.prepare(ServerGeneral::EntitySync(entity_sync_package)), + client.prepare(ServerGeneral::CompSync(comp_sync_package)), + ) + }, + ); + // We don't care much about stream errors here since they could just represent + // network disconnection, which is handled elsewhere. + let _ = client.send_prepared(&msg.0); + let _ = client.send_prepared(&msg.1); + entity_comp_sync = Either::Right(msg); } - client.send_fallible(ServerGeneral::CompSync(comp_sync_package)); - } + for (client, _, client_entity, client_pos) in &mut subscribers { + let mut comp_sync_package = CompSyncPackage::new(); - // Update the last physics components for each entity - for (_, _, &pos, vel, ori, last_pos, last_vel, last_ori) in ( - region.entities(), - &entities, - &positions, - velocities.maybe(), - orientations.maybe(), - last_pos.entries(), - last_vel.entries(), - last_ori.entries(), - ) - .join() - { - last_pos.replace(Last(pos)); - vel.and_then(|&v| last_vel.replace(Last(v))); - ori.and_then(|&o| last_ori.replace(Last(o))); - } + for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in ( + region.entities(), + &entities, + &uids, + (&positions, last_pos.mask().maybe()), + (&velocities, last_vel.mask().maybe()).maybe(), + (&orientations, last_vel.mask().maybe()).maybe(), + force_updates.mask().maybe(), + colliders.maybe(), + ) + .join() + { + // Decide how regularly to send physics updates. + let send_now = if client_entity == &entity { + let player_physics_setting = players + .get(entity) + .and_then(|p| { + player_physics_settings.settings.get(&p.uuid()).copied() + }) + .unwrap_or_default(); + // Don't send client physics updates about itself unless force update is + // set or the client is subject to + // server-authoritative physics + force_update.is_some() || player_physics_setting.server_authoritative() + } else if matches!(collider, Some(Collider::Voxel { .. })) { + // Things with a voxel collider (airships, etc.) need to have very + // stable physics so we always send updated + // for these where we can. + true + } else { + // Throttle update rates for all other entities based on distance to + // client + let distance_sq = client_pos.0.distance_squared(pos.0); + let id_staggered_tick = tick + entity.id() as u64; + + // More entities farther away so checks start there + if distance_sq > 500.0f32.powi(2) { + id_staggered_tick % 32 == 0 + } else if distance_sq > 300.0f32.powi(2) { + id_staggered_tick % 16 == 0 + } else if distance_sq > 200.0f32.powi(2) { + id_staggered_tick % 8 == 0 + } else if distance_sq > 120.0f32.powi(2) { + id_staggered_tick % 6 == 0 + } else if distance_sq > 64.0f32.powi(2) { + id_staggered_tick % 3 == 0 + } else if distance_sq > 24.0f32.powi(2) { + id_staggered_tick % 2 == 0 + } else { + true + } + }; + + if last_pos.is_none() { + comp_sync_package.comp_inserted(uid, pos); + } else if send_now { + comp_sync_package.comp_modified(uid, pos); + } + + if let Some((v, last_vel)) = vel { + if last_vel.is_none() { + comp_sync_package.comp_inserted(uid, *v); + } else if send_now { + comp_sync_package.comp_modified(uid, *v); + } + } + + if let Some((o, last_ori)) = ori { + if last_ori.is_none() { + comp_sync_package.comp_inserted(uid, *o); + } else if send_now { + comp_sync_package.comp_modified(uid, *o); + } + } + } + + client.send_fallible(ServerGeneral::CompSync(comp_sync_package)); + } + }, + ); + drop(guard); + + // TODO: this is a quick hack to make the loop over regions above parallel, + // there might is probably a way to make it cleaner + // + // Remove delete entities for each region that we alread handled above + region_map + .iter() + .for_each(|(key, _)| drop(deleted_entities.take_deleted_in_region(key))); + + // Update the last physics components for each entity + for (_, &pos, vel, ori, last_pos, last_vel, last_ori) in ( + &entities, + &positions, + velocities.maybe(), + orientations.maybe(), + last_pos.entries(), + last_vel.entries(), + last_ori.entries(), + ) + .join() + { + last_pos.replace(Last(pos)); + vel.and_then(|&v| last_vel.replace(Last(v))); + ori.and_then(|&o| last_ori.replace(Last(o))); } // Handle entity deletion in regions that don't exist in RegionMap diff --git a/server/src/sys/sentinel.rs b/server/src/sys/sentinel.rs index 7d4008ec5d..96ee14cf53 100644 --- a/server/src/sys/sentinel.rs +++ b/server/src/sys/sentinel.rs @@ -434,9 +434,7 @@ impl DeletedEntities { self.map.remove(&key) } - pub fn get_deleted_in_region(&mut self, key: Vec2) -> Option<&Vec> { - self.map.get(&key) - } + pub fn get_deleted_in_region(&self, key: Vec2) -> Option<&Vec> { self.map.get(&key) } pub fn take_remaining_deleted(&mut self) -> Vec<(Vec2, Vec)> { // TODO: don't allocate diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 9b83dfede7..43a4329f00 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -13,7 +13,7 @@ use common::{ use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::ServerGeneral; use specs::{ - Entities, Join, ReadExpect, ReadStorage, SystemData, World, WorldExt, Write, WriteStorage, + Entities, Join, Read, ReadExpect, ReadStorage, SystemData, World, WorldExt, WriteStorage, }; use tracing::{debug, error}; use vek::*; @@ -33,7 +33,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Presence>, ReadStorage<'a, Client>, WriteStorage<'a, RegionSubscription>, - Write<'a, DeletedEntities>, + Read<'a, DeletedEntities>, TrackedComps<'a>, ); @@ -54,7 +54,7 @@ impl<'a> System<'a> for Sys { presences, clients, mut subscriptions, - mut deleted_entities, + deleted_entities, tracked_comps, ): Self::SystemData, ) {