diff --git a/network/benches/speed.rs b/network/benches/speed.rs index 6216d4a7a4..b110d308ae 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -3,11 +3,11 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::{runtime::Runtime, sync::Mutex}; use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream}; -fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, &stream); } +fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); } async fn stream_msg(s1_a: Arc>, s1_b: Arc>, data: &[u8], cnt: usize) { let mut s1_b = s1_b.lock().await; - let m = Message::serialize(&data, &s1_b); + let m = Message::serialize(&data, s1_b.params()); std::thread::spawn(move || { let mut s1_a = s1_a.try_lock().unwrap(); for _ in 0..cnt { diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index d67e1a6c4b..e058aac7a8 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -173,7 +173,7 @@ fn client(address: ProtocolAddr, runtime: Arc) { id, data: vec![0; 1000], }, - &s1, + s1.params(), ); loop { s1.send_raw(&raw_msg).unwrap(); diff --git a/network/src/api.rs b/network/src/api.rs index f8ab01602c..5379e431c3 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -119,6 +119,12 @@ pub enum StreamError { Deserialize(bincode::Error), } +/// All Parameters of a Stream, can be used to generate RawMessages +#[derive(Debug, Clone)] +pub struct StreamParams { + pub(crate) promises: Promises, +} + /// Use the `Network` to create connections to other [`Participants`] /// /// The `Network` is the single source that handles all connections in your @@ -803,7 +809,7 @@ impl Stream { /// [`Serialized`]: Serialize #[inline] pub fn send(&mut self, msg: M) -> Result<(), StreamError> { - self.send_raw(&Message::serialize(&msg, &self)) + self.send_raw(&Message::serialize(&msg, self.params())) } /// This methods give the option to skip multiple calls of [`bincode`] and @@ -837,7 +843,7 @@ impl Stream { /// let mut stream_b = participant_b.opened().await?; /// /// //Prepare Message and decode it - /// let msg = Message::serialize("Hello World", &stream_a); + /// let msg = Message::serialize("Hello World", stream_a.params()); /// //Send same Message to multiple Streams /// stream_a.send_raw(&msg); /// stream_b.send_raw(&msg); @@ -858,7 +864,7 @@ impl Stream { return Err(StreamError::StreamClosed); } #[cfg(debug_assertions)] - message.verify(&self); + message.verify(self.params()); self.a2b_msg_s.send((self.sid, message.data.clone()))?; Ok(()) } @@ -999,7 +1005,7 @@ impl Stream { Message { data, #[cfg(feature = "compression")] - compressed: self.promises().contains(Promises::COMPRESSED), + compressed: self.promises.contains(Promises::COMPRESSED), } .deserialize()?, )), @@ -1013,7 +1019,11 @@ impl Stream { } } - pub fn promises(&self) -> Promises { self.promises } + pub fn params(&self) -> StreamParams { + StreamParams { + promises: self.promises, + } + } } impl core::cmp::PartialEq for Participant { diff --git a/network/src/lib.rs b/network/src/lib.rs index b79266e6d0..607eed9d91 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -108,7 +108,7 @@ mod trace; pub use api::{ Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, - Stream, StreamError, + Stream, StreamError, StreamParams, }; pub use message::Message; pub use network_protocol::{InitProtocolError, Pid, Promises}; diff --git a/network/src/message.rs b/network/src/message.rs index 516fe06ddd..cbee704a90 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,4 +1,4 @@ -use crate::api::{Stream, StreamError}; +use crate::api::{StreamError, StreamParams}; use bytes::Bytes; #[cfg(feature = "compression")] use network_protocol::Promises; @@ -36,12 +36,12 @@ impl Message { /// [`Message::serialize`]: crate::message::Message::serialize /// /// [`Streams`]: crate::api::Stream - pub fn serialize(message: &M, stream: &Stream) -> Self { + pub fn serialize(message: &M, stream_params: StreamParams) -> Self { //this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html let serialized_data = bincode::serialize(message).unwrap(); #[cfg(feature = "compression")] - let compressed = stream.promises().contains(Promises::COMPRESSED); + let compressed = stream_params.promises.contains(Promises::COMPRESSED); #[cfg(feature = "compression")] let data = if compressed { let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10); @@ -54,7 +54,7 @@ impl Message { #[cfg(not(feature = "compression"))] let data = serialized_data; #[cfg(not(feature = "compression"))] - let _stream = stream; + let _stream_params = stream_params; Self { data: Bytes::from(data), @@ -127,13 +127,13 @@ impl Message { } #[cfg(debug_assertions)] - pub(crate) fn verify(&self, stream: &Stream) { + pub(crate) fn verify(&self, params: StreamParams) { #[cfg(not(feature = "compression"))] let _stream = stream; #[cfg(feature = "compression")] - if self.compressed != stream.promises().contains(Promises::COMPRESSED) { + if self.compressed != params.promises.contains(Promises::COMPRESSED) { warn!( - ?stream, + ?params, "verify failed, msg is {} and it doesn't match with stream", self.compressed ); } @@ -171,14 +171,9 @@ pub(crate) fn partial_eq_bincode(first: &bincode::ErrorKind, second: &bincode::E #[cfg(test)] mod tests { - use crate::{api::Stream, message::*}; - use std::sync::{atomic::AtomicBool, Arc}; - use tokio::sync::mpsc; - - fn stub_stream(compressed: bool) -> Stream { - use crate::api::*; - use network_protocol::*; + use crate::{api::StreamParams, message::*}; + fn stub_stream(compressed: bool) -> StreamParams { #[cfg(feature = "compression")] let promises = if compressed { Promises::COMPRESSED @@ -189,27 +184,12 @@ mod tests { #[cfg(not(feature = "compression"))] let promises = Promises::empty(); - let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded(); - let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded(); - let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel(); - - Stream::new( - Pid::fake(0), - Pid::fake(1), - Sid::new(0), - 0u8, - promises, - 1_000_000, - Arc::new(AtomicBool::new(true)), - a2b_msg_s, - b2a_msg_recv_r, - a2b_close_stream_s, - ) + StreamParams { promises } } #[test] fn serialize_test() { - let msg = Message::serialize("abc", &stub_stream(false)); + let msg = Message::serialize("abc", stub_stream(false)); assert_eq!(msg.data.len(), 11); assert_eq!(msg.data[0], 3); assert_eq!(msg.data[1..7], [0, 0, 0, 0, 0, 0]); @@ -221,7 +201,7 @@ mod tests { #[cfg(feature = "compression")] #[test] fn serialize_compress_small() { - let msg = Message::serialize("abc", &stub_stream(true)); + let msg = Message::serialize("abc", stub_stream(true)); assert_eq!(msg.data.len(), 12); assert_eq!(msg.data[0], 176); assert_eq!(msg.data[1], 3); @@ -245,7 +225,7 @@ mod tests { 0, "assets/data/plants/flowers/greenrose.ron", ); - let msg = Message::serialize(&msg, &stub_stream(true)); + let msg = Message::serialize(&msg, stub_stream(true)); assert_eq!(msg.data.len(), 79); assert_eq!(msg.data[0], 34); assert_eq!(msg.data[1], 5); @@ -275,7 +255,7 @@ mod tests { _ => {}, } } - let msg = Message::serialize(&msg, &stub_stream(true)); + let msg = Message::serialize(&msg, stub_stream(true)); assert_eq!(msg.data.len(), 1331); } } diff --git a/network/src/metrics.rs b/network/src/metrics.rs index d1b77d76d0..a5874a4b86 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,3 +1,4 @@ +use crate::api::ProtocolAddr; use network_protocol::{Cid, Pid}; #[cfg(feature = "metrics")] use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; @@ -151,6 +152,27 @@ impl NetworkMetrics { .with_label_values(&[remote_p]) .inc(); } + + pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) { + self.listen_requests_total + .with_label_values(&[protocol_name(protocol)]) + .inc(); + } + + pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) { + self.connect_requests_total + .with_label_values(&[protocol_name(protocol)]) + .inc(); + } +} + +#[cfg(feature = "metrics")] +fn protocol_name(protocol: &ProtocolAddr) -> &str { + match protocol { + ProtocolAddr::Tcp(_) => "tcp", + ProtocolAddr::Udp(_) => "udp", + ProtocolAddr::Mpsc(_) => "mpsc", + } } #[cfg(not(feature = "metrics"))] @@ -164,6 +186,10 @@ impl NetworkMetrics { pub(crate) fn streams_opened(&self, _remote_p: &str) {} pub(crate) fn streams_closed(&self, _remote_p: &str) {} + + pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {} + + pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {} } impl std::fmt::Debug for NetworkMetrics { diff --git a/network/src/participant.rs b/network/src/participant.rs index c0276a4ad2..9e8c496172 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -881,7 +881,7 @@ mod tests { .unwrap(); let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap(); - assert_eq!(stream.promises(), Promises::ORDERED); + assert_eq!(stream.params().promises, Promises::ORDERED); let (s, r) = oneshot::channel(); runtime.block_on(async { diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index f711287d38..90becb0793 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -177,15 +177,7 @@ impl Scheduler { async move { debug!(?address, "Got request to open a channel_creator"); - #[cfg(feature = "metrics")] - self.metrics - .listen_requests_total - .with_label_values(&[match address { - ProtocolAddr::Tcp(_) => "tcp", - ProtocolAddr::Udp(_) => "udp", - ProtocolAddr::Mpsc(_) => "mpsc", - }]) - .inc(); + self.metrics.listen_request(&address); let (end_sender, end_receiver) = oneshot::channel::<()>(); self.channel_listener .lock() @@ -202,13 +194,11 @@ impl Scheduler { async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver) { trace!("Start connect_mgr"); while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { - let (protocol, cid, handshake) = match addr { + let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); + let metrics = Arc::clone(&self.protocol_metrics); + self.metrics.connect_request(&addr); + let (protocol, handshake) = match addr { ProtocolAddr::Tcp(addr) => { - #[cfg(feature = "metrics")] - self.metrics - .connect_requests_total - .with_label_values(&["tcp"]) - .inc(); let stream = match net::TcpStream::connect(addr).await { Ok(stream) => stream, Err(e) => { @@ -216,13 +206,8 @@ impl Scheduler { continue; }, }; - let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); - ( - Protocols::new_tcp(stream, cid, Arc::clone(&self.protocol_metrics)), - cid, - false, - ) + (Protocols::new_tcp(stream, cid, metrics), false) }, ProtocolAddr::Mpsc(addr) => { let mpsc_s = match MPSC_POOL.lock().await.get(&addr) { @@ -244,17 +229,9 @@ impl Scheduler { .send((remote_to_local_s, local_to_remote_oneshot_s)) .unwrap(); let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap(); - - let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!(?addr, "Connecting Mpsc"); ( - Protocols::new_mpsc( - local_to_remote_s, - remote_to_local_r, - cid, - Arc::clone(&self.protocol_metrics), - ), - cid, + Protocols::new_mpsc(local_to_remote_s, remote_to_local_r, cid, metrics), false, ) }, diff --git a/server/src/client.rs b/server/src/client.rs index 2db4c6d43c..9e03187278 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,5 +1,5 @@ use common_net::msg::{ClientType, ServerGeneral, ServerMsg}; -use network::{Message, Participant, Stream, StreamError}; +use network::{Message, Participant, Stream, StreamError, StreamParams}; use serde::{de::DeserializeOwned, Serialize}; use specs::Component; use specs_idvs::IdvStorage; @@ -26,6 +26,13 @@ pub struct Client { character_screen_stream: Mutex, in_game_stream: Mutex, terrain_stream: Mutex, + + general_stream_params: StreamParams, + ping_stream_params: StreamParams, + register_stream_params: StreamParams, + character_screen_stream_params: StreamParams, + in_game_stream_params: StreamParams, + terrain_stream_params: StreamParams, } pub struct PreparedMsg { @@ -50,6 +57,12 @@ impl Client { in_game_stream: Stream, terrain_stream: Stream, ) -> Self { + let general_stream_params = general_stream.params(); + let ping_stream_params = ping_stream.params(); + let register_stream_params = register_stream.params(); + let character_screen_stream_params = character_screen_stream.params(); + let in_game_stream_params = in_game_stream.params(); + let terrain_stream_params = terrain_stream.params(); Client { client_type, participant: Some(participant), @@ -62,6 +75,12 @@ impl Client { character_screen_stream: Mutex::new(character_screen_stream), in_game_stream: Mutex::new(in_game_stream), terrain_stream: Mutex::new(terrain_stream), + general_stream_params, + ping_stream_params, + register_stream_params, + character_screen_stream_params, + in_game_stream_params, + terrain_stream_params, } } @@ -138,9 +157,9 @@ impl Client { pub(crate) fn prepare>(&self, msg: M) -> PreparedMsg { match msg.into() { - ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream), - ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream), - ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream_params), + ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream_params), + ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream_params), ServerMsg::General(g) => { match g { //Character Screen related @@ -149,7 +168,7 @@ impl Client { | ServerGeneral::CharacterActionError(_) | ServerGeneral::CharacterCreated(_) | ServerGeneral::CharacterSuccess => { - PreparedMsg::new(1, &g, &self.character_screen_stream) + PreparedMsg::new(1, &g, &self.character_screen_stream_params) }, //Ingame related ServerGeneral::GroupUpdate(_) @@ -164,12 +183,12 @@ impl Client { | ServerGeneral::SiteEconomy(_) | ServerGeneral::UpdatePendingTrade(_, _, _) | ServerGeneral::FinishedTrade(_) => { - PreparedMsg::new(2, &g, &self.in_game_stream) + PreparedMsg::new(2, &g, &self.in_game_stream_params) }, //Ingame related, terrain ServerGeneral::TerrainChunkUpdate { .. } | ServerGeneral::TerrainBlockUpdates(_) => { - PreparedMsg::new(5, &g, &self.terrain_stream) + PreparedMsg::new(5, &g, &self.terrain_stream_params) }, // Always possible ServerGeneral::PlayerListUpdate(_) @@ -183,11 +202,11 @@ impl Client { | ServerGeneral::DeleteEntity(_) | ServerGeneral::Disconnect(_) | ServerGeneral::Notification(_) => { - PreparedMsg::new(3, &g, &self.general_stream) + PreparedMsg::new(3, &g, &self.general_stream_params) }, } }, - ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream), + ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream_params), } } @@ -209,10 +228,10 @@ impl Client { } impl PreparedMsg { - fn new(id: u8, msg: &M, stream: &Mutex) -> PreparedMsg { + fn new(id: u8, msg: &M, stream_params: &StreamParams) -> PreparedMsg { Self { stream_id: id, - message: Message::serialize(&msg, &stream.lock().unwrap()), + message: Message::serialize(&msg, stream_params.clone()), } } }