2020-01-13 16:53:28 +00:00
use crate ::{
2021-01-22 16:09:20 +00:00
message ::{ partial_eq_bincode , Message } ,
2020-07-14 22:18:04 +00:00
participant ::{ A2bStreamOpen , S2bShutdownBparticipant } ,
2021-02-21 23:48:30 +00:00
scheduler ::{ A2sConnect , Scheduler } ,
2020-01-13 16:53:28 +00:00
} ;
2021-02-14 17:45:12 +00:00
use bytes ::Bytes ;
2021-05-04 13:27:30 +00:00
use hashbrown ::HashMap ;
2020-08-25 13:32:42 +00:00
#[ cfg(feature = " compression " ) ]
use lz_fear ::raw ::DecodeError ;
2021-02-21 23:48:30 +00:00
use network_protocol ::{ Bandwidth , InitProtocolError , Pid , Prio , Promises , Sid } ;
2020-07-14 23:34:41 +00:00
#[ cfg(feature = " metrics " ) ]
2020-04-24 10:56:04 +00:00
use prometheus ::Registry ;
2020-03-22 13:47:21 +00:00
use serde ::{ de ::DeserializeOwned , Serialize } ;
2020-02-04 15:42:04 +00:00
use std ::{
2020-05-26 13:06:03 +00:00
net ::SocketAddr ,
2020-03-22 13:47:21 +00:00
sync ::{
2020-05-22 14:00:08 +00:00
atomic ::{ AtomicBool , Ordering } ,
2020-03-22 13:47:21 +00:00
Arc ,
} ,
2021-01-22 16:09:20 +00:00
time ::Duration ,
2020-02-04 15:42:04 +00:00
} ;
2021-01-15 13:04:32 +00:00
use tokio ::{
io ,
runtime ::Runtime ,
2021-03-25 17:28:50 +00:00
sync ::{ mpsc , oneshot , watch , Mutex } ,
2021-01-15 13:04:32 +00:00
} ;
2020-01-13 16:53:28 +00:00
use tracing ::* ;
2019-12-20 13:56:01 +00:00
2020-07-14 22:18:04 +00:00
type A2sDisconnect = Arc < Mutex < Option < mpsc ::UnboundedSender < ( Pid , S2bShutdownBparticipant ) > > > > ;
2020-07-09 07:58:21 +00:00
2021-04-15 08:16:42 +00:00
/// Represents a Tcp, Quic, Udp or Mpsc connection address
#[ derive(Clone, Debug) ]
pub enum ConnectAddr {
Tcp ( SocketAddr ) ,
Udp ( SocketAddr ) ,
#[ cfg(feature = " quic " ) ]
Quic ( SocketAddr , quinn ::ClientConfig , String ) ,
Mpsc ( u64 ) ,
}
/// Represents a Tcp, Quic, Udp or Mpsc listen address
#[ derive(Clone, Debug) ]
pub enum ListenAddr {
2020-05-26 13:06:03 +00:00
Tcp ( SocketAddr ) ,
Udp ( SocketAddr ) ,
2021-04-11 21:37:48 +00:00
#[ cfg(feature = " quic " ) ]
Quic ( SocketAddr , quinn ::ServerConfig ) ,
2020-03-10 00:07:36 +00:00
Mpsc ( u64 ) ,
2019-12-20 13:56:01 +00:00
}
2020-05-10 02:07:46 +00:00
/// `Participants` are generated by the [`Network`] and represent a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] method of
/// [`Networks`] on how to generate `Participants`
///
/// [`Networks`]: crate::api::Network
/// [`connect`]: Network::connect
/// [`connected`]: Network::connected
2019-12-20 13:56:01 +00:00
pub struct Participant {
2020-03-22 13:47:21 +00:00
local_pid : Pid ,
2020-02-10 17:25:47 +00:00
remote_pid : Pid ,
2021-01-22 16:09:20 +00:00
a2b_open_stream_s : Mutex < mpsc ::UnboundedSender < A2bStreamOpen > > ,
2020-08-23 19:43:17 +00:00
b2a_stream_opened_r : Mutex < mpsc ::UnboundedReceiver < Stream > > ,
2021-03-25 17:28:50 +00:00
b2a_bandwidth_stats_r : watch ::Receiver < f32 > ,
2020-07-14 22:18:04 +00:00
a2s_disconnect_s : A2sDisconnect ,
2019-12-20 13:56:01 +00:00
}
2020-05-10 02:07:46 +00:00
/// `Streams` represents a channel to send `n` messages with a certain priority
/// and [`Promises`]. messages need always to be send between 2 `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] method of [`Participant`] on how to
/// generate `Streams`
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
///
/// [`Networks`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
2020-03-22 13:47:21 +00:00
#[ derive(Debug) ]
2020-02-04 15:42:04 +00:00
pub struct Stream {
2021-02-10 10:37:42 +00:00
local_pid : Pid ,
remote_pid : Pid ,
2020-02-04 15:42:04 +00:00
sid : Sid ,
2020-03-22 13:47:21 +00:00
prio : Prio ,
promises : Promises ,
2021-01-22 16:09:20 +00:00
guaranteed_bandwidth : Bandwidth ,
2020-07-16 19:39:33 +00:00
send_closed : Arc < AtomicBool > ,
2021-02-14 17:45:12 +00:00
a2b_msg_s : crossbeam_channel ::Sender < ( Sid , Bytes ) > ,
b2a_msg_recv_r : Option < async_channel ::Receiver < Bytes > > ,
2020-05-15 12:29:17 +00:00
a2b_close_stream_s : Option < mpsc ::UnboundedSender < Sid > > ,
2020-02-04 15:42:04 +00:00
}
2019-12-20 13:56:01 +00:00
2020-05-10 02:07:46 +00:00
/// Error type thrown by [`Networks`](Network) methods
2020-04-08 14:26:42 +00:00
#[ derive(Debug) ]
pub enum NetworkError {
NetworkClosed ,
ListenFailed ( std ::io ::Error ) ,
2021-02-21 23:48:30 +00:00
ConnectFailed ( NetworkConnectError ) ,
}
/// Error type thrown by [`Networks`](Network) connect
#[ derive(Debug) ]
pub enum NetworkConnectError {
/// Either a Pid UUID clash or you are trying to hijack a connection
InvalidSecret ,
Handshake ( InitProtocolError ) ,
Io ( std ::io ::Error ) ,
2020-04-08 14:26:42 +00:00
}
2020-03-22 13:47:21 +00:00
2020-05-10 02:07:46 +00:00
/// Error type thrown by [`Participants`](Participant) methods
2020-07-11 12:34:01 +00:00
#[ derive(Debug, PartialEq, Clone) ]
2020-04-08 14:26:42 +00:00
pub enum ParticipantError {
2020-07-11 12:34:01 +00:00
///Participant was closed by remote side
ParticipantDisconnected ,
///Underlying Protocol failed and wasn't able to recover, expect some Data
/// loss unfortunately, there is no method to get the exact messages
/// that failed. This is also returned when local side tries to do
/// something while remote site gracefully disconnects
ProtocolFailedUnrecoverable ,
2020-04-08 14:26:42 +00:00
}
2020-03-22 13:47:21 +00:00
2020-05-10 02:07:46 +00:00
/// Error type thrown by [`Streams`](Stream) methods
2020-08-25 13:32:42 +00:00
/// A Compression Error should only happen if a client sends malicious code.
/// A Deserialize Error probably means you are expecting Type X while you
/// actually got send type Y.
2020-06-08 09:47:39 +00:00
#[ derive(Debug) ]
2020-04-08 14:26:42 +00:00
pub enum StreamError {
StreamClosed ,
2020-08-25 13:32:42 +00:00
#[ cfg(feature = " compression " ) ]
Compression ( DecodeError ) ,
Deserialize ( bincode ::Error ) ,
2020-04-08 14:26:42 +00:00
}
2020-03-22 13:47:21 +00:00
2021-03-25 11:22:31 +00:00
/// All Parameters of a Stream, can be used to generate RawMessages
#[ derive(Debug, Clone) ]
pub struct StreamParams {
pub ( crate ) promises : Promises ,
}
2020-05-10 02:07:46 +00:00
/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
/// Application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
///
2020-07-09 07:58:21 +00:00
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
2021-04-27 15:59:36 +00:00
/// via their [`ConnectAddr`], or [`listen`] passively for [`connected`]
/// [`Participants`] via [`ListenAddr`].
2020-05-10 02:07:46 +00:00
///
2021-03-03 09:39:21 +00:00
/// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before
/// the Network.
///
2020-05-10 02:07:46 +00:00
/// # Examples
/// ```rust
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-27 15:58:57 +00:00
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async{
2020-05-26 13:06:03 +00:00
/// # //setup pseudo database!
2021-03-03 09:39:21 +00:00
/// # let database = Network::new(Pid::new(), &runtime);
2021-04-15 08:16:42 +00:00
/// # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
/// let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(database);
2020-05-26 13:06:03 +00:00
/// # Ok(())
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
/// [`Participants`]: crate::api::Participant
2021-03-03 09:39:21 +00:00
/// [`Runtime`]: tokio::runtime::Runtime
2020-05-10 02:07:46 +00:00
/// [`connect`]: Network::connect
2020-07-09 07:58:21 +00:00
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
2021-04-27 15:59:36 +00:00
/// [`ConnectAddr`]: crate::api::ConnectAddr
/// [`ListenAddr`]: crate::api::ListenAddr
2020-02-21 15:10:55 +00:00
pub struct Network {
2020-03-22 13:47:21 +00:00
local_pid : Pid ,
2021-03-03 09:39:21 +00:00
participant_disconnect_sender : Arc < Mutex < HashMap < Pid , A2sDisconnect > > > ,
2021-04-15 08:16:42 +00:00
listen_sender : Mutex < mpsc ::UnboundedSender < ( ListenAddr , oneshot ::Sender < io ::Result < ( ) > > ) > > ,
2021-02-21 23:48:30 +00:00
connect_sender : Mutex < mpsc ::UnboundedSender < A2sConnect > > ,
2020-08-23 19:43:17 +00:00
connected_receiver : Mutex < mpsc ::UnboundedReceiver < Participant > > ,
2021-03-03 09:39:21 +00:00
shutdown_network_s : Option < oneshot ::Sender < oneshot ::Sender < ( ) > > > ,
2019-12-20 13:56:01 +00:00
}
2020-02-21 15:10:55 +00:00
impl Network {
2020-05-10 02:07:46 +00:00
/// Generates a new `Network` to handle all connections in an Application
///
/// # Arguments
/// * `participant_id` - provide it by calling [`Pid::new()`], usually you
/// don't want to reuse a Pid for 2 `Networks`
2021-03-03 09:39:21 +00:00
/// * `runtime` - provide a [`Runtime`], it's used to internally spawn
2021-01-19 08:48:33 +00:00
/// tasks. It is necessary to clean up in the non-async `Drop`. **All**
/// network related components **must** be dropped before the runtime is
/// stopped. dropping the runtime while a shutdown is still in progress
/// leaves the network in a bad state which might cause a panic!
2020-05-10 02:07:46 +00:00
///
2020-06-08 09:47:39 +00:00
/// # Result
/// * `Self` - returns a `Network` which can be `Send` to multiple areas of
/// your code, including multiple threads. This is the base strct of this
/// crate.
///
2020-05-10 02:07:46 +00:00
/// # Examples
/// ```rust
2021-01-15 13:04:32 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, Pid};
2020-05-10 02:07:46 +00:00
///
2021-02-14 17:45:12 +00:00
/// let runtime = Runtime::new().unwrap();
2021-03-03 09:39:21 +00:00
/// let network = Network::new(Pid::new(), &runtime);
2020-05-10 02:07:46 +00:00
/// ```
///
2020-07-14 23:34:41 +00:00
/// Usually you only create a single `Network` for an application,
2020-06-08 09:47:39 +00:00
/// except when client and server are in the same application, then you
/// will want 2. However there are no technical limitations from
/// creating more.
2020-05-10 02:07:46 +00:00
///
2021-02-14 17:45:12 +00:00
/// [`Pid::new()`]: network_protocol::Pid::new
2021-03-03 09:39:21 +00:00
/// [`Runtime`]: tokio::runtime::Runtime
pub fn new ( participant_id : Pid , runtime : & Runtime ) -> Self {
2020-07-14 23:34:41 +00:00
Self ::internal_new (
participant_id ,
2021-01-13 13:16:22 +00:00
runtime ,
2020-07-14 23:34:41 +00:00
#[ cfg(feature = " metrics " ) ]
None ,
)
}
/// See [`new`]
///
/// # additional Arguments
2020-08-25 12:21:25 +00:00
/// * `registry` - Provide a Registry in order to collect Prometheus metrics
2020-07-14 23:34:41 +00:00
/// by this `Network`, `None` will deactivate Tracing. Tracing is done via
/// [`prometheus`]
///
/// # Examples
/// ```rust
/// use prometheus::Registry;
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, Pid};
2020-07-14 23:34:41 +00:00
///
2021-02-14 17:45:12 +00:00
/// let runtime = Runtime::new().unwrap();
2020-07-14 23:34:41 +00:00
/// let registry = Registry::new();
2021-03-03 09:39:21 +00:00
/// let network = Network::new_with_registry(Pid::new(), &runtime, ®istry);
2020-07-14 23:34:41 +00:00
/// ```
/// [`new`]: crate::api::Network::new
#[ cfg(feature = " metrics " ) ]
2021-03-03 09:39:21 +00:00
pub fn new_with_registry ( participant_id : Pid , runtime : & Runtime , registry : & Registry ) -> Self {
2021-01-13 13:16:22 +00:00
Self ::internal_new ( participant_id , runtime , Some ( registry ) )
2020-07-14 23:34:41 +00:00
}
fn internal_new (
2020-06-08 09:47:39 +00:00
participant_id : Pid ,
2021-03-03 09:39:21 +00:00
runtime : & Runtime ,
2020-07-14 23:34:41 +00:00
#[ cfg(feature = " metrics " ) ] registry : Option < & Registry > ,
2021-01-13 13:16:22 +00:00
) -> Self {
2020-03-22 13:47:21 +00:00
let p = participant_id ;
2021-02-10 10:37:42 +00:00
let span = tracing ::info_span! ( " network " , ? p ) ;
span . in_scope ( | | trace! ( " Starting Network " ) ) ;
2020-03-22 13:47:21 +00:00
let ( scheduler , listen_sender , connect_sender , connected_receiver , shutdown_sender ) =
2020-07-14 23:34:41 +00:00
Scheduler ::new (
participant_id ,
#[ cfg(feature = " metrics " ) ]
registry ,
) ;
2021-03-03 09:39:21 +00:00
let participant_disconnect_sender = Arc ::new ( Mutex ::new ( HashMap ::new ( ) ) ) ;
let ( shutdown_network_s , shutdown_network_r ) = oneshot ::channel ( ) ;
let f = Self ::shutdown_mgr (
p ,
shutdown_network_r ,
Arc ::clone ( & participant_disconnect_sender ) ,
shutdown_sender ,
) ;
runtime . spawn ( f ) ;
2021-02-10 10:37:42 +00:00
runtime . spawn (
async move {
trace! ( " Starting scheduler in own thread " ) ;
scheduler . run ( ) . await ;
trace! ( " Stopping scheduler and his own thread " ) ;
}
. instrument ( tracing ::info_span! ( " network " , ? p ) ) ,
) ;
2021-01-13 13:16:22 +00:00
Self {
local_pid : participant_id ,
2021-03-03 09:39:21 +00:00
participant_disconnect_sender ,
2021-01-13 13:16:22 +00:00
listen_sender : Mutex ::new ( listen_sender ) ,
connect_sender : Mutex ::new ( connect_sender ) ,
connected_receiver : Mutex ::new ( connected_receiver ) ,
2021-03-03 09:39:21 +00:00
shutdown_network_s : Some ( shutdown_network_s ) ,
2021-01-13 13:16:22 +00:00
}
2019-12-20 13:56:01 +00:00
}
2021-04-27 15:59:36 +00:00
/// starts listening on an [`ListenAddr`].
2020-05-10 02:07:46 +00:00
/// When the method returns the `Network` is ready to listen for incoming
/// connections OR has returned a [`NetworkError`] (e.g. port already used).
/// You can call [`connected`] to asynchrony wait for a [`Participant`] to
/// connect. You can call `listen` on multiple addresses, e.g. to
/// support multiple Protocols or NICs.
///
/// # Examples
2021-02-14 17:45:12 +00:00
/// ```ignore
/// use tokio::runtime::Runtime;
2021-04-27 15:59:36 +00:00
/// use veloren_network::{Network, Pid, ListenAddr};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-10 02:07:46 +00:00
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2020-05-10 02:07:46 +00:00
/// network
2021-04-27 15:59:36 +00:00
/// .listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
2020-05-10 02:07:46 +00:00
/// .await?;
/// network
2021-04-27 15:59:36 +00:00
/// .listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
2020-05-10 02:07:46 +00:00
/// .await?;
2021-03-03 09:39:21 +00:00
/// drop(network);
2020-05-26 13:06:03 +00:00
/// # Ok(())
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
/// [`connected`]: Network::connected
2021-04-27 15:59:36 +00:00
/// [`ListenAddr`]: crate::api::ListenAddr
2021-02-10 10:37:42 +00:00
#[ instrument(name= " network " , skip(self, address), fields(p = %self.local_pid)) ]
2021-04-15 08:16:42 +00:00
pub async fn listen ( & self , address : ListenAddr ) -> Result < ( ) , NetworkError > {
2021-01-13 13:16:22 +00:00
let ( s2a_result_s , s2a_result_r ) = oneshot ::channel ::< tokio ::io ::Result < ( ) > > ( ) ;
2020-05-15 12:29:17 +00:00
debug! ( ? address , " listening on address " ) ;
2020-04-08 14:26:42 +00:00
self . listen_sender
2020-08-23 19:43:17 +00:00
. lock ( )
2020-04-08 14:26:42 +00:00
. await
2021-01-15 13:04:32 +00:00
. send ( ( address , s2a_result_s ) ) ? ;
2020-05-15 12:29:17 +00:00
match s2a_result_r . await ? {
2020-08-25 12:21:25 +00:00
//waiting guarantees that we either listened successfully or get an error like port in
2020-04-08 14:26:42 +00:00
// use
Ok ( ( ) ) = > Ok ( ( ) ) ,
Err ( e ) = > Err ( NetworkError ::ListenFailed ( e ) ) ,
}
2019-12-20 13:56:01 +00:00
}
2021-04-27 15:59:36 +00:00
/// starts connection to an [`ConnectAddr`].
2020-05-10 02:07:46 +00:00
/// When the method returns the Network either returns a [`Participant`]
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
/// can't connect, or invalid Handshake) # Examples
2021-02-14 17:45:12 +00:00
/// ```ignore
/// use tokio::runtime::Runtime;
2021-04-27 15:59:36 +00:00
/// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-27 15:58:57 +00:00
/// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-27 15:59:36 +00:00
/// # remote.listen(ListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
/// # remote.listen(ListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
2020-05-10 02:07:46 +00:00
/// let p1 = network
2021-04-27 15:59:36 +00:00
/// .connect(ConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
2020-05-10 02:07:46 +00:00
/// .await?;
2020-05-26 13:06:03 +00:00
/// # //this doesn't work yet, so skip the test
/// # //TODO fixme!
/// # return Ok(());
2020-05-10 02:07:46 +00:00
/// let p2 = network
2021-04-27 15:59:36 +00:00
/// .connect(ConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
2020-05-10 02:07:46 +00:00
/// .await?;
2020-07-09 07:58:21 +00:00
/// assert_eq!(&p1, &p2);
2020-05-26 13:06:03 +00:00
/// # Ok(())
2021-03-03 09:39:21 +00:00
/// })?;
/// drop(network);
/// # drop(remote);
/// # Ok(())
2020-05-26 13:06:03 +00:00
/// # }
2020-05-10 02:07:46 +00:00
/// ```
/// Usually the `Network` guarantees that a operation on a [`Participant`]
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
2021-04-27 15:59:36 +00:00
/// disconnecting from the remote. If 2 [`ConnectAddr] you
2021-04-15 08:16:42 +00:00
/// `connect` to belongs to the same [`Participant`], you get the same
/// [`Participant`] as a result. This is useful e.g. by connecting to
/// the same [`Participant`] via multiple Protocols.
2020-05-10 02:07:46 +00:00
///
/// [`Streams`]: crate::api::Stream
2021-04-27 15:59:36 +00:00
/// [`ConnectAddr`]: crate::api::ConnectAddr
2021-02-10 10:37:42 +00:00
#[ instrument(name= " network " , skip(self, address), fields(p = %self.local_pid)) ]
2021-04-15 08:16:42 +00:00
pub async fn connect ( & self , address : ConnectAddr ) -> Result < Participant , NetworkError > {
2021-02-21 23:48:30 +00:00
let ( pid_sender , pid_receiver ) =
oneshot ::channel ::< Result < Participant , NetworkConnectError > > ( ) ;
2020-07-05 22:13:53 +00:00
debug! ( ? address , " Connect to address " ) ;
2020-03-22 13:47:21 +00:00
self . connect_sender
2020-08-23 19:43:17 +00:00
. lock ( )
2020-03-22 13:47:21 +00:00
. await
2021-01-15 13:04:32 +00:00
. send ( ( address , pid_sender ) ) ? ;
2020-07-04 10:17:33 +00:00
let participant = match pid_receiver . await ? {
Ok ( p ) = > p ,
Err ( e ) = > return Err ( NetworkError ::ConnectFailed ( e ) ) ,
} ;
2021-02-10 10:37:42 +00:00
let remote_pid = participant . remote_pid ;
trace! ( ? remote_pid , " connected " ) ;
2020-07-09 07:58:21 +00:00
self . participant_disconnect_sender
2020-08-23 19:43:17 +00:00
. lock ( )
2020-03-22 13:47:21 +00:00
. await
2021-02-10 10:37:42 +00:00
. insert ( remote_pid , Arc ::clone ( & participant . a2s_disconnect_s ) ) ;
2020-04-08 14:26:42 +00:00
Ok ( participant )
2020-03-22 13:47:21 +00:00
}
2020-03-10 00:07:36 +00:00
2021-04-27 15:59:36 +00:00
/// returns a [`Participant`] created from a [`ListenAddr`] you
2021-04-15 08:16:42 +00:00
/// called [`listen`] on before. This function will either return a
/// working [`Participant`] ready to open [`Streams`] on OR has returned
/// a [`NetworkError`] (e.g. Network got closed)
2020-05-10 02:07:46 +00:00
///
/// # Examples
/// ```rust
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-27 15:58:57 +00:00
/// // Create a Network, listen on port `2020` TCP and opens returns their Pid
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2020-05-10 02:07:46 +00:00
/// network
2021-04-15 08:16:42 +00:00
/// .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
2020-05-10 02:07:46 +00:00
/// .await?;
2021-04-15 08:16:42 +00:00
/// # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
2020-05-26 13:06:03 +00:00
/// while let Ok(participant) = network.connected().await {
2020-05-10 02:07:46 +00:00
/// println!("Participant connected: {}", participant.remote_pid());
2020-05-26 13:06:03 +00:00
/// # //skip test here as it would be a endless loop
/// # break;
2020-05-10 02:07:46 +00:00
/// }
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
2020-05-26 13:06:03 +00:00
/// # Ok(())
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
/// [`Streams`]: crate::api::Stream
/// [`listen`]: crate::api::Network::listen
2021-04-27 15:59:36 +00:00
/// [`ListenAddr`]: crate::api::ListenAddr
2021-02-10 10:37:42 +00:00
#[ instrument(name= " network " , skip(self), fields(p = %self.local_pid)) ]
2020-07-09 07:58:21 +00:00
pub async fn connected ( & self ) -> Result < Participant , NetworkError > {
2021-01-15 13:04:32 +00:00
let participant = self . connected_receiver . lock ( ) . await . recv ( ) . await ? ;
2020-09-27 16:20:40 +00:00
self . participant_disconnect_sender . lock ( ) . await . insert (
participant . remote_pid ,
Arc ::clone ( & participant . a2s_disconnect_s ) ,
) ;
2020-04-08 14:26:42 +00:00
Ok ( participant )
2019-12-20 13:56:01 +00:00
}
2021-03-03 09:39:21 +00:00
/// Use a mgr to handle shutdown smoothly and not in `Drop`
#[ instrument(name= " network " , skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid)) ]
async fn shutdown_mgr (
local_pid : Pid ,
shutdown_network_r : oneshot ::Receiver < oneshot ::Sender < ( ) > > ,
participant_disconnect_sender : Arc < Mutex < HashMap < Pid , A2sDisconnect > > > ,
shutdown_scheduler_s : oneshot ::Sender < ( ) > ,
) {
trace! ( " waiting for shutdown triggerNetwork " ) ;
let return_s = shutdown_network_r . await ;
trace! ( " Shutting down Participants of Network " ) ;
let mut finished_receiver_list = vec! [ ] ;
for ( remote_pid , a2s_disconnect_s ) in participant_disconnect_sender . lock ( ) . await . drain ( ) {
match a2s_disconnect_s . lock ( ) . await . take ( ) {
Some ( a2s_disconnect_s ) = > {
trace! ( ? remote_pid , " Participants will be closed " ) ;
let ( finished_sender , finished_receiver ) = oneshot ::channel ( ) ;
finished_receiver_list . push ( ( remote_pid , finished_receiver ) ) ;
Added non-admin moderators and timed bans.
The security model has been updated to reflect this change (for example,
moderators cannot revert a ban by an administrator). Ban history is
also now recorded in the ban file, and much more information about the
ban is stored (whitelists and administrators also have extra
information).
To support the new information without losing important information,
this commit also introduces a new migration path for editable settings
(both from legacy to the new format, and between versions). Examples
of how to do this correctly, and migrate to new versions of a settings
file, are in the settings/ subdirectory.
As part of this effort, editable settings have been revamped to
guarantee atomic saves (due to the increased amount of information in
each file), some latent bugs in networking were fixed, and server-cli
has been updated to go through StructOpt for both calls through TUI
and argv, greatly simplifying parsing logic.
2021-05-08 18:22:21 +00:00
// If the channel was already dropped, we can assume that the other side
// already released its resources.
let _ = a2s_disconnect_s
. send ( ( remote_pid , ( Duration ::from_secs ( 10 ) , finished_sender ) ) ) ;
2021-03-03 09:39:21 +00:00
} ,
None = > trace! ( ? remote_pid , " Participant already disconnected gracefully " ) ,
}
}
//wait after close is requested for all
for ( remote_pid , finished_receiver ) in finished_receiver_list . drain ( .. ) {
match finished_receiver . await {
Ok ( Ok ( ( ) ) ) = > trace! ( ? remote_pid , " disconnect successful " ) ,
Ok ( Err ( e ) ) = > info! ( ? remote_pid , ? e , " unclean disconnect " ) ,
Err ( e ) = > warn! (
? remote_pid ,
? e ,
" Failed to get a message back from the scheduler, seems like the network is \
already closed "
) ,
}
}
trace! ( " Participants have shut down - next: Scheduler " ) ;
2021-04-03 14:58:09 +00:00
if let Err ( ( ) ) = shutdown_scheduler_s . send ( ( ) ) {
error! ( " Scheduler is closed, but nobody other should be able to close it " )
} ;
2021-03-03 09:39:21 +00:00
if let Ok ( return_s ) = return_s {
if return_s . send ( ( ) ) . is_err ( ) {
warn! ( " Network::drop stoped after a timeout and didn't wait for our shutdown " ) ;
} ;
}
debug! ( " Network has shut down " ) ;
}
2020-04-08 14:26:42 +00:00
}
2020-02-10 17:25:47 +00:00
2020-03-22 13:47:21 +00:00
impl Participant {
pub ( crate ) fn new (
local_pid : Pid ,
remote_pid : Pid ,
2021-01-22 16:09:20 +00:00
a2b_open_stream_s : mpsc ::UnboundedSender < A2bStreamOpen > ,
2020-05-15 12:29:17 +00:00
b2a_stream_opened_r : mpsc ::UnboundedReceiver < Stream > ,
2021-03-25 17:28:50 +00:00
b2a_bandwidth_stats_r : watch ::Receiver < f32 > ,
2020-07-14 22:18:04 +00:00
a2s_disconnect_s : mpsc ::UnboundedSender < ( Pid , S2bShutdownBparticipant ) > ,
2020-03-22 13:47:21 +00:00
) -> Self {
Self {
local_pid ,
remote_pid ,
2021-01-22 16:09:20 +00:00
a2b_open_stream_s : Mutex ::new ( a2b_open_stream_s ) ,
2020-08-23 19:43:17 +00:00
b2a_stream_opened_r : Mutex ::new ( b2a_stream_opened_r ) ,
2021-03-25 17:28:50 +00:00
b2a_bandwidth_stats_r ,
2020-07-10 13:31:26 +00:00
a2s_disconnect_s : Arc ::new ( Mutex ::new ( Some ( a2s_disconnect_s ) ) ) ,
2020-03-04 15:52:30 +00:00
}
2020-02-10 17:25:47 +00:00
}
2020-05-10 02:07:46 +00:00
/// Opens a [`Stream`] on this `Participant` with a certain Priority and
/// [`Promises`]
///
/// # Arguments
2021-02-14 17:45:12 +00:00
/// * `prio` - defines which stream is processed first when limited on
/// bandwidth. See [`Prio`] for documentation.
2020-05-10 02:07:46 +00:00
/// * `promises` - use a combination of you prefered [`Promises`], see the
/// link for further documentation. You can combine them, e.g.
2020-08-25 13:32:42 +00:00
/// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
2020-08-25 12:21:25 +00:00
/// guarantee that those promises are met.
2021-02-18 00:01:57 +00:00
/// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
/// stream. When excess bandwidth is available it will be used. See
/// [`Bandwidth`] for details.
2020-05-10 02:07:46 +00:00
///
/// A [`ParticipantError`] might be thrown if the `Participant` is already
/// closed. [`Streams`] can be created without a answer from the remote
/// side, resulting in very fast creation and closing latency.
///
/// # Examples
/// ```rust
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-27 15:58:57 +00:00
/// // Create a Network, connect on port 2100 and open a stream
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
2020-05-10 02:07:46 +00:00
/// let p1 = network
2021-04-15 08:16:42 +00:00
/// .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
2020-05-10 02:07:46 +00:00
/// .await?;
2020-08-25 13:32:42 +00:00
/// let _s1 = p1
2021-02-18 00:01:57 +00:00
/// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
2020-08-25 13:32:42 +00:00
/// .await?;
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
2020-05-26 13:06:03 +00:00
/// # Ok(())
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
2021-02-14 17:45:12 +00:00
/// [`Prio`]: network_protocol::Prio
2021-02-18 00:01:57 +00:00
/// [`Bandwidth`]: network_protocol::Bandwidth
2021-02-14 17:45:12 +00:00
/// [`Promises`]: network_protocol::Promises
2020-05-10 02:07:46 +00:00
/// [`Streams`]: crate::api::Stream
2021-02-26 09:45:38 +00:00
#[ instrument(name= " network " , skip(self, prio, promises, bandwidth), fields(p = %self.local_pid)) ]
2021-02-18 00:01:57 +00:00
pub async fn open (
& self ,
prio : u8 ,
promises : Promises ,
bandwidth : Bandwidth ,
) -> Result < Stream , ParticipantError > {
2021-02-14 17:45:12 +00:00
debug_assert! ( prio < = network_protocol ::HIGHEST_PRIO , " invalid prio " ) ;
2021-01-22 16:09:20 +00:00
let ( p2a_return_stream_s , p2a_return_stream_r ) = oneshot ::channel ::< Stream > ( ) ;
if let Err ( e ) = self . a2b_open_stream_s . lock ( ) . await . send ( (
prio ,
promises ,
2021-02-18 00:01:57 +00:00
bandwidth ,
2021-01-22 16:09:20 +00:00
p2a_return_stream_s ,
) ) {
2020-07-16 19:39:33 +00:00
debug! ( ? e , " bParticipant is already closed, notifying " ) ;
return Err ( ParticipantError ::ParticipantDisconnected ) ;
}
2020-05-15 12:29:17 +00:00
match p2a_return_stream_r . await {
2020-03-22 13:47:21 +00:00
Ok ( stream ) = > {
let sid = stream . sid ;
2021-02-10 10:37:42 +00:00
trace! ( ? sid , " opened stream " ) ;
2020-03-22 13:47:21 +00:00
Ok ( stream )
} ,
2020-04-08 14:26:42 +00:00
Err ( _ ) = > {
2021-02-10 10:37:42 +00:00
debug! ( " p2a_return_stream_r failed, closing participant " ) ;
2020-07-16 19:39:33 +00:00
Err ( ParticipantError ::ParticipantDisconnected )
2020-04-08 14:26:42 +00:00
} ,
2020-03-04 15:52:30 +00:00
}
}
2020-02-21 15:10:55 +00:00
2020-05-10 02:07:46 +00:00
/// Use this method to handle [`Streams`] opened from remote site, like the
/// [`connected`] method of [`Network`]. This is the associated method
/// to [`open`]. It's guaranteed that the order of [`open`] and `opened`
/// is equal. The `nth` [`Streams`] on one side will represent the `nth` on
/// the other side. A [`ParticipantError`] might be thrown if the
/// `Participant` is already closed.
///
/// # Examples
/// ```rust
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-27 15:58:57 +00:00
/// // Create a Network, connect on port 2110 and wait for the other side to open a stream
2020-08-25 12:21:25 +00:00
/// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream.
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// let p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
2020-05-26 13:06:03 +00:00
/// # let p2 = remote.connected().await?;
2021-02-18 00:01:57 +00:00
/// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
2020-05-10 02:07:46 +00:00
/// let _s1 = p1.opened().await?;
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
2020-05-26 13:06:03 +00:00
/// # Ok(())
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
/// [`Streams`]: crate::api::Stream
/// [`connected`]: Network::connected
/// [`open`]: Participant::open
2021-02-10 10:37:42 +00:00
#[ instrument(name= " network " , skip(self), fields(p = %self.local_pid)) ]
2020-02-21 15:10:55 +00:00
pub async fn opened ( & self ) -> Result < Stream , ParticipantError > {
2021-01-15 13:04:32 +00:00
match self . b2a_stream_opened_r . lock ( ) . await . recv ( ) . await {
2020-04-08 14:26:42 +00:00
Some ( stream ) = > {
let sid = stream . sid ;
2021-02-10 10:37:42 +00:00
debug! ( ? sid , " Receive opened stream " ) ;
2020-04-08 14:26:42 +00:00
Ok ( stream )
} ,
None = > {
2021-02-10 10:37:42 +00:00
debug! ( " stream_opened_receiver failed, closing participant " ) ;
2020-07-16 19:39:33 +00:00
Err ( ParticipantError ::ParticipantDisconnected )
2020-04-08 14:26:42 +00:00
} ,
2020-02-21 15:10:55 +00:00
}
2020-02-10 17:25:47 +00:00
}
2020-04-08 14:26:42 +00:00
2020-07-09 07:58:21 +00:00
/// disconnecting a `Participant` in a async way.
/// Use this rather than `Participant::Drop` if you want to close multiple
/// `Participants`.
///
/// This function will wait for all [`Streams`] to properly close, including
/// all messages to be send before closing. If an error occurs with one
/// of the messages.
/// Except if the remote side already dropped the `Participant`
/// simultaneously, then messages won't be send
///
/// There is NO `disconnected` function in `Participant`, if a `Participant`
/// is no longer reachable (e.g. as the network cable was unplugged) the
/// `Participant` will fail all action, but needs to be manually
2020-08-25 12:21:25 +00:00
/// disconnected, using this function.
2020-07-09 07:58:21 +00:00
///
/// # Examples
/// ```rust
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
2020-07-09 07:58:21 +00:00
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// let err = runtime.block_on(async {
2020-07-09 07:58:21 +00:00
/// network
2021-04-15 08:16:42 +00:00
/// .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
2020-07-09 07:58:21 +00:00
/// .await?;
2021-04-15 08:16:42 +00:00
/// # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
2020-07-09 07:58:21 +00:00
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// participant.disconnect().await?;
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// # Ok(())
2021-03-03 09:39:21 +00:00
/// });
/// drop(network);
/// # drop(remote);
/// # err
2020-07-09 07:58:21 +00:00
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
2021-02-10 10:37:42 +00:00
#[ instrument(name= " network " , skip(self), fields(p = %self.local_pid)) ]
2020-07-09 07:58:21 +00:00
pub async fn disconnect ( self ) -> Result < ( ) , ParticipantError > {
// Remove, Close and try_unwrap error when unwrap fails!
2021-02-10 10:37:42 +00:00
debug! ( " Closing participant from network " ) ;
2020-07-09 07:58:21 +00:00
2020-07-11 12:34:01 +00:00
//Streams will be closed by BParticipant
2020-07-10 13:31:26 +00:00
match self . a2s_disconnect_s . lock ( ) . await . take ( ) {
2021-01-15 13:04:32 +00:00
Some ( a2s_disconnect_s ) = > {
2020-07-09 07:58:21 +00:00
let ( finished_sender , finished_receiver ) = oneshot ::channel ( ) ;
// Participant is connecting to Scheduler here, not as usual
// Participant<->BParticipant
Added non-admin moderators and timed bans.
The security model has been updated to reflect this change (for example,
moderators cannot revert a ban by an administrator). Ban history is
also now recorded in the ban file, and much more information about the
ban is stored (whitelists and administrators also have extra
information).
To support the new information without losing important information,
this commit also introduces a new migration path for editable settings
(both from legacy to the new format, and between versions). Examples
of how to do this correctly, and migrate to new versions of a settings
file, are in the settings/ subdirectory.
As part of this effort, editable settings have been revamped to
guarantee atomic saves (due to the increased amount of information in
each file), some latent bugs in networking were fixed, and server-cli
has been updated to go through StructOpt for both calls through TUI
and argv, greatly simplifying parsing logic.
2021-05-08 18:22:21 +00:00
// If this is already dropped, we can assume the other side already freed its
// resources.
let _ = a2s_disconnect_s
. send ( ( self . remote_pid , ( Duration ::from_secs ( 120 ) , finished_sender ) ) ) ;
2020-07-09 07:58:21 +00:00
match finished_receiver . await {
2020-07-16 19:39:33 +00:00
Ok ( res ) = > {
match res {
2021-02-10 10:37:42 +00:00
Ok ( ( ) ) = > trace! ( " Participant is now closed " ) ,
2020-07-16 19:39:33 +00:00
Err ( ref e ) = > {
2021-02-10 10:37:42 +00:00
trace! ( ? e , " Error occurred during shutdown of participant " )
2020-07-16 19:39:33 +00:00
} ,
} ;
res
2020-07-09 07:58:21 +00:00
} ,
Err ( e ) = > {
//this is a bug. but as i am Participant i can't destroy the network
error! (
? e ,
" Failed to get a message back from the scheduler, seems like the \
network is already closed "
) ;
2020-07-11 12:34:01 +00:00
Err ( ParticipantError ::ProtocolFailedUnrecoverable )
2020-07-09 07:58:21 +00:00
} ,
}
} ,
None = > {
warn! (
" seems like you are trying to disconnecting a participant after the network \
was already dropped . It was already dropped with the network ! "
) ;
2020-07-11 12:34:01 +00:00
Err ( ParticipantError ::ParticipantDisconnected )
2020-07-09 07:58:21 +00:00
} ,
}
}
2021-03-25 17:28:50 +00:00
/// Returns the current approximation on the maximum bandwidth available.
/// This WILL fluctuate based on the amount/size of send messages.
pub fn bandwidth ( & self ) -> f32 { * self . b2a_bandwidth_stats_r . borrow ( ) }
2021-02-14 17:45:12 +00:00
/// Returns the remote [`Pid`](network_protocol::Pid)
2020-04-08 14:26:42 +00:00
pub fn remote_pid ( & self ) -> Pid { self . remote_pid }
2020-02-10 17:25:47 +00:00
}
impl Stream {
2020-06-08 09:47:39 +00:00
#[ allow(clippy::too_many_arguments) ]
2020-03-10 00:07:36 +00:00
pub ( crate ) fn new (
2021-02-10 10:37:42 +00:00
local_pid : Pid ,
remote_pid : Pid ,
2020-03-10 00:07:36 +00:00
sid : Sid ,
2020-03-22 13:47:21 +00:00
prio : Prio ,
promises : Promises ,
2021-01-22 16:09:20 +00:00
guaranteed_bandwidth : Bandwidth ,
2020-07-16 19:39:33 +00:00
send_closed : Arc < AtomicBool > ,
2021-02-14 17:45:12 +00:00
a2b_msg_s : crossbeam_channel ::Sender < ( Sid , Bytes ) > ,
b2a_msg_recv_r : async_channel ::Receiver < Bytes > ,
2020-05-15 12:29:17 +00:00
a2b_close_stream_s : mpsc ::UnboundedSender < Sid > ,
2020-03-10 00:07:36 +00:00
) -> Self {
Self {
2021-02-10 10:37:42 +00:00
local_pid ,
remote_pid ,
2020-03-10 00:07:36 +00:00
sid ,
2020-03-22 13:47:21 +00:00
prio ,
promises ,
2021-01-22 16:09:20 +00:00
guaranteed_bandwidth ,
2020-07-16 19:39:33 +00:00
send_closed ,
2020-05-15 12:29:17 +00:00
a2b_msg_s ,
2020-10-16 09:21:18 +00:00
b2a_msg_recv_r : Some ( b2a_msg_recv_r ) ,
2020-05-15 12:29:17 +00:00
a2b_close_stream_s : Some ( a2b_close_stream_s ) ,
2020-03-10 00:07:36 +00:00
}
}
2020-02-10 17:25:47 +00:00
2020-05-10 02:07:46 +00:00
/// use to send a arbitrary message to the remote side, by having the remote
/// side also opened a `Stream` linked to this. the message will be
/// [`Serialized`], which actually is quite slow compared to most other
/// calculations done. A faster method [`send_raw`] exists, when extra
/// speed is needed. The other side needs to use the respective [`recv`]
/// function and know the type send.
///
/// `send` is an exception to the `async` messages, as it's probably called
/// quite often so it doesn't wait for execution. Which also means, that
/// no feedback is provided. It's to assume that the Message got `send`
/// correctly. If a error occurred, the next call will return an Error.
/// If the [`Participant`] disconnected it will also be unable to be used
/// any more. A [`StreamError`] will be returned in the error case, e.g.
/// when the `Stream` got closed already.
///
2020-05-27 15:58:57 +00:00
/// Note when a `Stream` is dropped locally, it will still send all
/// messages, though the `drop` will return immediately, however, when a
/// [`Participant`] gets gracefully shut down, all remaining messages
/// will be send. If the `Stream` is dropped from remote side no further
/// messages are send, because the remote side has no way of listening
/// to them either way. If the last channel is destroyed (e.g. by losing
2020-08-25 12:21:25 +00:00
/// the internet connection or non-graceful shutdown, pending messages
2020-05-27 15:58:57 +00:00
/// are also dropped.
2020-05-10 02:07:46 +00:00
///
/// # Example
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// ```
2020-08-25 13:32:42 +00:00
/// # use veloren_network::Promises;
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-05-27 15:58:57 +00:00
/// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// # // keep it alive
2021-02-18 00:01:57 +00:00
/// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
2020-05-10 02:07:46 +00:00
/// //Send Message
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// stream_a.send("Hello World")?;
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// # Ok(())
2020-05-26 13:06:03 +00:00
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
/// [`send_raw`]: Stream::send_raw
/// [`recv`]: Stream::recv
/// [`Serialized`]: Serialize
2020-05-22 14:00:08 +00:00
#[ inline ]
2020-04-08 14:26:42 +00:00
pub fn send < M : Serialize > ( & mut self , msg : M ) -> Result < ( ) , StreamError > {
2021-03-25 11:22:31 +00:00
self . send_raw ( & Message ::serialize ( & msg , self . params ( ) ) )
2020-04-24 10:56:04 +00:00
}
2020-08-25 13:32:42 +00:00
/// This methods give the option to skip multiple calls of [`bincode`] and
/// [`compress`], e.g. in case the same Message needs to send on
/// multiple `Streams` to multiple [`Participants`]. Other then that,
2020-10-16 09:21:18 +00:00
/// the same rules apply than for [`send`].
/// You need to create a Message via [`Message::serialize`].
2020-05-10 02:07:46 +00:00
///
/// # Example
/// ```rust
2020-08-25 13:32:42 +00:00
/// # use veloren_network::Promises;
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2020-05-26 13:06:03 +00:00
/// use bincode;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
2020-05-10 02:07:46 +00:00
///
2020-05-26 13:06:03 +00:00
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote1 = Network::new(Pid::new(), &runtime);
/// # let remote2 = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
2020-05-26 13:06:03 +00:00
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
2021-02-18 00:01:57 +00:00
/// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
2020-05-26 13:06:03 +00:00
/// let participant_a = network.connected().await?;
/// let participant_b = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// let mut stream_b = participant_b.opened().await?;
2020-05-10 02:07:46 +00:00
///
/// //Prepare Message and decode it
2021-03-25 11:22:31 +00:00
/// let msg = Message::serialize("Hello World", stream_a.params());
2020-05-10 02:07:46 +00:00
/// //Send same Message to multiple Streams
2020-10-16 09:21:18 +00:00
/// stream_a.send_raw(&msg);
/// stream_b.send_raw(&msg);
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote1);
/// # drop(remote2);
2020-05-26 13:06:03 +00:00
/// # Ok(())
/// })
/// # }
2020-05-10 02:07:46 +00:00
/// ```
///
/// [`send`]: Stream::send
/// [`Participants`]: crate::api::Participant
2020-08-25 13:32:42 +00:00
/// [`compress`]: lz_fear::raw::compress2
2020-10-16 09:21:18 +00:00
/// [`Message::serialize`]: crate::message::Message::serialize
pub fn send_raw ( & mut self , message : & Message ) -> Result < ( ) , StreamError > {
2020-07-16 19:39:33 +00:00
if self . send_closed . load ( Ordering ::Relaxed ) {
2020-04-08 14:26:42 +00:00
return Err ( StreamError ::StreamClosed ) ;
}
2020-10-16 09:21:18 +00:00
#[ cfg(debug_assertions) ]
2021-03-25 11:22:31 +00:00
message . verify ( self . params ( ) ) ;
2021-02-14 17:45:12 +00:00
self . a2b_msg_s . send ( ( self . sid , message . data . clone ( ) ) ) ? ;
2020-02-21 15:10:55 +00:00
Ok ( ( ) )
2020-02-10 17:25:47 +00:00
}
2020-05-10 02:07:46 +00:00
/// use `recv` to wait on a Message send from the remote side by their
/// `Stream`. The Message needs to implement [`DeserializeOwned`] and
/// thus, the resulting type must already be known by the receiving side.
/// If this is not know from the Application logic, one could use a `Enum`
/// and then handle the received message via a `match` state.
///
/// A [`StreamError`] will be returned in the error case, e.g. when the
/// `Stream` got closed already.
2020-05-27 15:58:57 +00:00
///
/// # Example
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// ```
2020-08-25 13:32:42 +00:00
/// # use veloren_network::Promises;
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
2020-05-27 15:58:57 +00:00
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
2021-02-18 00:01:57 +00:00
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
2020-05-27 15:58:57 +00:00
/// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
2020-10-15 11:22:34 +00:00
/// //Recv Message
2020-05-27 15:58:57 +00:00
/// println!("{}", stream_a.recv::<String>().await?);
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
2020-05-27 15:58:57 +00:00
/// # Ok(())
/// })
/// # }
/// ```
2020-05-22 14:00:08 +00:00
#[ inline ]
2020-03-04 10:59:19 +00:00
pub async fn recv < M : DeserializeOwned > ( & mut self ) -> Result < M , StreamError > {
2020-10-16 09:21:18 +00:00
self . recv_raw ( ) . await ? . deserialize ( )
2020-04-24 10:56:04 +00:00
}
2020-08-25 13:32:42 +00:00
/// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] or
/// [`decompress`] is executed for performance reasons.
2020-05-10 02:07:46 +00:00
///
2020-10-16 09:21:18 +00:00
/// # Example
/// ```
/// # use veloren_network::Promises;
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
2020-10-16 09:21:18 +00:00
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
2021-02-18 00:01:57 +00:00
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
2020-10-16 09:21:18 +00:00
/// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Recv Message
/// let msg = stream_a.recv_raw().await?;
/// //Resend Message, without deserializing
/// stream_a.send_raw(&msg)?;
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
2020-10-16 09:21:18 +00:00
/// # Ok(())
/// })
/// # }
/// ```
///
2020-05-10 02:07:46 +00:00
/// [`send_raw`]: Stream::send_raw
/// [`recv`]: Stream::recv
2020-08-25 13:32:42 +00:00
/// [`decompress`]: lz_fear::raw::decompress_raw
2020-10-16 09:21:18 +00:00
pub async fn recv_raw ( & mut self ) -> Result < Message , StreamError > {
match & mut self . b2a_msg_recv_r {
Some ( b2a_msg_recv_r ) = > {
2021-01-15 13:04:32 +00:00
match b2a_msg_recv_r . recv ( ) . await {
2021-02-14 17:45:12 +00:00
Ok ( data ) = > Ok ( Message {
data ,
2020-10-16 09:21:18 +00:00
#[ cfg(feature = " compression " ) ]
compressed : self . promises . contains ( Promises ::COMPRESSED ) ,
} ) ,
2021-01-15 13:04:32 +00:00
Err ( _ ) = > {
2020-10-16 09:21:18 +00:00
self . b2a_msg_recv_r = None ; //prevent panic
Err ( StreamError ::StreamClosed )
} ,
}
} ,
None = > Err ( StreamError ::StreamClosed ) ,
}
2020-02-10 17:25:47 +00:00
}
2020-10-15 11:22:34 +00:00
/// use `try_recv` to check for a Message send from the remote side by their
/// `Stream`. This function does not block and returns immediately. It's
/// intended for use in non-async context only. Other then that, the
/// same rules apply than for [`recv`].
///
/// # Example
/// ```
/// # use veloren_network::Promises;
2021-02-14 17:45:12 +00:00
/// use tokio::runtime::Runtime;
2021-04-15 08:16:42 +00:00
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
2020-10-15 11:22:34 +00:00
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
2020-10-16 09:21:18 +00:00
/// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
2021-03-03 09:39:21 +00:00
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
2021-02-14 17:45:12 +00:00
/// runtime.block_on(async {
2021-04-15 08:16:42 +00:00
/// network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
2021-02-18 00:01:57 +00:00
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
2020-10-15 11:22:34 +00:00
/// # stream_p.send("Hello World");
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Try Recv Message
/// println!("{:?}", stream_a.try_recv::<String>()?);
2021-03-03 09:39:21 +00:00
/// drop(network);
/// # drop(remote);
2020-10-15 11:22:34 +00:00
/// # Ok(())
/// })
/// # }
/// ```
2020-10-16 09:21:18 +00:00
///
/// [`recv`]: Stream::recv
2020-10-15 11:22:34 +00:00
#[ inline ]
pub fn try_recv < M : DeserializeOwned > ( & mut self ) -> Result < Option < M > , StreamError > {
2020-10-16 09:21:18 +00:00
match & mut self . b2a_msg_recv_r {
2021-01-15 13:04:32 +00:00
Some ( b2a_msg_recv_r ) = > match b2a_msg_recv_r . try_recv ( ) {
2021-02-14 17:45:12 +00:00
Ok ( data ) = > Ok ( Some (
2020-10-16 09:21:18 +00:00
Message {
2021-02-14 17:45:12 +00:00
data ,
2020-10-16 09:21:18 +00:00
#[ cfg(feature = " compression " ) ]
2021-03-25 11:22:31 +00:00
compressed : self . promises . contains ( Promises ::COMPRESSED ) ,
2020-10-16 09:21:18 +00:00
}
. deserialize ( ) ? ,
) ) ,
2021-01-15 13:04:32 +00:00
Err ( async_channel ::TryRecvError ::Empty ) = > Ok ( None ) ,
Err ( async_channel ::TryRecvError ::Closed ) = > {
self . b2a_msg_recv_r = None ; //prevent panic
Err ( StreamError ::StreamClosed )
} ,
2020-10-16 09:21:18 +00:00
} ,
None = > Err ( StreamError ::StreamClosed ) ,
2020-10-15 11:22:34 +00:00
}
}
2020-10-16 09:21:18 +00:00
2021-03-25 11:22:31 +00:00
pub fn params ( & self ) -> StreamParams {
StreamParams {
promises : self . promises ,
}
}
2020-03-22 13:47:21 +00:00
}
2020-03-10 00:07:36 +00:00
2020-07-09 07:58:21 +00:00
impl core ::cmp ::PartialEq for Participant {
fn eq ( & self , other : & Self ) -> bool {
//don't check local_pid, 2 Participant from different network should match if
// they are the "same"
self . remote_pid = = other . remote_pid
}
}
2021-04-03 14:58:09 +00:00
fn actively_wait < T , F > ( name : & 'static str , mut finished_receiver : oneshot ::Receiver < T > , f : F )
2021-03-03 09:39:21 +00:00
where
F : FnOnce ( T ) + Send + 'static ,
T : Send + 'static ,
{
const CHANNEL_ERR : & str = " Something is wrong in internal scheduler/participant coding " ;
2021-01-22 16:09:20 +00:00
2021-03-03 09:39:21 +00:00
if let Ok ( handle ) = tokio ::runtime ::Handle ::try_current ( ) {
// When in Async Context WE MUST NOT SYNC BLOCK (as a deadlock might occur as
// other is queued behind). And we CANNOT join our Future_Handle
trace! ( " async context detected, defer shutdown " ) ;
handle . spawn ( async move {
match finished_receiver . await {
Ok ( data ) = > f ( data ) ,
2021-04-22 17:27:38 +00:00
Err ( e ) = > error! ( " {}{}: {} " , name , CHANNEL_ERR , e ) ,
2021-03-03 09:39:21 +00:00
}
} ) ;
} else {
let mut cnt = 0 ;
loop {
use tokio ::sync ::oneshot ::error ::TryRecvError ;
match finished_receiver . try_recv ( ) {
Ok ( data ) = > {
f ( data ) ;
break ;
} ,
2021-04-03 14:58:09 +00:00
Err ( TryRecvError ::Closed ) = > panic! ( " {} {} " , name , CHANNEL_ERR ) ,
2021-03-03 09:39:21 +00:00
Err ( TryRecvError ::Empty ) = > {
trace! ( " activly sleeping " ) ;
cnt + = 1 ;
2021-04-14 14:17:06 +00:00
if cnt > 10 {
2021-03-03 09:39:21 +00:00
error! ( " Timeout waiting for shutdown, dropping " ) ;
break ;
}
std ::thread ::sleep ( Duration ::from_millis ( 100 ) * cnt ) ;
} ,
}
2021-01-22 16:09:20 +00:00
}
2021-03-03 09:39:21 +00:00
} ;
}
2021-01-22 16:09:20 +00:00
2021-03-03 09:39:21 +00:00
impl Drop for Network {
#[ instrument(name= " network " , skip(self), fields(p = %self.local_pid)) ]
fn drop ( & mut self ) {
trace! ( " Dropping Network " ) ;
let ( finished_sender , finished_receiver ) = oneshot ::channel ( ) ;
match self
. shutdown_network_s
2021-01-22 16:09:20 +00:00
. take ( )
. unwrap ( )
2021-03-03 09:39:21 +00:00
. send ( finished_sender )
{
Err ( e ) = > warn! ( ? e , " Runtime seems to be dropped already " ) ,
2021-04-03 14:58:09 +00:00
Ok ( ( ) ) = > actively_wait ( " network " , finished_receiver , | ( ) | {
info! ( " Network dropped gracefully " )
} ) ,
2021-03-03 09:39:21 +00:00
} ;
2020-03-10 00:07:36 +00:00
}
2020-03-22 13:47:21 +00:00
}
2020-03-10 00:07:36 +00:00
2020-03-22 13:47:21 +00:00
impl Drop for Participant {
2021-02-10 10:37:42 +00:00
#[ instrument(name= " remote " , skip(self), fields(p = %self.remote_pid)) ]
#[ instrument(name= " network " , skip(self), fields(p = %self.local_pid)) ]
2020-03-22 13:47:21 +00:00
fn drop ( & mut self ) {
2021-02-26 09:45:38 +00:00
const SHUTDOWN_ERR : & str = " Error while dropping the participant, couldn't send all \
outgoing messages , dropping remaining " ;
2021-03-03 09:39:21 +00:00
const SCHEDULER_ERR : & str =
" Something is wrong in internal scheduler coding or you dropped the runtime to early " ;
2020-04-08 14:26:42 +00:00
// ignore closed, as we need to send it even though we disconnected the
// participant from network
2021-02-10 10:37:42 +00:00
debug! ( " Shutting down Participant " ) ;
2020-07-09 07:58:21 +00:00
2021-03-03 09:39:21 +00:00
match self . a2s_disconnect_s . try_lock ( ) {
Err ( e ) = > debug! ( ? e , " Participant is beeing dropped by Network right now " ) ,
Ok ( mut s ) = > match s . take ( ) {
None = > info! ( " Participant already has been shutdown gracefully " ) ,
Some ( a2s_disconnect_s ) = > {
debug! ( " Disconnect from Scheduler " ) ;
let ( finished_sender , finished_receiver ) = oneshot ::channel ( ) ;
match a2s_disconnect_s
2021-04-14 14:17:06 +00:00
. send ( ( self . remote_pid , ( Duration ::from_secs ( 10 ) , finished_sender ) ) )
2021-03-03 09:39:21 +00:00
{
Err ( e ) = > warn! ( ? e , SCHEDULER_ERR ) ,
Ok ( ( ) ) = > {
2021-04-03 14:58:09 +00:00
actively_wait ( " participant " , finished_receiver , | d | match d {
2021-03-03 09:39:21 +00:00
Ok ( ( ) ) = > info! ( " Participant dropped gracefully " ) ,
Err ( e ) = > error! ( ? e , SHUTDOWN_ERR ) ,
} ) ;
} ,
2021-01-22 16:09:20 +00:00
}
2021-03-03 09:39:21 +00:00
} ,
2020-05-15 12:29:17 +00:00
} ,
}
2020-03-10 00:07:36 +00:00
}
}
impl Drop for Stream {
2021-02-10 10:37:42 +00:00
#[ instrument(name= " remote " , skip(self), fields(p = %self.remote_pid)) ]
#[ instrument(name= " network " , skip(self), fields(p = %self.local_pid)) ]
2020-03-10 00:07:36 +00:00
fn drop ( & mut self ) {
2020-10-14 12:29:39 +00:00
// send if closed is unnecessary but doesn't hurt, we must not crash
2020-07-16 19:39:33 +00:00
if ! self . send_closed . load ( Ordering ::Relaxed ) {
2020-04-08 14:26:42 +00:00
let sid = self . sid ;
2021-02-10 10:37:42 +00:00
debug! ( ? sid , " Shutting down Stream " ) ;
2021-01-22 16:09:20 +00:00
if let Err ( e ) = self . a2b_close_stream_s . take ( ) . unwrap ( ) . send ( self . sid ) {
debug! (
? e ,
" bparticipant part of a gracefully shutdown was already closed "
) ;
}
2020-05-15 12:29:17 +00:00
} else {
let sid = self . sid ;
2021-02-10 10:37:42 +00:00
trace! ( ? sid , " Stream Drop not needed " ) ;
2020-03-22 13:47:21 +00:00
}
2020-03-10 00:07:36 +00:00
}
2020-02-10 17:25:47 +00:00
}
2020-04-08 14:26:42 +00:00
impl std ::fmt ::Debug for Participant {
#[ inline ]
fn fmt ( & self , f : & mut std ::fmt ::Formatter < '_ > ) -> std ::fmt ::Result {
write! (
f ,
2020-07-11 12:34:01 +00:00
" Participant {{ local_pid: {:?}, remote_pid: {:?} }} " ,
& self . local_pid , & self . remote_pid ,
2020-04-08 14:26:42 +00:00
)
}
}
2020-07-03 19:55:00 +00:00
impl < T > From < crossbeam_channel ::SendError < T > > for StreamError {
fn from ( _err : crossbeam_channel ::SendError < T > ) -> Self { StreamError ::StreamClosed }
2020-04-08 14:26:42 +00:00
}
2020-07-03 19:55:00 +00:00
impl < T > From < crossbeam_channel ::SendError < T > > for NetworkError {
fn from ( _err : crossbeam_channel ::SendError < T > ) -> Self { NetworkError ::NetworkClosed }
2020-04-08 14:26:42 +00:00
}
impl From < std ::option ::NoneError > for StreamError {
fn from ( _err : std ::option ::NoneError ) -> Self { StreamError ::StreamClosed }
}
impl From < std ::option ::NoneError > for NetworkError {
fn from ( _err : std ::option ::NoneError ) -> Self { NetworkError ::NetworkClosed }
}
2021-01-15 13:04:32 +00:00
impl < T > From < mpsc ::error ::SendError < T > > for NetworkError {
fn from ( _err : mpsc ::error ::SendError < T > ) -> Self { NetworkError ::NetworkClosed }
}
impl From < oneshot ::error ::RecvError > for NetworkError {
fn from ( _err : oneshot ::error ::RecvError ) -> Self { NetworkError ::NetworkClosed }
2020-04-08 14:26:42 +00:00
}
2021-01-15 13:04:32 +00:00
impl From < std ::io ::Error > for NetworkError {
fn from ( _err : std ::io ::Error ) -> Self { NetworkError ::NetworkClosed }
2020-04-08 14:26:42 +00:00
}
2020-05-26 13:06:03 +00:00
2020-06-08 09:47:39 +00:00
impl From < Box < bincode ::ErrorKind > > for StreamError {
2020-08-25 13:32:42 +00:00
fn from ( err : Box < bincode ::ErrorKind > ) -> Self { StreamError ::Deserialize ( err ) }
2020-06-08 09:47:39 +00:00
}
2020-05-26 13:06:03 +00:00
impl core ::fmt ::Display for StreamError {
2020-06-08 09:47:39 +00:00
fn fmt ( & self , f : & mut core ::fmt ::Formatter < '_ > ) -> core ::fmt ::Result {
2020-05-26 13:06:03 +00:00
match self {
StreamError ::StreamClosed = > write! ( f , " stream closed " ) ,
2020-08-25 13:32:42 +00:00
#[ cfg(feature = " compression " ) ]
StreamError ::Compression ( err ) = > write! ( f , " compression error on message: {} " , err ) ,
StreamError ::Deserialize ( err ) = > write! ( f , " deserialize error on message: {} " , err ) ,
2020-05-26 13:06:03 +00:00
}
}
}
impl core ::fmt ::Display for ParticipantError {
2020-06-08 09:47:39 +00:00
fn fmt ( & self , f : & mut core ::fmt ::Formatter < '_ > ) -> core ::fmt ::Result {
2020-05-26 13:06:03 +00:00
match self {
2020-07-11 12:34:01 +00:00
ParticipantError ::ParticipantDisconnected = > write! ( f , " Participant disconnect " ) ,
ParticipantError ::ProtocolFailedUnrecoverable = > {
write! ( f , " underlying protocol failed unrecoverable " )
2020-07-09 07:58:21 +00:00
} ,
2020-05-26 13:06:03 +00:00
}
}
}
impl core ::fmt ::Display for NetworkError {
2020-06-08 09:47:39 +00:00
fn fmt ( & self , f : & mut core ::fmt ::Formatter < '_ > ) -> core ::fmt ::Result {
2020-05-26 13:06:03 +00:00
match self {
2020-07-05 22:13:53 +00:00
NetworkError ::NetworkClosed = > write! ( f , " Network closed " ) ,
NetworkError ::ListenFailed ( _ ) = > write! ( f , " Listening failed " ) ,
NetworkError ::ConnectFailed ( _ ) = > write! ( f , " Connecting failed " ) ,
2020-05-26 13:06:03 +00:00
}
}
}
2021-02-21 23:48:30 +00:00
impl core ::fmt ::Display for NetworkConnectError {
fn fmt ( & self , f : & mut core ::fmt ::Formatter < '_ > ) -> core ::fmt ::Result {
match self {
NetworkConnectError ::Io ( e ) = > write! ( f , " Io error: {} " , e ) ,
NetworkConnectError ::Handshake ( e ) = > write! ( f , " Handshake error: {} " , e ) ,
NetworkConnectError ::InvalidSecret = > {
write! ( f , " You specified the wrong secret on your second channel " )
} ,
}
}
}
2020-06-08 09:47:39 +00:00
/// implementing PartialEq as it's super convenient in tests
impl core ::cmp ::PartialEq for StreamError {
fn eq ( & self , other : & Self ) -> bool {
match self {
StreamError ::StreamClosed = > match other {
StreamError ::StreamClosed = > true ,
2020-08-25 13:32:42 +00:00
#[ cfg(feature = " compression " ) ]
StreamError ::Compression ( _ ) = > false ,
StreamError ::Deserialize ( _ ) = > false ,
} ,
#[ cfg(feature = " compression " ) ]
StreamError ::Compression ( err ) = > match other {
StreamError ::StreamClosed = > false ,
#[ cfg(feature = " compression " ) ]
StreamError ::Compression ( other_err ) = > err = = other_err ,
StreamError ::Deserialize ( _ ) = > false ,
2020-06-08 09:47:39 +00:00
} ,
2020-08-25 13:32:42 +00:00
StreamError ::Deserialize ( err ) = > match other {
2020-06-08 09:47:39 +00:00
StreamError ::StreamClosed = > false ,
2020-08-25 13:32:42 +00:00
#[ cfg(feature = " compression " ) ]
StreamError ::Compression ( _ ) = > false ,
StreamError ::Deserialize ( other_err ) = > partial_eq_bincode ( err , other_err ) ,
2020-06-08 09:47:39 +00:00
} ,
}
}
}
2020-05-26 13:06:03 +00:00
impl std ::error ::Error for StreamError { }
impl std ::error ::Error for ParticipantError { }
impl std ::error ::Error for NetworkError { }
2021-02-21 23:48:30 +00:00
impl std ::error ::Error for NetworkConnectError { }