use crate::{
    channel::ProtocolsError,
    message::{partial_eq_bincode, Message},
    participant::{A2bStreamOpen, S2bShutdownBparticipant},
    scheduler::{A2sConnect, Scheduler},
};
use bytes::Bytes;
use hashbrown::HashMap;
#[cfg(feature = "compression")]
use lz_fear::raw::DecodeError;
use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
#[cfg(feature = "metrics")]
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::{
    io,
    runtime::Runtime,
    sync::{mpsc, oneshot, watch, Mutex},
};
use tracing::*;
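
/// Shorthand for the channel the API side uses to ask the scheduler to shut
/// down a participant; the `a2s` prefix appears to follow the file's naming
/// scheme (API -> Scheduler, as in `a2b`, `b2a`, `s2b` below).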
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;

/// Represents a Tcp, Quic, Udp or Mpsc connection address
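///
/// # Examples
/// A short sketch of constructing addresses (the ports here are placeholders):
/// ```
/// use veloren_network::ConnectAddr;
///
/// let _tcp = ConnectAddr::Tcp("127.0.0.1:2999".parse().unwrap());
/// let _udp = ConnectAddr::Udp("127.0.0.1:2998".parse().unwrap());
/// ```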
#[derive(Clone, Debug)]
pub enum ConnectAddr {
    Tcp(SocketAddr),
    Udp(SocketAddr),
    #[cfg(feature = "quic")]
    Quic(SocketAddr, quinn::ClientConfig, String),
    Mpsc(u64),
}

/// Represents a Tcp, Quic, Udp or Mpsc listen address
#[derive(Clone, Debug)]
pub enum ListenAddr {
    Tcp(SocketAddr),
    Udp(SocketAddr),
    #[cfg(feature = "quic")]
    Quic(SocketAddr, quinn::ServerConfig),
    Mpsc(u64),
}

/// A `Participant` can emit different events; you are obliged to carefully
/// drain this queue from time to time.
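///
/// # Examples
/// A minimal sketch of draining the queue (`participant` is assumed to be an
/// already connected [`Participant`]):
/// ```ignore
/// while let Some(event) = participant.try_fetch_event()? {
///     match event {
///         ParticipantEvent::ChannelCreated(addr) => println!("new channel to {:?}", addr),
///         ParticipantEvent::ChannelDeleted(addr) => println!("lost channel to {:?}", addr),
///     }
/// }
/// ```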
#[derive(Clone, Debug)]
pub enum ParticipantEvent {
    ChannelCreated(ConnectAddr),
    ChannelDeleted(ConnectAddr),
}

/// `Participants` are generated by the [`Network`] and represent a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] methods
/// of [`Network`] on how to generate `Participants`
///
/// [`Network`]: crate::api::Network
/// [`connect`]: Network::connect
/// [`connected`]: Network::connected
pub struct Participant {
    local_pid: Pid,
    remote_pid: Pid,
    a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
    b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
    b2a_event_r: Mutex<mpsc::UnboundedReceiver<ParticipantEvent>>,
    b2a_bandwidth_stats_r: watch::Receiver<f32>,
    a2s_disconnect_s: A2sDisconnect,
}

/// `Streams` represent a channel to send `n` messages with a certain priority
/// and [`Promises`]. Messages are always sent between 2 `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] methods of [`Participant`] on how to
/// generate `Streams`
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
///
/// [`Network`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
    local_pid: Pid,
    remote_pid: Pid,
    sid: Sid,
    #[allow(dead_code)]
    prio: Prio,
    promises: Promises,
    #[allow(dead_code)]
    guaranteed_bandwidth: Bandwidth,
    send_closed: Arc<AtomicBool>,
    a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
    b2a_msg_recv_r: Option<async_channel::Receiver<Bytes>>,
    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}

/// Error type thrown by [`Networks`](Network) methods
#[derive(Debug)]
pub enum NetworkError {
    NetworkClosed,
    ListenFailed(io::Error),
    ConnectFailed(NetworkConnectError),
}

/// Error type thrown by [`Networks`](Network) connect
#[derive(Debug)]
pub enum NetworkConnectError {
    /// Either a Pid UUID clash or you are trying to hijack a connection
    InvalidSecret,
    Handshake(InitProtocolError<ProtocolsError>),
    Io(io::Error),
}

/// Error type thrown by [`Participants`](Participant) methods
#[derive(Debug, PartialEq, Clone)]
pub enum ParticipantError {
    /// Participant was closed by remote side
    ParticipantDisconnected,
    /// Underlying Protocol failed and wasn't able to recover; expect some data
    /// loss. Unfortunately, there is no method to get the exact messages
    /// that failed. This is also returned when the local side tries to do
    /// something while the remote side gracefully disconnects.
    ProtocolFailedUnrecoverable,
}

/// Error type thrown by [`Streams`](Stream) methods.
/// A Compression Error should only happen if a client sends malicious data.
/// A Deserialize Error probably means you are expecting type X while you
/// were actually sent type Y.
#[derive(Debug)]
pub enum StreamError {
    StreamClosed,
    #[cfg(feature = "compression")]
    Compression(DecodeError),
    Deserialize(bincode::Error),
}

/// All Parameters of a Stream, can be used to generate RawMessages
#[derive(Debug, Clone)]
pub struct StreamParams {
    pub(crate) promises: Promises,
}

/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
/// Application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
///
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
/// via their [`ConnectAddr`], or [`listen`] passively for [`connected`]
/// [`Participants`] via [`ListenAddr`].
///
/// To guarantee a clean shutdown, the [`Runtime`] MUST NOT be dropped before
/// the Network.
///
/// # Examples
/// ```rust
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
///     # //setup pseudo database!
///     # let database = Network::new(Pid::new(), &runtime);
///     # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
///     network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
///     let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
///     drop(network);
///     # drop(database);
///     # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
/// [`Runtime`]: tokio::runtime::Runtime
/// [`connect`]: Network::connect
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
/// [`ConnectAddr`]: crate::api::ConnectAddr
/// [`ListenAddr`]: crate::api::ListenAddr
pub struct Network {
    local_pid: Pid,
    participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
    listen_sender: Mutex<mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>>,
    connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
    connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
    shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
}

impl Network {
    /// Generates a new `Network` to handle all connections in an Application
    ///
    /// # Arguments
    /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
    ///   don't want to reuse a Pid for 2 `Networks`
    /// * `runtime` - provide a [`Runtime`], it's used to internally spawn
    ///   tasks. It is necessary to clean up in the non-async `Drop`. **All**
    ///   network related components **must** be dropped before the runtime is
    ///   stopped. Dropping the runtime while a shutdown is still in progress
    ///   leaves the network in a bad state which might cause a panic!
    ///
    /// # Result
    /// * `Self` - returns a `Network` which can be `Send` to multiple areas of
    ///   your code, including multiple threads. This is the base struct of
    ///   this crate.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid};
    ///
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// ```
    ///
    /// Usually you only create a single `Network` for an application,
    /// except when client and server are in the same application, then you
    /// will want 2. However there are no technical limitations from
    /// creating more.
    ///
    /// [`Pid::new()`]: network_protocol::Pid::new
    /// [`Runtime`]: tokio::runtime::Runtime
    pub fn new(participant_id: Pid, runtime: &Runtime) -> Self {
        Self::internal_new(
            participant_id,
            runtime,
            #[cfg(feature = "metrics")]
            None,
        )
    }

    /// See [`new`]
    ///
    /// # additional Arguments
    /// * `registry` - Provide a Registry in order to collect Prometheus
    ///   metrics from this `Network`; `None` will deactivate metrics
    ///   collection. Metrics are gathered via [`prometheus`]
    ///
    /// # Examples
    /// ```rust
    /// use prometheus::Registry;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid};
    ///
    /// let runtime = Runtime::new().unwrap();
    /// let registry = Registry::new();
    /// let network = Network::new_with_registry(Pid::new(), &runtime, &registry);
    /// ```
    /// [`new`]: crate::api::Network::new
    #[cfg(feature = "metrics")]
    pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self {
        Self::internal_new(participant_id, runtime, Some(registry))
    }

    fn internal_new(
        participant_id: Pid,
        runtime: &Runtime,
        #[cfg(feature = "metrics")] registry: Option<&Registry>,
    ) -> Self {
        let p = participant_id;
        let span = info_span!("network", ?p);
        span.in_scope(|| trace!("Starting Network"));
        let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
            Scheduler::new(
                participant_id,
                #[cfg(feature = "metrics")]
                registry,
            );
        let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new()));
        let (shutdown_network_s, shutdown_network_r) = oneshot::channel();
        let f = Self::shutdown_mgr(
            p,
            shutdown_network_r,
            Arc::clone(&participant_disconnect_sender),
            shutdown_sender,
        );
        runtime.spawn(f);
        runtime.spawn(
            async move {
                trace!("Starting scheduler in own thread");
                scheduler.run().await;
                trace!("Stopping scheduler and its own thread");
            }
            .instrument(info_span!("network", ?p)),
        );
        Self {
            local_pid: participant_id,
            participant_disconnect_sender,
            listen_sender: Mutex::new(listen_sender),
            connect_sender: Mutex::new(connect_sender),
            connected_receiver: Mutex::new(connected_receiver),
            shutdown_network_s: Some(shutdown_network_s),
        }
    }

    /// starts listening on a [`ListenAddr`].
    /// When the method returns, the `Network` is ready to listen for incoming
    /// connections, OR a [`NetworkError`] has been returned (e.g. port
    /// already in use).
    /// You can call [`connected`] to asynchronously wait for a [`Participant`]
    /// to connect. You can call `listen` on multiple addresses, e.g. to
    /// support multiple Protocols or NICs.
    ///
    /// # Examples
    /// ```ignore
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network
    ///         .listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
    ///         .await?;
    ///     network
    ///         .listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
    ///         .await?;
    ///     drop(network);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`connected`]: Network::connected
    /// [`ListenAddr`]: crate::api::ListenAddr
    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
    pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> {
        let (s2a_result_s, s2a_result_r) = oneshot::channel::<io::Result<()>>();
        debug!(?address, "listening on address");
        self.listen_sender
            .lock()
            .await
            .send((address, s2a_result_s))?;
        match s2a_result_r.await? {
            // waiting guarantees that we either listened successfully or get
            // an error, like port in use
            Ok(()) => Ok(()),
            Err(e) => Err(NetworkError::ListenFailed(e)),
        }
    }

    /// starts a connection to a [`ConnectAddr`].
    /// When the method returns, it either provides a [`Participant`] ready to
    /// open [`Streams`] on, OR a [`NetworkError`] has been returned (e.g.
    /// can't connect, or invalid Handshake)
    ///
    /// # Examples
    /// ```ignore
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
    ///     # remote.listen(ListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
    ///         .await?;
    ///     # //this doesn't work yet, so skip the test
    ///     # //TODO fixme!
    ///     # return Ok(());
    ///     let p2 = network
    ///         .connect(ConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
    ///         .await?;
    ///     assert_eq!(&p1, &p2);
    ///     # Ok(())
    /// })?;
    /// drop(network);
    /// # drop(remote);
    /// # Ok(())
    /// # }
    /// ```
    /// Usually the `Network` guarantees that an operation on a [`Participant`]
    /// succeeds, e.g. by automatic retrying, unless it fails completely, e.g.
    /// by disconnecting from the remote. If 2 [`ConnectAddr`] you
    /// `connect` to belong to the same [`Participant`], you get the same
    /// [`Participant`] as a result. This is useful e.g. when connecting to
    /// the same [`Participant`] via multiple Protocols.
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`ConnectAddr`]: crate::api::ConnectAddr
    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
    pub async fn connect(&self, address: ConnectAddr) -> Result<Participant, NetworkError> {
        let (pid_sender, pid_receiver) =
            oneshot::channel::<Result<Participant, NetworkConnectError>>();
        debug!(?address, "Connect to address");
        self.connect_sender
            .lock()
            .await
            .send((address, pid_sender))?;
        let participant = match pid_receiver.await? {
            Ok(p) => p,
            Err(e) => return Err(NetworkError::ConnectFailed(e)),
        };
        let remote_pid = participant.remote_pid;
        trace!(?remote_pid, "connected");
        self.participant_disconnect_sender
            .lock()
            .await
            .insert(remote_pid, Arc::clone(&participant.a2s_disconnect_s));
        Ok(participant)
    }

    /// returns a [`Participant`] created from a [`ListenAddr`] you
    /// called [`listen`] on before. This function will either return a
    /// working [`Participant`] ready to open [`Streams`] on, OR return
    /// a [`NetworkError`] (e.g. Network got closed)
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2020` TCP and print the Pid of each connecting Participant
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network
    ///         .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
    ///         .await?;
    ///     # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         # //skip test here as it would be an endless loop
    ///         # break;
    ///     }
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`listen`]: crate::api::Network::listen
    /// [`ListenAddr`]: crate::api::ListenAddr
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn connected(&self) -> Result<Participant, NetworkError> {
        let participant = self
            .connected_receiver
            .lock()
            .await
            .recv()
            .await
            .ok_or(NetworkError::NetworkClosed)?;
        self.participant_disconnect_sender.lock().await.insert(
            participant.remote_pid,
            Arc::clone(&participant.a2s_disconnect_s),
        );
        Ok(participant)
    }

    /// Use a mgr to handle shutdown smoothly and not in `Drop`
    #[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))]
    async fn shutdown_mgr(
        local_pid: Pid,
        shutdown_network_r: oneshot::Receiver<oneshot::Sender<()>>,
        participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
        shutdown_scheduler_s: oneshot::Sender<()>,
    ) {
        trace!("waiting for shutdown trigger of Network");
        let return_s = shutdown_network_r.await;
        trace!("Shutting down Participants of Network");
        let mut finished_receiver_list = vec![];

        for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() {
            match a2s_disconnect_s.lock().await.take() {
                Some(a2s_disconnect_s) => {
                    trace!(?remote_pid, "Participants will be closed");
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    finished_receiver_list.push((remote_pid, finished_receiver));
                    // If the channel was already dropped, we can assume that the other side
                    // already released its resources.
                    let _ = a2s_disconnect_s
                        .send((remote_pid, (Duration::from_secs(10), finished_sender)));
                },
                None => trace!(?remote_pid, "Participant already disconnected gracefully"),
            }
        }
        // after the close is requested, wait for all of them
        for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
            match finished_receiver.await {
                Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
                Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
                Err(e) => warn!(
                    ?remote_pid,
                    ?e,
                    "Failed to get a message back from the scheduler, seems like the network is \
                     already closed"
                ),
            }
        }

        trace!("Participants have shut down - next: Scheduler");
        if let Err(()) = shutdown_scheduler_s.send(()) {
            error!("Scheduler is closed, but nobody else should be able to close it")
        };
        if let Ok(return_s) = return_s {
            if return_s.send(()).is_err() {
                warn!("Network::drop stopped after a timeout and didn't wait for our shutdown");
            };
        }
        debug!("Network has shut down");
    }
}

impl Participant {
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
        b2a_event_r: mpsc::UnboundedReceiver<ParticipantEvent>,
        b2a_bandwidth_stats_r: watch::Receiver<f32>,
        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
    ) -> Self {
        Self {
            local_pid,
            remote_pid,
            a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
            b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
            b2a_event_r: Mutex::new(b2a_event_r),
            b2a_bandwidth_stats_r,
            a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
        }
    }

    /// Opens a [`Stream`] on this `Participant` with a certain Priority and
    /// [`Promises`]
    ///
    /// # Arguments
    /// * `prio` - defines which stream is processed first when limited on
    ///   bandwidth. See [`Prio`] for documentation.
    /// * `promises` - use a combination of your preferred [`Promises`], see
    ///   the link for further documentation. You can combine them, e.g.
    ///   `Promises::ORDERED | Promises::CONSISTENCY`. The Stream will then
    ///   guarantee that those promises are met.
    /// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
    ///   stream. When excess bandwidth is available it will be used. See
    ///   [`Bandwidth`] for details.
    ///
    /// A [`ParticipantError`] might be thrown if the `Participant` is already
    /// closed. [`Streams`] can be created without an answer from the remote
    /// side, resulting in very fast creation and closing latency.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2100 and open a stream
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
    ///         .await?;
    ///     let _s1 = p1
    ///         .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
    ///         .await?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Prio`]: network_protocol::Prio
    /// [`Bandwidth`]: network_protocol::Bandwidth
    /// [`Promises`]: network_protocol::Promises
    /// [`Streams`]: crate::api::Stream
    #[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))]
    pub async fn open(
        &self,
        prio: u8,
        promises: Promises,
        bandwidth: Bandwidth,
    ) -> Result<Stream, ParticipantError> {
        debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
        let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
        if let Err(e) = self.a2b_open_stream_s.lock().await.send((
            prio,
            promises,
            bandwidth,
            p2a_return_stream_s,
        )) {
            debug!(?e, "bParticipant is already closed, notifying");
            return Err(ParticipantError::ParticipantDisconnected);
        }
        match p2a_return_stream_r.await {
            Ok(stream) => {
                let sid = stream.sid;
                trace!(?sid, "opened stream");
                Ok(stream)
            },
            Err(_) => {
                debug!("p2a_return_stream_r failed, closing participant");
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Use this method to handle [`Streams`] opened from the remote side, like
    /// the [`connected`] method of [`Network`]. This is the associated method
    /// to [`open`]. It's guaranteed that the order of [`open`] and `opened`
    /// is equal. The `nth` [`Streams`] on one side will represent the `nth` on
    /// the other side. A [`ParticipantError`] might be thrown if the
    /// `Participant` is already closed.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2110 and wait for the other side to open a stream
    /// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected; usually the Application taking the initiative also wants to create the first Stream.
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
    ///     let p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
    ///     # let p2 = remote.connected().await?;
    ///     # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let _s1 = p1.opened().await?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`connected`]: Network::connected
    /// [`open`]: Participant::open
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn opened(&self) -> Result<Stream, ParticipantError> {
        match self.b2a_stream_opened_r.lock().await.recv().await {
            Some(stream) => {
                let sid = stream.sid;
                debug!(?sid, "Receive opened stream");
                Ok(stream)
            },
            None => {
                debug!("stream_opened_receiver failed, closing participant");
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Disconnect a `Participant` in an async way.
    /// Use this rather than `Participant::Drop` if you want to close multiple
    /// `Participants`.
    ///
    /// This function will wait for all [`Streams`] to properly close,
    /// including all messages to be sent before closing, unless an error
    /// occurs with one of the messages, or the remote side already dropped
    /// the `Participant` simultaneously, in which case messages won't be
    /// sent.
    ///
    /// There is NO `disconnected` function in `Participant`; if a
    /// `Participant` is no longer reachable (e.g. as the network cable was
    /// unplugged) the `Participant` will fail all actions, but needs to be
    /// manually disconnected, using this function.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2030` TCP, print the Pid of each connecting Participant and close the connection.
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// let err = runtime.block_on(async {
    ///     network
    ///         .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
    ///         .await?;
    ///     # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         participant.disconnect().await?;
    ///         # //skip test here as it would be an endless loop
    ///         # break;
    ///     }
    ///     # Ok(())
    /// });
    /// drop(network);
    /// # drop(remote);
    /// # err
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn disconnect(self) -> Result<(), ParticipantError> {
        // Remove, Close and try_unwrap error when unwrap fails!
        debug!("Closing participant from network");

        // Streams will be closed by BParticipant
        match self.a2s_disconnect_s.lock().await.take() {
            Some(a2s_disconnect_s) => {
                let (finished_sender, finished_receiver) = oneshot::channel();
                // Participant is connecting to Scheduler here, not as usual
                // Participant<->BParticipant

                // If this is already dropped, we can assume the other side already freed its
                // resources.
                let _ = a2s_disconnect_s
                    .send((self.remote_pid, (Duration::from_secs(120), finished_sender)));
                match finished_receiver.await {
                    Ok(res) => {
                        match res {
                            Ok(()) => trace!("Participant is now closed"),
                            Err(ref e) => {
                                trace!(?e, "Error occurred during shutdown of participant")
                            },
                        };
                        res
                    },
                    Err(e) => {
                        // this is a bug, but as a Participant we can't destroy the network
                        error!(
                            ?e,
                            "Failed to get a message back from the scheduler, seems like the \
                             network is already closed"
                        );
                        Err(ParticipantError::ProtocolFailedUnrecoverable)
                    },
                }
            },
            None => {
                warn!(
                    "seems like you are trying to disconnect a participant after the network \
                     was already dropped. It was already dropped with the network!"
                );
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Use this method to query [`ParticipantEvent`]s. Those are internal
    /// events from the network crate that will get reported to the frontend,
    /// e.g. Creation and Deletion of Channels.
    ///
    /// Make sure to call this function from time to time to not let events
    /// stack up endlessly and create a memory leak.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises, ParticipantEvent};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2040 and query the first ParticipantEvent
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2040".parse().unwrap())).await?;
    ///     let p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2040".parse().unwrap())).await?;
    ///     # let p2 = remote.connected().await?;
    ///     let event = p1.fetch_event().await?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`ParticipantEvent`]: crate::api::ParticipantEvent
    pub async fn fetch_event(&self) -> Result<ParticipantEvent, ParticipantError> {
        match self.b2a_event_r.lock().await.recv().await {
            Some(event) => Ok(event),
            None => {
                debug!("event_receiver failed, closing participant");
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// use `try_fetch_event` to check for a [`ParticipantEvent`]. This
    /// function does not block and returns immediately. It's intended for
    /// use in non-async contexts only. Other than that, the same rules apply
    /// as for [`fetch_event`].
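    ///
    /// # Examples
    /// A minimal sketch of polling from sync code (`participant` is assumed
    /// to be an already connected [`Participant`]):
    /// ```ignore
    /// while let Some(event) = participant.try_fetch_event()? {
    ///     println!("event: {:?}", event);
    /// }
    /// ```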
    ///
    /// [`ParticipantEvent`]: crate::api::ParticipantEvent
    /// [`fetch_event`]: Participant::fetch_event
    pub fn try_fetch_event(&self) -> Result<Option<ParticipantEvent>, ParticipantError> {
        match &mut self.b2a_event_r.try_lock() {
            Ok(b2a_event_r) => match b2a_event_r.try_recv() {
                Ok(event) => Ok(Some(event)),
                Err(mpsc::error::TryRecvError::Empty) => Ok(None),
                Err(mpsc::error::TryRecvError::Disconnected) => {
                    Err(ParticipantError::ParticipantDisconnected)
                },
            },
            Err(_) => Ok(None),
        }
    }

    /// Returns the current approximation on the maximum bandwidth available.
    /// This WILL fluctuate based on the amount/size of sent messages.
    pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }

    /// Returns the remote [`Pid`](network_protocol::Pid)
    pub fn remote_pid(&self) -> Pid { self.remote_pid }
}

impl Stream {
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        sid: Sid,
        prio: Prio,
        promises: Promises,
        guaranteed_bandwidth: Bandwidth,
        send_closed: Arc<AtomicBool>,
        a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
        b2a_msg_recv_r: async_channel::Receiver<Bytes>,
        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
    ) -> Self {
        Self {
            local_pid,
            remote_pid,
            sid,
            prio,
            promises,
            guaranteed_bandwidth,
            send_closed,
            a2b_msg_s,
            b2a_msg_recv_r: Some(b2a_msg_recv_r),
            a2b_close_stream_s: Some(a2b_close_stream_s),
        }
    }

    /// Use this to send an arbitrary message to the remote side; the remote
    /// side must also have opened a `Stream` linked to this one. The message
    /// will be [`Serialized`], which actually is quite slow compared to most
    /// other calculations done. A faster method, [`send_raw`], exists for
    /// when extra speed is needed. The other side needs to use the respective
    /// [`recv`] function and know the type sent.
    ///
    /// `send` is an exception to the `async` methods, as it's probably called
    /// quite often, so it doesn't wait for execution. Which also means that
    /// no feedback is provided. It is to be assumed that the message got sent
    /// correctly. If an error occurred, the next call will return an Error.
    /// If the [`Participant`] disconnected, it will also be unable to be used
    /// any more. A [`StreamError`] will be returned in the error case, e.g.
    /// when the `Stream` got closed already.
    ///
    /// Note when a `Stream` is dropped locally, it will still send all
    /// messages, though the `drop` will return immediately; however, when a
    /// [`Participant`] gets gracefully shut down, all remaining messages
    /// will be sent. If the `Stream` is dropped from the remote side, no
    /// further messages are sent, because the remote side has no way of
    /// listening to them either way. If the last channel is destroyed (e.g.
    /// by losing the internet connection or a non-graceful shutdown), pending
    /// messages are also dropped.
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    ///     # // keep it alive
    ///     # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Send Message
    ///     stream_a.send("Hello World")?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`Serialized`]: Serialize
    #[inline]
    pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
        self.send_raw_move(Message::serialize(&msg, self.params()))
    }

    /// This method gives the option to skip multiple calls of [`bincode`] and
    /// [`compress`], e.g. in case the same message needs to be sent on
    /// multiple `Streams` to multiple [`Participants`]. Other than that,
    /// the same rules apply as for [`send`].
    /// You need to create a Message via [`Message::serialize`].
    ///
    /// # Example
    /// ```rust
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use bincode;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote1 = Network::new(Pid::new(), &runtime);
    /// # let remote2 = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
    ///     # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let participant_a = network.connected().await?;
    ///     let participant_b = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     let mut stream_b = participant_b.opened().await?;
    ///
    ///     //Prepare the message by serializing it only once
    ///     let msg = Message::serialize("Hello World", stream_a.params());
    ///     //Send the same message to multiple Streams
    ///     stream_a.send_raw(&msg);
    ///     stream_b.send_raw(&msg);
    ///     drop(network);
    ///     # drop(remote1);
    ///     # drop(remote2);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send`]: Stream::send
    /// [`Participants`]: crate::api::Participant
    /// [`compress`]: lz_fear::raw::compress2
    /// [`Message::serialize`]: crate::message::Message::serialize
    #[inline]
    pub fn send_raw(&mut self, message: &Message) -> Result<(), StreamError> {
        self.send_raw_move(Message {
            data: message.data.clone(),
            #[cfg(feature = "compression")]
            compressed: message.compressed,
        })
    }

    fn send_raw_move(&mut self, message: Message) -> Result<(), StreamError> {
        if self.send_closed.load(Ordering::Relaxed) {
            return Err(StreamError::StreamClosed);
        }
        #[cfg(debug_assertions)]
        message.verify(self.params());
        self.a2b_msg_s.send((self.sid, message.data))?;
        Ok(())
    }

    /// Use `recv` to wait on a message sent from the remote side via their
    /// `Stream`. The message needs to implement [`DeserializeOwned`] and
    /// thus, the resulting type must already be known by the receiving side.
    /// If this is not known from the application logic, one could use an
    /// `enum` and then handle the received message via a `match` statement.
    ///
    /// A [`StreamError`] will be returned in the error case, e.g. when the
    /// `Stream` got closed already.
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Recv Message
    ///     println!("{}", stream_a.recv::<String>().await?);
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    #[inline]
    pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
        self.recv_raw().await?.deserialize()
    }

    /// The equivalent of [`send_raw`] but for [`recv`]; no [`bincode`] or
    /// [`decompress`] is executed, for performance reasons.
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Recv Message
    ///     let msg = stream_a.recv_raw().await?;
    ///     //Resend Message, without deserializing
    ///     stream_a.send_raw(&msg)?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`decompress`]: lz_fear::raw::decompress_raw
    pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
        match &mut self.b2a_msg_recv_r {
            Some(b2a_msg_recv_r) => {
                match b2a_msg_recv_r.recv().await {
                    Ok(data) => Ok(Message {
                        data,
                        #[cfg(feature = "compression")]
                        compressed: self.promises.contains(Promises::COMPRESSED),
                    }),
                    Err(_) => {
                        self.b2a_msg_recv_r = None; //prevent panic
                        Err(StreamError::StreamClosed)
                    },
                }
            },
            None => Err(StreamError::StreamClosed),
        }
    }
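
    /// Like [`try_recv`] but returns the raw [`Message`] without
    /// deserializing it; the non-async counterpart of [`recv_raw`]. Returns
    /// `Ok(None)` when no message is pending.
    ///
    /// [`try_recv`]: Stream::try_recv
    /// [`recv_raw`]: Stream::recv_raw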
    #[inline]
    pub fn try_recv_raw(&mut self) -> Result<Option<Message>, StreamError> {
        match &mut self.b2a_msg_recv_r {
            Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_recv() {
                Ok(data) => Ok(Some(Message {
                    data,
                    #[cfg(feature = "compression")]
                    compressed: self.promises.contains(Promises::COMPRESSED),
                })),
                Err(async_channel::TryRecvError::Empty) => Ok(None),
                Err(async_channel::TryRecvError::Closed) => {
                    self.b2a_msg_recv_r = None; //prevent panic
                    Err(StreamError::StreamClosed)
                },
            },
            None => Err(StreamError::StreamClosed),
        }
    }

    /// Use `try_recv` to check for a message sent from the remote side via
    /// their `Stream`. This function does not block and returns immediately.
    /// It's intended for use in non-async contexts only. Other than that, the
    /// same rules apply as for [`recv`].
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     # std::thread::sleep(std::time::Duration::from_secs(1));
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Try Recv Message
    ///     println!("{:?}", stream_a.try_recv::<String>()?);
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`recv`]: Stream::recv
    #[inline]
    pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
        self.try_recv_raw()?
            .map(|uncompressed| uncompressed.deserialize())
            .transpose()
    }
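
    /// Returns the [`StreamParams`] of this `Stream`, e.g. to pre-serialize a
    /// message with [`Message::serialize`](crate::message::Message::serialize)
    /// before handing it to [`send_raw`](Stream::send_raw).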
    pub fn params(&self) -> StreamParams {
        StreamParams {
            promises: self.promises,
        }
    }
}

impl PartialEq for Participant {
    fn eq(&self, other: &Self) -> bool {
        // don't check local_pid; 2 Participants from different networks
        // should match if they are the "same"
        self.remote_pid == other.remote_pid
    }
}
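
/// Waits for `finished_receiver` and then runs `f` on the result. When called
/// from an async context the wait is spawned onto the runtime; otherwise the
/// receiver is polled with an increasing backoff (giving up after roughly
/// 5.5s in total) so that `Drop` never blocks forever.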
fn actively_wait<T, F>(name: &'static str, mut finished_receiver: oneshot::Receiver<T>, f: F)
where
    F: FnOnce(T) + Send + 'static,
    T: Send + 'static,
{
    const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";

    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        // When in an async context we MUST NOT block synchronously (a deadlock
        // might occur as others are queued behind), and we CANNOT join our
        // future handle
        trace!("async context detected, defer shutdown");
        handle.spawn(async move {
            match finished_receiver.await {
                Ok(data) => f(data),
                Err(e) => error!("{}{}: {}", name, CHANNEL_ERR, e),
            }
        });
    } else {
        let mut cnt = 0;
        loop {
            use tokio::sync::oneshot::error::TryRecvError;
            match finished_receiver.try_recv() {
                Ok(data) => {
                    f(data);
                    break;
                },
                Err(TryRecvError::Closed) => panic!("{}{}", name, CHANNEL_ERR),
                Err(TryRecvError::Empty) => {
                    trace!("actively sleeping");
                    cnt += 1;
                    if cnt > 10 {
                        error!("Timeout waiting for shutdown, dropping");
                        break;
                    }
                    std::thread::sleep(Duration::from_millis(100) * cnt);
                },
            }
        }
    };
}

impl Drop for Network {
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    fn drop(&mut self) {
        trace!("Dropping Network");
        let (finished_sender, finished_receiver) = oneshot::channel();
        match self
            .shutdown_network_s
            .take()
            .unwrap()
            .send(finished_sender)
        {
            Err(e) => warn!(?e, "Runtime seems to be dropped already"),
            Ok(()) => actively_wait("network", finished_receiver, |()| {
                info!("Network dropped gracefully")
            }),
        };
    }
}

impl Drop for Participant {
    #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    fn drop(&mut self) {
        const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
                                    outgoing messages, dropping remaining";
        const SCHEDULER_ERR: &str =
            "Something is wrong in internal scheduler coding or you dropped the runtime too early";
        // ignore closed, as we need to send it even though we disconnected the
        // participant from network
        debug!("Shutting down Participant");

        match self.a2s_disconnect_s.try_lock() {
            Err(e) => debug!(?e, "Participant is being dropped by Network right now"),
            Ok(mut s) => match s.take() {
                None => info!("Participant already has been shutdown gracefully"),
                Some(a2s_disconnect_s) => {
                    debug!("Disconnect from Scheduler");
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    match a2s_disconnect_s
                        .send((self.remote_pid, (Duration::from_secs(10), finished_sender)))
                    {
                        Err(e) => warn!(?e, SCHEDULER_ERR),
                        Ok(()) => {
                            actively_wait("participant", finished_receiver, |d| match d {
                                Ok(()) => info!("Participant dropped gracefully"),
                                Err(e) => error!(?e, SHUTDOWN_ERR),
                            });
                        },
                    }
                },
            },
        }
    }
}

impl Drop for Stream {
    #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    fn drop(&mut self) {
        // sending when closed is unnecessary but doesn't hurt; we must not crash
        let sid = self.sid;
        if !self.send_closed.load(Ordering::Relaxed) {
            debug!(?sid, "Shutting down Stream");
            if let Err(e) = self.a2b_close_stream_s.take().unwrap().send(self.sid) {
                debug!(
                    ?e,
                    "bparticipant part of a graceful shutdown was already closed"
                );
            }
        } else {
            trace!(?sid, "Stream Drop not needed");
        }
    }
}

impl std::fmt::Debug for Participant {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
            &self.local_pid, &self.remote_pid,
        )
    }
}

impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
}

impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
    fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
}

impl<T> From<mpsc::error::SendError<T>> for NetworkError {
    fn from(_err: mpsc::error::SendError<T>) -> Self { NetworkError::NetworkClosed }
}

impl From<oneshot::error::RecvError> for NetworkError {
    fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed }
}

impl From<io::Error> for NetworkError {
    fn from(_err: io::Error) -> Self { NetworkError::NetworkClosed }
}

impl From<Box<bincode::ErrorKind>> for StreamError {
    fn from(err: Box<bincode::ErrorKind>) -> Self { StreamError::Deserialize(err) }
}

impl core::fmt::Display for StreamError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            StreamError::StreamClosed => write!(f, "stream closed"),
            #[cfg(feature = "compression")]
            StreamError::Compression(err) => write!(f, "compression error on message: {}", err),
            StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err),
        }
    }
}

impl core::fmt::Display for ParticipantError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnected"),
            ParticipantError::ProtocolFailedUnrecoverable => {
                write!(f, "underlying protocol failed unrecoverably")
            },
        }
    }
}

impl core::fmt::Display for NetworkError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            NetworkError::NetworkClosed => write!(f, "Network closed"),
            NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
            NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
        }
    }
}

impl core::fmt::Display for NetworkConnectError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            NetworkConnectError::Io(e) => write!(f, "Io error: {}", e),
            NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e),
            NetworkConnectError::InvalidSecret => {
                write!(f, "You specified the wrong secret on your second channel")
            },
        }
    }
}

/// implementing PartialEq as it's super convenient in tests
impl PartialEq for StreamError {
    fn eq(&self, other: &Self) -> bool {
        match self {
            StreamError::StreamClosed => match other {
                StreamError::StreamClosed => true,
                #[cfg(feature = "compression")]
                StreamError::Compression(_) => false,
                StreamError::Deserialize(_) => false,
            },
            #[cfg(feature = "compression")]
            StreamError::Compression(err) => match other {
                StreamError::StreamClosed => false,
                #[cfg(feature = "compression")]
                StreamError::Compression(other_err) => err == other_err,
                StreamError::Deserialize(_) => false,
            },
            StreamError::Deserialize(err) => match other {
                StreamError::StreamClosed => false,
                #[cfg(feature = "compression")]
                StreamError::Compression(_) => false,
                StreamError::Deserialize(other_err) => partial_eq_bincode(err, other_err),
            },
        }
    }
}

impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {}
impl std::error::Error for NetworkConnectError {}