//! veloren/network/src/api.rs
//!
//! To run the `async_recv` example:
//! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run)
use crate::{
    message::{self, partial_eq_bincode, IncomingMessage, MessageBuffer, OutgoingMessage},
    participant::{A2bStreamOpen, S2bShutdownBparticipant},
    scheduler::Scheduler,
    types::{Mid, Pid, Prio, Promises, Sid},
};
use async_std::{
    io,
    sync::{Mutex, RwLock},
    task,
};
use futures::{
    channel::{mpsc, oneshot},
    sink::SinkExt,
    stream::StreamExt,
};
#[cfg(feature = "metrics")]
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tracing::*;
use tracing_futures::Instrument;

type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;

/// Represents a Tcp, Udp or Mpsc address
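///
/// # Examples
/// A minimal sketch of constructing each variant; the concrete port and
/// channel id are arbitrary:
/// ```rust
/// use veloren_network::ProtocolAddr;
///
/// // Tcp and Udp wrap a `std::net::SocketAddr`
/// let tcp = ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap());
/// let udp = ProtocolAddr::Udp("127.0.0.1:2999".parse().unwrap());
/// // Mpsc is addressed by a plain in-process channel id
/// let mpsc = ProtocolAddr::Mpsc(42);
/// ```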
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum ProtocolAddr {
    Tcp(SocketAddr),
    Udp(SocketAddr),
    Mpsc(u64),
}

/// `Participants` are generated by the [`Network`] and represent a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] methods
/// of [`Networks`] on how to generate `Participants`.
///
/// [`Networks`]: crate::api::Network
/// [`connect`]: Network::connect
/// [`connected`]: Network::connected
pub struct Participant {
    local_pid: Pid,
    remote_pid: Pid,
    a2b_stream_open_s: RwLock<mpsc::UnboundedSender<A2bStreamOpen>>,
    b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
    a2s_disconnect_s: A2sDisconnect,
}

/// `Streams` represent a channel to send `n` messages with a certain priority
/// and [`Promises`]. Messages always need to be sent between 2 `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] methods of [`Participant`] on how to
/// generate `Streams`.
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
///
/// [`Networks`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
    pid: Pid,
    sid: Sid,
    mid: Mid,
    prio: Prio,
    promises: Promises,
    send_closed: Arc<AtomicBool>,
    a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
    b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}

/// Error type thrown by [`Networks`](Network) methods
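///
/// # Examples
/// A minimal sketch of handling the error, assuming `NetworkError` is
/// re-exported at the crate root like the other types used in the examples
/// of this module:
/// ```rust
/// use veloren_network::NetworkError;
///
/// // e.g. a result as returned by `Network::listen`
/// fn handle(res: Result<(), NetworkError>) {
///     match res {
///         Ok(()) => println!("listening"),
///         Err(NetworkError::ListenFailed(e)) => println!("port probably in use: {}", e),
///         Err(e) => println!("network error: {}", e),
///     }
/// }
/// ```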
#[derive(Debug)]
pub enum NetworkError {
    NetworkClosed,
    ListenFailed(std::io::Error),
    ConnectFailed(std::io::Error),
}

/// Error type thrown by [`Participants`](Participant) methods
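///
/// # Examples
/// A minimal sketch of reacting to the two cases, assuming `ParticipantError`
/// is re-exported at the crate root:
/// ```rust
/// use veloren_network::ParticipantError;
///
/// // e.g. a result as returned by `Participant::disconnect`
/// fn handle(res: Result<(), ParticipantError>) {
///     match res {
///         Ok(()) => println!("gracefully closed"),
///         Err(ParticipantError::ParticipantDisconnected) => {
///             println!("remote side already disconnected")
///         },
///         Err(ParticipantError::ProtocolFailedUnrecoverable) => {
///             println!("protocol failure, expect data loss")
///         },
///     }
/// }
/// ```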
#[derive(Debug, PartialEq, Clone)]
pub enum ParticipantError {
    /// Participant was closed by the remote side
    ParticipantDisconnected,
    /// The underlying protocol failed and wasn't able to recover; expect some
    /// data loss. Unfortunately, there is no method to get the exact messages
    /// that failed. This is also returned when the local side tries to do
    /// something while the remote side gracefully disconnects.
    ProtocolFailedUnrecoverable,
}

/// Error type thrown by [`Streams`](Stream) methods
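///
/// # Examples
/// A minimal sketch of distinguishing the two cases, assuming `StreamError`
/// is re-exported at the crate root:
/// ```rust
/// use veloren_network::StreamError;
///
/// // e.g. a simplified result as returned by `Stream::recv::<String>`
/// fn handle(res: Result<String, StreamError>) {
///     match res {
///         Ok(msg) => println!("got: {}", msg),
///         Err(StreamError::StreamClosed) => println!("stream closed"),
///         Err(StreamError::DeserializeError(e)) => println!("wrong type requested: {}", e),
///     }
/// }
/// ```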
#[derive(Debug)]
pub enum StreamError {
    StreamClosed,
    DeserializeError(Box<bincode::ErrorKind>),
}

/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
/// Application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all commands have interior mutability.
///
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
/// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`]
/// [`Participants`].
///
/// # Examples
/// ```rust
/// use veloren_network::{Network, ProtocolAddr, Pid};
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
/// let (network, f) = Network::new(Pid::new());
/// std::thread::spawn(f);
/// block_on(async {
///     # //setup pseudo database!
///     # let (database, fd) = Network::new(Pid::new());
///     # std::thread::spawn(fd);
///     # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
///     network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
///     let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
///     # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
/// [`connect`]: Network::connect
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
pub struct Network {
    local_pid: Pid,
    participant_disconnect_sender: RwLock<HashMap<Pid, A2sDisconnect>>,
    listen_sender:
        RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<async_std::io::Result<()>>)>>,
    connect_sender:
        RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
    connected_receiver: RwLock<mpsc::UnboundedReceiver<Participant>>,
    shutdown_sender: Option<oneshot::Sender<()>>,
}

impl Network {
    /// Generates a new `Network` to handle all connections in an Application
    ///
    /// # Arguments
    /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
    ///   don't want to reuse a Pid for 2 `Networks`
    ///
    /// # Result
    /// * `Self` - returns a `Network` which can be `Send` to multiple areas of
    ///   your code, including multiple threads. This is the base struct of
    ///   this crate.
    /// * `FnOnce` - you need to run the returned FnOnce exactly once, probably
    ///   in its own thread. This is NOT done internally, so that you are free
    ///   to choose the threadpool implementation of your choice. We recommend
    ///   using [`ThreadPool`] from the [`uvth`] crate. This fn will run the
    ///   Scheduler to handle all `Network` internals. Additional threads will
    ///   be allocated on an internal async-aware threadpool
    ///
    /// # Examples
    /// ```rust
    /// //Example with uvth
    /// use uvth::ThreadPoolBuilder;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// let pool = ThreadPoolBuilder::new().build();
    /// let (network, f) = Network::new(Pid::new());
    /// pool.execute(f);
    /// ```
    ///
    /// ```rust
    /// //Example with std::thread
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// ```
    ///
    /// Usually you only create a single `Network` for an application, except
    /// when client and server are in the same application, in which case you
    /// will want 2. However, there are no technical limitations preventing
    /// you from creating more.
    ///
    /// [`Pid::new()`]: crate::types::Pid::new
    /// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html
    /// [`uvth`]: https://docs.rs/uvth
    pub fn new(participant_id: Pid) -> (Self, impl std::ops::FnOnce()) {
        Self::internal_new(
            participant_id,
            #[cfg(feature = "metrics")]
            None,
        )
    }

    /// See [`new`]
    ///
    /// # Additional Arguments
    /// * `registry` - Provide a Registry in order to collect Prometheus
    ///   metrics from this `Network`; `None` will deactivate metrics
    ///   collection. Metrics are collected via [`prometheus`]
    ///
    /// # Examples
    /// ```rust
    /// use prometheus::Registry;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// let registry = Registry::new();
    /// let (network, f) = Network::new_with_registry(Pid::new(), &registry);
    /// std::thread::spawn(f);
    /// ```
    /// [`new`]: crate::api::Network::new
    #[cfg(feature = "metrics")]
    pub fn new_with_registry(
        participant_id: Pid,
        registry: &Registry,
    ) -> (Self, impl std::ops::FnOnce()) {
        Self::internal_new(participant_id, Some(registry))
    }

    fn internal_new(
        participant_id: Pid,
        #[cfg(feature = "metrics")] registry: Option<&Registry>,
    ) -> (Self, impl std::ops::FnOnce()) {
        let p = participant_id;
        debug!(?p, "Starting Network");
        let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
            Scheduler::new(
                participant_id,
                #[cfg(feature = "metrics")]
                registry,
            );
        (
            Self {
                local_pid: participant_id,
                participant_disconnect_sender: RwLock::new(HashMap::new()),
                listen_sender: RwLock::new(listen_sender),
                connect_sender: RwLock::new(connect_sender),
                connected_receiver: RwLock::new(connected_receiver),
                shutdown_sender: Some(shutdown_sender),
            },
            move || {
                trace!(?p, "Starting scheduler in own thread");
                let _handle = task::block_on(
                    scheduler
                        .run()
                        .instrument(tracing::info_span!("scheduler", ?p)),
                );
                trace!(?p, "Stopping scheduler and its own thread");
            },
        )
    }

    /// Starts listening on a [`ProtocolAddr`].
    /// When the method returns the `Network` is ready to listen for incoming
    /// connections OR has returned a [`NetworkError`] (e.g. port already used).
    /// You can call [`connected`] to asynchronously wait for a [`Participant`]
    /// to connect. You can call `listen` on multiple addresses, e.g. to
    /// support multiple Protocols or NICs.
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// block_on(async {
    ///     network
    ///         .listen(ProtocolAddr::Tcp("0.0.0.0:2000".parse().unwrap()))
    ///         .await?;
    ///     network
    ///         .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
    ///         .await?;
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`connected`]: Network::connected
    pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> {
        let (s2a_result_s, s2a_result_r) = oneshot::channel::<async_std::io::Result<()>>();
        debug!(?address, "listening on address");
        self.listen_sender
            .write()
            .await
            .send((address, s2a_result_s))
            .await?;
        match s2a_result_r.await? {
            //waiting guarantees that we either listened successfully or get an
            // error, like port in use
            Ok(()) => Ok(()),
            Err(e) => Err(NetworkError::ListenFailed(e)),
        }
    }

    /// Starts connecting to a [`ProtocolAddr`].
    /// When the method returns the Network either returns a [`Participant`]
    /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
    /// can't connect, or invalid Handshake)
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2010".parse().unwrap())).await?;
    ///     # remote.listen(ProtocolAddr::Udp("0.0.0.0:2011".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
    ///         .await?;
    ///     # //this doesn't work yet, so skip the test
    ///     # //TODO fixme!
    ///     # return Ok(());
    ///     let p2 = network
    ///         .connect(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap()))
    ///         .await?;
    ///     assert_eq!(&p1, &p2);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    /// Usually the `Network` guarantees that an operation on a [`Participant`]
    /// succeeds, e.g. by automatic retrying, unless it fails completely, e.g.
    /// by disconnecting from the remote. If 2 [`ProtocolAddrs`] you `connect`
    /// to belong to the same [`Participant`], you get the same [`Participant`]
    /// as a result. This is useful e.g. when connecting to the same
    /// [`Participant`] via multiple Protocols.
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`ProtocolAddrs`]: crate::api::ProtocolAddr
    pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
        let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
        debug!(?address, "Connect to address");
        self.connect_sender
            .write()
            .await
            .send((address, pid_sender))
            .await?;
        let participant = match pid_receiver.await? {
            Ok(p) => p,
            Err(e) => return Err(NetworkError::ConnectFailed(e)),
        };
        let pid = participant.remote_pid;
        debug!(
            ?pid,
            "Received Participant id from remote and return to user"
        );
        self.participant_disconnect_sender
            .write()
            .await
            .insert(pid, participant.a2s_disconnect_s.clone());
        Ok(participant)
    }

    /// Returns a [`Participant`] created from a [`ProtocolAddr`] you called
    /// [`listen`] on before. This function will either return a working
    /// [`Participant`] ready to open [`Streams`] on OR return a
    /// [`NetworkError`] (e.g. Network got closed)
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2020` TCP and print the Pid of incoming connections
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     network
    ///         .listen(ProtocolAddr::Tcp("0.0.0.0:2020".parse().unwrap()))
    ///         .await?;
    ///     # remote.connect(ProtocolAddr::Tcp("0.0.0.0:2020".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         # //skip test here as it would be an endless loop
    ///         # break;
    ///     }
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`listen`]: crate::api::Network::listen
    pub async fn connected(&self) -> Result<Participant, NetworkError> {
        let participant = self.connected_receiver.write().await.next().await?;
        self.participant_disconnect_sender
            .write()
            .await
            .insert(participant.remote_pid, participant.a2s_disconnect_s.clone());
        Ok(participant)
    }
}

impl Participant {
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        a2b_stream_open_s: mpsc::UnboundedSender<A2bStreamOpen>,
        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
    ) -> Self {
        Self {
            local_pid,
            remote_pid,
            a2b_stream_open_s: RwLock::new(a2b_stream_open_s),
            b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
            a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
        }
    }

    /// Opens a [`Stream`] on this `Participant` with a certain Priority and
    /// [`Promises`]
    ///
    /// # Arguments
    /// * `prio` - valid between 0-63. The priority rates the throughput for
    ///   messages of the [`Stream`], roughly halving it every 5 levels: e.g.
    ///   prio 5 messages will get 1/2 the speed prio 0 messages have, prio 10
    ///   messages only 1/4, and prio 15 only 1/8, etc...
    /// * `promises` - use a combination of your preferred [`Promises`], see
    ///   the link for further documentation. You can combine them, e.g.
    ///   `PROMISES_ORDERED | PROMISES_CONSISTENCY`. The Stream will then
    ///   guarantee that those promises are met.
    ///
    /// A [`ParticipantError`] might be thrown if the `Participant` is already
    /// closed. [`Streams`] can be created without an answer from the remote
    /// side, resulting in very fast creation and closing latency.
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2100 and open a stream
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2100".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
    ///         .await?;
    ///     let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
        //use this lock for now to make sure that only one open at a time is made,
        // TODO: not sure if we can parallelise that, check in future
        let mut a2b_stream_open_s = self.a2b_stream_open_s.write().await;
        let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
        if let Err(e) = a2b_stream_open_s
            .send((prio, promises, p2a_return_stream_s))
            .await
        {
            debug!(?e, "bParticipant is already closed, notifying");
            return Err(ParticipantError::ParticipantDisconnected);
        }
        match p2a_return_stream_r.await {
            Ok(stream) => {
                let sid = stream.sid;
                debug!(?sid, ?self.remote_pid, "opened stream");
                Ok(stream)
            },
            Err(_) => {
                debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Use this method to handle [`Streams`] opened from the remote side,
    /// like the [`connected`] method of [`Network`]. This is the associated
    /// method to [`open`]. It's guaranteed that the order of [`open`] and
    /// `opened` is equal. The `nth` [`Streams`] on one side will represent
    /// the `nth` on the other side. A [`ParticipantError`] might be thrown if
    /// the `Participant` is already closed.
    ///
    /// # Examples
    /// ```rust
    /// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2110 and wait for the other side to open a stream
    /// // Note: It's quite unusual to actively connect but then wait for a stream to be opened;
    /// // usually the Application taking the initiative also wants to create the first Stream.
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2110".parse().unwrap())).await?;
    ///     let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
    ///     # let p2 = remote.connected().await?;
    ///     # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     let _s1 = p1.opened().await?;
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`connected`]: Network::connected
    /// [`open`]: Participant::open
    pub async fn opened(&self) -> Result<Stream, ParticipantError> {
        //use this lock for now to make sure that only one open at a time is made,
        // TODO: not sure if we can parallelise that, check in future
        let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
        match stream_opened_receiver.next().await {
            Some(stream) => {
                let sid = stream.sid;
                debug!(?sid, ?self.remote_pid, "Receive opened stream");
                Ok(stream)
            },
            None => {
                debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant");
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Disconnects a `Participant` in an async way.
    /// Use this rather than `Participant::Drop` if you want to close multiple
    /// `Participants`.
    ///
    /// This function will wait for all [`Streams`] to close properly,
    /// including all messages to be sent before closing, and returns an error
    /// if something goes wrong with one of the messages. The exception is
    /// when the remote side already dropped the `Participant` simultaneously;
    /// then messages won't be sent.
    ///
    /// There is NO `disconnected` function in `Participant`: if a
    /// `Participant` is no longer reachable (e.g. as the network cable was
    /// unplugged) the `Participant` will fail all actions, but needs to be
    /// manually disconnected, using this function.
    ///
    /// # Examples
    /// ```rust
    /// use futures::executor::block_on;
    /// use veloren_network::{Network, Pid, ProtocolAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2030` TCP, print the Pid of incoming connections and
    /// // close them again
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     network
    ///         .listen(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap()))
    ///         .await?;
    ///     # let keep_alive = remote.connect(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         participant.disconnect().await?;
    ///         # //skip test here as it would be an endless loop
    ///         # break;
    ///     }
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    pub async fn disconnect(self) -> Result<(), ParticipantError> {
        // Remove, Close and try_unwrap error when unwrap fails!
        let pid = self.remote_pid;
        debug!(?pid, "Closing participant from network");
        //Streams will be closed by BParticipant
        match self.a2s_disconnect_s.lock().await.take() {
            Some(mut a2s_disconnect_s) => {
                let (finished_sender, finished_receiver) = oneshot::channel();
                // Participant is connecting to Scheduler here, not as usual
                // Participant<->BParticipant
                a2s_disconnect_s
                    .send((pid, finished_sender))
                    .await
                    .expect("Something is wrong in internal scheduler coding");
                match finished_receiver.await {
                    Ok(res) => {
                        match res {
                            Ok(()) => trace!(?pid, "Participant is now closed"),
                            Err(ref e) => {
                                trace!(?pid, ?e, "Error occurred during shutdown of participant")
                            },
                        };
                        res
                    },
                    Err(e) => {
                        //this is a bug, but as we are just a Participant we can't destroy the
                        // network
                        error!(
                            ?pid,
                            ?e,
                            "Failed to get a message back from the scheduler, seems like the \
                             network is already closed"
                        );
                        Err(ParticipantError::ProtocolFailedUnrecoverable)
                    },
                }
            },
            None => {
                warn!(
                    "Seems like you are trying to disconnect a participant after the network \
                     was already dropped. It was already disconnected with the network!"
                );
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Returns the remote [`Pid`]
    pub fn remote_pid(&self) -> Pid { self.remote_pid }
}

impl Stream {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        pid: Pid,
        sid: Sid,
        prio: Prio,
        promises: Promises,
        send_closed: Arc<AtomicBool>,
        a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
        b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
    ) -> Self {
        Self {
            pid,
            sid,
            mid: 0,
            prio,
            promises,
            send_closed,
            a2b_msg_s,
            b2a_msg_recv_r,
            a2b_close_stream_s: Some(a2b_close_stream_s),
        }
    }

    /// Use to send an arbitrary message to the remote side; the remote side
    /// must have opened a `Stream` linked to this one. The message will be
    /// [`Serialized`], which actually is quite slow compared to most other
    /// calculations done. A faster method, [`send_raw`], exists when extra
    /// speed is needed. The other side needs to use the respective [`recv`]
    /// function and know the type sent.
    ///
    /// `send` is an exception to the `async` messages, as it's probably called
    /// quite often, so it doesn't wait for execution. This also means that
    /// no feedback is provided. It is to be assumed that the Message got
    /// `send` correctly. If an error occurred, the next call will return an
    /// Error. If the [`Participant`] disconnected it will also be unable to
    /// be used any more. A [`StreamError`] will be returned in the error
    /// case, e.g. when the `Stream` got closed already.
    ///
    /// Note when a `Stream` is dropped locally, it will still send all
    /// messages, though the `drop` will return immediately; however, when a
    /// [`Participant`] gets gracefully shut down, all remaining messages
    /// will be sent. If the `Stream` is dropped from the remote side no
    /// further messages are sent, because the remote side has no way of
    /// listening to them either way. If the last channel is destroyed (e.g.
    /// by losing the internet connection or a non-graceful shutdown), pending
    /// messages are also dropped.
    ///
    /// # Example
    /// ```
    /// use veloren_network::{Network, ProtocolAddr, Pid};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    ///     # // keep it alive
    ///     # let _stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Send Message
    ///     stream_a.send("Hello World")?;
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`Serialized`]: Serialize
    #[inline]
    pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
        self.send_raw(Arc::new(message::serialize(&msg)))
    }

    /// This method gives the option to skip multiple calls of [`bincode`],
    /// e.g. in case the same Message needs to be sent on multiple `Streams`
    /// to multiple [`Participants`]. Other than that, the same rules apply as
    /// for [`send`]
    ///
    /// # Example
    /// ```rust
    /// use veloren_network::{Network, ProtocolAddr, Pid, MessageBuffer};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    /// use bincode;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote1, fr1) = Network::new(Pid::new());
    /// # std::thread::spawn(fr1);
    /// # let (remote2, fr2) = Network::new(Pid::new());
    /// # std::thread::spawn(fr2);
    /// block_on(async {
    ///     network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
    ///     # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     let participant_a = network.connected().await?;
    ///     let participant_b = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     let mut stream_b = participant_b.opened().await?;
    ///
    ///     //Prepare Message and encode it
    ///     let msg = "Hello World";
    ///     let raw_msg = Arc::new(MessageBuffer {
    ///         data: bincode::serialize(&msg).unwrap(),
    ///     });
    ///     //Send same Message to multiple Streams
    ///     stream_a.send_raw(raw_msg.clone());
    ///     stream_b.send_raw(raw_msg.clone());
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send`]: Stream::send
    /// [`Participants`]: crate::api::Participant
    pub fn send_raw(&mut self, messagebuffer: Arc<MessageBuffer>) -> Result<(), StreamError> {
        if self.send_closed.load(Ordering::Relaxed) {
            return Err(StreamError::StreamClosed);
        }
        self.a2b_msg_s.send((self.prio, self.sid, OutgoingMessage {
            buffer: messagebuffer,
            cursor: 0,
            mid: self.mid,
            sid: self.sid,
        }))?;
        self.mid += 1;
        Ok(())
    }

    /// Use `recv` to wait on a Message sent from the remote side by their
    /// `Stream`. The Message needs to implement [`DeserializeOwned`] and
    /// thus, the resulting type must already be known by the receiving side.
    /// If this is not known from the Application logic, one could use an
    /// `Enum` and then handle the received message via a `match` statement.
    ///
    /// A [`StreamError`] will be returned in the error case, e.g. when the
    /// `Stream` got closed already.
    ///
    /// # Example
    /// ```
    /// use veloren_network::{Network, ProtocolAddr, Pid};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// use futures::executor::block_on;
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
    /// let (network, f) = Network::new(Pid::new());
    /// std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     # stream_p.send("Hello World");
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Receive Message
    ///     println!("{}", stream_a.recv::<String>().await?);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    #[inline]
    pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
        Ok(message::deserialize(self.recv_raw().await?)?)
    }

    /// The equivalent of [`send_raw`] but for [`recv`]: no [`bincode`] is
    /// executed, for performance reasons.
    ///
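    /// # Example
    /// A minimal sketch of decoding the returned [`MessageBuffer`] manually,
    /// assuming the remote side sent a `String` as in the [`recv`] example
    /// (the port is arbitrary):
    /// ```
    /// # use veloren_network::{Network, ProtocolAddr, Pid};
    /// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
    /// # use futures::executor::block_on;
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// # let (network, f) = Network::new(Pid::new());
    /// # std::thread::spawn(f);
    /// # let (remote, fr) = Network::new(Pid::new());
    /// # std::thread::spawn(fr);
    /// block_on(async {
    ///     # network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
    ///     # stream_p.send("Hello World");
    ///     # let participant_a = network.connected().await?;
    ///     # let mut stream_a = participant_a.opened().await?;
    ///     //receive the raw buffer and deserialize it manually
    ///     let buffer = stream_a.recv_raw().await?;
    ///     let msg: String = bincode::deserialize(&buffer.data)?;
    ///     println!("{}", msg);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///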
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
        let msg = self.b2a_msg_recv_r.next().await?;
        Ok(msg.buffer)
    }
}

/// Two `Participants` are equal if their remote `Pid` matches; the local side
/// is intentionally ignored.
impl core::cmp::PartialEq for Participant {
    fn eq(&self, other: &Self) -> bool {
        //don't check local_pid, 2 Participants from different networks should
        // match if they are the "same"
        self.remote_pid == other.remote_pid
    }
}

impl Drop for Network {
    fn drop(&mut self) {
        let pid = self.local_pid;
        debug!(?pid, "Shutting down Network");
        trace!(
            ?pid,
            "Shutting down Participants of Network, while we still have metrics"
        );
        let mut finished_receiver_list = vec![];
        task::block_on(async {
            // we MUST avoid nested block_on, good that Network::Drop no longer triggers
            // Participant::Drop directly but just the BParticipant
            for (remote_pid, a2s_disconnect_s) in
                self.participant_disconnect_sender.write().await.drain()
            {
                match a2s_disconnect_s.lock().await.take() {
                    Some(mut a2s_disconnect_s) => {
                        trace!(?remote_pid, "Participants will be closed");
                        let (finished_sender, finished_receiver) = oneshot::channel();
                        finished_receiver_list.push((remote_pid, finished_receiver));
                        a2s_disconnect_s
                            .send((remote_pid, finished_sender))
                            .await
                            .expect(
                                "Scheduler is closed, but nobody else should be able to close it",
                            );
                    },
                    None => trace!(?remote_pid, "Participant already disconnected gracefully"),
                }
            }
            //wait after close is requested for all
            for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
                match finished_receiver.await {
                    Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
                    Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
                    Err(e) => warn!(
                        ?remote_pid,
                        ?e,
                        "Failed to get a message back from the scheduler, seems like the network \
                         is already closed"
                    ),
                }
            }
        });
        trace!(?pid, "Participants have shut down!");
        trace!(?pid, "Shutting down Scheduler");
        self.shutdown_sender
            .take()
            .unwrap()
            .send(())
            .expect("Scheduler is closed, but nobody else should be able to close it");
        debug!(?pid, "Network has shut down");
    }
}

impl Drop for Participant {
    fn drop(&mut self) {
        // ignore closed, as we need to send it even though we disconnected the
        // participant from the network
        let pid = self.remote_pid;
        debug!(?pid, "Shutting down Participant");
        match task::block_on(self.a2s_disconnect_s.lock()).take() {
            None => trace!(
                ?pid,
                "Participant has been shut down cleanly, no further waiting is required!"
            ),
            Some(mut a2s_disconnect_s) => {
                debug!(?pid, "Disconnect from Scheduler");
                task::block_on(async {
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    a2s_disconnect_s
                        .send((self.remote_pid, finished_sender))
                        .await
                        .expect("Something is wrong in internal scheduler coding");
                    if let Err(e) = finished_receiver
                        .await
                        .expect("Something is wrong in internal scheduler/participant coding")
                    {
                        error!(
                            ?pid,
                            ?e,
                            "Error while dropping the participant, couldn't send all outgoing \
                             messages, dropping remaining"
                        );
                    };
                });
            },
        }
        debug!(?pid, "Participant dropped");
    }
}

impl Drop for Stream {
    fn drop(&mut self) {
        // sending when already closed is unnecessary but doesn't hurt, we must not
        // crash here
        if !self.send_closed.load(Ordering::Relaxed) {
            let sid = self.sid;
            let pid = self.pid;
            debug!(?pid, ?sid, "Shutting down Stream");
            if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
                warn!(
                    "Other side was already dropped, probably due to timing, other side will \
                     handle this gracefully"
                );
            };
        } else {
            let sid = self.sid;
            let pid = self.pid;
            trace!(?pid, ?sid, "Stream Drop not needed");
        }
    }
}

impl std::fmt::Debug for Participant {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
            &self.local_pid, &self.remote_pid,
        )
    }
}

impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
}

impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
}

impl From<std::option::NoneError> for StreamError {
    fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
}

impl From<std::option::NoneError> for NetworkError {
    fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed }
}

impl From<mpsc::SendError> for NetworkError {
    fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed }
}

impl From<oneshot::Canceled> for NetworkError {
    fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed }
}

impl From<Box<bincode::ErrorKind>> for StreamError {
    fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::DeserializeError(err) }
}

impl core::fmt::Display for StreamError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            StreamError::StreamClosed => write!(f, "stream closed"),
            StreamError::DeserializeError(err) => {
                write!(f, "deserialize error on message: {}", err)
            },
        }
    }
}

impl core::fmt::Display for ParticipantError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
            ParticipantError::ProtocolFailedUnrecoverable => {
                write!(f, "underlying protocol failed unrecoverably")
            },
        }
    }
}

impl core::fmt::Display for NetworkError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            NetworkError::NetworkClosed => write!(f, "Network closed"),
            NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
            NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
        }
    }
}

/// Implementing PartialEq as it's super convenient in tests
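///
/// A short sketch of the kind of test assertion this enables, assuming
/// `StreamError` is re-exported at the crate root:
/// ```rust
/// use veloren_network::StreamError;
///
/// assert_eq!(StreamError::StreamClosed, StreamError::StreamClosed);
/// assert_ne!(
///     StreamError::StreamClosed,
///     StreamError::DeserializeError(Box::new(bincode::ErrorKind::SizeLimit)),
/// );
/// ```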
impl core::cmp::PartialEq for StreamError {
    fn eq(&self, other: &Self) -> bool {
        match self {
            StreamError::StreamClosed => match other {
                StreamError::StreamClosed => true,
                StreamError::DeserializeError(_) => false,
            },
            StreamError::DeserializeError(err) => match other {
                StreamError::StreamClosed => false,
                StreamError::DeserializeError(other_err) => partial_eq_bincode(err, other_err),
            },
        }
    }
}

impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {}