diff --git a/Cargo.lock b/Cargo.lock index b2d47b3ea8..226c507cf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4649,6 +4649,7 @@ version = "0.1.0" dependencies = [ "async-std", "bincode", + "crossbeam-channel 0.4.2", "futures 0.3.5", "lazy_static", "prometheus", diff --git a/network/Cargo.toml b/network/Cargo.toml index ff62d67be2..9282a14eba 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -12,6 +12,7 @@ edition = "2018" bincode = "1.2" serde = { version = "1.0" } #sending +crossbeam-channel = "0.4.2" async-std = { version = "~1.5", default-features = false, features = ["std", "async-task", "default"] } #tracing and metrics tracing = { version = "0.1", default-features = false } diff --git a/network/src/api.rs b/network/src/api.rs index 05be3c3f3b..47cee1f713 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -75,7 +75,7 @@ pub struct Stream { mid: Mid, prio: Prio, promises: Promises, - a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, + a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, b2a_msg_recv_r: mpsc::UnboundedReceiver, closed: Arc, a2b_close_stream_s: Option>, @@ -627,7 +627,7 @@ impl Stream { sid: Sid, prio: Prio, promises: Promises, - a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, + a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, b2a_msg_recv_r: mpsc::UnboundedReceiver, closed: Arc, a2b_close_stream_s: mpsc::UnboundedSender, @@ -934,16 +934,16 @@ impl std::fmt::Debug for Participant { } } -impl From> for StreamError { - fn from(_err: std::sync::mpsc::SendError) -> Self { StreamError::StreamClosed } +impl From> for StreamError { + fn from(_err: crossbeam_channel::SendError) -> Self { StreamError::StreamClosed } } -impl From> for ParticipantError { - fn from(_err: std::sync::mpsc::SendError) -> Self { ParticipantError::ParticipantClosed } +impl From> for ParticipantError { + fn from(_err: crossbeam_channel::SendError) -> Self { ParticipantError::ParticipantClosed } } -impl From> for NetworkError { - fn from(_err: std::sync::mpsc::SendError) -> Self { NetworkError::NetworkClosed } +impl From> for NetworkError { + fn from(_err: crossbeam_channel::SendError) -> Self { NetworkError::NetworkClosed } } impl From for NetworkError { diff --git a/network/src/metrics.rs b/network/src/metrics.rs index 83d169642d..cd809c72bc 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -263,7 +263,7 @@ pub(crate) struct PidCidFrameCache { } impl PidCidFrameCache { - const CACHE_SIZE: usize = 16; + const CACHE_SIZE: usize = 512; pub fn new(metric: IntCounterVec, pid: Pid) -> Self { Self { @@ -275,6 +275,12 @@ impl PidCidFrameCache { fn populate(&mut self, cid: Cid) { let start_cid = self.cache.len(); + if cid >= start_cid as u64 && cid > (Self::CACHE_SIZE as Cid) { + warn!( + ?cid, + "cid, getting quite high, is this a attack on the cache?" + ); + } for i in start_cid..=cid as usize { let cid = (i as Cid).to_string(); let entry = [ @@ -300,12 +306,6 @@ impl PidCidFrameCache { } pub fn with_label_values(&mut self, cid: Cid, frame: &Frame) -> &GenericCounter { - if cid > (Self::CACHE_SIZE as Cid) { - warn!( - ?cid, - "cid, getting quite high, is this a attack on the cache?" - ); - } self.populate(cid); &self.cache[cid as usize][frame.get_int() as usize] } diff --git a/network/src/participant.rs b/network/src/participant.rs index 1221dec4b5..802fbb7759 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -272,7 +272,7 @@ impl BParticipant { mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>, mut b2a_stream_opened_s: mpsc::UnboundedSender, a2b_close_stream_s: mpsc::UnboundedSender, - a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, + a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("start handle_frames_mgr"); @@ -450,7 +450,7 @@ impl BParticipant { &self, mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender)>, a2b_close_stream_s: mpsc::UnboundedSender, - a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, + a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, shutdown_open_mgr_receiver: oneshot::Receiver<()>, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); @@ -548,7 +548,7 @@ impl BParticipant { &self, mut a2b_close_stream_r: mpsc::UnboundedReceiver, shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>, - b2p_notify_empty_stream_s: std::sync::mpsc::Sender<(Sid, oneshot::Sender<()>)>, + b2p_notify_empty_stream_s: crossbeam_channel::Sender<(Sid, oneshot::Sender<()>)>, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("start stream_close_mgr"); @@ -607,7 +607,7 @@ impl BParticipant { sid: Sid, prio: Prio, promises: Promises, - a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutgoingMessage)>, + a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, a2b_close_stream_s: &mpsc::UnboundedSender, ) -> Stream { let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::(); diff --git a/network/src/prios.rs b/network/src/prios.rs index ae30cf878c..2d9dc9a834 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -10,13 +10,11 @@ use crate::{ metrics::NetworkMetrics, types::{Frame, Prio, Sid}, }; +use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::channel::oneshot; use std::{ collections::{HashMap, HashSet, VecDeque}, - sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, - }, + sync::Arc, }; use tracing::*; @@ -60,8 +58,8 @@ impl PrioManager { Sender<(Sid, oneshot::Sender<()>)>, ) { // (a2p_msg_s, a2p_msg_r) - let (messages_tx, messages_rx) = channel(); - let (sid_flushed_tx, sid_flushed_rx) = channel(); + let (messages_tx, messages_rx) = unbounded(); + let (sid_flushed_tx, sid_flushed_rx) = unbounded(); ( Self { points: [0; PRIO_MAX], @@ -318,11 +316,9 @@ mod tests { prios::*, types::{Frame, Pid, Prio, Sid}, }; + use crossbeam_channel::Sender; use futures::{channel::oneshot, executor::block_on}; - use std::{ - collections::VecDeque, - sync::{mpsc::Sender, Arc}, - }; + use std::{collections::VecDeque, sync::Arc}; const SIZE: u64 = PrioManager::FRAME_DATA_SIZE; const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize;