//! //! //! //! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run) use crate::{ message::{self, InCommingMessage, MessageBuffer, OutGoingMessage}, scheduler::Scheduler, types::{Mid, Pid, Prio, Promises, Sid}, }; use async_std::{io, sync::RwLock, task}; use futures::{ channel::{mpsc, oneshot}, sink::SinkExt, stream::StreamExt, }; use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, }; use tracing::*; use tracing_futures::Instrument; use uvth::ThreadPool; /// Represents a Tcp or Udp or Mpsc address #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum Address { Tcp(SocketAddr), Udp(SocketAddr), Mpsc(u64), } /// `Participants` are generated by the [`Network`] and represent a connection /// to a remote Participant. Look at the [`connect`] and [`connected`] method of /// [`Networks`] on how to generate `Participants` /// /// [`Networks`]: crate::api::Network /// [`connect`]: Network::connect /// [`connected`]: Network::connected pub struct Participant { local_pid: Pid, remote_pid: Pid, a2b_steam_open_s: RwLock)>>, b2a_stream_opened_r: RwLock>, closed: AtomicBool, a2s_disconnect_s: Option>)>>, } /// `Streams` represents a channel to send `n` messages with a certain priority /// and [`Promises`]. messages need always to be send between 2 `Streams`. /// /// `Streams` are generated by the [`Participant`]. /// Look at the [`open`] and [`opened`] method of [`Participant`] on how to /// generate `Streams` /// /// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior /// mutability, as multiple threads don't need access to the same `Stream`. /// [`Sync`] is not supported! In that case multiple `Streams` should be used /// instead. However it's still possible to [`Send`] `Streams`. /// /// [`Networks`]: crate::api::Network /// [`open`]: Participant::open /// [`opened`]: Participant::opened /// [`Send`]: std::marker::Send /// [`Sync`]: std::marker::Sync #[derive(Debug)] pub struct Stream { pid: Pid, sid: Sid, mid: Mid, prio: Prio, promises: Promises, a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>, b2a_msg_recv_r: mpsc::UnboundedReceiver, closed: Arc, a2b_close_stream_s: Option>, } /// Error type thrown by [`Networks`](Network) methods #[derive(Debug)] pub enum NetworkError { NetworkClosed, ListenFailed(std::io::Error), } /// Error type thrown by [`Participants`](Participant) methods #[derive(Debug, PartialEq)] pub enum ParticipantError { ParticipantClosed, } /// Error type thrown by [`Streams`](Stream) methods #[derive(Debug, PartialEq)] pub enum StreamError { StreamClosed, } /// Use the `Network` to create connections to other [`Participants`] /// /// The `Network` is the single source that handles all connections in your /// Application. You can pass it around multiple threads in an /// [`Arc`](std::sync::Arc) as all commands have internal mutability. /// /// The `Network` has methods to [`connect`] and [`disconnect`] to other /// [`Participants`] via their [`Address`]. All [`Participants`] will be stored /// in the Network until explicitly disconnected, which is the only way to close /// the sockets. /// /// # Examples /// ```rust /// use veloren_network::{Network, Address, Pid}; /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async{ /// # //setup pseudo database! /// # let database = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # database.listen(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// network.listen(Address::Tcp("127.0.0.1:2999".parse().unwrap())).await?; /// let database = network.connect(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// # Ok(()) /// }) /// # } /// ``` /// /// [`Participants`]: crate::api::Participant /// [`connect`]: Network::connect /// [`disconnect`]: Network::disconnect pub struct Network { local_pid: Pid, participants: RwLock>>, listen_sender: RwLock>)>>, connect_sender: RwLock>)>>, connected_receiver: RwLock>, shutdown_sender: Option>, } impl Network { /// Generates a new `Network` to handle all connections in an Application /// /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` /// * `thread_pool` - you need to provide a [`ThreadPool`] where exactly 1 /// thread will be created to handle all `Network` internals. Additional /// threads will be allocated on an internal async-aware threadpool /// * `registry` - Provide a Registy in order to collect Prometheus metrics /// by this `Network`, `None` will deactivate Tracing. Tracing is done via /// [`prometheus`] /// /// # Examples /// ```rust /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// ``` /// /// Usually you only create a single `Network` for an application, except /// when client and server are in the same application, then you will want /// 2. However there are no technical limitations from creating more. /// /// [`Pid::new()`]: crate::types::Pid::new /// [`ThreadPool`]: uvth::ThreadPool pub fn new(participant_id: Pid, thread_pool: &ThreadPool, registry: Option<&Registry>) -> Self { let p = participant_id; debug!(?p, "starting Network"); let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = Scheduler::new(participant_id, registry); thread_pool.execute(move || { trace!(?p, "starting sheduler in own thread"); let _handle = task::block_on( scheduler .run() .instrument(tracing::info_span!("scheduler", ?p)), ); trace!(?p, "stopping sheduler and his own thread"); }); Self { local_pid: participant_id, participants: RwLock::new(HashMap::new()), listen_sender: RwLock::new(listen_sender), connect_sender: RwLock::new(connect_sender), connected_receiver: RwLock::new(connected_receiver), shutdown_sender: Some(shutdown_sender), } } /// starts listening on an [`Address`]. /// When the method returns the `Network` is ready to listen for incoming /// connections OR has returned a [`NetworkError`] (e.g. port already used). /// You can call [`connected`] to asynchrony wait for a [`Participant`] to /// connect. You can call `listen` on multiple addresses, e.g. to /// support multiple Protocols or NICs. /// /// # Examples /// ```rust /// use futures::executor::block_on; /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// network /// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap())) /// .await?; /// network /// .listen(Address::Udp("127.0.0.1:2001".parse().unwrap())) /// .await?; /// # Ok(()) /// }) /// # } /// ``` /// /// [`connected`]: Network::connected pub async fn listen(&self, address: Address) -> Result<(), NetworkError> { let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); debug!(?address, "listening on address"); self.listen_sender .write() .await .send((address, s2a_result_s)) .await?; match s2a_result_r.await? { //waiting guarantees that we either listened sucessfully or get an error like port in // use Ok(()) => Ok(()), Err(e) => Err(NetworkError::ListenFailed(e)), } } /// starts connectiong to an [`Address`]. /// When the method returns the Network either returns a [`Participant`] /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g. /// can't connect, or invalid Handshake) # Examples /// ```rust /// use futures::executor::block_on; /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// # remote.listen(Address::Tcp("0.0.0.0:2010".parse().unwrap())).await?; /// # remote.listen(Address::Udp("0.0.0.0:2011".parse().unwrap())).await?; /// let p1 = network /// .connect(Address::Tcp("127.0.0.1:2010".parse().unwrap())) /// .await?; /// # //this doesn't work yet, so skip the test /// # //TODO fixme! /// # return Ok(()); /// let p2 = network /// .connect(Address::Udp("127.0.0.1:2011".parse().unwrap())) /// .await?; /// assert!(std::sync::Arc::ptr_eq(&p1, &p2)); /// # Ok(()) /// }) /// # } /// ``` /// Usually the `Network` guarantees that a operation on a [`Participant`] /// succeeds, e.g. by automatic retrying unless it fails completely e.g. by /// disconnecting from the remote. If 2 [`Addresses`] you `connect` to /// belongs to the same [`Participant`], you get the same [`Participant`] as /// a result. This is useful e.g. by connecting to the same /// [`Participant`] via multiple Protocols. /// /// [`Streams`]: crate::api::Stream /// [`Addresses`]: crate::api::Address pub async fn connect(&self, address: Address) -> Result, NetworkError> { let (pid_sender, pid_receiver) = oneshot::channel::>(); debug!(?address, "connect to address"); self.connect_sender .write() .await .send((address, pid_sender)) .await?; let participant = pid_receiver.await??; let pid = participant.remote_pid; debug!( ?pid, "received Participant id from remote and return to user" ); let participant = Arc::new(participant); self.participants .write() .await .insert(participant.remote_pid, participant.clone()); Ok(participant) } /// returns a [`Participant`] created from a [`Address`] you called /// [`listen`] on before. This function will either return a working /// [`Participant`] ready to open [`Streams`] on OR has returned a /// [`NetworkError`] (e.g. Network got closed) /// /// # Examples /// ```rust /// use futures::executor::block_on; /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2020` TCP and opens returns their Pid /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// network /// .listen(Address::Tcp("0.0.0.0:2020".parse().unwrap())) /// .await?; /// # remote.connect(Address::Tcp("0.0.0.0:2020".parse().unwrap())).await?; /// while let Ok(participant) = network.connected().await { /// println!("Participant connected: {}", participant.remote_pid()); /// # //skip test here as it would be a endless loop /// # break; /// } /// # Ok(()) /// }) /// # } /// ``` /// /// [`Streams`]: crate::api::Stream /// [`listen`]: crate::api::Network::listen pub async fn connected(&self) -> Result, NetworkError> { let participant = self.connected_receiver.write().await.next().await?; let participant = Arc::new(participant); self.participants .write() .await .insert(participant.remote_pid, participant.clone()); Ok(participant) } /// disconnecting a [`Participant`] where you move the last existing /// [`Arc`]. As the [`Network`] also holds [`Arc`] to the /// [`Participant`], you need to provide the last [`Arc`] and /// are not allowed to keep others. If you do so the [`Participant`] /// can't be disconnected properly. If you no longer have the respective /// [`Participant`], try using the [`participants`] method to get it. /// /// This function will wait for all [`Streams`] to properly close, including /// all messages to be send before closing. If an error occurs with one /// of the messages. /// Except if the remote side already dropped the [`Participant`] /// simultaneously, then messages won't be sended /// /// # Examples /// ```rust /// use futures::executor::block_on; /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// network /// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap())) /// .await?; /// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?; /// while let Ok(participant) = network.connected().await { /// println!("Participant connected: {}", participant.remote_pid()); /// network.disconnect(participant).await?; /// # //skip test here as it would be a endless loop /// # break; /// } /// # Ok(()) /// }) /// # } /// ``` /// /// [`Arc`]: crate::api::Participant /// [`Streams`]: crate::api::Stream /// [`participants`]: Network::participants /// [`Arc`]: std::sync::Arc pub async fn disconnect(&self, participant: Arc) -> Result<(), NetworkError> { // Remove, Close and try_unwrap error when unwrap fails! let pid = participant.remote_pid; debug!(?pid, "removing participant from network"); self.participants.write().await.remove(&pid)?; participant.closed.store(true, Ordering::Relaxed); match Arc::try_unwrap(participant) { Err(_) => { warn!( "you are disconnecting and still keeping a reference to this participant, \ this is a bad idea. Participant will only be dropped when you drop your last \ reference" ); }, Ok(mut participant) => { trace!("waiting now for participant to close"); let (finished_sender, finished_receiver) = oneshot::channel(); // we are deleting here asyncly before DROP is called. Because this is done // nativly async, while drop needs an BLOCK! Drop will recognis // that it has been delete here and don't try another double delete. participant .a2s_disconnect_s .take() .unwrap() .send((pid, finished_sender)) .await .expect("something is wrong in internal scheduler coding"); let res = finished_receiver.await.unwrap(); trace!("participant is now closed"); res?; }, }; Ok(()) } /// returns a copy of all current connected [`Participants`] /// /// [`Participants`]: crate::api::Participant pub async fn participants(&self) -> HashMap> { self.participants.read().await.clone() } } impl Participant { pub(crate) fn new( local_pid: Pid, remote_pid: Pid, a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender)>, b2a_stream_opened_r: mpsc::UnboundedReceiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender>)>, ) -> Self { Self { local_pid, remote_pid, a2b_steam_open_s: RwLock::new(a2b_steam_open_s), b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r), closed: AtomicBool::new(false), a2s_disconnect_s: Some(a2s_disconnect_s), } } /// Opens a [`Stream`] on this `Participant` with a certain Priority and /// [`Promises`] /// /// # Arguments /// * `prio` - valid between 0-63. The priority rates the throughput for /// messages of the [`Stream`] e.g. prio 5 messages will get 1/2 the speed /// prio0 messages have. Prio10 messages only 1/4 and Prio 15 only 1/8, /// etc... /// * `promises` - use a combination of you prefered [`Promises`], see the /// link for further documentation. You can combine them, e.g. /// `PROMISES_ORDERED | PROMISES_CONSISTENCY` The Stream will then /// guarantee that those promisses are met. /// /// A [`ParticipantError`] might be thrown if the `Participant` is already /// closed. [`Streams`] can be created without a answer from the remote /// side, resulting in very fast creation and closing latency. /// /// # Examples /// ```rust /// use futures::executor::block_on; /// use uvth::ThreadPoolBuilder; /// use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// # remote.listen(Address::Tcp("0.0.0.0:2100".parse().unwrap())).await?; /// let p1 = network /// .connect(Address::Tcp("127.0.0.1:2100".parse().unwrap())) /// .await?; /// let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; /// # Ok(()) /// }) /// # } /// ``` /// /// [`Streams`]: crate::api::Stream pub async fn open(&self, prio: u8, promises: Promises) -> Result { //use this lock for now to make sure that only one open at a time is made, // TODO: not sure if we can paralise that, check in future let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await; if self.closed.load(Ordering::Relaxed) { warn!(?self.remote_pid, "participant is closed but another open is tried on it"); return Err(ParticipantError::ParticipantClosed); } let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel(); if a2b_steam_open_s .send((prio, promises, p2a_return_stream_s)) .await .is_err() { debug!(?self.remote_pid, "stream_open_sender failed, closing participant"); self.closed.store(true, Ordering::Relaxed); return Err(ParticipantError::ParticipantClosed); } match p2a_return_stream_r.await { Ok(stream) => { let sid = stream.sid; debug!(?sid, ?self.remote_pid, "opened stream"); Ok(stream) }, Err(_) => { debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant"); self.closed.store(true, Ordering::Relaxed); Err(ParticipantError::ParticipantClosed) }, } } /// Use this method to handle [`Streams`] opened from remote site, like the /// [`connected`] method of [`Network`]. This is the associated method /// to [`open`]. It's guaranteed that the order of [`open`] and `opened` /// is equal. The `nth` [`Streams`] on one side will represent the `nth` on /// the other side. A [`ParticipantError`] might be thrown if the /// `Participant` is already closed. /// /// # Examples /// ```rust /// use veloren_network::{Network, Pid, Address, PROMISES_ORDERED, PROMISES_CONSISTENCY}; /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2110 and wait for the other side to open a stream /// // Note: It's quite unusal to activly connect, but then wait on a stream to be connected, usually the Appication taking initiative want's to also create the first Stream. /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// # remote.listen(Address::Tcp("0.0.0.0:2110".parse().unwrap())).await?; /// let p1 = network.connect(Address::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # let p2 = remote.connected().await?; /// # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; /// let _s1 = p1.opened().await?; /// # Ok(()) /// }) /// # } /// ``` /// /// [`Streams`]: crate::api::Stream /// [`connected`]: Network::connected /// [`open`]: Participant::open pub async fn opened(&self) -> Result { //use this lock for now to make sure that only one open at a time is made, // TODO: not sure if we can paralise that, check in future let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await; if self.closed.load(Ordering::Relaxed) { warn!(?self.remote_pid, "participant is closed but another open is tried on it"); return Err(ParticipantError::ParticipantClosed); } match stream_opened_receiver.next().await { Some(stream) => { let sid = stream.sid; debug!(?sid, ?self.remote_pid, "receive opened stream"); Ok(stream) }, None => { debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant"); self.closed.store(true, Ordering::Relaxed); Err(ParticipantError::ParticipantClosed) }, } } /// Returns the remote [`Pid`] pub fn remote_pid(&self) -> Pid { self.remote_pid } } impl Stream { pub(crate) fn new( pid: Pid, sid: Sid, prio: Prio, promises: Promises, a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>, b2a_msg_recv_r: mpsc::UnboundedReceiver, closed: Arc, a2b_close_stream_s: mpsc::UnboundedSender, ) -> Self { Self { pid, sid, mid: 0, prio, promises, a2b_msg_s, b2a_msg_recv_r, closed, a2b_close_stream_s: Some(a2b_close_stream_s), } } /// use to send a arbitrary message to the remote side, by having the remote /// side also opened a `Stream` linked to this. the message will be /// [`Serialized`], which actually is quite slow compared to most other /// calculations done. A faster method [`send_raw`] exists, when extra /// speed is needed. The other side needs to use the respective [`recv`] /// function and know the type send. /// /// `send` is an exception to the `async` messages, as it's probably called /// quite often so it doesn't wait for execution. Which also means, that /// no feedback is provided. It's to assume that the Message got `send` /// correctly. If a error occurred, the next call will return an Error. /// If the [`Participant`] disconnected it will also be unable to be used /// any more. A [`StreamError`] will be returned in the error case, e.g. /// when the `Stream` got closed already. /// /// Note when a `Stream` is dropped locally, it will still send all /// messages, though the `drop` will return immediately, however, when a /// [`Participant`] gets gracefully shut down, all remaining messages /// will be send. If the `Stream` is dropped from remote side no further /// messages are send, because the remote side has no way of listening /// to them either way. If the last channel is destroyed (e.g. by losing /// the internet connection or non-gracefull shutdown, pending messages /// are also dropped. /// /// # Example /// ```rust /// use veloren_network::{Network, Address, Pid}; /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// use tracing::*; /// use tracing_subscriber::EnvFilter; /// /// # fn main() -> std::result::Result<(), Box> { /// /// std::thread::spawn(|| { /// let filter = EnvFilter::from_default_env() /// .add_directive("trace".parse().unwrap()) /// .add_directive("async_std::task::block_on=warn".parse().unwrap()) /// .add_directive("veloren_network::tests=trace".parse().unwrap()) /// .add_directive("veloren_network::controller=trace".parse().unwrap()) /// .add_directive("veloren_network::channel=trace".parse().unwrap()) /// .add_directive("veloren_network::message=trace".parse().unwrap()) /// .add_directive("veloren_network::metrics=trace".parse().unwrap()) /// .add_directive("veloren_network::types=trace".parse().unwrap()); /// let _sub = tracing_subscriber::FmtSubscriber::builder() /// // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) /// // will be written to stdout. /// .with_max_level(Level::TRACE) /// .with_env_filter(filter) /// // sets this to be the default, global subscriber for this application. /// .try_init(); /// /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World` /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// network.listen(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await.unwrap(); /// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await.unwrap(); /// # remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await.unwrap(); /// let participant_a = network.connected().await.unwrap(); /// let mut stream_a = participant_a.opened().await.unwrap(); /// //Send Message /// stream_a.send("Hello World").unwrap(); /// }) /// }); /// /// std::thread::sleep(std::time::Duration::from_secs(70)); /// println!("Sleep another 10s"); /// std::thread::sleep(std::time::Duration::from_secs(10)); /// println!("TRACING THE DEADLOCK"); /// assert!(false); /// /// std::thread::sleep(std::time::Duration::from_secs(150)); /// Ok(()) /// # } /// ``` /// /// [`send_raw`]: Stream::send_raw /// [`recv`]: Stream::recv /// [`Serialized`]: Serialize #[inline] pub fn send(&mut self, msg: M) -> Result<(), StreamError> { self.send_raw(Arc::new(message::serialize(&msg))) } /// This methods give the option to skip multiple calls of [`bincode`], e.g. /// in case the same Message needs to send on multiple `Streams` to multiple /// [`Participants`]. Other then that, the same rules apply than for /// [`send`] /// /// # Example /// ```rust /// use veloren_network::{Network, Address, Pid, MessageBuffer}; /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; /// use futures::executor::block_on; /// use uvth::ThreadPoolBuilder; /// use bincode; /// use std::sync::Arc; /// /// # fn main() -> std::result::Result<(), Box> { /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote1 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote2 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// network.listen(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote1_p = remote1.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote2_p = remote2.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid()); /// # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; /// # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; /// let participant_a = network.connected().await?; /// let participant_b = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// let mut stream_b = participant_b.opened().await?; /// /// //Prepare Message and decode it /// let msg = "Hello World"; /// let raw_msg = Arc::new(MessageBuffer{ /// data: bincode::serialize(&msg).unwrap(), /// }); /// //Send same Message to multiple Streams /// stream_a.send_raw(raw_msg.clone()); /// stream_b.send_raw(raw_msg.clone()); /// # Ok(()) /// }) /// # } /// ``` /// /// [`send`]: Stream::send /// [`Participants`]: crate::api::Participant pub fn send_raw(&mut self, messagebuffer: Arc) -> Result<(), StreamError> { if self.closed.load(Ordering::Relaxed) { return Err(StreamError::StreamClosed); } //debug!(?messagebuffer, "sending a message"); self.a2b_msg_s.send((self.prio, self.sid, OutGoingMessage { buffer: messagebuffer, cursor: 0, mid: self.mid, sid: self.sid, }))?; self.mid += 1; Ok(()) } /// use `recv` to wait on a Message send from the remote side by their /// `Stream`. The Message needs to implement [`DeserializeOwned`] and /// thus, the resulting type must already be known by the receiving side. /// If this is not know from the Application logic, one could use a `Enum` /// and then handle the received message via a `match` state. /// /// A [`StreamError`] will be returned in the error case, e.g. when the /// `Stream` got closed already. /// /// # Example /// ```rust /// use veloren_network::{Network, Address, Pid}; /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY}; /// use uvth::ThreadPoolBuilder; /// use futures::executor::block_on; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it /// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None); /// block_on(async { /// network.listen(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// //Send Message /// println!("{}", stream_a.recv::().await?); /// # Ok(()) /// }) /// # } /// ``` #[inline] pub async fn recv(&mut self) -> Result { Ok(message::deserialize(self.recv_raw().await?)) } /// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] is /// executed for performance reasons. /// /// [`send_raw`]: Stream::send_raw /// [`recv`]: Stream::recv pub async fn recv_raw(&mut self) -> Result { //no need to access self.closed here, as when this stream is closed the Channel // is closed which will trigger a None let msg = self.b2a_msg_recv_r.next().await?; //info!(?msg, "delivering a message"); Ok(msg.buffer) } } impl Drop for Network { fn drop(&mut self) { let pid = self.local_pid; debug!(?pid, "shutting down Network"); debug!( ?pid, "shutting down Participants of Network, while we still have metrics" ); task::block_on(async { // we need to carefully shut down here! as otherwise we might call // Participant::Drop with a2s_disconnect_s here which would open // another task::block, which would panic! also i can't `.write` on // `self.participants` as the `disconnect` fn needs it. let mut participant_clone = self.participants().await; for (_, p) in participant_clone.drain() { match self.disconnect(p).await { Err(e) => { error!( ?e, "error while dropping network, the error occured when dropping a \ participant but can't be notified to the user any more" ); }, _ => (), } } self.participants.write().await.clear(); }); debug!(?pid, "shutting down Scheduler"); self.shutdown_sender .take() .unwrap() .send(()) .expect("scheduler is closed, but nobody other should be able to close it"); debug!(?pid, "participants have shut down!"); } } impl Drop for Participant { fn drop(&mut self) { // ignore closed, as we need to send it even though we disconnected the // participant from network let pid = self.remote_pid; debug!(?pid, "shutting down Participant"); match self.a2s_disconnect_s.take() { None => debug!( ?pid, "Participant has been shutdown cleanly, no further waiting is requiered!" ), Some(mut a2s_disconnect_s) => { debug!( ?pid, "unclean shutdown detected, active waiting for client to be disconnected" ); task::block_on(async { let (finished_sender, finished_receiver) = oneshot::channel(); a2s_disconnect_s .send((self.remote_pid, finished_sender)) .await .expect("something is wrong in internal scheduler coding"); match finished_receiver.await { Ok(Err(e)) => error!( ?pid, ?e, "Error while dropping the participant, couldn't send all outgoing \ messages, dropping remaining" ), Err(e) => warn!( ?e, "//TODO i dont know why the finish doesnt work, i normally would \ expect to have sended a return message from the participant... \ ignoring to not caue a panic for now, please fix me" ), _ => (), }; }); }, } debug!(?pid, "network dropped"); } } impl Drop for Stream { fn drop(&mut self) { // a send if closed is unecessary but doesnt hurt, we must not crash here if !self.closed.load(Ordering::Relaxed) { let sid = self.sid; let pid = self.pid; debug!(?pid, ?sid, "shutting down Stream"); if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() { warn!( "Other side got already dropped, probably due to timing, other side will \ handle this gracefully" ); }; } else { let sid = self.sid; let pid = self.pid; debug!(?pid, ?sid, "not needed"); } } } impl std::fmt::Debug for Participant { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let status = if self.closed.load(Ordering::Relaxed) { "[CLOSED]" } else { "[OPEN]" }; write!( f, "Participant {{{} local_pid: {:?}, remote_pid: {:?} }}", status, &self.local_pid, &self.remote_pid, ) } } impl From> for StreamError { fn from(_err: std::sync::mpsc::SendError) -> Self { StreamError::StreamClosed } } impl From> for ParticipantError { fn from(_err: std::sync::mpsc::SendError) -> Self { ParticipantError::ParticipantClosed } } impl From> for NetworkError { fn from(_err: std::sync::mpsc::SendError) -> Self { NetworkError::NetworkClosed } } impl From for NetworkError { fn from(err: async_std::io::Error) -> Self { NetworkError::ListenFailed(err) } } impl From for StreamError { fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed } } impl From for ParticipantError { fn from(_err: std::option::NoneError) -> Self { ParticipantError::ParticipantClosed } } impl From for NetworkError { fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed } } impl From for ParticipantError { fn from(_err: mpsc::SendError) -> Self { ParticipantError::ParticipantClosed } } impl From for NetworkError { fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed } } impl From for ParticipantError { fn from(_err: oneshot::Canceled) -> Self { ParticipantError::ParticipantClosed } } impl From for NetworkError { fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed } } impl core::fmt::Display for StreamError { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { match self { StreamError::StreamClosed => write!(f, "stream closed"), } } } impl core::fmt::Display for ParticipantError { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { match self { ParticipantError::ParticipantClosed => write!(f, "participant closed"), } } } impl core::fmt::Display for NetworkError { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { match self { NetworkError::NetworkClosed => write!(f, "network closed"), NetworkError::ListenFailed(_) => write!(f, "listening failed"), } } } impl std::error::Error for StreamError {} impl std::error::Error for ParticipantError {} impl std::error::Error for NetworkError {}