Differ Metrics to make it easier to implement your own metric coding!

Implement my own metric coding in networking
This commit is contained in:
Marcel Märtens 2020-02-19 18:08:57 +01:00
parent f3251c0879
commit 88f6b36a4e
9 changed files with 287 additions and 140 deletions

1
Cargo.lock generated
View File

@ -5167,6 +5167,7 @@ dependencies = [
"futures 0.3.4",
"mio",
"mio-extras",
"prometheus",
"serde",
"serde_derive",
"tlid",

View File

@ -19,5 +19,6 @@ tracing-subscriber = "0.2.0-alpha.4"
byteorder = "1.3"
mio-extras = "2.0"
futures = "0.3"
prometheus = "0.7"
uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]}

View File

@ -2,6 +2,7 @@ use crate::{
internal::RemoteParticipant,
message::{self, OutGoingMessage},
worker::{
metrics::NetworkMetrics,
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
Channel, Controller, TcpChannel,
},
@ -71,6 +72,7 @@ pub struct Network<E: Events> {
thread_pool: Arc<ThreadPool>,
participant_id: Pid,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
metrics: Arc<Option<NetworkMetrics>>,
_pe: PhantomData<E>,
}
@ -85,11 +87,13 @@ impl<E: Events> Network<E> {
// created and we do not want to polute the traces with
// network pid everytime
}
let metrics = Arc::new(None);
let controller = Arc::new(vec![Controller::new(
worker_pool.next(),
participant_id,
thread_pool.clone(),
token_pool.subpool(1000000).unwrap(),
metrics.clone(),
remotes.clone(),
)]);
Self {
@ -99,6 +103,7 @@ impl<E: Events> Network<E> {
thread_pool,
participant_id,
remotes,
metrics,
_pe: PhantomData::<E> {},
}
}

View File

@ -0,0 +1,144 @@
use prometheus::{IntGauge, IntGaugeVec, Opts, Registry};
use std::{
error::Error,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
// 1 NetworkMetrics per Network
pub struct NetworkMetrics {
pub participants_connected: IntGauge,
pub channels_connected: IntGauge,
pub streams_open: IntGauge,
pub worker_count: IntGauge,
pub network_info: IntGauge,
// Frames, seperated by CHANNEL (add PART and PROTOCOL) AND FRAME TYPE,
pub frames_count: IntGaugeVec,
// send Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL),
pub message_count: IntGaugeVec,
// send Messages bytes, seperated by STREAM (add PART and PROTOCOL, CHANNEL),
pub bytes_send: IntGaugeVec,
// queued Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL),
pub queue_count: IntGaugeVec,
// worker seperated by CHANNEL (add PART and PROTOCOL),
pub worker_work_time: IntGaugeVec,
// worker seperated by CHANNEL (add PART and PROTOCOL),
pub worker_idle_time: IntGaugeVec,
// ping calculated based on last msg
pub participants_ping: IntGaugeVec,
tick: Arc<AtomicU64>,
}
impl NetworkMetrics {
pub fn new(registry: &Registry, tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
let participants_connected = IntGauge::with_opts(Opts::new(
"participants_connected",
"shows the number of participants connected to the network",
))?;
let channels_connected = IntGauge::with_opts(Opts::new(
"channels_connected",
"number of all channels currently connected on the network",
))?;
let streams_open = IntGauge::with_opts(Opts::new(
"streams_open",
"number of all streams currently open on the network",
))?;
let worker_count = IntGauge::with_opts(Opts::new(
"worker_count",
"number of workers currently running",
))?;
let opts = Opts::new("network_info", "Static Network information").const_label(
"version",
&format!(
"{}.{}.{}",
&crate::internal::VELOREN_NETWORK_VERSION[0],
&crate::internal::VELOREN_NETWORK_VERSION[1],
&crate::internal::VELOREN_NETWORK_VERSION[2]
),
);
let network_info = IntGauge::with_opts(opts)?;
let frames_count = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"frames_count",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let message_count = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"message_count",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let bytes_send = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"bytes_send",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let queue_count = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"queue_count",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let worker_work_time = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"worker_work_time",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let worker_idle_time = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"worker_idle_time",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let participants_ping = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"participants_ping",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
registry.register(Box::new(participants_connected.clone()))?;
registry.register(Box::new(channels_connected.clone()))?;
registry.register(Box::new(streams_open.clone()))?;
registry.register(Box::new(worker_count.clone()))?;
registry.register(Box::new(network_info.clone()))?;
registry.register(Box::new(frames_count.clone()))?;
registry.register(Box::new(message_count.clone()))?;
registry.register(Box::new(bytes_send.clone()))?;
registry.register(Box::new(queue_count.clone()))?;
registry.register(Box::new(worker_work_time.clone()))?;
registry.register(Box::new(worker_idle_time.clone()))?;
registry.register(Box::new(participants_ping.clone()))?;
Ok(Self {
participants_connected,
channels_connected,
streams_open,
worker_count,
network_info,
frames_count,
message_count,
bytes_send,
queue_count,
worker_work_time,
worker_idle_time,
participants_ping,
tick,
})
}
pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
}

View File

@ -5,6 +5,7 @@
communication is done via channels.
*/
pub mod channel;
pub mod metrics;
pub mod mpsc;
pub mod tcp;
pub mod types;
@ -19,7 +20,8 @@ pub(crate) use udp::UdpChannel;
use crate::{
internal::RemoteParticipant,
worker::{
types::{CtrlMsg, Pid, RtrnMsg, Statistics},
metrics::NetworkMetrics,
types::{CtrlMsg, Pid, RtrnMsg},
worker::Worker,
},
};
@ -40,7 +42,6 @@ use uvth::ThreadPool;
*/
pub struct Controller {
poll: Arc<Poll>,
statistics: Arc<RwLock<Statistics>>,
ctrl_tx: Sender<CtrlMsg>,
rtrn_rx: Receiver<RtrnMsg>,
}
@ -53,12 +54,11 @@ impl Controller {
pid: uuid::Uuid,
thread_pool: Arc<ThreadPool>,
mut token_pool: tlid::Pool<tlid::Wrapping<usize>>,
metrics: Arc<Option<NetworkMetrics>>,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
) -> Self {
let poll = Arc::new(Poll::new().unwrap());
let poll_clone = poll.clone();
let statistics = Arc::new(RwLock::new(Statistics::default()));
let statistics_clone = statistics.clone();
let (ctrl_tx, ctrl_rx) = channel();
let (rtrn_tx, rtrn_rx) = channel();
@ -74,29 +74,17 @@ impl Controller {
let span = span!(Level::INFO, "worker", ?w);
let _enter = span.enter();
let mut worker = Worker::new(
pid,
poll_clone,
statistics_clone,
remotes,
token_pool,
ctrl_rx,
rtrn_tx,
pid, poll_clone, metrics, remotes, token_pool, ctrl_rx, rtrn_tx,
);
worker.run();
});
Controller {
poll,
statistics,
ctrl_tx,
rtrn_rx,
}
}
pub fn get_load_ratio(&self) -> f32 {
let statistics = self.statistics.read().unwrap();
statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32
}
//TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers
pub(crate) fn get_tx(&self) -> Sender<CtrlMsg> { self.ctrl_tx.clone() }

View File

@ -44,13 +44,6 @@ pub(crate) enum RtrnMsg {
Receive(InCommingMessage),
}
// MioStatistics should be copied in order to not hold locks for long
#[derive(Clone, Default)]
pub struct Statistics {
pub nano_wait: u128,
pub nano_busy: u128,
}
pub(crate) enum TokenObjects {
TcpListener(TcpListener),
TcpChannel(Channel<TcpChannel>, Option<Sender<Pid>>),

View File

@ -1,7 +1,8 @@
use crate::{
internal::RemoteParticipant,
worker::{
types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects},
metrics::NetworkMetrics,
types::{CtrlMsg, Pid, RtrnMsg, TokenObjects},
Channel, Controller, TcpChannel,
},
};
@ -42,7 +43,7 @@ impl MioTokens {
pub(crate) struct Worker {
pid: Pid,
poll: Arc<Poll>,
statistics: Arc<RwLock<Statistics>>,
metrics: Arc<Option<NetworkMetrics>>,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
ctrl_rx: Receiver<CtrlMsg>,
rtrn_tx: Sender<RtrnMsg>,
@ -55,7 +56,7 @@ impl Worker {
pub fn new(
pid: Pid,
poll: Arc<Poll>,
statistics: Arc<RwLock<Statistics>>,
metrics: Arc<Option<NetworkMetrics>>,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
ctrl_rx: Receiver<CtrlMsg>,
@ -65,7 +66,7 @@ impl Worker {
Worker {
pid,
poll,
statistics,
metrics,
remotes,
ctrl_rx,
rtrn_tx,
@ -252,33 +253,19 @@ impl Worker {
fn handle_statistics(&mut self) {
let time_after_work = Instant::now();
let mut statistics = match self.statistics.try_write() {
Ok(s) => s,
Err(e) => {
warn!(
?e,
"statistics dropped because they are currently accecssed"
);
return;
},
};
const KEEP_FACTOR: f64 = 0.995;
//in order to weight new data stronger than older we fade them out with a
// factor < 1. for 0.995 under full load (500 ticks a 1ms) we keep 8% of the old
// value this means, that we start to see load comming up after
// 500ms, but not for small spikes - as reordering for smaller spikes would be
// to slow
let first = self.time_after_poll.duration_since(self.time_before_poll);
let second = time_after_work.duration_since(self.time_after_poll);
statistics.nano_wait =
(statistics.nano_wait as f64 * KEEP_FACTOR) as u128 + first.as_nanos();
statistics.nano_busy =
(statistics.nano_busy as f64 * KEEP_FACTOR) as u128 + second.as_nanos();
let idle = self.time_after_poll.duration_since(self.time_before_poll);
let work = time_after_work.duration_since(self.time_after_poll);
trace!(
"current Load {}",
statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32
);
if let Some(metric) = &*self.metrics {
metric
.worker_idle_time
.with_label_values(&["message"])
.add(idle.as_millis() as i64); //todo convert correctly !
metric
.worker_work_time
.with_label_values(&["message"])
.add(work.as_millis() as i64);
}
}
}

View File

@ -38,7 +38,7 @@ use common::{
vol::{ReadVol, RectVolSize},
};
use log::{debug, error};
use metrics::ServerMetrics;
use metrics::{ServerMetrics, TickMetrics};
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
use std::{
i32,
@ -80,6 +80,7 @@ pub struct Server {
server_info: ServerInfo,
metrics: ServerMetrics,
tick_metrics: TickMetrics,
server_settings: ServerSettings,
}
@ -215,6 +216,14 @@ impl Server {
state.ecs_mut().insert(DeletedEntities::default());
let mut metrics = ServerMetrics::new();
// register all metrics submodules here
let tick_metrics = TickMetrics::new(metrics.registry(), metrics.tick_clone())
.expect("Failed to initialize server tick metrics submodule.");
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
let this = Self {
state,
world: Arc::new(world),
@ -233,8 +242,8 @@ impl Server {
git_date: common::util::GIT_DATE.to_string(),
auth_provider: settings.auth_server_address.clone(),
},
metrics: ServerMetrics::new(settings.metrics_address)
.expect("Failed to initialize server metrics submodule."),
metrics,
tick_metrics,
server_settings: settings.clone(),
};
@ -401,87 +410,87 @@ impl Server {
let total_sys_ran_in_dispatcher_nanos = terrain_nanos + waypoint_nanos;
// Report timing info
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["new connections"])
.set((before_message_system - before_new_connections).as_nanos() as i64);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["state tick"])
.set(
(before_handle_events - before_state_tick).as_nanos() as i64
- total_sys_ran_in_dispatcher_nanos,
);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["handle server events"])
.set((before_update_terrain_and_regions - before_handle_events).as_nanos() as i64);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["update terrain and region map"])
.set((before_sync - before_update_terrain_and_regions).as_nanos() as i64);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["world tick"])
.set((before_entity_cleanup - before_world_tick).as_nanos() as i64);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["entity cleanup"])
.set((end_of_server_tick - before_entity_cleanup).as_nanos() as i64);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["entity sync"])
.set(entity_sync_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["message"])
.set(message_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["sentinel"])
.set(sentinel_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["subscription"])
.set(subscription_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["terrain sync"])
.set(terrain_sync_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["terrain"])
.set(terrain_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["waypoint"])
.set(waypoint_nanos);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["persistence:stats"])
.set(stats_persistence_nanos);
// Report other info
self.metrics
self.tick_metrics
.player_online
.set(self.state.ecs().read_storage::<Client>().join().count() as i64);
self.metrics
self.tick_metrics
.time_of_day
.set(self.state.ecs().read_resource::<TimeOfDay>().0);
if self.metrics.is_100th_tick() {
if self.tick_metrics.is_100th_tick() {
let mut chonk_cnt = 0;
let chunk_cnt = self.state.terrain().iter().fold(0, |a, (_, c)| {
chonk_cnt += 1;
a + c.sub_chunks_len()
});
self.metrics.chonks_count.set(chonk_cnt as i64);
self.metrics.chunks_count.set(chunk_cnt as i64);
self.tick_metrics.chonks_count.set(chonk_cnt as i64);
self.tick_metrics.chunks_count.set(chunk_cnt as i64);
let entity_count = self.state.ecs().entities().join().count();
self.metrics.entity_count.set(entity_count as i64);
self.tick_metrics.entity_count.set(entity_count as i64);
}
//self.metrics.entity_count.set(self.state.);
self.metrics
self.tick_metrics
.tick_time
.with_label_values(&["metrics"])
.set(end_of_server_tick.elapsed().as_nanos() as i64);
@ -588,7 +597,7 @@ impl Server {
.is_some()
}
pub fn number_of_players(&self) -> i64 { self.metrics.player_online.get() }
pub fn number_of_players(&self) -> i64 { self.tick_metrics.player_online.get() }
}
impl Drop for Server {

View File

@ -6,14 +6,14 @@ use std::{
error::Error,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};
pub struct ServerMetrics {
pub struct TickMetrics {
pub chonks_count: IntGauge,
pub chunks_count: IntGauge,
pub player_online: IntGauge,
@ -23,77 +23,116 @@ pub struct ServerMetrics {
pub start_time: IntGauge,
pub time_of_day: Gauge,
pub light_count: IntGauge,
running: Arc<AtomicBool>,
pub handle: Option<thread::JoinHandle<()>>,
pub every_100th: i8,
tick: Arc<AtomicU64>,
}
impl ServerMetrics {
pub fn new(addr: SocketAddr) -> Result<Self, Box<dyn Error>> {
let opts = Opts::new(
pub struct ServerMetrics {
running: Arc<AtomicBool>,
handle: Option<thread::JoinHandle<()>>,
registry: Option<Registry>,
tick: Arc<AtomicU64>,
}
impl TickMetrics {
pub fn new(registry: &Registry, tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
let player_online = IntGauge::with_opts(Opts::new(
"player_online",
"shows the number of clients connected to the server",
);
let player_online = IntGauge::with_opts(opts)?;
let opts = Opts::new(
))?;
let entity_count = IntGauge::with_opts(Opts::new(
"entity_count",
"number of all entities currently active on the server",
);
let entity_count = IntGauge::with_opts(opts)?;
))?;
let opts = Opts::new("veloren_build_info", "Build information")
.const_label("hash", &common::util::GIT_HASH)
.const_label("version", "");
let build_info = IntGauge::with_opts(opts)?;
let opts = Opts::new(
let start_time = IntGauge::with_opts(Opts::new(
"veloren_start_time",
"start time of the server in seconds since EPOCH",
);
let start_time = IntGauge::with_opts(opts)?;
let opts = Opts::new("time_of_day", "ingame time in ingame-seconds");
let time_of_day = Gauge::with_opts(opts)?;
let opts = Opts::new(
))?;
let time_of_day =
Gauge::with_opts(Opts::new("time_of_day", "ingame time in ingame-seconds"))?;
let light_count = IntGauge::with_opts(Opts::new(
"light_count",
"number of all lights currently active on the server",
);
let light_count = IntGauge::with_opts(opts)?;
let opts = Opts::new(
))?;
let chonks_count = IntGauge::with_opts(Opts::new(
"chonks_count",
"number of all chonks currently active on the server",
);
let chonks_count = IntGauge::with_opts(opts)?;
let opts = Opts::new(
))?;
let chunks_count = IntGauge::with_opts(Opts::new(
"chunks_count",
"number of all chunks currently active on the server",
);
let chunks_count = IntGauge::with_opts(opts)?;
let vec = IntGaugeVec::new(
))?;
let tick_time = IntGaugeVec::from(IntGaugeVec::new(
Opts::new("tick_time", "time in ns requiered for a tick of the server"),
&["period"],
)?;
let tick_time = IntGaugeVec::from(vec);
)?);
let since_the_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
start_time.set(since_the_epoch.as_secs().try_into()?);
let registry = Registry::new();
//registry.register(Box::new(chonks_count.clone())).unwrap();
registry.register(Box::new(player_online.clone()))?;
registry.register(Box::new(entity_count.clone()))?;
registry.register(Box::new(build_info.clone()))?;
registry.register(Box::new(start_time.clone()))?;
registry.register(Box::new(time_of_day.clone()))?;
//registry.register(Box::new(light_count.clone())).unwrap();
registry.register(Box::new(chonks_count.clone()))?;
registry.register(Box::new(chunks_count.clone()))?;
registry.register(Box::new(tick_time.clone()))?;
let running = Arc::new(AtomicBool::new(true));
let running2 = running.clone();
Ok(Self {
chonks_count,
chunks_count,
player_online,
entity_count,
tick_time,
build_info,
start_time,
time_of_day,
light_count,
tick,
})
}
pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
}
impl ServerMetrics {
pub fn new() -> Self {
let running = Arc::new(AtomicBool::new(false));
let tick = Arc::new(AtomicU64::new(0));
let registry = Some(Registry::new());
Self {
running,
handle: None,
registry,
tick,
}
}
pub fn registry(&self) -> &Registry {
match self.registry {
Some(ref r) => r,
None => panic!("You cannot longer register new metrics after the server has started!"),
}
}
pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box<dyn Error>> {
self.running.store(true, Ordering::Relaxed);
let running2 = self.running.clone();
let registry = self
.registry
.take()
.expect("ServerMetrics must be already started");
//TODO: make this a job
let handle = Some(thread::spawn(move || {
self.handle = Some(thread::spawn(move || {
let server = Server::new(addr, move |request| {
router!(request,
(GET) (/metrics) => {
@ -106,7 +145,7 @@ impl ServerMetrics {
_ => rouille::Response::empty_404()
)
})
.expect("Failed to start server");
.expect("Failed to start server");
info!("Started server metrics: {}", addr);
while running2.load(Ordering::Relaxed) {
server.poll();
@ -114,32 +153,12 @@ impl ServerMetrics {
thread::sleep(Duration::from_millis(100));
}
}));
Ok(Self {
chonks_count,
chunks_count,
player_online,
entity_count,
tick_time,
build_info,
start_time,
time_of_day,
light_count,
running,
handle,
every_100th: 0,
})
Ok(())
}
pub fn is_100th_tick(&mut self) -> bool {
self.every_100th += 1;
if self.every_100th == 100 {
self.every_100th = 0;
true
} else {
false
}
}
pub fn tick(&self) -> u64 { self.tick.fetch_add(1, Ordering::Relaxed) + 1 }
pub fn tick_clone(&self) -> Arc<AtomicU64> { self.tick.clone() }
}
impl Drop for ServerMetrics {