implement Upload Bandwidth prediction.

Its available to `api` and `metrics` and can be used to slow down msg send in veloren.
It uses a tokio::watch for now, as i plan to have a watch job in the scheduler that recalculates prio on change.
Also cleaning up participant metrics after a disconnect
This commit is contained in:
Marcel Märtens 2021-03-25 18:28:50 +01:00
parent 034d0f0c5d
commit aea52d8b54
8 changed files with 92 additions and 11 deletions

View File

@ -117,7 +117,7 @@ pub trait SendProtocol {
&mut self,
bandwidth: Bandwidth,
dt: std::time::Duration,
) -> Result<(), ProtocolError>;
) -> Result<Bandwidth, ProtocolError>;
}
/// Generic Network Recv Protocol. See: [`SendProtocol`]

View File

@ -112,7 +112,9 @@ where
}
}
async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<(), ProtocolError> { Ok(()) }
async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<Bandwidth, ProtocolError> {
Ok(0)
}
}
#[async_trait]

View File

@ -167,7 +167,11 @@ where
Ok(())
}
async fn flush(&mut self, bandwidth: Bandwidth, dt: Duration) -> Result<(), ProtocolError> {
async fn flush(
&mut self,
bandwidth: Bandwidth,
dt: Duration,
) -> Result</* actual */ Bandwidth, ProtocolError> {
let (frames, total_bytes) = self.store.grab(bandwidth, dt);
self.buffer.reserve(total_bytes as usize);
let mut data_frames = 0;
@ -216,7 +220,7 @@ where
self.drain.send(self.buffer.split()).await?;
self.pending_shutdown = false;
}
Ok(())
Ok(data_bandwidth as u64)
}
}

View File

@ -22,7 +22,7 @@ use std::{
use tokio::{
io,
runtime::Runtime,
sync::{mpsc, oneshot, Mutex},
sync::{mpsc, oneshot, watch, Mutex},
};
use tracing::*;
@ -48,6 +48,7 @@ pub struct Participant {
remote_pid: Pid,
a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: A2sDisconnect,
}
@ -493,6 +494,7 @@ impl Participant {
remote_pid: Pid,
a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
) -> Self {
Self {
@ -500,6 +502,7 @@ impl Participant {
remote_pid,
a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
b2a_bandwidth_stats_r,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
}
}
@ -721,6 +724,10 @@ impl Participant {
}
}
/// Returns the current approximation on the maximum bandwidth available.
/// This WILL fluctuate based on the amount/size of send messages.
pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }
/// Returns the remote [`Pid`](network_protocol::Pid)
pub fn remote_pid(&self) -> Pid { self.remote_pid }
}

View File

@ -1,9 +1,9 @@
use async_trait::async_trait;
use bytes::BytesMut;
use network_protocol::{
Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, ProtocolError,
ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, TcpSendProtocol,
UnreliableDrain, UnreliableSink,
Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid,
ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol,
TcpSendProtocol, UnreliableDrain, UnreliableSink,
};
use std::{sync::Arc, time::Duration};
use tokio::{
@ -102,7 +102,11 @@ impl network_protocol::SendProtocol for SendProtocols {
}
}
async fn flush(&mut self, bandwidth: u64, dt: Duration) -> Result<(), ProtocolError> {
async fn flush(
&mut self,
bandwidth: Bandwidth,
dt: Duration,
) -> Result<Bandwidth, ProtocolError> {
match self {
SendProtocols::Tcp(s) => s.flush(bandwidth, dt).await,
SendProtocols::Mpsc(s) => s.flush(bandwidth, dt).await,

View File

@ -13,6 +13,8 @@ pub struct NetworkMetrics {
pub participants_disconnected_total: IntCounter,
// channel id's, seperated by PARTICIPANT, max 5
pub participants_channel_ids: IntGaugeVec,
// upload to remote, averaged, seperated by PARTICIPANT
pub participants_bandwidth: IntGaugeVec,
// opened Channels, seperated by PARTICIPANT
pub channels_connected_total: IntCounterVec,
pub channels_disconnected_total: IntCounterVec,
@ -57,6 +59,13 @@ impl NetworkMetrics {
),
&["participant", "no"],
)?;
let participants_bandwidth = IntGaugeVec::new(
Opts::new(
"participants_bandwidth",
"max upload possible to Participant",
),
&["participant"],
)?;
let channels_connected_total = IntCounterVec::new(
Opts::new(
"channels_connected_total",
@ -104,6 +113,7 @@ impl NetworkMetrics {
participants_connected_total,
participants_disconnected_total,
participants_channel_ids,
participants_bandwidth,
channels_connected_total,
channels_disconnected_total,
streams_opened_total,
@ -118,6 +128,7 @@ impl NetworkMetrics {
registry.register(Box::new(self.participants_connected_total.clone()))?;
registry.register(Box::new(self.participants_disconnected_total.clone()))?;
registry.register(Box::new(self.participants_channel_ids.clone()))?;
registry.register(Box::new(self.participants_bandwidth.clone()))?;
registry.register(Box::new(self.channels_connected_total.clone()))?;
registry.register(Box::new(self.channels_disconnected_total.clone()))?;
registry.register(Box::new(self.streams_opened_total.clone()))?;
@ -141,6 +152,12 @@ impl NetworkMetrics {
.inc();
}
pub(crate) fn participant_bandwidth(&self, remote_p: &str, bandwidth: f32) {
self.participants_bandwidth
.with_label_values(&[remote_p])
.set(bandwidth as i64);
}
pub(crate) fn streams_opened(&self, remote_p: &str) {
self.streams_opened_total
.with_label_values(&[remote_p])
@ -164,6 +181,25 @@ impl NetworkMetrics {
.with_label_values(&[protocol_name(protocol)])
.inc();
}
pub(crate) fn cleanup_participant(&self, remote_p: &str) {
for no in 0..5 {
let _ = self
.participants_channel_ids
.remove_label_values(&[&remote_p, &no.to_string()]);
}
let _ = self
.channels_connected_total
.remove_label_values(&[&remote_p]);
let _ = self
.channels_disconnected_total
.remove_label_values(&[&remote_p]);
let _ = self
.participants_bandwidth
.remove_label_values(&[&remote_p]);
let _ = self.streams_opened_total.remove_label_values(&[&remote_p]);
let _ = self.streams_closed_total.remove_label_values(&[&remote_p]);
}
}
#[cfg(feature = "metrics")]
@ -183,6 +219,8 @@ impl NetworkMetrics {
pub(crate) fn channels_disconnected(&self, _remote_p: &str) {}
pub(crate) fn participant_bandwidth(&self, _remote_p: &str, _bandwidth: f32) {}
pub(crate) fn streams_opened(&self, _remote_p: &str) {}
pub(crate) fn streams_closed(&self, _remote_p: &str) {}
@ -190,6 +228,8 @@ impl NetworkMetrics {
pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {}
pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {}
pub(crate) fn cleanup_participant(&self, _remote_p: &str) {}
}
impl std::fmt::Debug for NetworkMetrics {

View File

@ -19,7 +19,7 @@ use std::{
};
use tokio::{
select,
sync::{mpsc, oneshot, Mutex, RwLock},
sync::{mpsc, oneshot, watch, Mutex, RwLock},
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -49,6 +49,7 @@ struct ControlChannels {
a2b_open_stream_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
b2a_bandwidth_stats_s: watch::Sender<f32>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
}
@ -92,16 +93,19 @@ impl BParticipant {
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<S2bCreateChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
watch::Receiver<f32>,
) {
let (a2b_open_stream_s, a2b_open_stream_r) = mpsc::unbounded_channel::<A2bStreamOpen>();
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::<Stream>();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel();
let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::<f32>(0.0);
let run_channels = Some(ControlChannels {
a2b_open_stream_r,
b2a_stream_opened_s,
s2b_create_channel_r,
b2a_bandwidth_stats_s,
s2b_shutdown_bparticipant_r,
});
@ -124,6 +128,7 @@ impl BParticipant {
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
)
}
@ -160,6 +165,7 @@ impl BParticipant {
b2b_notify_send_of_recv_open_r,
b2b_notify_send_of_recv_close_r,
b2s_prio_statistic_s,
run_channels.b2a_bandwidth_stats_s,
)
.instrument(tracing::info_span!("send")),
self.recv_mgr(
@ -224,12 +230,14 @@ impl BParticipant {
)>,
b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>,
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
b2a_bandwidth_stats_s: watch::Sender<f32>,
) {
let mut sorted_send_protocols = SortedVec::<Cid, SendProtocols>::default();
let mut sorted_stream_protocols = SortedVec::<Sid, Cid>::default();
let mut interval = tokio::time::interval(Self::TICK_TIME);
let mut last_instant = Instant::now();
let mut stream_ids = self.offset_sid;
let mut part_bandwidth = 0.0f32;
trace!("workaround, actively wait for first protocol");
if let Some((c, p)) = b2b_add_protocol_r.recv().await {
sorted_send_protocols.insert(c, p)
@ -345,9 +353,15 @@ impl BParticipant {
let send_time = Instant::now();
let diff = send_time.duration_since(last_instant);
last_instant = send_time;
let mut cnt = 0;
for (_, p) in sorted_send_protocols.data.iter_mut() {
p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it.
cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it.
}
let flush_time = send_time.elapsed().as_secs_f32();
part_bandwidth = 0.99 * part_bandwidth + 0.01 * (cnt as f32 / flush_time);
self.metrics
.participant_bandwidth(&self.remote_pid_string, part_bandwidth);
let _ = b2a_bandwidth_stats_s.send(part_bandwidth);
let r: Result<(), network_protocol::ProtocolError> = Ok(());
r
}
@ -669,6 +683,7 @@ impl BParticipant {
#[cfg(feature = "metrics")]
self.metrics.participants_disconnected_total.inc();
self.metrics.cleanup_participant(&self.remote_pid_string);
trace!("Stop participant_shutdown_mgr");
}
@ -755,6 +770,7 @@ mod tests {
mpsc::UnboundedSender<S2bCreateChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
mpsc::UnboundedReceiver<B2sPrioStatistic>,
watch::Receiver<f32>,
JoinHandle<()>,
) {
let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
@ -769,6 +785,7 @@ mod tests {
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = runtime_clone.block_on(async move {
let local_pid = Pid::fake(0);
let remote_pid = Pid::fake(1);
@ -786,6 +803,7 @@ mod tests {
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
b2a_bandwidth_stats_r,
handle,
)
}
@ -816,6 +834,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -851,6 +870,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -887,6 +907,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -941,6 +962,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();

View File

@ -568,6 +568,7 @@ impl Scheduler {
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics));
let participant = Participant::new(
@ -575,6 +576,7 @@ impl Scheduler {
pid,
a2b_open_stream_s,
b2a_stream_opened_r,
b2a_bandwidth_stats_r,
participant_channels.a2s_disconnect_s,
);