diff --git a/client/src/lib.rs b/client/src/lib.rs index 482739defe..1603618a92 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -119,7 +119,7 @@ impl Client { // We reduce the thread count by 1 to keep rendering smooth thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); - let (network, f) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); thread_pool.execute(f); let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?; diff --git a/network/Cargo.toml b/network/Cargo.toml index 72bdde9ebc..d6c271ee48 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -6,6 +6,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +metrics = ["prometheus"] + +default = ["metrics"] + [dependencies] lz4-compress = "0.1.1" @@ -19,7 +25,7 @@ async-std = { version = "~1.5", default-features = false, features = ["std", "as #tracing and metrics tracing = { version = "0.1", default-features = false } tracing-futures = "0.2" -prometheus = { version = "0.9", default-features = false } +prometheus = { version = "0.9", default-features = false, optional = true } #async futures = { version = "0.3", features = ["thread-pool"] } #mpsc channel registry diff --git a/network/examples/chat/src/main.rs b/network/examples/chat/src/main.rs index f8b93b6581..89ca24c054 100644 --- a/network/examples/chat/src/main.rs +++ b/network/examples/chat/src/main.rs @@ -100,7 +100,7 @@ fn main() { } fn server(address: ProtocolAddr) { - let (server, f) = Network::new(Pid::new(), None); + let (server, f) = Network::new(Pid::new()); let server = Arc::new(server); std::thread::spawn(f); let pool = ThreadPool::new().unwrap(); @@ -144,7 +144,7 @@ async fn client_connection(_network: Arc, participant: Arc } fn client(address: ProtocolAddr) { - let (client, f) = Network::new(Pid::new(), None); + let (client, f) = Network::new(Pid::new()); std::thread::spawn(f); let pool = ThreadPool::new().unwrap(); diff --git a/network/examples/fileshare/src/server.rs b/network/examples/fileshare/src/server.rs index 50df21e65e..a94a1e668c 100644 --- a/network/examples/fileshare/src/server.rs +++ b/network/examples/fileshare/src/server.rs @@ -26,7 +26,7 @@ impl Server { pub fn new() -> (Self, mpsc::UnboundedSender) { let (command_sender, command_receiver) = mpsc::unbounded(); - let (network, f) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); std::thread::spawn(f); let run_channels = Some(ControlChannels { command_receiver }); diff --git a/network/examples/network-speed/src/main.rs b/network/examples/network-speed/src/main.rs index 91072d1153..f182b84483 100644 --- a/network/examples/network-speed/src/main.rs +++ b/network/examples/network-speed/src/main.rs @@ -120,7 +120,7 @@ fn main() { fn server(address: ProtocolAddr) { let mut metrics = metrics::SimpleMetrics::new(); - let (server, f) = Network::new(Pid::new(), Some(metrics.registry())); + let (server, f) = Network::new_with_registry(Pid::new(), metrics.registry()); std::thread::spawn(f); metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap(); block_on(server.listen(address)).unwrap(); @@ -148,7 +148,7 @@ fn server(address: ProtocolAddr) { fn client(address: ProtocolAddr) { let mut metrics = metrics::SimpleMetrics::new(); - let (client, f) = Network::new(Pid::new(), Some(metrics.registry())); + let (client, f) = Network::new_with_registry(Pid::new(), metrics.registry()); std::thread::spawn(f); metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap(); diff --git a/network/src/api.rs b/network/src/api.rs index 39b47ad1ce..33ce2de6ad 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -18,6 +18,7 @@ use futures::{ sink::SinkExt, stream::StreamExt, }; +#[cfg(feature = "metrics")] use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; use std::{ @@ -127,11 +128,11 @@ pub enum StreamError { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application -/// let (network, f) = Network::new(Pid::new(), None); +/// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); /// block_on(async{ /// # //setup pseudo database! -/// # let (database, fd) = Network::new(Pid::new(), None); +/// # let (database, fd) = Network::new(Pid::new()); /// # std::thread::spawn(fd); /// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?; @@ -162,9 +163,6 @@ impl Network { /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` - /// * `registry` - Provide a Registy in order to collect Prometheus metrics - /// by this `Network`, `None` will deactivate Tracing. Tracing is done via - /// [`prometheus`] /// /// # Result /// * `Self` - returns a `Network` which can be `Send` to multiple areas of @@ -184,7 +182,7 @@ impl Network { /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// let pool = ThreadPoolBuilder::new().build(); - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// pool.execute(f); /// ``` /// @@ -192,11 +190,11 @@ impl Network { /// //Example with std::thread /// use veloren_network::{Network, Pid, ProtocolAddr}; /// - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); /// ``` /// - /// Usually you only create a single `Network` for an appliregistrycation, + /// Usually you only create a single `Network` for an application, /// except when client and server are in the same application, then you /// will want 2. However there are no technical limitations from /// creating more. @@ -204,14 +202,51 @@ impl Network { /// [`Pid::new()`]: crate::types::Pid::new /// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html /// [`uvth`]: https://docs.rs/uvth - pub fn new( + pub fn new(participant_id: Pid) -> (Self, impl std::ops::FnOnce()) { + Self::internal_new( + participant_id, + #[cfg(feature = "metrics")] + None, + ) + } + + /// See [`new`] + /// + /// # additional Arguments + /// * `registry` - Provide a Registy in order to collect Prometheus metrics + /// by this `Network`, `None` will deactivate Tracing. Tracing is done via + /// [`prometheus`] + /// + /// # Examples + /// ```rust + /// use prometheus::Registry; + /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// + /// let registry = Registry::new(); + /// let (network, f) = Network::new_with_registry(Pid::new(), ®istry); + /// std::thread::spawn(f); + /// ``` + /// [`new`]: crate::api::Network::new + #[cfg(feature = "metrics")] + pub fn new_with_registry( participant_id: Pid, - registry: Option<&Registry>, + registry: &Registry, + ) -> (Self, impl std::ops::FnOnce()) { + Self::internal_new(participant_id, Some(registry)) + } + + fn internal_new( + participant_id: Pid, + #[cfg(feature = "metrics")] registry: Option<&Registry>, ) -> (Self, impl std::ops::FnOnce()) { let p = participant_id; debug!(?p, "Starting Network"); let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = - Scheduler::new(participant_id, registry); + Scheduler::new( + participant_id, + #[cfg(feature = "metrics")] + registry, + ); ( Self { local_pid: participant_id, @@ -222,13 +257,13 @@ impl Network { shutdown_sender: Some(shutdown_sender), }, move || { - trace!(?p, "Starting sheduler in own thread"); + trace!(?p, "Starting scheduler in own thread"); let _handle = task::block_on( scheduler .run() .instrument(tracing::info_span!("scheduler", ?p)), ); - trace!(?p, "Stopping sheduler and his own thread"); + trace!(?p, "Stopping scheduler and his own thread"); }, ) } @@ -247,7 +282,7 @@ impl Network { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); /// block_on(async { /// network @@ -288,9 +323,9 @@ impl Network { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2010".parse().unwrap())).await?; @@ -354,9 +389,9 @@ impl Network { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2020` TCP and opens returns their Pid - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// network @@ -428,9 +463,9 @@ impl Participant { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2100".parse().unwrap())).await?; @@ -483,9 +518,9 @@ impl Participant { /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2110 and wait for the other side to open a stream /// // Note: It's quite unusal to activly connect, but then wait on a stream to be connected, usually the Appication taking initiative want's to also create the first Stream. - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2110".parse().unwrap())).await?; @@ -542,9 +577,9 @@ impl Participant { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// network @@ -679,9 +714,9 @@ impl Stream { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World` - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; @@ -719,11 +754,11 @@ impl Stream { /// use std::sync::Arc; /// /// # fn main() -> std::result::Result<(), Box> { - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote1, fr1) = Network::new(Pid::new(), None); + /// # let (remote1, fr1) = Network::new(Pid::new()); /// # std::thread::spawn(fr1); - /// # let (remote2, fr2) = Network::new(Pid::new(), None); + /// # let (remote2, fr2) = Network::new(Pid::new()); /// # std::thread::spawn(fr2); /// block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; @@ -784,9 +819,9 @@ impl Stream { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it - /// let (network, f) = Network::new(Pid::new(), None); + /// let (network, f) = Network::new(Pid::new()); /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # let (remote, fr) = Network::new(Pid::new()); /// # std::thread::spawn(fr); /// block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; diff --git a/network/src/channel.rs b/network/src/channel.rs index 752dad16cb..25babccd2b 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,5 +1,6 @@ +#[cfg(feature = "metrics")] +use crate::metrics::NetworkMetrics; use crate::{ - metrics::NetworkMetrics, protocols::Protocols, types::{ Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, @@ -13,7 +14,7 @@ use futures::{ stream::StreamExt, FutureExt, }; -use std::sync::Arc; +#[cfg(feature = "metrics")] use std::sync::Arc; use tracing::*; pub(crate) struct Channel { @@ -80,6 +81,7 @@ pub(crate) struct Handshake { local_pid: Pid, secret: u128, init_handshake: bool, + #[cfg(feature = "metrics")] metrics: Arc, } @@ -98,13 +100,14 @@ impl Handshake { cid: u64, local_pid: Pid, secret: u128, - metrics: Arc, + #[cfg(feature = "metrics")] metrics: Arc, init_handshake: bool, ) -> Self { Self { cid, local_pid, secret, + #[cfg(feature = "metrics")] metrics, init_handshake, } @@ -163,104 +166,73 @@ impl Handshake { ) -> Result<(Pid, Sid, u128), ()> { const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \ something went wrong on network layer and connection will be closed"; - let mut pid_string = "".to_string(); + #[cfg(feature = "metrics")] let cid_string = self.cid.to_string(); if self.init_handshake { self.send_handshake(&mut c2w_frame_s).await; } - let r = match w2c_cid_frame_r.next().await { - Some(( - _, - Frame::Handshake { - magic_number, - version, - }, - )) => { - trace!(?magic_number, ?version, "Recv handshake"); + let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); + #[cfg(feature = "metrics")] + { + if let Some(ref frame) = frame { self.metrics .frames_in_total - .with_label_values(&["", &cid_string, "Handshake"]) + .with_label_values(&["", &cid_string, &frame.get_string()]) .inc(); + } + } + let r = match frame { + Some(Frame::Handshake { + magic_number, + version, + }) => { + trace!(?magic_number, ?version, "Recv handshake"); if magic_number != VELOREN_MAGIC_NUMBER { error!(?magic_number, "Connection with invalid magic_number"); #[cfg(debug_assertions)] - { - self.metrics - .frames_out_total - .with_label_values(&["", &cid_string, "Raw"]) - .inc(); - debug!("Sending client instructions before killing"); - c2w_frame_s - .send(Frame::Raw(Self::WRONG_NUMBER.to_vec())) - .await - .unwrap(); - c2w_frame_s.send(Frame::Shutdown).await.unwrap(); - } + self.send_raw_and_shutdown(&mut c2w_frame_s, Self::WRONG_NUMBER.to_vec()) + .await; Err(()) } else if version != VELOREN_NETWORK_VERSION { error!(?version, "Connection with wrong network version"); #[cfg(debug_assertions)] - { - debug!("Sending client instructions before killing"); - self.metrics - .frames_out_total - .with_label_values(&["", &cid_string, "Raw"]) - .inc(); - c2w_frame_s - .send(Frame::Raw( - format!( - "{} Our Version: {:?}\nYour Version: {:?}\nClosing the \ - connection", - Self::WRONG_VERSION, - VELOREN_NETWORK_VERSION, - version, - ) - .as_bytes() - .to_vec(), - )) - .await - .unwrap(); - c2w_frame_s.send(Frame::Shutdown {}).await.unwrap(); - } + self.send_raw_and_shutdown( + &mut c2w_frame_s, + format!( + "{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection", + Self::WRONG_VERSION, + VELOREN_NETWORK_VERSION, + version, + ) + .as_bytes() + .to_vec(), + ) + .await; Err(()) } else { debug!("Handshake completed"); if self.init_handshake { - self.send_init(&mut c2w_frame_s, &pid_string).await; + self.send_init(&mut c2w_frame_s, "").await; } else { self.send_handshake(&mut c2w_frame_s).await; } Ok(()) } }, - Some((_, Frame::Shutdown)) => { + Some(Frame::Shutdown) => { info!("Shutdown signal received"); - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "Shutdown"]) - .inc(); Err(()) }, - Some((_, Frame::Raw(bytes))) => { - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "Raw"]) - .inc(); + Some(Frame::Raw(bytes)) => { match std::str::from_utf8(bytes.as_slice()) { Ok(string) => error!(?string, ERR_S), _ => error!(?bytes, ERR_S), } Err(()) }, - Some((_, frame)) => { - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) - .inc(); - Err(()) - }, + Some(_) => Err(()), None => Err(()), }; if let Err(()) = r { @@ -274,10 +246,12 @@ impl Handshake { return Err(()); } - let r = match w2c_cid_frame_r.next().await { - Some((_, Frame::Init { pid, secret })) => { + let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); + let r = match frame { + Some(Frame::Init { pid, secret }) => { debug!(?pid, "Participant send their ID"); - pid_string = pid.to_string(); + let pid_string = pid.to_string(); + #[cfg(feature = "metrics")] self.metrics .frames_in_total .with_label_values(&[&pid_string, &cid_string, "ParticipantId"]) @@ -291,32 +265,22 @@ impl Handshake { info!(?pid, "This Handshake is now configured!"); Ok((pid, stream_id_offset, secret)) }, - Some((_, Frame::Shutdown)) => { - info!("Shutdown signal received"); + Some(frame) => { + #[cfg(feature = "metrics")] self.metrics .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "Shutdown"]) + .with_label_values(&["", &cid_string, frame.get_string()]) .inc(); - Err(()) - }, - Some((_, Frame::Raw(bytes))) => { - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "Raw"]) - .inc(); - match std::str::from_utf8(bytes.as_slice()) { - Ok(string) => error!(?string, ERR_S), - _ => error!(?bytes, ERR_S), + match frame { + Frame::Shutdown => info!("Shutdown signal received"), + Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + }, + _ => (), } Err(()) }, - Some((_, frame)) => { - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) - .inc(); - Err(()) - }, None => Err(()), }; if r.is_err() { @@ -332,6 +296,7 @@ impl Handshake { } async fn send_handshake(&self, c2w_frame_s: &mut mpsc::UnboundedSender) { + #[cfg(feature = "metrics")] self.metrics .frames_out_total .with_label_values(&["", &self.cid.to_string(), "Handshake"]) @@ -345,7 +310,13 @@ impl Handshake { .unwrap(); } - async fn send_init(&self, c2w_frame_s: &mut mpsc::UnboundedSender, pid_string: &str) { + async fn send_init( + &self, + c2w_frame_s: &mut mpsc::UnboundedSender, + #[cfg(feature = "metrics")] pid_string: &str, + #[cfg(not(feature = "metrics"))] _pid_string: &str, + ) { + #[cfg(feature = "metrics")] self.metrics .frames_out_total .with_label_values(&[pid_string, &self.cid.to_string(), "ParticipantId"]) @@ -358,4 +329,27 @@ impl Handshake { .await .unwrap(); } + + #[cfg(debug_assertions)] + async fn send_raw_and_shutdown( + &self, + c2w_frame_s: &mut mpsc::UnboundedSender, + data: Vec, + ) { + debug!("Sending client instructions before killing"); + #[cfg(feature = "metrics")] + { + let cid_string = self.cid.to_string(); + self.metrics + .frames_out_total + .with_label_values(&["", &cid_string, "Raw"]) + .inc(); + self.metrics + .frames_out_total + .with_label_values(&["", &cid_string, "Shutdown"]) + .inc(); + } + c2w_frame_s.send(Frame::Raw(data)).await.unwrap(); + c2w_frame_s.send(Frame::Shutdown).await.unwrap(); + } } diff --git a/network/src/lib.rs b/network/src/lib.rs index 7df6ffe8b6..51d4e7a1e1 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -45,7 +45,7 @@ //! // Client //! async fn client() -> std::result::Result<(), Box> { //! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen` -//! let (client_network, f) = Network::new(Pid::new(), None); +//! let (client_network, f) = Network::new(Pid::new()); //! std::thread::spawn(f); //! let server = client_network //! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) @@ -59,7 +59,7 @@ //! //! // Server //! async fn server() -> std::result::Result<(), Box> { -//! let (server_network, f) = Network::new(Pid::new(), None); +//! let (server_network, f) = Network::new(Pid::new()); //! std::thread::spawn(f); //! server_network //! .listen(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) @@ -101,7 +101,7 @@ mod api; mod channel; mod message; -mod metrics; +#[cfg(feature = "metrics")] mod metrics; mod participant; mod prios; mod protocols; diff --git a/network/src/participant.rs b/network/src/participant.rs index 901b10b4c7..8d6fa66aab 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -1,8 +1,9 @@ +#[cfg(feature = "metrics")] +use crate::metrics::{NetworkMetrics, PidCidFrameCache}; use crate::{ api::{ParticipantError, Stream}, channel::Channel, message::{IncomingMessage, MessageBuffer, OutgoingMessage}, - metrics::{NetworkMetrics, PidCidFrameCache}, prios::PrioManager, protocols::Protocols, types::{Cid, Frame, Pid, Prio, Promises, Sid}, @@ -66,6 +67,7 @@ pub struct BParticipant { api_participant_closed: Arc>>, running_mgr: AtomicUsize, run_channels: Option, + #[cfg(feature = "metrics")] metrics: Arc, no_channel_error_info: RwLock<(Instant, u64)>, } @@ -75,7 +77,7 @@ impl BParticipant { pub(crate) fn new( remote_pid: Pid, offset_sid: Sid, - metrics: Arc, + #[cfg(feature = "metrics")] metrics: Arc, ) -> ( Self, mpsc::UnboundedSender, @@ -111,6 +113,7 @@ impl BParticipant { api_participant_closed: api_participant_closed.clone(), running_mgr: AtomicUsize::new(0), run_channels, + #[cfg(feature = "metrics")] metrics, no_channel_error_info: RwLock::new((Instant::now(), 0)), }, @@ -131,8 +134,11 @@ impl BParticipant { let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel(); let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel(); let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>(); - let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = - PrioManager::new(self.metrics.clone(), self.remote_pid_string.clone()); + let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new( + #[cfg(feature = "metrics")] + self.metrics.clone(), + self.remote_pid_string.clone(), + ); let run_channels = self.run_channels.take().unwrap(); futures::join!( @@ -187,6 +193,7 @@ impl BParticipant { self.running_mgr.fetch_add(1, Ordering::Relaxed); let mut closing_up = false; trace!("Start send_mgr"); + #[cfg(feature = "metrics")] let mut send_cache = PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); loop { @@ -197,7 +204,12 @@ impl BParticipant { trace!("Tick {}", len); } for (_, frame) in frames { - self.send_frame(frame, &mut send_cache).await; + self.send_frame( + frame, + #[cfg(feature = "metrics")] + &mut send_cache, + ) + .await; } b2s_prio_statistic_s .send((self.remote_pid, len as u64, /* */ 0)) @@ -225,7 +237,7 @@ impl BParticipant { async fn send_frame( &self, frame: Frame, - frames_out_total_cache: &mut PidCidFrameCache, + #[cfg(feature = "metrics")] frames_out_total_cache: &mut PidCidFrameCache, ) -> bool { // find out ideal channel here //TODO: just take first @@ -234,6 +246,7 @@ impl BParticipant { //note: this is technically wrong we should only increase when it suceeded, but // this requiered me to clone `frame` which is a to big performance impact for // error handling + #[cfg(feature = "metrics")] frames_out_total_cache .with_label_values(ci.cid, &frame) .inc(); @@ -292,12 +305,17 @@ impl BParticipant { let mut dropped_sid = Sid::new(0); while let Some((cid, frame)) = w2b_frames_r.next().await { - let cid_string = cid.to_string(); //trace!("handling frame"); - self.metrics - .frames_in_total - .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) - .inc(); + #[cfg(feature = "metrics")] + { + let cid_string = cid.to_string(); + self.metrics + .frames_in_total + .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) + .inc(); + } + #[cfg(not(feature = "metrics"))] + let _cid = cid; match frame { Frame::OpenStream { sid, @@ -324,6 +342,7 @@ impl BParticipant { ); // no wait for flush here, as the remote wouldn't care anyway. if let Some(si) = self.streams.write().await.remove(&sid) { + #[cfg(feature = "metrics")] self.metrics .streams_closed_total .with_label_values(&[&self.remote_pid_string]) @@ -433,6 +452,7 @@ impl BParticipant { b2r_read_shutdown, }); b2s_create_channel_done_s.send(()).unwrap(); + #[cfg(feature = "metrics")] self.metrics .channels_connected_total .with_label_values(&[&self.remote_pid_string]) @@ -441,6 +461,7 @@ impl BParticipant { channel .run(protocol, w2b_frames_s, leftover_cid_frame) .await; + #[cfg(feature = "metrics")] self.metrics .channels_disconnected_total .with_label_values(&[&self.remote_pid_string]) @@ -464,6 +485,7 @@ impl BParticipant { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start open_mgr"); let mut stream_ids = self.offset_sid; + #[cfg(feature = "metrics")] let mut send_cache = PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse(); @@ -485,6 +507,7 @@ impl BParticipant { prio, promises, }, + #[cfg(feature = "metrics")] &mut send_cache, ) .await @@ -553,6 +576,7 @@ impl BParticipant { } trace!("All BParticipant mgr (except me) are shut down now"); + #[cfg(feature = "metrics")] self.metrics.participants_disconnected_total.inc(); debug!("BParticipant close done"); @@ -569,6 +593,7 @@ impl BParticipant { ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start stream_close_mgr"); + #[cfg(feature = "metrics")] let mut send_cache = PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse(); @@ -606,14 +631,19 @@ impl BParticipant { s2b_stream_finished_closed_r.await.unwrap(); trace!(?sid, "Stream was successfully flushed"); + #[cfg(feature = "metrics")] self.metrics .streams_closed_total .with_label_values(&[&self.remote_pid_string]) .inc(); //only now remove the Stream, that means we can still recv on it. self.streams.write().await.remove(&sid); - self.send_frame(Frame::CloseStream { sid }, &mut send_cache) - .await; + self.send_frame( + Frame::CloseStream { sid }, + #[cfg(feature = "metrics")] + &mut send_cache, + ) + .await; } trace!("Stop stream_close_mgr"); self.running_mgr.fetch_sub(1, Ordering::Relaxed); @@ -635,6 +665,7 @@ impl BParticipant { b2a_msg_recv_s, closed: closed.clone(), }); + #[cfg(feature = "metrics")] self.metrics .streams_opened_total .with_label_values(&[&self.remote_pid_string]) diff --git a/network/src/prios.rs b/network/src/prios.rs index 83905a06dd..4e5971a32e 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -4,18 +4,17 @@ //!E.g. in the same time 100 prio0 messages are send, only 50 prio5, 25 prio10, //! 12 prio15 or 6 prio20 messages are send. Note: TODO: prio0 will be send //! immeadiatly when found! - +//! +#[cfg(feature = "metrics")] +use crate::metrics::NetworkMetrics; use crate::{ message::OutgoingMessage, - metrics::NetworkMetrics, types::{Frame, Prio, Sid}, }; use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::channel::oneshot; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet, VecDeque}; +#[cfg(feature = "metrics")] use std::sync::Arc; use tracing::*; @@ -35,7 +34,9 @@ pub(crate) struct PrioManager { //you can register to be notified if a pid_sid combination is flushed completly here sid_flushed_rx: Receiver<(Sid, oneshot::Sender<()>)>, queued: HashSet, + #[cfg(feature = "metrics")] metrics: Arc, + #[cfg(feature = "metrics")] pid: String, } @@ -50,13 +51,15 @@ impl PrioManager { #[allow(clippy::type_complexity)] pub fn new( - metrics: Arc, + #[cfg(feature = "metrics")] metrics: Arc, pid: String, ) -> ( Self, Sender<(Prio, Sid, OutgoingMessage)>, Sender<(Sid, oneshot::Sender<()>)>, ) { + #[cfg(not(feature = "metrics"))] + let _pid = pid; // (a2p_msg_s, a2p_msg_r) let (messages_tx, messages_rx) = unbounded(); let (sid_flushed_tx, sid_flushed_rx) = unbounded(); @@ -133,7 +136,9 @@ impl PrioManager { queued: HashSet::new(), //TODO: optimize with u64 and 64 bits sid_flushed_rx, sid_owned: HashMap::new(), + #[cfg(feature = "metrics")] metrics, + #[cfg(feature = "metrics")] pid, }, messages_tx, @@ -148,15 +153,19 @@ impl PrioManager { for (prio, sid, msg) in self.messages_rx.try_iter() { debug_assert!(prio as usize <= PRIO_MAX); messages += 1; - let sid_string = sid.to_string(); - self.metrics - .message_out_total - .with_label_values(&[&self.pid, &sid_string]) - .inc(); - self.metrics - .message_out_throughput - .with_label_values(&[&self.pid, &sid_string]) - .inc_by(msg.buffer.data.len() as i64); + #[cfg(feature = "metrics")] + { + let sid_string = sid.to_string(); + self.metrics + .message_out_total + .with_label_values(&[&self.pid, &sid_string]) + .inc(); + self.metrics + .message_out_throughput + .with_label_values(&[&self.pid, &sid_string]) + .inc_by(msg.buffer.data.len() as i64); + } + //trace!(?prio, ?sid_string, "tick"); self.queued.insert(prio); self.messages[prio as usize].push_back((sid, msg)); diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 970867517f..687e1d8885 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -1,7 +1,6 @@ -use crate::{ - metrics::{CidFrameCache, NetworkMetrics}, - types::{Cid, Frame, Mid, Pid, Sid}, -}; +#[cfg(feature = "metrics")] +use crate::metrics::{CidFrameCache, NetworkMetrics}; +use crate::types::{Cid, Frame, Mid, Pid, Sid}; use async_std::{ net::{TcpStream, UdpSocket}, prelude::*, @@ -41,6 +40,7 @@ pub(crate) enum Protocols { #[derive(Debug)] pub(crate) struct TcpProtocol { stream: TcpStream, + #[cfg(feature = "metrics")] metrics: Arc, } @@ -48,14 +48,22 @@ pub(crate) struct TcpProtocol { pub(crate) struct UdpProtocol { socket: Arc, remote_addr: SocketAddr, + #[cfg(feature = "metrics")] metrics: Arc, data_in: Mutex>>, } //TODO: PERFORMACE: Use BufWriter and BufReader from std::io! impl TcpProtocol { - pub(crate) fn new(stream: TcpStream, metrics: Arc) -> Self { - Self { stream, metrics } + pub(crate) fn new( + stream: TcpStream, + #[cfg(feature = "metrics")] metrics: Arc, + ) -> Self { + Self { + stream, + #[cfg(feature = "metrics")] + metrics, + } } /// read_except and if it fails, close the protocol @@ -98,7 +106,9 @@ impl TcpProtocol { end_r: oneshot::Receiver<()>, ) { trace!("Starting up tcp read()"); + #[cfg(feature = "metrics")] let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid); + #[cfg(feature = "metrics")] let throughput_cache = self .metrics .wire_in_throughput @@ -177,6 +187,7 @@ impl TcpProtocol { let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap()); let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&bytes[16..18]).unwrap()); let mut data = vec![0; length as usize]; + #[cfg(feature = "metrics")] throughput_cache.inc_by(length as i64); read_or_close!(&mut data); Frame::Data { mid, start, data } @@ -199,6 +210,7 @@ impl TcpProtocol { Frame::Raw(data) }, }; + #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); w2c_cid_frame_s .send((cid, frame)) @@ -230,11 +242,15 @@ impl TcpProtocol { pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver) { trace!("Starting up tcp write()"); let mut stream = self.stream.clone(); + #[cfg(feature = "metrics")] let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); + #[cfg(feature = "metrics")] let throughput_cache = self .metrics .wire_out_throughput .with_label_values(&[&cid.to_string()]); + #[cfg(not(feature = "metrics"))] + let _cid = cid; macro_rules! write_or_close { ($x:expr) => { @@ -246,6 +262,7 @@ impl TcpProtocol { } while let Some(frame) = c2w_frame_r.next().await { + #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); match frame { Frame::Handshake { @@ -287,6 +304,7 @@ impl TcpProtocol { write_or_close!(&length.to_le_bytes()); }, Frame::Data { mid, start, data } => { + #[cfg(feature = "metrics")] throughput_cache.inc_by(data.len() as i64); write_or_close!(&FRAME_DATA.to_be_bytes()); write_or_close!(&mid.to_le_bytes()); @@ -309,12 +327,13 @@ impl UdpProtocol { pub(crate) fn new( socket: Arc, remote_addr: SocketAddr, - metrics: Arc, + #[cfg(feature = "metrics")] metrics: Arc, data_in: mpsc::UnboundedReceiver>, ) -> Self { Self { socket, remote_addr, + #[cfg(feature = "metrics")] metrics, data_in: Mutex::new(data_in), } @@ -327,7 +346,9 @@ impl UdpProtocol { end_r: oneshot::Receiver<()>, ) { trace!("Starting up udp read()"); + #[cfg(feature = "metrics")] let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid); + #[cfg(feature = "metrics")] let throughput_cache = self .metrics .wire_in_throughput @@ -418,6 +439,7 @@ impl UdpProtocol { ]); let length = u16::from_le_bytes([bytes[17], bytes[18]]); let mut data = vec![0; length as usize]; + #[cfg(feature = "metrics")] throughput_cache.inc_by(length as i64); data.copy_from_slice(&bytes[19..]); Frame::Data { mid, start, data } @@ -430,6 +452,7 @@ impl UdpProtocol { }, _ => Frame::Raw(bytes), }; + #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); w2c_cid_frame_s.send((cid, frame)).await.unwrap(); } @@ -439,12 +462,17 @@ impl UdpProtocol { pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver) { trace!("Starting up udp write()"); let mut buffer = [0u8; 2000]; + #[cfg(feature = "metrics")] let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); + #[cfg(feature = "metrics")] let throughput_cache = self .metrics .wire_out_throughput .with_label_values(&[&cid.to_string()]); + #[cfg(not(feature = "metrics"))] + let _cid = cid; while let Some(frame) = c2w_frame_r.next().await { + #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); let len = match frame { Frame::Handshake { @@ -498,6 +526,7 @@ impl UdpProtocol { buffer[9..17].copy_from_slice(&start.to_le_bytes()); buffer[17..19].copy_from_slice(&(data.len() as u16).to_le_bytes()); buffer[19..(data.len() + 19)].clone_from_slice(&data[..]); + #[cfg(feature = "metrics")] throughput_cache.inc_by(data.len() as i64); 19 + data.len() }, diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 931fabb2a1..b29818d1b1 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -1,7 +1,8 @@ +#[cfg(feature = "metrics")] +use crate::metrics::NetworkMetrics; use crate::{ api::{Participant, ProtocolAddr}, channel::Handshake, - metrics::NetworkMetrics, participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel}, protocols::{Protocols, TcpProtocol, UdpProtocol}, types::Pid, @@ -18,6 +19,7 @@ use futures::{ sink::SinkExt, stream::StreamExt, }; +#[cfg(feature = "metrics")] use prometheus::Registry; use rand::Rng; use std::{ @@ -78,13 +80,14 @@ pub struct Scheduler { participants: Arc>>, channel_ids: Arc, channel_listener: RwLock>>, + #[cfg(feature = "metrics")] metrics: Arc, } impl Scheduler { pub fn new( local_pid: Pid, - registry: Option<&Registry>, + #[cfg(feature = "metrics")] registry: Option<&Registry>, ) -> ( Self, mpsc::UnboundedSender, @@ -113,9 +116,14 @@ impl Scheduler { b2s_prio_statistic_s, }; + #[cfg(feature = "metrics")] let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap()); - if let Some(registry) = registry { - metrics.register(registry).unwrap(); + + #[cfg(feature = "metrics")] + { + if let Some(registry) = registry { + metrics.register(registry).unwrap(); + } } let mut rng = rand::thread_rng(); @@ -132,6 +140,7 @@ impl Scheduler { participants: Arc::new(RwLock::new(HashMap::new())), channel_ids: Arc::new(AtomicU64::new(0)), channel_listener: RwLock::new(HashMap::new()), + #[cfg(feature = "metrics")] metrics, }, a2s_listen_s, @@ -161,6 +170,7 @@ impl Scheduler { async move { debug!(?address, "Got request to open a channel_creator"); + #[cfg(feature = "metrics")] self.metrics .listen_requests_total .with_label_values(&[match address { @@ -193,6 +203,7 @@ impl Scheduler { while let Some((addr, pid_sender)) = a2s_connect_r.next().await { let (protocol, handshake) = match addr { ProtocolAddr::Tcp(addr) => { + #[cfg(feature = "metrics")] self.metrics .connect_requests_total .with_label_values(&["tcp"]) @@ -206,11 +217,16 @@ impl Scheduler { }; info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); ( - Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone())), + Protocols::Tcp(TcpProtocol::new( + stream, + #[cfg(feature = "metrics")] + self.metrics.clone(), + )), false, ) }, ProtocolAddr::Udp(addr) => { + #[cfg(feature = "metrics")] self.metrics .connect_requests_total .with_label_values(&["udp"]) @@ -231,6 +247,7 @@ impl Scheduler { let protocol = UdpProtocol::new( socket.clone(), addr, + #[cfg(feature = "metrics")] self.metrics.clone(), udp_data_receiver, ); @@ -372,7 +389,11 @@ impl Scheduler { }, }; info!("Accepting Tcp from: {}", peer_addr); - let protocol = TcpProtocol::new(stream, self.metrics.clone()); + let protocol = TcpProtocol::new( + stream, + #[cfg(feature = "metrics")] + self.metrics.clone(), + ); self.init_protocol(Protocols::Tcp(protocol), None, true) .await; } @@ -416,6 +437,7 @@ impl Scheduler { let protocol = UdpProtocol::new( socket.clone(), remote_addr, + #[cfg(feature = "metrics")] self.metrics.clone(), udp_data_receiver, ); @@ -474,6 +496,7 @@ impl Scheduler { // the UDP listening is done in another place. let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); let participants = self.participants.clone(); + #[cfg(feature = "metrics")] let metrics = self.metrics.clone(); let pool = self.pool.clone(); let local_pid = self.local_pid; @@ -486,6 +509,7 @@ impl Scheduler { cid, local_pid, local_secret, + #[cfg(feature = "metrics")] metrics.clone(), send_handshake, ); @@ -506,7 +530,12 @@ impl Scheduler { mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, api_participant_closed, - ) = BParticipant::new(pid, sid, metrics.clone()); + ) = BParticipant::new( + pid, + sid, + #[cfg(feature = "metrics")] + metrics.clone(), + ); let participant = Participant::new( local_pid, @@ -517,6 +546,7 @@ impl Scheduler { api_participant_closed, ); + #[cfg(feature = "metrics")] metrics.participants_connected_total.inc(); participants.insert(pid, ParticipantInfo { secret, diff --git a/network/src/types.rs b/network/src/types.rs index 6b358cad54..3ede7fd302 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -90,8 +90,10 @@ pub(crate) enum Frame { } impl Frame { + #[cfg(feature = "metrics")] pub const FRAMES_LEN: u8 = 8; + #[cfg(feature = "metrics")] pub const fn int_to_string(i: u8) -> &'static str { match i { 0 => "Handshake", @@ -106,6 +108,7 @@ impl Frame { } } + #[cfg(feature = "metrics")] pub fn get_int(&self) -> u8 { match self { Frame::Handshake { .. } => 0, @@ -119,6 +122,7 @@ impl Frame { } } + #[cfg(feature = "metrics")] pub fn get_string(&self) -> &str { Self::int_to_string(self.get_int()) } } @@ -130,7 +134,7 @@ impl Pid { /// use veloren_network::{Network, Pid}; /// /// let pid = Pid::new(); - /// let _ = Network::new(pid, None); + /// let _ = Network::new(pid); /// ``` pub fn new() -> Self { Self { diff --git a/network/tests/helper.rs b/network/tests/helper.rs index f20d29b3cf..53ce8e0c47 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -49,9 +49,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { pub async fn network_participant_stream( addr: ProtocolAddr, ) -> (Network, Participant, Stream, Network, Participant, Stream) { - let (n_a, f_a) = Network::new(Pid::fake(1), None); + let (n_a, f_a) = Network::new(Pid::fake(1)); std::thread::spawn(f_a); - let (n_b, f_b) = Network::new(Pid::fake(2), None); + let (n_b, f_b) = Network::new(Pid::fake(2)); std::thread::spawn(f_b); n_a.listen(addr.clone()).await.unwrap(); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index be83cb853b..7f37d5e78e 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -62,8 +62,8 @@ fn stream_simple_udp_3msg() { #[ignore] fn tcp_and_udp_2_connections() -> std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); - let (network, f) = Network::new(Pid::new(), None); - let (remote, fr) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); + let (remote, fr) = Network::new(Pid::new()); std::thread::spawn(f); std::thread::spawn(fr); block_on(async { @@ -87,7 +87,7 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); - let (network, f) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); std::thread::spawn(f); let udp1 = udp(); let tcp1 = tcp(); @@ -95,7 +95,7 @@ fn failed_listen_on_used_ports() -> std::result::Result<(), Box std::result::Result<(), Box> let (_, _) = helper::setup(false, 0); // Create a Network, listen on Port `1200` and wait for a Stream to be opened, // then answer `Hello World` - let (network, f) = Network::new(Pid::new(), None); - let (remote, fr) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); + let (remote, fr) = Network::new(Pid::new()); std::thread::spawn(f); std::thread::spawn(fr); block_on(async { @@ -148,8 +148,8 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> let (_, _) = helper::setup(false, 0); // Create a Network, listen on Port `1220` and wait for a Stream to be opened, // then listen on it - let (network, f) = Network::new(Pid::new(), None); - let (remote, fr) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); + let (remote, fr) = Network::new(Pid::new()); std::thread::spawn(f); std::thread::spawn(fr); block_on(async { diff --git a/server/Cargo.toml b/server/Cargo.toml index 2a5215866a..8d52f0b276 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,7 +11,7 @@ default = ["worldgen"] [dependencies] common = { package = "veloren-common", path = "../common" } world = { package = "veloren-world", path = "../world" } -network = { package = "veloren_network", path = "../network", default-features = false } +network = { package = "veloren_network", path = "../network", features = ["metrics"], default-features = false } specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git", branch = "specs-git" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 88166f25e9..4ed6b6272f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -53,7 +53,7 @@ use std::{ }; #[cfg(not(feature = "worldgen"))] use test_world::{World, WORLD_SIZE}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; #[cfg(feature = "worldgen")] @@ -240,7 +240,7 @@ impl Server { let thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".to_string()) .build(); - let (network, f) = Network::new(Pid::new(), None); + let (network, f) = Network::new(Pid::new()); thread_pool.execute(f); block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; @@ -599,7 +599,16 @@ impl Server { loop { let participant = self.network.connected().await?; debug!("New Participant connected to the server"); - let singleton_stream = participant.opened().await?; + let singleton_stream = match participant.opened().await { + Ok(s) => s, + Err(e) => { + warn!( + ?e, + "Failed to open a Stream from remote client. Dropping it" + ); + continue; + }, + }; let mut client = Client { client_state: ClientState::Connected,