veloren/network/src/api.rs

1162 lines
46 KiB
Rust

//!
//!
//!
//! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run)
use crate::{
message::{partial_eq_bincode, IncomingMessage, Message, OutgoingMessage},
participant::{A2bStreamOpen, S2bShutdownBparticipant},
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Sid},
};
#[cfg(feature = "compression")]
use lz_fear::raw::DecodeError;
#[cfg(feature = "metrics")]
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{
io,
runtime::Runtime,
sync::{mpsc, oneshot, Mutex},
};
use tracing::*;
use tracing_futures::Instrument;
/// Shared, take-once handle to the channel on which the shutdown of a
/// participant is requested. The payload is the [`Pid`] of the participant plus
/// an [`S2bShutdownBparticipant`]; in this file that second element is always
/// the sending half of a oneshot channel on which the result is awaited.
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
/// Address of an endpoint together with the protocol that is used to reach
/// it: Tcp, Udp or Mpsc.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtocolAddr {
    /// Tcp endpoint at the given socket address
    Tcp(SocketAddr),
    /// Udp endpoint at the given socket address
    Udp(SocketAddr),
    /// Mpsc endpoint identified by a number
    // NOTE(review): presumably an in-process channel id - confirm against the
    // scheduler
    Mpsc(u64),
}
/// `Participants` are generated by the [`Network`] and represent a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] methods
/// of [`Networks`] on how to generate `Participants`.
///
/// [`Networks`]: crate::api::Network
/// [`connect`]: Network::connect
/// [`connected`]: Network::connected
pub struct Participant {
// Pid that was handed to the constructor for the local side
local_pid: Pid,
// Pid of the remote side; the only field that is compared by `PartialEq`
remote_pid: Pid,
runtime: Arc<Runtime>,
// `open` sends its stream requests through this channel
a2b_stream_open_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
// `opened` receives the streams of the remote side from this channel
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
// taken (left as `None`) by whoever requests the disconnect first
a2s_disconnect_s: A2sDisconnect,
}
/// `Streams` represent a channel to send `n` messages with a certain priority
/// and [`Promises`]. Messages always need to be sent between 2 `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] methods of [`Participant`] on how to
/// generate `Streams`.
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
///
/// [`Networks`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
pid: Pid,
sid: Sid,
// id given to the next outgoing message; starts at 0 and is incremented by
// `send_raw` after every queued message
mid: Mid,
prio: Prio,
promises: Promises,
// once this flag is set, `send_raw` refuses to queue further messages
send_closed: Arc<AtomicBool>,
// outgoing messages are queued here together with the prio and sid
a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
// set to `None` once the receiving channel reported that it is closed
b2a_msg_recv_r: Option<async_channel::Receiver<IncomingMessage>>,
// NOTE(review): not used in the visible part of this file, presumably used to
// announce that this stream gets closed - confirm against `Drop`
a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}
/// Error type returned by [`Networks`](Network) methods
#[derive(Debug)]
pub enum NetworkError {
/// The `Network` is no longer available
// NOTE(review): the conversions producing this variant are outside the
// visible part of the file - confirm the exact conditions
NetworkClosed,
/// Listening on the requested address failed, wraps the underlying io error
ListenFailed(std::io::Error),
/// Connecting to the requested address failed, wraps the underlying io error
ConnectFailed(std::io::Error),
}
/// Error type returned by [`Participants`](Participant) methods
#[derive(Debug, PartialEq, Clone)]
pub enum ParticipantError {
/// Participant was closed by the remote side
ParticipantDisconnected,
/// Underlying protocol failed and wasn't able to recover. Expect some data
/// loss; unfortunately there is no method to get the exact messages
/// that failed. This is also returned when the local side tries to do
/// something while the remote side gracefully disconnects
ProtocolFailedUnrecoverable,
}
/// Error type returned by [`Streams`](Stream) methods.
/// A Compression error should only happen if a client sends malformed or
/// malicious data.
/// A Deserialize error probably means you are expecting type X while you
/// actually got sent type Y.
#[derive(Debug)]
pub enum StreamError {
/// The `Stream` is closed and can no longer send or receive
StreamClosed,
/// Decoding failed; only available with the `compression` feature
#[cfg(feature = "compression")]
Compression(DecodeError),
/// The received bytes could not be deserialized into the requested type
Deserialize(bincode::Error),
}
/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
/// Application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
///
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
/// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`]
/// [`Participants`].
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # //setup pseudo database!
/// # let database_network = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// # database_network.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
/// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
/// [`connect`]: Network::connect
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
pub struct Network {
local_pid: Pid,
// runtime the scheduler task is spawned on; also used to block in `Drop`
runtime: Arc<Runtime>,
// disconnect handles of all participants handed out by `connect` and
// `connected`, drained when the `Network` is dropped
participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>,
// `listen` requests, each with a oneshot sender for the result
listen_sender:
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<tokio::io::Result<()>>)>>,
// `connect` requests, each with a oneshot sender for the resulting participant
connect_sender:
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
// participants that connected to an address we listen on, read by `connected`
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
// NOTE(review): presumably taken in `Drop` to stop the scheduler - the
// relevant code is outside the visible part of the file
shutdown_sender: Option<oneshot::Sender<()>>,
}
impl Network {
/// Generates a new `Network` to handle all connections in an Application
///
/// # Arguments
/// * `participant_id` - provide it by calling [`Pid::new()`], usually you
/// don't want to reuse a Pid for 2 `Networks`
/// * `runtime` - provide a tokio::Runtime, it's used to internally spawn
/// tasks. It is necessary to clean up in the non-async `Drop`. **All**
/// network related components **must** be dropped before the runtime is
/// stopped. dropping the runtime while a shutdown is still in progress
/// leaves the network in a bad state which might cause a panic!
///
/// # Result
/// * `Self` - returns a `Network` which can be `Send` to multiple areas of
/// your code, including multiple threads. This is the base struct of this
/// crate.
///
/// # Examples
/// ```rust
/// //Example with tokio
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), Arc::new(runtime));
/// ```
///
///
/// Usually you only create a single `Network` for an application,
/// except when client and server are in the same application, then you
/// will want 2. However there are no technical limitations from
/// creating more.
///
/// [`Pid::new()`]: crate::types::Pid::new
/// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html
/// [`uvth`]: https://docs.rs/uvth
pub fn new(participant_id: Pid, runtime: Arc<Runtime>) -> Self {
// Delegates to the shared constructor; when the `metrics` feature is enabled
// the additional registry argument is passed as `None`.
Self::internal_new(
participant_id,
runtime,
#[cfg(feature = "metrics")]
None,
)
}
/// See [`new`]
///
/// # additional Arguments
/// * `registry` - Provide a [`Registry`] in order to collect Prometheus
/// metrics of this `Network`. Use [`new`] if you don't want to provide a
/// registry. Metrics are collected via [`prometheus`]
///
/// # Examples
/// ```rust
/// use prometheus::Registry;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let registry = Registry::new();
/// let network = Network::new_with_registry(Pid::new(), runtime, &registry);
/// ```
/// [`new`]: crate::api::Network::new
#[cfg(feature = "metrics")]
pub fn new_with_registry(
participant_id: Pid,
runtime: Arc<Runtime>,
registry: &Registry,
) -> Self {
// Same as `new`, but hands the given registry to the shared constructor.
Self::internal_new(participant_id, runtime, Some(registry))
}
fn internal_new(
    participant_id: Pid,
    runtime: Arc<Runtime>,
    #[cfg(feature = "metrics")] registry: Option<&Registry>,
) -> Self {
    // Shared constructor behind `new` and `new_with_registry`.
    // `p` is also the field name under which the pid shows up in the logs.
    let p = participant_id;
    debug!(?p, "Starting Network");
    let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
        Scheduler::new(
            participant_id,
            Arc::clone(&runtime),
            #[cfg(feature = "metrics")]
            registry,
        );
    // The scheduler is driven by a task on the provided runtime; the channel
    // ends returned above are kept in the `Network`.
    let scheduler_task = async move {
        trace!(?p, "Starting scheduler in own thread");
        scheduler
            .run()
            .instrument(tracing::info_span!("scheduler", ?p))
            .await;
        trace!(?p, "Stopping scheduler and his own thread");
    };
    runtime.spawn(scheduler_task);
    Self {
        local_pid: participant_id,
        runtime,
        participant_disconnect_sender: Mutex::new(HashMap::new()),
        listen_sender: Mutex::new(listen_sender),
        connect_sender: Mutex::new(connect_sender),
        connected_receiver: Mutex::new(connected_receiver),
        shutdown_sender: Some(shutdown_sender),
    }
}
/// starts listening on an [`ProtocolAddr`].
/// When the method returns the `Network` is ready to listen for incoming
/// connections OR has returned a [`NetworkError`] (e.g. port already used).
/// You can call [`connected`] to asynchronously wait for a [`Participant`] to
/// connect. You can call `listen` on multiple addresses, e.g. to
/// support multiple Protocols or NICs.
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2000` TCP and `2001` UDP, both locally
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
/// .await?;
/// network
/// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
/// .await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`connected`]: Network::connected
pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> {
    let (result_s, result_r) = oneshot::channel::<tokio::io::Result<()>>();
    debug!(?address, "listening on address");
    // Queue the request; the lock guard only lives for this statement.
    self.listen_sender.lock().await.send((address, result_s))?;
    // Waiting for the answer guarantees that we either listened successfully
    // or get an error like "port in use".
    result_r.await?.map_err(NetworkError::ListenFailed)
}
/// starts connection to an [`ProtocolAddr`].
/// When the method returns the Network either returns a [`Participant`]
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
/// can't connect, or invalid Handshake)
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
/// # remote.listen(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
/// let p1 = network
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
/// .await?;
/// # //this doesn't work yet, so skip the test
/// # //TODO fixme!
/// # return Ok(());
/// let p2 = network
/// .connect(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap()))
/// .await?;
/// assert_eq!(&p1, &p2);
/// # Ok(())
/// })
/// # }
/// ```
/// Usually the `Network` guarantees that a operation on a [`Participant`]
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
/// disconnecting from the remote. If 2 [`ProtocolAddres`] you `connect` to
/// belongs to the same [`Participant`], you get the same [`Participant`] as
/// a result. This is useful e.g. by connecting to the same
/// [`Participant`] via multiple Protocols.
///
/// [`Streams`]: crate::api::Stream
/// [`ProtocolAddres`]: crate::api::ProtocolAddr
pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
    let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
    debug!(?address, "Connect to address");
    // Queue the request; the lock guard only lives for this statement.
    self.connect_sender.lock().await.send((address, pid_sender))?;
    // First `?`: the answer channel was dropped, second `?`: connecting failed.
    let participant = pid_receiver
        .await?
        .map_err(NetworkError::ConnectFailed)?;
    let pid = participant.remote_pid;
    debug!(
        ?pid,
        "Received Participant id from remote and return to user"
    );
    // Keep the disconnect handle, it is used when the `Network` is dropped.
    let disconnect_handle = Arc::clone(&participant.a2s_disconnect_s);
    self.participant_disconnect_sender
        .lock()
        .await
        .insert(pid, disconnect_handle);
    Ok(participant)
}
/// returns a [`Participant`] created from a [`ProtocolAddr`] you called
/// [`listen`] on before. This function will either return a working
/// [`Participant`] ready to open [`Streams`] on OR has returned a
/// [`NetworkError`] (e.g. Network got closed)
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2020` TCP and print the Pid of every Participant that connects
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
/// .await?;
/// # remote.connect(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
/// [`listen`]: crate::api::Network::listen
pub async fn connected(&self) -> Result<Participant, NetworkError> {
let participant = self.connected_receiver.lock().await.recv().await?;
self.participant_disconnect_sender.lock().await.insert(
participant.remote_pid,
Arc::clone(&participant.a2s_disconnect_s),
);
Ok(participant)
}
}
impl Participant {
/// Bundles the ids, the runtime and the channel ends of a connection into a
/// `Participant`. The channel ends are wrapped in `Mutex`es as the methods of
/// `Participant` only take `&self`; the disconnect sender is additionally
/// wrapped in an `Arc<Mutex<Option<_>>>` so that it can be shared and taken.
pub(crate) fn new(
local_pid: Pid,
remote_pid: Pid,
runtime: Arc<Runtime>,
a2b_stream_open_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
) -> Self {
Self {
local_pid,
remote_pid,
runtime,
a2b_stream_open_s: Mutex::new(a2b_stream_open_s),
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
}
}
/// Opens a [`Stream`] on this `Participant` with a certain Priority and
/// [`Promises`]
///
/// # Arguments
/// * `prio` - valid between 0-63. The priority rates the throughput for
/// messages of the [`Stream`] e.g. prio 5 messages will get 1/2 the speed
/// prio0 messages have. Prio10 messages only 1/4 and Prio 15 only 1/8,
/// etc...
/// * `promises` - use a combination of you prefered [`Promises`], see the
/// link for further documentation. You can combine them, e.g.
/// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
/// guarantee that those promises are met.
///
/// A [`ParticipantError`] might be thrown if the `Participant` is already
/// closed. [`Streams`] can be created without a answer from the remote
/// side, resulting in very fast creation and closing latency.
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, Promises, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2100 and open a stream
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
/// let p1 = network
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
/// .await?;
/// let _s1 = p1
/// .open(16, Promises::ORDERED | Promises::CONSISTENCY)
/// .await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
    let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
    // Request the stream; the created `Stream` is handed back via the oneshot.
    let request = (prio, promises, p2a_return_stream_s);
    if let Err(e) = self.a2b_stream_open_s.lock().await.send(request) {
        debug!(?e, "bParticipant is already closed, notifying");
        return Err(ParticipantError::ParticipantDisconnected);
    }
    let stream = match p2a_return_stream_r.await {
        Ok(stream) => stream,
        Err(_) => {
            debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
            return Err(ParticipantError::ParticipantDisconnected);
        },
    };
    let sid = stream.sid;
    debug!(?sid, ?self.remote_pid, "opened stream");
    Ok(stream)
}
/// Use this method to handle [`Streams`] opened from remote site, like the
/// [`connected`] method of [`Network`]. This is the associated method
/// to [`open`]. It's guaranteed that the order of [`open`] and `opened`
/// is equal. The `nth` [`Streams`] on one side will represent the `nth` on
/// the other side. A [`ParticipantError`] might be thrown if the
/// `Participant` is already closed.
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, Promises, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2110 and wait for the other side to open a stream
/// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative wants to also create the first Stream.
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// # let p2 = remote.connected().await?;
/// # p2.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// let _s1 = p1.opened().await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
/// [`connected`]: Network::connected
/// [`open`]: Participant::open
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
    // `None` means that the sending half of the channel is gone.
    let stream = match self.b2a_stream_opened_r.lock().await.recv().await {
        Some(stream) => stream,
        None => {
            debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant");
            return Err(ParticipantError::ParticipantDisconnected);
        },
    };
    let sid = stream.sid;
    debug!(?sid, ?self.remote_pid, "Receive opened stream");
    Ok(stream)
}
/// disconnecting a `Participant` in a async way.
/// Use this rather than `Participant::Drop` if you want to close multiple
/// `Participants`.
///
/// This function will wait for all [`Streams`] to properly close, including
/// all messages to be sent before closing, except if the remote side
/// already dropped the `Participant` simultaneously; in that case the
/// messages won't be sent. An error that occurs during the shutdown is
/// returned as a [`ParticipantError`].
///
/// There is NO `disconnected` function in `Participant`, if a `Participant`
/// is no longer reachable (e.g. as the network cable was unplugged) the
/// `Participant` will fail all action, but needs to be manually
/// disconnected, using this function.
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2030` TCP, print the Pid of every Participant that connects and close the connection.
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
/// .await?;
/// # let keep_alive = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// participant.disconnect().await?;
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
pub async fn disconnect(self) -> Result<(), ParticipantError> {
// Takes the disconnect sender (so a disconnect is requested at most once),
// sends the request and waits for the result on a oneshot channel.
let pid = self.remote_pid;
debug!(?pid, "Closing participant from network");
//Streams will be closed by BParticipant
// The lock guard created in the scrutinee is a temporary and therefore stays
// alive until the end of this `match`, i.e. also while we wait for the answer.
match self.a2s_disconnect_s.lock().await.take() {
Some(a2s_disconnect_s) => {
let (finished_sender, finished_receiver) = oneshot::channel();
// Participant is connecting to Scheduler here, not as usual
// Participant<->BParticipant
// NOTE: this panics if the receiving half of the channel is already gone
a2s_disconnect_s
.send((pid, finished_sender))
.expect("Something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(res) => {
// the result of the shutdown is logged and handed to the caller unchanged
match res {
Ok(()) => trace!(?pid, "Participant is now closed"),
Err(ref e) => {
trace!(?pid, ?e, "Error occurred during shutdown of participant")
},
};
res
},
Err(e) => {
//this is a bug. but as i am Participant i can't destroy the network
error!(
?pid,
?e,
"Failed to get a message back from the scheduler, seems like the \
network is already closed"
);
Err(ParticipantError::ProtocolFailedUnrecoverable)
},
}
},
None => {
// somebody else already took the sender, e.g. the `Drop` of the `Network`
warn!(
"seems like you are trying to disconnecting a participant after the network \
was already dropped. It was already dropped with the network!"
);
Err(ParticipantError::ParticipantDisconnected)
},
}
}
/// Returns the [`Pid`] of the remote side of this `Participant`
pub fn remote_pid(&self) -> Pid { self.remote_pid }
}
impl Stream {
/// Creates a `Stream` from its ids, settings and channel ends. The message id
/// counter starts at 0 and both optional channel ends start as `Some`.
// one argument per field, hence the many arguments
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
pid: Pid,
sid: Sid,
prio: Prio,
promises: Promises,
send_closed: Arc<AtomicBool>,
a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: async_channel::Receiver<IncomingMessage>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) -> Self {
Self {
pid,
sid,
mid: 0,
prio,
promises,
send_closed,
a2b_msg_s,
b2a_msg_recv_r: Some(b2a_msg_recv_r),
a2b_close_stream_s: Some(a2b_close_stream_s),
}
}
/// use to send a arbitrary message to the remote side, by having the remote
/// side also opened a `Stream` linked to this. the message will be
/// [`Serialized`], which actually is quite slow compared to most other
/// calculations done. A faster method [`send_raw`] exists, when extra
/// speed is needed. The other side needs to use the respective [`recv`]
/// function and know the type send.
///
/// `send` is an exception to the `async` messages, as it's probably called
/// quite often so it doesn't wait for execution. Which also means, that
/// no feedback is provided. It's to assume that the Message got `send`
/// correctly. If a error occurred, the next call will return an Error.
/// If the [`Participant`] disconnected it will also be unable to be used
/// any more. A [`StreamError`] will be returned in the error case, e.g.
/// when the `Stream` got closed already.
///
/// Note when a `Stream` is dropped locally, it will still send all
/// messages, though the `drop` will return immediately, however, when a
/// [`Participant`] gets gracefully shut down, all remaining messages
/// will be send. If the `Stream` is dropped from remote side no further
/// messages are send, because the remote side has no way of listening
/// to them either way. If the last channel is destroyed (e.g. by losing
/// the internet connection or non-graceful shutdown, pending messages
/// are also dropped.
///
/// # Example
/// ```
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
/// # use veloren_network::Promises;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # // keep it alive
/// # let _stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Send Message
/// stream_a.send("Hello World")?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`send_raw`]: Stream::send_raw
/// [`recv`]: Stream::recv
/// [`Serialized`]: Serialize
#[inline]
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
    // Serialize first, then go through the same path as raw messages.
    let message = Message::serialize(&msg, &*self);
    self.send_raw(&message)
}
/// This methods give the option to skip multiple calls of [`bincode`] and
/// [`compress`], e.g. in case the same Message needs to send on
/// multiple `Streams` to multiple [`Participants`]. Other then that,
/// the same rules apply than for [`send`].
/// You need to create a Message via [`Message::serialize`].
///
/// # Example
/// ```rust
/// use bincode;
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Message, Network, Pid, ProtocolAddr};
/// # use veloren_network::Promises;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote1 = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote2 = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
/// # remote1_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # remote2_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// let participant_a = network.connected().await?;
/// let participant_b = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// let mut stream_b = participant_b.opened().await?;
///
/// //Prepare Message and decode it
/// let msg = Message::serialize("Hello World", &stream_a);
/// //Send same Message to multiple Streams
/// stream_a.send_raw(&msg);
/// stream_b.send_raw(&msg);
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`send`]: Stream::send
/// [`Participants`]: crate::api::Participant
/// [`compress`]: lz_fear::raw::compress2
/// [`Message::serialize`]: crate::message::Message::serialize
pub fn send_raw(&mut self, message: &Message) -> Result<(), StreamError> {
    // Refuse to queue anything once the stream was flagged as closed.
    let closed = self.send_closed.load(Ordering::Relaxed);
    if closed {
        return Err(StreamError::StreamClosed);
    }
    #[cfg(debug_assertions)]
    message.verify(&*self);
    // The buffer is shared, so the same `Message` can be sent multiple times.
    let outgoing = OutgoingMessage {
        buffer: Arc::clone(&message.buffer),
        cursor: 0,
        mid: self.mid,
        sid: self.sid,
    };
    self.a2b_msg_s.send((self.prio, self.sid, outgoing))?;
    // Only count messages that were actually queued.
    self.mid += 1;
    Ok(())
}
/// use `recv` to wait on a Message send from the remote side by their
/// `Stream`. The Message needs to implement [`DeserializeOwned`] and
/// thus, the resulting type must already be known by the receiving side.
/// If this is not know from the Application logic, one could use a `Enum`
/// and then handle the received message via a `match` state.
///
/// A [`StreamError`] will be returned in the error case, e.g. when the
/// `Stream` got closed already.
///
/// # Example
/// ```
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
/// # use veloren_network::Promises;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Recv Message
/// println!("{}", stream_a.recv::<String>().await?);
/// # Ok(())
/// })
/// # }
/// ```
#[inline]
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
    // Wait for the raw message first, then turn it into the requested type.
    let raw = self.recv_raw().await?;
    raw.deserialize()
}
/// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] or
/// [`decompress`] is executed for performance reasons.
///
/// # Example
/// ```
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
/// # use veloren_network::Promises;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Recv Message
/// let msg = stream_a.recv_raw().await?;
/// //Resend Message, without deserializing
/// stream_a.send_raw(&msg)?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`send_raw`]: Stream::send_raw
/// [`recv`]: Stream::recv
/// [`decompress`]: lz_fear::raw::decompress_raw
pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
    // Without a receiver the stream was already detected as closed earlier.
    let receiver = match self.b2a_msg_recv_r.as_mut() {
        Some(receiver) => receiver,
        None => return Err(StreamError::StreamClosed),
    };
    match receiver.recv().await {
        Ok(msg) => Ok(Message {
            buffer: Arc::new(msg.buffer),
            #[cfg(feature = "compression")]
            compressed: self.promises.contains(Promises::COMPRESSED),
        }),
        Err(_) => {
            self.b2a_msg_recv_r = None; //prevent panic
            Err(StreamError::StreamClosed)
        },
    }
}
/// use `try_recv` to check for a Message send from the remote side by their
/// `Stream`. This function does not block and returns immediately. It's
/// intended for use in non-async context only. Other then that, the
/// same rules apply than for [`recv`].
///
/// # Example
/// ```
/// use futures::executor::block_on;
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
/// # use veloren_network::Promises;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
/// let runtime = Arc::new(Runtime::new()?);
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(16, Promises::ORDERED | Promises::CONSISTENCY).await?;
/// # stream_p.send("Hello World");
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Try Recv Message
/// println!("{:?}", stream_a.try_recv::<String>()?);
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`recv`]: Stream::recv
#[inline]
pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
    // Without a receiver the stream was already detected as closed earlier.
    let receiver = match self.b2a_msg_recv_r.as_mut() {
        Some(receiver) => receiver,
        None => return Err(StreamError::StreamClosed),
    };

    match receiver.try_recv() {
        Ok(msg) => {
            let message = Message {
                buffer: Arc::new(msg.buffer),
                #[cfg(feature = "compression")]
                compressed: self.promises().contains(Promises::COMPRESSED),
            };
            // A deserialize failure is converted into a `StreamError` by `?`.
            Ok(Some(message.deserialize()?))
        },
        // Nothing queued right now; this is not an error.
        Err(async_channel::TryRecvError::Empty) => Ok(None),
        Err(async_channel::TryRecvError::Closed) => {
            // Forget the receiver so later calls take the `None` path above
            // instead of touching the closed channel again (prevent panic).
            self.b2a_msg_recv_r = None;
            Err(StreamError::StreamClosed)
        },
    }
}
/// Returns a copy of the [`Promises`] stored on this `Stream`.
pub fn promises(&self) -> Promises {
    self.promises
}
}
impl core::cmp::PartialEq for Participant {
    /// Two `Participant`s are equal when they point at the same remote `Pid`.
    fn eq(&self, other: &Self) -> bool {
        // `local_pid` is deliberately left out: 2 Participant from different
        // networks should match if they are the "same" remote.
        let (ours, theirs) = (&self.remote_pid, &other.remote_pid);
        ours == theirs
    }
}
impl Drop for Network {
    /// Shuts the `Network` down in two phases: first every still-registered
    /// participant is asked to disconnect and its answer is awaited, then the
    /// scheduler itself is told to stop.
    fn drop(&mut self) {
        let pid = self.local_pid;
        debug!(?pid, "Shutting down Network");
        trace!(
            ?pid,
            "Shutting down Participants of Network, while we still have metrics"
        );
        self.runtime.block_on(async {
            // we MUST avoid nested block_on, good that Network::Drop no longer triggers
            // Participant::Drop directly but just the BParticipant
            let mut pending_confirmations = Vec::new();
            for (remote_pid, disconnect_slot) in
                self.participant_disconnect_sender.lock().await.drain()
            {
                if let Some(disconnect_sender) = disconnect_slot.lock().await.take() {
                    trace!(?remote_pid, "Participants will be closed");
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    pending_confirmations.push((remote_pid, finished_receiver));
                    disconnect_sender
                        .send((remote_pid, finished_sender))
                        .expect(
                            "Scheduler is closed, but nobody other should be able to close it",
                        );
                } else {
                    trace!(?remote_pid, "Participant already disconnected gracefully");
                }
            }
            // Only start waiting once the close was requested for all of them.
            for (remote_pid, finished_receiver) in pending_confirmations {
                match finished_receiver.await {
                    Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
                    Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
                    Err(e) => warn!(
                        ?remote_pid,
                        ?e,
                        "Failed to get a message back from the scheduler, seems like the network \
                         is already closed"
                    ),
                }
            }
        });
        trace!(?pid, "Participants have shut down!");
        trace!(?pid, "Shutting down Scheduler");
        self.shutdown_sender
            .take()
            .unwrap()
            .send(())
            .expect("Scheduler is closed, but nobody other should be able to close it");
        debug!(?pid, "Network has shut down");
    }
}
impl Drop for Participant {
    /// Disconnects the `Participant` from the scheduler unless the disconnect
    /// sender was already taken, and waits for the scheduler's answer.
    fn drop(&mut self) {
        // ignore closed, as we need to send it even though we disconnected the
        // participant from network
        let pid = self.remote_pid;
        debug!(?pid, "Shutting down Participant");

        // The guard is held on purpose until the disconnect below has finished,
        // so the slot stays locked for the whole shutdown.
        let mut disconnect_slot = self
            .a2s_disconnect_s
            .try_lock()
            .expect("Participant in use while beeing dropped");

        if let Some(disconnect_sender) = disconnect_slot.take() {
            debug!(?pid, "Disconnect from Scheduler");
            self.runtime.block_on(async {
                let (finished_sender, finished_receiver) = oneshot::channel();
                disconnect_sender
                    .send((self.remote_pid, finished_sender))
                    .expect("Something is wrong in internal scheduler coding");
                let outcome = finished_receiver
                    .await
                    .expect("Something is wrong in internal scheduler/participant coding");
                if let Err(e) = outcome {
                    error!(
                        ?pid,
                        ?e,
                        "Error while dropping the participant, couldn't send all outgoing \
                         messages, dropping remaining"
                    );
                }
            });
        } else {
            trace!(
                ?pid,
                "Participant has been shutdown cleanly, no further waiting is required!"
            );
        }

        drop(disconnect_slot);
        debug!(?pid, "Participant dropped");
    }
}
impl Drop for Stream {
    /// Tells the bparticipant that this `Stream` is going away, unless sending
    /// was already marked as closed.
    ///
    /// Dropping must never panic: a panic inside `drop` while the thread is
    /// already unwinding aborts the whole process. A close request that can no
    /// longer be delivered is therefore only logged.
    fn drop(&mut self) {
        let sid = self.sid;
        let pid = self.pid;

        // send if closed is unnecessary but doesn't hurt, we must not crash
        if self.send_closed.load(Ordering::Relaxed) {
            trace!(?pid, ?sid, "Stream Drop not needed");
            return;
        }

        debug!(?pid, ?sid, "Shutting down Stream");
        match self.a2b_close_stream_s.take() {
            Some(a2b_close_stream_s) => {
                // The receiving side may already be gone; there is nothing left
                // to close in that case, so this is not treated as fatal.
                if let Err(e) = a2b_close_stream_s.send(sid) {
                    debug!(
                        ?pid,
                        ?sid,
                        ?e,
                        "bparticipant part of a gracefully shutdown was already closed"
                    );
                }
            },
            // The sender was taken before, so there is nothing to send on.
            None => trace!(?pid, ?sid, "Stream close sender was already taken"),
        }
    }
}
impl std::fmt::Debug for Participant {
    /// Prints only the local and the remote pid; the remaining fields are
    /// left out of the output.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            local_pid,
            remote_pid,
            ..
        } = self;
        write!(
            f,
            "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
            local_pid, remote_pid,
        )
    }
}
/// A failed crossbeam send is reported as [`StreamError::StreamClosed`]; the
/// original error (and the message it carries) is discarded.
impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
    fn from(_: crossbeam_channel::SendError<T>) -> Self {
        Self::StreamClosed
    }
}
/// A failed crossbeam send is reported as [`NetworkError::NetworkClosed`]; the
/// original error (and the message it carries) is discarded.
impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
    fn from(_: crossbeam_channel::SendError<T>) -> Self {
        Self::NetworkClosed
    }
}
impl From<std::option::NoneError> for StreamError {
fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
}
impl From<std::option::NoneError> for NetworkError {
fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed }
}
impl<T> From<mpsc::error::SendError<T>> for NetworkError {
fn from(_err: mpsc::error::SendError<T>) -> Self { NetworkError::NetworkClosed }
}
impl From<oneshot::error::RecvError> for NetworkError {
fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed }
}
impl From<std::io::Error> for NetworkError {
fn from(_err: std::io::Error) -> Self { NetworkError::NetworkClosed }
}
/// Wraps a bincode error into [`StreamError::Deserialize`], keeping the
/// original error as payload.
impl From<Box<bincode::ErrorKind>> for StreamError {
    fn from(source: Box<bincode::ErrorKind>) -> Self {
        Self::Deserialize(source)
    }
}
impl core::fmt::Display for StreamError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
StreamError::StreamClosed => write!(f, "stream closed"),
#[cfg(feature = "compression")]
StreamError::Compression(err) => write!(f, "compression error on message: {}", err),
StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err),
}
}
}
impl core::fmt::Display for ParticipantError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
ParticipantError::ProtocolFailedUnrecoverable => {
write!(f, "underlying protocol failed unrecoverable")
},
}
}
}
impl core::fmt::Display for NetworkError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
NetworkError::NetworkClosed => write!(f, "Network closed"),
NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
}
}
}
/// implementing PartialEq as it's super convenient in tests
impl core::cmp::PartialEq for StreamError {
    /// Errors are equal when they are the same variant and, where a payload
    /// exists, the payloads compare equal as well.
    fn eq(&self, other: &Self) -> bool {
        // The outer match stays exhaustive over `self`, so a newly added
        // variant forces this impl to be revisited.
        match self {
            StreamError::StreamClosed => matches!(other, StreamError::StreamClosed),
            #[cfg(feature = "compression")]
            StreamError::Compression(lhs) => {
                matches!(other, StreamError::Compression(rhs) if lhs == rhs)
            },
            StreamError::Deserialize(lhs) => {
                matches!(other, StreamError::Deserialize(rhs) if partial_eq_bincode(lhs, rhs))
            },
        }
    }
}
// Opt the three error types into `std::error::Error`. The bodies are empty, so
// only the trait's default method implementations are used.
impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {}