Merge branch 'xMAC94x/job_metrics' into 'master'

Implement slowjob metrics to measure time in queue and execution time

See merge request veloren/veloren!2654
This commit is contained in:
Marcel 2021-07-22 19:17:30 +00:00
commit e9f6ae6b37
5 changed files with 173 additions and 27 deletions

View File

@ -3,6 +3,7 @@ use rayon::ThreadPool;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Instant,
}; };
use tracing::{error, warn}; use tracing::{error, warn};
@ -39,7 +40,7 @@ use tracing::{error, warn};
/// .num_threads(16) /// .num_threads(16)
/// .build() /// .build()
/// .unwrap(); /// .unwrap();
/// let pool = SlowJobPool::new(3, Arc::new(threadpool)); /// let pool = SlowJobPool::new(3, 10, Arc::new(threadpool));
/// pool.configure("CHUNK_GENERATOR", |n| n / 2); /// pool.configure("CHUNK_GENERATOR", |n| n / 2);
/// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job")); /// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job"));
/// ``` /// ```
@ -61,6 +62,8 @@ struct InternalSlowJobPool {
last_spawned_configs: Vec<String>, last_spawned_configs: Vec<String>,
global_spawned_and_running: u64, global_spawned_and_running: u64,
global_limit: u64, global_limit: u64,
jobs_metrics_cnt: usize,
jobs_metrics: HashMap<String, Vec<JobMetrics>>,
threadpool: Arc<ThreadPool>, threadpool: Arc<ThreadPool>,
internal: Option<Arc<Mutex<Self>>>, internal: Option<Arc<Mutex<Self>>>,
} }
@ -77,6 +80,12 @@ struct Queue {
task: Box<dyn FnOnce() + Send + Sync + 'static>, task: Box<dyn FnOnce() + Send + Sync + 'static>,
} }
/// Timestamps recorded for one finished slow job, from which the
/// metrics system derives queue-wait time (`queue_created` →
/// `execution_start`) and execution time (`execution_start` →
/// `execution_end`).
pub struct JobMetrics {
    /// When the job was placed into the queue (`Queue::new`).
    pub queue_created: Instant,
    /// When the worker thread began running the job closure.
    pub execution_start: Instant,
    /// When the job closure returned.
    pub execution_end: Instant,
}
impl Queue { impl Queue {
fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self fn new<F>(name: &str, id: u64, internal: &Arc<Mutex<InternalSlowJobPool>>, f: F) -> Self
where where
@ -84,16 +93,24 @@ impl Queue {
{ {
let internal = Arc::clone(internal); let internal = Arc::clone(internal);
let name_cloned = name.to_owned(); let name_cloned = name.to_owned();
let queue_created = Instant::now();
Self { Self {
id, id,
name: name.to_owned(), name: name.to_owned(),
task: Box::new(move || { task: Box::new(move || {
common_base::prof_span!(_guard, &name_cloned); common_base::prof_span!(_guard, &name_cloned);
let execution_start = Instant::now();
f(); f();
let execution_end = Instant::now();
let metrics = JobMetrics {
queue_created,
execution_start,
execution_end,
};
// directly maintain the next task afterwards // directly maintain the next task afterwards
{ {
let mut lock = internal.lock().expect("slowjob lock poisoned"); let mut lock = internal.lock().expect("slowjob lock poisoned");
lock.finish(&name_cloned); lock.finish(&name_cloned, metrics);
lock.spawn_queued(); lock.spawn_queued();
} }
}), }),
@ -102,7 +119,11 @@ impl Queue {
} }
impl InternalSlowJobPool { impl InternalSlowJobPool {
pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Arc<Mutex<Self>> { pub fn new(
global_limit: u64,
jobs_metrics_cnt: usize,
threadpool: Arc<ThreadPool>,
) -> Arc<Mutex<Self>> {
let link = Arc::new(Mutex::new(Self { let link = Arc::new(Mutex::new(Self {
next_id: 0, next_id: 0,
queue: HashMap::new(), queue: HashMap::new(),
@ -110,6 +131,8 @@ impl InternalSlowJobPool {
last_spawned_configs: Vec::new(), last_spawned_configs: Vec::new(),
global_spawned_and_running: 0, global_spawned_and_running: 0,
global_limit: global_limit.max(1), global_limit: global_limit.max(1),
jobs_metrics_cnt,
jobs_metrics: HashMap::new(),
threadpool, threadpool,
internal: None, internal: None,
})); }));
@ -247,7 +270,12 @@ impl InternalSlowJobPool {
} }
} }
fn finish(&mut self, name: &str) { fn finish(&mut self, name: &str, metrics: JobMetrics) {
let metric = self.jobs_metrics.entry(name.to_string()).or_default();
if metric.len() < self.jobs_metrics_cnt {
metric.push(metrics);
}
self.global_spawned_and_running -= 1; self.global_spawned_and_running -= 1;
if let Some(c) = self.configs.get_mut(name) { if let Some(c) = self.configs.get_mut(name) {
c.local_spawned_and_running -= 1; c.local_spawned_and_running -= 1;
@ -293,12 +321,16 @@ impl InternalSlowJobPool {
} }
} }
} }
/// Drains and returns every `JobMetrics` entry collected since the
/// previous call, leaving the internal store empty so collection can
/// start fresh for the next interval.
pub fn take_metrics(&mut self) -> HashMap<String, Vec<JobMetrics>> {
    std::mem::replace(&mut self.jobs_metrics, HashMap::new())
}
} }
impl SlowJobPool { impl SlowJobPool {
pub fn new(global_limit: u64, threadpool: Arc<ThreadPool>) -> Self { pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc<ThreadPool>) -> Self {
Self { Self {
internal: InternalSlowJobPool::new(global_limit, threadpool), internal: InternalSlowJobPool::new(global_limit, jobs_metrics_cnt, threadpool),
} }
} }
@ -356,6 +388,13 @@ impl SlowJobPool {
} }
Err(job) Err(job)
} }
/// Drains the job metrics gathered by the underlying pool.
///
/// Acquires the internal mutex; panics if it was poisoned by a
/// panicking job thread.
pub fn take_metrics(&self) -> HashMap<String, Vec<JobMetrics>> {
    let mut guard = self
        .internal
        .lock()
        .expect("lock poisoned while take_metrics");
    guard.take_metrics()
}
} }
#[cfg(test)] #[cfg(test)]
@ -366,6 +405,7 @@ mod tests {
fn mock_pool( fn mock_pool(
pool_threads: usize, pool_threads: usize,
global_threads: u64, global_threads: u64,
metrics: usize,
foo: u64, foo: u64,
bar: u64, bar: u64,
baz: u64, baz: u64,
@ -374,7 +414,7 @@ mod tests {
.num_threads(pool_threads) .num_threads(pool_threads)
.build() .build()
.unwrap(); .unwrap();
let pool = SlowJobPool::new(global_threads, Arc::new(threadpool)); let pool = SlowJobPool::new(global_threads, metrics, Arc::new(threadpool));
if foo != 0 { if foo != 0 {
pool.configure("FOO", |x| x / foo); pool.configure("FOO", |x| x / foo);
} }
@ -389,7 +429,7 @@ mod tests {
#[test] #[test]
fn simple_queue() { fn simple_queue() {
let pool = mock_pool(4, 4, 1, 0, 0); let pool = mock_pool(4, 4, 0, 1, 0, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 1u64)] let queue_data = [("FOO", 1u64)]
.iter() .iter()
@ -406,7 +446,7 @@ mod tests {
#[test] #[test]
fn multiple_queue() { fn multiple_queue() {
let pool = mock_pool(4, 4, 1, 0, 0); let pool = mock_pool(4, 4, 0, 1, 0, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 2u64)] let queue_data = [("FOO", 2u64)]
.iter() .iter()
@ -424,7 +464,7 @@ mod tests {
#[test] #[test]
fn limit_queue() { fn limit_queue() {
let pool = mock_pool(5, 5, 1, 0, 0); let pool = mock_pool(5, 5, 0, 1, 0, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 80u64)] let queue_data = [("FOO", 80u64)]
.iter() .iter()
@ -444,7 +484,7 @@ mod tests {
#[test] #[test]
fn simple_queue_2() { fn simple_queue_2() {
let pool = mock_pool(4, 4, 1, 1, 0); let pool = mock_pool(4, 4, 0, 1, 1, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 1u64), ("BAR", 1u64)] let queue_data = [("FOO", 1u64), ("BAR", 1u64)]
.iter() .iter()
@ -462,7 +502,7 @@ mod tests {
#[test] #[test]
fn multiple_queue_3() { fn multiple_queue_3() {
let pool = mock_pool(4, 4, 1, 1, 0); let pool = mock_pool(4, 4, 0, 1, 1, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 2u64), ("BAR", 2u64)] let queue_data = [("FOO", 2u64), ("BAR", 2u64)]
.iter() .iter()
@ -480,7 +520,7 @@ mod tests {
#[test] #[test]
fn multiple_queue_4() { fn multiple_queue_4() {
let pool = mock_pool(4, 4, 2, 1, 0); let pool = mock_pool(4, 4, 0, 2, 1, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 3u64), ("BAR", 3u64)] let queue_data = [("FOO", 3u64), ("BAR", 3u64)]
.iter() .iter()
@ -498,7 +538,7 @@ mod tests {
#[test] #[test]
fn multiple_queue_5() { fn multiple_queue_5() {
let pool = mock_pool(4, 4, 2, 1, 0); let pool = mock_pool(4, 4, 0, 2, 1, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 5u64), ("BAR", 5u64)] let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
.iter() .iter()
@ -516,7 +556,7 @@ mod tests {
#[test] #[test]
fn multiple_queue_6() { fn multiple_queue_6() {
let pool = mock_pool(40, 40, 2, 1, 0); let pool = mock_pool(40, 40, 0, 2, 1, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
let queue_data = [("FOO", 5u64), ("BAR", 5u64)] let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
.iter() .iter()
@ -534,7 +574,7 @@ mod tests {
#[test] #[test]
fn roundrobin() { fn roundrobin() {
let pool = mock_pool(4, 4, 2, 2, 0); let pool = mock_pool(4, 4, 0, 2, 2, 0);
let queue_data = [("FOO", 5u64), ("BAR", 5u64)] let queue_data = [("FOO", 5u64), ("BAR", 5u64)]
.iter() .iter()
.map(|(n, c)| ((*n).to_owned(), *c)) .map(|(n, c)| ((*n).to_owned(), *c))
@ -583,14 +623,14 @@ mod tests {
#[test] #[test]
#[should_panic] #[should_panic]
fn unconfigured() { fn unconfigured() {
let pool = mock_pool(4, 4, 2, 1, 0); let pool = mock_pool(4, 4, 0, 2, 1, 0);
let mut internal = pool.internal.lock().unwrap(); let mut internal = pool.internal.lock().unwrap();
internal.spawn("UNCONFIGURED", || println!()); internal.spawn("UNCONFIGURED", || println!());
} }
#[test] #[test]
fn correct_spawn_doesnt_panic() { fn correct_spawn_doesnt_panic() {
let pool = mock_pool(4, 4, 2, 1, 0); let pool = mock_pool(4, 4, 0, 2, 1, 0);
let mut internal = pool.internal.lock().unwrap(); let mut internal = pool.internal.lock().unwrap();
internal.spawn("FOO", || println!("foo")); internal.spawn("FOO", || println!("foo"));
internal.spawn("BAR", || println!("bar")); internal.spawn("BAR", || println!("bar"));
@ -598,7 +638,7 @@ mod tests {
#[test] #[test]
fn can_spawn() { fn can_spawn() {
let pool = mock_pool(4, 4, 2, 1, 0); let pool = mock_pool(4, 4, 0, 2, 1, 0);
let internal = pool.internal.lock().unwrap(); let internal = pool.internal.lock().unwrap();
assert!(internal.can_spawn("FOO")); assert!(internal.can_spawn("FOO"));
assert!(internal.can_spawn("BAR")); assert!(internal.can_spawn("BAR"));
@ -606,7 +646,7 @@ mod tests {
#[test] #[test]
fn try_run_works() { fn try_run_works() {
let pool = mock_pool(4, 4, 2, 1, 0); let pool = mock_pool(4, 4, 0, 2, 1, 0);
pool.try_run("FOO", || println!("foo")).unwrap(); pool.try_run("FOO", || println!("foo")).unwrap();
pool.try_run("BAR", || println!("bar")).unwrap(); pool.try_run("BAR", || println!("bar")).unwrap();
} }
@ -614,7 +654,7 @@ mod tests {
#[test] #[test]
fn try_run_exhausted() { fn try_run_exhausted() {
use std::{thread::sleep, time::Duration}; use std::{thread::sleep, time::Duration};
let pool = mock_pool(8, 8, 4, 2, 0); let pool = mock_pool(8, 8, 0, 4, 2, 0);
let func = || loop { let func = || loop {
sleep(Duration::from_secs(1)) sleep(Duration::from_secs(1))
}; };
@ -633,7 +673,7 @@ mod tests {
#[test] #[test]
fn actually_runs_1() { fn actually_runs_1() {
let pool = mock_pool(4, 4, 0, 0, 1); let pool = mock_pool(4, 4, 0, 0, 0, 1);
let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier_clone = Arc::clone(&barrier); let barrier_clone = Arc::clone(&barrier);
pool.try_run("BAZ", move || { pool.try_run("BAZ", move || {
@ -645,7 +685,7 @@ mod tests {
#[test] #[test]
fn actually_runs_2() { fn actually_runs_2() {
let pool = mock_pool(4, 4, 0, 0, 1); let pool = mock_pool(4, 4, 0, 0, 0, 1);
let barrier = Arc::new(std::sync::Barrier::new(2)); let barrier = Arc::new(std::sync::Barrier::new(2));
let barrier_clone = Arc::clone(&barrier); let barrier_clone = Arc::clone(&barrier);
pool.spawn("BAZ", move || { pool.spawn("BAZ", move || {
@ -660,7 +700,7 @@ mod tests {
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Barrier, Barrier,
}; };
let pool = mock_pool(4, 4, 4, 0, 1); let pool = mock_pool(4, 4, 0, 4, 0, 1);
let ops_i_ran = Arc::new(AtomicBool::new(false)); let ops_i_ran = Arc::new(AtomicBool::new(false));
let ops_i_ran_clone = Arc::clone(&ops_i_ran); let ops_i_ran_clone = Arc::clone(&ops_i_ran);
let barrier = Arc::new(Barrier::new(2)); let barrier = Arc::new(Barrier::new(2));
@ -683,4 +723,28 @@ mod tests {
// now wait on the second job to be actually finished // now wait on the second job to be actually finished
barrier2.wait(); barrier2.wait();
} }
#[test]
fn verify_metrics() {
    use std::sync::Barrier;
    // Pool configured to retain at most 2 metric entries per job name.
    let pool = mock_pool(4, 4, 2, 1, 0, 4);
    // Four jobs plus this test thread rendezvous on the barrier.
    let sync_point = Arc::new(Barrier::new(5));
    for job_name in ["FOO", "BAZ", "FOO", "FOO"].iter() {
        let sync_handle = Arc::clone(&sync_point);
        pool.spawn(job_name, move || {
            sync_handle.wait();
        });
    }
    // Release every queued job at once so they all finish.
    sync_point.wait();
    // The workers record metrics after the barrier; give them time to
    // finish storing before we drain.
    std::thread::sleep(std::time::Duration::from_secs(2));
    let collected = pool.take_metrics();
    let foo_entries = collected.get("FOO").expect("FOO doesn't exist in metrics");
    // Capped at 2 entries even though FOO ran three times.
    assert_eq!(foo_entries.len(), 2);
    // No BAR job was ever spawned, so no entry may exist for it.
    assert!(collected.get("BAR").is_none());
    let baz_entries = collected.get("BAZ").expect("BAZ doesn't exist in metrics");
    assert_eq!(baz_entries.len(), 1);
}
} }

View File

@ -216,7 +216,11 @@ impl State {
let num_cpu = num_cpus::get() as u64; let num_cpu = num_cpus::get() as u64;
let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1); let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1);
tracing::trace!(?slow_limit, "Slow Thread limit"); tracing::trace!(?slow_limit, "Slow Thread limit");
ecs.insert(SlowJobPool::new(slow_limit, Arc::clone(thread_pool))); ecs.insert(SlowJobPool::new(
slow_limit,
10_000,
Arc::clone(thread_pool),
));
// TODO: only register on the server // TODO: only register on the server
ecs.insert(EventBus::<ServerEvent>::default()); ecs.insert(EventBus::<ServerEvent>::default());

View File

@ -184,6 +184,7 @@ impl Server {
let registry = Arc::new(Registry::new()); let registry = Arc::new(Registry::new());
let chunk_gen_metrics = metrics::ChunkGenMetrics::new(&registry).unwrap(); let chunk_gen_metrics = metrics::ChunkGenMetrics::new(&registry).unwrap();
let job_metrics = metrics::JobMetrics::new(&registry).unwrap();
let network_request_metrics = metrics::NetworkRequestMetrics::new(&registry).unwrap(); let network_request_metrics = metrics::NetworkRequestMetrics::new(&registry).unwrap();
let player_metrics = metrics::PlayerMetrics::new(&registry).unwrap(); let player_metrics = metrics::PlayerMetrics::new(&registry).unwrap();
let ecs_system_metrics = EcsSystemMetrics::new(&registry).unwrap(); let ecs_system_metrics = EcsSystemMetrics::new(&registry).unwrap();
@ -207,6 +208,7 @@ impl Server {
}); });
state.ecs_mut().insert(Tick(0)); state.ecs_mut().insert(Tick(0));
state.ecs_mut().insert(TickStart(Instant::now())); state.ecs_mut().insert(TickStart(Instant::now()));
state.ecs_mut().insert(job_metrics);
state.ecs_mut().insert(network_request_metrics); state.ecs_mut().insert(network_request_metrics);
state.ecs_mut().insert(player_metrics); state.ecs_mut().insert(player_metrics);
state.ecs_mut().insert(ecs_system_metrics); state.ecs_mut().insert(ecs_system_metrics);

View File

@ -44,6 +44,11 @@ pub struct ChunkGenMetrics {
pub chunks_canceled: IntCounter, pub chunks_canceled: IntCounter,
} }
/// Prometheus histograms for slowjob timings, labelled by job name.
pub struct JobMetrics {
    /// Time a job spent waiting in the queue, from creation until its
    /// execution started.
    pub job_queried_hst: HistogramVec,
    /// Time a job spent executing, from execution start until it
    /// finished.
    pub job_execution_hst: HistogramVec,
}
pub struct TickMetrics { pub struct TickMetrics {
pub chonks_count: IntGauge, pub chonks_count: IntGauge,
pub chunks_count: IntGauge, pub chunks_count: IntGauge,
@ -241,6 +246,55 @@ impl ChunkGenMetrics {
} }
} }
impl JobMetrics {
    /// Builds the slowjob timing histograms and registers both with
    /// `registry`.
    ///
    /// # Errors
    /// Returns a `prometheus::Error` if histogram construction or
    /// registration fails (e.g. a duplicate metric name).
    pub fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
        // Queue-wait buckets: 100µs, 2ms, 100ms.
        let queried_buckets: Vec<f64> = [
            Duration::from_micros(100),
            Duration::from_millis(2),
            Duration::from_millis(100),
        ]
        .iter()
        .map(Duration::as_secs_f64)
        .collect();
        let job_queried_hst = HistogramVec::new(
            HistogramOpts::new(
                "job_queried_hst",
                "shows the detailed time each job name took from query till it started to execute \
                 as histogram",
            )
            .buckets(queried_buckets),
            &["name"],
        )?;
        // Execution-time buckets: 5ms up to 10s.
        let execution_buckets: Vec<f64> = [5u64, 20, 50, 100, 200, 500, 1000, 10000]
            .iter()
            .map(|&ms| Duration::from_millis(ms).as_secs_f64())
            .collect();
        let job_execution_hst = HistogramVec::new(
            HistogramOpts::new(
                "job_execution_hst",
                "shows the detailed time each job name took from start of execution until it \
                 finished as histogram",
            )
            .buckets(execution_buckets),
            &["name"],
        )?;
        registry.register(Box::new(job_queried_hst.clone()))?;
        registry.register(Box::new(job_execution_hst.clone()))?;
        Ok(Self {
            job_queried_hst,
            job_execution_hst,
        })
    }
}
impl TickMetrics { impl TickMetrics {
pub fn new(registry: &Registry) -> Result<Self, Box<dyn Error>> { pub fn new(registry: &Registry) -> Result<Self, Box<dyn Error>> {
let chonks_count = IntGauge::with_opts(Opts::new( let chonks_count = IntGauge::with_opts(Opts::new(

View File

@ -1,8 +1,8 @@
use crate::{ use crate::{
metrics::{EcsSystemMetrics, PhysicsMetrics, TickMetrics}, metrics::{EcsSystemMetrics, JobMetrics, PhysicsMetrics, TickMetrics},
HwStats, Tick, TickStart, HwStats, Tick, TickStart,
}; };
use common::{resources::TimeOfDay, terrain::TerrainGrid}; use common::{resources::TimeOfDay, slowjob::SlowJobPool, terrain::TerrainGrid};
use common_ecs::{Job, Origin, Phase, SysMetrics, System}; use common_ecs::{Job, Origin, Phase, SysMetrics, System};
use specs::{Entities, Join, Read, ReadExpect}; use specs::{Entities, Join, Read, ReadExpect};
use std::time::Instant; use std::time::Instant;
@ -21,9 +21,11 @@ impl<'a> System<'a> for Sys {
Option<Read<'a, TerrainGrid>>, Option<Read<'a, TerrainGrid>>,
Read<'a, SysMetrics>, Read<'a, SysMetrics>,
Read<'a, common_ecs::PhysicsMetrics>, Read<'a, common_ecs::PhysicsMetrics>,
ReadExpect<'a, SlowJobPool>,
ReadExpect<'a, EcsSystemMetrics>, ReadExpect<'a, EcsSystemMetrics>,
ReadExpect<'a, TickMetrics>, ReadExpect<'a, TickMetrics>,
ReadExpect<'a, PhysicsMetrics>, ReadExpect<'a, PhysicsMetrics>,
ReadExpect<'a, JobMetrics>,
); );
const NAME: &'static str = "metrics"; const NAME: &'static str = "metrics";
@ -41,9 +43,11 @@ impl<'a> System<'a> for Sys {
terrain, terrain,
sys_metrics, sys_metrics,
phys_metrics, phys_metrics,
slowjobpool,
export_ecs, export_ecs,
export_tick, export_tick,
export_physics, export_physics,
export_jobs,
): Self::SystemData, ): Self::SystemData,
) { ) {
const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64; const NANOSEC_PER_SEC: f64 = std::time::Duration::from_secs(1).as_nanos() as f64;
@ -114,6 +118,24 @@ impl<'a> System<'a> for Sys {
.entity_entity_collisions_count .entity_entity_collisions_count
.inc_by(phys_metrics.entity_entity_collisions); .inc_by(phys_metrics.entity_entity_collisions);
//detailed job metrics
for (name, jobs) in slowjobpool.take_metrics() {
let queried = export_jobs.job_queried_hst.with_label_values(&[&name]);
let executed = export_jobs.job_execution_hst.with_label_values(&[&name]);
for job in jobs {
queried.observe(
job.execution_start
.duration_since(job.queue_created)
.as_secs_f64(),
);
executed.observe(
job.execution_end
.duration_since(job.execution_start)
.as_secs_f64(),
);
}
}
// export self time as best as possible // export self time as best as possible
export_ecs export_ecs
.system_start_time .system_start_time