Add a separate entity view distance setting which limits the distance entities are synced from and displayed in.

NOTE: Syncing entities works at the granularity of regions, which are multi-chunk squares, but the display of entities in voxygen is limited to a circle with the radius of the supplied distance.

Additional details and changes:

* Added `ViewDistances` struct in `common` that contains separate terrain and entity view distances (the entity view distance will be clamped by the terrain view distance in uses of this); see the sketch below.
* View distance requests from the client to the server now use this type.
* When requesting the character or spectate state, the client now passes its desired view distances. This is exposed as a new parameter on `Client::request_character`/`Client::request_spectate`, and the client no longer needs to send a view distance request after entering these states. This also allows us to avoid needing to initialize `Presence` with a default view distance value on the server.
* Removed `DerefFlaggedStorage` from `Presence` and `RegionSubscription` since the change tracking isn't used for these components.
* Added sliders in the voxygen graphics and network tabs for this new setting. The clamped value is shown next to the slider as well as the selected value.
* Renamed the existing "Entities View Distance" slider (which AFAIK controls the distance at which different LOD levels apply to figures) to "Entities Detail Distance" so we can use the former name for this new slider.
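A minimal sketch of what the `ViewDistances` type described above could look like. This is an illustration of the clamping behavior, not the exact upstream definition; the field names and the `clamped_entity_vd` helper are assumptions.

```rust
/// Hypothetical sketch of the `ViewDistances` struct described above.
/// Field and method names are assumptions for illustration.
pub struct ViewDistances {
    /// Terrain view distance, in chunks.
    pub terrain: u32,
    /// Entity view distance, in chunks (clamped by `terrain` where used).
    pub entity: u32,
}

impl ViewDistances {
    /// Effective entity view distance: the entity setting never exceeds
    /// the terrain view distance.
    pub fn clamped_entity_vd(&self) -> u32 {
        self.entity.min(self.terrain)
    }
}
```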
use super::sentinel::{DeletedEntities, TrackedStorages, UpdateTrackers};
use crate::{
    client::Client,
    presence::{Presence, RegionSubscription},
    Tick,
};
use common::{
    calendar::Calendar,
    comp::{Collider, ForceUpdate, InventoryUpdate, Last, Ori, Pos, Vel},
    event::EventBus,
    outcome::Outcome,
    region::{Event as RegionEvent, RegionMap},
    resources::{PlayerPhysicsSettings, TimeOfDay},
    terrain::TerrainChunkSize,
    uid::Uid,
    vol::RectVolSize,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::{msg::ServerGeneral, sync::CompSyncPackage};
use itertools::Either;
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteStorage};
use vek::*;
/// This system will send physics updates to the client
#[derive(Default)]
pub struct Sys;

impl<'a> System<'a> for Sys {
    #[allow(clippy::type_complexity)]
    type SystemData = (
        Entities<'a>,
        Read<'a, Tick>,
        Read<'a, PlayerPhysicsSettings>,
        TrackedStorages<'a>,
        ReadExpect<'a, TimeOfDay>,
        ReadExpect<'a, Calendar>,
        ReadExpect<'a, RegionMap>,
        ReadExpect<'a, UpdateTrackers>,
        ReadStorage<'a, Pos>,
        ReadStorage<'a, Vel>,
        ReadStorage<'a, Ori>,
        ReadStorage<'a, RegionSubscription>,
        ReadStorage<'a, Presence>,
        ReadStorage<'a, Client>,
        WriteStorage<'a, Last<Pos>>,
        WriteStorage<'a, Last<Vel>>,
        WriteStorage<'a, Last<Ori>>,
        WriteStorage<'a, ForceUpdate>,
        WriteStorage<'a, InventoryUpdate>,
        Write<'a, DeletedEntities>,
        Read<'a, EventBus<Outcome>>,
    );

    const NAME: &'static str = "entity_sync";
    const ORIGIN: Origin = Origin::Server;
    const PHASE: Phase = Phase::Create;
    fn run(
        job: &mut Job<Self>,
        (
            entities,
            tick,
            player_physics_settings,
            tracked_storages,
            time_of_day,
            calendar,
            region_map,
            trackers,
            positions,
            velocities,
            orientations,
            subscriptions,
            presences,
            clients,
            mut last_pos,
            mut last_vel,
            mut last_ori,
            mut force_updates,
            mut inventory_updates,
            mut deleted_entities,
            outcomes,
        ): Self::SystemData,
    ) {
        let tick = tick.0;

        // Storages already provided in `TrackedStorages` that we need to use
        // for other things besides change detection.
        let uids = &tracked_storages.uid;
        let colliders = &tracked_storages.collider;
        let inventories = &tracked_storages.inventory;
        let players = &tracked_storages.player;
        let is_rider = &tracked_storages.is_rider;
        // To send entity updates:
        // 1. Iterate through regions
        // 2. Iterate through region subscribers (i.e. clients)
        //    - Collect a list of entity ids for clients who are subscribed to this
        //      region (hash calc to check each)
        // 3. Iterate through events from that region
        //    - For each entity entered event, iterate through the client list and
        //      check if they are subscribed to the source (hash calc per subscribed
        //      client per entity event); if not subscribed to the source, send an
        //      entity creation message to that client
        //    - For each entity left event, iterate through the client list and check
        //      if they are subscribed to the destination (hash calc per subscribed
        //      client per entity event)
        // 4. Iterate through entities in that region
        // 5. Inform clients of the component changes for that entity
        //    - Throttle update rate based on distance to each client

        // Sync physics and other components
        // via iterating through regions (in parallel)
        // Pre-collect regions paired with deleted entity list so we can iterate over
        // them in parallel below
        let regions_and_deleted_entities = region_map
            .iter()
            .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
            .collect::<Vec<_>>();

        use rayon::iter::{IntoParallelIterator, ParallelIterator};
        job.cpu_stats.measure(common_ecs::ParMode::Rayon);
        common_base::prof_span!(guard, "regions");
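        // `for_each_init` creates the profiling span once per batch of work a
        // rayon worker picks up, rather than once per region.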
        regions_and_deleted_entities.into_par_iter().for_each_init(
            || {
                common_base::prof_span!(guard, "entity sync rayon job");
                guard
            },
            |_guard, (key, region, deleted_entities_in_region)| {
                // Assemble subscriber list for this region by iterating through clients and
                // checking if they are subscribed to this region
                let mut subscribers = (
                    &clients,
                    &entities,
                    presences.maybe(),
                    &subscriptions,
                    &positions,
                )
                    .join()
                    .filter_map(|(client, entity, presence, subscription, pos)| {
                        if presence.is_some() && subscription.regions.contains(&key) {
                            Some((client, &subscription.regions, entity, *pos))
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
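                // Tell subscribed clients about entities that entered or left this
                // region so they can create or delete those entities on their end.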
                for event in region.events() {
                    match event {
                        RegionEvent::Entered(id, maybe_key) => {
                            // Don't process newly created entities here (redundant network
                            // messages)
                            if trackers.uid.inserted().contains(*id) {
                                continue;
                            }
                            let entity = entities.entity(*id);
                            if let Some(pkg) = positions
                                .get(entity)
                                .map(|pos| (pos, velocities.get(entity), orientations.get(entity)))
                                .and_then(|(pos, vel, ori)| {
                                    tracked_storages.create_entity_package(
                                        entity,
                                        Some(*pos),
                                        vel.copied(),
                                        ori.copied(),
                                    )
                                })
                            {
                                let create_msg = ServerGeneral::CreateEntity(pkg);
                                for (client, regions, client_entity, _) in &mut subscribers {
                                    if maybe_key
                                        .as_ref()
                                        .map(|key| !regions.contains(key))
                                        .unwrap_or(true)
                                        // Client doesn't need to know about itself
                                        && *client_entity != entity
                                    {
                                        client.send_fallible(create_msg.clone());
                                    }
                                }
                            }
                        },
                        RegionEvent::Left(id, maybe_key) => {
                            // Lookup UID for entity
                            if let Some(&uid) = uids.get(entities.entity(*id)) {
                                for (client, regions, _, _) in &mut subscribers {
                                    if maybe_key
                                        .as_ref()
                                        .map(|key| !regions.contains(key))
                                        .unwrap_or(true)
                                    {
                                        client.send_fallible(ServerGeneral::DeleteEntity(uid));
                                    }
                                }
                            }
                        },
                    }
                }
                // Sync tracked components
                // Get deleted entities in this region from DeletedEntities
                let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
                    &tracked_storages,
                    region.entities(),
                    deleted_entities_in_region,
                );
                // We lazily initialize the synchronization messages in case there are no
                // clients.
                let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package));
                for (client, _, client_entity, _) in &mut subscribers {
                    let msg = entity_comp_sync.right_or_else(
                        |(entity_sync_package, comp_sync_package)| {
                            (
                                client.prepare(ServerGeneral::EntitySync(entity_sync_package)),
                                client.prepare(ServerGeneral::CompSync(
                                    comp_sync_package,
                                    force_updates.get(*client_entity).map_or(0, |f| f.counter()),
                                )),
                            )
                        },
                    );
                    // We don't care much about stream errors here since they could just represent
                    // network disconnection, which is handled elsewhere.
                    let _ = client.send_prepared(&msg.0);
                    let _ = client.send_prepared(&msg.1);
                    entity_comp_sync = Either::Right(msg);
                }
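                // Sync physics components (position/velocity/orientation) to each
                // subscriber, throttled per entity by its distance to that client.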
                for (client, _, client_entity, client_pos) in &mut subscribers {
                    let mut comp_sync_package = CompSyncPackage::new();

                    for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in (
                        region.entities(),
                        &entities,
                        uids,
                        (&positions, last_pos.mask().maybe()),
                        (&velocities, last_vel.mask().maybe()).maybe(),
                        (&orientations, last_ori.mask().maybe()).maybe(),
                        force_updates.maybe(),
                        colliders.maybe(),
                    )
                        .join()
                    {
                        // Decide how regularly to send physics updates.
                        let send_now = if client_entity == &entity {
                            let player_physics_setting = players
                                .get(entity)
                                .and_then(|p| {
                                    player_physics_settings.settings.get(&p.uuid()).copied()
                                })
                                .unwrap_or_default();
                            // Don't send the client physics updates about itself unless
                            // force update is set or the client is subject to
                            // server-authoritative physics
                            force_update.map_or(false, |f| f.is_forced())
                                || player_physics_setting.server_authoritative()
                                || is_rider.get(entity).is_some()
                        } else if matches!(collider, Some(Collider::Voxel { .. })) {
                            // Things with a voxel collider (airships, etc.) need to have
                            // very stable physics, so we always send updates for these
                            // where we can.
                            true
                        } else {
                            // Throttle update rates for all other entities based on
                            // distance to the client
                            let distance_sq = client_pos.0.distance_squared(pos.0);
                            let id_staggered_tick = tick + entity.id() as u64;
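                            // Offsetting the tick by the entity id staggers the modulo
                            // checks below, so entities in the same distance band don't
                            // all send their updates on the same tick.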
                            // More entities are farther away, so the checks start with
                            // the largest distance band
                            if distance_sq > 500.0f32.powi(2) {
                                id_staggered_tick % 32 == 0
                            } else if distance_sq > 300.0f32.powi(2) {
                                id_staggered_tick % 16 == 0
                            } else if distance_sq > 200.0f32.powi(2) {
                                id_staggered_tick % 8 == 0
                            } else if distance_sq > 120.0f32.powi(2) {
                                id_staggered_tick % 6 == 0
                            } else if distance_sq > 64.0f32.powi(2) {
                                id_staggered_tick % 3 == 0
                            } else if distance_sq > 24.0f32.powi(2) {
                                id_staggered_tick % 2 == 0
                            } else {
                                true
                            }
                        };
                        if last_pos.is_none() {
                            comp_sync_package.comp_inserted(uid, pos);
                        } else if send_now {
                            comp_sync_package.comp_modified(uid, pos);
                        }

                        if let Some((v, last_vel)) = vel {
                            if last_vel.is_none() {
                                comp_sync_package.comp_inserted(uid, *v);
                            } else if send_now {
                                comp_sync_package.comp_modified(uid, *v);
                            }
                        }

                        if let Some((o, last_ori)) = ori {
                            if last_ori.is_none() {
                                comp_sync_package.comp_inserted(uid, *o);
                            } else if send_now {
                                comp_sync_package.comp_modified(uid, *o);
                            }
                        }
                    }
                    client.send_fallible(ServerGeneral::CompSync(
                        comp_sync_package,
                        force_updates.get(*client_entity).map_or(0, |f| f.counter()),
                    ));
                }
            },
        );
        drop(guard);
        job.cpu_stats.measure(common_ecs::ParMode::Single);
        // Update the last physics components for each entity
        for (_, &pos, vel, ori, last_pos, last_vel, last_ori) in (
            &entities,
            &positions,
            velocities.maybe(),
            orientations.maybe(),
            last_pos.entries(),
            last_vel.entries(),
            last_ori.entries(),
        )
            .join()
        {
            last_pos.replace(Last(pos));
            vel.and_then(|&v| last_vel.replace(Last(v)));
            ori.and_then(|&o| last_ori.replace(Last(o)));
        }
        // Handle entity deletion in regions that don't exist in RegionMap
        // (theoretically none)
        for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
            for client in (presences.maybe(), &subscriptions, &clients)
                .join()
                .filter_map(|(presence, subscription, client)| {
                    if presence.is_some() && subscription.regions.contains(&region_key) {
                        Some(client)
                    } else {
                        None
                    }
                })
            {
                for uid in &deleted {
                    client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
                }
            }
        }

        // TODO: Sync clients that don't have a position?
        // Sync inventories
        for (inventory, update, client) in (inventories, &inventory_updates, &clients).join() {
            client.send_fallible(ServerGeneral::InventoryUpdate(
                inventory.clone(),
                update.event(),
            ));
        }
        // Sync components that are only synced for the client's own entity.
        for (entity, client) in (&entities, &clients).join() {
            let comp_sync_package =
                trackers.create_sync_from_client_package(&tracked_storages, entity);
            if !comp_sync_package.is_empty() {
                client.send_fallible(ServerGeneral::CompSync(
                    comp_sync_package,
                    force_updates.get(entity).map_or(0, |f| f.counter()),
                ));
            }
        }
        // Consume/clear the current outcomes and convert them to a vec
        let outcomes = outcomes.recv_all().collect::<Vec<_>>();

        // Sync outcomes
        for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
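            // `entity_view_distance` is measured in chunks; convert it to world
            // units via the chunk's horizontal extent before comparing squared
            // XY distances.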
            let is_near = |o_pos: Vec3<f32>| {
                pos.zip_with(presence, |pos, presence| {
                    pos.0.xy().distance_squared(o_pos.xy())
                        < (presence.entity_view_distance.current() as f32
                            * TerrainChunkSize::RECT_SIZE.x as f32)
                            .powi(2)
                })
            };

            let outcomes = outcomes
                .iter()
                .filter(|o| o.get_pos().and_then(&is_near).unwrap_or(true))
                .cloned()
                .collect::<Vec<_>>();

            if !outcomes.is_empty() {
                client.send_fallible(ServerGeneral::Outcomes(outcomes));
            }
        }
        // Remove all force flags.
        for force_update in (&mut force_updates).join() {
            force_update.clear();
        }
        inventory_updates.clear();

        // Sync resources
        // TODO: doesn't really belong in this system (rename system or create another
        // system?)
        const TOD_SYNC_FREQ: u64 = 100;
        if tick % TOD_SYNC_FREQ == 0 {
            let mut tod_lazymsg = None;
            for client in (&clients).join() {
                let msg = tod_lazymsg.unwrap_or_else(|| {
                    client.prepare(ServerGeneral::TimeOfDay(*time_of_day, (*calendar).clone()))
                });
                // We don't care much about stream errors here since they could just represent
                // network disconnection, which is handled elsewhere.
                let _ = client.send_prepared(&msg);
                tod_lazymsg = Some(msg);
            }
        }
    }
}