diff --git a/network/protocol/src/lib.rs b/network/protocol/src/lib.rs index de036cee76..79c1ae867a 100644 --- a/network/protocol/src/lib.rs +++ b/network/protocol/src/lib.rs @@ -117,7 +117,7 @@ pub trait SendProtocol { &mut self, bandwidth: Bandwidth, dt: std::time::Duration, - ) -> Result<(), ProtocolError>; + ) -> Result; } /// Generic Network Recv Protocol. See: [`SendProtocol`] diff --git a/network/protocol/src/mpsc.rs b/network/protocol/src/mpsc.rs index 76969aa0be..1f3219bab2 100644 --- a/network/protocol/src/mpsc.rs +++ b/network/protocol/src/mpsc.rs @@ -112,7 +112,9 @@ where } } - async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<(), ProtocolError> { Ok(()) } + async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result { + Ok(0) + } } #[async_trait] diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index 315fd0d076..e78741fc01 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -167,7 +167,11 @@ where Ok(()) } - async fn flush(&mut self, bandwidth: Bandwidth, dt: Duration) -> Result<(), ProtocolError> { + async fn flush( + &mut self, + bandwidth: Bandwidth, + dt: Duration, + ) -> Result { let (frames, total_bytes) = self.store.grab(bandwidth, dt); self.buffer.reserve(total_bytes as usize); let mut data_frames = 0; @@ -216,7 +220,7 @@ where self.drain.send(self.buffer.split()).await?; self.pending_shutdown = false; } - Ok(()) + Ok(data_bandwidth as u64) } } diff --git a/network/src/api.rs b/network/src/api.rs index 5379e431c3..bd5e03cddc 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -22,7 +22,7 @@ use std::{ use tokio::{ io, runtime::Runtime, - sync::{mpsc, oneshot, Mutex}, + sync::{mpsc, oneshot, watch, Mutex}, }; use tracing::*; @@ -48,6 +48,7 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, + b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -493,6 +494,7 @@ impl Participant { remote_pid: Pid, a2b_open_stream_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, + b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, ) -> Self { Self { @@ -500,6 +502,7 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), + b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } } @@ -721,6 +724,10 @@ impl Participant { } } + /// Returns the current approximation on the maximum bandwidth available. + /// This WILL fluctuate based on the amount/size of send messages. + pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } + /// Returns the remote [`Pid`](network_protocol::Pid) pub fn remote_pid(&self) -> Pid { self.remote_pid } } diff --git a/network/src/channel.rs b/network/src/channel.rs index affaa1ee34..cf7a7851bd 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use bytes::BytesMut; use network_protocol::{ - Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, ProtocolError, - ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, TcpSendProtocol, - UnreliableDrain, UnreliableSink, + Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, + ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, + TcpSendProtocol, UnreliableDrain, UnreliableSink, }; use std::{sync::Arc, time::Duration}; use tokio::{ @@ -102,7 +102,11 @@ impl network_protocol::SendProtocol for SendProtocols { } } - async fn flush(&mut self, bandwidth: u64, dt: Duration) -> Result<(), ProtocolError> { + async fn flush( + &mut self, + bandwidth: Bandwidth, + dt: Duration, + ) -> Result { match self { SendProtocols::Tcp(s) => s.flush(bandwidth, dt).await, SendProtocols::Mpsc(s) => s.flush(bandwidth, dt).await, diff --git a/network/src/metrics.rs b/network/src/metrics.rs index a5874a4b86..9733b477a1 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -13,6 +13,8 @@ pub struct NetworkMetrics { pub participants_disconnected_total: IntCounter, // channel id's, seperated by PARTICIPANT, max 5 pub participants_channel_ids: IntGaugeVec, + // upload to remote, averaged, seperated by PARTICIPANT + pub participants_bandwidth: IntGaugeVec, // opened Channels, seperated by PARTICIPANT pub channels_connected_total: IntCounterVec, pub channels_disconnected_total: IntCounterVec, @@ -57,6 +59,13 @@ impl NetworkMetrics { ), &["participant", "no"], )?; + let participants_bandwidth = IntGaugeVec::new( + Opts::new( + "participants_bandwidth", + "max upload possible to Participant", + ), + &["participant"], + )?; let channels_connected_total = IntCounterVec::new( Opts::new( "channels_connected_total", @@ -104,6 +113,7 @@ impl NetworkMetrics { participants_connected_total, participants_disconnected_total, participants_channel_ids, + participants_bandwidth, channels_connected_total, channels_disconnected_total, streams_opened_total, @@ -118,6 +128,7 @@ impl NetworkMetrics { registry.register(Box::new(self.participants_connected_total.clone()))?; registry.register(Box::new(self.participants_disconnected_total.clone()))?; registry.register(Box::new(self.participants_channel_ids.clone()))?; + registry.register(Box::new(self.participants_bandwidth.clone()))?; registry.register(Box::new(self.channels_connected_total.clone()))?; registry.register(Box::new(self.channels_disconnected_total.clone()))?; registry.register(Box::new(self.streams_opened_total.clone()))?; @@ -141,6 +152,12 @@ impl NetworkMetrics { .inc(); } + pub(crate) fn participant_bandwidth(&self, remote_p: &str, bandwidth: f32) { + self.participants_bandwidth + .with_label_values(&[remote_p]) + .set(bandwidth as i64); + } + pub(crate) fn streams_opened(&self, remote_p: &str) { self.streams_opened_total .with_label_values(&[remote_p]) @@ -164,6 +181,25 @@ impl NetworkMetrics { .with_label_values(&[protocol_name(protocol)]) .inc(); } + + pub(crate) fn cleanup_participant(&self, remote_p: &str) { + for no in 0..5 { + let _ = self + .participants_channel_ids + .remove_label_values(&[&remote_p, &no.to_string()]); + } + let _ = self + .channels_connected_total + .remove_label_values(&[&remote_p]); + let _ = self + .channels_disconnected_total + .remove_label_values(&[&remote_p]); + let _ = self + .participants_bandwidth + .remove_label_values(&[&remote_p]); + let _ = self.streams_opened_total.remove_label_values(&[&remote_p]); + let _ = self.streams_closed_total.remove_label_values(&[&remote_p]); + } } #[cfg(feature = "metrics")] @@ -183,6 +219,8 @@ impl NetworkMetrics { pub(crate) fn channels_disconnected(&self, _remote_p: &str) {} + pub(crate) fn participant_bandwidth(&self, _remote_p: &str, _bandwidth: f32) {} + pub(crate) fn streams_opened(&self, _remote_p: &str) {} pub(crate) fn streams_closed(&self, _remote_p: &str) {} @@ -190,6 +228,8 @@ impl NetworkMetrics { pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {} pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {} + + pub(crate) fn cleanup_participant(&self, _remote_p: &str) {} } impl std::fmt::Debug for NetworkMetrics { diff --git a/network/src/participant.rs b/network/src/participant.rs index 1688df36bd..bc38a97b59 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -19,7 +19,7 @@ use std::{ }; use tokio::{ select, - sync::{mpsc, oneshot, Mutex, RwLock}, + sync::{mpsc, oneshot, watch, Mutex, RwLock}, task::JoinHandle, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -49,6 +49,7 @@ struct ControlChannels { a2b_open_stream_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, s2b_create_channel_r: mpsc::UnboundedReceiver, + b2a_bandwidth_stats_s: watch::Sender, s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } @@ -92,16 +93,19 @@ impl BParticipant { mpsc::UnboundedReceiver, mpsc::UnboundedSender, oneshot::Sender, + watch::Receiver, ) { let (a2b_open_stream_s, a2b_open_stream_r) = mpsc::unbounded_channel::(); let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); + let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::(0.0); let run_channels = Some(ControlChannels { a2b_open_stream_r, b2a_stream_opened_s, s2b_create_channel_r, + b2a_bandwidth_stats_s, s2b_shutdown_bparticipant_r, }); @@ -124,6 +128,7 @@ impl BParticipant { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) } @@ -160,6 +165,7 @@ impl BParticipant { b2b_notify_send_of_recv_open_r, b2b_notify_send_of_recv_close_r, b2s_prio_statistic_s, + run_channels.b2a_bandwidth_stats_s, ) .instrument(tracing::info_span!("send")), self.recv_mgr( @@ -224,12 +230,14 @@ impl BParticipant { )>, b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>, _b2s_prio_statistic_s: mpsc::UnboundedSender, + b2a_bandwidth_stats_s: watch::Sender, ) { let mut sorted_send_protocols = SortedVec::::default(); let mut sorted_stream_protocols = SortedVec::::default(); let mut interval = tokio::time::interval(Self::TICK_TIME); let mut last_instant = Instant::now(); let mut stream_ids = self.offset_sid; + let mut part_bandwidth = 0.0f32; trace!("workaround, actively wait for first protocol"); if let Some((c, p)) = b2b_add_protocol_r.recv().await { sorted_send_protocols.insert(c, p) @@ -345,9 +353,15 @@ impl BParticipant { let send_time = Instant::now(); let diff = send_time.duration_since(last_instant); last_instant = send_time; + let mut cnt = 0; for (_, p) in sorted_send_protocols.data.iter_mut() { - p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. + cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. } + let flush_time = send_time.elapsed().as_secs_f32(); + part_bandwidth = 0.99 * part_bandwidth + 0.01 * (cnt as f32 / flush_time); + self.metrics + .participant_bandwidth(&self.remote_pid_string, part_bandwidth); + let _ = b2a_bandwidth_stats_s.send(part_bandwidth); let r: Result<(), network_protocol::ProtocolError> = Ok(()); r } @@ -669,6 +683,7 @@ impl BParticipant { #[cfg(feature = "metrics")] self.metrics.participants_disconnected_total.inc(); + self.metrics.cleanup_participant(&self.remote_pid_string); trace!("Stop participant_shutdown_mgr"); } @@ -755,6 +770,7 @@ mod tests { mpsc::UnboundedSender, oneshot::Sender, mpsc::UnboundedReceiver, + watch::Receiver, JoinHandle<()>, ) { let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); @@ -769,6 +785,7 @@ mod tests { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) = runtime_clone.block_on(async move { let local_pid = Pid::fake(0); let remote_pid = Pid::fake(1); @@ -786,6 +803,7 @@ mod tests { s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + b2a_bandwidth_stats_r, handle, ) } @@ -816,6 +834,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -851,6 +870,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -887,6 +907,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -941,6 +962,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 90becb0793..abb54b6578 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -568,6 +568,7 @@ impl Scheduler { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics)); let participant = Participant::new( @@ -575,6 +576,7 @@ impl Scheduler { pid, a2b_open_stream_s, b2a_stream_opened_r, + b2a_bandwidth_stats_r, participant_channels.a2s_disconnect_s, );