diff --git a/network/src/lib.rs b/network/src/lib.rs index 0d8776f8c2..faef183cb5 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(trait_alias, try_trait, async_closure)] +#![feature(trait_alias, try_trait, async_closure, const_if_match)] //! Crate to handle high level networking of messages with different //! requirements and priorities over a number of protocols diff --git a/network/src/metrics.rs b/network/src/metrics.rs index 79e951151a..0bc03044d1 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,6 +1,10 @@ -use crate::types::Pid; -use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; +use crate::types::{Cid, Frame, Pid}; +use prometheus::{ + core::{AtomicI64, GenericCounter}, + IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, +}; use std::error::Error; +use tracing::*; //TODO: switch over to Counter for frames_count, message_count, bytes_send, // frames_message_count 1 NetworkMetrics per Network @@ -239,3 +243,91 @@ impl std::fmt::Debug for NetworkMetrics { write!(f, "NetworkMetrics()") } } + +/* +pub(crate) struct PidCidFrameCache { + metric: MetricVec, + pid: String, + cache: Vec<[T::M; 8]>, +} +*/ + +pub(crate) struct PidCidFrameCache { + metric: IntCounterVec, + pid: String, + cache: Vec<[GenericCounter; 8]>, +} + +impl PidCidFrameCache { + const CACHE_SIZE: usize = 16; + + pub fn new(metric: IntCounterVec, pid: Pid) -> Self { + Self { + metric, + pid: pid.to_string(), + cache: vec![], + } + } + + fn populate(&mut self, cid: Cid) { + let start_cid = self.cache.len(); + for i in start_cid..=cid as usize { + let cid = (i as Cid).to_string(); + let entry = [ + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(0)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(1)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(2)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(3)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(4)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(5)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(6)]), + self.metric + .with_label_values(&[&self.pid, &cid, Frame::int_to_string(7)]), + ]; + self.cache.push(entry); + } + } + + pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter { + if cid > (Self::CACHE_SIZE as Cid) { + warn!( + ?cid, + "cid, getting quite high, is this a attack on the cache?" + ); + } + self.populate(cid); + &self.cache[cid as usize][frame.get_int() as usize] + } +} + +pub(crate) struct CidFrameCache { + cache: [GenericCounter; 8], +} + +impl CidFrameCache { + pub fn new(metric: IntCounterVec, cid: Cid) -> Self { + let cid = cid.to_string(); + let cache = [ + metric.with_label_values(&[&cid, Frame::int_to_string(0)]), + metric.with_label_values(&[&cid, Frame::int_to_string(1)]), + metric.with_label_values(&[&cid, Frame::int_to_string(2)]), + metric.with_label_values(&[&cid, Frame::int_to_string(3)]), + metric.with_label_values(&[&cid, Frame::int_to_string(4)]), + metric.with_label_values(&[&cid, Frame::int_to_string(5)]), + metric.with_label_values(&[&cid, Frame::int_to_string(6)]), + metric.with_label_values(&[&cid, Frame::int_to_string(7)]), + ]; + Self { cache } + } + + pub fn with_label_values(&mut self, frame: &Frame) -> &GenericCounter { + &self.cache[frame.get_int() as usize] + } +} diff --git a/network/src/participant.rs b/network/src/participant.rs index b8fc5cc4c8..f80f819b4c 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -2,7 +2,7 @@ use crate::{ api::Stream, channel::Channel, message::{InCommingMessage, MessageBuffer, OutGoingMessage}, - metrics::NetworkMetrics, + metrics::{NetworkMetrics, PidCidFrameCache}, prios::PrioManager, protocols::Protocols, types::{Cid, Frame, Pid, Prio, Promises, Sid}, @@ -27,6 +27,7 @@ use tracing::*; #[derive(Debug)] struct ChannelInfo { cid: Cid, + cid_string: String, //optimisation b2w_frame_s: mpsc::UnboundedSender, b2r_read_shutdown: oneshot::Sender<()>, } @@ -52,6 +53,7 @@ struct ControlChannels { #[derive(Debug)] pub struct BParticipant { remote_pid: Pid, + remote_pid_string: String, //optimisation offset_sid: Sid, channels: Arc>>, streams: RwLock>, @@ -92,6 +94,7 @@ impl BParticipant { ( Self { remote_pid, + remote_pid_string: remote_pid.to_string(), offset_sid, channels: Arc::new(RwLock::new(vec![])), streams: RwLock::new(HashMap::new()), @@ -164,6 +167,8 @@ impl BParticipant { self.running_mgr.fetch_add(1, Ordering::Relaxed); let mut closing_up = false; trace!("start send_mgr"); + let mut send_cache = + PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); //while !self.closed.load(Ordering::Relaxed) { loop { let mut frames = VecDeque::new(); @@ -173,7 +178,7 @@ impl BParticipant { trace!("tick {}", len); } for (_, frame) in frames { - self.send_frame(frame).await; + self.send_frame(frame, &mut send_cache).await; } async_std::task::sleep(TICK_TIME).await; //shutdown after all msg are send! @@ -189,17 +194,12 @@ impl BParticipant { self.running_mgr.fetch_sub(1, Ordering::Relaxed); } - async fn send_frame(&self, frame: Frame) { + async fn send_frame(&self, frame: Frame, frames_out_total_cache: &mut PidCidFrameCache) { // find out ideal channel here //TODO: just take first if let Some(ci) = self.channels.write().await.get_mut(0) { - self.metrics - .frames_out_total - .with_label_values(&[ - &self.remote_pid.to_string(), - &ci.cid.to_string(), - frame.get_string(), - ]) + frames_out_total_cache + .with_label_values(ci.cid, &frame) .inc(); ci.b2w_frame_s.send(frame).await.unwrap(); } else { @@ -217,13 +217,12 @@ impl BParticipant { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("start handle_frames_mgr"); let mut messages = HashMap::new(); - let pid_string = &self.remote_pid.to_string(); while let Some((cid, frame)) = w2b_frames_r.next().await { let cid_string = cid.to_string(); //trace!("handling frame"); self.metrics .frames_in_total - .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) + .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) .inc(); match frame { Frame::OpenStream { @@ -247,7 +246,7 @@ impl BParticipant { if let Some(si) = self.streams.write().await.remove(&sid) { self.metrics .streams_closed_total - .with_label_values(&[&pid_string]) + .with_label_values(&[&self.remote_pid_string]) .inc(); si.closed.store(true, Ordering::Relaxed); } else { @@ -313,19 +312,20 @@ impl BParticipant { Channel::new(cid, self.remote_pid); channels.write().await.push(ChannelInfo { cid, + cid_string: cid.to_string(), b2w_frame_s, b2r_read_shutdown, }); b2s_create_channel_done_s.send(()).unwrap(); self.metrics .channels_connected_total - .with_label_values(&[&self.remote_pid.to_string()]) + .with_label_values(&[&self.remote_pid_string]) .inc(); trace!(?cid, "running channel in participant"); channel.run(protocol, w2b_frames_s).await; self.metrics .channels_disconnected_total - .with_label_values(&[&self.remote_pid.to_string()]) + .with_label_values(&[&self.remote_pid_string]) .inc(); trace!(?cid, "channel got closed"); } @@ -345,6 +345,8 @@ impl BParticipant { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("start open_mgr"); let mut stream_ids = self.offset_sid; + let mut send_cache = + PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse(); //from api or shutdown signal while let Some((prio, promises, p2a_return_stream)) = select! { @@ -357,11 +359,14 @@ impl BParticipant { let stream = self .create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s) .await; - self.send_frame(Frame::OpenStream { - sid, - prio, - promises, - }) + self.send_frame( + Frame::OpenStream { + sid, + prio, + promises, + }, + &mut send_cache, + ) .await; p2a_return_stream.send(stream).unwrap(); stream_ids += Sid::from(1); @@ -429,6 +434,8 @@ impl BParticipant { ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("start stream_close_mgr"); + let mut send_cache = + PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse(); //from api or shutdown signal @@ -465,11 +472,12 @@ impl BParticipant { trace!(?sid, "stream was successfully flushed"); self.metrics .streams_closed_total - .with_label_values(&[&self.remote_pid.to_string()]) + .with_label_values(&[&self.remote_pid_string]) .inc(); //only now remove the Stream, that means we can still recv on it. self.streams.write().await.remove(&sid); - self.send_frame(Frame::CloseStream { sid }).await; + self.send_frame(Frame::CloseStream { sid }, &mut send_cache) + .await; } trace!("stop stream_close_mgr"); self.running_mgr.fetch_sub(1, Ordering::Relaxed); @@ -493,7 +501,7 @@ impl BParticipant { }); self.metrics .streams_opened_total - .with_label_values(&[&self.remote_pid.to_string()]) + .with_label_values(&[&self.remote_pid_string]) .inc(); Stream::new( self.remote_pid, diff --git a/network/src/protocols.rs b/network/src/protocols.rs index d70890db7d..2bbafaca71 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -1,5 +1,5 @@ use crate::{ - metrics::NetworkMetrics, + metrics::{CidFrameCache, NetworkMetrics}, types::{Cid, Frame, Mid, Pid, Sid}, }; use async_std::{ @@ -64,6 +64,7 @@ impl TcpProtocol { end_receiver: oneshot::Receiver<()>, ) { trace!("starting up tcp write()"); + let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid); let mut stream = self.stream.clone(); let mut end_receiver = end_receiver.fuse(); loop { @@ -173,10 +174,7 @@ impl TcpProtocol { Frame::Raw(data) }, }; - self.metrics - .frames_wire_in_total - .with_label_values(&[&cid.to_string(), frame.get_string()]) - .inc(); + metrics_cache.with_label_values(&frame).inc(); from_wire_sender.send((cid, frame)).await.unwrap(); } trace!("shutting down tcp read()"); @@ -188,12 +186,9 @@ impl TcpProtocol { pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver) { trace!("starting up tcp write()"); let mut stream = self.stream.clone(); - let cid_string = cid.to_string(); + let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); while let Some(frame) = to_wire_receiver.next().await { - self.metrics - .frames_wire_out_total - .with_label_values(&[&cid_string, frame.get_string()]) - .inc(); + metrics_cache.with_label_values(&frame).inc(); match frame { Frame::Handshake { magic_number, @@ -296,6 +291,7 @@ impl UdpProtocol { end_receiver: oneshot::Receiver<()>, ) { trace!("starting up udp read()"); + let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid); let mut data_in = self.data_in.write().await; let mut end_receiver = end_receiver.fuse(); while let Some(bytes) = select! { @@ -388,10 +384,7 @@ impl UdpProtocol { }, _ => Frame::Raw(bytes), }; - self.metrics - .frames_wire_in_total - .with_label_values(&[&cid.to_string(), frame.get_string()]) - .inc(); + metrics_cache.with_label_values(&frame).inc(); from_wire_sender.send((cid, frame)).await.unwrap(); } trace!("shutting down udp read()"); @@ -400,12 +393,9 @@ impl UdpProtocol { pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver) { trace!("starting up udp write()"); let mut buffer = [0u8; 2000]; - let cid_string = cid.to_string(); + let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); while let Some(frame) = to_wire_receiver.next().await { - self.metrics - .frames_wire_out_total - .with_label_values(&[&cid_string, frame.get_string()]) - .inc(); + metrics_cache.with_label_values(&frame).inc(); let len = match frame { Frame::Handshake { magic_number, diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 5abc4e4262..a2b2ecfe96 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -133,6 +133,7 @@ impl Scheduler { self.listen_mgr(run_channels.a2s_listen_r), self.connect_mgr(run_channels.a2s_connect_r), self.disconnect_mgr(run_channels.a2s_disconnect_r), + self.prio_adj_mgr(), self.scheduler_shutdown_mgr(run_channels.a2s_scheduler_shutdown_r), ); } @@ -261,6 +262,12 @@ impl Scheduler { trace!("stop disconnect_mgr"); } + async fn prio_adj_mgr(&self) { + trace!("start prio_adj_mgr"); + //TODO adjust prios in participants here! + trace!("stop prio_adj_mgr"); + } + async fn scheduler_shutdown_mgr(&self, a2s_scheduler_shutdown_r: oneshot::Receiver<()>) { trace!("start scheduler_shutdown_mgr"); a2s_scheduler_shutdown_r.await.unwrap(); diff --git a/network/src/types.rs b/network/src/types.rs index b98de3ba71..dfa3ab1d9a 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -89,33 +89,49 @@ pub(crate) enum Frame { } impl Frame { - pub fn get_string(&self) -> &str { + pub const fn int_to_string(i: u8) -> &'static str { + match i { + 0 => "Handshake", + 1 => "ParticipantId", + 2 => "Shutdown", + 3 => "OpenStream", + 4 => "CloseStream", + 5 => "DataHeader", + 6 => "Data", + 7 => "Raw", + _ => "", + } + } + + pub fn get_int(&self) -> u8 { match self { Frame::Handshake { magic_number: _, version: _, - } => "Handshake", - Frame::ParticipantId { pid: _ } => "ParticipantId", - Frame::Shutdown => "Shutdown", + } => 0, + Frame::ParticipantId { pid: _ } => 1, + Frame::Shutdown => 2, Frame::OpenStream { sid: _, prio: _, promises: _, - } => "OpenStream", - Frame::CloseStream { sid: _ } => "CloseStream", + } => 3, + Frame::CloseStream { sid: _ } => 4, Frame::DataHeader { mid: _, sid: _, length: _, - } => "DataHeader", + } => 5, Frame::Data { mid: _, start: _, data: _, - } => "Data", - Frame::Raw(_) => "Raw", + } => 6, + Frame::Raw(_) => 7, } } + + pub fn get_string(&self) -> &str { Self::int_to_string(self.get_int()) } } impl Pid {