crossbeam-channel and log spam

- swap out std::mpsc with crossbeam-channel in networking crate
 - remove log spam by only logging when populating a new cache entry and not on every get
This commit is contained in:
Marcel Märtens 2020-07-03 21:55:00 +02:00
parent 1f05446ce6
commit f9895a7800
6 changed files with 27 additions and 29 deletions

1
Cargo.lock generated
View File

@ -4649,6 +4649,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"bincode", "bincode",
"crossbeam-channel 0.4.2",
"futures 0.3.5", "futures 0.3.5",
"lazy_static", "lazy_static",
"prometheus", "prometheus",

View File

@ -12,6 +12,7 @@ edition = "2018"
bincode = "1.2" bincode = "1.2"
serde = { version = "1.0" } serde = { version = "1.0" }
#sending #sending
crossbeam-channel = "0.4.2"
async-std = { version = "~1.5", default-features = false, features = ["std", "async-task", "default"] } async-std = { version = "~1.5", default-features = false, features = ["std", "async-task", "default"] }
#tracing and metrics #tracing and metrics
tracing = { version = "0.1", default-features = false } tracing = { version = "0.1", default-features = false }

View File

@ -75,7 +75,7 @@ pub struct Stream {
mid: Mid, mid: Mid,
prio: Prio, prio: Prio,
promises: Promises, promises: Promises,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>, b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
closed: Arc<AtomicBool>, closed: Arc<AtomicBool>,
a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>, a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
@ -627,7 +627,7 @@ impl Stream {
sid: Sid, sid: Sid,
prio: Prio, prio: Prio,
promises: Promises, promises: Promises,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>, b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
closed: Arc<AtomicBool>, closed: Arc<AtomicBool>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>, a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
@ -934,16 +934,16 @@ impl std::fmt::Debug for Participant {
} }
} }
impl<T> From<std::sync::mpsc::SendError<T>> for StreamError { impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
fn from(_err: std::sync::mpsc::SendError<T>) -> Self { StreamError::StreamClosed } fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
} }
impl<T> From<std::sync::mpsc::SendError<T>> for ParticipantError { impl<T> From<crossbeam_channel::SendError<T>> for ParticipantError {
fn from(_err: std::sync::mpsc::SendError<T>) -> Self { ParticipantError::ParticipantClosed } fn from(_err: crossbeam_channel::SendError<T>) -> Self { ParticipantError::ParticipantClosed }
} }
impl<T> From<std::sync::mpsc::SendError<T>> for NetworkError { impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
fn from(_err: std::sync::mpsc::SendError<T>) -> Self { NetworkError::NetworkClosed } fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
} }
impl From<async_std::io::Error> for NetworkError { impl From<async_std::io::Error> for NetworkError {

View File

@ -263,7 +263,7 @@ pub(crate) struct PidCidFrameCache {
} }
impl PidCidFrameCache { impl PidCidFrameCache {
const CACHE_SIZE: usize = 16; const CACHE_SIZE: usize = 512;
pub fn new(metric: IntCounterVec, pid: Pid) -> Self { pub fn new(metric: IntCounterVec, pid: Pid) -> Self {
Self { Self {
@ -275,6 +275,12 @@ impl PidCidFrameCache {
fn populate(&mut self, cid: Cid) { fn populate(&mut self, cid: Cid) {
let start_cid = self.cache.len(); let start_cid = self.cache.len();
if cid >= start_cid as u64 && cid > (Self::CACHE_SIZE as Cid) {
warn!(
?cid,
"cid, getting quite high, is this a attack on the cache?"
);
}
for i in start_cid..=cid as usize { for i in start_cid..=cid as usize {
let cid = (i as Cid).to_string(); let cid = (i as Cid).to_string();
let entry = [ let entry = [
@ -300,12 +306,6 @@ impl PidCidFrameCache {
} }
pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter<AtomicI64> { pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter<AtomicI64> {
if cid > (Self::CACHE_SIZE as Cid) {
warn!(
?cid,
"cid, getting quite high, is this a attack on the cache?"
);
}
self.populate(cid); self.populate(cid);
&self.cache[cid as usize][frame.get_int() as usize] &self.cache[cid as usize][frame.get_int() as usize]
} }

View File

@ -272,7 +272,7 @@ impl BParticipant {
mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>, mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>, mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>, a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start handle_frames_mgr"); trace!("start handle_frames_mgr");
@ -450,7 +450,7 @@ impl BParticipant {
&self, &self,
mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>, mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>, a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
shutdown_open_mgr_receiver: oneshot::Receiver<()>, shutdown_open_mgr_receiver: oneshot::Receiver<()>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
@ -548,7 +548,7 @@ impl BParticipant {
&self, &self,
mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>, mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>, shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
b2p_notify_empty_stream_s: std::sync::mpsc::Sender<(Sid, oneshot::Sender<()>)>, b2p_notify_empty_stream_s: crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>,
) { ) {
self.running_mgr.fetch_add(1, Ordering::Relaxed); self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start stream_close_mgr"); trace!("start stream_close_mgr");
@ -607,7 +607,7 @@ impl BParticipant {
sid: Sid, sid: Sid,
prio: Prio, prio: Prio,
promises: Promises, promises: Promises,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
a2b_close_stream_s: &mpsc::UnboundedSender<Sid>, a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
) -> Stream { ) -> Stream {
let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::<IncomingMessage>(); let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::<IncomingMessage>();

View File

@ -10,13 +10,11 @@ use crate::{
metrics::NetworkMetrics, metrics::NetworkMetrics,
types::{Frame, Prio, Sid}, types::{Frame, Prio, Sid},
}; };
use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::channel::oneshot; use futures::channel::oneshot;
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
sync::{ sync::Arc,
mpsc::{channel, Receiver, Sender},
Arc,
},
}; };
use tracing::*; use tracing::*;
@ -60,8 +58,8 @@ impl PrioManager {
Sender<(Sid, oneshot::Sender<()>)>, Sender<(Sid, oneshot::Sender<()>)>,
) { ) {
// (a2p_msg_s, a2p_msg_r) // (a2p_msg_s, a2p_msg_r)
let (messages_tx, messages_rx) = channel(); let (messages_tx, messages_rx) = unbounded();
let (sid_flushed_tx, sid_flushed_rx) = channel(); let (sid_flushed_tx, sid_flushed_rx) = unbounded();
( (
Self { Self {
points: [0; PRIO_MAX], points: [0; PRIO_MAX],
@ -318,11 +316,9 @@ mod tests {
prios::*, prios::*,
types::{Frame, Pid, Prio, Sid}, types::{Frame, Pid, Prio, Sid},
}; };
use crossbeam_channel::Sender;
use futures::{channel::oneshot, executor::block_on}; use futures::{channel::oneshot, executor::block_on};
use std::{ use std::{collections::VecDeque, sync::Arc};
collections::VecDeque,
sync::{mpsc::Sender, Arc},
};
const SIZE: u64 = PrioManager::FRAME_DATA_SIZE; const SIZE: u64 = PrioManager::FRAME_DATA_SIZE;
const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize; const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize;