Parallelize entity sync loop over regions.

Imbris 2021-12-25 18:06:55 -05:00
parent 9788b144ec
commit 6547a6bf5e
7 changed files with 226 additions and 190 deletions
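
In short: the serial `for (key, region) in region_map.iter()` loop in the server's entity sync system is replaced with `region_map.par_iter().for_each_init(..)`, so each region's sync work runs as a rayon job (which is also why the `rayon` feature gets enabled for indexmap below and why `RegionMap` gains a `par_iter` method). A minimal, self-contained sketch of that pattern follows; the `Region` type, the `(i32, i32)` keys, and `sync_region` are illustrative stand-ins, not the actual Veloren types:

use std::collections::HashMap;
// rayon provides par_iter() for std HashMap; the real code enables the
// "rayon" features of hashbrown/indexmap to get the same on its own maps.
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

// Stand-in for a world region holding some entity ids.
struct Region {
    entities: Vec<u64>,
}

// Stand-in for the per-region sync work (assemble subscribers, send messages, ...).
fn sync_region(key: (i32, i32), region: &Region) {
    println!("syncing region {:?} with {} entities", key, region.entities.len());
}

fn main() {
    let mut regions: HashMap<(i32, i32), Region> = HashMap::new();
    regions.insert((0, 0), Region { entities: vec![1, 2] });
    regions.insert((1, 0), Region { entities: vec![3] });

    // Before: serial iteration over regions.
    for (key, region) in regions.iter() {
        sync_region(*key, region);
    }

    // After: parallel iteration; for_each_init lets every rayon job build some
    // per-job state up front (the commit uses this to open a profiling span).
    regions.par_iter().for_each_init(
        || (/* per-job setup, e.g. a profiling guard */),
        |_state, (key, region)| sync_region(*key, region),
    );
}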

Cargo.lock (generated)

@@ -2620,6 +2620,7 @@ checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
 dependencies = [
  "autocfg",
  "hashbrown 0.11.2",
+ "rayon",
  "serde",
 ]


@@ -73,7 +73,7 @@ kiddo = { version = "0.1", optional = true }
 # Data structures
 hashbrown = { version = "0.11", features = ["rayon", "serde", "nightly"] }
 slotmap = { version = "1.0", features = ["serde"] }
-indexmap = "1.3.0"
+indexmap = { version = "1.3.0", features = ["rayon"] }
 slab = "0.4.2"

 # ECS


@@ -342,16 +342,24 @@ impl RegionMap {
        }
    }

-    // Returns a region given a key
+    /// Returns a region given a key
     pub fn get(&self, key: Vec2<i32>) -> Option<&Region> { self.regions.get(&key) }

-    // Returns an iterator of (Position, Region)
+    /// Returns an iterator of (Position, Region)
     pub fn iter(&self) -> impl Iterator<Item = (Vec2<i32>, &Region)> {
         self.regions.iter().map(|(key, r)| (*key, r))
     }
+
+    /// Returns a parallel iterator of (Position, Regions)
+    pub fn par_iter(
+        &self,
+    ) -> impl rayon::iter::IndexedParallelIterator<Item = (Vec2<i32>, &Region)> {
+        use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+        self.regions.par_iter().map(|(key, r)| (*key, r))
+    }
 }

-// Note vd is in blocks in this case
+/// Note vd is in blocks in this case
 pub fn region_in_vd(key: Vec2<i32>, pos: Vec3<f32>, vd: f32) -> bool {
     let vd_extended = vd + TETHER_LENGTH as f32 * 2.0f32.sqrt();


@@ -136,7 +136,12 @@ impl Client {
        }
    }

-    pub(crate) fn send_fallible<M: Into<ServerMsg>>(&self, msg: M) { let _ = self.send(msg); }
+    pub(crate) fn send_fallible<M: Into<ServerMsg>>(&self, msg: M) {
+        // TODO: hack to avoid locking stream mutex while serializing the message,
+        // remove this when the mutexes on the Streams are removed
+        let prepared = self.prepare(msg);
+        let _ = self.send_prepared(&prepared);
+    }

     pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> {
         match msg.stream_id {
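
As the TODO in the new `send_fallible` notes, the point of going through `prepare`/`send_prepared` is to serialize the message before the stream mutex is taken, so the lock is only held for the actual send. A generic sketch of that "serialize outside the lock, send under the lock" shape; `Message`, `PreparedMsg`, and `Stream` here are hypothetical stand-ins, not the real networking types:

use std::sync::Mutex;

// Hypothetical stand-ins for a message, its serialized form, and a stream.
struct Message(String);
struct PreparedMsg(Vec<u8>);
struct Stream(Vec<Vec<u8>>);

struct Client {
    stream: Mutex<Stream>,
}

impl Client {
    // Serialization can be comparatively expensive, so do it with no lock held.
    fn prepare(&self, msg: Message) -> PreparedMsg {
        PreparedMsg(msg.0.into_bytes())
    }

    // Only the actual send needs the stream, so only it takes the mutex.
    fn send_prepared(&self, msg: &PreparedMsg) {
        self.stream.lock().unwrap().0.push(msg.0.clone());
    }

    // Mirrors the new send_fallible: prepare first, then send the prepared bytes.
    fn send_fallible(&self, msg: Message) {
        let prepared = self.prepare(msg);
        self.send_prepared(&prepared);
    }
}

fn main() {
    let client = Client {
        stream: Mutex::new(Stream(Vec::new())),
    };
    client.send_fallible(Message("hello".to_owned()));
    assert_eq!(client.stream.lock().unwrap().0.len(), 1);
}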


@@ -23,6 +23,7 @@ use vek::*;
 /// This system will send physics updates to the client
 #[derive(Default)]
 pub struct Sys;
+
 impl<'a> System<'a> for Sys {
     #[allow(clippy::type_complexity)]
     type SystemData = (
@@ -105,201 +106,224 @@ impl<'a> System<'a> for Sys {
         // 5. Inform clients of the component changes for that entity
         //    - Throttle update rate base on distance to each client

-        // Sync physics
-        // via iterating through regions
-        for (key, region) in region_map.iter() {
-            // Assemble subscriber list for this region by iterating through clients and
-            // checking if they are subscribed to this region
-            let mut subscribers = (
-                &clients,
-                &entities,
-                presences.maybe(),
-                &subscriptions,
-                &positions,
-            )
-                .join()
-                .filter_map(|(client, entity, presence, subscription, pos)| {
-                    if presence.is_some() && subscription.regions.contains(&key) {
-                        Some((client, &subscription.regions, entity, *pos))
-                    } else {
-                        None
-                    }
-                })
-                .collect::<Vec<_>>();
-
-            for event in region.events() {
-                match event {
-                    RegionEvent::Entered(id, maybe_key) => {
-                        // Don't process newly created entities here (redundant network messages)
-                        if trackers.uid.inserted().contains(*id) {
-                            continue;
-                        }
-                        let entity = entities.entity(*id);
-                        if let Some(pkg) = positions
-                            .get(entity)
-                            .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
-                            .and_then(|(pos, vel, ori)| {
-                                tracked_comps.create_entity_package(
-                                    entity,
-                                    Some(*pos),
-                                    vel.copied(),
-                                    ori.copied(),
-                                )
-                            })
-                        {
-                            let create_msg = ServerGeneral::CreateEntity(pkg);
-                            for (client, regions, client_entity, _) in &mut subscribers {
-                                if maybe_key
-                                    .as_ref()
-                                    .map(|key| !regions.contains(key))
-                                    .unwrap_or(true)
-                                    // Client doesn't need to know about itself
-                                    && *client_entity != entity
-                                {
-                                    client.send_fallible(create_msg.clone());
-                                }
-                            }
-                        }
-                    },
-                    RegionEvent::Left(id, maybe_key) => {
-                        // Lookup UID for entity
-                        if let Some(&uid) = uids.get(entities.entity(*id)) {
-                            for (client, regions, _, _) in &mut subscribers {
-                                if maybe_key
-                                    .as_ref()
-                                    .map(|key| !regions.contains(key))
-                                    .unwrap_or(true)
-                                {
-                                    client.send_fallible(ServerGeneral::DeleteEntity(uid));
-                                }
-                            }
-                        }
-                    },
-                }
-            }
-
-            // Sync tracked components
-            // Get deleted entities in this region from DeletedEntities
-            let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
-                &tracked_comps,
-                region.entities(),
-                deleted_entities
-                    .take_deleted_in_region(key)
-                    .unwrap_or_default(),
-            );
-            // We lazily initializethe the synchronization messages in case there are no
-            // clients.
-            let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
-            for (client, _, _, _) in &mut subscribers {
-                let msg =
-                    entity_comp_sync.right_or_else(|(entity_sync_package, comp_sync_package)| {
-                        (
-                            client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
-                            client.prepare(ServerGeneral::CompSync(comp_sync_package)),
-                        )
-                    });
-                // We don't care much about stream errors here since they could just represent
-                // network disconnection, which is handled elsewhere.
-                let _ = client.send_prepared(&msg.0);
-                let _ = client.send_prepared(&msg.1);
-                entity_comp_sync = Either::Right(msg);
-            }
-
-            for (client, _, client_entity, client_pos) in &mut subscribers {
-                let mut comp_sync_package = CompSyncPackage::new();
-
-                for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in (
-                    region.entities(),
-                    &entities,
-                    &uids,
-                    (&positions, last_pos.mask().maybe()),
-                    (&velocities, last_vel.mask().maybe()).maybe(),
-                    (&orientations, last_vel.mask().maybe()).maybe(),
-                    force_updates.mask().maybe(),
-                    colliders.maybe(),
-                )
-                    .join()
-                {
-                    // Decide how regularly to send physics updates.
-                    let send_now = if client_entity == &entity {
-                        let player_physics_setting = players
-                            .get(entity)
-                            .and_then(|p| player_physics_settings.settings.get(&p.uuid()).copied())
-                            .unwrap_or_default();
-                        // Don't send client physics updates about itself unless force update is set
-                        // or the client is subject to server-authoritative physics
-                        force_update.is_some() || player_physics_setting.server_authoritative()
-                    } else if matches!(collider, Some(Collider::Voxel { .. })) {
-                        // Things with a voxel collider (airships, etc.) need to have very stable
-                        // physics so we always send updated for these where
-                        // we can.
-                        true
-                    } else {
-                        // Throttle update rates for all other entities based on distance to client
-                        let distance_sq = client_pos.0.distance_squared(pos.0);
-                        let id_staggered_tick = tick + entity.id() as u64;
-                        // More entities farther away so checks start there
-                        if distance_sq > 500.0f32.powi(2) {
-                            id_staggered_tick % 32 == 0
-                        } else if distance_sq > 300.0f32.powi(2) {
-                            id_staggered_tick % 16 == 0
-                        } else if distance_sq > 200.0f32.powi(2) {
-                            id_staggered_tick % 8 == 0
-                        } else if distance_sq > 120.0f32.powi(2) {
-                            id_staggered_tick % 6 == 0
-                        } else if distance_sq > 64.0f32.powi(2) {
-                            id_staggered_tick % 3 == 0
-                        } else if distance_sq > 24.0f32.powi(2) {
-                            id_staggered_tick % 2 == 0
-                        } else {
-                            true
-                        }
-                    };
-
-                    if last_pos.is_none() {
-                        comp_sync_package.comp_inserted(uid, pos);
-                    } else if send_now {
-                        comp_sync_package.comp_modified(uid, pos);
-                    }
-
-                    if let Some((v, last_vel)) = vel {
-                        if last_vel.is_none() {
-                            comp_sync_package.comp_inserted(uid, *v);
-                        } else if send_now {
-                            comp_sync_package.comp_modified(uid, *v);
-                        }
-                    }
-
-                    if let Some((o, last_ori)) = ori {
-                        if last_ori.is_none() {
-                            comp_sync_package.comp_inserted(uid, *o);
-                        } else if send_now {
-                            comp_sync_package.comp_modified(uid, *o);
-                        }
-                    }
-                }
-
-                client.send_fallible(ServerGeneral::CompSync(comp_sync_package));
-            }
-
-            // Update the last physics components for each entity
-            for (_, _, &pos, vel, ori, last_pos, last_vel, last_ori) in (
-                region.entities(),
-                &entities,
-                &positions,
-                velocities.maybe(),
-                orientations.maybe(),
-                last_pos.entries(),
-                last_vel.entries(),
-                last_ori.entries(),
-            )
-                .join()
-            {
-                last_pos.replace(Last(pos));
-                vel.and_then(|&v| last_vel.replace(Last(v)));
-                ori.and_then(|&o| last_ori.replace(Last(o)));
-            }
-        }
+        // Sync physics and other components
+        // via iterating through regions (in parallel)
+        use rayon::iter::ParallelIterator;
+        common_base::prof_span!(guard, "regions");
+        region_map.par_iter().for_each_init(
+            || {
+                common_base::prof_span!(guard, "entity sync rayon job");
+                guard
+            },
+            |_guard, (key, region)| {
+                // Assemble subscriber list for this region by iterating through clients and
+                // checking if they are subscribed to this region
+                let mut subscribers = (
+                    &clients,
+                    &entities,
+                    presences.maybe(),
+                    &subscriptions,
+                    &positions,
+                )
+                    .join()
+                    .filter_map(|(client, entity, presence, subscription, pos)| {
+                        if presence.is_some() && subscription.regions.contains(&key) {
+                            Some((client, &subscription.regions, entity, *pos))
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                for event in region.events() {
+                    match event {
+                        RegionEvent::Entered(id, maybe_key) => {
+                            // Don't process newly created entities here (redundant network
+                            // messages)
+                            if trackers.uid.inserted().contains(*id) {
+                                continue;
+                            }
+                            let entity = entities.entity(*id);
+                            if let Some(pkg) = positions
+                                .get(entity)
+                                .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
+                                .and_then(|(pos, vel, ori)| {
+                                    tracked_comps.create_entity_package(
+                                        entity,
+                                        Some(*pos),
+                                        vel.copied(),
+                                        ori.copied(),
+                                    )
+                                })
+                            {
+                                let create_msg = ServerGeneral::CreateEntity(pkg);
+                                for (client, regions, client_entity, _) in &mut subscribers {
+                                    if maybe_key
+                                        .as_ref()
+                                        .map(|key| !regions.contains(key))
+                                        .unwrap_or(true)
+                                        // Client doesn't need to know about itself
+                                        && *client_entity != entity
+                                    {
+                                        client.send_fallible(create_msg.clone());
+                                    }
+                                }
+                            }
+                        },
+                        RegionEvent::Left(id, maybe_key) => {
+                            // Lookup UID for entity
+                            if let Some(&uid) = uids.get(entities.entity(*id)) {
+                                for (client, regions, _, _) in &mut subscribers {
+                                    if maybe_key
+                                        .as_ref()
+                                        .map(|key| !regions.contains(key))
+                                        .unwrap_or(true)
+                                    {
+                                        client.send_fallible(ServerGeneral::DeleteEntity(uid));
+                                    }
+                                }
+                            }
+                        },
+                    }
+                }
+
+                // Sync tracked components
+                // Get deleted entities in this region from DeletedEntities
+                let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
+                    &tracked_comps,
+                    region.entities(),
+                    deleted_entities
+                        .get_deleted_in_region(key)
+                        .cloned() // TODO: quick hack to make this parallel, we can avoid this
+                        .unwrap_or_default(),
+                );
+                // We lazily initialize the the synchronization messages in case there are no
+                // clients.
+                let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
+                for (client, _, _, _) in &mut subscribers {
+                    let msg = entity_comp_sync.right_or_else(
+                        |(entity_sync_package, comp_sync_package)| {
+                            (
+                                client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
+                                client.prepare(ServerGeneral::CompSync(comp_sync_package)),
+                            )
+                        },
+                    );
+                    // We don't care much about stream errors here since they could just represent
+                    // network disconnection, which is handled elsewhere.
+                    let _ = client.send_prepared(&msg.0);
+                    let _ = client.send_prepared(&msg.1);
+                    entity_comp_sync = Either::Right(msg);
+                }
+
+                for (client, _, client_entity, client_pos) in &mut subscribers {
+                    let mut comp_sync_package = CompSyncPackage::new();
+
+                    for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in (
+                        region.entities(),
+                        &entities,
+                        &uids,
+                        (&positions, last_pos.mask().maybe()),
+                        (&velocities, last_vel.mask().maybe()).maybe(),
+                        (&orientations, last_vel.mask().maybe()).maybe(),
+                        force_updates.mask().maybe(),
+                        colliders.maybe(),
+                    )
+                        .join()
+                    {
+                        // Decide how regularly to send physics updates.
+                        let send_now = if client_entity == &entity {
+                            let player_physics_setting = players
+                                .get(entity)
+                                .and_then(|p| {
+                                    player_physics_settings.settings.get(&p.uuid()).copied()
+                                })
+                                .unwrap_or_default();
+                            // Don't send client physics updates about itself unless force update is
+                            // set or the client is subject to
+                            // server-authoritative physics
+                            force_update.is_some() || player_physics_setting.server_authoritative()
+                        } else if matches!(collider, Some(Collider::Voxel { .. })) {
+                            // Things with a voxel collider (airships, etc.) need to have very
+                            // stable physics so we always send updated
+                            // for these where we can.
+                            true
+                        } else {
+                            // Throttle update rates for all other entities based on distance to
+                            // client
+                            let distance_sq = client_pos.0.distance_squared(pos.0);
+                            let id_staggered_tick = tick + entity.id() as u64;
+                            // More entities farther away so checks start there
+                            if distance_sq > 500.0f32.powi(2) {
+                                id_staggered_tick % 32 == 0
+                            } else if distance_sq > 300.0f32.powi(2) {
+                                id_staggered_tick % 16 == 0
+                            } else if distance_sq > 200.0f32.powi(2) {
+                                id_staggered_tick % 8 == 0
+                            } else if distance_sq > 120.0f32.powi(2) {
+                                id_staggered_tick % 6 == 0
+                            } else if distance_sq > 64.0f32.powi(2) {
+                                id_staggered_tick % 3 == 0
+                            } else if distance_sq > 24.0f32.powi(2) {
+                                id_staggered_tick % 2 == 0
+                            } else {
+                                true
+                            }
+                        };
+
+                        if last_pos.is_none() {
+                            comp_sync_package.comp_inserted(uid, pos);
+                        } else if send_now {
+                            comp_sync_package.comp_modified(uid, pos);
+                        }
+
+                        if let Some((v, last_vel)) = vel {
+                            if last_vel.is_none() {
+                                comp_sync_package.comp_inserted(uid, *v);
+                            } else if send_now {
+                                comp_sync_package.comp_modified(uid, *v);
+                            }
+                        }
+
+                        if let Some((o, last_ori)) = ori {
+                            if last_ori.is_none() {
+                                comp_sync_package.comp_inserted(uid, *o);
+                            } else if send_now {
+                                comp_sync_package.comp_modified(uid, *o);
+                            }
+                        }
+                    }
+
+                    client.send_fallible(ServerGeneral::CompSync(comp_sync_package));
+                }
+            },
+        );
+        drop(guard);
+
+        // TODO: this is a quick hack to make the loop over regions above parallel,
+        // there might is probably a way to make it cleaner
+        //
+        // Remove delete entities for each region that we alread handled above
+        region_map
+            .iter()
+            .for_each(|(key, _)| drop(deleted_entities.take_deleted_in_region(key)));
+
+        // Update the last physics components for each entity
+        for (_, &pos, vel, ori, last_pos, last_vel, last_ori) in (
+            &entities,
+            &positions,
+            velocities.maybe(),
+            orientations.maybe(),
+            last_pos.entries(),
+            last_vel.entries(),
+            last_ori.entries(),
+        )
+            .join()
+        {
+            last_pos.replace(Last(pos));
+            vel.and_then(|&v| last_vel.replace(Last(v)));
+            ori.and_then(|&o| last_ori.replace(Last(o)));
+        }

         // Handle entity deletion in regions that don't exist in RegionMap


@@ -434,9 +434,7 @@ impl DeletedEntities {
         self.map.remove(&key)
     }

-    pub fn get_deleted_in_region(&mut self, key: Vec2<i32>) -> Option<&Vec<u64>> {
-        self.map.get(&key)
-    }
+    pub fn get_deleted_in_region(&self, key: Vec2<i32>) -> Option<&Vec<u64>> { self.map.get(&key) }

     pub fn take_remaining_deleted(&mut self) -> Vec<(Vec2<i32>, Vec<u64>)> {
         // TODO: don't allocate
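
Changing `get_deleted_in_region` to take `&self` (and, in the subscription system below, swapping `Write<'a, DeletedEntities>` for `Read<'a, DeletedEntities>`) is what lets the parallel region closure borrow the deleted-entity map immutably: each rayon job only reads and clones its region's list, and the mutable `take_deleted_in_region` drain runs serially after the parallel loop, as the entity sync diff above shows. A stripped-down sketch of that read-in-parallel, drain-serially pattern, with hypothetical stand-in types:

use std::collections::HashMap;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

// Hypothetical stand-in for DeletedEntities, keyed by region.
struct DeletedEntities {
    map: HashMap<(i32, i32), Vec<u64>>,
}

impl DeletedEntities {
    // Shared access is enough inside the parallel loop...
    fn get_deleted_in_region(&self, key: (i32, i32)) -> Option<&Vec<u64>> {
        self.map.get(&key)
    }

    // ...while exclusive access is only needed for the serial cleanup pass.
    fn take_deleted_in_region(&mut self, key: (i32, i32)) -> Option<Vec<u64>> {
        self.map.remove(&key)
    }
}

fn main() {
    let mut deleted = DeletedEntities { map: HashMap::new() };
    deleted.map.insert((0, 0), vec![7, 8]);
    let regions: Vec<(i32, i32)> = vec![(0, 0), (1, 0)];

    // Parallel phase: read + clone per region (no &mut needed).
    regions.par_iter().for_each(|key| {
        let ids = deleted.get_deleted_in_region(*key).cloned().unwrap_or_default();
        let _ = ids; // a real system would send DeleteEntity messages here
    });

    // Serial phase: drain what the parallel phase already handled.
    for key in &regions {
        drop(deleted.take_deleted_in_region(*key));
    }
}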


@@ -13,7 +13,7 @@ use common::{
 use common_ecs::{Job, Origin, Phase, System};
 use common_net::msg::ServerGeneral;
 use specs::{
-    Entities, Join, ReadExpect, ReadStorage, SystemData, World, WorldExt, Write, WriteStorage,
+    Entities, Join, Read, ReadExpect, ReadStorage, SystemData, World, WorldExt, WriteStorage,
 };
 use tracing::{debug, error};
 use vek::*;
@@ -33,7 +33,7 @@ impl<'a> System<'a> for Sys {
         ReadStorage<'a, Presence>,
         ReadStorage<'a, Client>,
         WriteStorage<'a, RegionSubscription>,
-        Write<'a, DeletedEntities>,
+        Read<'a, DeletedEntities>,
         TrackedComps<'a>,
     );
@@ -54,7 +54,7 @@ impl<'a> System<'a> for Sys {
             presences,
             clients,
             mut subscriptions,
-            mut deleted_entities,
+            deleted_entities,
             tracked_comps,
         ): Self::SystemData,
     ) {