From 7f17d6cfd1e990a93619e5f0452699aef6f3e08c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 25 Mar 2021 12:22:31 +0100 Subject: [PATCH 1/4] network scheduler and rawmsg cleanup --- network/benches/speed.rs | 4 +-- network/examples/network-speed/main.rs | 2 +- network/src/api.rs | 20 ++++++++--- network/src/lib.rs | 2 +- network/src/message.rs | 48 ++++++++------------------ network/src/metrics.rs | 26 ++++++++++++++ network/src/participant.rs | 2 +- network/src/scheduler.rs | 37 ++++---------------- server/src/client.rs | 41 ++++++++++++++++------ 9 files changed, 97 insertions(+), 85 deletions(-) diff --git a/network/benches/speed.rs b/network/benches/speed.rs index 6216d4a7a4..b110d308ae 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -3,11 +3,11 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::{runtime::Runtime, sync::Mutex}; use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream}; -fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, &stream); } +fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); } async fn stream_msg(s1_a: Arc>, s1_b: Arc>, data: &[u8], cnt: usize) { let mut s1_b = s1_b.lock().await; - let m = Message::serialize(&data, &s1_b); + let m = Message::serialize(&data, s1_b.params()); std::thread::spawn(move || { let mut s1_a = s1_a.try_lock().unwrap(); for _ in 0..cnt { diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index d67e1a6c4b..e058aac7a8 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -173,7 +173,7 @@ fn client(address: ProtocolAddr, runtime: Arc) { id, data: vec![0; 1000], }, - &s1, + s1.params(), ); loop { s1.send_raw(&raw_msg).unwrap(); diff --git a/network/src/api.rs b/network/src/api.rs index f8ab01602c..5379e431c3 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -119,6 +119,12 @@ pub enum StreamError { Deserialize(bincode::Error), } +/// All Parameters of a Stream, can be used to generate RawMessages +#[derive(Debug, Clone)] +pub struct StreamParams { + pub(crate) promises: Promises, +} + /// Use the `Network` to create connections to other [`Participants`] /// /// The `Network` is the single source that handles all connections in your @@ -803,7 +809,7 @@ impl Stream { /// [`Serialized`]: Serialize #[inline] pub fn send(&mut self, msg: M) -> Result<(), StreamError> { - self.send_raw(&Message::serialize(&msg, &self)) + self.send_raw(&Message::serialize(&msg, self.params())) } /// This methods give the option to skip multiple calls of [`bincode`] and @@ -837,7 +843,7 @@ impl Stream { /// let mut stream_b = participant_b.opened().await?; /// /// //Prepare Message and decode it - /// let msg = Message::serialize("Hello World", &stream_a); + /// let msg = Message::serialize("Hello World", stream_a.params()); /// //Send same Message to multiple Streams /// stream_a.send_raw(&msg); /// stream_b.send_raw(&msg); @@ -858,7 +864,7 @@ impl Stream { return Err(StreamError::StreamClosed); } #[cfg(debug_assertions)] - message.verify(&self); + message.verify(self.params()); self.a2b_msg_s.send((self.sid, message.data.clone()))?; Ok(()) } @@ -999,7 +1005,7 @@ impl Stream { Message { data, #[cfg(feature = "compression")] - compressed: self.promises().contains(Promises::COMPRESSED), + compressed: self.promises.contains(Promises::COMPRESSED), } .deserialize()?, )), @@ -1013,7 +1019,11 @@ impl Stream { } } - pub fn promises(&self) -> Promises { self.promises } + pub fn params(&self) -> StreamParams { + StreamParams { + promises: self.promises, + } + } } impl core::cmp::PartialEq for Participant { diff --git a/network/src/lib.rs b/network/src/lib.rs index b79266e6d0..607eed9d91 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -108,7 +108,7 @@ mod trace; pub use api::{ Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, - Stream, StreamError, + Stream, StreamError, StreamParams, }; pub use message::Message; pub use network_protocol::{InitProtocolError, Pid, Promises}; diff --git a/network/src/message.rs b/network/src/message.rs index 516fe06ddd..cbee704a90 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,4 +1,4 @@ -use crate::api::{Stream, StreamError}; +use crate::api::{StreamError, StreamParams}; use bytes::Bytes; #[cfg(feature = "compression")] use network_protocol::Promises; @@ -36,12 +36,12 @@ impl Message { /// [`Message::serialize`]: crate::message::Message::serialize /// /// [`Streams`]: crate::api::Stream - pub fn serialize(message: &M, stream: &Stream) -> Self { + pub fn serialize(message: &M, stream_params: StreamParams) -> Self { //this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html let serialized_data = bincode::serialize(message).unwrap(); #[cfg(feature = "compression")] - let compressed = stream.promises().contains(Promises::COMPRESSED); + let compressed = stream_params.promises.contains(Promises::COMPRESSED); #[cfg(feature = "compression")] let data = if compressed { let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10); @@ -54,7 +54,7 @@ impl Message { #[cfg(not(feature = "compression"))] let data = serialized_data; #[cfg(not(feature = "compression"))] - let _stream = stream; + let _stream_params = stream_params; Self { data: Bytes::from(data), @@ -127,13 +127,13 @@ impl Message { } #[cfg(debug_assertions)] - pub(crate) fn verify(&self, stream: &Stream) { + pub(crate) fn verify(&self, params: StreamParams) { #[cfg(not(feature = "compression"))] let _stream = stream; #[cfg(feature = "compression")] - if self.compressed != stream.promises().contains(Promises::COMPRESSED) { + if self.compressed != params.promises.contains(Promises::COMPRESSED) { warn!( - ?stream, + ?params, "verify failed, msg is {} and it doesn't match with stream", self.compressed ); } @@ -171,14 +171,9 @@ pub(crate) fn partial_eq_bincode(first: &bincode::ErrorKind, second: &bincode::E #[cfg(test)] mod tests { - use crate::{api::Stream, message::*}; - use std::sync::{atomic::AtomicBool, Arc}; - use tokio::sync::mpsc; - - fn stub_stream(compressed: bool) -> Stream { - use crate::api::*; - use network_protocol::*; + use crate::{api::StreamParams, message::*}; + fn stub_stream(compressed: bool) -> StreamParams { #[cfg(feature = "compression")] let promises = if compressed { Promises::COMPRESSED @@ -189,27 +184,12 @@ mod tests { #[cfg(not(feature = "compression"))] let promises = Promises::empty(); - let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded(); - let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded(); - let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel(); - - Stream::new( - Pid::fake(0), - Pid::fake(1), - Sid::new(0), - 0u8, - promises, - 1_000_000, - Arc::new(AtomicBool::new(true)), - a2b_msg_s, - b2a_msg_recv_r, - a2b_close_stream_s, - ) + StreamParams { promises } } #[test] fn serialize_test() { - let msg = Message::serialize("abc", &stub_stream(false)); + let msg = Message::serialize("abc", stub_stream(false)); assert_eq!(msg.data.len(), 11); assert_eq!(msg.data[0], 3); assert_eq!(msg.data[1..7], [0, 0, 0, 0, 0, 0]); @@ -221,7 +201,7 @@ mod tests { #[cfg(feature = "compression")] #[test] fn serialize_compress_small() { - let msg = Message::serialize("abc", &stub_stream(true)); + let msg = Message::serialize("abc", stub_stream(true)); assert_eq!(msg.data.len(), 12); assert_eq!(msg.data[0], 176); assert_eq!(msg.data[1], 3); @@ -245,7 +225,7 @@ mod tests { 0, "assets/data/plants/flowers/greenrose.ron", ); - let msg = Message::serialize(&msg, &stub_stream(true)); + let msg = Message::serialize(&msg, stub_stream(true)); assert_eq!(msg.data.len(), 79); assert_eq!(msg.data[0], 34); assert_eq!(msg.data[1], 5); @@ -275,7 +255,7 @@ mod tests { _ => {}, } } - let msg = Message::serialize(&msg, &stub_stream(true)); + let msg = Message::serialize(&msg, stub_stream(true)); assert_eq!(msg.data.len(), 1331); } } diff --git a/network/src/metrics.rs b/network/src/metrics.rs index d1b77d76d0..a5874a4b86 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,3 +1,4 @@ +use crate::api::ProtocolAddr; use network_protocol::{Cid, Pid}; #[cfg(feature = "metrics")] use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; @@ -151,6 +152,27 @@ impl NetworkMetrics { .with_label_values(&[remote_p]) .inc(); } + + pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) { + self.listen_requests_total + .with_label_values(&[protocol_name(protocol)]) + .inc(); + } + + pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) { + self.connect_requests_total + .with_label_values(&[protocol_name(protocol)]) + .inc(); + } +} + +#[cfg(feature = "metrics")] +fn protocol_name(protocol: &ProtocolAddr) -> &str { + match protocol { + ProtocolAddr::Tcp(_) => "tcp", + ProtocolAddr::Udp(_) => "udp", + ProtocolAddr::Mpsc(_) => "mpsc", + } } #[cfg(not(feature = "metrics"))] @@ -164,6 +186,10 @@ impl NetworkMetrics { pub(crate) fn streams_opened(&self, _remote_p: &str) {} pub(crate) fn streams_closed(&self, _remote_p: &str) {} + + pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {} + + pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {} } impl std::fmt::Debug for NetworkMetrics { diff --git a/network/src/participant.rs b/network/src/participant.rs index c0276a4ad2..9e8c496172 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -881,7 +881,7 @@ mod tests { .unwrap(); let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap(); - assert_eq!(stream.promises(), Promises::ORDERED); + assert_eq!(stream.params().promises, Promises::ORDERED); let (s, r) = oneshot::channel(); runtime.block_on(async { diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index f711287d38..90becb0793 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -177,15 +177,7 @@ impl Scheduler { async move { debug!(?address, "Got request to open a channel_creator"); - #[cfg(feature = "metrics")] - self.metrics - .listen_requests_total - .with_label_values(&[match address { - ProtocolAddr::Tcp(_) => "tcp", - ProtocolAddr::Udp(_) => "udp", - ProtocolAddr::Mpsc(_) => "mpsc", - }]) - .inc(); + self.metrics.listen_request(&address); let (end_sender, end_receiver) = oneshot::channel::<()>(); self.channel_listener .lock() @@ -202,13 +194,11 @@ impl Scheduler { async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver) { trace!("Start connect_mgr"); while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { - let (protocol, cid, handshake) = match addr { + let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); + let metrics = Arc::clone(&self.protocol_metrics); + self.metrics.connect_request(&addr); + let (protocol, handshake) = match addr { ProtocolAddr::Tcp(addr) => { - #[cfg(feature = "metrics")] - self.metrics - .connect_requests_total - .with_label_values(&["tcp"]) - .inc(); let stream = match net::TcpStream::connect(addr).await { Ok(stream) => stream, Err(e) => { @@ -216,13 +206,8 @@ impl Scheduler { continue; }, }; - let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); - ( - Protocols::new_tcp(stream, cid, Arc::clone(&self.protocol_metrics)), - cid, - false, - ) + (Protocols::new_tcp(stream, cid, metrics), false) }, ProtocolAddr::Mpsc(addr) => { let mpsc_s = match MPSC_POOL.lock().await.get(&addr) { @@ -244,17 +229,9 @@ impl Scheduler { .send((remote_to_local_s, local_to_remote_oneshot_s)) .unwrap(); let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap(); - - let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!(?addr, "Connecting Mpsc"); ( - Protocols::new_mpsc( - local_to_remote_s, - remote_to_local_r, - cid, - Arc::clone(&self.protocol_metrics), - ), - cid, + Protocols::new_mpsc(local_to_remote_s, remote_to_local_r, cid, metrics), false, ) }, diff --git a/server/src/client.rs b/server/src/client.rs index 2db4c6d43c..9e03187278 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,5 +1,5 @@ use common_net::msg::{ClientType, ServerGeneral, ServerMsg}; -use network::{Message, Participant, Stream, StreamError}; +use network::{Message, Participant, Stream, StreamError, StreamParams}; use serde::{de::DeserializeOwned, Serialize}; use specs::Component; use specs_idvs::IdvStorage; @@ -26,6 +26,13 @@ pub struct Client { character_screen_stream: Mutex, in_game_stream: Mutex, terrain_stream: Mutex, + + general_stream_params: StreamParams, + ping_stream_params: StreamParams, + register_stream_params: StreamParams, + character_screen_stream_params: StreamParams, + in_game_stream_params: StreamParams, + terrain_stream_params: StreamParams, } pub struct PreparedMsg { @@ -50,6 +57,12 @@ impl Client { in_game_stream: Stream, terrain_stream: Stream, ) -> Self { + let general_stream_params = general_stream.params(); + let ping_stream_params = ping_stream.params(); + let register_stream_params = register_stream.params(); + let character_screen_stream_params = character_screen_stream.params(); + let in_game_stream_params = in_game_stream.params(); + let terrain_stream_params = terrain_stream.params(); Client { client_type, participant: Some(participant), @@ -62,6 +75,12 @@ impl Client { character_screen_stream: Mutex::new(character_screen_stream), in_game_stream: Mutex::new(in_game_stream), terrain_stream: Mutex::new(terrain_stream), + general_stream_params, + ping_stream_params, + register_stream_params, + character_screen_stream_params, + in_game_stream_params, + terrain_stream_params, } } @@ -138,9 +157,9 @@ impl Client { pub(crate) fn prepare>(&self, msg: M) -> PreparedMsg { match msg.into() { - ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream), - ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream), - ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream_params), + ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream_params), + ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream_params), ServerMsg::General(g) => { match g { //Character Screen related @@ -149,7 +168,7 @@ impl Client { | ServerGeneral::CharacterActionError(_) | ServerGeneral::CharacterCreated(_) | ServerGeneral::CharacterSuccess => { - PreparedMsg::new(1, &g, &self.character_screen_stream) + PreparedMsg::new(1, &g, &self.character_screen_stream_params) }, //Ingame related ServerGeneral::GroupUpdate(_) @@ -164,12 +183,12 @@ impl Client { | ServerGeneral::SiteEconomy(_) | ServerGeneral::UpdatePendingTrade(_, _, _) | ServerGeneral::FinishedTrade(_) => { - PreparedMsg::new(2, &g, &self.in_game_stream) + PreparedMsg::new(2, &g, &self.in_game_stream_params) }, //Ingame related, terrain ServerGeneral::TerrainChunkUpdate { .. } | ServerGeneral::TerrainBlockUpdates(_) => { - PreparedMsg::new(5, &g, &self.terrain_stream) + PreparedMsg::new(5, &g, &self.terrain_stream_params) }, // Always possible ServerGeneral::PlayerListUpdate(_) @@ -183,11 +202,11 @@ impl Client { | ServerGeneral::DeleteEntity(_) | ServerGeneral::Disconnect(_) | ServerGeneral::Notification(_) => { - PreparedMsg::new(3, &g, &self.general_stream) + PreparedMsg::new(3, &g, &self.general_stream_params) }, } }, - ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream), + ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream_params), } } @@ -209,10 +228,10 @@ impl Client { } impl PreparedMsg { - fn new(id: u8, msg: &M, stream: &Mutex) -> PreparedMsg { + fn new(id: u8, msg: &M, stream_params: &StreamParams) -> PreparedMsg { Self { stream_id: id, - message: Message::serialize(&msg, &stream.lock().unwrap()), + message: Message::serialize(&msg, stream_params.clone()), } } } From 85023119724dda152236ab40ceeb7fb94c65501a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 25 Mar 2021 16:27:13 +0100 Subject: [PATCH 2/4] preparation for multiple-channel participants. When a stream is opened we are searching for the best (currently) available channel. The stream will then be keept on that channel. Adjusted the rest of the algorithms that they now respect this rule. improved a HashMap for Pids to be based on a Vec. Also using this for Sid -> Cid relation which is more performance critical WARN: our current send()? error handling allows it for some close_stream messages to get lost. --- network/protocol/src/mpsc.rs | 12 ++- network/protocol/src/tcp.rs | 11 ++- network/protocol/src/types.rs | 2 +- network/src/lib.rs | 2 +- network/src/participant.rs | 144 ++++++++++++++++++++++++---------- network/src/trace.rs | 46 ----------- network/src/util.rs | 117 +++++++++++++++++++++++++++ 7 files changed, 243 insertions(+), 91 deletions(-) delete mode 100644 network/src/trace.rs create mode 100644 network/src/util.rs diff --git a/network/protocol/src/mpsc.rs b/network/protocol/src/mpsc.rs index 5a00f27209..76969aa0be 100644 --- a/network/protocol/src/mpsc.rs +++ b/network/protocol/src/mpsc.rs @@ -6,7 +6,7 @@ use crate::{ frame::InitFrame, handshake::{ReliableDrain, ReliableSink}, metrics::ProtocolMetricCache, - types::Bandwidth, + types::{Bandwidth, Promises}, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, }; use async_trait::async_trait; @@ -57,6 +57,16 @@ where metrics, } } + + /// returns all promises that this Protocol can take care of + /// If you open a Stream anyway, unsupported promises are ignored. + pub fn supported_promises() -> Promises { + Promises::ORDERED + | Promises::CONSISTENCY + | Promises::GUARANTEED_DELIVERY + | Promises::COMPRESSED + | Promises::ENCRYPTED /*assume a direct mpsc connection is secure*/ + } } impl MpscRecvProtocol diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index c944674ad6..315fd0d076 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -6,7 +6,7 @@ use crate::{ message::{ITMessage, ALLOC_BLOCK}, metrics::{ProtocolMetricCache, RemoveReason}, prio::PrioManager, - types::{Bandwidth, Mid, Sid}, + types::{Bandwidth, Mid, Promises, Sid}, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, }; use async_trait::async_trait; @@ -70,6 +70,15 @@ where metrics, } } + + /// returns all promises that this Protocol can take care of + /// If you open a Stream anyway, unsupported promises are ignored. + pub fn supported_promises() -> Promises { + Promises::ORDERED + | Promises::CONSISTENCY + | Promises::GUARANTEED_DELIVERY + | Promises::COMPRESSED + } } impl TcpRecvProtocol diff --git a/network/protocol/src/types.rs b/network/protocol/src/types.rs index 0037795314..dfc9142f38 100644 --- a/network/protocol/src/types.rs +++ b/network/protocol/src/types.rs @@ -65,7 +65,7 @@ pub struct Pid { /// Unique ID per Stream, in one Channel. /// one side will always start with 0, while the other start with u64::MAX / 2. /// number increases for each created Stream. -#[derive(PartialEq, Eq, Hash, Clone, Copy)] +#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] pub struct Sid { internal: u64, } diff --git a/network/src/lib.rs b/network/src/lib.rs index 607eed9d91..448b50f41c 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -104,7 +104,7 @@ mod message; mod metrics; mod participant; mod scheduler; -mod trace; +mod util; pub use api::{ Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, diff --git a/network/src/participant.rs b/network/src/participant.rs index 9e8c496172..c1b350c817 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -2,7 +2,7 @@ use crate::{ api::{ParticipantError, Stream}, channel::{Protocols, RecvProtocols, SendProtocols}, metrics::NetworkMetrics, - trace::DeferredTracer, + util::{DeferredTracer, SortedVec}, }; use bytes::Bytes; use futures_util::{FutureExt, StreamExt}; @@ -137,7 +137,7 @@ impl BParticipant { let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) = - crossbeam_channel::unbounded::(); + crossbeam_channel::unbounded::<(Cid, ProtocolEvent)>(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::(); let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>(); @@ -180,6 +180,28 @@ impl BParticipant { ); } + fn best_protocol(all: &SortedVec, promises: Promises) -> Option { + // check for mpsc + for (cid, p) in all.data.iter() { + if matches!(p, SendProtocols::Mpsc(_)) { + return Some(*cid); + } + } + // check for tcp + if network_protocol::TcpSendProtocol::::supported_promises() + == promises + { + for (cid, p) in all.data.iter() { + if matches!(p, SendProtocols::Tcp(_)) { + return Some(*cid); + } + } + } + + warn!("couldn't satisfy promises"); + all.data.first().map(|(c, _)| *c) + } + //TODO: local stream_cid: HashMap to know the respective protocol #[allow(clippy::too_many_arguments)] async fn send_mgr( @@ -189,18 +211,18 @@ impl BParticipant { a2b_msg_r: crossbeam_channel::Receiver<(Sid, Bytes)>, mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>, b2b_close_send_protocol_r: async_channel::Receiver, - b2b_notify_send_of_recv_r: crossbeam_channel::Receiver, + b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<(Cid, ProtocolEvent)>, _b2s_prio_statistic_s: mpsc::UnboundedSender, ) { - let mut send_protocols: HashMap = HashMap::new(); + let mut sorted_send_protocols = SortedVec::::default(); + let mut sorted_stream_protocols = SortedVec::::default(); let mut interval = tokio::time::interval(Self::TICK_TIME); let mut last_instant = Instant::now(); let mut stream_ids = self.offset_sid; trace!("workaround, actively wait for first protocol"); - b2b_add_protocol_r - .recv() - .await - .map(|(c, p)| send_protocols.insert(c, p)); + if let Some((c, p)) = b2b_add_protocol_r.recv().await { + sorted_send_protocols.insert(c, p) + } loop { let (open, close, _, addp, remp) = select!( Some(n) = a2b_open_stream_r.recv().fuse() => (Some(n), None, None, None, None), @@ -210,25 +232,29 @@ impl BParticipant { Ok(n) = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, Some(n)), ); - addp.map(|(cid, p)| { + if let Some((cid, p)) = addp { debug!(?cid, "add protocol"); - send_protocols.insert(cid, p) - }); + sorted_send_protocols.insert(cid, p); + } - let (cid, active) = match send_protocols.iter_mut().next() { - Some((cid, a)) => (*cid, a), - None => { - warn!("no channel"); - tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover - continue; - }, - }; + //verify that we have at LEAST 1 channel before continuing + if sorted_send_protocols.data.is_empty() { + warn!("no channel"); + tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover + continue; + } + + //let (cid, active) = sorted_send_protocols.data.iter_mut().next().unwrap(); + //used for error handling + let mut cid = u64::MAX; let active_err = async { if let Some((prio, promises, guaranteed_bandwidth, return_s)) = open { let sid = stream_ids; - trace!(?sid, "open stream"); stream_ids += Sid::from(1); + cid = Self::best_protocol(&sorted_send_protocols, promises).unwrap(); + trace!(?sid, ?cid, "open stream"); + let stream = self .create_stream(sid, prio, promises, guaranteed_bandwidth) .await; @@ -240,34 +266,62 @@ impl BParticipant { guaranteed_bandwidth, }; + sorted_stream_protocols.insert(sid, cid); return_s.send(stream).unwrap(); - active.send(event).await?; + sorted_send_protocols + .get_mut(&cid) + .unwrap() + .send(event) + .await?; } // process recv content first let mut closeevents = b2b_notify_send_of_recv_r .try_iter() - .map(|e| { - if matches!(e, ProtocolEvent::OpenStream { .. }) { - active.notify_from_recv(e); + .map(|(cid, e)| match e { + ProtocolEvent::OpenStream { sid, .. } => { + match sorted_send_protocols.get_mut(&cid) { + Some(p) => { + sorted_stream_protocols.insert(sid, cid); + p.notify_from_recv(e); + }, + None => { + warn!(?cid, "couldn't notify create protocol, doesn't exist") + }, + }; None - } else { - Some(e) - } + }, + e => Some((cid, e)), }) .collect::>(); // get all messages and assign it to a channel for (sid, buffer) in a2b_msg_r.try_iter() { - active - .send(ProtocolEvent::Message { data: buffer, sid }) - .await? + cid = *sorted_stream_protocols.get(&sid).unwrap(); + let event = ProtocolEvent::Message { data: buffer, sid }; + sorted_send_protocols + .get_mut(&cid) + .unwrap() + .send(event) + .await?; } // process recv content afterwards + //TODO: this might get skipped when a send msg fails on another channel in the + // previous line let _ = closeevents.drain(..).map(|e| { - if let Some(e) = e { - active.notify_from_recv(e); + if let Some((cid, e)) = e { + match sorted_send_protocols.get_mut(&cid) { + Some(p) => { + if let ProtocolEvent::OpenStream { sid, .. } = e { + let _ = sorted_stream_protocols.delete(&sid); + p.notify_from_recv(e); + } else { + unreachable!("we dont send other over this channel"); + } + }, + None => warn!(?cid, "couldn't notify close protocol, doesn't exist"), + }; } }); @@ -276,13 +330,21 @@ impl BParticipant { self.delete_stream(sid).await; // Fire&Forget the protocol will take care to verify that this Frame is delayed // till the last msg was received! - active.send(ProtocolEvent::CloseStream { sid }).await?; + cid = sorted_stream_protocols.delete(&sid).unwrap(); + let event = ProtocolEvent::CloseStream { sid }; + sorted_send_protocols + .get_mut(&cid) + .unwrap() + .send(event) + .await?; } let send_time = Instant::now(); let diff = send_time.duration_since(last_instant); last_instant = send_time; - active.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. + for (_, p) in sorted_send_protocols.data.iter_mut() { + p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. + } let r: Result<(), network_protocol::ProtocolError> = Ok(()); r } @@ -292,16 +354,16 @@ impl BParticipant { // remote recv will now fail, which will trigger remote send which will trigger // recv trace!("TODO: for now decide to FAIL this participant and not wait for a failover"); - send_protocols.remove(&cid).unwrap(); + sorted_send_protocols.delete(&cid).unwrap(); self.metrics.channels_disconnected(&self.remote_pid_string); - if send_protocols.is_empty() { + if sorted_send_protocols.data.is_empty() { break; } } if let Some(cid) = remp { debug!(?cid, "remove protocol"); - match send_protocols.remove(&cid) { + match sorted_send_protocols.delete(&cid) { Some(mut prot) => { self.metrics.channels_disconnected(&self.remote_pid_string); trace!("blocking flush"); @@ -311,7 +373,7 @@ impl BParticipant { }, None => trace!("tried to remove protocol twice"), }; - if send_protocols.is_empty() { + if sorted_send_protocols.data.is_empty() { break; } } @@ -330,7 +392,7 @@ impl BParticipant { mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>, b2b_force_close_recv_protocol_r: async_channel::Receiver, b2b_close_send_protocol_s: async_channel::Sender, - b2b_notify_send_of_recv_s: crossbeam_channel::Sender, + b2b_notify_send_of_recv_s: crossbeam_channel::Sender<(Cid, ProtocolEvent)>, ) { let mut recv_protocols: HashMap> = HashMap::new(); // we should be able to directly await futures imo @@ -390,7 +452,7 @@ impl BParticipant { guaranteed_bandwidth, }) => { trace!(?sid, "open stream"); - let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); + let _ = b2b_notify_send_of_recv_s.send((cid, r.unwrap())); // waiting for receiving is not necessary, because the send_mgr will first // process this before process messages! let stream = self @@ -401,7 +463,7 @@ impl BParticipant { }, Ok(ProtocolEvent::CloseStream { sid }) => { trace!(?sid, "close stream"); - let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); + let _ = b2b_notify_send_of_recv_s.send((cid, r.unwrap())); self.delete_stream(sid).await; retrigger(cid, p, &mut recv_protocols); }, diff --git a/network/src/trace.rs b/network/src/trace.rs deleted file mode 100644 index 640d65ee55..0000000000 --- a/network/src/trace.rs +++ /dev/null @@ -1,46 +0,0 @@ -use core::hash::Hash; -use std::{collections::HashMap, time::Instant}; -use tracing::Level; - -/// used to collect multiple traces and not spam the console -pub(crate) struct DeferredTracer { - level: Level, - items: HashMap, - last: Instant, - last_cnt: u32, -} - -impl DeferredTracer { - pub(crate) fn new(level: Level) -> Self { - Self { - level, - items: HashMap::new(), - last: Instant::now(), - last_cnt: 0, - } - } - - pub(crate) fn log(&mut self, t: T) { - if tracing::level_enabled!(self.level) { - *self.items.entry(t).or_default() += 1; - self.last = Instant::now(); - self.last_cnt += 1; - } else { - } - } - - pub(crate) fn print(&mut self) -> Option> { - const MAX_LOGS: u32 = 10_000; - const MAX_SECS: u64 = 1; - if tracing::level_enabled!(self.level) - && (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS) - { - if self.last_cnt > MAX_LOGS { - tracing::debug!("this seems to be logged continuesly"); - } - Some(std::mem::take(&mut self.items)) - } else { - None - } - } -} diff --git a/network/src/util.rs b/network/src/util.rs new file mode 100644 index 0000000000..b9a8801263 --- /dev/null +++ b/network/src/util.rs @@ -0,0 +1,117 @@ +use core::hash::Hash; +use std::{collections::HashMap, time::Instant}; +use tracing::Level; + +/// used to collect multiple traces and not spam the console +pub(crate) struct DeferredTracer { + level: Level, + items: HashMap, + last: Instant, + last_cnt: u32, +} + +impl DeferredTracer { + pub(crate) fn new(level: Level) -> Self { + Self { + level, + items: HashMap::new(), + last: Instant::now(), + last_cnt: 0, + } + } + + pub(crate) fn log(&mut self, t: T) { + if tracing::level_enabled!(self.level) { + *self.items.entry(t).or_default() += 1; + self.last = Instant::now(); + self.last_cnt += 1; + } else { + } + } + + pub(crate) fn print(&mut self) -> Option> { + const MAX_LOGS: u32 = 10_000; + const MAX_SECS: u64 = 1; + if tracing::level_enabled!(self.level) + && (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS) + { + if self.last_cnt > MAX_LOGS { + tracing::debug!("this seems to be logged continuesly"); + } + Some(std::mem::take(&mut self.items)) + } else { + None + } + } +} + +/// Used for storing Protocols in a Participant or Stream <-> Protocol +pub(crate) struct SortedVec { + pub data: Vec<(K, V)>, +} + +impl Default for SortedVec { + fn default() -> Self { Self { data: vec![] } } +} + +impl SortedVec +where + K: Ord + Copy, +{ + pub fn insert(&mut self, k: K, v: V) { + self.data.push((k, v)); + self.data.sort_by_key(|&(k, _)| k); + } + + pub fn delete(&mut self, k: &K) -> Option { + if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) { + Some(self.data.remove(i).1) + } else { + None + } + } + + pub fn get(&self, k: &K) -> Option<&V> { + if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) { + Some(&self.data[i].1) + } else { + None + } + } + + pub fn get_mut(&mut self, k: &K) -> Option<&mut V> { + if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) { + Some(&mut self.data[i].1) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sorted_vec() { + let mut vec = SortedVec::default(); + vec.insert(10, "Hello"); + println!("{:?}", vec.data); + vec.insert(30, "World"); + println!("{:?}", vec.data); + vec.insert(20, " "); + println!("{:?}", vec.data); + assert_eq!(vec.data[0].1, "Hello"); + assert_eq!(vec.data[1].1, " "); + assert_eq!(vec.data[2].1, "World"); + assert_eq!(vec.get(&30), Some(&"World")); + assert_eq!(vec.get_mut(&20), Some(&mut " ")); + assert_eq!(vec.get(&10), Some(&"Hello")); + assert_eq!(vec.delete(&40), None); + assert_eq!(vec.delete(&10), Some("Hello")); + assert_eq!(vec.delete(&10), None); + assert_eq!(vec.get(&30), Some(&"World")); + assert_eq!(vec.get_mut(&20), Some(&mut " ")); + assert_eq!(vec.get(&10), None); + } +} From 3ac7120f391aae727d6765cd72c06a1de372de76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 25 Mar 2021 16:46:40 +0100 Subject: [PATCH 3/4] fix a bug that some closes could get lost --- network/src/participant.rs | 114 +++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/network/src/participant.rs b/network/src/participant.rs index c1b350c817..1688df36bd 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -136,8 +136,10 @@ impl BParticipant { async_channel::unbounded::(); let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); - let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) = - crossbeam_channel::unbounded::<(Cid, ProtocolEvent)>(); + let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) = + crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>(); + let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) = + crossbeam_channel::unbounded::<(Cid, Sid)>(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::(); let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>(); @@ -155,7 +157,8 @@ impl BParticipant { a2b_msg_r, b2b_add_send_protocol_r, b2b_close_send_protocol_r, - b2b_notify_send_of_recv_r, + b2b_notify_send_of_recv_open_r, + b2b_notify_send_of_recv_close_r, b2s_prio_statistic_s, ) .instrument(tracing::info_span!("send")), @@ -164,7 +167,8 @@ impl BParticipant { b2b_add_recv_protocol_r, b2b_force_close_recv_protocol_r, b2b_close_send_protocol_s.clone(), - b2b_notify_send_of_recv_s, + b2b_notify_send_of_recv_open_s, + b2b_notify_send_of_recv_close_s, ) .instrument(tracing::info_span!("recv")), self.create_channel_mgr( @@ -211,7 +215,14 @@ impl BParticipant { a2b_msg_r: crossbeam_channel::Receiver<(Sid, Bytes)>, mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>, b2b_close_send_protocol_r: async_channel::Receiver, - b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<(Cid, ProtocolEvent)>, + b2b_notify_send_of_recv_open_r: crossbeam_channel::Receiver<( + Cid, + Sid, + Prio, + Promises, + Bandwidth, + )>, + b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>, _b2s_prio_statistic_s: mpsc::UnboundedSender, ) { let mut sorted_send_protocols = SortedVec::::default(); @@ -276,24 +287,22 @@ impl BParticipant { } // process recv content first - let mut closeevents = b2b_notify_send_of_recv_r - .try_iter() - .map(|(cid, e)| match e { - ProtocolEvent::OpenStream { sid, .. } => { - match sorted_send_protocols.get_mut(&cid) { - Some(p) => { - sorted_stream_protocols.insert(sid, cid); - p.notify_from_recv(e); - }, - None => { - warn!(?cid, "couldn't notify create protocol, doesn't exist") - }, - }; - None + for (cid, sid, prio, promises, guaranteed_bandwidth) in + b2b_notify_send_of_recv_open_r.try_iter() + { + match sorted_send_protocols.get_mut(&cid) { + Some(p) => { + sorted_stream_protocols.insert(sid, cid); + p.notify_from_recv(ProtocolEvent::OpenStream { + sid, + prio, + promises, + guaranteed_bandwidth, + }); }, - e => Some((cid, e)), - }) - .collect::>(); + None => warn!(?cid, "couldn't notify create protocol, doesn't exist"), + }; + } // get all messages and assign it to a channel for (sid, buffer) in a2b_msg_r.try_iter() { @@ -307,36 +316,30 @@ impl BParticipant { } // process recv content afterwards - //TODO: this might get skipped when a send msg fails on another channel in the - // previous line - let _ = closeevents.drain(..).map(|e| { - if let Some((cid, e)) = e { - match sorted_send_protocols.get_mut(&cid) { - Some(p) => { - if let ProtocolEvent::OpenStream { sid, .. } = e { - let _ = sorted_stream_protocols.delete(&sid); - p.notify_from_recv(e); - } else { - unreachable!("we dont send other over this channel"); - } - }, - None => warn!(?cid, "couldn't notify close protocol, doesn't exist"), - }; - } - }); + for (cid, sid) in b2b_notify_send_of_recv_close_r.try_iter() { + match sorted_send_protocols.get_mut(&cid) { + Some(p) => { + let _ = sorted_stream_protocols.delete(&sid); + p.notify_from_recv(ProtocolEvent::CloseStream { sid }); + }, + None => warn!(?cid, "couldn't notify close protocol, doesn't exist"), + }; + } if let Some(sid) = close { trace!(?stream_ids, "delete stream"); self.delete_stream(sid).await; // Fire&Forget the protocol will take care to verify that this Frame is delayed // till the last msg was received! - cid = sorted_stream_protocols.delete(&sid).unwrap(); - let event = ProtocolEvent::CloseStream { sid }; - sorted_send_protocols - .get_mut(&cid) - .unwrap() - .send(event) - .await?; + if let Some(c) = sorted_stream_protocols.delete(&sid) { + cid = c; + let event = ProtocolEvent::CloseStream { sid }; + sorted_send_protocols + .get_mut(&c) + .unwrap() + .send(event) + .await?; + } } let send_time = Instant::now(); @@ -392,7 +395,14 @@ impl BParticipant { mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>, b2b_force_close_recv_protocol_r: async_channel::Receiver, b2b_close_send_protocol_s: async_channel::Sender, - b2b_notify_send_of_recv_s: crossbeam_channel::Sender<(Cid, ProtocolEvent)>, + b2b_notify_send_of_recv_open_r: crossbeam_channel::Sender<( + Cid, + Sid, + Prio, + Promises, + Bandwidth, + )>, + b2b_notify_send_of_recv_close_s: crossbeam_channel::Sender<(Cid, Sid)>, ) { let mut recv_protocols: HashMap> = HashMap::new(); // we should be able to directly await futures imo @@ -452,7 +462,13 @@ impl BParticipant { guaranteed_bandwidth, }) => { trace!(?sid, "open stream"); - let _ = b2b_notify_send_of_recv_s.send((cid, r.unwrap())); + let _ = b2b_notify_send_of_recv_open_r.send(( + cid, + sid, + prio, + promises, + guaranteed_bandwidth, + )); // waiting for receiving is not necessary, because the send_mgr will first // process this before process messages! let stream = self @@ -463,7 +479,7 @@ impl BParticipant { }, Ok(ProtocolEvent::CloseStream { sid }) => { trace!(?sid, "close stream"); - let _ = b2b_notify_send_of_recv_s.send((cid, r.unwrap())); + let _ = b2b_notify_send_of_recv_close_s.send((cid, sid)); self.delete_stream(sid).await; retrigger(cid, p, &mut recv_protocols); }, From f3b6bb1dd166aa153f569b54da22551366797c11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 25 Mar 2021 18:28:50 +0100 Subject: [PATCH 4/4] implement Upload Bandwidth prediction. Its available to `api` and `metrics` and can be used to slow down msg send in veloren. It uses a tokio::watch for now, as i plan to have a watch job in the scheduler that recalculates prio on change. Also cleaning up participant metrics after a disconnect --- network/protocol/src/lib.rs | 2 +- network/protocol/src/mpsc.rs | 4 +++- network/protocol/src/tcp.rs | 8 ++++++-- network/src/api.rs | 9 +++++++- network/src/channel.rs | 12 +++++++---- network/src/metrics.rs | 40 ++++++++++++++++++++++++++++++++++++ network/src/participant.rs | 26 +++++++++++++++++++++-- network/src/scheduler.rs | 2 ++ 8 files changed, 92 insertions(+), 11 deletions(-) diff --git a/network/protocol/src/lib.rs b/network/protocol/src/lib.rs index de036cee76..79c1ae867a 100644 --- a/network/protocol/src/lib.rs +++ b/network/protocol/src/lib.rs @@ -117,7 +117,7 @@ pub trait SendProtocol { &mut self, bandwidth: Bandwidth, dt: std::time::Duration, - ) -> Result<(), ProtocolError>; + ) -> Result; } /// Generic Network Recv Protocol. See: [`SendProtocol`] diff --git a/network/protocol/src/mpsc.rs b/network/protocol/src/mpsc.rs index 76969aa0be..1f3219bab2 100644 --- a/network/protocol/src/mpsc.rs +++ b/network/protocol/src/mpsc.rs @@ -112,7 +112,9 @@ where } } - async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<(), ProtocolError> { Ok(()) } + async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result { + Ok(0) + } } #[async_trait] diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index 315fd0d076..e78741fc01 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -167,7 +167,11 @@ where Ok(()) } - async fn flush(&mut self, bandwidth: Bandwidth, dt: Duration) -> Result<(), ProtocolError> { + async fn flush( + &mut self, + bandwidth: Bandwidth, + dt: Duration, + ) -> Result { let (frames, total_bytes) = self.store.grab(bandwidth, dt); self.buffer.reserve(total_bytes as usize); let mut data_frames = 0; @@ -216,7 +220,7 @@ where self.drain.send(self.buffer.split()).await?; self.pending_shutdown = false; } - Ok(()) + Ok(data_bandwidth as u64) } } diff --git a/network/src/api.rs b/network/src/api.rs index 5379e431c3..bd5e03cddc 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -22,7 +22,7 @@ use std::{ use tokio::{ io, runtime::Runtime, - sync::{mpsc, oneshot, Mutex}, + sync::{mpsc, oneshot, watch, Mutex}, }; use tracing::*; @@ -48,6 +48,7 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, + b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -493,6 +494,7 @@ impl Participant { remote_pid: Pid, a2b_open_stream_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, + b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, ) -> Self { Self { @@ -500,6 +502,7 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), + b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } } @@ -721,6 +724,10 @@ impl Participant { } } + /// Returns the current approximation on the maximum bandwidth available. + /// This WILL fluctuate based on the amount/size of send messages. + pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } + /// Returns the remote [`Pid`](network_protocol::Pid) pub fn remote_pid(&self) -> Pid { self.remote_pid } } diff --git a/network/src/channel.rs b/network/src/channel.rs index affaa1ee34..cf7a7851bd 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use bytes::BytesMut; use network_protocol::{ - Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, ProtocolError, - ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, TcpSendProtocol, - UnreliableDrain, UnreliableSink, + Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, + ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, + TcpSendProtocol, UnreliableDrain, UnreliableSink, }; use std::{sync::Arc, time::Duration}; use tokio::{ @@ -102,7 +102,11 @@ impl network_protocol::SendProtocol for SendProtocols { } } - async fn flush(&mut self, bandwidth: u64, dt: Duration) -> Result<(), ProtocolError> { + async fn flush( + &mut self, + bandwidth: Bandwidth, + dt: Duration, + ) -> Result { match self { SendProtocols::Tcp(s) => s.flush(bandwidth, dt).await, SendProtocols::Mpsc(s) => s.flush(bandwidth, dt).await, diff --git a/network/src/metrics.rs b/network/src/metrics.rs index a5874a4b86..9733b477a1 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -13,6 +13,8 @@ pub struct NetworkMetrics { pub participants_disconnected_total: IntCounter, // channel id's, seperated by PARTICIPANT, max 5 pub participants_channel_ids: IntGaugeVec, + // upload to remote, averaged, seperated by PARTICIPANT + pub participants_bandwidth: IntGaugeVec, // opened Channels, seperated by PARTICIPANT pub channels_connected_total: IntCounterVec, pub channels_disconnected_total: IntCounterVec, @@ -57,6 +59,13 @@ impl NetworkMetrics { ), &["participant", "no"], )?; + let participants_bandwidth = IntGaugeVec::new( + Opts::new( + "participants_bandwidth", + "max upload possible to Participant", + ), + &["participant"], + )?; let channels_connected_total = IntCounterVec::new( Opts::new( "channels_connected_total", @@ -104,6 +113,7 @@ impl NetworkMetrics { participants_connected_total, participants_disconnected_total, participants_channel_ids, + participants_bandwidth, channels_connected_total, channels_disconnected_total, streams_opened_total, @@ -118,6 +128,7 @@ impl NetworkMetrics { registry.register(Box::new(self.participants_connected_total.clone()))?; registry.register(Box::new(self.participants_disconnected_total.clone()))?; registry.register(Box::new(self.participants_channel_ids.clone()))?; + registry.register(Box::new(self.participants_bandwidth.clone()))?; registry.register(Box::new(self.channels_connected_total.clone()))?; registry.register(Box::new(self.channels_disconnected_total.clone()))?; registry.register(Box::new(self.streams_opened_total.clone()))?; @@ -141,6 +152,12 @@ impl NetworkMetrics { .inc(); } + pub(crate) fn participant_bandwidth(&self, remote_p: &str, bandwidth: f32) { + self.participants_bandwidth + .with_label_values(&[remote_p]) + .set(bandwidth as i64); + } + pub(crate) fn streams_opened(&self, remote_p: &str) { self.streams_opened_total .with_label_values(&[remote_p]) @@ -164,6 +181,25 @@ impl NetworkMetrics { .with_label_values(&[protocol_name(protocol)]) .inc(); } + + pub(crate) fn cleanup_participant(&self, remote_p: &str) { + for no in 0..5 { + let _ = self + .participants_channel_ids + .remove_label_values(&[&remote_p, &no.to_string()]); + } + let _ = self + .channels_connected_total + .remove_label_values(&[&remote_p]); + let _ = self + .channels_disconnected_total + .remove_label_values(&[&remote_p]); + let _ = self + .participants_bandwidth + .remove_label_values(&[&remote_p]); + let _ = self.streams_opened_total.remove_label_values(&[&remote_p]); + let _ = self.streams_closed_total.remove_label_values(&[&remote_p]); + } } #[cfg(feature = "metrics")] @@ -183,6 +219,8 @@ impl NetworkMetrics { pub(crate) fn channels_disconnected(&self, _remote_p: &str) {} + pub(crate) fn participant_bandwidth(&self, _remote_p: &str, _bandwidth: f32) {} + pub(crate) fn streams_opened(&self, _remote_p: &str) {} pub(crate) fn streams_closed(&self, _remote_p: &str) {} @@ -190,6 +228,8 @@ impl NetworkMetrics { pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {} pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {} + + pub(crate) fn cleanup_participant(&self, _remote_p: &str) {} } impl std::fmt::Debug for NetworkMetrics { diff --git a/network/src/participant.rs b/network/src/participant.rs index 1688df36bd..bc38a97b59 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -19,7 +19,7 @@ use std::{ }; use tokio::{ select, - sync::{mpsc, oneshot, Mutex, RwLock}, + sync::{mpsc, oneshot, watch, Mutex, RwLock}, task::JoinHandle, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -49,6 +49,7 @@ struct ControlChannels { a2b_open_stream_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, s2b_create_channel_r: mpsc::UnboundedReceiver, + b2a_bandwidth_stats_s: watch::Sender, s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } @@ -92,16 +93,19 @@ impl BParticipant { mpsc::UnboundedReceiver, mpsc::UnboundedSender, oneshot::Sender, + watch::Receiver, ) { let (a2b_open_stream_s, a2b_open_stream_r) = mpsc::unbounded_channel::(); let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); + let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::(0.0); let run_channels = Some(ControlChannels { a2b_open_stream_r, b2a_stream_opened_s, s2b_create_channel_r, + b2a_bandwidth_stats_s, s2b_shutdown_bparticipant_r, }); @@ -124,6 +128,7 @@ impl BParticipant { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) } @@ -160,6 +165,7 @@ impl BParticipant { b2b_notify_send_of_recv_open_r, b2b_notify_send_of_recv_close_r, b2s_prio_statistic_s, + run_channels.b2a_bandwidth_stats_s, ) .instrument(tracing::info_span!("send")), self.recv_mgr( @@ -224,12 +230,14 @@ impl BParticipant { )>, b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>, _b2s_prio_statistic_s: mpsc::UnboundedSender, + b2a_bandwidth_stats_s: watch::Sender, ) { let mut sorted_send_protocols = SortedVec::::default(); let mut sorted_stream_protocols = SortedVec::::default(); let mut interval = tokio::time::interval(Self::TICK_TIME); let mut last_instant = Instant::now(); let mut stream_ids = self.offset_sid; + let mut part_bandwidth = 0.0f32; trace!("workaround, actively wait for first protocol"); if let Some((c, p)) = b2b_add_protocol_r.recv().await { sorted_send_protocols.insert(c, p) @@ -345,9 +353,15 @@ impl BParticipant { let send_time = Instant::now(); let diff = send_time.duration_since(last_instant); last_instant = send_time; + let mut cnt = 0; for (_, p) in sorted_send_protocols.data.iter_mut() { - p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. + cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. } + let flush_time = send_time.elapsed().as_secs_f32(); + part_bandwidth = 0.99 * part_bandwidth + 0.01 * (cnt as f32 / flush_time); + self.metrics + .participant_bandwidth(&self.remote_pid_string, part_bandwidth); + let _ = b2a_bandwidth_stats_s.send(part_bandwidth); let r: Result<(), network_protocol::ProtocolError> = Ok(()); r } @@ -669,6 +683,7 @@ impl BParticipant { #[cfg(feature = "metrics")] self.metrics.participants_disconnected_total.inc(); + self.metrics.cleanup_participant(&self.remote_pid_string); trace!("Stop participant_shutdown_mgr"); } @@ -755,6 +770,7 @@ mod tests { mpsc::UnboundedSender, oneshot::Sender, mpsc::UnboundedReceiver, + watch::Receiver, JoinHandle<()>, ) { let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); @@ -769,6 +785,7 @@ mod tests { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) = runtime_clone.block_on(async move { let local_pid = Pid::fake(0); let remote_pid = Pid::fake(1); @@ -786,6 +803,7 @@ mod tests { s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + b2a_bandwidth_stats_r, handle, ) } @@ -816,6 +834,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -851,6 +870,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -887,6 +907,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -941,6 +962,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 90becb0793..abb54b6578 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -568,6 +568,7 @@ impl Scheduler { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics)); let participant = Participant::new( @@ -575,6 +576,7 @@ impl Scheduler { pid, a2b_open_stream_s, b2a_stream_opened_r, + b2a_bandwidth_stats_r, participant_channels.a2s_disconnect_s, );