Merge branch 'xMAC94x/better-metrics' into 'master'

Better Metrics for Server

See merge request veloren/veloren!1381
This commit is contained in:
Marcel
2020-09-16 16:58:22 +00:00
15 changed files with 452 additions and 68 deletions

View File

@ -28,6 +28,7 @@ pub mod figure;
pub mod generation; pub mod generation;
pub mod loadout_builder; pub mod loadout_builder;
pub mod lottery; pub mod lottery;
pub mod metrics;
pub mod msg; pub mod msg;
pub mod npc; pub mod npc;
pub mod outcome; pub mod outcome;

13
common/src/metrics.rs Normal file
View File

@ -0,0 +1,13 @@
use std::sync::atomic::AtomicI64;
#[derive(Default)]
pub struct SysMetrics {
pub agent_ns: AtomicI64,
pub mount_ns: AtomicI64,
pub controller_ns: AtomicI64,
pub character_behavior_ns: AtomicI64,
pub stats_ns: AtomicI64,
pub phys_ns: AtomicI64,
pub projectile_ns: AtomicI64,
pub combat_ns: AtomicI64,
}

View File

@ -1,6 +1,7 @@
use crate::{ use crate::{
comp, comp,
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
metrics::SysMetrics,
region::RegionMap, region::RegionMap,
sync::WorldSyncExt, sync::WorldSyncExt,
sys, sys,
@ -176,6 +177,7 @@ impl State {
ecs.insert(EventBus::<ServerEvent>::default()); ecs.insert(EventBus::<ServerEvent>::default());
ecs.insert(comp::group::GroupManager::default()); ecs.insert(comp::group::GroupManager::default());
ecs.insert(RegionMap::new()); ecs.insert(RegionMap::new());
ecs.insert(SysMetrics::default());
ecs ecs
} }

View File

@ -10,6 +10,7 @@ use crate::{
Vel, Vel,
}, },
event::{EventBus, ServerEvent}, event::{EventBus, ServerEvent},
metrics::SysMetrics,
path::{Chaser, TraversalConfig}, path::{Chaser, TraversalConfig},
span, span,
state::{DeltaTime, Time}, state::{DeltaTime, Time},
@ -34,6 +35,7 @@ impl<'a> System<'a> for Sys {
Read<'a, Time>, Read<'a, Time>,
Read<'a, DeltaTime>, Read<'a, DeltaTime>,
Read<'a, group::GroupManager>, Read<'a, group::GroupManager>,
ReadExpect<'a, SysMetrics>,
Write<'a, EventBus<ServerEvent>>, Write<'a, EventBus<ServerEvent>>,
Entities<'a>, Entities<'a>,
ReadStorage<'a, Pos>, ReadStorage<'a, Pos>,
@ -63,6 +65,7 @@ impl<'a> System<'a> for Sys {
time, time,
dt, dt,
group_manager, group_manager,
sys_metrics,
event_bus, event_bus,
entities, entities,
positions, positions,
@ -84,6 +87,7 @@ impl<'a> System<'a> for Sys {
invites, invites,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "agent::Sys::run"); span!(_guard, "run", "agent::Sys::run");
for ( for (
entity, entity,
@ -572,5 +576,9 @@ impl<'a> System<'a> for Sys {
.push(ControlEvent::GroupManip(GroupManip::Decline)); .push(ControlEvent::GroupManip(GroupManip::Decline));
} }
} }
sys_metrics.agent_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -4,6 +4,7 @@ use crate::{
Loadout, Mounting, Ori, PhysicsState, Pos, StateUpdate, Stats, Vel, Loadout, Mounting, Ori, PhysicsState, Pos, StateUpdate, Stats, Vel,
}, },
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
metrics::SysMetrics,
span, span,
state::DeltaTime, state::DeltaTime,
states, states,
@ -13,7 +14,8 @@ use crate::{
use specs::{ use specs::{
hibitset, hibitset,
storage::{PairedStorage, SequentialRestriction}, storage::{PairedStorage, SequentialRestriction},
Entities, Entity, FlaggedStorage, Join, LazyUpdate, Read, ReadStorage, System, WriteStorage, Entities, Entity, FlaggedStorage, Join, LazyUpdate, Read, ReadExpect, ReadStorage, System,
WriteStorage,
}; };
use specs_idvs::IdvStorage; use specs_idvs::IdvStorage;
@ -144,6 +146,7 @@ impl<'a> System<'a> for Sys {
Read<'a, EventBus<LocalEvent>>, Read<'a, EventBus<LocalEvent>>,
Read<'a, DeltaTime>, Read<'a, DeltaTime>,
Read<'a, LazyUpdate>, Read<'a, LazyUpdate>,
ReadExpect<'a, SysMetrics>,
WriteStorage<'a, CharacterState>, WriteStorage<'a, CharacterState>,
WriteStorage<'a, Pos>, WriteStorage<'a, Pos>,
WriteStorage<'a, Vel>, WriteStorage<'a, Vel>,
@ -169,6 +172,7 @@ impl<'a> System<'a> for Sys {
local_bus, local_bus,
dt, dt,
updater, updater,
sys_metrics,
mut character_states, mut character_states,
mut positions, mut positions,
mut velocities, mut velocities,
@ -184,6 +188,7 @@ impl<'a> System<'a> for Sys {
mountings, mountings,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "character_behavior::Sys::run"); span!(_guard, "run", "character_behavior::Sys::run");
let mut server_emitter = server_bus.emitter(); let mut server_emitter = server_bus.emitter();
let mut local_emitter = local_bus.emitter(); let mut local_emitter = local_bus.emitter();
@ -287,5 +292,9 @@ impl<'a> System<'a> for Sys {
server_emitter.append(&mut state_update.server_events); server_emitter.append(&mut state_update.server_events);
incorporate_update(&mut tuple, state_update); incorporate_update(&mut tuple, state_update);
} }
sys_metrics.character_behavior_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -4,11 +4,12 @@ use crate::{
Loadout, Ori, Pos, Scale, Stats, Loadout, Ori, Pos, Scale, Stats,
}, },
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
metrics::SysMetrics,
span, span,
sync::Uid, sync::Uid,
util::Dir, util::Dir,
}; };
use specs::{Entities, Join, Read, ReadStorage, System, WriteStorage}; use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, WriteStorage};
use vek::*; use vek::*;
pub const BLOCK_EFFICIENCY: f32 = 0.9; pub const BLOCK_EFFICIENCY: f32 = 0.9;
@ -23,6 +24,7 @@ impl<'a> System<'a> for Sys {
Entities<'a>, Entities<'a>,
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
Read<'a, EventBus<LocalEvent>>, Read<'a, EventBus<LocalEvent>>,
ReadExpect<'a, SysMetrics>,
ReadStorage<'a, Uid>, ReadStorage<'a, Uid>,
ReadStorage<'a, Pos>, ReadStorage<'a, Pos>,
ReadStorage<'a, Ori>, ReadStorage<'a, Ori>,
@ -41,6 +43,7 @@ impl<'a> System<'a> for Sys {
entities, entities,
server_bus, server_bus,
local_bus, local_bus,
sys_metrics,
uids, uids,
positions, positions,
orientations, orientations,
@ -53,6 +56,7 @@ impl<'a> System<'a> for Sys {
character_states, character_states,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "combat::Sys::run"); span!(_guard, "run", "combat::Sys::run");
let mut server_emitter = server_bus.emitter(); let mut server_emitter = server_bus.emitter();
let mut local_emitter = local_bus.emitter(); let mut local_emitter = local_bus.emitter();
@ -160,5 +164,9 @@ impl<'a> System<'a> for Sys {
} }
} }
} }
sys_metrics.combat_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -4,13 +4,14 @@ use crate::{
CharacterState, ControlEvent, Controller, InventoryManip, CharacterState, ControlEvent, Controller, InventoryManip,
}, },
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
metrics::SysMetrics,
span, span,
state::DeltaTime, state::DeltaTime,
sync::{Uid, UidAllocator}, sync::{Uid, UidAllocator},
}; };
use specs::{ use specs::{
saveload::{Marker, MarkerAllocator}, saveload::{Marker, MarkerAllocator},
Entities, Join, Read, ReadStorage, System, WriteStorage, Entities, Join, Read, ReadExpect, ReadStorage, System, WriteStorage,
}; };
// const CHARGE_COST: i32 = 200; // const CHARGE_COST: i32 = 200;
@ -26,6 +27,7 @@ impl<'a> System<'a> for Sys {
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
Read<'a, EventBus<LocalEvent>>, Read<'a, EventBus<LocalEvent>>,
Read<'a, DeltaTime>, Read<'a, DeltaTime>,
ReadExpect<'a, SysMetrics>,
WriteStorage<'a, Controller>, WriteStorage<'a, Controller>,
WriteStorage<'a, CharacterState>, WriteStorage<'a, CharacterState>,
ReadStorage<'a, Uid>, ReadStorage<'a, Uid>,
@ -39,11 +41,13 @@ impl<'a> System<'a> for Sys {
server_bus, server_bus,
_local_bus, _local_bus,
_dt, _dt,
sys_metrics,
mut controllers, mut controllers,
mut character_states, mut character_states,
uids, uids,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "controller::Sys::run"); span!(_guard, "run", "controller::Sys::run");
let mut server_emitter = server_bus.emitter(); let mut server_emitter = server_bus.emitter();
@ -105,5 +109,9 @@ impl<'a> System<'a> for Sys {
} }
} }
} }
sys_metrics.controller_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -1,11 +1,12 @@
use crate::{ use crate::{
comp::{Controller, MountState, Mounting, Ori, Pos, Vel}, comp::{Controller, MountState, Mounting, Ori, Pos, Vel},
metrics::SysMetrics,
span, span,
sync::UidAllocator, sync::UidAllocator,
}; };
use specs::{ use specs::{
saveload::{Marker, MarkerAllocator}, saveload::{Marker, MarkerAllocator},
Entities, Join, Read, System, WriteStorage, Entities, Join, Read, ReadExpect, System, WriteStorage,
}; };
use vek::*; use vek::*;
@ -15,6 +16,7 @@ impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
type SystemData = ( type SystemData = (
Read<'a, UidAllocator>, Read<'a, UidAllocator>,
ReadExpect<'a, SysMetrics>,
Entities<'a>, Entities<'a>,
WriteStorage<'a, Controller>, WriteStorage<'a, Controller>,
WriteStorage<'a, MountState>, WriteStorage<'a, MountState>,
@ -28,6 +30,7 @@ impl<'a> System<'a> for Sys {
&mut self, &mut self,
( (
uid_allocator, uid_allocator,
sys_metrics,
entities, entities,
mut controllers, mut controllers,
mut mount_state, mut mount_state,
@ -37,6 +40,7 @@ impl<'a> System<'a> for Sys {
mut orientations, mut orientations,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "mount::Sys::run"); span!(_guard, "run", "mount::Sys::run");
// Mounted entities. // Mounted entities.
for (entity, mut mount_states) in (&entities, &mut mount_state.restrict_mut()).join() { for (entity, mut mount_states) in (&entities, &mut mount_state.restrict_mut()).join() {
@ -88,5 +92,9 @@ impl<'a> System<'a> for Sys {
for entity in to_unmount { for entity in to_unmount {
mountings.remove(entity); mountings.remove(entity);
} }
sys_metrics.mount_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -4,6 +4,7 @@ use crate::{
Sticky, Vel, Sticky, Vel,
}, },
event::{EventBus, ServerEvent}, event::{EventBus, ServerEvent},
metrics::SysMetrics,
span, span,
state::DeltaTime, state::DeltaTime,
sync::{Uid, UidAllocator}, sync::{Uid, UidAllocator},
@ -55,6 +56,7 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, TerrainGrid>, ReadExpect<'a, TerrainGrid>,
Read<'a, DeltaTime>, Read<'a, DeltaTime>,
Read<'a, UidAllocator>, Read<'a, UidAllocator>,
ReadExpect<'a, SysMetrics>,
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
ReadStorage<'a, Scale>, ReadStorage<'a, Scale>,
ReadStorage<'a, Sticky>, ReadStorage<'a, Sticky>,
@ -80,6 +82,7 @@ impl<'a> System<'a> for Sys {
terrain, terrain,
dt, dt,
uid_allocator, uid_allocator,
sys_metrics,
event_bus, event_bus,
scales, scales,
stickies, stickies,
@ -95,6 +98,7 @@ impl<'a> System<'a> for Sys {
projectiles, projectiles,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "phys::Sys::run"); span!(_guard, "run", "phys::Sys::run");
let mut event_emitter = event_bus.emitter(); let mut event_emitter = event_bus.emitter();
@ -658,5 +662,9 @@ impl<'a> System<'a> for Sys {
land_on_grounds.into_iter().for_each(|(entity, vel)| { land_on_grounds.into_iter().for_each(|(entity, vel)| {
event_emitter.emit(ServerEvent::LandOnGround { entity, vel: vel.0 }); event_emitter.emit(ServerEvent::LandOnGround { entity, vel: vel.0 });
}); });
sys_metrics.phys_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -4,12 +4,15 @@ use crate::{
Loadout, Ori, PhysicsState, Pos, Projectile, Vel, Loadout, Ori, PhysicsState, Pos, Projectile, Vel,
}, },
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
metrics::SysMetrics,
span, span,
state::DeltaTime, state::DeltaTime,
sync::UidAllocator, sync::UidAllocator,
util::Dir, util::Dir,
}; };
use specs::{saveload::MarkerAllocator, Entities, Join, Read, ReadStorage, System, WriteStorage}; use specs::{
saveload::MarkerAllocator, Entities, Join, Read, ReadExpect, ReadStorage, System, WriteStorage,
};
use std::time::Duration; use std::time::Duration;
use vek::*; use vek::*;
@ -23,6 +26,7 @@ impl<'a> System<'a> for Sys {
Read<'a, UidAllocator>, Read<'a, UidAllocator>,
Read<'a, EventBus<LocalEvent>>, Read<'a, EventBus<LocalEvent>>,
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, SysMetrics>,
ReadStorage<'a, Pos>, ReadStorage<'a, Pos>,
ReadStorage<'a, PhysicsState>, ReadStorage<'a, PhysicsState>,
ReadStorage<'a, Vel>, ReadStorage<'a, Vel>,
@ -40,6 +44,7 @@ impl<'a> System<'a> for Sys {
uid_allocator, uid_allocator,
local_bus, local_bus,
server_bus, server_bus,
sys_metrics,
positions, positions,
physics_states, physics_states,
velocities, velocities,
@ -49,6 +54,7 @@ impl<'a> System<'a> for Sys {
loadouts, loadouts,
): Self::SystemData, ): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "projectile::Sys::run"); span!(_guard, "run", "projectile::Sys::run");
let mut local_emitter = local_bus.emitter(); let mut local_emitter = local_bus.emitter();
let mut server_emitter = server_bus.emitter(); let mut server_emitter = server_bus.emitter();
@ -190,5 +196,9 @@ impl<'a> System<'a> for Sys {
.checked_sub(Duration::from_secs_f32(dt.0)) .checked_sub(Duration::from_secs_f32(dt.0))
.unwrap_or_default(); .unwrap_or_default();
} }
sys_metrics.projectile_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -1,10 +1,11 @@
use crate::{ use crate::{
comp::{CharacterState, Energy, EnergySource, HealthSource, Stats}, comp::{CharacterState, Energy, EnergySource, HealthSource, Stats},
event::{EventBus, ServerEvent}, event::{EventBus, ServerEvent},
metrics::SysMetrics,
span, span,
state::DeltaTime, state::DeltaTime,
}; };
use specs::{Entities, Join, Read, ReadStorage, System, WriteStorage}; use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, WriteStorage};
const ENERGY_REGEN_ACCEL: f32 = 10.0; const ENERGY_REGEN_ACCEL: f32 = 10.0;
@ -16,6 +17,7 @@ impl<'a> System<'a> for Sys {
Entities<'a>, Entities<'a>,
Read<'a, DeltaTime>, Read<'a, DeltaTime>,
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, SysMetrics>,
ReadStorage<'a, CharacterState>, ReadStorage<'a, CharacterState>,
WriteStorage<'a, Stats>, WriteStorage<'a, Stats>,
WriteStorage<'a, Energy>, WriteStorage<'a, Energy>,
@ -23,8 +25,9 @@ impl<'a> System<'a> for Sys {
fn run( fn run(
&mut self, &mut self,
(entities, dt, server_event_bus, character_states, mut stats, mut energies): Self::SystemData, (entities, dt, server_event_bus, sys_metrics, character_states, mut stats, mut energies): Self::SystemData,
) { ) {
let start_time = std::time::Instant::now();
span!(_guard, "run", "stats::Sys::run"); span!(_guard, "run", "stats::Sys::run");
let mut server_event_emitter = server_event_bus.emitter(); let mut server_event_emitter = server_event_bus.emitter();
@ -133,5 +136,9 @@ impl<'a> System<'a> for Sys {
CharacterState::Roll { .. } | CharacterState::Climb { .. } => {}, CharacterState::Roll { .. } | CharacterState::Climb { .. } => {},
} }
} }
sys_metrics.stats_ns.store(
start_time.elapsed().as_nanos() as i64,
std::sync::atomic::Ordering::Relaxed,
);
} }
} }

View File

@ -1,3 +1,4 @@
use crate::metrics::ChunkGenMetrics;
#[cfg(not(feature = "worldgen"))] #[cfg(not(feature = "worldgen"))]
use crate::test_world::{IndexOwned, World}; use crate::test_world::{IndexOwned, World};
use common::{generation::ChunkSupplement, terrain::TerrainChunk}; use common::{generation::ChunkSupplement, terrain::TerrainChunk};
@ -21,15 +22,17 @@ pub struct ChunkGenerator {
chunk_tx: channel::Sender<ChunkGenResult>, chunk_tx: channel::Sender<ChunkGenResult>,
chunk_rx: channel::Receiver<ChunkGenResult>, chunk_rx: channel::Receiver<ChunkGenResult>,
pending_chunks: HashMap<Vec2<i32>, Arc<AtomicBool>>, pending_chunks: HashMap<Vec2<i32>, Arc<AtomicBool>>,
metrics: Arc<ChunkGenMetrics>,
} }
impl ChunkGenerator { impl ChunkGenerator {
#[allow(clippy::new_without_default)] // TODO: Pending review in #587 #[allow(clippy::new_without_default)] // TODO: Pending review in #587
pub fn new() -> Self { pub fn new(metrics: ChunkGenMetrics) -> Self {
let (chunk_tx, chunk_rx) = channel::unbounded(); let (chunk_tx, chunk_rx) = channel::unbounded();
Self { Self {
chunk_tx, chunk_tx,
chunk_rx, chunk_rx,
pending_chunks: HashMap::new(), pending_chunks: HashMap::new(),
metrics: Arc::new(metrics),
} }
} }
@ -49,6 +52,7 @@ impl ChunkGenerator {
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
v.insert(Arc::clone(&cancel)); v.insert(Arc::clone(&cancel));
let chunk_tx = self.chunk_tx.clone(); let chunk_tx = self.chunk_tx.clone();
self.metrics.chunks_requested.inc();
thread_pool.execute(move || { thread_pool.execute(move || {
let index = index.as_index_ref(); let index = index.as_index_ref();
let payload = world let payload = world
@ -61,6 +65,7 @@ impl ChunkGenerator {
pub fn recv_new_chunk(&mut self) -> Option<ChunkGenResult> { pub fn recv_new_chunk(&mut self) -> Option<ChunkGenResult> {
if let Ok((key, res)) = self.chunk_rx.try_recv() { if let Ok((key, res)) = self.chunk_rx.try_recv() {
self.pending_chunks.remove(&key); self.pending_chunks.remove(&key);
self.metrics.chunks_served.inc();
// TODO: do anything else if res is an Err? // TODO: do anything else if res is an Err?
Some((key, res)) Some((key, res))
} else { } else {
@ -75,12 +80,15 @@ impl ChunkGenerator {
pub fn cancel_if_pending(&mut self, key: Vec2<i32>) { pub fn cancel_if_pending(&mut self, key: Vec2<i32>) {
if let Some(cancel) = self.pending_chunks.remove(&key) { if let Some(cancel) = self.pending_chunks.remove(&key) {
cancel.store(true, Ordering::Relaxed); cancel.store(true, Ordering::Relaxed);
self.metrics.chunks_canceled.inc();
} }
} }
pub fn cancel_all(&mut self) { pub fn cancel_all(&mut self) {
let metrics = self.metrics.clone();
self.pending_chunks.drain().for_each(|(_, cancel)| { self.pending_chunks.drain().for_each(|(_, cancel)| {
cancel.store(true, Ordering::Relaxed); cancel.store(true, Ordering::Relaxed);
metrics.chunks_canceled.inc();
}); });
} }
} }

View File

@ -45,14 +45,14 @@ use common::{
use futures_executor::block_on; use futures_executor::block_on;
use futures_timer::Delay; use futures_timer::Delay;
use futures_util::{select, FutureExt}; use futures_util::{select, FutureExt};
use metrics::{ServerMetrics, TickMetrics}; use metrics::{ServerMetrics, StateTickMetrics, TickMetrics};
use network::{Network, Pid, ProtocolAddr}; use network::{Network, Pid, ProtocolAddr};
use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater}; use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater};
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
use std::{ use std::{
i32, i32,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::{atomic::Ordering, Arc},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
#[cfg(not(feature = "worldgen"))] #[cfg(not(feature = "worldgen"))]
@ -90,6 +90,7 @@ pub struct Server {
metrics: ServerMetrics, metrics: ServerMetrics,
tick_metrics: TickMetrics, tick_metrics: TickMetrics,
state_tick_metrics: StateTickMetrics,
} }
impl Server { impl Server {
@ -97,6 +98,11 @@ impl Server {
#[allow(clippy::expect_fun_call)] // TODO: Pending review in #587 #[allow(clippy::expect_fun_call)] // TODO: Pending review in #587
#[allow(clippy::needless_update)] // TODO: Pending review in #587 #[allow(clippy::needless_update)] // TODO: Pending review in #587
pub fn new(settings: ServerSettings) -> Result<Self, Error> { pub fn new(settings: ServerSettings) -> Result<Self, Error> {
let (chunk_gen_metrics, registry_chunk) = metrics::ChunkGenMetrics::new().unwrap();
let (network_request_metrics, registry_network) =
metrics::NetworkRequestMetrics::new().unwrap();
let (player_metrics, registry_player) = metrics::PlayerMetrics::new().unwrap();
let mut state = State::default(); let mut state = State::default();
state.ecs_mut().insert(settings.clone()); state.ecs_mut().insert(settings.clone());
state.ecs_mut().insert(EventBus::<ServerEvent>::default()); state.ecs_mut().insert(EventBus::<ServerEvent>::default());
@ -104,7 +110,11 @@ impl Server {
.ecs_mut() .ecs_mut()
.insert(LoginProvider::new(settings.auth_server_address.clone())); .insert(LoginProvider::new(settings.auth_server_address.clone()));
state.ecs_mut().insert(Tick(0)); state.ecs_mut().insert(Tick(0));
state.ecs_mut().insert(ChunkGenerator::new()); state.ecs_mut().insert(network_request_metrics);
state.ecs_mut().insert(player_metrics);
state
.ecs_mut()
.insert(ChunkGenerator::new(chunk_gen_metrics));
state state
.ecs_mut() .ecs_mut()
.insert(CharacterUpdater::new(settings.persistence_db_dir.clone())); .insert(CharacterUpdater::new(settings.persistence_db_dir.clone()));
@ -275,11 +285,15 @@ impl Server {
let mut metrics = ServerMetrics::new(); let mut metrics = ServerMetrics::new();
// register all metrics submodules here // register all metrics submodules here
let tick_metrics = TickMetrics::new(metrics.tick_clone()) let (tick_metrics, registry_tick) = TickMetrics::new(metrics.tick_clone())
.expect("Failed to initialize server tick metrics submodule."); .expect("Failed to initialize server tick metrics submodule.");
tick_metrics let (state_tick_metrics, registry_state) = StateTickMetrics::new().unwrap();
.register(&metrics.registry())
.expect("failed to register tick metrics"); registry_chunk(&metrics.registry()).expect("failed to register chunk gen metrics");
registry_network(&metrics.registry()).expect("failed to register network request metrics");
registry_player(&metrics.registry()).expect("failed to register player metrics");
registry_tick(&metrics.registry()).expect("failed to register tick metrics");
registry_state(&metrics.registry()).expect("failed to register state metrics");
let thread_pool = ThreadPoolBuilder::new() let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string()) .name("veloren-worker".to_string())
@ -303,6 +317,7 @@ impl Server {
metrics, metrics,
tick_metrics, tick_metrics,
state_tick_metrics,
}; };
// Run pending DB migrations (if any) // Run pending DB migrations (if any)
@ -645,10 +660,60 @@ impl Server {
.with_label_values(&["persistence:stats"]) .with_label_values(&["persistence:stats"])
.set(stats_persistence_nanos); .set(stats_persistence_nanos);
//detailed state metrics
{
let res = self
.state
.ecs()
.read_resource::<common::metrics::SysMetrics>();
let c = &self.state_tick_metrics.state_tick_time_count;
let agent_ns = res.agent_ns.load(Ordering::Relaxed);
let mount_ns = res.mount_ns.load(Ordering::Relaxed);
let controller_ns = res.controller_ns.load(Ordering::Relaxed);
let character_behavior_ns = res.character_behavior_ns.load(Ordering::Relaxed);
let stats_ns = res.stats_ns.load(Ordering::Relaxed);
let phys_ns = res.phys_ns.load(Ordering::Relaxed);
let projectile_ns = res.projectile_ns.load(Ordering::Relaxed);
let combat_ns = res.combat_ns.load(Ordering::Relaxed);
c.with_label_values(&[common::sys::AGENT_SYS])
.inc_by(agent_ns);
c.with_label_values(&[common::sys::MOUNT_SYS])
.inc_by(mount_ns);
c.with_label_values(&[common::sys::CONTROLLER_SYS])
.inc_by(controller_ns);
c.with_label_values(&[common::sys::CHARACTER_BEHAVIOR_SYS])
.inc_by(character_behavior_ns);
c.with_label_values(&[common::sys::STATS_SYS])
.inc_by(stats_ns);
c.with_label_values(&[common::sys::PHYS_SYS])
.inc_by(phys_ns);
c.with_label_values(&[common::sys::PROJECTILE_SYS])
.inc_by(projectile_ns);
c.with_label_values(&[common::sys::COMBAT_SYS])
.inc_by(combat_ns);
const NANOSEC_PER_SEC: f64 = Duration::from_secs(1).as_nanos() as f64;
let h = &self.state_tick_metrics.state_tick_time_hist;
h.with_label_values(&[common::sys::AGENT_SYS])
.observe(agent_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::MOUNT_SYS])
.observe(mount_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::CONTROLLER_SYS])
.observe(controller_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::CHARACTER_BEHAVIOR_SYS])
.observe(character_behavior_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::STATS_SYS])
.observe(stats_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::PHYS_SYS])
.observe(phys_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::PROJECTILE_SYS])
.observe(projectile_ns as f64 / NANOSEC_PER_SEC);
h.with_label_values(&[common::sys::COMBAT_SYS])
.observe(combat_ns as f64 / NANOSEC_PER_SEC);
}
// Report other info // Report other info
self.tick_metrics
.player_online
.set(self.state.ecs().read_storage::<Client>().join().count() as i64);
self.tick_metrics self.tick_metrics
.time_of_day .time_of_day
.set(self.state.ecs().read_resource::<TimeOfDay>().0); .set(self.state.ecs().read_resource::<TimeOfDay>().0);
@ -813,7 +878,9 @@ impl Server {
.is_some() .is_some()
} }
pub fn number_of_players(&self) -> i64 { self.tick_metrics.player_online.get() } pub fn number_of_players(&self) -> i64 {
self.state.ecs().read_storage::<Client>().join().count() as i64
}
} }
impl Drop for Server { impl Drop for Server {

View File

@ -1,4 +1,7 @@
use prometheus::{Encoder, Gauge, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}; use prometheus::{
Encoder, Gauge, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Opts, Registry, TextEncoder,
};
use std::{ use std::{
convert::TryInto, convert::TryInto,
error::Error, error::Error,
@ -12,10 +15,35 @@ use std::{
}; };
use tracing::{debug, error}; use tracing::{debug, error};
type RegistryFn = Box<dyn FnOnce(&Registry) -> Result<(), prometheus::Error>>;
pub struct StateTickMetrics {
// Counter will only give us granularity on pool speed (2s?) for actuall spike detection we
// need the Historgram
pub state_tick_time_hist: HistogramVec,
pub state_tick_time_count: IntCounterVec,
}
pub struct PlayerMetrics {
pub players_connected: IntCounter,
pub players_disconnected: IntCounterVec, // timeout, network_error, gracefully
}
pub struct NetworkRequestMetrics {
pub chunks_request_dropped: IntCounter,
pub chunks_served_from_memory: IntCounter,
pub chunks_generation_triggered: IntCounter,
}
pub struct ChunkGenMetrics {
pub chunks_requested: IntCounter,
pub chunks_served: IntCounter,
pub chunks_canceled: IntCounter,
}
pub struct TickMetrics { pub struct TickMetrics {
pub chonks_count: IntGauge, pub chonks_count: IntGauge,
pub chunks_count: IntGauge, pub chunks_count: IntGauge,
pub player_online: IntGauge,
pub entity_count: IntGauge, pub entity_count: IntGauge,
pub tick_time: IntGaugeVec, pub tick_time: IntGaugeVec,
pub build_info: IntGauge, pub build_info: IntGauge,
@ -32,11 +60,173 @@ pub struct ServerMetrics {
tick: Arc<AtomicU64>, tick: Arc<AtomicU64>,
} }
impl StateTickMetrics {
pub fn new() -> Result<(Self, RegistryFn), prometheus::Error> {
let bucket = vec![
Duration::from_micros(1).as_secs_f64(),
Duration::from_micros(10).as_secs_f64(),
Duration::from_micros(100).as_secs_f64(),
Duration::from_micros(200).as_secs_f64(),
Duration::from_micros(400).as_secs_f64(),
Duration::from_millis(2).as_secs_f64(),
Duration::from_millis(5).as_secs_f64(),
Duration::from_millis(10).as_secs_f64(),
Duration::from_millis(20).as_secs_f64(),
Duration::from_millis(30).as_secs_f64(),
Duration::from_millis(50).as_secs_f64(),
Duration::from_millis(100).as_secs_f64(),
];
let state_tick_time_hist = HistogramVec::new(
HistogramOpts::new(
"state_tick_time_hist",
"shows the number of clients joined to the server",
)
.buckets(bucket),
&["system"],
)?;
let state_tick_time_count = IntCounterVec::new(
Opts::new(
"state_tick_time_count",
"shows the detailed time inside the `state_tick` for each system",
),
&["system"],
)?;
let state_tick_time_hist_clone = state_tick_time_hist.clone();
let state_tick_time_count_clone = state_tick_time_count.clone();
let f = |registry: &Registry| {
registry.register(Box::new(state_tick_time_hist_clone))?;
registry.register(Box::new(state_tick_time_count_clone))?;
Ok(())
};
Ok((
Self {
state_tick_time_hist,
state_tick_time_count,
},
Box::new(f),
))
}
}
impl PlayerMetrics {
pub fn new() -> Result<(Self, RegistryFn), prometheus::Error> {
let players_connected = IntCounter::with_opts(Opts::new(
"players_connected",
"shows the number of clients joined to the server",
))?;
let players_disconnected = IntCounterVec::new(
Opts::new(
"players_disconnected",
"shows the number of clients disconnected from the server and the reason",
),
&["reason"],
)?;
let players_connected_clone = players_connected.clone();
let players_disconnected_clone = players_disconnected.clone();
let f = |registry: &Registry| {
registry.register(Box::new(players_connected_clone))?;
registry.register(Box::new(players_disconnected_clone))?;
Ok(())
};
Ok((
Self {
players_connected,
players_disconnected,
},
Box::new(f),
))
}
}
impl NetworkRequestMetrics {
pub fn new() -> Result<(Self, RegistryFn), prometheus::Error> {
let chunks_request_dropped = IntCounter::with_opts(Opts::new(
"chunks_request_dropped",
"number of all chunk request dropped, e.g because the player was to far away",
))?;
let chunks_served_from_memory = IntCounter::with_opts(Opts::new(
"chunks_served_from_memory",
"number of all requested chunks already generated and could be served out of cache",
))?;
let chunks_generation_triggered = IntCounter::with_opts(Opts::new(
"chunks_generation_triggered",
"number of all chunks that were requested and needs to be generated",
))?;
let chunks_request_dropped_clone = chunks_request_dropped.clone();
let chunks_served_from_memory_clone = chunks_served_from_memory.clone();
let chunks_generation_triggered_clone = chunks_generation_triggered.clone();
let f = |registry: &Registry| {
registry.register(Box::new(chunks_request_dropped_clone))?;
registry.register(Box::new(chunks_served_from_memory_clone))?;
registry.register(Box::new(chunks_generation_triggered_clone))?;
Ok(())
};
Ok((
Self {
chunks_request_dropped,
chunks_served_from_memory,
chunks_generation_triggered,
},
Box::new(f),
))
}
}
impl ChunkGenMetrics {
pub fn new() -> Result<(Self, RegistryFn), prometheus::Error> {
let chunks_requested = IntCounter::with_opts(Opts::new(
"chunks_requested",
"number of all chunks requested on the server",
))?;
let chunks_served = IntCounter::with_opts(Opts::new(
"chunks_served",
"number of all requested chunks already served on the server",
))?;
let chunks_canceled = IntCounter::with_opts(Opts::new(
"chunks_canceled",
"number of all canceled chunks on the server",
))?;
let chunks_requested_clone = chunks_requested.clone();
let chunks_served_clone = chunks_served.clone();
let chunks_canceled_clone = chunks_canceled.clone();
let f = |registry: &Registry| {
registry.register(Box::new(chunks_requested_clone))?;
registry.register(Box::new(chunks_served_clone))?;
registry.register(Box::new(chunks_canceled_clone))?;
Ok(())
};
Ok((
Self {
chunks_requested,
chunks_served,
chunks_canceled,
},
Box::new(f),
))
}
}
impl TickMetrics { impl TickMetrics {
pub fn new(tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> { pub fn new(tick: Arc<AtomicU64>) -> Result<(Self, RegistryFn), Box<dyn Error>> {
let player_online = IntGauge::with_opts(Opts::new( let chonks_count = IntGauge::with_opts(Opts::new(
"player_online", "chonks_count",
"shows the number of clients connected to the server", "number of all chonks currently active on the server",
))?;
let chunks_count = IntGauge::with_opts(Opts::new(
"chunks_count",
"number of all chunks currently active on the server",
))?; ))?;
let entity_count = IntGauge::with_opts(Opts::new( let entity_count = IntGauge::with_opts(Opts::new(
"entity_count", "entity_count",
@ -56,14 +246,6 @@ impl TickMetrics {
"light_count", "light_count",
"number of all lights currently active on the server", "number of all lights currently active on the server",
))?; ))?;
let chonks_count = IntGauge::with_opts(Opts::new(
"chonks_count",
"number of all chonks currently active on the server",
))?;
let chunks_count = IntGauge::with_opts(Opts::new(
"chunks_count",
"number of all chunks currently active on the server",
))?;
let tick_time = IntGaugeVec::new( let tick_time = IntGaugeVec::new(
Opts::new("tick_time", "time in ns required for a tick of the server"), Opts::new("tick_time", "time in ns required for a tick of the server"),
&["period"], &["period"],
@ -74,10 +256,31 @@ impl TickMetrics {
.expect("Time went backwards"); .expect("Time went backwards");
start_time.set(since_the_epoch.as_secs().try_into()?); start_time.set(since_the_epoch.as_secs().try_into()?);
Ok(Self { let chonks_count_clone = chonks_count.clone();
let chunks_count_clone = chunks_count.clone();
let entity_count_clone = entity_count.clone();
let build_info_clone = build_info.clone();
let start_time_clone = start_time.clone();
let time_of_day_clone = time_of_day.clone();
let light_count_clone = light_count.clone();
let tick_time_clone = tick_time.clone();
let f = |registry: &Registry| {
registry.register(Box::new(chonks_count_clone))?;
registry.register(Box::new(chunks_count_clone))?;
registry.register(Box::new(entity_count_clone))?;
registry.register(Box::new(build_info_clone))?;
registry.register(Box::new(start_time_clone))?;
registry.register(Box::new(time_of_day_clone))?;
registry.register(Box::new(light_count_clone))?;
registry.register(Box::new(tick_time_clone))?;
Ok(())
};
Ok((
Self {
chonks_count, chonks_count,
chunks_count, chunks_count,
player_online,
entity_count, entity_count,
tick_time, tick_time,
build_info, build_info,
@ -85,19 +288,9 @@ impl TickMetrics {
time_of_day, time_of_day,
light_count, light_count,
tick, tick,
}) },
} Box::new(f),
))
pub fn register(&self, registry: &Registry) -> Result<(), Box<dyn Error>> {
registry.register(Box::new(self.player_online.clone()))?;
registry.register(Box::new(self.entity_count.clone()))?;
registry.register(Box::new(self.build_info.clone()))?;
registry.register(Box::new(self.start_time.clone()))?;
registry.register(Box::new(self.time_of_day.clone()))?;
registry.register(Box::new(self.chonks_count.clone()))?;
registry.register(Box::new(self.chunks_count.clone()))?;
registry.register(Box::new(self.tick_time.clone()))?;
Ok(())
} }
pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }

View File

@ -1,7 +1,11 @@
use super::SysTimer; use super::SysTimer;
use crate::{ use crate::{
alias_validator::AliasValidator, client::Client, login_provider::LoginProvider, alias_validator::AliasValidator,
persistence::character::CharacterLoader, ServerSettings, client::Client,
login_provider::LoginProvider,
metrics::{NetworkRequestMetrics, PlayerMetrics},
persistence::character::CharacterLoader,
ServerSettings,
}; };
use common::{ use common::{
comp::{ comp::{
@ -43,6 +47,8 @@ impl Sys {
cnt: &mut u64, cnt: &mut u64,
character_loader: &ReadExpect<'_, CharacterLoader>, character_loader: &ReadExpect<'_, CharacterLoader>,
terrain: &ReadExpect<'_, TerrainGrid>, terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
uids: &ReadStorage<'_, Uid>, uids: &ReadStorage<'_, Uid>,
can_build: &ReadStorage<'_, CanBuild>, can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>, force_updates: &ReadStorage<'_, ForceUpdate>,
@ -133,6 +139,7 @@ impl Sys {
// Add to list to notify all clients of the new player // Add to list to notify all clients of the new player
new_players.push(entity); new_players.push(entity);
player_metrics.players_connected.inc();
}, },
// Use RequestState instead (No need to send `player` again). // Use RequestState instead (No need to send `player` again).
_ => client.error_state(RequestStateError::Impossible), _ => client.error_state(RequestStateError::Impossible),
@ -312,6 +319,7 @@ impl Sys {
}, },
ClientMsg::TerrainChunkRequest { key } => match client.client_state { ClientMsg::TerrainChunkRequest { key } => match client.client_state {
ClientState::Connected | ClientState::Registered => { ClientState::Connected | ClientState::Registered => {
network_metrics.chunks_request_dropped.inc();
client.error_state(RequestStateError::Impossible); client.error_state(RequestStateError::Impossible);
}, },
ClientState::Spectator | ClientState::Character => { ClientState::Spectator | ClientState::Character => {
@ -329,12 +337,20 @@ impl Sys {
}; };
if in_vd { if in_vd {
match terrain.get_key(key) { match terrain.get_key(key) {
Some(chunk) => client.notify(ServerMsg::TerrainChunkUpdate { Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.notify(ServerMsg::TerrainChunkUpdate {
key, key,
chunk: Ok(Box::new(chunk.clone())), chunk: Ok(Box::new(chunk.clone())),
}), })
None => server_emitter.emit(ServerEvent::ChunkRequest(entity, key)), },
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
} }
} else {
network_metrics.chunks_request_dropped.inc();
} }
}, },
ClientState::Pending => {}, ClientState::Pending => {},
@ -347,6 +363,10 @@ impl Sys {
}, },
ClientMsg::Terminate => { ClientMsg::Terminate => {
debug!(?entity, "Client send message to termitate session"); debug!(?entity, "Client send message to termitate session");
player_metrics
.players_disconnected
.with_label_values(&["gracefully"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity)); server_emitter.emit(ServerEvent::ClientDisconnect(entity));
break Ok(()); break Ok(());
}, },
@ -408,6 +428,8 @@ impl<'a> System<'a> for Sys {
Read<'a, Time>, Read<'a, Time>,
ReadExpect<'a, CharacterLoader>, ReadExpect<'a, CharacterLoader>,
ReadExpect<'a, TerrainGrid>, ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>, Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>, ReadStorage<'a, Uid>,
ReadStorage<'a, CanBuild>, ReadStorage<'a, CanBuild>,
@ -439,6 +461,8 @@ impl<'a> System<'a> for Sys {
time, time,
character_loader, character_loader,
terrain, terrain,
network_metrics,
player_metrics,
mut timer, mut timer,
uids, uids,
can_build, can_build,
@ -489,9 +513,7 @@ impl<'a> System<'a> for Sys {
let network_err: Result<(), crate::error::Error> = block_on(async { let network_err: Result<(), crate::error::Error> = block_on(async {
//TIMEOUT 0.02 ms for msg handling //TIMEOUT 0.02 ms for msg handling
select!( let work_future = Self::handle_client_msg(
_ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()),
err = Self::handle_client_msg(
&mut server_emitter, &mut server_emitter,
&mut new_chat_msgs, &mut new_chat_msgs,
&player_list, &player_list,
@ -499,9 +521,10 @@ impl<'a> System<'a> for Sys {
entity, entity,
client, client,
&mut cnt, &mut cnt,
&character_loader, &character_loader,
&terrain, &terrain,
&network_metrics,
&player_metrics,
&uids, &uids,
&can_build, &can_build,
&force_updates, &force_updates,
@ -518,7 +541,10 @@ impl<'a> System<'a> for Sys {
&mut controllers, &mut controllers,
&settings, &settings,
&alias_validator, &alias_validator,
).fuse() => err, );
select!(
_ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()),
err = work_future.fuse() => err,
) )
}); });
@ -529,11 +555,19 @@ impl<'a> System<'a> for Sys {
// Timeout // Timeout
{ {
info!(?entity, "timeout error with client, disconnecting"); info!(?entity, "timeout error with client, disconnecting");
player_metrics
.players_disconnected
.with_label_values(&["timeout"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity)); server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if network_err.is_err() } else if network_err.is_err()
// Postbox error // Postbox error
{ {
debug!(?entity, "postbox error with client, disconnecting"); debug!(?entity, "postbox error with client, disconnecting");
player_metrics
.players_disconnected
.with_label_values(&["network_error"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity)); server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 {
// Try pinging the client if the timeout is nearing. // Try pinging the client if the timeout is nearing.