diff --git a/network/benches/speed.rs b/network/benches/speed.rs index 6216d4a7a4..b110d308ae 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -3,11 +3,11 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::{runtime::Runtime, sync::Mutex}; use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream}; -fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, &stream); } +fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); } async fn stream_msg(s1_a: Arc>, s1_b: Arc>, data: &[u8], cnt: usize) { let mut s1_b = s1_b.lock().await; - let m = Message::serialize(&data, &s1_b); + let m = Message::serialize(&data, s1_b.params()); std::thread::spawn(move || { let mut s1_a = s1_a.try_lock().unwrap(); for _ in 0..cnt { diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index d67e1a6c4b..e058aac7a8 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -173,7 +173,7 @@ fn client(address: ProtocolAddr, runtime: Arc) { id, data: vec![0; 1000], }, - &s1, + s1.params(), ); loop { s1.send_raw(&raw_msg).unwrap(); diff --git a/network/protocol/src/lib.rs b/network/protocol/src/lib.rs index de036cee76..79c1ae867a 100644 --- a/network/protocol/src/lib.rs +++ b/network/protocol/src/lib.rs @@ -117,7 +117,7 @@ pub trait SendProtocol { &mut self, bandwidth: Bandwidth, dt: std::time::Duration, - ) -> Result<(), ProtocolError>; + ) -> Result; } /// Generic Network Recv Protocol. See: [`SendProtocol`] diff --git a/network/protocol/src/mpsc.rs b/network/protocol/src/mpsc.rs index 5a00f27209..1f3219bab2 100644 --- a/network/protocol/src/mpsc.rs +++ b/network/protocol/src/mpsc.rs @@ -6,7 +6,7 @@ use crate::{ frame::InitFrame, handshake::{ReliableDrain, ReliableSink}, metrics::ProtocolMetricCache, - types::Bandwidth, + types::{Bandwidth, Promises}, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, }; use async_trait::async_trait; @@ -57,6 +57,16 @@ where metrics, } } + + /// returns all promises that this Protocol can take care of + /// If you open a Stream anyway, unsupported promises are ignored. + pub fn supported_promises() -> Promises { + Promises::ORDERED + | Promises::CONSISTENCY + | Promises::GUARANTEED_DELIVERY + | Promises::COMPRESSED + | Promises::ENCRYPTED /*assume a direct mpsc connection is secure*/ + } } impl MpscRecvProtocol @@ -102,7 +112,9 @@ where } } - async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<(), ProtocolError> { Ok(()) } + async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result { + Ok(0) + } } #[async_trait] diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index c944674ad6..e78741fc01 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -6,7 +6,7 @@ use crate::{ message::{ITMessage, ALLOC_BLOCK}, metrics::{ProtocolMetricCache, RemoveReason}, prio::PrioManager, - types::{Bandwidth, Mid, Sid}, + types::{Bandwidth, Mid, Promises, Sid}, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, }; use async_trait::async_trait; @@ -70,6 +70,15 @@ where metrics, } } + + /// returns all promises that this Protocol can take care of + /// If you open a Stream anyway, unsupported promises are ignored. + pub fn supported_promises() -> Promises { + Promises::ORDERED + | Promises::CONSISTENCY + | Promises::GUARANTEED_DELIVERY + | Promises::COMPRESSED + } } impl TcpRecvProtocol @@ -158,7 +167,11 @@ where Ok(()) } - async fn flush(&mut self, bandwidth: Bandwidth, dt: Duration) -> Result<(), ProtocolError> { + async fn flush( + &mut self, + bandwidth: Bandwidth, + dt: Duration, + ) -> Result { let (frames, total_bytes) = self.store.grab(bandwidth, dt); self.buffer.reserve(total_bytes as usize); let mut data_frames = 0; @@ -207,7 +220,7 @@ where self.drain.send(self.buffer.split()).await?; self.pending_shutdown = false; } - Ok(()) + Ok(data_bandwidth as u64) } } diff --git a/network/protocol/src/types.rs b/network/protocol/src/types.rs index 0037795314..dfc9142f38 100644 --- a/network/protocol/src/types.rs +++ b/network/protocol/src/types.rs @@ -65,7 +65,7 @@ pub struct Pid { /// Unique ID per Stream, in one Channel. /// one side will always start with 0, while the other start with u64::MAX / 2. /// number increases for each created Stream. -#[derive(PartialEq, Eq, Hash, Clone, Copy)] +#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] pub struct Sid { internal: u64, } diff --git a/network/src/api.rs b/network/src/api.rs index f8ab01602c..bd5e03cddc 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -22,7 +22,7 @@ use std::{ use tokio::{ io, runtime::Runtime, - sync::{mpsc, oneshot, Mutex}, + sync::{mpsc, oneshot, watch, Mutex}, }; use tracing::*; @@ -48,6 +48,7 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, + b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -119,6 +120,12 @@ pub enum StreamError { Deserialize(bincode::Error), } +/// All Parameters of a Stream, can be used to generate RawMessages +#[derive(Debug, Clone)] +pub struct StreamParams { + pub(crate) promises: Promises, +} + /// Use the `Network` to create connections to other [`Participants`] /// /// The `Network` is the single source that handles all connections in your @@ -487,6 +494,7 @@ impl Participant { remote_pid: Pid, a2b_open_stream_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, + b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, ) -> Self { Self { @@ -494,6 +502,7 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), + b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } } @@ -715,6 +724,10 @@ impl Participant { } } + /// Returns the current approximation on the maximum bandwidth available. + /// This WILL fluctuate based on the amount/size of send messages. + pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } + /// Returns the remote [`Pid`](network_protocol::Pid) pub fn remote_pid(&self) -> Pid { self.remote_pid } } @@ -803,7 +816,7 @@ impl Stream { /// [`Serialized`]: Serialize #[inline] pub fn send(&mut self, msg: M) -> Result<(), StreamError> { - self.send_raw(&Message::serialize(&msg, &self)) + self.send_raw(&Message::serialize(&msg, self.params())) } /// This methods give the option to skip multiple calls of [`bincode`] and @@ -837,7 +850,7 @@ impl Stream { /// let mut stream_b = participant_b.opened().await?; /// /// //Prepare Message and decode it - /// let msg = Message::serialize("Hello World", &stream_a); + /// let msg = Message::serialize("Hello World", stream_a.params()); /// //Send same Message to multiple Streams /// stream_a.send_raw(&msg); /// stream_b.send_raw(&msg); @@ -858,7 +871,7 @@ impl Stream { return Err(StreamError::StreamClosed); } #[cfg(debug_assertions)] - message.verify(&self); + message.verify(self.params()); self.a2b_msg_s.send((self.sid, message.data.clone()))?; Ok(()) } @@ -999,7 +1012,7 @@ impl Stream { Message { data, #[cfg(feature = "compression")] - compressed: self.promises().contains(Promises::COMPRESSED), + compressed: self.promises.contains(Promises::COMPRESSED), } .deserialize()?, )), @@ -1013,7 +1026,11 @@ impl Stream { } } - pub fn promises(&self) -> Promises { self.promises } + pub fn params(&self) -> StreamParams { + StreamParams { + promises: self.promises, + } + } } impl core::cmp::PartialEq for Participant { diff --git a/network/src/channel.rs b/network/src/channel.rs index affaa1ee34..cf7a7851bd 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use bytes::BytesMut; use network_protocol::{ - Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, ProtocolError, - ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, TcpSendProtocol, - UnreliableDrain, UnreliableSink, + Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, + ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, + TcpSendProtocol, UnreliableDrain, UnreliableSink, }; use std::{sync::Arc, time::Duration}; use tokio::{ @@ -102,7 +102,11 @@ impl network_protocol::SendProtocol for SendProtocols { } } - async fn flush(&mut self, bandwidth: u64, dt: Duration) -> Result<(), ProtocolError> { + async fn flush( + &mut self, + bandwidth: Bandwidth, + dt: Duration, + ) -> Result { match self { SendProtocols::Tcp(s) => s.flush(bandwidth, dt).await, SendProtocols::Mpsc(s) => s.flush(bandwidth, dt).await, diff --git a/network/src/lib.rs b/network/src/lib.rs index b79266e6d0..448b50f41c 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -104,11 +104,11 @@ mod message; mod metrics; mod participant; mod scheduler; -mod trace; +mod util; pub use api::{ Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, - Stream, StreamError, + Stream, StreamError, StreamParams, }; pub use message::Message; pub use network_protocol::{InitProtocolError, Pid, Promises}; diff --git a/network/src/message.rs b/network/src/message.rs index 516fe06ddd..cbee704a90 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,4 +1,4 @@ -use crate::api::{Stream, StreamError}; +use crate::api::{StreamError, StreamParams}; use bytes::Bytes; #[cfg(feature = "compression")] use network_protocol::Promises; @@ -36,12 +36,12 @@ impl Message { /// [`Message::serialize`]: crate::message::Message::serialize /// /// [`Streams`]: crate::api::Stream - pub fn serialize(message: &M, stream: &Stream) -> Self { + pub fn serialize(message: &M, stream_params: StreamParams) -> Self { //this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html let serialized_data = bincode::serialize(message).unwrap(); #[cfg(feature = "compression")] - let compressed = stream.promises().contains(Promises::COMPRESSED); + let compressed = stream_params.promises.contains(Promises::COMPRESSED); #[cfg(feature = "compression")] let data = if compressed { let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10); @@ -54,7 +54,7 @@ impl Message { #[cfg(not(feature = "compression"))] let data = serialized_data; #[cfg(not(feature = "compression"))] - let _stream = stream; + let _stream_params = stream_params; Self { data: Bytes::from(data), @@ -127,13 +127,13 @@ impl Message { } #[cfg(debug_assertions)] - pub(crate) fn verify(&self, stream: &Stream) { + pub(crate) fn verify(&self, params: StreamParams) { #[cfg(not(feature = "compression"))] let _stream = stream; #[cfg(feature = "compression")] - if self.compressed != stream.promises().contains(Promises::COMPRESSED) { + if self.compressed != params.promises.contains(Promises::COMPRESSED) { warn!( - ?stream, + ?params, "verify failed, msg is {} and it doesn't match with stream", self.compressed ); } @@ -171,14 +171,9 @@ pub(crate) fn partial_eq_bincode(first: &bincode::ErrorKind, second: &bincode::E #[cfg(test)] mod tests { - use crate::{api::Stream, message::*}; - use std::sync::{atomic::AtomicBool, Arc}; - use tokio::sync::mpsc; - - fn stub_stream(compressed: bool) -> Stream { - use crate::api::*; - use network_protocol::*; + use crate::{api::StreamParams, message::*}; + fn stub_stream(compressed: bool) -> StreamParams { #[cfg(feature = "compression")] let promises = if compressed { Promises::COMPRESSED @@ -189,27 +184,12 @@ mod tests { #[cfg(not(feature = "compression"))] let promises = Promises::empty(); - let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded(); - let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded(); - let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel(); - - Stream::new( - Pid::fake(0), - Pid::fake(1), - Sid::new(0), - 0u8, - promises, - 1_000_000, - Arc::new(AtomicBool::new(true)), - a2b_msg_s, - b2a_msg_recv_r, - a2b_close_stream_s, - ) + StreamParams { promises } } #[test] fn serialize_test() { - let msg = Message::serialize("abc", &stub_stream(false)); + let msg = Message::serialize("abc", stub_stream(false)); assert_eq!(msg.data.len(), 11); assert_eq!(msg.data[0], 3); assert_eq!(msg.data[1..7], [0, 0, 0, 0, 0, 0]); @@ -221,7 +201,7 @@ mod tests { #[cfg(feature = "compression")] #[test] fn serialize_compress_small() { - let msg = Message::serialize("abc", &stub_stream(true)); + let msg = Message::serialize("abc", stub_stream(true)); assert_eq!(msg.data.len(), 12); assert_eq!(msg.data[0], 176); assert_eq!(msg.data[1], 3); @@ -245,7 +225,7 @@ mod tests { 0, "assets/data/plants/flowers/greenrose.ron", ); - let msg = Message::serialize(&msg, &stub_stream(true)); + let msg = Message::serialize(&msg, stub_stream(true)); assert_eq!(msg.data.len(), 79); assert_eq!(msg.data[0], 34); assert_eq!(msg.data[1], 5); @@ -275,7 +255,7 @@ mod tests { _ => {}, } } - let msg = Message::serialize(&msg, &stub_stream(true)); + let msg = Message::serialize(&msg, stub_stream(true)); assert_eq!(msg.data.len(), 1331); } } diff --git a/network/src/metrics.rs b/network/src/metrics.rs index d1b77d76d0..9733b477a1 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,3 +1,4 @@ +use crate::api::ProtocolAddr; use network_protocol::{Cid, Pid}; #[cfg(feature = "metrics")] use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; @@ -12,6 +13,8 @@ pub struct NetworkMetrics { pub participants_disconnected_total: IntCounter, // channel id's, seperated by PARTICIPANT, max 5 pub participants_channel_ids: IntGaugeVec, + // upload to remote, averaged, seperated by PARTICIPANT + pub participants_bandwidth: IntGaugeVec, // opened Channels, seperated by PARTICIPANT pub channels_connected_total: IntCounterVec, pub channels_disconnected_total: IntCounterVec, @@ -56,6 +59,13 @@ impl NetworkMetrics { ), &["participant", "no"], )?; + let participants_bandwidth = IntGaugeVec::new( + Opts::new( + "participants_bandwidth", + "max upload possible to Participant", + ), + &["participant"], + )?; let channels_connected_total = IntCounterVec::new( Opts::new( "channels_connected_total", @@ -103,6 +113,7 @@ impl NetworkMetrics { participants_connected_total, participants_disconnected_total, participants_channel_ids, + participants_bandwidth, channels_connected_total, channels_disconnected_total, streams_opened_total, @@ -117,6 +128,7 @@ impl NetworkMetrics { registry.register(Box::new(self.participants_connected_total.clone()))?; registry.register(Box::new(self.participants_disconnected_total.clone()))?; registry.register(Box::new(self.participants_channel_ids.clone()))?; + registry.register(Box::new(self.participants_bandwidth.clone()))?; registry.register(Box::new(self.channels_connected_total.clone()))?; registry.register(Box::new(self.channels_disconnected_total.clone()))?; registry.register(Box::new(self.streams_opened_total.clone()))?; @@ -140,6 +152,12 @@ impl NetworkMetrics { .inc(); } + pub(crate) fn participant_bandwidth(&self, remote_p: &str, bandwidth: f32) { + self.participants_bandwidth + .with_label_values(&[remote_p]) + .set(bandwidth as i64); + } + pub(crate) fn streams_opened(&self, remote_p: &str) { self.streams_opened_total .with_label_values(&[remote_p]) @@ -151,6 +169,46 @@ impl NetworkMetrics { .with_label_values(&[remote_p]) .inc(); } + + pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) { + self.listen_requests_total + .with_label_values(&[protocol_name(protocol)]) + .inc(); + } + + pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) { + self.connect_requests_total + .with_label_values(&[protocol_name(protocol)]) + .inc(); + } + + pub(crate) fn cleanup_participant(&self, remote_p: &str) { + for no in 0..5 { + let _ = self + .participants_channel_ids + .remove_label_values(&[&remote_p, &no.to_string()]); + } + let _ = self + .channels_connected_total + .remove_label_values(&[&remote_p]); + let _ = self + .channels_disconnected_total + .remove_label_values(&[&remote_p]); + let _ = self + .participants_bandwidth + .remove_label_values(&[&remote_p]); + let _ = self.streams_opened_total.remove_label_values(&[&remote_p]); + let _ = self.streams_closed_total.remove_label_values(&[&remote_p]); + } +} + +#[cfg(feature = "metrics")] +fn protocol_name(protocol: &ProtocolAddr) -> &str { + match protocol { + ProtocolAddr::Tcp(_) => "tcp", + ProtocolAddr::Udp(_) => "udp", + ProtocolAddr::Mpsc(_) => "mpsc", + } } #[cfg(not(feature = "metrics"))] @@ -161,9 +219,17 @@ impl NetworkMetrics { pub(crate) fn channels_disconnected(&self, _remote_p: &str) {} + pub(crate) fn participant_bandwidth(&self, _remote_p: &str, _bandwidth: f32) {} + pub(crate) fn streams_opened(&self, _remote_p: &str) {} pub(crate) fn streams_closed(&self, _remote_p: &str) {} + + pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {} + + pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {} + + pub(crate) fn cleanup_participant(&self, _remote_p: &str) {} } impl std::fmt::Debug for NetworkMetrics { diff --git a/network/src/participant.rs b/network/src/participant.rs index c0276a4ad2..bc38a97b59 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -2,7 +2,7 @@ use crate::{ api::{ParticipantError, Stream}, channel::{Protocols, RecvProtocols, SendProtocols}, metrics::NetworkMetrics, - trace::DeferredTracer, + util::{DeferredTracer, SortedVec}, }; use bytes::Bytes; use futures_util::{FutureExt, StreamExt}; @@ -19,7 +19,7 @@ use std::{ }; use tokio::{ select, - sync::{mpsc, oneshot, Mutex, RwLock}, + sync::{mpsc, oneshot, watch, Mutex, RwLock}, task::JoinHandle, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -49,6 +49,7 @@ struct ControlChannels { a2b_open_stream_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, s2b_create_channel_r: mpsc::UnboundedReceiver, + b2a_bandwidth_stats_s: watch::Sender, s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } @@ -92,16 +93,19 @@ impl BParticipant { mpsc::UnboundedReceiver, mpsc::UnboundedSender, oneshot::Sender, + watch::Receiver, ) { let (a2b_open_stream_s, a2b_open_stream_r) = mpsc::unbounded_channel::(); let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); + let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::(0.0); let run_channels = Some(ControlChannels { a2b_open_stream_r, b2a_stream_opened_s, s2b_create_channel_r, + b2a_bandwidth_stats_s, s2b_shutdown_bparticipant_r, }); @@ -124,6 +128,7 @@ impl BParticipant { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) } @@ -136,8 +141,10 @@ impl BParticipant { async_channel::unbounded::(); let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); - let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) = - crossbeam_channel::unbounded::(); + let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) = + crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>(); + let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) = + crossbeam_channel::unbounded::<(Cid, Sid)>(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::(); let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>(); @@ -155,8 +162,10 @@ impl BParticipant { a2b_msg_r, b2b_add_send_protocol_r, b2b_close_send_protocol_r, - b2b_notify_send_of_recv_r, + b2b_notify_send_of_recv_open_r, + b2b_notify_send_of_recv_close_r, b2s_prio_statistic_s, + run_channels.b2a_bandwidth_stats_s, ) .instrument(tracing::info_span!("send")), self.recv_mgr( @@ -164,7 +173,8 @@ impl BParticipant { b2b_add_recv_protocol_r, b2b_force_close_recv_protocol_r, b2b_close_send_protocol_s.clone(), - b2b_notify_send_of_recv_s, + b2b_notify_send_of_recv_open_s, + b2b_notify_send_of_recv_close_s, ) .instrument(tracing::info_span!("recv")), self.create_channel_mgr( @@ -180,6 +190,28 @@ impl BParticipant { ); } + fn best_protocol(all: &SortedVec, promises: Promises) -> Option { + // check for mpsc + for (cid, p) in all.data.iter() { + if matches!(p, SendProtocols::Mpsc(_)) { + return Some(*cid); + } + } + // check for tcp + if network_protocol::TcpSendProtocol::::supported_promises() + == promises + { + for (cid, p) in all.data.iter() { + if matches!(p, SendProtocols::Tcp(_)) { + return Some(*cid); + } + } + } + + warn!("couldn't satisfy promises"); + all.data.first().map(|(c, _)| *c) + } + //TODO: local stream_cid: HashMap to know the respective protocol #[allow(clippy::too_many_arguments)] async fn send_mgr( @@ -189,18 +221,27 @@ impl BParticipant { a2b_msg_r: crossbeam_channel::Receiver<(Sid, Bytes)>, mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>, b2b_close_send_protocol_r: async_channel::Receiver, - b2b_notify_send_of_recv_r: crossbeam_channel::Receiver, + b2b_notify_send_of_recv_open_r: crossbeam_channel::Receiver<( + Cid, + Sid, + Prio, + Promises, + Bandwidth, + )>, + b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>, _b2s_prio_statistic_s: mpsc::UnboundedSender, + b2a_bandwidth_stats_s: watch::Sender, ) { - let mut send_protocols: HashMap = HashMap::new(); + let mut sorted_send_protocols = SortedVec::::default(); + let mut sorted_stream_protocols = SortedVec::::default(); let mut interval = tokio::time::interval(Self::TICK_TIME); let mut last_instant = Instant::now(); let mut stream_ids = self.offset_sid; + let mut part_bandwidth = 0.0f32; trace!("workaround, actively wait for first protocol"); - b2b_add_protocol_r - .recv() - .await - .map(|(c, p)| send_protocols.insert(c, p)); + if let Some((c, p)) = b2b_add_protocol_r.recv().await { + sorted_send_protocols.insert(c, p) + } loop { let (open, close, _, addp, remp) = select!( Some(n) = a2b_open_stream_r.recv().fuse() => (Some(n), None, None, None, None), @@ -210,25 +251,29 @@ impl BParticipant { Ok(n) = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, Some(n)), ); - addp.map(|(cid, p)| { + if let Some((cid, p)) = addp { debug!(?cid, "add protocol"); - send_protocols.insert(cid, p) - }); + sorted_send_protocols.insert(cid, p); + } - let (cid, active) = match send_protocols.iter_mut().next() { - Some((cid, a)) => (*cid, a), - None => { - warn!("no channel"); - tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover - continue; - }, - }; + //verify that we have at LEAST 1 channel before continuing + if sorted_send_protocols.data.is_empty() { + warn!("no channel"); + tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover + continue; + } + + //let (cid, active) = sorted_send_protocols.data.iter_mut().next().unwrap(); + //used for error handling + let mut cid = u64::MAX; let active_err = async { if let Some((prio, promises, guaranteed_bandwidth, return_s)) = open { let sid = stream_ids; - trace!(?sid, "open stream"); stream_ids += Sid::from(1); + cid = Self::best_protocol(&sorted_send_protocols, promises).unwrap(); + trace!(?sid, ?cid, "open stream"); + let stream = self .create_stream(sid, prio, promises, guaranteed_bandwidth) .await; @@ -240,49 +285,83 @@ impl BParticipant { guaranteed_bandwidth, }; + sorted_stream_protocols.insert(sid, cid); return_s.send(stream).unwrap(); - active.send(event).await?; + sorted_send_protocols + .get_mut(&cid) + .unwrap() + .send(event) + .await?; } // process recv content first - let mut closeevents = b2b_notify_send_of_recv_r - .try_iter() - .map(|e| { - if matches!(e, ProtocolEvent::OpenStream { .. }) { - active.notify_from_recv(e); - None - } else { - Some(e) - } - }) - .collect::>(); + for (cid, sid, prio, promises, guaranteed_bandwidth) in + b2b_notify_send_of_recv_open_r.try_iter() + { + match sorted_send_protocols.get_mut(&cid) { + Some(p) => { + sorted_stream_protocols.insert(sid, cid); + p.notify_from_recv(ProtocolEvent::OpenStream { + sid, + prio, + promises, + guaranteed_bandwidth, + }); + }, + None => warn!(?cid, "couldn't notify create protocol, doesn't exist"), + }; + } // get all messages and assign it to a channel for (sid, buffer) in a2b_msg_r.try_iter() { - active - .send(ProtocolEvent::Message { data: buffer, sid }) - .await? + cid = *sorted_stream_protocols.get(&sid).unwrap(); + let event = ProtocolEvent::Message { data: buffer, sid }; + sorted_send_protocols + .get_mut(&cid) + .unwrap() + .send(event) + .await?; } // process recv content afterwards - let _ = closeevents.drain(..).map(|e| { - if let Some(e) = e { - active.notify_from_recv(e); - } - }); + for (cid, sid) in b2b_notify_send_of_recv_close_r.try_iter() { + match sorted_send_protocols.get_mut(&cid) { + Some(p) => { + let _ = sorted_stream_protocols.delete(&sid); + p.notify_from_recv(ProtocolEvent::CloseStream { sid }); + }, + None => warn!(?cid, "couldn't notify close protocol, doesn't exist"), + }; + } if let Some(sid) = close { trace!(?stream_ids, "delete stream"); self.delete_stream(sid).await; // Fire&Forget the protocol will take care to verify that this Frame is delayed // till the last msg was received! - active.send(ProtocolEvent::CloseStream { sid }).await?; + if let Some(c) = sorted_stream_protocols.delete(&sid) { + cid = c; + let event = ProtocolEvent::CloseStream { sid }; + sorted_send_protocols + .get_mut(&c) + .unwrap() + .send(event) + .await?; + } } let send_time = Instant::now(); let diff = send_time.duration_since(last_instant); last_instant = send_time; - active.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. + let mut cnt = 0; + for (_, p) in sorted_send_protocols.data.iter_mut() { + cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. + } + let flush_time = send_time.elapsed().as_secs_f32(); + part_bandwidth = 0.99 * part_bandwidth + 0.01 * (cnt as f32 / flush_time); + self.metrics + .participant_bandwidth(&self.remote_pid_string, part_bandwidth); + let _ = b2a_bandwidth_stats_s.send(part_bandwidth); let r: Result<(), network_protocol::ProtocolError> = Ok(()); r } @@ -292,16 +371,16 @@ impl BParticipant { // remote recv will now fail, which will trigger remote send which will trigger // recv trace!("TODO: for now decide to FAIL this participant and not wait for a failover"); - send_protocols.remove(&cid).unwrap(); + sorted_send_protocols.delete(&cid).unwrap(); self.metrics.channels_disconnected(&self.remote_pid_string); - if send_protocols.is_empty() { + if sorted_send_protocols.data.is_empty() { break; } } if let Some(cid) = remp { debug!(?cid, "remove protocol"); - match send_protocols.remove(&cid) { + match sorted_send_protocols.delete(&cid) { Some(mut prot) => { self.metrics.channels_disconnected(&self.remote_pid_string); trace!("blocking flush"); @@ -311,7 +390,7 @@ impl BParticipant { }, None => trace!("tried to remove protocol twice"), }; - if send_protocols.is_empty() { + if sorted_send_protocols.data.is_empty() { break; } } @@ -330,7 +409,14 @@ impl BParticipant { mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>, b2b_force_close_recv_protocol_r: async_channel::Receiver, b2b_close_send_protocol_s: async_channel::Sender, - b2b_notify_send_of_recv_s: crossbeam_channel::Sender, + b2b_notify_send_of_recv_open_r: crossbeam_channel::Sender<( + Cid, + Sid, + Prio, + Promises, + Bandwidth, + )>, + b2b_notify_send_of_recv_close_s: crossbeam_channel::Sender<(Cid, Sid)>, ) { let mut recv_protocols: HashMap> = HashMap::new(); // we should be able to directly await futures imo @@ -390,7 +476,13 @@ impl BParticipant { guaranteed_bandwidth, }) => { trace!(?sid, "open stream"); - let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); + let _ = b2b_notify_send_of_recv_open_r.send(( + cid, + sid, + prio, + promises, + guaranteed_bandwidth, + )); // waiting for receiving is not necessary, because the send_mgr will first // process this before process messages! let stream = self @@ -401,7 +493,7 @@ impl BParticipant { }, Ok(ProtocolEvent::CloseStream { sid }) => { trace!(?sid, "close stream"); - let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); + let _ = b2b_notify_send_of_recv_close_s.send((cid, sid)); self.delete_stream(sid).await; retrigger(cid, p, &mut recv_protocols); }, @@ -591,6 +683,7 @@ impl BParticipant { #[cfg(feature = "metrics")] self.metrics.participants_disconnected_total.inc(); + self.metrics.cleanup_participant(&self.remote_pid_string); trace!("Stop participant_shutdown_mgr"); } @@ -677,6 +770,7 @@ mod tests { mpsc::UnboundedSender, oneshot::Sender, mpsc::UnboundedReceiver, + watch::Receiver, JoinHandle<()>, ) { let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap()); @@ -691,6 +785,7 @@ mod tests { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) = runtime_clone.block_on(async move { let local_pid = Pid::fake(0); let remote_pid = Pid::fake(1); @@ -708,6 +803,7 @@ mod tests { s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + b2a_bandwidth_stats_r, handle, ) } @@ -738,6 +834,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -773,6 +870,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -809,6 +907,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -863,6 +962,7 @@ mod tests { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, handle, ) = mock_bparticipant(); @@ -881,7 +981,7 @@ mod tests { .unwrap(); let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap(); - assert_eq!(stream.promises(), Promises::ORDERED); + assert_eq!(stream.params().promises, Promises::ORDERED); let (s, r) = oneshot::channel(); runtime.block_on(async { diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index f711287d38..abb54b6578 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -177,15 +177,7 @@ impl Scheduler { async move { debug!(?address, "Got request to open a channel_creator"); - #[cfg(feature = "metrics")] - self.metrics - .listen_requests_total - .with_label_values(&[match address { - ProtocolAddr::Tcp(_) => "tcp", - ProtocolAddr::Udp(_) => "udp", - ProtocolAddr::Mpsc(_) => "mpsc", - }]) - .inc(); + self.metrics.listen_request(&address); let (end_sender, end_receiver) = oneshot::channel::<()>(); self.channel_listener .lock() @@ -202,13 +194,11 @@ impl Scheduler { async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver) { trace!("Start connect_mgr"); while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { - let (protocol, cid, handshake) = match addr { + let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); + let metrics = Arc::clone(&self.protocol_metrics); + self.metrics.connect_request(&addr); + let (protocol, handshake) = match addr { ProtocolAddr::Tcp(addr) => { - #[cfg(feature = "metrics")] - self.metrics - .connect_requests_total - .with_label_values(&["tcp"]) - .inc(); let stream = match net::TcpStream::connect(addr).await { Ok(stream) => stream, Err(e) => { @@ -216,13 +206,8 @@ impl Scheduler { continue; }, }; - let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); - ( - Protocols::new_tcp(stream, cid, Arc::clone(&self.protocol_metrics)), - cid, - false, - ) + (Protocols::new_tcp(stream, cid, metrics), false) }, ProtocolAddr::Mpsc(addr) => { let mpsc_s = match MPSC_POOL.lock().await.get(&addr) { @@ -244,17 +229,9 @@ impl Scheduler { .send((remote_to_local_s, local_to_remote_oneshot_s)) .unwrap(); let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap(); - - let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!(?addr, "Connecting Mpsc"); ( - Protocols::new_mpsc( - local_to_remote_s, - remote_to_local_r, - cid, - Arc::clone(&self.protocol_metrics), - ), - cid, + Protocols::new_mpsc(local_to_remote_s, remote_to_local_r, cid, metrics), false, ) }, @@ -591,6 +568,7 @@ impl Scheduler { b2a_stream_opened_r, s2b_create_channel_s, s2b_shutdown_bparticipant_s, + b2a_bandwidth_stats_r, ) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics)); let participant = Participant::new( @@ -598,6 +576,7 @@ impl Scheduler { pid, a2b_open_stream_s, b2a_stream_opened_r, + b2a_bandwidth_stats_r, participant_channels.a2s_disconnect_s, ); diff --git a/network/src/trace.rs b/network/src/trace.rs deleted file mode 100644 index 640d65ee55..0000000000 --- a/network/src/trace.rs +++ /dev/null @@ -1,46 +0,0 @@ -use core::hash::Hash; -use std::{collections::HashMap, time::Instant}; -use tracing::Level; - -/// used to collect multiple traces and not spam the console -pub(crate) struct DeferredTracer { - level: Level, - items: HashMap, - last: Instant, - last_cnt: u32, -} - -impl DeferredTracer { - pub(crate) fn new(level: Level) -> Self { - Self { - level, - items: HashMap::new(), - last: Instant::now(), - last_cnt: 0, - } - } - - pub(crate) fn log(&mut self, t: T) { - if tracing::level_enabled!(self.level) { - *self.items.entry(t).or_default() += 1; - self.last = Instant::now(); - self.last_cnt += 1; - } else { - } - } - - pub(crate) fn print(&mut self) -> Option> { - const MAX_LOGS: u32 = 10_000; - const MAX_SECS: u64 = 1; - if tracing::level_enabled!(self.level) - && (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS) - { - if self.last_cnt > MAX_LOGS { - tracing::debug!("this seems to be logged continuesly"); - } - Some(std::mem::take(&mut self.items)) - } else { - None - } - } -} diff --git a/network/src/util.rs b/network/src/util.rs new file mode 100644 index 0000000000..b9a8801263 --- /dev/null +++ b/network/src/util.rs @@ -0,0 +1,117 @@ +use core::hash::Hash; +use std::{collections::HashMap, time::Instant}; +use tracing::Level; + +/// used to collect multiple traces and not spam the console +pub(crate) struct DeferredTracer { + level: Level, + items: HashMap, + last: Instant, + last_cnt: u32, +} + +impl DeferredTracer { + pub(crate) fn new(level: Level) -> Self { + Self { + level, + items: HashMap::new(), + last: Instant::now(), + last_cnt: 0, + } + } + + pub(crate) fn log(&mut self, t: T) { + if tracing::level_enabled!(self.level) { + *self.items.entry(t).or_default() += 1; + self.last = Instant::now(); + self.last_cnt += 1; + } else { + } + } + + pub(crate) fn print(&mut self) -> Option> { + const MAX_LOGS: u32 = 10_000; + const MAX_SECS: u64 = 1; + if tracing::level_enabled!(self.level) + && (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS) + { + if self.last_cnt > MAX_LOGS { + tracing::debug!("this seems to be logged continuesly"); + } + Some(std::mem::take(&mut self.items)) + } else { + None + } + } +} + +/// Used for storing Protocols in a Participant or Stream <-> Protocol +pub(crate) struct SortedVec { + pub data: Vec<(K, V)>, +} + +impl Default for SortedVec { + fn default() -> Self { Self { data: vec![] } } +} + +impl SortedVec +where + K: Ord + Copy, +{ + pub fn insert(&mut self, k: K, v: V) { + self.data.push((k, v)); + self.data.sort_by_key(|&(k, _)| k); + } + + pub fn delete(&mut self, k: &K) -> Option { + if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) { + Some(self.data.remove(i).1) + } else { + None + } + } + + pub fn get(&self, k: &K) -> Option<&V> { + if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) { + Some(&self.data[i].1) + } else { + None + } + } + + pub fn get_mut(&mut self, k: &K) -> Option<&mut V> { + if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) { + Some(&mut self.data[i].1) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sorted_vec() { + let mut vec = SortedVec::default(); + vec.insert(10, "Hello"); + println!("{:?}", vec.data); + vec.insert(30, "World"); + println!("{:?}", vec.data); + vec.insert(20, " "); + println!("{:?}", vec.data); + assert_eq!(vec.data[0].1, "Hello"); + assert_eq!(vec.data[1].1, " "); + assert_eq!(vec.data[2].1, "World"); + assert_eq!(vec.get(&30), Some(&"World")); + assert_eq!(vec.get_mut(&20), Some(&mut " ")); + assert_eq!(vec.get(&10), Some(&"Hello")); + assert_eq!(vec.delete(&40), None); + assert_eq!(vec.delete(&10), Some("Hello")); + assert_eq!(vec.delete(&10), None); + assert_eq!(vec.get(&30), Some(&"World")); + assert_eq!(vec.get_mut(&20), Some(&mut " ")); + assert_eq!(vec.get(&10), None); + } +} diff --git a/server/src/client.rs b/server/src/client.rs index 2db4c6d43c..9e03187278 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,5 +1,5 @@ use common_net::msg::{ClientType, ServerGeneral, ServerMsg}; -use network::{Message, Participant, Stream, StreamError}; +use network::{Message, Participant, Stream, StreamError, StreamParams}; use serde::{de::DeserializeOwned, Serialize}; use specs::Component; use specs_idvs::IdvStorage; @@ -26,6 +26,13 @@ pub struct Client { character_screen_stream: Mutex, in_game_stream: Mutex, terrain_stream: Mutex, + + general_stream_params: StreamParams, + ping_stream_params: StreamParams, + register_stream_params: StreamParams, + character_screen_stream_params: StreamParams, + in_game_stream_params: StreamParams, + terrain_stream_params: StreamParams, } pub struct PreparedMsg { @@ -50,6 +57,12 @@ impl Client { in_game_stream: Stream, terrain_stream: Stream, ) -> Self { + let general_stream_params = general_stream.params(); + let ping_stream_params = ping_stream.params(); + let register_stream_params = register_stream.params(); + let character_screen_stream_params = character_screen_stream.params(); + let in_game_stream_params = in_game_stream.params(); + let terrain_stream_params = terrain_stream.params(); Client { client_type, participant: Some(participant), @@ -62,6 +75,12 @@ impl Client { character_screen_stream: Mutex::new(character_screen_stream), in_game_stream: Mutex::new(in_game_stream), terrain_stream: Mutex::new(terrain_stream), + general_stream_params, + ping_stream_params, + register_stream_params, + character_screen_stream_params, + in_game_stream_params, + terrain_stream_params, } } @@ -138,9 +157,9 @@ impl Client { pub(crate) fn prepare>(&self, msg: M) -> PreparedMsg { match msg.into() { - ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream), - ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream), - ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream_params), + ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream_params), + ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream_params), ServerMsg::General(g) => { match g { //Character Screen related @@ -149,7 +168,7 @@ impl Client { | ServerGeneral::CharacterActionError(_) | ServerGeneral::CharacterCreated(_) | ServerGeneral::CharacterSuccess => { - PreparedMsg::new(1, &g, &self.character_screen_stream) + PreparedMsg::new(1, &g, &self.character_screen_stream_params) }, //Ingame related ServerGeneral::GroupUpdate(_) @@ -164,12 +183,12 @@ impl Client { | ServerGeneral::SiteEconomy(_) | ServerGeneral::UpdatePendingTrade(_, _, _) | ServerGeneral::FinishedTrade(_) => { - PreparedMsg::new(2, &g, &self.in_game_stream) + PreparedMsg::new(2, &g, &self.in_game_stream_params) }, //Ingame related, terrain ServerGeneral::TerrainChunkUpdate { .. } | ServerGeneral::TerrainBlockUpdates(_) => { - PreparedMsg::new(5, &g, &self.terrain_stream) + PreparedMsg::new(5, &g, &self.terrain_stream_params) }, // Always possible ServerGeneral::PlayerListUpdate(_) @@ -183,11 +202,11 @@ impl Client { | ServerGeneral::DeleteEntity(_) | ServerGeneral::Disconnect(_) | ServerGeneral::Notification(_) => { - PreparedMsg::new(3, &g, &self.general_stream) + PreparedMsg::new(3, &g, &self.general_stream_params) }, } }, - ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream), + ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream_params), } } @@ -209,10 +228,10 @@ impl Client { } impl PreparedMsg { - fn new(id: u8, msg: &M, stream: &Mutex) -> PreparedMsg { + fn new(id: u8, msg: &M, stream_params: &StreamParams) -> PreparedMsg { Self { stream_id: id, - message: Message::serialize(&msg, &stream.lock().unwrap()), + message: Message::serialize(&msg, stream_params.clone()), } } }