diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 445d4f7d1c..1251aae4b5 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -3,6 +3,7 @@ use rayon::ThreadPool; use std::{ collections::VecDeque, sync::{Arc, Mutex}, + time::Instant, }; use tracing::{error, warn}; @@ -39,7 +40,7 @@ use tracing::{error, warn}; /// .num_threads(16) /// .build() /// .unwrap(); -/// let pool = SlowJobPool::new(3, Arc::new(threadpool)); +/// let pool = SlowJobPool::new(3, 10, Arc::new(threadpool)); /// pool.configure("CHUNK_GENERATOR", |n| n / 2); /// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job")); /// ``` @@ -61,6 +62,8 @@ struct InternalSlowJobPool { last_spawned_configs: Vec, global_spawned_and_running: u64, global_limit: u64, + jobs_metrics_cnt: usize, + jobs_metrics: HashMap>, threadpool: Arc, internal: Option>>, } @@ -77,6 +80,12 @@ struct Queue { task: Box, } +pub struct JobMetrics { + pub queue_created: Instant, + pub execution_start: Instant, + pub execution_end: Instant, +} + impl Queue { fn new(name: &str, id: u64, internal: &Arc>, f: F) -> Self where @@ -84,16 +93,24 @@ impl Queue { { let internal = Arc::clone(internal); let name_cloned = name.to_owned(); + let queue_created = Instant::now(); Self { id, name: name.to_owned(), task: Box::new(move || { common_base::prof_span!(_guard, &name_cloned); + let execution_start = Instant::now(); f(); + let execution_end = Instant::now(); + let metrics = JobMetrics { + queue_created, + execution_start, + execution_end, + }; // directly maintain the next task afterwards { let mut lock = internal.lock().expect("slowjob lock poisoned"); - lock.finish(&name_cloned); + lock.finish(&name_cloned, metrics); lock.spawn_queued(); } }), @@ -102,7 +119,11 @@ impl Queue { } impl InternalSlowJobPool { - pub fn new(global_limit: u64, threadpool: Arc) -> Arc> { + pub fn new( + global_limit: u64, + jobs_metrics_cnt: usize, + threadpool: Arc, + ) -> Arc> { let link = Arc::new(Mutex::new(Self { next_id: 0, queue: HashMap::new(), @@ -110,6 +131,8 @@ impl InternalSlowJobPool { last_spawned_configs: Vec::new(), global_spawned_and_running: 0, global_limit: global_limit.max(1), + jobs_metrics_cnt, + jobs_metrics: HashMap::new(), threadpool, internal: None, })); @@ -247,7 +270,12 @@ impl InternalSlowJobPool { } } - fn finish(&mut self, name: &str) { + fn finish(&mut self, name: &str, metrics: JobMetrics) { + let metric = self.jobs_metrics.entry(name.to_string()).or_default(); + + if metric.len() < self.jobs_metrics_cnt { + metric.push(metrics); + } self.global_spawned_and_running -= 1; if let Some(c) = self.configs.get_mut(name) { c.local_spawned_and_running -= 1; @@ -293,12 +321,16 @@ impl InternalSlowJobPool { } } } + + pub fn take_metrics(&mut self) -> HashMap> { + core::mem::take(&mut self.jobs_metrics) + } } impl SlowJobPool { - pub fn new(global_limit: u64, threadpool: Arc) -> Self { + pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc) -> Self { Self { - internal: InternalSlowJobPool::new(global_limit, threadpool), + internal: InternalSlowJobPool::new(global_limit, jobs_metrics_cnt, threadpool), } } @@ -356,6 +388,13 @@ impl SlowJobPool { } Err(job) } + + pub fn take_metrics(&self) -> HashMap> { + self.internal + .lock() + .expect("lock poisoned while take_metrics") + .take_metrics() + } } #[cfg(test)] @@ -366,6 +405,7 @@ mod tests { fn mock_pool( pool_threads: usize, global_threads: u64, + metrics: usize, foo: u64, bar: u64, baz: u64, @@ -374,7 +414,7 @@ mod tests { .num_threads(pool_threads) .build() .unwrap(); - let pool = SlowJobPool::new(global_threads, Arc::new(threadpool)); + let pool = SlowJobPool::new(global_threads, metrics, Arc::new(threadpool)); if foo != 0 { pool.configure("FOO", |x| x / foo); } @@ -389,7 +429,7 @@ mod tests { #[test] fn simple_queue() { - let pool = mock_pool(4, 4, 1, 0, 0); + let pool = mock_pool(4, 4, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 1u64)] .iter() @@ -406,7 +446,7 @@ mod tests { #[test] fn multiple_queue() { - let pool = mock_pool(4, 4, 1, 0, 0); + let pool = mock_pool(4, 4, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 2u64)] .iter() @@ -424,7 +464,7 @@ mod tests { #[test] fn limit_queue() { - let pool = mock_pool(5, 5, 1, 0, 0); + let pool = mock_pool(5, 5, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 80u64)] .iter() @@ -444,7 +484,7 @@ mod tests { #[test] fn simple_queue_2() { - let pool = mock_pool(4, 4, 1, 1, 0); + let pool = mock_pool(4, 4, 0, 1, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 1u64), ("BAR", 1u64)] .iter() @@ -462,7 +502,7 @@ mod tests { #[test] fn multiple_queue_3() { - let pool = mock_pool(4, 4, 1, 1, 0); + let pool = mock_pool(4, 4, 0, 1, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 2u64), ("BAR", 2u64)] .iter() @@ -480,7 +520,7 @@ mod tests { #[test] fn multiple_queue_4() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 3u64), ("BAR", 3u64)] .iter() @@ -498,7 +538,7 @@ mod tests { #[test] fn multiple_queue_5() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() @@ -516,7 +556,7 @@ mod tests { #[test] fn multiple_queue_6() { - let pool = mock_pool(40, 40, 2, 1, 0); + let pool = mock_pool(40, 40, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() @@ -534,7 +574,7 @@ mod tests { #[test] fn roundrobin() { - let pool = mock_pool(4, 4, 2, 2, 0); + let pool = mock_pool(4, 4, 0, 2, 2, 0); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) @@ -583,14 +623,14 @@ mod tests { #[test] #[should_panic] fn unconfigured() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let mut internal = pool.internal.lock().unwrap(); internal.spawn("UNCONFIGURED", || println!()); } #[test] fn correct_spawn_doesnt_panic() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let mut internal = pool.internal.lock().unwrap(); internal.spawn("FOO", || println!("foo")); internal.spawn("BAR", || println!("bar")); @@ -598,7 +638,7 @@ mod tests { #[test] fn can_spawn() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); assert!(internal.can_spawn("FOO")); assert!(internal.can_spawn("BAR")); @@ -606,7 +646,7 @@ mod tests { #[test] fn try_run_works() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); pool.try_run("FOO", || println!("foo")).unwrap(); pool.try_run("BAR", || println!("bar")).unwrap(); } @@ -614,7 +654,7 @@ mod tests { #[test] fn try_run_exhausted() { use std::{thread::sleep, time::Duration}; - let pool = mock_pool(8, 8, 4, 2, 0); + let pool = mock_pool(8, 8, 0, 4, 2, 0); let func = || loop { sleep(Duration::from_secs(1)) }; @@ -633,7 +673,7 @@ mod tests { #[test] fn actually_runs_1() { - let pool = mock_pool(4, 4, 0, 0, 1); + let pool = mock_pool(4, 4, 0, 0, 0, 1); let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier_clone = Arc::clone(&barrier); pool.try_run("BAZ", move || { @@ -645,7 +685,7 @@ mod tests { #[test] fn actually_runs_2() { - let pool = mock_pool(4, 4, 0, 0, 1); + let pool = mock_pool(4, 4, 0, 0, 0, 1); let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier_clone = Arc::clone(&barrier); pool.spawn("BAZ", move || { @@ -660,7 +700,7 @@ mod tests { atomic::{AtomicBool, Ordering}, Barrier, }; - let pool = mock_pool(4, 4, 4, 0, 1); + let pool = mock_pool(4, 4, 0, 4, 0, 1); let ops_i_ran = Arc::new(AtomicBool::new(false)); let ops_i_ran_clone = Arc::clone(&ops_i_ran); let barrier = Arc::new(Barrier::new(2)); @@ -683,4 +723,28 @@ mod tests { // now wait on the second job to be actually finished barrier2.wait(); } + + #[test] + fn verify_metrics() { + use std::sync::Barrier; + let pool = mock_pool(4, 4, 2, 1, 0, 4); + let barrier = Arc::new(Barrier::new(5)); + for name in &["FOO", "BAZ", "FOO", "FOO"] { + let barrier_clone = Arc::clone(&barrier); + pool.spawn(name, move || { + barrier_clone.wait(); + }); + } + // now finish all jobs + barrier.wait(); + // in this case we have to sleep to give it some time to store all the metrics + std::thread::sleep(std::time::Duration::from_secs(2)); + let metrics = pool.take_metrics(); + let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics"); + //its limited to 2, even though we had 3 jobs + assert_eq!(foo.len(), 2); + assert!(metrics.get("BAR").is_none()); + let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics"); + assert_eq!(baz.len(), 1); + } } diff --git a/common/state/src/state.rs b/common/state/src/state.rs index 62ad8ec347..599d6013d4 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -216,7 +216,11 @@ impl State { let num_cpu = num_cpus::get() as u64; let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1); tracing::trace!(?slow_limit, "Slow Thread limit"); - ecs.insert(SlowJobPool::new(slow_limit, Arc::clone(thread_pool))); + ecs.insert(SlowJobPool::new( + slow_limit, + 10_000, + Arc::clone(thread_pool), + )); // TODO: only register on the server ecs.insert(EventBus::::default()); diff --git a/server/src/lib.rs b/server/src/lib.rs index 2a24620d04..1603fd2a9c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -184,6 +184,7 @@ impl Server { let registry = Arc::new(Registry::new()); let chunk_gen_metrics = metrics::ChunkGenMetrics::new(®istry).unwrap(); + let job_metrics = metrics::JobMetrics::new(®istry).unwrap(); let network_request_metrics = metrics::NetworkRequestMetrics::new(®istry).unwrap(); let player_metrics = metrics::PlayerMetrics::new(®istry).unwrap(); let ecs_system_metrics = EcsSystemMetrics::new(®istry).unwrap(); @@ -207,6 +208,7 @@ impl Server { }); state.ecs_mut().insert(Tick(0)); state.ecs_mut().insert(TickStart(Instant::now())); + state.ecs_mut().insert(job_metrics); state.ecs_mut().insert(network_request_metrics); state.ecs_mut().insert(player_metrics); state.ecs_mut().insert(ecs_system_metrics); diff --git a/server/src/metrics.rs b/server/src/metrics.rs index f20107c74d..76c06d422c 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -44,6 +44,11 @@ pub struct ChunkGenMetrics { pub chunks_canceled: IntCounter, } +pub struct JobMetrics { + pub job_queried_hst: HistogramVec, + pub job_execution_hst: HistogramVec, +} + pub struct TickMetrics { pub chonks_count: IntGauge, pub chunks_count: IntGauge, @@ -241,6 +246,55 @@ impl ChunkGenMetrics { } } +impl JobMetrics { + pub fn new(registry: &Registry) -> Result { + let bucket = vec![ + Duration::from_micros(100).as_secs_f64(), + Duration::from_millis(2).as_secs_f64(), + Duration::from_millis(100).as_secs_f64(), + ]; + + let job_queried_hst = HistogramVec::new( + HistogramOpts::new( + "job_queried_hst", + "shows the detailed time each job name took from query till it started to execute \ + as histogram", + ) + .buckets(bucket), + &["name"], + )?; + + let bucket = vec![ + Duration::from_millis(5).as_secs_f64(), + Duration::from_millis(20).as_secs_f64(), + Duration::from_millis(50).as_secs_f64(), + Duration::from_millis(100).as_secs_f64(), + Duration::from_millis(200).as_secs_f64(), + Duration::from_millis(500).as_secs_f64(), + Duration::from_millis(1000).as_secs_f64(), + Duration::from_millis(10000).as_secs_f64(), + ]; + + let job_execution_hst = HistogramVec::new( + HistogramOpts::new( + "job_execution_hst", + "shows the detailed time each job name took from start of execution until it \ + finished as histogram", + ) + .buckets(bucket), + &["name"], + )?; + + registry.register(Box::new(job_queried_hst.clone()))?; + registry.register(Box::new(job_execution_hst.clone()))?; + + Ok(Self { + job_queried_hst, + job_execution_hst, + }) + } +} + impl TickMetrics { pub fn new(registry: &Registry) -> Result> { let chonks_count = IntGauge::with_opts(Opts::new( diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index d122adf8bf..17b0c84338 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -1,8 +1,8 @@ use crate::{ - metrics::{EcsSystemMetrics, PhysicsMetrics, TickMetrics}, + metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics}, HwStats, Tick, TickStart, }; -use common::{resources::TimeOfDay, terrain::TerrainGrid}; +use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, SysMetrics, System}; use specs::{Entities, Join, Read, ReadExpect}; use std::time::Instant; @@ -21,9 +21,11 @@ impl<'a> System<'a> for Sys { Option>, Read<'a, SysMetrics>, Read<'a, common_ecs::PhysicsMetrics>, + ReadExpect<'a, SlowJobPool>, ReadExpect<'a, EcsSystemMetrics>, ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, + ReadExpect<'a, JobMetrics>, ); const NAME: &'static str = "metrics"; @@ -41,9 +43,11 @@ impl<'a> System<'a> for Sys { terrain, sys_metrics, phys_metrics, + slowjobpool, export_ecs, export_tick, export_physics, + export_jobs, ): Self::SystemData, ) { const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64; @@ -114,6 +118,24 @@ impl<'a> System<'a> for Sys { .entity_entity_collisions_count .inc_by(phys_metrics.entity_entity_collisions); + //detailed job metrics + for (name, jobs) in slowjobpool.take_metrics() { + let queried = export_jobs.job_queried_hst.with_label_values(&[&name]); + let executed = export_jobs.job_execution_hst.with_label_values(&[&name]); + for job in jobs { + queried.observe( + job.execution_start + .duration_since(job.queue_created) + .as_secs_f64(), + ); + executed.observe( + job.execution_end + .duration_since(job.execution_start) + .as_secs_f64(), + ); + } + } + // export self time as best as possible export_ecs .system_start_time