Merge branch 'xMAC94x/reduce_metrics' into 'master'

reduce load on metrics by ALOT!

See merge request veloren/veloren!1337
This commit is contained in:
Marcel 2020-08-27 00:34:49 +00:00
commit 5bedc1aac9
3 changed files with 75 additions and 65 deletions

View File

@ -181,7 +181,7 @@ impl Handshake {
if let Some(Ok(ref frame)) = frame {
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, &frame.get_string()])
.with_label_values(&[&cid_string, &frame.get_string()])
.inc();
}
}
@ -216,7 +216,7 @@ impl Handshake {
} else {
debug!("Handshake completed");
if self.init_handshake {
self.send_init(&mut c2w_frame_s, "").await;
self.send_init(&mut c2w_frame_s).await;
} else {
self.send_handshake(&mut c2w_frame_s).await;
}
@ -227,7 +227,7 @@ impl Handshake {
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()])
.with_label_values(&[&cid_string, frame.get_string()])
.inc();
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) {
@ -258,16 +258,15 @@ impl Handshake {
let r = match frame {
Some(Ok(Frame::Init { pid, secret })) => {
debug!(?pid, "Participant send their ID");
let pid_string = pid.to_string();
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "ParticipantId"])
.with_label_values(&[&cid_string, "ParticipantId"])
.inc();
let stream_id_offset = if self.init_handshake {
STREAM_ID_OFFSET1
} else {
self.send_init(&mut c2w_frame_s, &pid_string).await;
self.send_init(&mut c2w_frame_s).await;
STREAM_ID_OFFSET2
};
info!(?pid, "This Handshake is now configured!");
@ -277,7 +276,7 @@ impl Handshake {
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()])
.with_label_values(&[&cid_string, frame.get_string()])
.inc();
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) {
@ -309,7 +308,7 @@ impl Handshake {
#[cfg(feature = "metrics")]
self.metrics
.frames_out_total
.with_label_values(&["", &self.cid.to_string(), "Handshake"])
.with_label_values(&[&self.cid.to_string(), "Handshake"])
.inc();
c2w_frame_s
.send(Frame::Handshake {
@ -323,13 +322,12 @@ impl Handshake {
async fn send_init(
&self,
c2w_frame_s: &mut mpsc::UnboundedSender<Frame>,
#[cfg(feature = "metrics")] pid_string: &str,
#[cfg(not(feature = "metrics"))] _pid_string: &str,
) {
#[cfg(feature = "metrics")]
self.metrics
.frames_out_total
.with_label_values(&[pid_string, &self.cid.to_string(), "ParticipantId"])
.with_label_values(&[&self.cid.to_string(), "ParticipantId"])
.inc();
c2w_frame_s
.send(Frame::Init {
@ -352,11 +350,11 @@ impl Handshake {
let cid_string = self.cid.to_string();
self.metrics
.frames_out_total
.with_label_values(&["", &cid_string, "Raw"])
.with_label_values(&[&cid_string, "Raw"])
.inc();
self.metrics
.frames_out_total
.with_label_values(&["", &cid_string, "Shutdown"])
.with_label_values(&[&cid_string, "Shutdown"])
.inc();
}
c2w_frame_s.send(Frame::Raw(data)).await.unwrap();

View File

@ -6,14 +6,18 @@ use prometheus::{
use std::error::Error;
use tracing::*;
//TODO: switch over to Counter for frames_count, message_count, bytes_send,
// frames_message_count 1 NetworkMetrics per Network
/// 1:1 relation between NetworkMetrics and Network.
/// Use 2NF here and avoid redundant data like CHANNEL AND PARTICIPANT encoding,
/// as this will cause a matrix that is full of 0 but needs a lot of bandwidth
/// and storage.
#[allow(dead_code)]
pub struct NetworkMetrics {
pub listen_requests_total: IntCounterVec,
pub connect_requests_total: IntCounterVec,
pub participants_connected_total: IntCounter,
pub participants_disconnected_total: IntCounter,
// channel ids, separated by PARTICIPANT, max 5
pub participants_channel_ids: IntGaugeVec,
// opened Channels, separated by PARTICIPANT
pub channels_connected_total: IntCounterVec,
pub channels_disconnected_total: IntCounterVec,
@ -71,6 +75,13 @@ impl NetworkMetrics {
"participants_disconnected_total",
"Shows the number of participants disconnected to the network",
))?;
let participants_channel_ids = IntGaugeVec::new(
Opts::new(
"participants_channel_ids",
"Channel numbers belonging to a Participant in the network",
),
&["participant", "no"],
)?;
let channels_connected_total = IntCounterVec::new(
Opts::new(
"channels_connected_total",
@ -116,14 +127,14 @@ impl NetworkMetrics {
"frames_out_total",
"Number of all frames send per channel, at the channel level",
),
&["participant", "channel", "frametype"],
&["channel", "frametype"],
)?;
let frames_in_total = IntCounterVec::new(
Opts::new(
"frames_in_total",
"Number of all frames received per channel, at the channel level",
),
&["participant", "channel", "frametype"],
&["channel", "frametype"],
)?;
let frames_wire_out_total = IntCounterVec::new(
Opts::new(
@ -203,6 +214,7 @@ impl NetworkMetrics {
connect_requests_total,
participants_connected_total,
participants_disconnected_total,
participants_channel_ids,
channels_connected_total,
channels_disconnected_total,
streams_opened_total,
@ -228,6 +240,7 @@ impl NetworkMetrics {
registry.register(Box::new(self.connect_requests_total.clone()))?;
registry.register(Box::new(self.participants_connected_total.clone()))?;
registry.register(Box::new(self.participants_disconnected_total.clone()))?;
registry.register(Box::new(self.participants_channel_ids.clone()))?;
registry.register(Box::new(self.channels_connected_total.clone()))?;
registry.register(Box::new(self.channels_disconnected_total.clone()))?;
registry.register(Box::new(self.streams_opened_total.clone()))?;
@ -266,19 +279,17 @@ pub(crate) struct PidCidFrameCache<T: MetricVecBuilder> {
}
*/
pub(crate) struct PidCidFrameCache {
pub(crate) struct MultiCidFrameCache {
metric: IntCounterVec,
pid: String,
cache: Vec<[GenericCounter<AtomicI64>; Frame::FRAMES_LEN as usize]>,
cache: Vec<[Option<GenericCounter<AtomicI64>>; Frame::FRAMES_LEN as usize]>,
}
impl PidCidFrameCache {
const CACHE_SIZE: usize = 512;
impl MultiCidFrameCache {
const CACHE_SIZE: usize = 2048;
pub fn new(metric: IntCounterVec, pid: Pid) -> Self {
pub fn new(metric: IntCounterVec) -> Self {
Self {
metric,
pid: pid.to_string(),
cache: vec![],
}
}
@ -291,33 +302,22 @@ impl PidCidFrameCache {
"cid, getting quite high, is this a attack on the cache?"
);
}
for i in start_cid..=cid as usize {
let cid = (i as Cid).to_string();
let entry = [
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(0)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(1)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(2)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(3)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(4)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(5)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(6)]),
self.metric
.with_label_values(&[&self.pid, &cid, Frame::int_to_string(7)]),
];
self.cache.push(entry);
}
self.cache.resize((cid + 1) as usize, [
None, None, None, None, None, None, None, None,
]);
}
pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter<AtomicI64> {
self.populate(cid);
&self.cache[cid as usize][frame.get_int() as usize]
let frame_int = frame.get_int() as usize;
let r = &mut self.cache[cid as usize][frame_int];
if r.is_none() {
*r = Some(
self.metric
.with_label_values(&[&cid.to_string(), &frame_int.to_string()]),
);
}
r.as_ref().unwrap()
}
}
@ -361,12 +361,12 @@ mod tests {
}
#[test]
fn pid_cid_frame_cache() {
fn multi_cid_frame_cache() {
let pid = Pid::fake(1);
let frame1 = Frame::Raw(b"Foo".to_vec());
let frame2 = Frame::Raw(b"Bar".to_vec());
let metrics = NetworkMetrics::new(&pid).unwrap();
let mut cache = PidCidFrameCache::new(metrics.frames_in_total, pid);
let mut cache = MultiCidFrameCache::new(metrics.frames_in_total);
let v1 = cache.with_label_values(1, &frame1);
v1.inc();
assert_eq!(v1.get(), 1);

View File

@ -1,5 +1,5 @@
#[cfg(feature = "metrics")]
use crate::metrics::{NetworkMetrics, PidCidFrameCache};
use crate::metrics::{MultiCidFrameCache, NetworkMetrics};
use crate::{
api::{ParticipantError, Stream},
channel::Channel,
@ -205,8 +205,7 @@ impl BParticipant {
let mut closing_up = false;
trace!("Start send_mgr");
#[cfg(feature = "metrics")]
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
let mut i: u64 = 0;
loop {
let mut frames = VecDeque::new();
@ -250,7 +249,7 @@ impl BParticipant {
async fn send_frame(
&self,
frame: Frame,
#[cfg(feature = "metrics")] frames_out_total_cache: &mut PidCidFrameCache,
#[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache,
) -> bool {
let mut drop_cid = None;
// TODO: find out ideal channel here
@ -345,7 +344,7 @@ impl BParticipant {
let cid_string = cid.to_string();
self.metrics
.frames_in_total
.with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()])
.with_label_values(&[&cid_string, frame.get_string()])
.inc();
}
match frame {
@ -490,7 +489,9 @@ impl BParticipant {
let channels = self.channels.clone();
async move {
let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid);
channels.write().await.insert(
let mut lock = channels.write().await;
let mut channel_no = lock.len();
lock.insert(
cid,
Mutex::new(ChannelInfo {
cid,
@ -499,13 +500,27 @@ impl BParticipant {
b2r_read_shutdown,
}),
);
drop(lock);
b2s_create_channel_done_s.send(()).unwrap();
#[cfg(feature = "metrics")]
self.metrics
.channels_connected_total
.with_label_values(&[&self.remote_pid_string])
.inc();
trace!(?cid, "Running channel in participant");
{
self.metrics
.channels_connected_total
.with_label_values(&[&self.remote_pid_string])
.inc();
if channel_no > 5 {
debug!(?channel_no, "metrics will overwrite channel #5");
channel_no = 5;
}
self.metrics
.participants_channel_ids
.with_label_values(&[
&self.remote_pid_string,
&channel_no.to_string(),
])
.set(cid as i64);
}
trace!(?cid, ?channel_no, "Running channel in participant");
channel
.run(protocol, w2b_frames_s, leftover_cid_frame)
.instrument(tracing::info_span!("", ?cid))
@ -541,8 +556,7 @@ impl BParticipant {
trace!("Start open_mgr");
let mut stream_ids = self.offset_sid;
#[cfg(feature = "metrics")]
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
//from api or shutdown signal
while let Some((prio, promises, p2a_return_stream)) = select! {
@ -600,8 +614,7 @@ impl BParticipant {
trace!("Start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap();
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
self.close_api(None).await;
@ -687,8 +700,7 @@ impl BParticipant {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start stream_close_mgr");
#[cfg(feature = "metrics")]
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();
//from api or shutdown signal