speeding up metrics by reducing string generation and HashMap access with a metrics cache for msg/send and msg/recv

Marcel Märtens 2020-05-25 01:17:03 +02:00
parent 5873fb3a44
commit b746ddfc5c
6 changed files with 167 additions and 54 deletions
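
Background on the optimisation: prometheus's with_label_values formats its label values and hashes them into the metric family's internal map on every call, while the GenericCounter child it returns is a cheap Arc-backed handle that can be held and incremented directly. A minimal sketch of the pattern outside this crate (metric and label names are illustrative only; this assumes a prometheus crate version where IntCounterVec children are GenericCounter<AtomicI64>, matching the imports in the diff below):

use prometheus::{
    core::{AtomicI64, GenericCounter},
    IntCounterVec, Opts, Registry,
};

fn main() {
    let registry = Registry::new();
    let frames = IntCounterVec::new(
        Opts::new("frames_total", "illustrative frame counter"),
        &["cid", "frametype"],
    )
    .unwrap();
    registry.register(Box::new(frames.clone())).unwrap();

    // Per-increment cost without a cache: string generation for the
    // labels plus a hash lookup inside the metric family.
    frames.with_label_values(&[&1u64.to_string(), "Data"]).inc();

    // With a cache: resolve the child once, hold the handle, and every
    // further increment is a single atomic add.
    let cached: GenericCounter<AtomicI64> = frames.with_label_values(&["1", "Data"]);
    cached.inc();
    cached.inc();
}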

View File

@@ -1,4 +1,4 @@
-#![feature(trait_alias, try_trait, async_closure)]
+#![feature(trait_alias, try_trait, async_closure, const_if_match)]
 //! Crate to handle high level networking of messages with different
 //! requirements and priorities over a number of protocols

View File

@@ -1,6 +1,10 @@
-use crate::types::Pid;
-use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
+use crate::types::{Cid, Frame, Pid};
+use prometheus::{
+    core::{AtomicI64, GenericCounter},
+    IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry,
+};
 use std::error::Error;
+use tracing::*;

 //TODO: switch over to Counter for frames_count, message_count, bytes_send,
 // frames_message_count 1 NetworkMetrics per Network
@@ -239,3 +243,91 @@ impl std::fmt::Debug for NetworkMetrics {
         write!(f, "NetworkMetrics()")
     }
 }
+
+/*
+pub(crate) struct PidCidFrameCache<T: MetricVecBuilder> {
+    metric: MetricVec<T>,
+    pid: String,
+    cache: Vec<[T::M; 8]>,
+}
+*/
+
+pub(crate) struct PidCidFrameCache {
+    metric: IntCounterVec,
+    pid: String,
+    cache: Vec<[GenericCounter<AtomicI64>; 8]>,
+}
+
+impl PidCidFrameCache {
+    const CACHE_SIZE: usize = 16;
+
+    pub fn new(metric: IntCounterVec, pid: Pid) -> Self {
+        Self {
+            metric,
+            pid: pid.to_string(),
+            cache: vec![],
+        }
+    }
+
+    fn populate(&mut self, cid: Cid) {
+        let start_cid = self.cache.len();
+        for i in start_cid..=cid as usize {
+            let cid = (i as Cid).to_string();
+            let entry = [
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(0)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(1)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(2)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(3)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(4)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(5)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(6)]),
+                self.metric
+                    .with_label_values(&[&self.pid, &cid, Frame::int_to_string(7)]),
+            ];
+            self.cache.push(entry);
+        }
+    }
+
+    pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter<AtomicI64> {
+        if cid > (Self::CACHE_SIZE as Cid) {
+            warn!(
+                ?cid,
+                "cid is getting quite high, is this an attack on the cache?"
+            );
+        }
+        self.populate(cid);
+        &self.cache[cid as usize][frame.get_int() as usize]
+    }
+}
+
+pub(crate) struct CidFrameCache {
+    cache: [GenericCounter<AtomicI64>; 8],
+}
+
+impl CidFrameCache {
+    pub fn new(metric: IntCounterVec, cid: Cid) -> Self {
+        let cid = cid.to_string();
+        let cache = [
+            metric.with_label_values(&[&cid, Frame::int_to_string(0)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(1)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(2)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(3)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(4)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(5)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(6)]),
+            metric.with_label_values(&[&cid, Frame::int_to_string(7)]),
+        ];
+        Self { cache }
+    }
+
+    pub fn with_label_values(&mut self, frame: &Frame) -> &GenericCounter<AtomicI64> {
+        &self.cache[frame.get_int() as usize]
+    }
+}

View File

@@ -2,7 +2,7 @@ use crate::{
     api::Stream,
     channel::Channel,
     message::{InCommingMessage, MessageBuffer, OutGoingMessage},
-    metrics::NetworkMetrics,
+    metrics::{NetworkMetrics, PidCidFrameCache},
     prios::PrioManager,
     protocols::Protocols,
     types::{Cid, Frame, Pid, Prio, Promises, Sid},
@@ -27,6 +27,7 @@ use tracing::*;
 #[derive(Debug)]
 struct ChannelInfo {
     cid: Cid,
+    cid_string: String, //optimisation
     b2w_frame_s: mpsc::UnboundedSender<Frame>,
     b2r_read_shutdown: oneshot::Sender<()>,
 }
@@ -52,6 +53,7 @@ struct ControlChannels {
 #[derive(Debug)]
 pub struct BParticipant {
     remote_pid: Pid,
+    remote_pid_string: String, //optimisation
     offset_sid: Sid,
     channels: Arc<RwLock<Vec<ChannelInfo>>>,
     streams: RwLock<HashMap<Sid, StreamInfo>>,
@@ -92,6 +94,7 @@ impl BParticipant {
         (
             Self {
                 remote_pid,
+                remote_pid_string: remote_pid.to_string(),
                 offset_sid,
                 channels: Arc::new(RwLock::new(vec![])),
                 streams: RwLock::new(HashMap::new()),
@@ -164,6 +167,8 @@ impl BParticipant {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         let mut closing_up = false;
         trace!("start send_mgr");
+        let mut send_cache =
+            PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
         //while !self.closed.load(Ordering::Relaxed) {
         loop {
             let mut frames = VecDeque::new();
@@ -173,7 +178,7 @@ impl BParticipant {
                 trace!("tick {}", len);
             }
             for (_, frame) in frames {
-                self.send_frame(frame).await;
+                self.send_frame(frame, &mut send_cache).await;
             }
             async_std::task::sleep(TICK_TIME).await;
             //shutdown after all msg are send!
@@ -189,17 +194,12 @@ impl BParticipant {
         self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }

-    async fn send_frame(&self, frame: Frame) {
+    async fn send_frame(&self, frame: Frame, frames_out_total_cache: &mut PidCidFrameCache) {
         // find out ideal channel here
         //TODO: just take first
         if let Some(ci) = self.channels.write().await.get_mut(0) {
-            self.metrics
-                .frames_out_total
-                .with_label_values(&[
-                    &self.remote_pid.to_string(),
-                    &ci.cid.to_string(),
-                    frame.get_string(),
-                ])
+            frames_out_total_cache
+                .with_label_values(ci.cid, &frame)
                 .inc();
             ci.b2w_frame_s.send(frame).await.unwrap();
         } else {
@@ -217,13 +217,12 @@ impl BParticipant {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("start handle_frames_mgr");
         let mut messages = HashMap::new();
-        let pid_string = &self.remote_pid.to_string();
         while let Some((cid, frame)) = w2b_frames_r.next().await {
             let cid_string = cid.to_string();
             //trace!("handling frame");
             self.metrics
                 .frames_in_total
-                .with_label_values(&[&pid_string, &cid_string, frame.get_string()])
+                .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()])
                 .inc();
             match frame {
                 Frame::OpenStream {
@@ -247,7 +246,7 @@ impl BParticipant {
                     if let Some(si) = self.streams.write().await.remove(&sid) {
                         self.metrics
                             .streams_closed_total
-                            .with_label_values(&[&pid_string])
+                            .with_label_values(&[&self.remote_pid_string])
                             .inc();
                         si.closed.store(true, Ordering::Relaxed);
                     } else {
@@ -313,19 +312,20 @@ impl BParticipant {
                         Channel::new(cid, self.remote_pid);
                     channels.write().await.push(ChannelInfo {
                         cid,
+                        cid_string: cid.to_string(),
                         b2w_frame_s,
                         b2r_read_shutdown,
                     });
                     b2s_create_channel_done_s.send(()).unwrap();
                     self.metrics
                         .channels_connected_total
-                        .with_label_values(&[&self.remote_pid.to_string()])
+                        .with_label_values(&[&self.remote_pid_string])
                         .inc();
                     trace!(?cid, "running channel in participant");
                     channel.run(protocol, w2b_frames_s).await;
                     self.metrics
                         .channels_disconnected_total
-                        .with_label_values(&[&self.remote_pid.to_string()])
+                        .with_label_values(&[&self.remote_pid_string])
                         .inc();
                     trace!(?cid, "channel got closed");
                 }
@@ -345,6 +345,8 @@ impl BParticipant {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("start open_mgr");
         let mut stream_ids = self.offset_sid;
+        let mut send_cache =
+            PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
         let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
         //from api or shutdown signal
         while let Some((prio, promises, p2a_return_stream)) = select! {
@@ -357,11 +359,14 @@ impl BParticipant {
             let stream = self
                 .create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s)
                 .await;
-            self.send_frame(Frame::OpenStream {
-                sid,
-                prio,
-                promises,
-            })
+            self.send_frame(
+                Frame::OpenStream {
+                    sid,
+                    prio,
+                    promises,
+                },
+                &mut send_cache,
+            )
             .await;
             p2a_return_stream.send(stream).unwrap();
             stream_ids += Sid::from(1);
@@ -429,6 +434,8 @@ impl BParticipant {
     ) {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("start stream_close_mgr");
+        let mut send_cache =
+            PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
         let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();

         //from api or shutdown signal
@@ -465,11 +472,12 @@ impl BParticipant {
             trace!(?sid, "stream was successfully flushed");
             self.metrics
                 .streams_closed_total
-                .with_label_values(&[&self.remote_pid.to_string()])
+                .with_label_values(&[&self.remote_pid_string])
                 .inc();
             //only now remove the Stream, that means we can still recv on it.
             self.streams.write().await.remove(&sid);
-            self.send_frame(Frame::CloseStream { sid }).await;
+            self.send_frame(Frame::CloseStream { sid }, &mut send_cache)
+                .await;
         }
         trace!("stop stream_close_mgr");
         self.running_mgr.fetch_sub(1, Ordering::Relaxed);
@@ -493,7 +501,7 @@ impl BParticipant {
         });
         self.metrics
             .streams_opened_total
-            .with_label_values(&[&self.remote_pid.to_string()])
+            .with_label_values(&[&self.remote_pid_string])
             .inc();
         Stream::new(
             self.remote_pid,
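
Each manager task above (send_mgr, open_mgr, stream_close_mgr) builds its own PidCidFrameCache from a clone of frames_out_total rather than sharing one cache, since the cache needs mutable access and the tasks run concurrently. That is sound because cloning a prometheus metric vec is shallow: clones share the underlying family, so counters resolved through any clone feed the same series. A quick illustration (names are made up):

use prometheus::{IntCounterVec, Opts};

fn main() {
    let vec_a = IntCounterVec::new(Opts::new("example_total", "help text"), &["label"]).unwrap();
    let vec_b = vec_a.clone();

    // Both clones resolve to the same child counter.
    vec_a.with_label_values(&["x"]).inc();
    assert_eq!(vec_b.with_label_values(&["x"]).get(), 1);
}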

View File

@@ -1,5 +1,5 @@
 use crate::{
-    metrics::NetworkMetrics,
+    metrics::{CidFrameCache, NetworkMetrics},
     types::{Cid, Frame, Mid, Pid, Sid},
 };
 use async_std::{
@@ -64,6 +64,7 @@ impl TcpProtocol {
         end_receiver: oneshot::Receiver<()>,
     ) {
         trace!("starting up tcp write()");
+        let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
         let mut stream = self.stream.clone();
         let mut end_receiver = end_receiver.fuse();
         loop {
@@ -173,10 +174,7 @@ impl TcpProtocol {
                     Frame::Raw(data)
                 },
             };
-            self.metrics
-                .frames_wire_in_total
-                .with_label_values(&[&cid.to_string(), frame.get_string()])
-                .inc();
+            metrics_cache.with_label_values(&frame).inc();
             from_wire_sender.send((cid, frame)).await.unwrap();
         }
         trace!("shutting down tcp read()");
@@ -188,12 +186,9 @@ impl TcpProtocol {
     pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver<Frame>) {
         trace!("starting up tcp write()");
         let mut stream = self.stream.clone();
-        let cid_string = cid.to_string();
+        let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid);
         while let Some(frame) = to_wire_receiver.next().await {
-            self.metrics
-                .frames_wire_out_total
-                .with_label_values(&[&cid_string, frame.get_string()])
-                .inc();
+            metrics_cache.with_label_values(&frame).inc();
             match frame {
                 Frame::Handshake {
                     magic_number,
@@ -296,6 +291,7 @@ impl UdpProtocol {
         end_receiver: oneshot::Receiver<()>,
     ) {
         trace!("starting up udp read()");
+        let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
         let mut data_in = self.data_in.write().await;
         let mut end_receiver = end_receiver.fuse();
         while let Some(bytes) = select! {
@@ -388,10 +384,7 @@ impl UdpProtocol {
                 },
                 _ => Frame::Raw(bytes),
             };
-            self.metrics
-                .frames_wire_in_total
-                .with_label_values(&[&cid.to_string(), frame.get_string()])
-                .inc();
+            metrics_cache.with_label_values(&frame).inc();
             from_wire_sender.send((cid, frame)).await.unwrap();
         }
         trace!("shutting down udp read()");
@@ -400,12 +393,9 @@ impl UdpProtocol {
     pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver<Frame>) {
         trace!("starting up udp write()");
         let mut buffer = [0u8; 2000];
-        let cid_string = cid.to_string();
+        let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid);
         while let Some(frame) = to_wire_receiver.next().await {
-            self.metrics
-                .frames_wire_out_total
-                .with_label_values(&[&cid_string, frame.get_string()])
-                .inc();
+            metrics_cache.with_label_values(&frame).inc();
             let len = match frame {
                 Frame::Handshake {
                     magic_number,

View File

@@ -133,6 +133,7 @@ impl Scheduler {
             self.listen_mgr(run_channels.a2s_listen_r),
             self.connect_mgr(run_channels.a2s_connect_r),
             self.disconnect_mgr(run_channels.a2s_disconnect_r),
+            self.prio_adj_mgr(),
             self.scheduler_shutdown_mgr(run_channels.a2s_scheduler_shutdown_r),
         );
     }
@@ -261,6 +262,12 @@ impl Scheduler {
         trace!("stop disconnect_mgr");
     }

+    async fn prio_adj_mgr(&self) {
+        trace!("start prio_adj_mgr");
+        //TODO adjust prios in participants here!
+        trace!("stop prio_adj_mgr");
+    }
+
     async fn scheduler_shutdown_mgr(&self, a2s_scheduler_shutdown_r: oneshot::Receiver<()>) {
         trace!("start scheduler_shutdown_mgr");
         a2s_scheduler_shutdown_r.await.unwrap();

View File

@@ -89,33 +89,49 @@ pub(crate) enum Frame {
 }

 impl Frame {
-    pub fn get_string(&self) -> &str {
+    pub const fn int_to_string(i: u8) -> &'static str {
+        match i {
+            0 => "Handshake",
+            1 => "ParticipantId",
+            2 => "Shutdown",
+            3 => "OpenStream",
+            4 => "CloseStream",
+            5 => "DataHeader",
+            6 => "Data",
+            7 => "Raw",
+            _ => "",
+        }
+    }
+
+    pub fn get_int(&self) -> u8 {
         match self {
             Frame::Handshake {
                 magic_number: _,
                 version: _,
-            } => "Handshake",
-            Frame::ParticipantId { pid: _ } => "ParticipantId",
-            Frame::Shutdown => "Shutdown",
+            } => 0,
+            Frame::ParticipantId { pid: _ } => 1,
+            Frame::Shutdown => 2,
             Frame::OpenStream {
                 sid: _,
                 prio: _,
                 promises: _,
-            } => "OpenStream",
-            Frame::CloseStream { sid: _ } => "CloseStream",
+            } => 3,
+            Frame::CloseStream { sid: _ } => 4,
             Frame::DataHeader {
                 mid: _,
                 sid: _,
                 length: _,
-            } => "DataHeader",
+            } => 5,
             Frame::Data {
                 mid: _,
                 start: _,
                 data: _,
-            } => "Data",
-            Frame::Raw(_) => "Raw",
+            } => 6,
+            Frame::Raw(_) => 7,
         }
     }
+
+    pub fn get_string(&self) -> &str { Self::int_to_string(self.get_int()) }
 }

 impl Pid {
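
The const_if_match feature enabled in lib.rs exists for int_to_string: on the 2020 nightly this commit targets, match inside a const fn was still feature-gated (it later stabilized in Rust 1.46). Because the function returns &'static str, frame labels cost no allocation at runtime and can even be resolved at compile time. A standalone sketch of the same pattern:

// Integer-to-static-string lookup as a const fn, mirroring
// Frame::int_to_string (variant names copied from the diff above).
pub const fn int_to_string(i: u8) -> &'static str {
    match i {
        0 => "Handshake",
        6 => "Data",
        _ => "",
    }
}

// Evaluated at compile time: no runtime work at all.
const DATA_NAME: &str = int_to_string(6);

fn main() {
    assert_eq!(DATA_NAME, "Data");
    assert_eq!(int_to_string(0), "Handshake");
}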