//! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run)
use crate::{
    message::{self, partial_eq_bincode, IncomingMessage, MessageBuffer, OutgoingMessage},
    scheduler::Scheduler,
    types::{Mid, Pid, Prio, Promises, Sid},
};
use async_std::{io, sync::RwLock, task};
use futures::{
    channel::{mpsc, oneshot},
    sink::SinkExt,
    stream::StreamExt,
};
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tracing::*;
use tracing_futures::Instrument;

/// Represents a TCP, UDP, or in-process MPSC address
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum Address {
    Tcp(SocketAddr),
    Udp(SocketAddr),
    Mpsc(u64),
}
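
// A minimal sketch of constructing each `Address` variant; the socket
// strings and the mpsc channel id below are arbitrary example values:
//
//     let tcp = Address::Tcp("127.0.0.1:2999".parse().unwrap());
//     let udp = Address::Udp("127.0.0.1:2998".parse().unwrap());
//     let mpsc = Address::Mpsc(42); // in-process channel id, chosen by the application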

/// `Participants` are generated by the [`Network`] and represent a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] methods
/// of [`Network`] on how to generate `Participants`.
///
/// [`Network`]: crate::api::Network
/// [`connect`]: Network::connect
/// [`connected`]: Network::connected
pub struct Participant {
    local_pid: Pid,
    remote_pid: Pid,
    a2b_stream_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
    b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
    closed: AtomicBool,
    a2s_disconnect_s:
        Option<mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>>,
}

/// `Streams` represent a channel to send `n` messages with a certain priority
/// and [`Promises`]. Messages are always sent between two `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] methods of [`Participant`] on how to
/// generate `Streams`.
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
///
/// [`Network`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
    pid: Pid,
    sid: Sid,
    mid: Mid,
    prio: Prio,
    promises: Promises,
    a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
    b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
    closed: Arc<AtomicBool>,
    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}

/// Error type returned by [`Network`](Network) methods
#[derive(Debug)]
pub enum NetworkError {
    NetworkClosed,
    ListenFailed(std::io::Error),
}

/// Error type returned by [`Participant`](Participant) methods
#[derive(Debug, PartialEq)]
pub enum ParticipantError {
    ParticipantClosed,
}

/// Error type returned by [`Stream`](Stream) methods
#[derive(Debug)]
pub enum StreamError {
    StreamClosed,
    DeserializeError(Box<bincode::ErrorKind>),
}
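
// A sketch of how these error types surface in application code, assuming a
// `stream: Stream` obtained via the `open`/`opened` methods defined below:
//
//     match stream.recv::<String>().await {
//         Ok(msg) => println!("got: {}", msg),
//         Err(StreamError::StreamClosed) => { /* remote closed the stream */ },
//         Err(StreamError::DeserializeError(e)) => eprintln!("bad message: {}", e),
//     }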

/// Use the `Network` to create connections to other [`Participants`].
///
/// The `Network` is the single source that handles all connections in your
/// application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all its methods use interior mutability.
///
/// The `Network` has methods to [`connect`] to and [`disconnect`] from other
/// [`Participants`] via their [`Address`]. All [`Participants`] will be
/// stored in the `Network` until explicitly disconnected, which is the only
/// way to close the sockets.
///
/// # Examples
/// ```rust
/// use veloren_network::{Network, Address, Pid};
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` of a (pseudo) database application
/// let (network, f) = Network::new(Pid::new(), None);
/// std::thread::spawn(f);
/// block_on(async {
/// # //setup pseudo database!
/// # let (database, fd) = Network::new(Pid::new(), None);
/// # std::thread::spawn(fd);
/// # database.listen(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// network.listen(Address::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
/// let database = network.connect(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
/// [`connect`]: Network::connect
/// [`disconnect`]: Network::disconnect
pub struct Network {
    local_pid: Pid,
    participants: RwLock<HashMap<Pid, Arc<Participant>>>,
    listen_sender:
        RwLock<mpsc::UnboundedSender<(Address, oneshot::Sender<async_std::io::Result<()>>)>>,
    connect_sender:
        RwLock<mpsc::UnboundedSender<(Address, oneshot::Sender<io::Result<Participant>>)>>,
    connected_receiver: RwLock<mpsc::UnboundedReceiver<Participant>>,
    shutdown_sender: Option<oneshot::Sender<()>>,
}

impl Network {
    /// Generates a new `Network` to handle all connections in an application
    ///
    /// # Arguments
    /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
    ///   don't want to reuse a Pid for 2 `Networks`
    /// * `registry` - Provide a Registry in order to collect Prometheus
    ///   metrics for this `Network`, `None` will deactivate metrics. Metrics
    ///   are done via [`prometheus`]
    ///
    /// # Result
    /// * `Self` - returns a `Network` which can be `Send` to multiple areas
    ///   of your code, including multiple threads. This is the base struct of
    ///   this crate.
    /// * `FnOnce` - you need to run the returned `FnOnce` exactly once,
    ///   probably in its own thread. This is NOT done internally, so that you
    ///   are free to choose the threadpool implementation of your choice. We
    ///   recommend using [`ThreadPool`] from the [`uvth`] crate. This fn will
    ///   run the Scheduler to handle all `Network` internals. Additional
    ///   threads will be allocated on an internal async-aware threadpool.
    ///
    /// # Examples
    /// ```rust
    /// //Example with uvth
    /// use uvth::ThreadPoolBuilder;
    /// use veloren_network::{Address, Network, Pid};
    ///
    /// let pool = ThreadPoolBuilder::new().build();
    /// let (network, f) = Network::new(Pid::new(), None);
    /// pool.execute(f);
    /// ```
    ///
    /// ```rust
    /// //Example with std::thread
    /// use veloren_network::{Address, Network, Pid};
    ///
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// ```
    ///
    /// Usually you only create a single `Network` for an application, except
    /// when client and server run in the same application, then you will want
    /// two. However, there are no technical limitations on creating more.
    ///
    /// [`Pid::new()`]: crate::types::Pid::new
    /// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html
    /// [`uvth`]: https://docs.rs/uvth
    pub fn new(
        participant_id: Pid,
        registry: Option<&Registry>,
    ) -> (Self, impl std::ops::FnOnce()) {
        let p = participant_id;
        debug!(?p, "starting Network");
        let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
            Scheduler::new(participant_id, registry);
        (
            Self {
                local_pid: participant_id,
                participants: RwLock::new(HashMap::new()),
                listen_sender: RwLock::new(listen_sender),
                connect_sender: RwLock::new(connect_sender),
                connected_receiver: RwLock::new(connected_receiver),
                shutdown_sender: Some(shutdown_sender),
            },
            move || {
                trace!(?p, "starting scheduler in own thread");
                let _handle = task::block_on(
                    scheduler
                        .run()
                        .instrument(tracing::info_span!("scheduler", ?p)),
                );
                trace!(?p, "stopping scheduler and its own thread");
            },
        )
    }

    /// Starts listening on an [`Address`].
    /// When the method returns, the `Network` is ready to listen for incoming
    /// connections OR has returned a [`NetworkError`] (e.g. port already in
    /// use). You can call [`connected`] to asynchronously wait for a
    /// [`Participant`] to connect. You can call `listen` on multiple
    /// addresses, e.g. to support multiple protocols or NICs.
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Address, Network, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// block_on(async {
    /// network
    ///     .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
    ///     .await?;
    /// network
    ///     .listen(Address::Udp("127.0.0.1:2001".parse().unwrap()))
    ///     .await?;
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`connected`]: Network::connected
    pub async fn listen(&self, address: Address) -> Result<(), NetworkError> {
        let (s2a_result_s, s2a_result_r) = oneshot::channel::<async_std::io::Result<()>>();
        debug!(?address, "listening on address");
        self.listen_sender
            .write()
            .await
            .send((address, s2a_result_s))
            .await?;
        match s2a_result_r.await? {
            // waiting guarantees that we either listened successfully or get
            // an error like port in use
            Ok(()) => Ok(()),
            Err(e) => Err(NetworkError::ListenFailed(e)),
        }
    }

    /// Starts connecting to an [`Address`].
    /// When the method returns, the `Network` either returns a
    /// [`Participant`] ready to open [`Streams`] on, OR has returned a
    /// [`NetworkError`] (e.g. can't connect, or invalid handshake).
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Address, Network, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// # remote.listen(Address::Tcp("0.0.0.0:2010".parse().unwrap())).await?;
    /// # remote.listen(Address::Udp("0.0.0.0:2011".parse().unwrap())).await?;
    /// let p1 = network
    ///     .connect(Address::Tcp("127.0.0.1:2010".parse().unwrap()))
    ///     .await?;
    /// # //this doesn't work yet, so skip the test
    /// # //TODO fixme!
    /// # return Ok(());
    /// let p2 = network
    ///     .connect(Address::Udp("127.0.0.1:2011".parse().unwrap()))
    ///     .await?;
    /// assert!(std::sync::Arc::ptr_eq(&p1, &p2));
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// Usually the `Network` guarantees that an operation on a
    /// [`Participant`] succeeds, e.g. by automatic retrying, unless it fails
    /// completely, e.g. by disconnecting from the remote. If 2 [`Addresses`]
    /// you `connect` to belong to the same [`Participant`], you get the same
    /// [`Participant`] as a result. This is useful e.g. when connecting to
    /// the same [`Participant`] via multiple protocols.
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`Addresses`]: crate::api::Address
    pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
        let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
        debug!(?address, "connect to address");
        self.connect_sender
            .write()
            .await
            .send((address, pid_sender))
            .await?;
        let participant = pid_receiver.await??;
        let pid = participant.remote_pid;
        debug!(?pid, "received Participant id from remote, returning to user");
        let participant = Arc::new(participant);
        self.participants
            .write()
            .await
            .insert(participant.remote_pid, participant.clone());
        Ok(participant)
    }

    /// Returns a [`Participant`] created from an [`Address`] you called
    /// [`listen`] on before. This function will either return a working
    /// [`Participant`] ready to open [`Streams`] on, OR return a
    /// [`NetworkError`] (e.g. the `Network` got closed).
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Address, Network, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2020` TCP and print the Pid of each connecting Participant
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// network
    ///     .listen(Address::Tcp("0.0.0.0:2020".parse().unwrap()))
    ///     .await?;
    /// # remote.connect(Address::Tcp("0.0.0.0:2020".parse().unwrap())).await?;
    /// while let Ok(participant) = network.connected().await {
    ///     println!("Participant connected: {}", participant.remote_pid());
    ///     # //skip test here as it would be an endless loop
    ///     # break;
    /// }
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`listen`]: crate::api::Network::listen
    pub async fn connected(&self) -> Result<Arc<Participant>, NetworkError> {
        let participant = self.connected_receiver.write().await.next().await?;
        let participant = Arc::new(participant);
        self.participants
            .write()
            .await
            .insert(participant.remote_pid, participant.clone());
        Ok(participant)
    }

    /// Disconnects a [`Participant`], consuming the last existing
    /// [`Arc<Participant>`]. As the [`Network`] also holds an [`Arc`] to the
    /// [`Participant`], you need to provide the last [`Arc<Participant>`] and
    /// are not allowed to keep others. If you do so, the [`Participant`]
    /// can't be disconnected properly. If you no longer have the respective
    /// [`Participant`], try using the [`participants`] method to get it.
    ///
    /// This function will wait for all [`Streams`] to properly close,
    /// including all messages to be sent before closing. If an error occurs
    /// while sending one of the messages, it is returned.
    /// Except if the remote side already dropped the [`Participant`]
    /// simultaneously, then messages won't be sent.
    ///
    /// There is NO `disconnected` function in `Network`; if a [`Participant`]
    /// is no longer reachable (e.g. as the network cable was unplugged) the
    /// [`Participant`] will fail all actions, but needs to be manually
    /// disconnected, using this function.
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Address, Network, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2030` TCP, print the Pid of each connecting Participant and close the connection
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// network
    ///     .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap()))
    ///     .await?;
    /// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
    /// while let Ok(participant) = network.connected().await {
    ///     println!("Participant connected: {}", participant.remote_pid());
    ///     network.disconnect(participant).await?;
    ///     # //skip test here as it would be an endless loop
    ///     # break;
    /// }
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Arc<Participant>`]: crate::api::Participant
    /// [`Streams`]: crate::api::Stream
    /// [`participants`]: Network::participants
    /// [`Arc`]: std::sync::Arc
    pub async fn disconnect(&self, participant: Arc<Participant>) -> Result<(), NetworkError> {
        // Remove, close, and error when `Arc::try_unwrap` fails!
        let pid = participant.remote_pid;
        debug!(?pid, "removing participant from network");
        self.participants.write().await.remove(&pid)?;
        participant.closed.store(true, Ordering::Relaxed);

        match Arc::try_unwrap(participant) {
            Err(_) => {
                warn!(
                    "you are disconnecting and still keeping a reference to this participant, \
                     this is a bad idea. Participant will only be dropped when you drop your last \
                     reference"
                );
            },
            Ok(mut participant) => {
                trace!("waiting now for participant to close");
                let (finished_sender, finished_receiver) = oneshot::channel();
                // We are deleting here asynchronously before Drop is called,
                // because this can be done natively async here, while Drop
                // needs a block_on. Drop will recognise that it has already
                // been deleted here and won't try a double delete.
                participant
                    .a2s_disconnect_s
                    .take()
                    .unwrap()
                    .send((pid, finished_sender))
                    .await
                    .expect("something is wrong in internal scheduler coding");
                let res = finished_receiver.await.unwrap();
                trace!("participant is now closed");
                res?;
            },
        };

        Ok(())
    }

    /// Returns a copy of all currently connected [`Participants`], including
    /// ones which can't send data anymore as the underlying sockets are
    /// closed already but haven't been [`disconnected`] yet.
    ///
    /// [`Participants`]: crate::api::Participant
    /// [`disconnected`]: Network::disconnect
    pub async fn participants(&self) -> HashMap<Pid, Arc<Participant>> {
        self.participants.read().await.clone()
    }
}
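
// A sketch combining `participants` and `disconnect` to shut down every
// currently known connection, assuming a connected `network: Network`. This
// mirrors what `Drop for Network` does below:
//
//     let mut parts = network.participants().await;
//     for (_pid, p) in parts.drain() {
//         network.disconnect(p).await?;
//     }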

impl Participant {
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        a2b_stream_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
    ) -> Self {
        Self {
            local_pid,
            remote_pid,
            a2b_stream_open_s: RwLock::new(a2b_stream_open_s),
            b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
            closed: AtomicBool::new(false),
            a2s_disconnect_s: Some(a2s_disconnect_s),
        }
    }

    /// Opens a [`Stream`] on this `Participant` with a certain priority and
    /// [`Promises`]
    ///
    /// # Arguments
    /// * `prio` - valid between 0-63. The priority rates the throughput for
    ///   messages of the [`Stream`], e.g. prio 5 messages will get 1/2 the
    ///   bandwidth that prio 0 messages get, prio 10 messages only 1/4, prio
    ///   15 only 1/8, etc. (throughput halves every 5 priority levels)
    /// * `promises` - use a combination of your preferred [`Promises`], see
    ///   the link for further documentation. You can combine them, e.g.
    ///   `PROMISES_ORDERED | PROMISES_CONSISTENCY`. The Stream will then
    ///   guarantee that those promises are met.
    ///
    /// A [`ParticipantError`] might be returned if the `Participant` is
    /// already closed. [`Streams`] can be created without an answer from the
    /// remote side, resulting in very fast creation and closing latency.
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2100 and open a stream
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// # remote.listen(Address::Tcp("0.0.0.0:2100".parse().unwrap())).await?;
    /// let p1 = network
    ///     .connect(Address::Tcp("127.0.0.1:2100".parse().unwrap()))
    ///     .await?;
    /// let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
        // use this lock for now to make sure that only one open at a time is
        // made, TODO: not sure if we can parallelise that, check in future
        let mut a2b_stream_open_s = self.a2b_stream_open_s.write().await;
        if self.closed.load(Ordering::Relaxed) {
            warn!(?self.remote_pid, "participant is closed but another open is tried on it");
            return Err(ParticipantError::ParticipantClosed);
        }
        let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
        if a2b_stream_open_s
            .send((prio, promises, p2a_return_stream_s))
            .await
            .is_err()
        {
            debug!(?self.remote_pid, "stream_open_sender failed, closing participant");
            self.closed.store(true, Ordering::Relaxed);
            return Err(ParticipantError::ParticipantClosed);
        }
        match p2a_return_stream_r.await {
            Ok(stream) => {
                let sid = stream.sid;
                debug!(?sid, ?self.remote_pid, "opened stream");
                Ok(stream)
            },
            Err(_) => {
                debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
                self.closed.store(true, Ordering::Relaxed);
                Err(ParticipantError::ParticipantClosed)
            },
        }
    }

    /// Use this method to handle [`Streams`] opened from the remote side,
    /// like the [`connected`] method of [`Network`]. This is the associated
    /// method to [`open`]. It's guaranteed that the order of [`open`] and
    /// `opened` is equal. The `nth` [`Stream`] on one side will represent the
    /// `nth` on the other side. A [`ParticipantError`] might be returned if
    /// the `Participant` is already closed.
    ///
    /// # Examples
    /// ```rust
    /// use veloren_network::{Network, Pid, Address, PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2110 and wait for the other side to open a stream
    /// // Note: It's quite unusual to actively connect but then wait for a stream to be opened; usually the application taking the initiative wants to also create the first Stream.
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// # remote.listen(Address::Tcp("0.0.0.0:2110".parse().unwrap())).await?;
    /// let p1 = network.connect(Address::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
    /// # let p2 = remote.connected().await?;
    /// # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    /// let _s1 = p1.opened().await?;
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`connected`]: Network::connected
    /// [`open`]: Participant::open
    pub async fn opened(&self) -> Result<Stream, ParticipantError> {
        // use this lock for now to make sure that only one open at a time is
        // made, TODO: not sure if we can parallelise that, check in future
        let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
        if self.closed.load(Ordering::Relaxed) {
            warn!(?self.remote_pid, "participant is closed but another open is tried on it");
            return Err(ParticipantError::ParticipantClosed);
        }
        match stream_opened_receiver.next().await {
            Some(stream) => {
                let sid = stream.sid;
                debug!(?sid, ?self.remote_pid, "received opened stream");
                Ok(stream)
            },
            None => {
                debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant");
                self.closed.store(true, Ordering::Relaxed);
                Err(ParticipantError::ParticipantClosed)
            },
        }
    }

    /// Returns the remote [`Pid`]
    pub fn remote_pid(&self) -> Pid { self.remote_pid }
}

impl Stream {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        pid: Pid,
        sid: Sid,
        prio: Prio,
        promises: Promises,
        a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
        b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
        closed: Arc<AtomicBool>,
        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
    ) -> Self {
        Self {
            pid,
            sid,
            mid: 0,
            prio,
            promises,
            a2b_msg_s,
            b2a_msg_recv_r,
            closed,
            a2b_close_stream_s: Some(a2b_close_stream_s),
        }
    }

    /// Use to send an arbitrary message to the remote side; the remote side
    /// must have opened a `Stream` linked to this one. The message will be
    /// [`Serialized`], which actually is quite slow compared to most other
    /// calculations done. A faster method, [`send_raw`], exists when extra
    /// speed is needed. The other side needs to use the respective [`recv`]
    /// function and know the type sent.
    ///
    /// `send` is an exception to the `async` methods, as it's probably called
    /// quite often so it doesn't wait for execution. Which also means that no
    /// feedback is provided. It is to be assumed that the message got sent
    /// correctly. If an error occurred, the next call will return an Error.
    /// If the [`Participant`] disconnected it will also be unable to be used
    /// any more. A [`StreamError`] will be returned in the error case, e.g.
    /// when the `Stream` got closed already.
    ///
    /// Note when a `Stream` is dropped locally, it will still send all
    /// messages, though the `drop` will return immediately; when a
    /// [`Participant`] gets gracefully shut down, all remaining messages
    /// will be sent. If the `Stream` is dropped from the remote side no
    /// further messages are sent, because the remote side has no way of
    /// listening to them either way. If the last channel is destroyed (e.g.
    /// by losing the internet connection or a non-graceful shutdown), pending
    /// messages are also dropped.
    ///
    /// # Example
    /// ```
    /// use veloren_network::{Network, Address, Pid};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// network.listen(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    /// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    /// # // keep it alive
    /// # let _stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    /// let participant_a = network.connected().await?;
    /// let mut stream_a = participant_a.opened().await?;
    /// //Send Message
    /// stream_a.send("Hello World")?;
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`Serialized`]: Serialize
    #[inline]
    pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
        self.send_raw(Arc::new(message::serialize(&msg)))
    }

    /// This method gives the option to skip multiple calls of [`bincode`],
    /// e.g. in case the same message needs to be sent on multiple `Streams`
    /// to multiple [`Participants`]. Other than that, the same rules apply as
    /// for [`send`].
    ///
    /// # Example
    /// ```rust
    /// use veloren_network::{Network, Address, Pid, MessageBuffer};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    /// use bincode;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote1, fr1) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr1);
    /// # let (remote2, fr2) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr2);
    /// block_on(async {
    /// network.listen(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    /// # let remote1_p = remote1.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    /// # let remote2_p = remote2.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
    /// # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    /// # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    /// let participant_a = network.connected().await?;
    /// let participant_b = network.connected().await?;
    /// let mut stream_a = participant_a.opened().await?;
    /// let mut stream_b = participant_b.opened().await?;
    ///
    /// //Prepare Message and encode it
    /// let msg = "Hello World";
    /// let raw_msg = Arc::new(MessageBuffer {
    ///     data: bincode::serialize(&msg).unwrap(),
    /// });
    /// //Send the same Message to multiple Streams
    /// stream_a.send_raw(raw_msg.clone());
    /// stream_b.send_raw(raw_msg.clone());
    /// # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send`]: Stream::send
    /// [`Participants`]: crate::api::Participant
    pub fn send_raw(&mut self, messagebuffer: Arc<MessageBuffer>) -> Result<(), StreamError> {
        if self.closed.load(Ordering::Relaxed) {
            return Err(StreamError::StreamClosed);
        }
        //debug!(?messagebuffer, "sending a message");
        self.a2b_msg_s.send((self.prio, self.sid, OutgoingMessage {
            buffer: messagebuffer,
            cursor: 0,
            mid: self.mid,
            sid: self.sid,
        }))?;
        self.mid += 1;
        Ok(())
    }

    /// Use `recv` to wait on a message sent from the remote side via their
    /// `Stream`. The message needs to implement [`DeserializeOwned`] and
    /// thus, the resulting type must already be known by the receiving side.
    /// If this is not known from the application logic, one could use an
    /// `enum` and then handle the received message via a `match` statement.
    ///
    /// A [`StreamError`] will be returned in the error case, e.g. when the
    /// `Stream` got closed already.
    ///
    /// # Example
    /// ```
    /// use veloren_network::{Network, Address, Pid};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
    /// let (network, f) = Network::new(Pid::new(), None);
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new(), None);
    /// # std::thread::spawn(fr);
    /// block_on(async {
    /// network.listen(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    /// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    /// # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    /// # stream_p.send("Hello World");
    /// let participant_a = network.connected().await?;
    /// let mut stream_a = participant_a.opened().await?;
    /// //Recv Message
    /// println!("{}", stream_a.recv::<String>().await?);
    /// # Ok(())
    /// })
    /// # }
    /// ```
    #[inline]
    pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
        Ok(message::deserialize(self.recv_raw().await?)?)
    }

    /// The equivalent of [`send_raw`] but for [`recv`]: no [`bincode`]
    /// deserialization is executed, for performance reasons.
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
        // no need to access self.closed here, as when this stream is closed
        // the channel is closed, which will trigger a None
        let msg = self.b2a_msg_recv_r.next().await?;
        //info!(?msg, "delivering a message");
        Ok(msg.buffer)
    }
}
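
// A sketch tying `recv` and `send` together in a simple echo loop; the loop
// ends with an error once either side closes the stream:
//
//     async fn echo(mut stream: Stream) -> Result<(), StreamError> {
//         loop {
//             let msg: String = stream.recv().await?;
//             stream.send(msg)?;
//         }
//     }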

impl Drop for Network {
    fn drop(&mut self) {
        let pid = self.local_pid;
        debug!(?pid, "shutting down Network");
        debug!(
            ?pid,
            "shutting down Participants of Network, while we still have metrics"
        );
        task::block_on(async {
            // We need to carefully shut down here! Otherwise we might call
            // Participant::drop with a2s_disconnect_s, which would open
            // another task::block_on, which would panic! Also we can't
            // `.write()` on `self.participants`, as the `disconnect` fn
            // needs it.
            let mut participant_clone = self.participants().await;
            for (_, p) in participant_clone.drain() {
                if let Err(e) = self.disconnect(p).await {
                    error!(
                        ?e,
                        "error while dropping network, the error occurred when dropping a \
                         participant but can't be notified to the user any more"
                    );
                }
            }
            self.participants.write().await.clear();
        });
        debug!(?pid, "shutting down Scheduler");
        self.shutdown_sender
            .take()
            .unwrap()
            .send(())
            .expect("scheduler is closed, but nobody else should be able to close it");
        debug!(?pid, "participants have shut down!");
    }
}

impl Drop for Participant {
    fn drop(&mut self) {
        // ignore closed, as we need to send it even though we disconnected
        // the participant from network
        let pid = self.remote_pid;
        debug!(?pid, "shutting down Participant");
        match self.a2s_disconnect_s.take() {
            None => debug!(
                ?pid,
                "Participant has been shut down cleanly, no further waiting is required!"
            ),
            Some(mut a2s_disconnect_s) => {
                debug!(
                    ?pid,
                    "unclean shutdown detected, actively waiting for client to be disconnected"
                );
                task::block_on(async {
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    a2s_disconnect_s
                        .send((self.remote_pid, finished_sender))
                        .await
                        .expect("something is wrong in internal scheduler coding");
                    match finished_receiver.await {
                        Ok(Err(e)) => error!(
                            ?pid,
                            ?e,
                            "Error while dropping the participant, couldn't send all outgoing \
                             messages, dropping remaining"
                        ),
                        Err(e) => warn!(
                            ?e,
                            "//TODO I don't know why the finish doesn't work; I would normally \
                             expect to have been sent a return message from the participant... \
                             ignoring to not cause a panic for now, please fix me"
                        ),
                        _ => (),
                    };
                });
            },
        }
        debug!(?pid, "participant dropped");
    }
}

impl Drop for Stream {
    fn drop(&mut self) {
        // a send when closed is unnecessary but doesn't hurt, we must not
        // crash here
        if !self.closed.load(Ordering::Relaxed) {
            let sid = self.sid;
            let pid = self.pid;
            debug!(?pid, ?sid, "shutting down Stream");
            if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
                warn!(
                    "Other side was already dropped, probably due to timing; other side will \
                     handle this gracefully"
                );
            };
        } else {
            let sid = self.sid;
            let pid = self.pid;
            debug!(?pid, ?sid, "not needed");
        }
    }
}

impl std::fmt::Debug for Participant {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let status = if self.closed.load(Ordering::Relaxed) {
            "[CLOSED]"
        } else {
            "[OPEN]"
        };
        write!(
            f,
            "Participant {{{} local_pid: {:?}, remote_pid: {:?} }}",
            status, &self.local_pid, &self.remote_pid,
        )
    }
}

impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
}

impl<T> From<crossbeam_channel::SendError<T>> for ParticipantError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { ParticipantError::ParticipantClosed }
}

impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
}

impl From<async_std::io::Error> for NetworkError {
    fn from(err: async_std::io::Error) -> Self { NetworkError::ListenFailed(err) }
}

impl From<std::option::NoneError> for StreamError {
    fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
}

impl From<std::option::NoneError> for ParticipantError {
    fn from(_err: std::option::NoneError) -> Self { ParticipantError::ParticipantClosed }
}

impl From<std::option::NoneError> for NetworkError {
    fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed }
}

impl From<mpsc::SendError> for ParticipantError {
    fn from(_err: mpsc::SendError) -> Self { ParticipantError::ParticipantClosed }
}

impl From<mpsc::SendError> for NetworkError {
    fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed }
}

impl From<oneshot::Canceled> for ParticipantError {
    fn from(_err: oneshot::Canceled) -> Self { ParticipantError::ParticipantClosed }
}

impl From<oneshot::Canceled> for NetworkError {
    fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed }
}

impl From<Box<bincode::ErrorKind>> for StreamError {
    fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::DeserializeError(err) }
}
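
// These `From` impls are what let the `?` operator in the methods above
// convert internal channel failures into the public error types, e.g.
// (sketch):
//
//     fn ping(stream: &mut Stream) -> Result<(), StreamError> {
//         // a crossbeam SendError inside send_raw becomes StreamError::StreamClosed
//         stream.send("ping")?;
//         Ok(())
//     }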

impl core::fmt::Display for StreamError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            StreamError::StreamClosed => write!(f, "stream closed"),
            StreamError::DeserializeError(err) => {
                write!(f, "deserialize error on message: {}", err)
            },
        }
    }
}

impl core::fmt::Display for ParticipantError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            ParticipantError::ParticipantClosed => write!(f, "participant closed"),
        }
    }
}

impl core::fmt::Display for NetworkError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            NetworkError::NetworkClosed => write!(f, "network closed"),
            NetworkError::ListenFailed(_) => write!(f, "listening failed"),
        }
    }
}

/// implementing PartialEq as it's super convenient in tests
impl core::cmp::PartialEq for StreamError {
    fn eq(&self, other: &Self) -> bool {
        match self {
            StreamError::StreamClosed => match other {
                StreamError::StreamClosed => true,
                StreamError::DeserializeError(_) => false,
            },
            StreamError::DeserializeError(err) => match other {
                StreamError::StreamClosed => false,
                StreamError::DeserializeError(other_err) => partial_eq_bincode(err, other_err),
            },
        }
    }
}

impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {}