diff --git a/Cargo.lock b/Cargo.lock index 1dbc92c4fe..9f3e444f5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5167,6 +5167,7 @@ dependencies = [ "futures 0.3.4", "mio", "mio-extras", + "prometheus", "serde", "serde_derive", "tlid", diff --git a/network/Cargo.toml b/network/Cargo.toml index c08e1032cf..e2967e9021 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -19,5 +19,6 @@ tracing-subscriber = "0.2.0-alpha.4" byteorder = "1.3" mio-extras = "2.0" futures = "0.3" +prometheus = "0.7" uuid = { version = "0.8", features = ["serde", "v4"] } tlid = { path = "../../tlid", features = ["serde"]} \ No newline at end of file diff --git a/network/src/api.rs b/network/src/api.rs index a3e534a57f..0c363396cd 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -2,6 +2,7 @@ use crate::{ internal::RemoteParticipant, message::{self, OutGoingMessage}, worker::{ + metrics::NetworkMetrics, types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, Channel, Controller, TcpChannel, }, @@ -71,6 +72,7 @@ pub struct Network { thread_pool: Arc, participant_id: Pid, remotes: Arc>>, + metrics: Arc>, _pe: PhantomData, } @@ -85,11 +87,13 @@ impl Network { // created and we do not want to polute the traces with // network pid everytime } + let metrics = Arc::new(None); let controller = Arc::new(vec![Controller::new( worker_pool.next(), participant_id, thread_pool.clone(), token_pool.subpool(1000000).unwrap(), + metrics.clone(), remotes.clone(), )]); Self { @@ -99,6 +103,7 @@ impl Network { thread_pool, participant_id, remotes, + metrics, _pe: PhantomData:: {}, } } diff --git a/network/src/worker/metrics.rs b/network/src/worker/metrics.rs new file mode 100644 index 0000000000..a1ad1fd13a --- /dev/null +++ b/network/src/worker/metrics.rs @@ -0,0 +1,144 @@ +use prometheus::{IntGauge, IntGaugeVec, Opts, Registry}; +use std::{ + error::Error, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +// 1 NetworkMetrics per Network +pub struct NetworkMetrics { + pub participants_connected: IntGauge, + pub channels_connected: IntGauge, + pub streams_open: IntGauge, + pub worker_count: IntGauge, + pub network_info: IntGauge, + // Frames, seperated by CHANNEL (add PART and PROTOCOL) AND FRAME TYPE, + pub frames_count: IntGaugeVec, + // send Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL), + pub message_count: IntGaugeVec, + // send Messages bytes, seperated by STREAM (add PART and PROTOCOL, CHANNEL), + pub bytes_send: IntGaugeVec, + // queued Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL), + pub queue_count: IntGaugeVec, + // worker seperated by CHANNEL (add PART and PROTOCOL), + pub worker_work_time: IntGaugeVec, + // worker seperated by CHANNEL (add PART and PROTOCOL), + pub worker_idle_time: IntGaugeVec, + // ping calculated based on last msg + pub participants_ping: IntGaugeVec, + tick: Arc, +} + +impl NetworkMetrics { + pub fn new(registry: &Registry, tick: Arc) -> Result> { + let participants_connected = IntGauge::with_opts(Opts::new( + "participants_connected", + "shows the number of participants connected to the network", + ))?; + let channels_connected = IntGauge::with_opts(Opts::new( + "channels_connected", + "number of all channels currently connected on the network", + ))?; + let streams_open = IntGauge::with_opts(Opts::new( + "streams_open", + "number of all streams currently open on the network", + ))?; + let worker_count = IntGauge::with_opts(Opts::new( + "worker_count", + "number of workers currently running", + ))?; + let opts = Opts::new("network_info", "Static Network information").const_label( + "version", + &format!( + "{}.{}.{}", + &crate::internal::VELOREN_NETWORK_VERSION[0], + &crate::internal::VELOREN_NETWORK_VERSION[1], + &crate::internal::VELOREN_NETWORK_VERSION[2] + ), + ); + let network_info = IntGauge::with_opts(opts)?; + + let frames_count = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "frames_count", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + let message_count = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "message_count", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + let bytes_send = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "bytes_send", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + let queue_count = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "queue_count", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + let worker_work_time = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "worker_work_time", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + let worker_idle_time = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "worker_idle_time", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + let participants_ping = IntGaugeVec::from(IntGaugeVec::new( + Opts::new( + "participants_ping", + "time in ns requiered for a tick of the server", + ), + &["channel"], + )?); + + registry.register(Box::new(participants_connected.clone()))?; + registry.register(Box::new(channels_connected.clone()))?; + registry.register(Box::new(streams_open.clone()))?; + registry.register(Box::new(worker_count.clone()))?; + registry.register(Box::new(network_info.clone()))?; + registry.register(Box::new(frames_count.clone()))?; + registry.register(Box::new(message_count.clone()))?; + registry.register(Box::new(bytes_send.clone()))?; + registry.register(Box::new(queue_count.clone()))?; + registry.register(Box::new(worker_work_time.clone()))?; + registry.register(Box::new(worker_idle_time.clone()))?; + registry.register(Box::new(participants_ping.clone()))?; + + Ok(Self { + participants_connected, + channels_connected, + streams_open, + worker_count, + network_info, + frames_count, + message_count, + bytes_send, + queue_count, + worker_work_time, + worker_idle_time, + participants_ping, + tick, + }) + } + + pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } +} diff --git a/network/src/worker/mod.rs b/network/src/worker/mod.rs index 6d9b158cc2..4320f4e40a 100644 --- a/network/src/worker/mod.rs +++ b/network/src/worker/mod.rs @@ -5,6 +5,7 @@ communication is done via channels. */ pub mod channel; +pub mod metrics; pub mod mpsc; pub mod tcp; pub mod types; @@ -19,7 +20,8 @@ pub(crate) use udp::UdpChannel; use crate::{ internal::RemoteParticipant, worker::{ - types::{CtrlMsg, Pid, RtrnMsg, Statistics}, + metrics::NetworkMetrics, + types::{CtrlMsg, Pid, RtrnMsg}, worker::Worker, }, }; @@ -40,7 +42,6 @@ use uvth::ThreadPool; */ pub struct Controller { poll: Arc, - statistics: Arc>, ctrl_tx: Sender, rtrn_rx: Receiver, } @@ -53,12 +54,11 @@ impl Controller { pid: uuid::Uuid, thread_pool: Arc, mut token_pool: tlid::Pool>, + metrics: Arc>, remotes: Arc>>, ) -> Self { let poll = Arc::new(Poll::new().unwrap()); let poll_clone = poll.clone(); - let statistics = Arc::new(RwLock::new(Statistics::default())); - let statistics_clone = statistics.clone(); let (ctrl_tx, ctrl_rx) = channel(); let (rtrn_tx, rtrn_rx) = channel(); @@ -74,29 +74,17 @@ impl Controller { let span = span!(Level::INFO, "worker", ?w); let _enter = span.enter(); let mut worker = Worker::new( - pid, - poll_clone, - statistics_clone, - remotes, - token_pool, - ctrl_rx, - rtrn_tx, + pid, poll_clone, metrics, remotes, token_pool, ctrl_rx, rtrn_tx, ); worker.run(); }); Controller { poll, - statistics, ctrl_tx, rtrn_rx, } } - pub fn get_load_ratio(&self) -> f32 { - let statistics = self.statistics.read().unwrap(); - statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32 - } - //TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers pub(crate) fn get_tx(&self) -> Sender { self.ctrl_tx.clone() } diff --git a/network/src/worker/types.rs b/network/src/worker/types.rs index 5bec57070c..ce9559e08e 100644 --- a/network/src/worker/types.rs +++ b/network/src/worker/types.rs @@ -44,13 +44,6 @@ pub(crate) enum RtrnMsg { Receive(InCommingMessage), } -// MioStatistics should be copied in order to not hold locks for long -#[derive(Clone, Default)] -pub struct Statistics { - pub nano_wait: u128, - pub nano_busy: u128, -} - pub(crate) enum TokenObjects { TcpListener(TcpListener), TcpChannel(Channel, Option>), diff --git a/network/src/worker/worker.rs b/network/src/worker/worker.rs index e113f19a83..f8e0e3a88d 100644 --- a/network/src/worker/worker.rs +++ b/network/src/worker/worker.rs @@ -1,7 +1,8 @@ use crate::{ internal::RemoteParticipant, worker::{ - types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects}, + metrics::NetworkMetrics, + types::{CtrlMsg, Pid, RtrnMsg, TokenObjects}, Channel, Controller, TcpChannel, }, }; @@ -42,7 +43,7 @@ impl MioTokens { pub(crate) struct Worker { pid: Pid, poll: Arc, - statistics: Arc>, + metrics: Arc>, remotes: Arc>>, ctrl_rx: Receiver, rtrn_tx: Sender, @@ -55,7 +56,7 @@ impl Worker { pub fn new( pid: Pid, poll: Arc, - statistics: Arc>, + metrics: Arc>, remotes: Arc>>, token_pool: tlid::Pool>, ctrl_rx: Receiver, @@ -65,7 +66,7 @@ impl Worker { Worker { pid, poll, - statistics, + metrics, remotes, ctrl_rx, rtrn_tx, @@ -252,33 +253,19 @@ impl Worker { fn handle_statistics(&mut self) { let time_after_work = Instant::now(); - let mut statistics = match self.statistics.try_write() { - Ok(s) => s, - Err(e) => { - warn!( - ?e, - "statistics dropped because they are currently accecssed" - ); - return; - }, - }; - const KEEP_FACTOR: f64 = 0.995; - //in order to weight new data stronger than older we fade them out with a - // factor < 1. for 0.995 under full load (500 ticks a 1ms) we keep 8% of the old - // value this means, that we start to see load comming up after - // 500ms, but not for small spikes - as reordering for smaller spikes would be - // to slow - let first = self.time_after_poll.duration_since(self.time_before_poll); - let second = time_after_work.duration_since(self.time_after_poll); - statistics.nano_wait = - (statistics.nano_wait as f64 * KEEP_FACTOR) as u128 + first.as_nanos(); - statistics.nano_busy = - (statistics.nano_busy as f64 * KEEP_FACTOR) as u128 + second.as_nanos(); + let idle = self.time_after_poll.duration_since(self.time_before_poll); + let work = time_after_work.duration_since(self.time_after_poll); - trace!( - "current Load {}", - statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32 - ); + if let Some(metric) = &*self.metrics { + metric + .worker_idle_time + .with_label_values(&["message"]) + .add(idle.as_millis() as i64); //todo convert correctly ! + metric + .worker_work_time + .with_label_values(&["message"]) + .add(work.as_millis() as i64); + } } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 6b8efeedf5..44d845d2c6 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -38,7 +38,7 @@ use common::{ vol::{ReadVol, RectVolSize}, }; use log::{debug, error}; -use metrics::ServerMetrics; +use metrics::{ServerMetrics, TickMetrics}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; use std::{ i32, @@ -80,6 +80,7 @@ pub struct Server { server_info: ServerInfo, metrics: ServerMetrics, + tick_metrics: TickMetrics, server_settings: ServerSettings, } @@ -215,6 +216,14 @@ impl Server { state.ecs_mut().insert(DeletedEntities::default()); + let mut metrics = ServerMetrics::new(); + // register all metrics submodules here + let tick_metrics = TickMetrics::new(metrics.registry(), metrics.tick_clone()) + .expect("Failed to initialize server tick metrics submodule."); + metrics + .run(settings.metrics_address) + .expect("Failed to initialize server metrics submodule."); + let this = Self { state, world: Arc::new(world), @@ -233,8 +242,8 @@ impl Server { git_date: common::util::GIT_DATE.to_string(), auth_provider: settings.auth_server_address.clone(), }, - metrics: ServerMetrics::new(settings.metrics_address) - .expect("Failed to initialize server metrics submodule."), + metrics, + tick_metrics, server_settings: settings.clone(), }; @@ -401,87 +410,87 @@ impl Server { let total_sys_ran_in_dispatcher_nanos = terrain_nanos + waypoint_nanos; // Report timing info - self.metrics + self.tick_metrics .tick_time .with_label_values(&["new connections"]) .set((before_message_system - before_new_connections).as_nanos() as i64); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["state tick"]) .set( (before_handle_events - before_state_tick).as_nanos() as i64 - total_sys_ran_in_dispatcher_nanos, ); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["handle server events"]) .set((before_update_terrain_and_regions - before_handle_events).as_nanos() as i64); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["update terrain and region map"]) .set((before_sync - before_update_terrain_and_regions).as_nanos() as i64); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["world tick"]) .set((before_entity_cleanup - before_world_tick).as_nanos() as i64); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["entity cleanup"]) .set((end_of_server_tick - before_entity_cleanup).as_nanos() as i64); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["entity sync"]) .set(entity_sync_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["message"]) .set(message_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["sentinel"]) .set(sentinel_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["subscription"]) .set(subscription_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["terrain sync"]) .set(terrain_sync_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["terrain"]) .set(terrain_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["waypoint"]) .set(waypoint_nanos); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["persistence:stats"]) .set(stats_persistence_nanos); // Report other info - self.metrics + self.tick_metrics .player_online .set(self.state.ecs().read_storage::().join().count() as i64); - self.metrics + self.tick_metrics .time_of_day .set(self.state.ecs().read_resource::().0); - if self.metrics.is_100th_tick() { + if self.tick_metrics.is_100th_tick() { let mut chonk_cnt = 0; let chunk_cnt = self.state.terrain().iter().fold(0, |a, (_, c)| { chonk_cnt += 1; a + c.sub_chunks_len() }); - self.metrics.chonks_count.set(chonk_cnt as i64); - self.metrics.chunks_count.set(chunk_cnt as i64); + self.tick_metrics.chonks_count.set(chonk_cnt as i64); + self.tick_metrics.chunks_count.set(chunk_cnt as i64); let entity_count = self.state.ecs().entities().join().count(); - self.metrics.entity_count.set(entity_count as i64); + self.tick_metrics.entity_count.set(entity_count as i64); } //self.metrics.entity_count.set(self.state.); - self.metrics + self.tick_metrics .tick_time .with_label_values(&["metrics"]) .set(end_of_server_tick.elapsed().as_nanos() as i64); @@ -588,7 +597,7 @@ impl Server { .is_some() } - pub fn number_of_players(&self) -> i64 { self.metrics.player_online.get() } + pub fn number_of_players(&self) -> i64 { self.tick_metrics.player_online.get() } } impl Drop for Server { diff --git a/server/src/metrics.rs b/server/src/metrics.rs index 23c9a70e51..0c13a98c8c 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -6,14 +6,14 @@ use std::{ error::Error, net::SocketAddr, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, thread, time::{Duration, SystemTime, UNIX_EPOCH}, }; -pub struct ServerMetrics { +pub struct TickMetrics { pub chonks_count: IntGauge, pub chunks_count: IntGauge, pub player_online: IntGauge, @@ -23,77 +23,116 @@ pub struct ServerMetrics { pub start_time: IntGauge, pub time_of_day: Gauge, pub light_count: IntGauge, - running: Arc, - pub handle: Option>, - pub every_100th: i8, + tick: Arc, } -impl ServerMetrics { - pub fn new(addr: SocketAddr) -> Result> { - let opts = Opts::new( +pub struct ServerMetrics { + running: Arc, + handle: Option>, + registry: Option, + tick: Arc, +} + +impl TickMetrics { + pub fn new(registry: &Registry, tick: Arc) -> Result> { + let player_online = IntGauge::with_opts(Opts::new( "player_online", "shows the number of clients connected to the server", - ); - let player_online = IntGauge::with_opts(opts)?; - let opts = Opts::new( + ))?; + let entity_count = IntGauge::with_opts(Opts::new( "entity_count", "number of all entities currently active on the server", - ); - let entity_count = IntGauge::with_opts(opts)?; + ))?; let opts = Opts::new("veloren_build_info", "Build information") .const_label("hash", &common::util::GIT_HASH) .const_label("version", ""); let build_info = IntGauge::with_opts(opts)?; - let opts = Opts::new( + let start_time = IntGauge::with_opts(Opts::new( "veloren_start_time", "start time of the server in seconds since EPOCH", - ); - let start_time = IntGauge::with_opts(opts)?; - let opts = Opts::new("time_of_day", "ingame time in ingame-seconds"); - let time_of_day = Gauge::with_opts(opts)?; - let opts = Opts::new( + ))?; + let time_of_day = + Gauge::with_opts(Opts::new("time_of_day", "ingame time in ingame-seconds"))?; + let light_count = IntGauge::with_opts(Opts::new( "light_count", "number of all lights currently active on the server", - ); - let light_count = IntGauge::with_opts(opts)?; - let opts = Opts::new( + ))?; + let chonks_count = IntGauge::with_opts(Opts::new( "chonks_count", "number of all chonks currently active on the server", - ); - let chonks_count = IntGauge::with_opts(opts)?; - let opts = Opts::new( + ))?; + let chunks_count = IntGauge::with_opts(Opts::new( "chunks_count", "number of all chunks currently active on the server", - ); - let chunks_count = IntGauge::with_opts(opts)?; - let vec = IntGaugeVec::new( + ))?; + let tick_time = IntGaugeVec::from(IntGaugeVec::new( Opts::new("tick_time", "time in ns requiered for a tick of the server"), &["period"], - )?; - let tick_time = IntGaugeVec::from(vec); + )?); let since_the_epoch = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); start_time.set(since_the_epoch.as_secs().try_into()?); - let registry = Registry::new(); - //registry.register(Box::new(chonks_count.clone())).unwrap(); registry.register(Box::new(player_online.clone()))?; registry.register(Box::new(entity_count.clone()))?; registry.register(Box::new(build_info.clone()))?; registry.register(Box::new(start_time.clone()))?; registry.register(Box::new(time_of_day.clone()))?; - //registry.register(Box::new(light_count.clone())).unwrap(); registry.register(Box::new(chonks_count.clone()))?; registry.register(Box::new(chunks_count.clone()))?; registry.register(Box::new(tick_time.clone()))?; - let running = Arc::new(AtomicBool::new(true)); - let running2 = running.clone(); + Ok(Self { + chonks_count, + chunks_count, + player_online, + entity_count, + tick_time, + build_info, + start_time, + time_of_day, + light_count, + tick, + }) + } + + pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } +} + +impl ServerMetrics { + pub fn new() -> Self { + let running = Arc::new(AtomicBool::new(false)); + let tick = Arc::new(AtomicU64::new(0)); + let registry = Some(Registry::new()); + + Self { + running, + handle: None, + registry, + tick, + } + } + + pub fn registry(&self) -> &Registry { + match self.registry { + Some(ref r) => r, + None => panic!("You cannot longer register new metrics after the server has started!"), + } + } + + pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box> { + self.running.store(true, Ordering::Relaxed); + let running2 = self.running.clone(); + + let registry = self + .registry + .take() + .expect("ServerMetrics must be already started"); //TODO: make this a job - let handle = Some(thread::spawn(move || { + self.handle = Some(thread::spawn(move || { let server = Server::new(addr, move |request| { router!(request, (GET) (/metrics) => { @@ -106,7 +145,7 @@ impl ServerMetrics { _ => rouille::Response::empty_404() ) }) - .expect("Failed to start server"); + .expect("Failed to start server"); info!("Started server metrics: {}", addr); while running2.load(Ordering::Relaxed) { server.poll(); @@ -114,32 +153,12 @@ impl ServerMetrics { thread::sleep(Duration::from_millis(100)); } })); - - Ok(Self { - chonks_count, - chunks_count, - player_online, - entity_count, - tick_time, - build_info, - start_time, - time_of_day, - light_count, - running, - handle, - every_100th: 0, - }) + Ok(()) } - pub fn is_100th_tick(&mut self) -> bool { - self.every_100th += 1; - if self.every_100th == 100 { - self.every_100th = 0; - true - } else { - false - } - } + pub fn tick(&self) -> u64 { self.tick.fetch_add(1, Ordering::Relaxed) + 1 } + + pub fn tick_clone(&self) -> Arc { self.tick.clone() } } impl Drop for ServerMetrics {