Parallelize ingame messages.

This commit is contained in:
Joshua Yanovski 2022-09-15 19:38:13 -07:00
parent 4b5a9fe0f4
commit 7d3b8ea81e
6 changed files with 218 additions and 87 deletions

View File

@ -887,7 +887,6 @@ impl Client {
| ClientGeneral::PlayerPhysics { .. }
| ClientGeneral::UnlockSkill(_)
| ClientGeneral::RequestSiteInfo(_)
| ClientGeneral::UnlockSkillGroup(_)
| ClientGeneral::RequestPlayerPhysics { .. }
| ClientGeneral::RequestLossyTerrainCompression { .. }
| ClientGeneral::UpdateMapMarker(_) => {

View File

@ -2,7 +2,7 @@ use super::{world_msg::SiteId, PingMsg};
use common::{
character::CharacterId,
comp,
comp::{Skill, SkillGroupKind},
comp::Skill,
terrain::block::Block,
};
use serde::{Deserialize, Serialize};
@ -76,7 +76,6 @@ pub enum ClientGeneral {
ori: comp::Ori,
},
UnlockSkill(Skill),
UnlockSkillGroup(SkillGroupKind),
RequestSiteInfo(SiteId),
UpdateMapMarker(comp::MapMarkerChange),
//Only in Game, via terrain stream
@ -133,7 +132,6 @@ impl ClientMsg {
| ClientGeneral::LodZoneRequest { .. }
| ClientGeneral::UnlockSkill(_)
| ClientGeneral::RequestSiteInfo(_)
| ClientGeneral::UnlockSkillGroup(_)
| ClientGeneral::RequestPlayerPhysics { .. }
| ClientGeneral::RequestLossyTerrainCompression { .. }
| ClientGeneral::UpdateMapMarker(_) => {

View File

@ -5,6 +5,7 @@ use crate::{
skills::{GeneralSkill, Skill},
},
};
use core::borrow::{Borrow, BorrowMut};
use hashbrown::HashMap;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
@ -328,15 +329,21 @@ impl SkillSet {
(skillset, persistence_load_error)
}
/// Check if a particular skill group is accessible for an entity, *if* it
/// exists.
fn skill_group_accessible_if_exists(&self, skill_group_kind: SkillGroupKind) -> bool {
self.has_skill(Skill::UnlockGroup(skill_group_kind))
}
/// Checks if a particular skill group is accessible for an entity
pub fn skill_group_accessible(&self, skill_group_kind: SkillGroupKind) -> bool {
self.skill_groups.contains_key(&skill_group_kind)
&& self.has_skill(Skill::UnlockGroup(skill_group_kind))
&& self.skill_group_accessible_if_exists(skill_group_kind)
}
/// Unlocks a skill group for a player. It starts with 0 exp and 0 skill
/// points.
pub fn unlock_skill_group(&mut self, skill_group_kind: SkillGroupKind) {
fn unlock_skill_group(&mut self, skill_group_kind: SkillGroupKind) {
if !self.skill_groups.contains_key(&skill_group_kind) {
self.skill_groups
.insert(skill_group_kind, SkillGroup::new(skill_group_kind));
@ -459,33 +466,56 @@ impl SkillSet {
/// Unlocks a skill for a player, assuming they have the relevant skill
/// group unlocked and available SP in that skill group.
pub fn unlock_skill(&mut self, skill: Skill) -> Result<(), SkillUnlockError> {
///
/// NOTE: Please don't use pathological or clever implementations of to_mut
/// here.
pub fn unlock_skill_cow<'a, B, C: 'a>(
this_: &'a mut B,
skill: Skill,
to_mut: impl FnOnce(&'a mut B) -> &'a mut C,
) -> Result<(), SkillUnlockError>
where
B: Borrow<SkillSet>,
C: BorrowMut<SkillSet>,
{
if let Some(skill_group_kind) = skill.skill_group_kind() {
let next_level = self.next_skill_level(skill);
let prerequisites_met = self.prerequisites_met(skill);
let this = (&*this_).borrow();
let next_level = this.next_skill_level(skill);
let prerequisites_met = this.prerequisites_met(skill);
// Check that skill is not yet at max level
if !matches!(self.skills.get(&skill), Some(level) if *level == skill.max_level()) {
if let Some(mut skill_group) = self.skill_group_mut(skill_group_kind) {
if !matches!(this.skills.get(&skill), Some(level) if *level == skill.max_level()) {
if let Some(skill_group) = this.skill_groups.get(&skill_group_kind) &&
this.skill_group_accessible_if_exists(skill_group_kind)
{
if prerequisites_met {
if let Some(new_available_sp) = skill_group
.available_sp
.checked_sub(skill.skill_cost(next_level))
{
// Perform all mutation inside this branch, to avoid triggering a copy
// on write or flagged storage in cases where this matters.
let this_ = to_mut(this_);
let mut this = this_.borrow_mut();
// NOTE: Verified to exist previously when we accessed
// this.skill_groups (assuming a non-pathological implementation of
// ToOwned).
let skill_group = this.skill_groups.get_mut(&skill_group_kind)
.expect("Verified to exist when we previously accessed this.skill_groups");
skill_group.available_sp = new_available_sp;
skill_group.ordered_skills.push(skill);
match skill {
Skill::UnlockGroup(group) => {
self.unlock_skill_group(group);
this.unlock_skill_group(group);
},
Skill::General(GeneralSkill::HealthIncrease) => {
self.modify_health = true;
this.modify_health = true;
},
Skill::General(GeneralSkill::EnergyIncrease) => {
self.modify_energy = true;
this.modify_energy = true;
},
_ => {},
}
self.skills.insert(skill, next_level);
this.skills.insert(skill, next_level);
Ok(())
} else {
trace!("Tried to unlock skill for skill group with insufficient SP");
@ -512,6 +542,12 @@ impl SkillSet {
}
}
/// Convenience function for the case where you have mutable access to the
/// skill.
pub fn unlock_skill(&mut self, skill: Skill) -> Result<(), SkillUnlockError> {
Self::unlock_skill_cow(self, skill, |x| x)
}
/// Checks if the player has available SP to spend
pub fn has_available_sp(&self) -> bool {
self.skill_groups.iter().any(|(kind, sg)| {

View File

@ -14,6 +14,7 @@
generic_const_exprs,
generic_arg_infer,
label_break_value,
let_chains,
option_zip,
portable_simd,
slice_as_chunks,

View File

@ -41,7 +41,7 @@ pub enum GameMode {
#[derive(Copy, Clone, Default, Debug)]
pub struct PlayerEntity(pub Option<Entity>);
#[derive(Copy, Clone, Debug, Default)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct PlayerPhysicsSetting {
/// true if the client wants server-authoratative physics (e.g. to use
/// airships properly)

View File

@ -9,21 +9,38 @@ use common::{
event::{EventBus, ServerEvent},
link::Is,
mounting::Rider,
resources::PlayerPhysicsSettings,
resources::{PlayerPhysicsSetting, PlayerPhysicsSettings},
slowjob::SlowJobPool,
terrain::TerrainGrid,
vol::ReadVol,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral};
use common_state::{BlockChange, BuildAreas};
use core::mem;
use rayon::prelude::*;
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteStorage};
use std::borrow::Cow;
use tracing::{debug, trace, warn};
use vek::*;
#[cfg(feature = "persistent_world")]
pub type TerrainPersistenceData<'a> = Option<Write<'a, TerrainPersistence>>;
#[cfg(not(feature = "persistent_world"))]
pub type TerrainPersistenceData<'a> = ();
pub type TerrainPersistenceData<'a> = core::marker::PhantomData<&'a mut ()>;
// NOTE: These writes are considered "rare", meaning (currently) that they are
// admin-gated features that players shouldn't normally access, and which we're
// not that concerned about the performance of when two players try to use them
// at once.
//
// In such cases, we're okay putting them behind a mutex and penalizing the
// system if they're actually used concurrently by lots of users. Please do not
// put less rare writes here, unless you want to serialize the system!
struct RareWrites<'a, 'b> {
block_changes: &'b mut BlockChange,
_terrain_persistence: &'b mut TerrainPersistenceData<'a>,
}
impl Sys {
#[allow(clippy::too_many_arguments)]
@ -36,18 +53,16 @@ impl Sys {
can_build: &ReadStorage<'_, CanBuild>,
is_rider: &ReadStorage<'_, Is<Rider>>,
force_updates: &ReadStorage<'_, ForceUpdate>,
skill_sets: &mut WriteStorage<'_, SkillSet>,
skill_set: &mut Option<Cow<'_, SkillSet>>,
healths: &ReadStorage<'_, Health>,
block_changes: &mut Write<'_, BlockChange>,
positions: &mut WriteStorage<'_, Pos>,
velocities: &mut WriteStorage<'_, Vel>,
orientations: &mut WriteStorage<'_, Ori>,
controllers: &mut WriteStorage<'_, Controller>,
rare_writes: &parking_lot::Mutex<RareWrites<'_, '_>>,
position: Option<&mut Pos>,
velocity: Option<&mut Vel>,
orientation: Option<&mut Ori>,
controller: Option<&mut Controller>,
settings: &Read<'_, Settings>,
build_areas: &Read<'_, BuildAreas>,
player_physics_settings: &mut Write<'_, PlayerPhysicsSettings>,
_terrain_persistence: &mut TerrainPersistenceData<'_>,
maybe_player: &Option<&Player>,
player_physics_setting: Option<&mut PlayerPhysicsSetting>,
maybe_admin: &Option<&Admin>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
@ -85,7 +100,7 @@ impl Sys {
},
ClientGeneral::ControllerInputs(inputs) => {
if presence.kind.controlling_char() {
if let Some(controller) = controllers.get_mut(entity) {
if let Some(controller) = controller {
controller.inputs.update_with_new(*inputs);
}
}
@ -99,26 +114,19 @@ impl Sys {
return Ok(());
}
}
if let Some(controller) = controllers.get_mut(entity) {
if let Some(controller) = controller {
controller.push_event(event);
}
}
},
ClientGeneral::ControlAction(event) => {
if presence.kind.controlling_char() {
if let Some(controller) = controllers.get_mut(entity) {
if let Some(controller) = controller {
controller.push_action(event);
}
}
},
ClientGeneral::PlayerPhysics { pos, vel, ori } => {
let player_physics_setting = maybe_player.map(|p| {
player_physics_settings
.settings
.entry(p.uuid())
.or_default()
});
if presence.kind.controlling_char()
&& force_updates.get(entity).is_none()
&& healths.get(entity).map_or(true, |h| !h.is_dead)
@ -142,7 +150,7 @@ impl Sys {
let rejection = None
// Check position
.or_else(|| {
if let Some(prev_pos) = positions.get(entity) {
if let Some(prev_pos) = &position {
if prev_pos.0.distance_squared(pos.0) > (500.0f32).powf(2.0) {
Some(Rejection::TooFar { old: prev_pos.0, new: pos.0 })
} else {
@ -191,9 +199,9 @@ impl Sys {
),
None => {
// Don't insert unless the component already exists
let _ = positions.get_mut(entity).map(|p| *p = pos);
let _ = velocities.get_mut(entity).map(|v| *v = vel);
let _ = orientations.get_mut(entity).map(|o| *o = ori);
position.map(|p| *p = pos);
velocity.map(|v| *v = vel);
orientation.map(|o| *o = ori);
},
}
}
@ -211,10 +219,12 @@ impl Sys {
.and_then(|_| terrain.get(pos).ok())
{
let new_block = old_block.into_vacant();
let _was_set = block_changes.try_set(pos, new_block).is_some();
// Take the rare writes lock as briefly as possible.
let mut guard = rare_writes.lock();
let _was_set = guard.block_changes.try_set(pos, new_block).is_some();
#[cfg(feature = "persistent_world")]
if _was_set {
if let Some(terrain_persistence) = _terrain_persistence.as_mut()
if let Some(terrain_persistence) = guard._terrain_persistence.as_mut()
{
terrain_persistence.set_block(pos, new_block);
}
@ -236,10 +246,12 @@ impl Sys {
.filter(|aabb| aabb.contains_point(pos))
.is_some()
{
let _was_set = block_changes.try_set(pos, new_block).is_some();
// Take the rare writes lock as briefly as possible.
let mut guard = rare_writes.lock();
let _was_set = guard.block_changes.try_set(pos, new_block).is_some();
#[cfg(feature = "persistent_world")]
if _was_set {
if let Some(terrain_persistence) = _terrain_persistence.as_mut()
if let Some(terrain_persistence) = guard._terrain_persistence.as_mut()
{
terrain_persistence.set_block(pos, new_block);
}
@ -250,14 +262,10 @@ impl Sys {
}
},
ClientGeneral::UnlockSkill(skill) => {
skill_sets
.get_mut(entity)
.map(|mut skill_set| skill_set.unlock_skill(skill));
},
ClientGeneral::UnlockSkillGroup(skill_group_kind) => {
skill_sets
.get_mut(entity)
.map(|mut skill_set| skill_set.unlock_skill_group(skill_group_kind));
// FIXME: How do we want to handle the error? Probably not by swallowing it.
let _ = skill_set.as_mut().map(|skill_set| {
SkillSet::unlock_skill_cow(skill_set, skill, |skill_set| skill_set.to_mut())
}).transpose();
},
ClientGeneral::RequestSiteInfo(id) => {
server_emitter.emit(ServerEvent::RequestSiteInfo { entity, id });
@ -265,12 +273,6 @@ impl Sys {
ClientGeneral::RequestPlayerPhysics {
server_authoritative,
} => {
let player_physics_setting = maybe_player.map(|p| {
player_physics_settings
.settings
.entry(p.uuid())
.or_default()
});
if let Some(setting) = player_physics_setting {
setting.client_optin = server_authoritative;
}
@ -314,6 +316,7 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, SlowJobPool>,
ReadStorage<'a, CanBuild>,
ReadStorage<'a, ForceUpdate>,
ReadStorage<'a, Is<Rider>>,
@ -344,6 +347,7 @@ impl<'a> System<'a> for Sys {
entities,
server_event_bus,
terrain,
slow_jobs,
can_build,
force_updates,
is_rider,
@ -358,49 +362,142 @@ impl<'a> System<'a> for Sys {
mut controllers,
settings,
build_areas,
mut player_physics_settings,
mut player_physics_settings_,
mut terrain_persistence,
players,
admins,
): Self::SystemData,
) {
let mut server_emitter = server_event_bus.emitter();
// NOTE: stdlib mutex is more than good enough on Linux and (probably) Windows,
// but not Mac.
let rare_writes = parking_lot::Mutex::new(RareWrites {
block_changes: &mut block_changes,
_terrain_persistence: &mut terrain_persistence,
});
for (entity, client, mut maybe_presence, player, maybe_admin) in (
let player_physics_settings = &*player_physics_settings_;
let mut deferred_updates = (
&entities,
&mut clients,
(&mut presences).maybe(),
players.maybe(),
admins.maybe(),
(&skill_sets).maybe(),
(&mut positions).maybe(),
(&mut velocities).maybe(),
(&mut orientations).maybe(),
(&mut controllers).maybe(),
)
.join()
{
let _ = super::try_recv_all(client, 2, |client, msg| {
Self::handle_client_in_game_msg(
&mut server_emitter,
// NOTE: Required because Specs has very poor work splitting for sparse joins.
.par_bridge()
.map_init(
|| server_event_bus.emitter(),
|server_emitter, (
entity,
client,
&mut maybe_presence.as_deref_mut(),
&terrain,
&can_build,
&is_rider,
&force_updates,
&mut skill_sets,
&healths,
&mut block_changes,
&mut positions,
&mut velocities,
&mut orientations,
&mut controllers,
&settings,
&build_areas,
&mut player_physics_settings,
&mut terrain_persistence,
&player,
&maybe_admin,
msg,
)
mut maybe_presence,
maybe_player,
maybe_admin,
skill_set,
ref mut pos,
ref mut vel,
ref mut ori,
ref mut controller,
)| {
let old_player_physics_setting = maybe_player.map(|p| {
player_physics_settings
.settings
.get(&p.uuid())
.copied()
.unwrap_or_default()
});
let mut new_player_physics_setting = old_player_physics_setting;
// If an `ExitInGame` message is received this is set to `None` allowing further
// ingame messages to be ignored.
let mut clearable_maybe_presence = maybe_presence.as_deref_mut();
let mut skill_set = skill_set.map(Cow::Borrowed);
let _ = super::try_recv_all(client, 2, |client, msg| {
Self::handle_client_in_game_msg(
server_emitter,
entity,
client,
&mut clearable_maybe_presence,
&terrain,
&can_build,
&is_rider,
&force_updates,
&mut skill_set,
&healths,
&rare_writes,
pos.as_deref_mut(),
vel.as_deref_mut(),
ori.as_deref_mut(),
controller.as_deref_mut(),
&settings,
&build_areas,
new_player_physics_setting.as_mut(),
&maybe_admin,
msg,
)
});
// Return the possibly modified skill set, and possibly modified server physics
// settings.
let skill_set_update = skill_set.and_then(|skill_set| match skill_set {
Cow::Borrowed(_) => None,
Cow::Owned(skill_set) => Some((entity, skill_set)),
});
// NOTE: Since we pass Option<&mut _> rather than &mut Option<_> to
// handle_client_in_game_msg, and the new player was initialized to the same
// value as the old player, we know that either both the new and old player
// are Some, or they are both None.
let physics_update = maybe_player.map(|p| p.uuid())
.zip(new_player_physics_setting
.filter(|_| old_player_physics_setting != new_player_physics_setting));
(skill_set_update, physics_update)
},
)
// NOTE: Would be nice to combine this with the map_init somehow, but I'm not sure if
// that's possible.
.filter(|(x, y)| x.is_some() || y.is_some())
// NOTE: I feel like we shouldn't actually need to allocate here, but hopefully this
// doesn't turn out to be important as there shouldn't be that many connected clients.
// The reason we can't just use unzip is that the two sides might be different lengths.
.collect::<Vec<_>>();
let player_physics_settings = &mut *player_physics_settings_;
// Deferred updates to skillsets and player physics.
//
// NOTE: It is an invariant that there is at most one client entry per player
// uuid; since we joined on clients, it follows that there's just one update
// per uuid, so the physics update is sound and doesn't depend on evaluation
// order, even though we're not updating directly by entity or uid (note that
// for a given entity, we process messages serially).
deferred_updates
.iter_mut()
.for_each(|(skill_set_update, physics_update)| {
if let Some((entity, new_skill_set)) = skill_set_update {
// We know this exists, because we already iterated over it with the skillset
// lock taken, so we can ignore the error.
//
// Note that we replace rather than just updating. This is in order to avoid
// dropping here; we'll drop later on a background thread, in case skillsets are
// slow to drop.
skill_sets
.get_mut(*entity)
.map(|mut old_skill_set| mem::swap(&mut *old_skill_set, new_skill_set));
}
if let &mut Some((uuid, player_physics_setting)) = physics_update {
// We don't necessarily know this exists, but that's fine, because dropping
// player physics is a no op.
player_physics_settings
.settings
.insert(uuid, player_physics_setting);
}
});
}
// Finally, drop the deferred updates in another thread.
slow_jobs.spawn(&"CHUNK_DROP", async move {
drop(deferred_updates);
});
}
}