use crate::{ message::{partial_eq_bincode, Message}, participant::{A2bStreamOpen, S2bShutdownBparticipant}, scheduler::{A2sConnect, Scheduler}, }; use bytes::Bytes; #[cfg(feature = "compression")] use lz_fear::raw::DecodeError; use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid}; #[cfg(feature = "metrics")] use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, time::Duration, }; use tokio::{ io, runtime::Runtime, sync::{mpsc, oneshot, watch, Mutex}, }; use tracing::*; type A2sDisconnect = Arc>>>; /// Represents a Tcp or Udp or Mpsc address #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum ProtocolAddr { Tcp(SocketAddr), Udp(SocketAddr), Mpsc(u64), } /// `Participants` are generated by the [`Network`] and represent a connection /// to a remote Participant. Look at the [`connect`] and [`connected`] method of /// [`Networks`] on how to generate `Participants` /// /// [`Networks`]: crate::api::Network /// [`connect`]: Network::connect /// [`connected`]: Network::connected pub struct Participant { local_pid: Pid, remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } /// `Streams` represents a channel to send `n` messages with a certain priority /// and [`Promises`]. messages need always to be send between 2 `Streams`. /// /// `Streams` are generated by the [`Participant`]. /// Look at the [`open`] and [`opened`] method of [`Participant`] on how to /// generate `Streams` /// /// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior /// mutability, as multiple threads don't need access to the same `Stream`. 
///
/// [`Networks`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
    local_pid: Pid,
    remote_pid: Pid,
    sid: Sid,
    prio: Prio,
    promises: Promises,
    guaranteed_bandwidth: Bandwidth,
    send_closed: Arc<AtomicBool>,
    a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
    // `None` once the receiving side has been observed as closed.
    b2a_msg_recv_r: Option<async_channel::Receiver<Bytes>>,
    // NOTE(review): element type presumed to be `Sid` - confirm against the
    // `Drop for Stream` implementation further down the file.
    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}

/// Error type thrown by [`Networks`](Network) methods
#[derive(Debug)]
pub enum NetworkError {
    NetworkClosed,
    ListenFailed(std::io::Error),
    ConnectFailed(NetworkConnectError),
}

/// Error type thrown by [`Networks`](Network) connect
#[derive(Debug)]
pub enum NetworkConnectError {
    /// Either a Pid UUID clash or you are trying to hijack a connection
    InvalidSecret,
    Handshake(InitProtocolError),
    Io(std::io::Error),
}

/// Error type thrown by [`Participants`](Participant) methods
#[derive(Debug, PartialEq, Clone)]
pub enum ParticipantError {
    /// Participant was closed by remote side
    ParticipantDisconnected,
    /// Underlying Protocol failed and wasn't able to recover, expect some Data
    /// loss unfortunately, there is no method to get the exact messages
    /// that failed. This is also returned when local side tries to do
    /// something while remote site gracefully disconnects
    ProtocolFailedUnrecoverable,
}

/// Error type thrown by [`Streams`](Stream) methods
/// A Compression Error should only happen if a client sends malicious code.
/// A Deserialize Error probably means you are expecting Type X while you
/// actually got send type Y.
#[derive(Debug)]
pub enum StreamError {
    StreamClosed,
    #[cfg(feature = "compression")]
    Compression(DecodeError),
    Deserialize(bincode::Error),
}

/// All Parameters of a Stream, can be used to generate RawMessages
#[derive(Debug, Clone)]
pub struct StreamParams {
    pub(crate) promises: Promises,
}

/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
/// Application.
You can pass it around multiple threads in an /// [`Arc`](std::sync::Arc) as all commands have internal mutability. /// /// The `Network` has methods to [`connect`] to other [`Participants`] actively /// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`] /// [`Participants`]. /// /// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before /// the Network. /// /// # Examples /// ```rust /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// runtime.block_on(async{ /// # //setup pseudo database! /// # let database = Network::new(Pid::new(), &runtime); /// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?; /// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// drop(network); /// # drop(database); /// # Ok(()) /// }) /// # } /// ``` /// /// [`Participants`]: crate::api::Participant /// [`Runtime`]: tokio::runtime::Runtime /// [`connect`]: Network::connect /// [`listen`]: Network::listen /// [`connected`]: Network::connected pub struct Network { local_pid: Pid, participant_disconnect_sender: Arc>>, listen_sender: Mutex>)>>, connect_sender: Mutex>, connected_receiver: Mutex>, shutdown_network_s: Option>>, } impl Network { /// Generates a new `Network` to handle all connections in an Application /// /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` /// * `runtime` - provide a [`Runtime`], it's used to internally spawn /// tasks. 
It is necessary to clean up in the non-async `Drop`. **All** /// network related components **must** be dropped before the runtime is /// stopped. dropping the runtime while a shutdown is still in progress /// leaves the network in a bad state which might cause a panic! /// /// # Result /// * `Self` - returns a `Network` which can be `Send` to multiple areas of /// your code, including multiple threads. This is the base strct of this /// crate. /// /// # Examples /// ```rust /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// ``` /// /// Usually you only create a single `Network` for an application, /// except when client and server are in the same application, then you /// will want 2. However there are no technical limitations from /// creating more. /// /// [`Pid::new()`]: network_protocol::Pid::new /// [`Runtime`]: tokio::runtime::Runtime pub fn new(participant_id: Pid, runtime: &Runtime) -> Self { Self::internal_new( participant_id, runtime, #[cfg(feature = "metrics")] None, ) } /// See [`new`] /// /// # additional Arguments /// * `registry` - Provide a Registry in order to collect Prometheus metrics /// by this `Network`, `None` will deactivate Tracing. 
Tracing is done via /// [`prometheus`] /// /// # Examples /// ```rust /// use prometheus::Registry; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// let runtime = Runtime::new().unwrap(); /// let registry = Registry::new(); /// let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); /// ``` /// [`new`]: crate::api::Network::new #[cfg(feature = "metrics")] pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self { Self::internal_new(participant_id, runtime, Some(registry)) } fn internal_new( participant_id: Pid, runtime: &Runtime, #[cfg(feature = "metrics")] registry: Option<&Registry>, ) -> Self { let p = participant_id; let span = tracing::info_span!("network", ?p); span.in_scope(|| trace!("Starting Network")); let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = Scheduler::new( participant_id, #[cfg(feature = "metrics")] registry, ); let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new())); let (shutdown_network_s, shutdown_network_r) = oneshot::channel(); let f = Self::shutdown_mgr( p, shutdown_network_r, Arc::clone(&participant_disconnect_sender), shutdown_sender, ); runtime.spawn(f); runtime.spawn( async move { trace!("Starting scheduler in own thread"); scheduler.run().await; trace!("Stopping scheduler and his own thread"); } .instrument(tracing::info_span!("network", ?p)), ); Self { local_pid: participant_id, participant_disconnect_sender, listen_sender: Mutex::new(listen_sender), connect_sender: Mutex::new(connect_sender), connected_receiver: Mutex::new(connected_receiver), shutdown_network_s: Some(shutdown_network_s), } } /// starts listening on an [`ProtocolAddr`]. /// When the method returns the `Network` is ready to listen for incoming /// connections OR has returned a [`NetworkError`] (e.g. port already used). /// You can call [`connected`] to asynchrony wait for a [`Participant`] to /// connect. 
You can call `listen` on multiple addresses, e.g. to /// support multiple Protocols or NICs. /// /// # Examples /// ```ignore /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network /// .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap())) /// .await?; /// network /// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap())) /// .await?; /// drop(network); /// # Ok(()) /// }) /// # } /// ``` /// /// [`connected`]: Network::connected #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> { let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); debug!(?address, "listening on address"); self.listen_sender .lock() .await .send((address, s2a_result_s))?; match s2a_result_r.await? { //waiting guarantees that we either listened successfully or get an error like port in // use Ok(()) => Ok(()), Err(e) => Err(NetworkError::ListenFailed(e)), } } /// starts connection to an [`ProtocolAddr`]. /// When the method returns the Network either returns a [`Participant`] /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g. 
/// can't connect, or invalid Handshake)
    ///
    /// # Examples
    /// ```ignore
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
    ///     # remote.listen(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
    ///         .await?;
    ///     # //this doesn't work yet, so skip the test
    ///     # //TODO fixme!
    ///     # return Ok(());
    ///     let p2 = network
    ///         .connect(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap()))
    ///         .await?;
    ///     assert_eq!(&p1, &p2);
    ///     # Ok(())
    /// })?;
    /// drop(network);
    /// # drop(remote);
    /// # Ok(())
    /// # }
    /// ```
    /// Usually the `Network` guarantees that a operation on a [`Participant`]
    /// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
    /// disconnecting from the remote. If 2 [`ProtocolAddres`] you `connect` to
    /// belongs to the same [`Participant`], you get the same [`Participant`] as
    /// a result. This is useful e.g. by connecting to the same
    /// [`Participant`] via multiple Protocols.
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`ProtocolAddres`]: crate::api::ProtocolAddr
    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
    pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
        let (pid_sender, pid_receiver) =
            oneshot::channel::<Result<Participant, NetworkConnectError>>();
        debug!(?address, "Connect to address");
        self.connect_sender
            .lock()
            .await
            .send((address, pid_sender))?;
        let participant = pid_receiver
            .await?
            .map_err(NetworkError::ConnectFailed)?;
        let remote_pid = participant.remote_pid;
        trace!(?remote_pid, "connected");
        // Remember the disconnect handle so the shutdown manager can close this
        // participant when the `Network` is dropped.
        self.participant_disconnect_sender
            .lock()
            .await
            .insert(remote_pid, Arc::clone(&participant.a2s_disconnect_s));
        Ok(participant)
    }

    /// returns a [`Participant`] created from a [`ProtocolAddr`] you called
    /// [`listen`] on before. This function will either return a working
    /// [`Participant`] ready to open [`Streams`] on OR has returned a
    /// [`NetworkError`] (e.g. Network got closed)
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2020` TCP and opens returns their Pid
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network
    ///         .listen(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
    ///         .await?;
    ///     # remote.connect(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         # //skip test here as it would be a endless loop
    ///         # break;
    ///     }
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`listen`]: crate::api::Network::listen
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn connected(&self) -> Result<Participant, NetworkError> {
        let participant = self.connected_receiver.lock().await.recv().await?;
        self.participant_disconnect_sender.lock().await.insert(
            participant.remote_pid,
            Arc::clone(&participant.a2s_disconnect_s),
        );
        Ok(participant)
    }

    /// Use a mgr to handle shutdown smoothly and not in `Drop`
    ///
    /// Waits for the trigger on `shutdown_network_r`, then asks the scheduler
    /// to disconnect every registered participant, waits for all of those
    /// requests to be answered, signals the scheduler to stop, and finally
    /// notifies the sender received through the trigger (if any).
    #[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))]
    async fn shutdown_mgr(
        local_pid: Pid,
        shutdown_network_r: oneshot::Receiver<oneshot::Sender<()>>,
        participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
        shutdown_scheduler_s: oneshot::Sender<()>,
    ) {
        trace!("waiting for shutdown triggerNetwork");
        let return_s = shutdown_network_r.await;
        trace!("Shutting down Participants of Network");
        let mut finished_receiver_list = vec![];
        for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() {
            match a2s_disconnect_s.lock().await.take() {
                Some(a2s_disconnect_s) => {
                    trace!(?remote_pid, "Participants will be closed");
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    finished_receiver_list.push((remote_pid, finished_receiver));
                    a2s_disconnect_s
                        .send((remote_pid, (Duration::from_secs(120), finished_sender)))
                        .expect("Scheduler is closed, but nobody other should be able to close it");
                },
                None => trace!(?remote_pid, "Participant already disconnected gracefully"),
            }
        }
        // Only start waiting after the close has been requested for all of them.
        for (remote_pid, finished_receiver) in finished_receiver_list {
            match finished_receiver.await {
                Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
                Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
                Err(e) => warn!(
                    ?remote_pid,
                    ?e,
                    "Failed to get a message back from the scheduler, seems like the network is \
                     already closed"
                ),
            }
        }
        trace!("Participants have shut down - next: Scheduler");
        if let Err(()) = shutdown_scheduler_s.send(()) {
            error!("Scheduler is closed, but nobody other should be able to close it")
        };
        // `return_s` is an `Err` when the trigger sender was dropped without
        // sending; in that case there is nobody to report back to.
        if let Ok(return_s) = return_s {
            if return_s.send(()).is_err() {
                warn!("Network::drop stoped after a timeout and didn't wait for our shutdown");
            };
        }
        debug!("Network has shut down");
    }
}

impl Participant {
    /// Assembles a `Participant` from its channel endpoints. The disconnect
    /// sender is wrapped in a shared, take-once slot so that the `Network` can
    /// hold a handle to it as well.
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
        b2a_bandwidth_stats_r: watch::Receiver<f32>,
        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
    ) -> Self {
        Self {
            local_pid,
            remote_pid,
            a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
            b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
            b2a_bandwidth_stats_r,
            a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
        }
    }

    /// Opens a [`Stream`] on this `Participant` with a certain Priority and
    /// [`Promises`]
    ///
    /// # Arguments
    /// * `prio` - defines which stream is processed first when limited on
    ///   bandwidth. See [`Prio`] for documentation.
    /// * `promises` - use a combination of your preferred [`Promises`], see the
    ///   link for further documentation. You can combine them, e.g.
    ///   `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
    ///   guarantee that those promises are met.
    /// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
    ///   stream. When excess bandwidth is available it will be used. See
    ///   [`Bandwidth`] for details.
    ///
    /// A [`ParticipantError`] might be thrown if the `Participant` is already
    /// closed.
[`Streams`] can be created without a answer from the remote /// side, resulting in very fast creation and closing latency. /// /// # Examples /// ```rust /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, Promises, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?; /// let p1 = network /// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())) /// .await?; /// let _s1 = p1 /// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000) /// .await?; /// drop(network); /// # drop(remote); /// # Ok(()) /// }) /// # } /// ``` /// /// [`Prio`]: network_protocol::Prio /// [`Bandwidth`]: network_protocol::Bandwidth /// [`Promises`]: network_protocol::Promises /// [`Streams`]: crate::api::Stream #[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))] pub async fn open( &self, prio: u8, promises: Promises, bandwidth: Bandwidth, ) -> Result { debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio"); let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::(); if let Err(e) = self.a2b_open_stream_s.lock().await.send(( prio, promises, bandwidth, p2a_return_stream_s, )) { debug!(?e, "bParticipant is already closed, notifying"); return Err(ParticipantError::ParticipantDisconnected); } match p2a_return_stream_r.await { Ok(stream) => { let sid = stream.sid; trace!(?sid, "opened stream"); Ok(stream) }, Err(_) => { debug!("p2a_return_stream_r failed, closing participant"); Err(ParticipantError::ParticipantDisconnected) }, } } /// Use this method to handle [`Streams`] opened from remote site, like the /// [`connected`] method of [`Network`]. 
This is the associated method /// to [`open`]. It's guaranteed that the order of [`open`] and `opened` /// is equal. The `nth` [`Streams`] on one side will represent the `nth` on /// the other side. A [`ParticipantError`] might be thrown if the /// `Participant` is already closed. /// /// # Examples /// ```rust /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr, Promises}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2110 and wait for the other side to open a stream /// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream. /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # let p2 = remote.connected().await?; /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let _s1 = p1.opened().await?; /// drop(network); /// # drop(remote); /// # Ok(()) /// }) /// # } /// ``` /// /// [`Streams`]: crate::api::Stream /// [`connected`]: Network::connected /// [`open`]: Participant::open #[instrument(name="network", skip(self), fields(p = %self.local_pid))] pub async fn opened(&self) -> Result { match self.b2a_stream_opened_r.lock().await.recv().await { Some(stream) => { let sid = stream.sid; debug!(?sid, "Receive opened stream"); Ok(stream) }, None => { debug!("stream_opened_receiver failed, closing participant"); Err(ParticipantError::ParticipantDisconnected) }, } } /// disconnecting a `Participant` in a async way. /// Use this rather than `Participant::Drop` if you want to close multiple /// `Participants`. 
/// /// This function will wait for all [`Streams`] to properly close, including /// all messages to be send before closing. If an error occurs with one /// of the messages. /// Except if the remote side already dropped the `Participant` /// simultaneously, then messages won't be send /// /// There is NO `disconnected` function in `Participant`, if a `Participant` /// is no longer reachable (e.g. as the network cable was unplugged) the /// `Participant` will fail all action, but needs to be manually /// disconnected, using this function. /// /// # Examples /// ```rust /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// let err = runtime.block_on(async { /// network /// .listen(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())) /// .await?; /// # let keep_alive = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?; /// while let Ok(participant) = network.connected().await { /// println!("Participant connected: {}", participant.remote_pid()); /// participant.disconnect().await?; /// # //skip test here as it would be a endless loop /// # break; /// } /// # Ok(()) /// }); /// drop(network); /// # drop(remote); /// # err /// # } /// ``` /// /// [`Streams`]: crate::api::Stream #[instrument(name="network", skip(self), fields(p = %self.local_pid))] pub async fn disconnect(self) -> Result<(), ParticipantError> { // Remove, Close and try_unwrap error when unwrap fails! 
debug!("Closing participant from network"); //Streams will be closed by BParticipant match self.a2s_disconnect_s.lock().await.take() { Some(a2s_disconnect_s) => { let (finished_sender, finished_receiver) = oneshot::channel(); // Participant is connecting to Scheduler here, not as usual // Participant<->BParticipant a2s_disconnect_s .send((self.remote_pid, (Duration::from_secs(120), finished_sender))) .expect("Something is wrong in internal scheduler coding"); match finished_receiver.await { Ok(res) => { match res { Ok(()) => trace!("Participant is now closed"), Err(ref e) => { trace!(?e, "Error occurred during shutdown of participant") }, }; res }, Err(e) => { //this is a bug. but as i am Participant i can't destroy the network error!( ?e, "Failed to get a message back from the scheduler, seems like the \ network is already closed" ); Err(ParticipantError::ProtocolFailedUnrecoverable) }, } }, None => { warn!( "seems like you are trying to disconnecting a participant after the network \ was already dropped. It was already dropped with the network!" ); Err(ParticipantError::ParticipantDisconnected) }, } } /// Returns the current approximation on the maximum bandwidth available. /// This WILL fluctuate based on the amount/size of send messages. 
pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } /// Returns the remote [`Pid`](network_protocol::Pid) pub fn remote_pid(&self) -> Pid { self.remote_pid } } impl Stream { #[allow(clippy::too_many_arguments)] pub(crate) fn new( local_pid: Pid, remote_pid: Pid, sid: Sid, prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, send_closed: Arc, a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, b2a_msg_recv_r: async_channel::Receiver, a2b_close_stream_s: mpsc::UnboundedSender, ) -> Self { Self { local_pid, remote_pid, sid, prio, promises, guaranteed_bandwidth, send_closed, a2b_msg_s, b2a_msg_recv_r: Some(b2a_msg_recv_r), a2b_close_stream_s: Some(a2b_close_stream_s), } } /// use to send a arbitrary message to the remote side, by having the remote /// side also opened a `Stream` linked to this. the message will be /// [`Serialized`], which actually is quite slow compared to most other /// calculations done. A faster method [`send_raw`] exists, when extra /// speed is needed. The other side needs to use the respective [`recv`] /// function and know the type send. /// /// `send` is an exception to the `async` messages, as it's probably called /// quite often so it doesn't wait for execution. Which also means, that /// no feedback is provided. It's to assume that the Message got `send` /// correctly. If a error occurred, the next call will return an Error. /// If the [`Participant`] disconnected it will also be unable to be used /// any more. A [`StreamError`] will be returned in the error case, e.g. /// when the `Stream` got closed already. /// /// Note when a `Stream` is dropped locally, it will still send all /// messages, though the `drop` will return immediately, however, when a /// [`Participant`] gets gracefully shut down, all remaining messages /// will be send. If the `Stream` is dropped from remote side no further /// messages are send, because the remote side has no way of listening /// to them either way. 
If the last channel is destroyed (e.g. by losing /// the internet connection or non-graceful shutdown, pending messages /// are also dropped. /// /// # Example /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World` /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # // keep it alive /// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// //Send Message /// stream_a.send("Hello World")?; /// drop(network); /// # drop(remote); /// # Ok(()) /// }) /// # } /// ``` /// /// [`send_raw`]: Stream::send_raw /// [`recv`]: Stream::recv /// [`Serialized`]: Serialize #[inline] pub fn send(&mut self, msg: M) -> Result<(), StreamError> { self.send_raw(&Message::serialize(&msg, self.params())) } /// This methods give the option to skip multiple calls of [`bincode`] and /// [`compress`], e.g. in case the same Message needs to send on /// multiple `Streams` to multiple [`Participants`]. Other then that, /// the same rules apply than for [`send`]. /// You need to create a Message via [`Message::serialize`]. 
///
    /// # Example
    /// ```rust
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use bincode;
    /// use veloren_network::{Network, ProtocolAddr, Pid, Message};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote1 = Network::new(Pid::new(), &runtime);
    /// # let remote2 = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
    ///     # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let participant_a = network.connected().await?;
    ///     let participant_b = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     let mut stream_b = participant_b.opened().await?;
    ///
    ///     //Prepare Message and decode it
    ///     let msg = Message::serialize("Hello World", stream_a.params());
    ///     //Send same Message to multiple Streams
    ///     stream_a.send_raw(&msg);
    ///     stream_b.send_raw(&msg);
    ///     drop(network);
    ///     # drop(remote1);
    ///     # drop(remote2);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send`]: Stream::send
    /// [`Participants`]: crate::api::Participant
    /// [`compress`]: lz_fear::raw::compress2
    /// [`Message::serialize`]: crate::message::Message::serialize
    pub fn send_raw(&mut self, message: &Message) -> Result<(), StreamError> {
        // Refuse early once the stream has been flagged as closed for sending.
        if self.send_closed.load(Ordering::Relaxed) {
            return Err(StreamError::StreamClosed);
        }
        // Debug builds additionally check the message against this stream's params.
        #[cfg(debug_assertions)]
        message.verify(self.params());
        self.a2b_msg_s.send((self.sid, message.data.clone()))?;
        Ok(())
    }

    /// use `recv` to wait on a Message
send from the remote side by their /// `Stream`. The Message needs to implement [`DeserializeOwned`] and /// thus, the resulting type must already be known by the receiving side. /// If this is not know from the Application logic, one could use a `Enum` /// and then handle the received message via a `match` state. /// /// A [`StreamError`] will be returned in the error case, e.g. when the /// `Stream` got closed already. /// /// # Example /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// //Recv Message /// println!("{}", stream_a.recv::().await?); /// drop(network); /// # drop(remote); /// # Ok(()) /// }) /// # } /// ``` #[inline] pub async fn recv(&mut self) -> Result { self.recv_raw().await?.deserialize() } /// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] or /// [`decompress`] is executed for performance reasons. 
///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ProtocolAddr, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Recv Message
    ///     let msg = stream_a.recv_raw().await?;
    ///     //Resend Message, without deserializing
    ///     stream_a.send_raw(&msg)?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`decompress`]: lz_fear::raw::decompress_raw
    pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
        match &mut self.b2a_msg_recv_r {
            Some(b2a_msg_recv_r) => {
                match b2a_msg_recv_r.recv().await {
                    Ok(data) => Ok(Message {
                        data,
                        #[cfg(feature = "compression")]
                        compressed: self.promises.contains(Promises::COMPRESSED),
                    }),
                    Err(_) => {
                        // Forget the receiver so later calls take the `None` arm.
                        self.b2a_msg_recv_r = None; //prevent panic
                        Err(StreamError::StreamClosed)
                    },
                }
            },
            None => Err(StreamError::StreamClosed),
        }
    }

    /// use `try_recv` to check for a Message send from the remote side by their
    /// `Stream`. This function does not block and returns immediately. It's
    /// intended for use in non-async context only. Other then that, the
    /// same rules apply than for [`recv`].
/// /// # Example /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// //Try Recv Message /// println!("{:?}", stream_a.try_recv::()?); /// drop(network); /// # drop(remote); /// # Ok(()) /// }) /// # } /// ``` /// /// [`recv`]: Stream::recv #[inline] pub fn try_recv(&mut self) -> Result, StreamError> { match &mut self.b2a_msg_recv_r { Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_recv() { Ok(data) => Ok(Some( Message { data, #[cfg(feature = "compression")] compressed: self.promises.contains(Promises::COMPRESSED), } .deserialize()?, )), Err(async_channel::TryRecvError::Empty) => Ok(None), Err(async_channel::TryRecvError::Closed) => { self.b2a_msg_recv_r = None; //prevent panic Err(StreamError::StreamClosed) }, }, None => Err(StreamError::StreamClosed), } } pub fn params(&self) -> StreamParams { StreamParams { promises: self.promises, } } } impl core::cmp::PartialEq for Participant { fn eq(&self, other: &Self) -> bool { //don't check local_pid, 2 Participant from different network should match if // they are the "same" self.remote_pid == other.remote_pid } } fn actively_wait(name: 
&'static str, mut finished_receiver: oneshot::Receiver, f: F) where F: FnOnce(T) + Send + 'static, T: Send + 'static, { const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding"; if let Ok(handle) = tokio::runtime::Handle::try_current() { // When in Async Context WE MUST NOT SYNC BLOCK (as a deadlock might occur as // other is queued behind). And we CANNOT join our Future_Handle trace!("async context detected, defer shutdown"); handle.spawn(async move { match finished_receiver.await { Ok(data) => f(data), Err(e) => panic!("{}{}: {}", name, CHANNEL_ERR, e), } }); } else { let mut cnt = 0; loop { use tokio::sync::oneshot::error::TryRecvError; match finished_receiver.try_recv() { Ok(data) => { f(data); break; }, Err(TryRecvError::Closed) => panic!("{}{}", name, CHANNEL_ERR), Err(TryRecvError::Empty) => { trace!("activly sleeping"); cnt += 1; if cnt > 120 { error!("Timeout waiting for shutdown, dropping"); break; } std::thread::sleep(Duration::from_millis(100) * cnt); }, } } }; } impl Drop for Network { #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { trace!("Dropping Network"); let (finished_sender, finished_receiver) = oneshot::channel(); match self .shutdown_network_s .take() .unwrap() .send(finished_sender) { Err(e) => warn!(?e, "Runtime seems to be dropped already"), Ok(()) => actively_wait("network", finished_receiver, |()| { info!("Network dropped gracefully") }), }; } } impl Drop for Participant { #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))] #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \ outgoing messages, dropping remaining"; const SCHEDULER_ERR: &str = "Something is wrong in internal scheduler coding or you dropped the runtime to early"; // ignore closed, as we need to send it even though we disconnected the // participant from 
network debug!("Shutting down Participant"); match self.a2s_disconnect_s.try_lock() { Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"), Ok(mut s) => match s.take() { None => info!("Participant already has been shutdown gracefully"), Some(a2s_disconnect_s) => { debug!("Disconnect from Scheduler"); let (finished_sender, finished_receiver) = oneshot::channel(); match a2s_disconnect_s .send((self.remote_pid, (Duration::from_secs(120), finished_sender))) { Err(e) => warn!(?e, SCHEDULER_ERR), Ok(()) => { actively_wait("participant", finished_receiver, |d| match d { Ok(()) => info!("Participant dropped gracefully"), Err(e) => error!(?e, SHUTDOWN_ERR), }); }, } }, }, } } } impl Drop for Stream { #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))] #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { // send if closed is unnecessary but doesn't hurt, we must not crash if !self.send_closed.load(Ordering::Relaxed) { let sid = self.sid; debug!(?sid, "Shutting down Stream"); if let Err(e) = self.a2b_close_stream_s.take().unwrap().send(self.sid) { debug!( ?e, "bparticipant part of a gracefully shutdown was already closed" ); } } else { let sid = self.sid; trace!(?sid, "Stream Drop not needed"); } } } impl std::fmt::Debug for Participant { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "Participant {{ local_pid: {:?}, remote_pid: {:?} }}", &self.local_pid, &self.remote_pid, ) } } impl From> for StreamError { fn from(_err: crossbeam_channel::SendError) -> Self { StreamError::StreamClosed } } impl From> for NetworkError { fn from(_err: crossbeam_channel::SendError) -> Self { NetworkError::NetworkClosed } } impl From for StreamError { fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed } } impl From for NetworkError { fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed } } impl From> for NetworkError { fn 
from(_err: mpsc::error::SendError) -> Self { NetworkError::NetworkClosed } } impl From for NetworkError { fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed } } impl From for NetworkError { fn from(_err: std::io::Error) -> Self { NetworkError::NetworkClosed } } impl From> for StreamError { fn from(err: Box) -> Self { StreamError::Deserialize(err) } } impl core::fmt::Display for StreamError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { StreamError::StreamClosed => write!(f, "stream closed"), #[cfg(feature = "compression")] StreamError::Compression(err) => write!(f, "compression error on message: {}", err), StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err), } } } impl core::fmt::Display for ParticipantError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"), ParticipantError::ProtocolFailedUnrecoverable => { write!(f, "underlying protocol failed unrecoverable") }, } } } impl core::fmt::Display for NetworkError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { NetworkError::NetworkClosed => write!(f, "Network closed"), NetworkError::ListenFailed(_) => write!(f, "Listening failed"), NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"), } } } impl core::fmt::Display for NetworkConnectError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { NetworkConnectError::Io(e) => write!(f, "Io error: {}", e), NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e), NetworkConnectError::InvalidSecret => { write!(f, "You specified the wrong secret on your second channel") }, } } } /// implementing PartialEq as it's super convenient in tests impl core::cmp::PartialEq for StreamError { fn eq(&self, other: &Self) -> bool { match self { StreamError::StreamClosed => match 
other { StreamError::StreamClosed => true, #[cfg(feature = "compression")] StreamError::Compression(_) => false, StreamError::Deserialize(_) => false, }, #[cfg(feature = "compression")] StreamError::Compression(err) => match other { StreamError::StreamClosed => false, #[cfg(feature = "compression")] StreamError::Compression(other_err) => err == other_err, StreamError::Deserialize(_) => false, }, StreamError::Deserialize(err) => match other { StreamError::StreamClosed => false, #[cfg(feature = "compression")] StreamError::Compression(_) => false, StreamError::Deserialize(other_err) => partial_eq_bincode(err, other_err), }, } } } impl std::error::Error for StreamError {} impl std::error::Error for ParticipantError {} impl std::error::Error for NetworkError {} impl std::error::Error for NetworkConnectError {}