diff --git a/network/src/channel.rs b/network/src/channel.rs index cff0591e3b..815d682160 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -181,7 +181,7 @@ impl Handshake { if let Some(Ok(ref frame)) = frame { self.metrics .frames_in_total - .with_label_values(&["", &cid_string, &frame.get_string()]) + .with_label_values(&[&cid_string, &frame.get_string()]) .inc(); } } @@ -216,7 +216,7 @@ impl Handshake { } else { debug!("Handshake completed"); if self.init_handshake { - self.send_init(&mut c2w_frame_s, "").await; + self.send_init(&mut c2w_frame_s).await; } else { self.send_handshake(&mut c2w_frame_s).await; } @@ -227,7 +227,7 @@ impl Handshake { #[cfg(feature = "metrics")] self.metrics .frames_in_total - .with_label_values(&["", &cid_string, frame.get_string()]) + .with_label_values(&[&cid_string, frame.get_string()]) .inc(); if let Frame::Raw(bytes) = frame { match std::str::from_utf8(bytes.as_slice()) { @@ -258,16 +258,15 @@ impl Handshake { let r = match frame { Some(Ok(Frame::Init { pid, secret })) => { debug!(?pid, "Participant send their ID"); - let pid_string = pid.to_string(); #[cfg(feature = "metrics")] self.metrics .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "ParticipantId"]) + .with_label_values(&[&cid_string, "ParticipantId"]) .inc(); let stream_id_offset = if self.init_handshake { STREAM_ID_OFFSET1 } else { - self.send_init(&mut c2w_frame_s, &pid_string).await; + self.send_init(&mut c2w_frame_s).await; STREAM_ID_OFFSET2 }; info!(?pid, "This Handshake is now configured!"); @@ -277,7 +276,7 @@ impl Handshake { #[cfg(feature = "metrics")] self.metrics .frames_in_total - .with_label_values(&["", &cid_string, frame.get_string()]) + .with_label_values(&[&cid_string, frame.get_string()]) .inc(); if let Frame::Raw(bytes) = frame { match std::str::from_utf8(bytes.as_slice()) { @@ -309,7 +308,7 @@ impl Handshake { #[cfg(feature = "metrics")] self.metrics .frames_out_total - .with_label_values(&["", &self.cid.to_string(), 
"Handshake"]) + .with_label_values(&[&self.cid.to_string(), "Handshake"]) .inc(); c2w_frame_s .send(Frame::Handshake { @@ -323,13 +322,12 @@ impl Handshake { async fn send_init( &self, c2w_frame_s: &mut mpsc::UnboundedSender, - #[cfg(feature = "metrics")] pid_string: &str, #[cfg(not(feature = "metrics"))] _pid_string: &str, ) { #[cfg(feature = "metrics")] self.metrics .frames_out_total - .with_label_values(&[pid_string, &self.cid.to_string(), "ParticipantId"]) + .with_label_values(&[&self.cid.to_string(), "ParticipantId"]) .inc(); c2w_frame_s .send(Frame::Init { @@ -352,11 +350,11 @@ impl Handshake { let cid_string = self.cid.to_string(); self.metrics .frames_out_total - .with_label_values(&["", &cid_string, "Raw"]) + .with_label_values(&[&cid_string, "Raw"]) .inc(); self.metrics .frames_out_total - .with_label_values(&["", &cid_string, "Shutdown"]) + .with_label_values(&[&cid_string, "Shutdown"]) .inc(); } c2w_frame_s.send(Frame::Raw(data)).await.unwrap(); diff --git a/network/src/metrics.rs b/network/src/metrics.rs index 02940acd40..a753229a55 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -6,14 +6,18 @@ use prometheus::{ use std::error::Error; use tracing::*; -//TODO: switch over to Counter for frames_count, message_count, bytes_send, -// frames_message_count 1 NetworkMetrics per Network +/// 1:1 relation between NetworkMetrics and Network +/// use 2NF here and avoid redundant data like CHANNEL AND PARTICIPANT encoding. 
+/// as this will cause a matrix that is full of 0 but needs a lot of bandwidth and +/// storage #[allow(dead_code)] pub struct NetworkMetrics { pub listen_requests_total: IntCounterVec, pub connect_requests_total: IntCounterVec, pub participants_connected_total: IntCounter, pub participants_disconnected_total: IntCounter, + // channel ids, separated by PARTICIPANT, max 5 + pub participants_channel_ids: IntGaugeVec, // opened Channels, seperated by PARTICIPANT pub channels_connected_total: IntCounterVec, pub channels_disconnected_total: IntCounterVec, @@ -71,6 +75,13 @@ impl NetworkMetrics { "participants_disconnected_total", "Shows the number of participants disconnected to the network", ))?; + let participants_channel_ids = IntGaugeVec::new( + Opts::new( + "participants_channel_ids", + "Channel numbers belonging to a Participant in the network", + ), + &["participant", "no"], + )?; let channels_connected_total = IntCounterVec::new( Opts::new( "channels_connected_total", @@ -116,14 +127,14 @@ impl NetworkMetrics { "frames_out_total", "Number of all frames send per channel, at the channel level", ), - &["participant", "channel", "frametype"], + &["channel", "frametype"], )?; let frames_in_total = IntCounterVec::new( Opts::new( "frames_in_total", "Number of all frames received per channel, at the channel level", ), - &["participant", "channel", "frametype"], + &["channel", "frametype"], )?; let frames_wire_out_total = IntCounterVec::new( Opts::new( @@ -203,6 +214,7 @@ impl NetworkMetrics { connect_requests_total, participants_connected_total, participants_disconnected_total, + participants_channel_ids, channels_connected_total, channels_disconnected_total, streams_opened_total, @@ -228,6 +240,7 @@ impl NetworkMetrics { registry.register(Box::new(self.connect_requests_total.clone()))?; registry.register(Box::new(self.participants_connected_total.clone()))?; registry.register(Box::new(self.participants_disconnected_total.clone()))?; +
registry.register(Box::new(self.participants_channel_ids.clone()))?; registry.register(Box::new(self.channels_connected_total.clone()))?; registry.register(Box::new(self.channels_disconnected_total.clone()))?; registry.register(Box::new(self.streams_opened_total.clone()))?; @@ -266,19 +279,17 @@ pub(crate) struct PidCidFrameCache { } */ -pub(crate) struct PidCidFrameCache { +pub(crate) struct MultiCidFrameCache { metric: IntCounterVec, - pid: String, - cache: Vec<[GenericCounter; Frame::FRAMES_LEN as usize]>, + cache: Vec<[Option>; Frame::FRAMES_LEN as usize]>, } -impl PidCidFrameCache { - const CACHE_SIZE: usize = 512; +impl MultiCidFrameCache { + const CACHE_SIZE: usize = 2048; - pub fn new(metric: IntCounterVec, pid: Pid) -> Self { + pub fn new(metric: IntCounterVec) -> Self { Self { metric, - pid: pid.to_string(), cache: vec![], } } @@ -291,33 +302,22 @@ impl PidCidFrameCache { "cid, getting quite high, is this a attack on the cache?" ); } - for i in start_cid..=cid as usize { - let cid = (i as Cid).to_string(); - let entry = [ - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(0)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(1)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(2)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(3)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(4)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(5)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(6)]), - self.metric - .with_label_values(&[&self.pid, &cid, Frame::int_to_string(7)]), - ]; - self.cache.push(entry); - } + self.cache.resize((cid + 1) as usize, [ + None, None, None, None, None, None, None, None, + ]); } pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter { self.populate(cid); - &self.cache[cid as usize][frame.get_int() as usize] + let 
frame_int = frame.get_int() as usize; + let r = &mut self.cache[cid as usize][frame_int]; + if r.is_none() { + *r = Some( + self.metric + .with_label_values(&[&cid.to_string(), &frame_int.to_string()]), + ); + } + r.as_ref().unwrap() } } @@ -361,12 +361,12 @@ mod tests { } #[test] - fn pid_cid_frame_cache() { + fn multi_cid_frame_cache() { let pid = Pid::fake(1); let frame1 = Frame::Raw(b"Foo".to_vec()); let frame2 = Frame::Raw(b"Bar".to_vec()); let metrics = NetworkMetrics::new(&pid).unwrap(); - let mut cache = PidCidFrameCache::new(metrics.frames_in_total, pid); + let mut cache = MultiCidFrameCache::new(metrics.frames_in_total); let v1 = cache.with_label_values(1, &frame1); v1.inc(); assert_eq!(v1.get(), 1); diff --git a/network/src/participant.rs b/network/src/participant.rs index 2eefa73857..9cedd3ea45 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -1,5 +1,5 @@ #[cfg(feature = "metrics")] -use crate::metrics::{NetworkMetrics, PidCidFrameCache}; +use crate::metrics::{MultiCidFrameCache, NetworkMetrics}; use crate::{ api::{ParticipantError, Stream}, channel::Channel, @@ -205,8 +205,7 @@ impl BParticipant { let mut closing_up = false; trace!("Start send_mgr"); #[cfg(feature = "metrics")] - let mut send_cache = - PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone()); let mut i: u64 = 0; loop { let mut frames = VecDeque::new(); @@ -250,7 +249,7 @@ impl BParticipant { async fn send_frame( &self, frame: Frame, - #[cfg(feature = "metrics")] frames_out_total_cache: &mut PidCidFrameCache, + #[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache, ) -> bool { let mut drop_cid = None; // TODO: find out ideal channel here @@ -345,7 +344,7 @@ impl BParticipant { let cid_string = cid.to_string(); self.metrics .frames_in_total - .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) + 
.with_label_values(&[&cid_string, frame.get_string()]) .inc(); } match frame { @@ -490,7 +489,9 @@ impl BParticipant { let channels = self.channels.clone(); async move { let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid); - channels.write().await.insert( + let mut lock = channels.write().await; + let mut channel_no = lock.len(); + lock.insert( cid, Mutex::new(ChannelInfo { cid, @@ -499,13 +500,27 @@ impl BParticipant { b2r_read_shutdown, }), ); + drop(lock); b2s_create_channel_done_s.send(()).unwrap(); #[cfg(feature = "metrics")] - self.metrics - .channels_connected_total - .with_label_values(&[&self.remote_pid_string]) - .inc(); - trace!(?cid, "Running channel in participant"); + { + self.metrics + .channels_connected_total + .with_label_values(&[&self.remote_pid_string]) + .inc(); + if channel_no > 5 { + debug!(?channel_no, "metrics will overwrite channel #5"); + channel_no = 5; + } + self.metrics + .participants_channel_ids + .with_label_values(&[ + &self.remote_pid_string, + &channel_no.to_string(), + ]) + .set(cid as i64); + } + trace!(?cid, ?channel_no, "Running channel in participant"); channel .run(protocol, w2b_frames_s, leftover_cid_frame) .instrument(tracing::info_span!("", ?cid)) @@ -541,8 +556,7 @@ impl BParticipant { trace!("Start open_mgr"); let mut stream_ids = self.offset_sid; #[cfg(feature = "metrics")] - let mut send_cache = - PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone()); let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse(); //from api or shutdown signal while let Some((prio, promises, p2a_return_stream)) = select! 
{ @@ -600,8 +614,7 @@ impl BParticipant { trace!("Start participant_shutdown_mgr"); let sender = s2b_shutdown_bparticipant_r.await.unwrap(); - let mut send_cache = - PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone()); self.close_api(None).await; @@ -687,8 +700,7 @@ impl BParticipant { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start stream_close_mgr"); #[cfg(feature = "metrics")] - let mut send_cache = - PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone()); let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse(); //from api or shutdown signal