Change outcomes to eventbus

This commit is contained in:
Forest Anderson 2022-05-09 19:58:13 +00:00 committed by Marcel
parent 6b69fbe9c5
commit cea55143ce
14 changed files with 89 additions and 64 deletions

View File

@ -251,6 +251,8 @@ pub struct Emitter<'a, E> {
impl<'a, E> Emitter<'a, E> {
pub fn emit(&mut self, event: E) { self.events.push_back(event); }
pub fn emit_many(&mut self, events: impl IntoIterator<Item = E>) { self.events.extend(events); }
pub fn append(&mut self, other: &mut VecDeque<E>) { self.events.append(other) }
// TODO: allow just emitting the whole vec of events at once? without copying

View File

@ -216,7 +216,7 @@ impl State {
ecs.insert(TerrainChanges::default());
ecs.insert(EventBus::<LocalEvent>::default());
ecs.insert(game_mode);
ecs.insert(Vec::<common::outcome::Outcome>::new());
ecs.insert(EventBus::<Outcome>::default());
ecs.insert(common::CachedSpatialGrid::default());
ecs.insert(EntitiesDiedLastTick::default());
@ -529,6 +529,10 @@ impl State {
// Process local events
span!(guard, "process local events");
let outcomes = self.ecs.read_resource::<EventBus<Outcome>>();
let mut outcomes_emitter = outcomes.emitter();
let events = self.ecs.read_resource::<EventBus<LocalEvent>>().recv_all();
for event in events {
let mut velocities = self.ecs.write_storage::<comp::Vel>();
@ -553,7 +557,7 @@ impl State {
}
},
LocalEvent::CreateOutcome(outcome) => {
self.ecs.write_resource::<Vec<Outcome>>().push(outcome);
outcomes_emitter.emit(outcome);
},
}
}

View File

@ -18,7 +18,7 @@ use rand::{thread_rng, Rng};
use rayon::iter::ParallelIterator;
use specs::{
saveload::MarkerAllocator, shred::ResourceId, Entities, Join, ParJoin, Read, ReadExpect,
ReadStorage, SystemData, World, Write, WriteStorage,
ReadStorage, SystemData, World, WriteStorage,
};
use std::time::Duration;
use vek::*;
@ -56,7 +56,7 @@ impl<'a> System<'a> for Sys {
ReadData<'a>,
WriteStorage<'a, BeamSegment>,
WriteStorage<'a, Beam>,
Write<'a, Vec<Outcome>>,
Read<'a, EventBus<Outcome>>,
);
const NAME: &'static str = "beam";
@ -65,9 +65,10 @@ impl<'a> System<'a> for Sys {
fn run(
job: &mut Job<Self>,
(read_data, mut beam_segments, mut beams, mut outcomes): Self::SystemData,
(read_data, mut beam_segments, mut beams, outcomes): Self::SystemData,
) {
let mut server_emitter = read_data.server_bus.emitter();
let mut outcomes_emitter = outcomes.emitter();
let time = read_data.time.0;
let dt = read_data.dt.0;
@ -75,7 +76,7 @@ impl<'a> System<'a> for Sys {
job.cpu_stats.measure(ParMode::Rayon);
// Beams
let (server_events, add_hit_entities, mut new_outcomes) = (
let (server_events, add_hit_entities, new_outcomes) = (
&read_data.entities,
&read_data.positions,
&read_data.orientations,
@ -272,7 +273,8 @@ impl<'a> System<'a> for Sys {
},
);
job.cpu_stats.measure(ParMode::Single);
outcomes.append(&mut new_outcomes);
outcomes_emitter.emit_many(new_outcomes);
for event in server_events {
server_emitter.emit(event);

View File

@ -1,6 +1,6 @@
use specs::{
shred::ResourceId, Entities, Join, LazyUpdate, Read, ReadExpect, ReadStorage, SystemData,
World, Write, WriteStorage,
World, WriteStorage,
};
use common::{
@ -67,7 +67,7 @@ impl<'a> System<'a> for Sys {
WriteStorage<'a, Energy>,
WriteStorage<'a, Controller>,
WriteStorage<'a, Poise>,
Write<'a, Vec<Outcome>>,
Read<'a, EventBus<Outcome>>,
);
const NAME: &'static str = "character_behavior";
@ -86,11 +86,12 @@ impl<'a> System<'a> for Sys {
mut energies,
mut controllers,
mut poises,
mut outcomes,
outcomes,
): Self::SystemData,
) {
let mut server_emitter = read_data.server_bus.emitter();
let mut local_emitter = read_data.local_bus.emitter();
let mut outcomes_emitter = outcomes.emitter();
let mut local_events = Vec::new();
let mut server_events = Vec::new();
@ -155,7 +156,7 @@ impl<'a> System<'a> for Sys {
// Reset poise if there is some stunned state to apply
poise.reset(*read_data.time, stunned_duration);
*char_state = stunned_state;
outcomes.push(Outcome::PoiseChange {
outcomes_emitter.emit(Outcome::PoiseChange {
pos,
state: poise_state,
});

View File

@ -14,7 +14,7 @@ use common::{
};
use common_ecs::{Job, Origin, Phase, System};
use specs::{
shred::ResourceId, Entities, Join, Read, ReadStorage, SystemData, World, Write, WriteStorage,
shred::ResourceId, Entities, Join, Read, ReadStorage, SystemData, World, WriteStorage,
};
use vek::*;
@ -49,15 +49,17 @@ impl<'a> System<'a> for Sys {
type SystemData = (
ReadData<'a>,
WriteStorage<'a, Melee>,
Write<'a, Vec<Outcome>>,
Read<'a, EventBus<Outcome>>,
);
const NAME: &'static str = "melee";
const ORIGIN: Origin = Origin::Common;
const PHASE: Phase = Phase::Create;
fn run(_job: &mut Job<Self>, (read_data, mut melee_attacks, mut outcomes): Self::SystemData) {
fn run(_job: &mut Job<Self>, (read_data, mut melee_attacks, outcomes): Self::SystemData) {
let mut server_emitter = read_data.server_bus.emitter();
let mut outcomes_emitter = outcomes.emitter();
// Attacks
for (attacker, uid, pos, ori, melee_attack, body) in (
&read_data.entities,
@ -196,7 +198,7 @@ impl<'a> System<'a> for Sys {
AttackSource::Melee,
*read_data.time,
|e| server_emitter.emit(e),
|o| outcomes.push(o),
|o| outcomes_emitter.emit(o),
);
if is_applied {

View File

@ -132,7 +132,7 @@ pub struct PhysicsWrite<'a> {
pos_vel_ori_defers: WriteStorage<'a, PosVelOriDefer>,
orientations: WriteStorage<'a, Ori>,
previous_phys_cache: WriteStorage<'a, PreviousPhysCache>,
outcomes: Write<'a, Vec<Outcome>>,
outcomes: Read<'a, EventBus<Outcome>>,
}
#[derive(SystemData)]
@ -725,7 +725,7 @@ impl<'a> PhysicsData<'a> {
);
span!(guard, "Apply terrain collision");
job.cpu_stats.measure(ParMode::Rayon);
let (land_on_grounds, mut outcomes) = (
let (land_on_grounds, outcomes) = (
&read.entities,
read.scales.maybe(),
read.stickies.maybe(),
@ -1228,7 +1228,7 @@ impl<'a> PhysicsData<'a> {
drop(guard);
job.cpu_stats.measure(ParMode::Single);
write.outcomes.append(&mut outcomes);
write.outcomes.emitter().emit_many(outcomes);
prof_span!(guard, "write deferred pos and vel");
for (_, pos, vel, ori, pos_vel_ori_defer, _) in (

View File

@ -16,7 +16,7 @@ use common_ecs::{Job, Origin, Phase, System};
use rand::{thread_rng, Rng};
use specs::{
saveload::MarkerAllocator, shred::ResourceId, Entities, Entity as EcsEntity, Join, Read,
ReadStorage, SystemData, World, Write, WriteStorage,
ReadStorage, SystemData, World, WriteStorage,
};
use std::time::Duration;
use vek::*;
@ -52,7 +52,7 @@ impl<'a> System<'a> for Sys {
ReadData<'a>,
WriteStorage<'a, Ori>,
WriteStorage<'a, Projectile>,
Write<'a, Vec<Outcome>>,
Read<'a, EventBus<Outcome>>,
);
const NAME: &'static str = "projectile";
@ -61,9 +61,11 @@ impl<'a> System<'a> for Sys {
fn run(
_job: &mut Job<Self>,
(read_data, mut orientations, mut projectiles, mut outcomes): Self::SystemData,
(read_data, mut orientations, mut projectiles, outcomes): Self::SystemData,
) {
let mut server_emitter = read_data.server_bus.emitter();
let mut outcomes_emitter = outcomes.emitter();
// Attacks
'projectile_loop: for (entity, pos, physics, vel, mut projectile) in (
&read_data.entities,
@ -143,7 +145,7 @@ impl<'a> System<'a> for Sys {
projectile_target_info,
&read_data,
&mut projectile_vanished,
&mut outcomes,
&mut outcomes_emitter,
&mut server_emitter,
);
}
@ -231,7 +233,7 @@ fn dispatch_hit(
projectile_target_info: ProjectileTargetInfo,
read_data: &ReadData,
projectile_vanished: &mut bool,
outcomes: &mut Vec<Outcome>,
outcomes_emitter: &mut Emitter<Outcome>,
server_emitter: &mut Emitter<ServerEvent>,
) {
match projectile_info.effect {
@ -284,7 +286,7 @@ fn dispatch_hit(
// TODO: Is it possible to have projectile without body??
if let Some(&body) = read_data.bodies.get(projectile_entity) {
outcomes.push(Outcome::ProjectileHit {
outcomes_emitter.emit(Outcome::ProjectileHit {
pos: target_pos,
body,
vel: read_data
@ -322,7 +324,7 @@ fn dispatch_hit(
AttackSource::Projectile,
*read_data.time,
|e| server_emitter.emit(e),
|o| outcomes.push(o),
|o| outcomes_emitter.emit(o),
);
},
projectile::Effect::Explode(e) => {

View File

@ -16,7 +16,7 @@ use common_ecs::{Job, Origin, Phase, System};
use rand::{thread_rng, Rng};
use specs::{
saveload::MarkerAllocator, shred::ResourceId, Entities, Join, Read, ReadStorage, SystemData,
World, Write, WriteStorage,
World, WriteStorage,
};
use vek::*;
@ -53,7 +53,7 @@ impl<'a> System<'a> for Sys {
ReadData<'a>,
WriteStorage<'a, Shockwave>,
WriteStorage<'a, ShockwaveHitEntities>,
Write<'a, Vec<Outcome>>,
Read<'a, EventBus<Outcome>>,
);
const NAME: &'static str = "shockwave";
@ -62,9 +62,10 @@ impl<'a> System<'a> for Sys {
fn run(
_job: &mut Job<Self>,
(read_data, mut shockwaves, mut shockwave_hit_lists, mut outcomes): Self::SystemData,
(read_data, mut shockwaves, mut shockwave_hit_lists, outcomes): Self::SystemData,
) {
let mut server_emitter = read_data.server_bus.emitter();
let mut outcomes_emitter = outcomes.emitter();
let time = read_data.time.0;
let dt = read_data.dt.0;
@ -234,7 +235,7 @@ impl<'a> System<'a> for Sys {
AttackSource::Shockwave,
*read_data.time,
|e| server_emitter.emit(e),
|o| outcomes.push(o),
|o| outcomes_emitter.emit(o),
);
shockwave_hit_list.hit_entities.push(*uid_b);

View File

@ -11,6 +11,7 @@ use common::{
Object, Ori, PidController, Poise, Pos, Projectile, Scale, SkillSet, Stats, Vel,
WaypointArea,
},
event::EventBus,
lottery::LootSpec,
outcome::Outcome,
rtsim::RtSimEntity,
@ -198,8 +199,8 @@ pub fn handle_shoot(
// Add an outcome
state
.ecs()
.write_resource::<Vec<Outcome>>()
.push(Outcome::ProjectileShot { pos, body, vel });
.read_resource::<EventBus<Outcome>>()
.emit_now(Outcome::ProjectileShot { pos, body, vel });
let mut builder = state.create_projectile(Pos(pos), Vel(vel), body, projectile);
if let Some(light) = light {
@ -225,10 +226,11 @@ pub fn handle_shockwave(
pub fn handle_beam(server: &mut Server, properties: beam::Properties, pos: Pos, ori: Ori) {
let state = server.state_mut();
let ecs = state.ecs();
ecs.write_resource::<Vec<Outcome>>().push(Outcome::Beam {
pos: pos.0,
specifier: properties.specifier,
});
ecs.read_resource::<EventBus<Outcome>>()
.emit_now(Outcome::Beam {
pos: pos.0,
specifier: properties.specifier,
});
state.create_beam(properties, pos, ori).build();
}

View File

@ -169,8 +169,8 @@ pub fn handle_destroy(server: &mut Server, entity: EcsEntity, last_change: Healt
if let Some(pos) = state.ecs().read_storage::<Pos>().get(entity) {
state
.ecs()
.write_resource::<Vec<Outcome>>()
.push(Outcome::Death { pos: pos.0 });
.read_resource::<EventBus<Outcome>>()
.emit_now(Outcome::Death { pos: pos.0 });
}
}
@ -295,7 +295,7 @@ pub fn handle_destroy(server: &mut Server, entity: EcsEntity, last_change: Healt
let alignments = state.ecs().read_storage::<Alignment>();
let uids = state.ecs().read_storage::<Uid>();
let mut outcomes = state.ecs().write_resource::<Vec<Outcome>>();
let mut outcomes = state.ecs().write_resource::<EventBus<Outcome>>();
let inventories = state.ecs().read_storage::<comp::Inventory>();
let destroyed_group = groups.get(entity);
@ -606,8 +606,9 @@ pub fn handle_explosion(server: &Server, pos: Vec3<f32>, explosion: Explosion, o
// Add an outcome
// Uses radius as outcome power for now
let outcome_power = explosion.radius;
let mut outcomes = ecs.write_resource::<Vec<Outcome>>();
outcomes.push(Outcome::Explosion {
let outcomes = ecs.read_resource::<EventBus<Outcome>>();
let mut outcomes_emitter = outcomes.emitter();
outcomes_emitter.emit(Outcome::Explosion {
pos,
power: outcome_power,
radius: explosion.radius,
@ -890,7 +891,7 @@ pub fn handle_explosion(server: &Server, pos: Vec3<f32>, explosion: Explosion, o
combat::AttackSource::Explosion,
*time,
|e| emitter.emit(e),
|o| outcomes.push(o),
|o| outcomes_emitter.emit(o),
);
}
}
@ -1095,9 +1096,12 @@ fn handle_exp_gain(
skill_set: &mut SkillSet,
uid: &Uid,
pos: &Pos,
outcomes: &mut Vec<Outcome>,
outcomes: &mut EventBus<Outcome>,
) {
use comp::inventory::{item::ItemKind, slot::EquipSlot};
let mut outcomes_emitter = outcomes.emitter();
// Create hash set of xp pools to consider splitting xp amongst
let mut xp_pools = HashSet::<SkillGroupKind>::new();
// Insert general pool since it is always accessible
@ -1128,7 +1132,7 @@ fn handle_exp_gain(
if let Some(level_outcome) =
skill_set.add_experience(*pool, (exp_reward / num_pools).ceil() as u32)
{
outcomes.push(Outcome::SkillPointGain {
outcomes_emitter.emit(Outcome::SkillPointGain {
uid: *uid,
skill_tree: *pool,
total_points: level_outcome,
@ -1136,7 +1140,7 @@ fn handle_exp_gain(
});
}
}
outcomes.push(Outcome::ExpChange {
outcomes_emitter.emit(Outcome::ExpChange {
uid: *uid,
exp: exp_reward as u32,
xp_pools,
@ -1147,10 +1151,10 @@ pub fn handle_combo_change(server: &Server, entity: EcsEntity, change: i32) {
let ecs = &server.state.ecs();
if let Some(mut combo) = ecs.write_storage::<comp::Combo>().get_mut(entity) {
let time = ecs.read_resource::<Time>();
let mut outcomes = ecs.write_resource::<Vec<Outcome>>();
let outcome_bus = ecs.read_resource::<EventBus<Outcome>>();
combo.change_by(change, time.0);
if let Some(uid) = ecs.read_storage::<Uid>().get(entity) {
outcomes.push(Outcome::ComboChange {
outcome_bus.emit_now(Outcome::ComboChange {
uid: *uid,
combo: combo.counter(),
});
@ -1199,7 +1203,7 @@ pub fn handle_teleport_to(server: &Server, entity: EcsEntity, target: Uid, max_r
pub fn handle_entity_attacked_hook(server: &Server, entity: EcsEntity) {
let ecs = &server.state.ecs();
let server_eventbus = ecs.read_resource::<EventBus<ServerEvent>>();
let mut outcomes = ecs.write_resource::<Vec<Outcome>>();
let time = ecs.read_resource::<Time>();
if let (Some(mut char_state), Some(mut poise), Some(pos)) = (
@ -1220,10 +1224,11 @@ pub fn handle_entity_attacked_hook(server: &Server, entity: EcsEntity) {
// Reset poise if there is some stunned state to apply
poise.reset(*time, stunned_duration);
*char_state = stunned_state;
outcomes.push(Outcome::PoiseChange {
pos: pos.0,
state: poise_state,
});
ecs.read_resource::<EventBus<Outcome>>()
.emit_now(Outcome::PoiseChange {
pos: pos.0,
state: poise_state,
});
if let Some(impulse_strength) = impulse_strength {
server_eventbus.emit_now(ServerEvent::Knockback {
entity,

View File

@ -12,6 +12,7 @@ use common::{
Inventory, Pos, SkillGroupKind,
},
consts::{MAX_MOUNT_RANGE, SOUND_TRAVEL_DIST_PER_VOLUME},
event::EventBus,
link::Is,
mounting::{Mount, Mounting, Rider},
outcome::Outcome,
@ -182,20 +183,20 @@ pub fn handle_mine_block(
.get(item.item_definition_id()),
) {
let skill_group = SkillGroupKind::Weapon(tool);
let mut outcomes = state.ecs().write_resource::<Vec<Outcome>>();
let outcome_bus = state.ecs().read_resource::<EventBus<Outcome>>();
let positions = state.ecs().read_component::<comp::Pos>();
if let (Some(level_outcome), Some(pos)) = (
skillset.add_experience(skill_group, *exp_reward),
positions.get(entity),
) {
outcomes.push(Outcome::SkillPointGain {
outcome_bus.emit_now(Outcome::SkillPointGain {
uid,
skill_tree: skill_group,
total_points: level_outcome,
pos: pos.0,
});
}
outcomes.push(Outcome::ExpChange {
outcome_bus.emit_now(Outcome::ExpChange {
uid,
exp: *exp_reward,
xp_pools: HashSet::from_iter(vec![skill_group]),
@ -242,8 +243,8 @@ pub fn handle_mine_block(
state.set_block(pos, block.into_vacant());
state
.ecs()
.write_resource::<Vec<Outcome>>()
.push(Outcome::BreakBlock {
.read_resource::<EventBus<Outcome>>()
.emit_now(Outcome::BreakBlock {
pos,
color: block.get_color(),
});
@ -285,7 +286,7 @@ pub fn handle_sound(server: &mut Server, sound: &Sound) {
}),
_ => None,
} {
ecs.write_resource::<Vec<Outcome>>().push(outcome);
ecs.read_resource::<EventBus<Outcome>>().emit_now(outcome);
}
}

View File

@ -1108,8 +1108,7 @@ impl Server {
self.state
.ecs()
.read_resource::<EventBus<ServerEvent>>()
.emitter()
.emit(event);
.emit_now(event);
}
self.disconnect_all_clients_requested = false;

View File

@ -58,7 +58,7 @@ use rand::{thread_rng, Rng};
use rayon::iter::ParallelIterator;
use specs::{
saveload::{Marker, MarkerAllocator},
Entity as EcsEntity, Join, ParJoin, Write, WriteExpect, WriteStorage,
Entity as EcsEntity, Join, ParJoin, Read, WriteExpect, WriteStorage,
};
use vek::*;
@ -68,7 +68,7 @@ pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
ReadData<'a>,
Write<'a, EventBus<ServerEvent>>,
Read<'a, EventBus<ServerEvent>>,
WriteStorage<'a, Agent>,
WriteStorage<'a, Controller>,
WriteExpect<'a, RtSim>,

View File

@ -7,6 +7,7 @@ use crate::{
use common::{
calendar::Calendar,
comp::{Collider, ForceUpdate, InventoryUpdate, Last, Ori, Pos, Vel},
event::EventBus,
outcome::Outcome,
region::{Event as RegionEvent, RegionMap},
resources::{PlayerPhysicsSettings, TimeOfDay},
@ -47,7 +48,7 @@ impl<'a> System<'a> for Sys {
WriteStorage<'a, ForceUpdate>,
WriteStorage<'a, InventoryUpdate>,
Write<'a, DeletedEntities>,
Write<'a, Vec<Outcome>>,
Read<'a, EventBus<Outcome>>,
);
const NAME: &'static str = "entity_sync";
@ -77,7 +78,7 @@ impl<'a> System<'a> for Sys {
mut force_updates,
mut inventory_updates,
mut deleted_entities,
mut outcomes,
outcomes,
): Self::SystemData,
) {
let tick = tick.0;
@ -366,6 +367,9 @@ impl<'a> System<'a> for Sys {
}
}
// Consume/clear the current outcomes and convert them to a vec
let outcomes = outcomes.recv_all().collect::<Vec<_>>();
// Sync outcomes
for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
let is_near = |o_pos: Vec3<f32>| {
@ -381,11 +385,11 @@ impl<'a> System<'a> for Sys {
.filter(|o| o.get_pos().and_then(&is_near).unwrap_or(true))
.cloned()
.collect::<Vec<_>>();
if !outcomes.is_empty() {
client.send_fallible(ServerGeneral::Outcomes(outcomes));
}
}
outcomes.clear();
// Remove all force flags.
force_updates.clear();