From 7d93d907f67ad3070498ee59762c84c23499deb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 20 Jul 2021 19:40:29 +0200 Subject: [PATCH 1/2] implement slowjob metrics to measure time in queue and execution time --- common/src/slowjob.rs | 114 ++++++++++++++++++++++++++++++-------- common/state/src/state.rs | 6 +- server/src/lib.rs | 2 + server/src/metrics.rs | 54 ++++++++++++++++++ server/src/sys/metrics.rs | 26 ++++++++- 5 files changed, 175 insertions(+), 27 deletions(-) diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index 445d4f7d1c..b66b08aab0 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -3,6 +3,7 @@ use rayon::ThreadPool; use std::{ collections::VecDeque, sync::{Arc, Mutex}, + time::Instant, }; use tracing::{error, warn}; @@ -39,7 +40,7 @@ use tracing::{error, warn}; /// .num_threads(16) /// .build() /// .unwrap(); -/// let pool = SlowJobPool::new(3, Arc::new(threadpool)); +/// let pool = SlowJobPool::new(3, 10, Arc::new(threadpool)); /// pool.configure("CHUNK_GENERATOR", |n| n / 2); /// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job")); /// ``` @@ -61,6 +62,8 @@ struct InternalSlowJobPool { last_spawned_configs: Vec, global_spawned_and_running: u64, global_limit: u64, + jobs_metrics_cnt: usize, + jobs_metrics: HashMap>, threadpool: Arc, internal: Option>>, } @@ -74,9 +77,16 @@ struct Config { struct Queue { id: u64, name: String, + _queue_created: Instant, task: Box, } +pub struct JobMetrics { + pub queue_created: Instant, + pub execution_start: Instant, + pub execution_end: Instant, +} + impl Queue { fn new(name: &str, id: u64, internal: &Arc>, f: F) -> Self where @@ -84,16 +94,25 @@ impl Queue { { let internal = Arc::clone(internal); let name_cloned = name.to_owned(); + let queue_created = Instant::now(); Self { id, name: name.to_owned(), + _queue_created: queue_created, task: Box::new(move || { common_base::prof_span!(_guard, &name_cloned); + let execution_start = Instant::now(); 
f(); + let execution_end = Instant::now(); + let metrics = JobMetrics { + queue_created, + execution_start, + execution_end, + }; // directly maintain the next task afterwards { let mut lock = internal.lock().expect("slowjob lock poisoned"); - lock.finish(&name_cloned); + lock.finish(&name_cloned, metrics); lock.spawn_queued(); } }), @@ -102,7 +121,11 @@ impl Queue { } impl InternalSlowJobPool { - pub fn new(global_limit: u64, threadpool: Arc) -> Arc> { + pub fn new( + global_limit: u64, + jobs_metrics_cnt: usize, + threadpool: Arc, + ) -> Arc> { let link = Arc::new(Mutex::new(Self { next_id: 0, queue: HashMap::new(), @@ -110,6 +133,8 @@ impl InternalSlowJobPool { last_spawned_configs: Vec::new(), global_spawned_and_running: 0, global_limit: global_limit.max(1), + jobs_metrics_cnt, + jobs_metrics: HashMap::new(), threadpool, internal: None, })); @@ -247,7 +272,12 @@ impl InternalSlowJobPool { } } - fn finish(&mut self, name: &str) { + fn finish(&mut self, name: &str, metrics: JobMetrics) { + let metric = self.jobs_metrics.entry(name.to_string()).or_default(); + + if metric.len() < self.jobs_metrics_cnt { + metric.push(metrics); + } self.global_spawned_and_running -= 1; if let Some(c) = self.configs.get_mut(name) { c.local_spawned_and_running -= 1; @@ -293,12 +323,16 @@ impl InternalSlowJobPool { } } } + + pub fn take_metrics(&mut self) -> HashMap> { + std::mem::replace(&mut self.jobs_metrics, HashMap::new()) + } } impl SlowJobPool { - pub fn new(global_limit: u64, threadpool: Arc) -> Self { + pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc) -> Self { Self { - internal: InternalSlowJobPool::new(global_limit, threadpool), + internal: InternalSlowJobPool::new(global_limit, jobs_metrics_cnt, threadpool), } } @@ -356,6 +390,13 @@ impl SlowJobPool { } Err(job) } + + pub fn take_metrics(&self) -> HashMap> { + self.internal + .lock() + .expect("lock poisoned while take_metrics") + .take_metrics() + } } #[cfg(test)] @@ -366,6 +407,7 @@ mod tests { fn 
mock_pool( pool_threads: usize, global_threads: u64, + metrics: usize, foo: u64, bar: u64, baz: u64, @@ -374,7 +416,7 @@ mod tests { .num_threads(pool_threads) .build() .unwrap(); - let pool = SlowJobPool::new(global_threads, Arc::new(threadpool)); + let pool = SlowJobPool::new(global_threads, metrics, Arc::new(threadpool)); if foo != 0 { pool.configure("FOO", |x| x / foo); } @@ -389,7 +431,7 @@ mod tests { #[test] fn simple_queue() { - let pool = mock_pool(4, 4, 1, 0, 0); + let pool = mock_pool(4, 4, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 1u64)] .iter() @@ -406,7 +448,7 @@ mod tests { #[test] fn multiple_queue() { - let pool = mock_pool(4, 4, 1, 0, 0); + let pool = mock_pool(4, 4, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 2u64)] .iter() @@ -424,7 +466,7 @@ mod tests { #[test] fn limit_queue() { - let pool = mock_pool(5, 5, 1, 0, 0); + let pool = mock_pool(5, 5, 0, 1, 0, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 80u64)] .iter() @@ -444,7 +486,7 @@ mod tests { #[test] fn simple_queue_2() { - let pool = mock_pool(4, 4, 1, 1, 0); + let pool = mock_pool(4, 4, 0, 1, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 1u64), ("BAR", 1u64)] .iter() @@ -462,7 +504,7 @@ mod tests { #[test] fn multiple_queue_3() { - let pool = mock_pool(4, 4, 1, 1, 0); + let pool = mock_pool(4, 4, 0, 1, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 2u64), ("BAR", 2u64)] .iter() @@ -480,7 +522,7 @@ mod tests { #[test] fn multiple_queue_4() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 3u64), ("BAR", 3u64)] .iter() @@ -498,7 +540,7 @@ mod tests { #[test] fn multiple_queue_5() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let 
queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() @@ -516,7 +558,7 @@ mod tests { #[test] fn multiple_queue_6() { - let pool = mock_pool(40, 40, 2, 1, 0); + let pool = mock_pool(40, 40, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() @@ -534,7 +576,7 @@ mod tests { #[test] fn roundrobin() { - let pool = mock_pool(4, 4, 2, 2, 0); + let pool = mock_pool(4, 4, 0, 2, 2, 0); let queue_data = [("FOO", 5u64), ("BAR", 5u64)] .iter() .map(|(n, c)| ((*n).to_owned(), *c)) @@ -583,14 +625,14 @@ mod tests { #[test] #[should_panic] fn unconfigured() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let mut internal = pool.internal.lock().unwrap(); internal.spawn("UNCONFIGURED", || println!()); } #[test] fn correct_spawn_doesnt_panic() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let mut internal = pool.internal.lock().unwrap(); internal.spawn("FOO", || println!("foo")); internal.spawn("BAR", || println!("bar")); @@ -598,7 +640,7 @@ mod tests { #[test] fn can_spawn() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); let internal = pool.internal.lock().unwrap(); assert!(internal.can_spawn("FOO")); assert!(internal.can_spawn("BAR")); @@ -606,7 +648,7 @@ mod tests { #[test] fn try_run_works() { - let pool = mock_pool(4, 4, 2, 1, 0); + let pool = mock_pool(4, 4, 0, 2, 1, 0); pool.try_run("FOO", || println!("foo")).unwrap(); pool.try_run("BAR", || println!("bar")).unwrap(); } @@ -614,7 +656,7 @@ mod tests { #[test] fn try_run_exhausted() { use std::{thread::sleep, time::Duration}; - let pool = mock_pool(8, 8, 4, 2, 0); + let pool = mock_pool(8, 8, 0, 4, 2, 0); let func = || loop { sleep(Duration::from_secs(1)) }; @@ -633,7 +675,7 @@ mod tests { #[test] fn actually_runs_1() { - let pool = mock_pool(4, 4, 0, 0, 1); + let pool = mock_pool(4, 4, 0, 0, 0, 1); let barrier = Arc::new(std::sync::Barrier::new(2)); let 
barrier_clone = Arc::clone(&barrier); pool.try_run("BAZ", move || { @@ -645,7 +687,7 @@ mod tests { #[test] fn actually_runs_2() { - let pool = mock_pool(4, 4, 0, 0, 1); + let pool = mock_pool(4, 4, 0, 0, 0, 1); let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier_clone = Arc::clone(&barrier); pool.spawn("BAZ", move || { @@ -660,7 +702,7 @@ mod tests { atomic::{AtomicBool, Ordering}, Barrier, }; - let pool = mock_pool(4, 4, 4, 0, 1); + let pool = mock_pool(4, 4, 0, 4, 0, 1); let ops_i_ran = Arc::new(AtomicBool::new(false)); let ops_i_ran_clone = Arc::clone(&ops_i_ran); let barrier = Arc::new(Barrier::new(2)); @@ -683,4 +725,28 @@ mod tests { // now wait on the second job to be actually finished barrier2.wait(); } + + #[test] + fn verify_metrics() { + use std::sync::Barrier; + let pool = mock_pool(4, 4, 2, 1, 0, 4); + let barrier = Arc::new(Barrier::new(5)); + for name in &["FOO", "BAZ", "FOO", "FOO"] { + let barrier_clone = Arc::clone(&barrier); + pool.spawn(name, move || { + barrier_clone.wait(); + }); + } + // now finish all jobs + barrier.wait(); + // in this case we have to sleep to give it some time to store all the metrics + std::thread::sleep(std::time::Duration::from_secs(2)); + let metrics = pool.take_metrics(); + let foo = metrics.get("FOO").expect("FOO doesn't exist in metrics"); + //its limited to 2, even though we had 3 jobs + assert_eq!(foo.len(), 2); + assert!(metrics.get("BAR").is_none()); + let baz = metrics.get("BAZ").expect("BAZ doesn't exist in metrics"); + assert_eq!(baz.len(), 1); + } } diff --git a/common/state/src/state.rs b/common/state/src/state.rs index 62ad8ec347..599d6013d4 100644 --- a/common/state/src/state.rs +++ b/common/state/src/state.rs @@ -216,7 +216,11 @@ impl State { let num_cpu = num_cpus::get() as u64; let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1); tracing::trace!(?slow_limit, "Slow Thread limit"); - ecs.insert(SlowJobPool::new(slow_limit, Arc::clone(thread_pool))); + ecs.insert(SlowJobPool::new( + 
slow_limit, + 10_000, + Arc::clone(thread_pool), + )); // TODO: only register on the server ecs.insert(EventBus::::default()); diff --git a/server/src/lib.rs b/server/src/lib.rs index 2a24620d04..1603fd2a9c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -184,6 +184,7 @@ impl Server { let registry = Arc::new(Registry::new()); let chunk_gen_metrics = metrics::ChunkGenMetrics::new(&registry).unwrap(); + let job_metrics = metrics::JobMetrics::new(&registry).unwrap(); let network_request_metrics = metrics::NetworkRequestMetrics::new(&registry).unwrap(); let player_metrics = metrics::PlayerMetrics::new(&registry).unwrap(); let ecs_system_metrics = EcsSystemMetrics::new(&registry).unwrap(); @@ -207,6 +208,7 @@ impl Server { }); state.ecs_mut().insert(Tick(0)); state.ecs_mut().insert(TickStart(Instant::now())); + state.ecs_mut().insert(job_metrics); state.ecs_mut().insert(network_request_metrics); state.ecs_mut().insert(player_metrics); state.ecs_mut().insert(ecs_system_metrics); diff --git a/server/src/metrics.rs b/server/src/metrics.rs index f20107c74d..76c06d422c 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -44,6 +44,11 @@ pub struct ChunkGenMetrics { pub chunks_canceled: IntCounter, } +pub struct JobMetrics { + pub job_queried_hst: HistogramVec, + pub job_execution_hst: HistogramVec, +} + pub struct TickMetrics { pub chonks_count: IntGauge, pub chunks_count: IntGauge, @@ -241,6 +246,55 @@ impl ChunkGenMetrics { } } +impl JobMetrics { + pub fn new(registry: &Registry) -> Result { + let bucket = vec![ + Duration::from_micros(100).as_secs_f64(), + Duration::from_millis(2).as_secs_f64(), + Duration::from_millis(100).as_secs_f64(), + ]; + + let job_queried_hst = HistogramVec::new( + HistogramOpts::new( + "job_queried_hst", + "shows the detailed time each job name took from query till it started to execute \ + as histogram", + ) + .buckets(bucket), + &["name"], + )?; + + let bucket = vec![ + Duration::from_millis(5).as_secs_f64(), + 
Duration::from_millis(20).as_secs_f64(), + Duration::from_millis(50).as_secs_f64(), + Duration::from_millis(100).as_secs_f64(), + Duration::from_millis(200).as_secs_f64(), + Duration::from_millis(500).as_secs_f64(), + Duration::from_millis(1000).as_secs_f64(), + Duration::from_millis(10000).as_secs_f64(), + ]; + + let job_execution_hst = HistogramVec::new( + HistogramOpts::new( + "job_execution_hst", + "shows the detailed time each job name took from start of execution until it \ + finished as histogram", + ) + .buckets(bucket), + &["name"], + )?; + + registry.register(Box::new(job_queried_hst.clone()))?; + registry.register(Box::new(job_execution_hst.clone()))?; + + Ok(Self { + job_queried_hst, + job_execution_hst, + }) + } +} + impl TickMetrics { pub fn new(registry: &Registry) -> Result> { let chonks_count = IntGauge::with_opts(Opts::new( diff --git a/server/src/sys/metrics.rs b/server/src/sys/metrics.rs index d122adf8bf..17b0c84338 100644 --- a/server/src/sys/metrics.rs +++ b/server/src/sys/metrics.rs @@ -1,8 +1,8 @@ use crate::{ - metrics::{EcsSystemMetrics, PhysicsMetrics, TickMetrics}, + metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics}, HwStats, Tick, TickStart, }; -use common::{resources::TimeOfDay, terrain::TerrainGrid}; +use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid}; use common_ecs::{Job, Origin, Phase, SysMetrics, System}; use specs::{Entities, Join, Read, ReadExpect}; use std::time::Instant; @@ -21,9 +21,11 @@ impl<'a> System<'a> for Sys { Option>, Read<'a, SysMetrics>, Read<'a, common_ecs::PhysicsMetrics>, + ReadExpect<'a, SlowJobPool>, ReadExpect<'a, EcsSystemMetrics>, ReadExpect<'a, TickMetrics>, ReadExpect<'a, PhysicsMetrics>, + ReadExpect<'a, JobMetrics>, ); const NAME: &'static str = "metrics"; @@ -41,9 +43,11 @@ impl<'a> System<'a> for Sys { terrain, sys_metrics, phys_metrics, + slowjobpool, export_ecs, export_tick, export_physics, + export_jobs, ): Self::SystemData, ) { const NANOSEC_PER_SEC: 
f64 = std::time::Duration::from_secs(1).as_nanos() as f64; @@ -114,6 +118,24 @@ impl<'a> System<'a> for Sys { .entity_entity_collisions_count .inc_by(phys_metrics.entity_entity_collisions); + //detailed job metrics + for (name, jobs) in slowjobpool.take_metrics() { + let queried = export_jobs.job_queried_hst.with_label_values(&[&name]); + let executed = export_jobs.job_execution_hst.with_label_values(&[&name]); + for job in jobs { + queried.observe( + job.execution_start + .duration_since(job.queue_created) + .as_secs_f64(), + ); + executed.observe( + job.execution_end + .duration_since(job.execution_start) + .as_secs_f64(), + ); + } + } + // export self time as best as possible export_ecs .system_start_time From baf19b4d6006533df29aba3daa72c4628aab30ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 22 Jul 2021 20:57:16 +0200 Subject: [PATCH 2/2] remove an unused queue_created in the Queue, might be added in the future again for more metrics. Also use core rather than std in one place --- common/src/slowjob.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/src/slowjob.rs b/common/src/slowjob.rs index b66b08aab0..1251aae4b5 100644 --- a/common/src/slowjob.rs +++ b/common/src/slowjob.rs @@ -77,7 +77,6 @@ struct Config { struct Queue { id: u64, name: String, - _queue_created: Instant, task: Box, } @@ -98,7 +97,6 @@ impl Queue { Self { id, name: name.to_owned(), - _queue_created: queue_created, task: Box::new(move || { common_base::prof_span!(_guard, &name_cloned); let execution_start = Instant::now(); @@ -325,7 +323,7 @@ impl InternalSlowJobPool { } pub fn take_metrics(&mut self) -> HashMap> { - std::mem::replace(&mut self.jobs_metrics, HashMap::new()) + core::mem::take(&mut self.jobs_metrics) } }