diff --git a/client/src/lib.rs b/client/src/lib.rs index e263eff199..bf439d77ec 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -887,7 +887,6 @@ impl Client { | ClientGeneral::PlayerPhysics { .. } | ClientGeneral::UnlockSkill(_) | ClientGeneral::RequestSiteInfo(_) - | ClientGeneral::UnlockSkillGroup(_) | ClientGeneral::RequestPlayerPhysics { .. } | ClientGeneral::RequestLossyTerrainCompression { .. } | ClientGeneral::UpdateMapMarker(_) => { diff --git a/common/net/src/msg/client.rs b/common/net/src/msg/client.rs index 679652205f..faaddaf5eb 100644 --- a/common/net/src/msg/client.rs +++ b/common/net/src/msg/client.rs @@ -2,7 +2,7 @@ use super::{world_msg::SiteId, PingMsg}; use common::{ character::CharacterId, comp, - comp::{Skill, SkillGroupKind}, + comp::Skill, terrain::block::Block, }; use serde::{Deserialize, Serialize}; @@ -76,7 +76,6 @@ pub enum ClientGeneral { ori: comp::Ori, }, UnlockSkill(Skill), - UnlockSkillGroup(SkillGroupKind), RequestSiteInfo(SiteId), UpdateMapMarker(comp::MapMarkerChange), //Only in Game, via terrain stream @@ -133,7 +132,6 @@ impl ClientMsg { | ClientGeneral::LodZoneRequest { .. } | ClientGeneral::UnlockSkill(_) | ClientGeneral::RequestSiteInfo(_) - | ClientGeneral::UnlockSkillGroup(_) | ClientGeneral::RequestPlayerPhysics { .. } | ClientGeneral::RequestLossyTerrainCompression { .. } | ClientGeneral::UpdateMapMarker(_) => { diff --git a/common/src/comp/skillset/mod.rs b/common/src/comp/skillset/mod.rs index b9f382bed5..ae1ff5fb8f 100644 --- a/common/src/comp/skillset/mod.rs +++ b/common/src/comp/skillset/mod.rs @@ -5,6 +5,7 @@ use crate::{ skills::{GeneralSkill, Skill}, }, }; +use core::borrow::{Borrow, BorrowMut}; use hashbrown::HashMap; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; @@ -328,15 +329,21 @@ impl SkillSet { (skillset, persistence_load_error) } + /// Check if a particular skill group is accessible for an entity, *if* it + /// exists. + fn skill_group_accessible_if_exists(&self, skill_group_kind: SkillGroupKind) -> bool { + self.has_skill(Skill::UnlockGroup(skill_group_kind)) + } + /// Checks if a particular skill group is accessible for an entity pub fn skill_group_accessible(&self, skill_group_kind: SkillGroupKind) -> bool { self.skill_groups.contains_key(&skill_group_kind) - && self.has_skill(Skill::UnlockGroup(skill_group_kind)) + && self.skill_group_accessible_if_exists(skill_group_kind) } /// Unlocks a skill group for a player. It starts with 0 exp and 0 skill /// points. - pub fn unlock_skill_group(&mut self, skill_group_kind: SkillGroupKind) { + fn unlock_skill_group(&mut self, skill_group_kind: SkillGroupKind) { if !self.skill_groups.contains_key(&skill_group_kind) { self.skill_groups .insert(skill_group_kind, SkillGroup::new(skill_group_kind)); @@ -459,33 +466,56 @@ impl SkillSet { /// Unlocks a skill for a player, assuming they have the relevant skill /// group unlocked and available SP in that skill group. - pub fn unlock_skill(&mut self, skill: Skill) -> Result<(), SkillUnlockError> { + /// + /// NOTE: Please don't use pathological or clever implementations of to_mut + /// here. + pub fn unlock_skill_cow<'a, B, C: 'a>( + this_: &'a mut B, + skill: Skill, + to_mut: impl FnOnce(&'a mut B) -> &'a mut C, + ) -> Result<(), SkillUnlockError> + where + B: Borrow, + C: BorrowMut, + { if let Some(skill_group_kind) = skill.skill_group_kind() { - let next_level = self.next_skill_level(skill); - let prerequisites_met = self.prerequisites_met(skill); + let this = (&*this_).borrow(); + let next_level = this.next_skill_level(skill); + let prerequisites_met = this.prerequisites_met(skill); // Check that skill is not yet at max level - if !matches!(self.skills.get(&skill), Some(level) if *level == skill.max_level()) { - if let Some(mut skill_group) = self.skill_group_mut(skill_group_kind) { + if !matches!(this.skills.get(&skill), Some(level) if *level == skill.max_level()) { + if let Some(skill_group) = this.skill_groups.get(&skill_group_kind) && + this.skill_group_accessible_if_exists(skill_group_kind) + { if prerequisites_met { if let Some(new_available_sp) = skill_group .available_sp .checked_sub(skill.skill_cost(next_level)) { + // Perform all mutation inside this branch, to avoid triggering a copy + // on write or flagged storage in cases where this matters. + let this_ = to_mut(this_); + let mut this = this_.borrow_mut(); + // NOTE: Verified to exist previously when we accessed + // this.skill_groups (assuming a non-pathological implementation of + // ToOwned). + let skill_group = this.skill_groups.get_mut(&skill_group_kind) + .expect("Verified to exist when we previously accessed this.skill_groups"); skill_group.available_sp = new_available_sp; skill_group.ordered_skills.push(skill); match skill { Skill::UnlockGroup(group) => { - self.unlock_skill_group(group); + this.unlock_skill_group(group); }, Skill::General(GeneralSkill::HealthIncrease) => { - self.modify_health = true; + this.modify_health = true; }, Skill::General(GeneralSkill::EnergyIncrease) => { - self.modify_energy = true; + this.modify_energy = true; }, _ => {}, } - self.skills.insert(skill, next_level); + this.skills.insert(skill, next_level); Ok(()) } else { trace!("Tried to unlock skill for skill group with insufficient SP"); @@ -512,6 +542,12 @@ impl SkillSet { } } + /// Convenience function for the case where you have mutable access to the + /// skill. + pub fn unlock_skill(&mut self, skill: Skill) -> Result<(), SkillUnlockError> { + Self::unlock_skill_cow(self, skill, |x| x) + } + /// Checks if the player has available SP to spend pub fn has_available_sp(&self) -> bool { self.skill_groups.iter().any(|(kind, sg)| { diff --git a/common/src/lib.rs b/common/src/lib.rs index 3d65a743a4..f719796720 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -14,6 +14,7 @@ generic_const_exprs, generic_arg_infer, label_break_value, + let_chains, option_zip, portable_simd, slice_as_chunks, diff --git a/common/src/resources.rs b/common/src/resources.rs index 5b0061aa8b..7e984ed569 100644 --- a/common/src/resources.rs +++ b/common/src/resources.rs @@ -41,7 +41,7 @@ pub enum GameMode { #[derive(Copy, Clone, Default, Debug)] pub struct PlayerEntity(pub Option); -#[derive(Copy, Clone, Debug, Default)] +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] pub struct PlayerPhysicsSetting { /// true if the client wants server-authoratative physics (e.g. to use /// airships properly) diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs index 8f03fe02dd..92ba0cdc6d 100644 --- a/server/src/sys/msg/in_game.rs +++ b/server/src/sys/msg/in_game.rs @@ -9,21 +9,38 @@ use common::{ event::{EventBus, ServerEvent}, link::Is, mounting::Rider, - resources::PlayerPhysicsSettings, + resources::{PlayerPhysicsSetting, PlayerPhysicsSettings}, + slowjob::SlowJobPool, terrain::TerrainGrid, vol::ReadVol, }; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; use common_state::{BlockChange, BuildAreas}; +use core::mem; +use rayon::prelude::*; use specs::{Entities, Join, Read, ReadExpect, ReadStorage, Write, WriteStorage}; +use std::borrow::Cow; use tracing::{debug, trace, warn}; use vek::*; #[cfg(feature = "persistent_world")] pub type TerrainPersistenceData<'a> = Option>; #[cfg(not(feature = "persistent_world"))] -pub type TerrainPersistenceData<'a> = (); +pub type TerrainPersistenceData<'a> = core::marker::PhantomData<&'a mut ()>; + +// NOTE: These writes are considered "rare", meaning (currently) that they are +// admin-gated features that players shouldn't normally access, and which we're +// not that concerned about the performance of when two players try to use them +// at once. +// +// In such cases, we're okay putting them behind a mutex and penalizing the +// system if they're actually used concurrently by lots of users. Please do not +// put less rare writes here, unless you want to serialize the system! +struct RareWrites<'a, 'b> { + block_changes: &'b mut BlockChange, + _terrain_persistence: &'b mut TerrainPersistenceData<'a>, +} impl Sys { #[allow(clippy::too_many_arguments)] @@ -36,18 +53,16 @@ impl Sys { can_build: &ReadStorage<'_, CanBuild>, is_rider: &ReadStorage<'_, Is>, force_updates: &ReadStorage<'_, ForceUpdate>, - skill_sets: &mut WriteStorage<'_, SkillSet>, + skill_set: &mut Option>, healths: &ReadStorage<'_, Health>, - block_changes: &mut Write<'_, BlockChange>, - positions: &mut WriteStorage<'_, Pos>, - velocities: &mut WriteStorage<'_, Vel>, - orientations: &mut WriteStorage<'_, Ori>, - controllers: &mut WriteStorage<'_, Controller>, + rare_writes: &parking_lot::Mutex>, + position: Option<&mut Pos>, + velocity: Option<&mut Vel>, + orientation: Option<&mut Ori>, + controller: Option<&mut Controller>, settings: &Read<'_, Settings>, build_areas: &Read<'_, BuildAreas>, - player_physics_settings: &mut Write<'_, PlayerPhysicsSettings>, - _terrain_persistence: &mut TerrainPersistenceData<'_>, - maybe_player: &Option<&Player>, + player_physics_setting: Option<&mut PlayerPhysicsSetting>, maybe_admin: &Option<&Admin>, msg: ClientGeneral, ) -> Result<(), crate::error::Error> { @@ -85,7 +100,7 @@ impl Sys { }, ClientGeneral::ControllerInputs(inputs) => { if presence.kind.controlling_char() { - if let Some(controller) = controllers.get_mut(entity) { + if let Some(controller) = controller { controller.inputs.update_with_new(*inputs); } } @@ -99,26 +114,19 @@ impl Sys { return Ok(()); } } - if let Some(controller) = controllers.get_mut(entity) { + if let Some(controller) = controller { controller.push_event(event); } } }, ClientGeneral::ControlAction(event) => { if presence.kind.controlling_char() { - if let Some(controller) = controllers.get_mut(entity) { + if let Some(controller) = controller { controller.push_action(event); } } }, ClientGeneral::PlayerPhysics { pos, vel, ori } => { - let player_physics_setting = maybe_player.map(|p| { - player_physics_settings - .settings - .entry(p.uuid()) - .or_default() - }); - if presence.kind.controlling_char() && force_updates.get(entity).is_none() && healths.get(entity).map_or(true, |h| !h.is_dead) @@ -142,7 +150,7 @@ impl Sys { let rejection = None // Check position .or_else(|| { - if let Some(prev_pos) = positions.get(entity) { + if let Some(prev_pos) = &position { if prev_pos.0.distance_squared(pos.0) > (500.0f32).powf(2.0) { Some(Rejection::TooFar { old: prev_pos.0, new: pos.0 }) } else { @@ -191,9 +199,9 @@ impl Sys { ), None => { // Don't insert unless the component already exists - let _ = positions.get_mut(entity).map(|p| *p = pos); - let _ = velocities.get_mut(entity).map(|v| *v = vel); - let _ = orientations.get_mut(entity).map(|o| *o = ori); + position.map(|p| *p = pos); + velocity.map(|v| *v = vel); + orientation.map(|o| *o = ori); }, } } @@ -211,10 +219,12 @@ impl Sys { .and_then(|_| terrain.get(pos).ok()) { let new_block = old_block.into_vacant(); - let _was_set = block_changes.try_set(pos, new_block).is_some(); + // Take the rare writes lock as briefly as possible. + let mut guard = rare_writes.lock(); + let _was_set = guard.block_changes.try_set(pos, new_block).is_some(); #[cfg(feature = "persistent_world")] if _was_set { - if let Some(terrain_persistence) = _terrain_persistence.as_mut() + if let Some(terrain_persistence) = guard._terrain_persistence.as_mut() { terrain_persistence.set_block(pos, new_block); } @@ -236,10 +246,12 @@ impl Sys { .filter(|aabb| aabb.contains_point(pos)) .is_some() { - let _was_set = block_changes.try_set(pos, new_block).is_some(); + // Take the rare writes lock as briefly as possible. + let mut guard = rare_writes.lock(); + let _was_set = guard.block_changes.try_set(pos, new_block).is_some(); #[cfg(feature = "persistent_world")] if _was_set { - if let Some(terrain_persistence) = _terrain_persistence.as_mut() + if let Some(terrain_persistence) = guard._terrain_persistence.as_mut() { terrain_persistence.set_block(pos, new_block); } @@ -250,14 +262,10 @@ impl Sys { } }, ClientGeneral::UnlockSkill(skill) => { - skill_sets - .get_mut(entity) - .map(|mut skill_set| skill_set.unlock_skill(skill)); - }, - ClientGeneral::UnlockSkillGroup(skill_group_kind) => { - skill_sets - .get_mut(entity) - .map(|mut skill_set| skill_set.unlock_skill_group(skill_group_kind)); + // FIXME: How do we want to handle the error? Probably not by swallowing it. + let _ = skill_set.as_mut().map(|skill_set| { + SkillSet::unlock_skill_cow(skill_set, skill, |skill_set| skill_set.to_mut()) + }).transpose(); }, ClientGeneral::RequestSiteInfo(id) => { server_emitter.emit(ServerEvent::RequestSiteInfo { entity, id }); @@ -265,12 +273,6 @@ impl Sys { ClientGeneral::RequestPlayerPhysics { server_authoritative, } => { - let player_physics_setting = maybe_player.map(|p| { - player_physics_settings - .settings - .entry(p.uuid()) - .or_default() - }); if let Some(setting) = player_physics_setting { setting.client_optin = server_authoritative; } @@ -314,6 +316,7 @@ impl<'a> System<'a> for Sys { Entities<'a>, Read<'a, EventBus>, ReadExpect<'a, TerrainGrid>, + ReadExpect<'a, SlowJobPool>, ReadStorage<'a, CanBuild>, ReadStorage<'a, ForceUpdate>, ReadStorage<'a, Is>, @@ -344,6 +347,7 @@ impl<'a> System<'a> for Sys { entities, server_event_bus, terrain, + slow_jobs, can_build, force_updates, is_rider, @@ -358,49 +362,142 @@ impl<'a> System<'a> for Sys { mut controllers, settings, build_areas, - mut player_physics_settings, + mut player_physics_settings_, mut terrain_persistence, players, admins, ): Self::SystemData, ) { - let mut server_emitter = server_event_bus.emitter(); + // NOTE: stdlib mutex is more than good enough on Linux and (probably) Windows, + // but not Mac. + let rare_writes = parking_lot::Mutex::new(RareWrites { + block_changes: &mut block_changes, + _terrain_persistence: &mut terrain_persistence, + }); - for (entity, client, mut maybe_presence, player, maybe_admin) in ( + let player_physics_settings = &*player_physics_settings_; + let mut deferred_updates = ( &entities, &mut clients, (&mut presences).maybe(), players.maybe(), admins.maybe(), + (&skill_sets).maybe(), + (&mut positions).maybe(), + (&mut velocities).maybe(), + (&mut orientations).maybe(), + (&mut controllers).maybe(), ) .join() - { - let _ = super::try_recv_all(client, 2, |client, msg| { - Self::handle_client_in_game_msg( - &mut server_emitter, + // NOTE: Required because Specs has very poor work splitting for sparse joins. + .par_bridge() + .map_init( + || server_event_bus.emitter(), + |server_emitter, ( entity, client, - &mut maybe_presence.as_deref_mut(), - &terrain, - &can_build, - &is_rider, - &force_updates, - &mut skill_sets, - &healths, - &mut block_changes, - &mut positions, - &mut velocities, - &mut orientations, - &mut controllers, - &settings, - &build_areas, - &mut player_physics_settings, - &mut terrain_persistence, - &player, - &maybe_admin, - msg, - ) + mut maybe_presence, + maybe_player, + maybe_admin, + skill_set, + ref mut pos, + ref mut vel, + ref mut ori, + ref mut controller, + )| { + let old_player_physics_setting = maybe_player.map(|p| { + player_physics_settings + .settings + .get(&p.uuid()) + .copied() + .unwrap_or_default() + }); + let mut new_player_physics_setting = old_player_physics_setting; + // If an `ExitInGame` message is received this is set to `None` allowing further + // ingame messages to be ignored. + let mut clearable_maybe_presence = maybe_presence.as_deref_mut(); + let mut skill_set = skill_set.map(Cow::Borrowed); + let _ = super::try_recv_all(client, 2, |client, msg| { + Self::handle_client_in_game_msg( + server_emitter, + entity, + client, + &mut clearable_maybe_presence, + &terrain, + &can_build, + &is_rider, + &force_updates, + &mut skill_set, + &healths, + &rare_writes, + pos.as_deref_mut(), + vel.as_deref_mut(), + ori.as_deref_mut(), + controller.as_deref_mut(), + &settings, + &build_areas, + new_player_physics_setting.as_mut(), + &maybe_admin, + msg, + ) + }); + + // Return the possibly modified skill set, and possibly modified server physics + // settings. + let skill_set_update = skill_set.and_then(|skill_set| match skill_set { + Cow::Borrowed(_) => None, + Cow::Owned(skill_set) => Some((entity, skill_set)), + }); + // NOTE: Since we pass Option<&mut _> rather than &mut Option<_> to + // handle_client_in_game_msg, and the new player was initialized to the same + // value as the old player, we know that either both the new and old player + // are Some, or they are both None. + let physics_update = maybe_player.map(|p| p.uuid()) + .zip(new_player_physics_setting + .filter(|_| old_player_physics_setting != new_player_physics_setting)); + (skill_set_update, physics_update) + }, + ) + // NOTE: Would be nice to combine this with the map_init somehow, but I'm not sure if + // that's possible. + .filter(|(x, y)| x.is_some() || y.is_some()) + // NOTE: I feel like we shouldn't actually need to allocate here, but hopefully this + // doesn't turn out to be important as there shouldn't be that many connected clients. + // The reason we can't just use unzip is that the two sides might be different lengths. + .collect::>(); + let player_physics_settings = &mut *player_physics_settings_; + // Deferred updates to skillsets and player physics. + // + // NOTE: It is an invariant that there is at most one client entry per player + // uuid; since we joined on clients, it follows that there's just one update + // per uuid, so the physics update is sound and doesn't depend on evaluation + // order, even though we're not updating directly by entity or uid (note that + // for a given entity, we process messages serially). + deferred_updates + .iter_mut() + .for_each(|(skill_set_update, physics_update)| { + if let Some((entity, new_skill_set)) = skill_set_update { + // We know this exists, because we already iterated over it with the skillset + // lock taken, so we can ignore the error. + // + // Note that we replace rather than just updating. This is in order to avoid + // dropping here; we'll drop later on a background thread, in case skillsets are + // slow to drop. + skill_sets + .get_mut(*entity) + .map(|mut old_skill_set| mem::swap(&mut *old_skill_set, new_skill_set)); + } + if let &mut Some((uuid, player_physics_setting)) = physics_update { + // We don't necessarily know this exists, but that's fine, because dropping + // player physics is a no op. + player_physics_settings + .settings + .insert(uuid, player_physics_setting); + } }); - } + // Finally, drop the deferred updates in another thread. + slow_jobs.spawn(&"CHUNK_DROP", async move { + drop(deferred_updates); + }); } }