mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
DOCUMENTATION for everything
This commit is contained in:
parent
a8f1bc178a
commit
007f5cabaa
@ -108,7 +108,7 @@ fn main() {
|
||||
}
|
||||
|
||||
fn server(address: Address) {
|
||||
let thread_pool = ThreadPoolBuilder::new().build();
|
||||
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build();
|
||||
let mut metrics = metrics::SimpleMetrics::new();
|
||||
let server = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
|
||||
metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap();
|
||||
@ -136,7 +136,7 @@ fn server(address: Address) {
|
||||
}
|
||||
|
||||
fn client(address: Address) {
|
||||
let thread_pool = ThreadPoolBuilder::new().build();
|
||||
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build();
|
||||
let mut metrics = metrics::SimpleMetrics::new();
|
||||
let client = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
|
||||
metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap();
|
||||
|
@ -22,6 +22,7 @@ use tracing::*;
|
||||
use tracing_futures::Instrument;
|
||||
use uvth::ThreadPool;
|
||||
|
||||
/// Represents a Tcp or Udp or Mpsc address
|
||||
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||
pub enum Address {
|
||||
Tcp(std::net::SocketAddr),
|
||||
@ -29,6 +30,13 @@ pub enum Address {
|
||||
Mpsc(u64),
|
||||
}
|
||||
|
||||
/// `Participants` are generated by the [`Network`] and represent a connection
|
||||
/// to a remote Participant. Look at the [`connect`] and [`connected`] method of
|
||||
/// [`Networks`] on how to generate `Participants`
|
||||
///
|
||||
/// [`Networks`]: crate::api::Network
|
||||
/// [`connect`]: Network::connect
|
||||
/// [`connected`]: Network::connected
|
||||
pub struct Participant {
|
||||
local_pid: Pid,
|
||||
remote_pid: Pid,
|
||||
@ -38,6 +46,23 @@ pub struct Participant {
|
||||
disconnect_sender: Option<mpsc::UnboundedSender<Pid>>,
|
||||
}
|
||||
|
||||
/// `Streams` represents a channel to send `n` messages with a certain priority
|
||||
/// and [`Promises`]. messages need always to be send between 2 `Streams`.
|
||||
///
|
||||
/// `Streams` are generated by the [`Participant`].
|
||||
/// Look at the [`open`] and [`opened`] method of [`Participant`] on how to
|
||||
/// generate `Streams`
|
||||
///
|
||||
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
|
||||
/// mutability, as multiple threads don't need access to the same `Stream`.
|
||||
/// [`Sync`] is not supported! In that case multiple `Streams` should be used
|
||||
/// instead. However it's still possible to [`Send`] `Streams`.
|
||||
///
|
||||
/// [`Networks`]: crate::api::Network
|
||||
/// [`open`]: Participant::open
|
||||
/// [`opened`]: Participant::opened
|
||||
/// [`Send`]: std::marker::Send
|
||||
/// [`Sync`]: std::marker::Sync
|
||||
#[derive(Debug)]
|
||||
pub struct Stream {
|
||||
pid: Pid,
|
||||
@ -51,22 +76,52 @@ pub struct Stream {
|
||||
shutdown_sender: Option<mpsc::UnboundedSender<Sid>>,
|
||||
}
|
||||
|
||||
/// Error type thrown by [`Networks`](Network) methods
|
||||
#[derive(Debug)]
|
||||
pub enum NetworkError {
|
||||
NetworkClosed,
|
||||
ListenFailed(std::io::Error),
|
||||
}
|
||||
|
||||
/// Error type thrown by [`Participants`](Participant) methods
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ParticipantError {
|
||||
ParticipantClosed,
|
||||
}
|
||||
|
||||
/// Error type thrown by [`Streams`](Stream) methods
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum StreamError {
|
||||
StreamClosed,
|
||||
}
|
||||
|
||||
/// Use the `Network` to create connections to other [`Participants`]
|
||||
///
|
||||
/// The `Network` is the single source that handles all connections in your
|
||||
/// Application. You can pass it around multiple threads in an
|
||||
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
|
||||
///
|
||||
/// The `Network` has methods to [`connect`] and [`disconnect`] to other
|
||||
/// [`Participants`] via their [`Address`]. All [`Participants`] will be stored
|
||||
/// in the Network until explicitly disconnected, which is the only way to close
|
||||
/// the sockets.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use veloren_network::{Network, Pid};
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
///
|
||||
/// // Create a Network, listen on port `12345` to accept connections and connect to port `80` to connect to a (pseudo) database Application
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// network.listen(Address::Tcp("127.0.0.1:12345".parse().unwrap())).await?;
|
||||
/// let database = network.connect(Address::Tcp("127.0.0.1:80".parse().unwrap())).await?;
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`Participants`]: crate::api::Participant
|
||||
/// [`connect`]: Network::connect
|
||||
/// [`disconnect`]: Network::disconnect
|
||||
pub struct Network {
|
||||
local_pid: Pid,
|
||||
participants: RwLock<HashMap<Pid, Arc<Participant>>>,
|
||||
@ -79,8 +134,33 @@ pub struct Network {
|
||||
}
|
||||
|
||||
impl Network {
|
||||
/// Generates a new `Network` to handle all connections in an Application
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `participant_id` - provide it by calling [`Pid::new()`], usually you
|
||||
/// don't want to reuse a Pid for 2 `Networks`
|
||||
/// * `thread_pool` - you need to provide a [`ThreadPool`] where exactly 1
|
||||
/// thread will be created to handle all `Network` internals. Additional
|
||||
/// threads will be allocated on an internal async-aware threadpool
|
||||
/// * `registry` - Provide a Registy in order to collect Prometheus metrics
|
||||
/// by this `Network`, `None` will deactivate Tracing. Tracing is done via
|
||||
/// [`prometheus`]
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// ```
|
||||
///
|
||||
/// Usually you only create a single `Network` for an application, except
|
||||
/// when client and server are in the same application, then you will want
|
||||
/// 2. However there are no technical limitations from creating more.
|
||||
///
|
||||
/// [`Pid::new()`]: crate::types::Pid::new
|
||||
/// [`ThreadPool`]: uvth::ThreadPool
|
||||
pub fn new(participant_id: Pid, thread_pool: &ThreadPool, registry: Option<&Registry>) -> Self {
|
||||
//let participants = RwLock::new(vec![]);
|
||||
let p = participant_id;
|
||||
debug!(?p, ?User, "starting Network");
|
||||
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
|
||||
@ -104,6 +184,31 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
/// starts listening on an [`Address`].
|
||||
/// When the method returns the `Network` is ready to listen for incoming
|
||||
/// connections OR has returned a [`NetworkError`] (e.g. port already used).
|
||||
/// You can call [`connected`] to asynchrony wait for a [`Participant`] to
|
||||
/// connect. You can call `listen` on multiple addresses, e.g. to
|
||||
/// support multiple Protocols or NICs.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// network
|
||||
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// network
|
||||
/// .listen(Address::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`connected`]: Network::connected
|
||||
pub async fn listen(&self, address: Address) -> Result<(), NetworkError> {
|
||||
let (result_sender, result_receiver) = oneshot::channel::<async_std::io::Result<()>>();
|
||||
debug!(?address, ?User, "listening on address");
|
||||
@ -120,6 +225,35 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
/// starts connectiong to an [`Address`].
|
||||
/// When the method returns the Network either returns a [`Participant`]
|
||||
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
|
||||
/// can't connect, or invalid Handshake) # Examples
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// // Create a Network, connect on port `2000` TCP and `2001` UDP like listening above
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// let p1 = network
|
||||
/// .connect(Address::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// let p2 = network
|
||||
/// .connect(Address::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// assert!(p1.ptr_eq(p2));
|
||||
/// });
|
||||
/// ```
|
||||
/// Usually the `Network` guarantees that a operation on a [`Participant`]
|
||||
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
|
||||
/// disconnecting from the remote. If 2 [`Addresses`] you `connect` to
|
||||
/// belongs to the same [`Participant`], you get the same [`Participant`] as
|
||||
/// a result. This is useful e.g. by connecting to the same
|
||||
/// [`Participant`] via multiple Protocols.
|
||||
///
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
/// [`Addresses`]: crate::api::Address
|
||||
pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
|
||||
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
|
||||
debug!(?address, ?User, "connect to address");
|
||||
@ -143,6 +277,30 @@ impl Network {
|
||||
Ok(participant)
|
||||
}
|
||||
|
||||
/// returns a [`Participant`] created from a [`Address`] you called
|
||||
/// [`listen`] on before. This function will either return a working
|
||||
/// [`Participant`] ready to open [`Streams`] on OR has returned a
|
||||
/// [`NetworkError`] (e.g. Network got closed)
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// // Create a Network, listen on port `2000` TCP and opens returns their Pid
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// network
|
||||
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// while let Some(participant) = network.connected().await? {
|
||||
/// println!("Participant connected: {}", participant.remote_pid());
|
||||
/// }
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
/// [`listen`]: crate::api::Network::listen
|
||||
pub async fn connected(&self) -> Result<Arc<Participant>, NetworkError> {
|
||||
let participant = self.connected_receiver.write().await.next().await?;
|
||||
let participant = Arc::new(participant);
|
||||
@ -153,6 +311,37 @@ impl Network {
|
||||
Ok(participant)
|
||||
}
|
||||
|
||||
/// disconnecting a [`Participant`] where you move the last existing
|
||||
/// [`Arc<Participant>`]. As the [`Network`] also holds [`Arc`] to the
|
||||
/// [`Participant`], you need to provide the last [`Arc<Participant>`] and
|
||||
/// are not allowed to keep others. If you do so the [`Participant`]
|
||||
/// can't be disconnected properly. If you no longer have the respective
|
||||
/// [`Participant`], try using the [`participants`] method to get it.
|
||||
/// This function will wait for all [`Streams`] to properly close, including
|
||||
/// all messages to be send before closing.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// // Create a Network, listen on port `2000` TCP and opens returns their Pid and close connection.
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// network
|
||||
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// while let Some(participant) = network.connected().await? {
|
||||
/// println!("Participant connected: {}", participant.remote_pid());
|
||||
/// network.disconnect(participant).await?;
|
||||
/// }
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`Arc<Participant>`]: crate::api::Participant
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
/// [`participants`]: Network::participants
|
||||
/// [`Arc`]: std::sync::Arc
|
||||
pub async fn disconnect(&self, participant: Arc<Participant>) -> Result<(), NetworkError> {
|
||||
// Remove, Close and try_unwrap error when unwrap fails!
|
||||
let pid = participant.remote_pid;
|
||||
@ -169,6 +358,9 @@ impl Network {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// returns a copy of all current connected [`Participants`]
|
||||
///
|
||||
/// [`Participants`]: crate::api::Participant
|
||||
pub async fn participants(&self) -> HashMap<Pid, Arc<Participant>> {
|
||||
self.participants.read().await.clone()
|
||||
}
|
||||
@ -192,6 +384,41 @@ impl Participant {
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens a [`Stream`] on this `Participant` with a certain Priority and
|
||||
/// [`Promises`]
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `prio` - valid between 0-63. The priority rates the throughput for
|
||||
/// messages of the [`Stream`] e.g. prio 5 messages will get 1/2 the speed
|
||||
/// prio0 messages have. Prio10 messages only 1/4 and Prio 15 only 1/8,
|
||||
/// etc...
|
||||
/// * `promises` - use a combination of you prefered [`Promises`], see the
|
||||
/// link for further documentation. You can combine them, e.g.
|
||||
/// `PROMISES_ORDERED | PROMISES_CONSISTENCY` The Stream will then
|
||||
/// guarantee that those promisses are met.
|
||||
///
|
||||
/// A [`ParticipantError`] might be thrown if the `Participant` is already
|
||||
/// closed. [`Streams`] can be created without a answer from the remote
|
||||
/// side, resulting in very fast creation and closing latency.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
///
|
||||
/// // Create a Network, connect on port 2000 and open a stream
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// let p1 = network
|
||||
/// .connect(Address::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// let _s1 = p1
|
||||
/// .open(100, PROMISES_ORDERED | PROMISES_CONSISTENCY)
|
||||
/// .await?;
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
|
||||
//use this lock for now to make sure that only one open at a time is made,
|
||||
// TODO: not sure if we can paralise that, check in future
|
||||
@ -224,6 +451,30 @@ impl Participant {
|
||||
}
|
||||
}
|
||||
|
||||
/// Use this method to handle [`Streams`] opened from remote site, like the
|
||||
/// [`connected`] method of [`Network`]. This is the associated method
|
||||
/// to [`open`]. It's guaranteed that the order of [`open`] and `opened`
|
||||
/// is equal. The `nth` [`Streams`] on one side will represent the `nth` on
|
||||
/// the other side. A [`ParticipantError`] might be thrown if the
|
||||
/// `Participant` is already closed.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use veloren_network::{Network, Pid, PROMISES_ORDERED, PROMISES_CONSISTENCY};
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
///
|
||||
/// // Create a Network, connect on port 2000 and wait for the other side to open a stream
|
||||
/// // Note: It's quite unusal to activly connect, but then wait on a stream to be connected, usually the Appication taking initiative want's to also create the first Stream.
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// let p1 = network.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
|
||||
/// let _s1 = p1.opened().await?;
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
/// [`connected`]: Network::connected
|
||||
/// [`open`]: Participant::open
|
||||
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
|
||||
//use this lock for now to make sure that only one open at a time is made,
|
||||
// TODO: not sure if we can paralise that, check in future
|
||||
@ -246,6 +497,7 @@ impl Participant {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the remote [`Pid`]
|
||||
pub fn remote_pid(&self) -> Pid { self.remote_pid }
|
||||
}
|
||||
|
||||
@ -273,10 +525,77 @@ impl Stream {
|
||||
}
|
||||
}
|
||||
|
||||
/// use to send a arbitrary message to the remote side, by having the remote
|
||||
/// side also opened a `Stream` linked to this. the message will be
|
||||
/// [`Serialized`], which actually is quite slow compared to most other
|
||||
/// calculations done. A faster method [`send_raw`] exists, when extra
|
||||
/// speed is needed. The other side needs to use the respective [`recv`]
|
||||
/// function and know the type send.
|
||||
///
|
||||
/// `send` is an exception to the `async` messages, as it's probably called
|
||||
/// quite often so it doesn't wait for execution. Which also means, that
|
||||
/// no feedback is provided. It's to assume that the Message got `send`
|
||||
/// correctly. If a error occurred, the next call will return an Error.
|
||||
/// If the [`Participant`] disconnected it will also be unable to be used
|
||||
/// any more. A [`StreamError`] will be returned in the error case, e.g.
|
||||
/// when the `Stream` got closed already.
|
||||
///
|
||||
/// Note when a `Stream` is dropped, it will still send all messages, though
|
||||
/// the `drop` will return immediately, however, when a [`Participant`]
|
||||
/// gets gracefully shut down, all remaining messages will be send.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// let participant_a = network.connected().await;
|
||||
/// let mut stream_a = participant_a.opened().await;
|
||||
/// //Send Message
|
||||
/// stream_a.send("Hello World");
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`send_raw`]: Stream::send_raw
|
||||
/// [`recv`]: Stream::recv
|
||||
/// [`Serialized`]: Serialize
|
||||
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
|
||||
self.send_raw(Arc::new(message::serialize(&msg)))
|
||||
}
|
||||
|
||||
/// This methods give the option to skip multiple calls of [`bincode`], e.g.
|
||||
/// in case the same Message needs to send on multiple `Streams` to multiple
|
||||
/// [`Participants`]. Other then that, the same rules apply than for
|
||||
/// [`send`]
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use bincode;
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Network, Pid};
|
||||
///
|
||||
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
/// block_on(async {
|
||||
/// let participant_a = network.connected().await;
|
||||
/// let participant_b = network.connected().await;
|
||||
/// let mut stream_a = participant_a.opened().await;
|
||||
/// let mut stream_b = participant_a.opened().await;
|
||||
///
|
||||
/// //Prepare Message and decode it
|
||||
/// let msg = "Hello World";
|
||||
/// let raw_msg = Arc::new(MessageBuffer {
|
||||
/// data: bincode::serialize(&msg).unwrap(),
|
||||
/// });
|
||||
/// //Send same Message to multiple Streams
|
||||
/// stream_a.send_raw(raw_msg.clone());
|
||||
/// stream_b.send_raw(raw_msg.clone());
|
||||
/// });
|
||||
/// ```
|
||||
///
|
||||
/// [`send`]: Stream::send
|
||||
/// [`Participants`]: crate::api::Participant
|
||||
pub fn send_raw(&mut self, messagebuffer: Arc<MessageBuffer>) -> Result<(), StreamError> {
|
||||
if self.closed.load(Ordering::Relaxed) {
|
||||
return Err(StreamError::StreamClosed);
|
||||
@ -293,10 +612,23 @@ impl Stream {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// use `recv` to wait on a Message send from the remote side by their
|
||||
/// `Stream`. The Message needs to implement [`DeserializeOwned`] and
|
||||
/// thus, the resulting type must already be known by the receiving side.
|
||||
/// If this is not know from the Application logic, one could use a `Enum`
|
||||
/// and then handle the received message via a `match` state.
|
||||
///
|
||||
/// A [`StreamError`] will be returned in the error case, e.g. when the
|
||||
/// `Stream` got closed already.
|
||||
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
|
||||
Ok(message::deserialize(self.recv_raw().await?))
|
||||
}
|
||||
|
||||
/// the equivalent like [`send_raw`] but for [`recv`], no [`bincode`] is
|
||||
/// executed for performance reasons.
|
||||
///
|
||||
/// [`send_raw`]: Stream::send_raw
|
||||
/// [`recv`]: Stream::recv
|
||||
pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
|
||||
//no need to access self.closed here, as when this stream is closed the Channel
|
||||
// is closed which will trigger a None
|
||||
|
@ -1,5 +1,90 @@
|
||||
#![feature(trait_alias, try_trait, async_closure)]
|
||||
|
||||
//! Crate to handle high level networking of messages with different
|
||||
//! requirements and priorities over a number of protocols
|
||||
//!
|
||||
//! To start with the `veloren_network` crate you should focus on the 3
|
||||
//! elementar structs [`Network`], [`Participant`] and [`Stream`].
|
||||
//!
|
||||
//! Say you have an application that wants to communicate with other application
|
||||
//! over a Network or on the same computer. Now each application instances the
|
||||
//! struct [`Network`] once with a new [`Pid`]. The Pid is necessary to identify
|
||||
//! other [`Networks`] over the network protocols (e.g. TCP, UDP)
|
||||
//!
|
||||
//! To connect to another application, you must know it's [`Address`]. One side
|
||||
//! will call [`connect`], the other [`connected`]. If successfull both
|
||||
//! applications will now get a [`Arc<Participant>`].
|
||||
//!
|
||||
//! This [`Participant`] represents the connection between those 2 applications.
|
||||
//! over the respective [`Address`] and with it the choosen network protocol.
|
||||
//! However messages can't be send directly via [`Participants`], instead you
|
||||
//! must open a [`Stream`] on it. Like above, one side has to call [`open`], the
|
||||
//! other [`opened`]. [`Streams`] can have a different priority and
|
||||
//! [`Promises`].
|
||||
//!
|
||||
//! You can now use the [`Stream`] to [`send`] and [`recv`] in both directions.
|
||||
//! You can send all kind of messages that implement [`serde`].
|
||||
//! As the receiving side needs to know the format, it sometimes is useful to
|
||||
//! always send a specific Enum and then handling it with a big `match`
|
||||
//! statement This create makes heavily use of `async`, except for [`send`]
|
||||
//! which returns always directly.
|
||||
//!
|
||||
//! For best practices see the `examples` folder of this crate containing useful
|
||||
//! code snippets, a simple client/server below. Of course due to the async
|
||||
//! nature, no strict client server separation is necessary
|
||||
//!
|
||||
//! # Examples
|
||||
//! ```rust
|
||||
//! // Client
|
||||
//! use futures::executor::block_on;
|
||||
//! use veloren_network::{Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
//!
|
||||
//! let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
//! block_on(async {
|
||||
//! let server = network
|
||||
//! .connect(Address::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||
//! .await?;
|
||||
//! let stream = server
|
||||
//! .open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY)
|
||||
//! .await?;
|
||||
//! stream.send("Hello World")?;
|
||||
//! });
|
||||
//! ```
|
||||
//!
|
||||
//! ```rust
|
||||
//! // Server
|
||||
//! use futures::executor::block_on;
|
||||
//! use veloren_network::{Network, Pid};
|
||||
//!
|
||||
//! let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
|
||||
//! block_on(async {
|
||||
//! network
|
||||
//! .listen(Address::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||
//! .await?;
|
||||
//! let client = network.connected().await?;
|
||||
//! let stream = server.opened().await?;
|
||||
//! let msg: String = stream.recv().await?;
|
||||
//! println!("got message: {}", msg);
|
||||
//! });
|
||||
//! ```
|
||||
//!
|
||||
//! [`Network`]: crate::api::Network
|
||||
//! [`Networks`]: crate::api::Network
|
||||
//! [`connect`]: crate::api::Network::connect
|
||||
//! [`connected`]: crate::api::Network::connected
|
||||
//! [`Arc<Participant>`]: crate::api::Participant
|
||||
//! [`Participant`]: crate::api::Participant
|
||||
//! [`Participants`]: crate::api::Participant
|
||||
//! [`open`]: crate::api::Participant::open
|
||||
//! [`opened`]: crate::api::Participant::opened
|
||||
//! [`Stream`]: crate::api::Stream
|
||||
//! [`Streams`]: crate::api::Stream
|
||||
//! [`send`]: crate::api::Stream::send
|
||||
//! [`recv`]: crate::api::Stream::recv
|
||||
//! [`Pid`]: crate::types::Pid
|
||||
//! [`Address`]: crate::api::Address
|
||||
//! [`Promises`]: crate::types::Promises
|
||||
|
||||
mod api;
|
||||
mod channel;
|
||||
mod message;
|
||||
|
@ -4,9 +4,17 @@ use serde::{de::DeserializeOwned, Serialize};
|
||||
use crate::types::{Mid, Sid};
|
||||
use std::sync::Arc;
|
||||
|
||||
//Todo: Evaluate switching to VecDeque for quickly adding and removing data
|
||||
// from front, back.
|
||||
// - It would prob requiere custom bincode code but thats possible.
|
||||
/// Support struct used for optimising sending the same Message to multiple
|
||||
/// [`Stream`]
|
||||
///
|
||||
/// For an example usage see: [`send_raw`]
|
||||
///
|
||||
/// [`Stream`]: crate::api::Stream
|
||||
/// [`send_raw`]: crate::api::Stream::send_raw
|
||||
pub struct MessageBuffer {
|
||||
// use VecDeque for msg storage, because it allows to quickly remove data from front.
|
||||
//however VecDeque needs custom bincode code, but it's possible
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,9 @@
|
||||
/*
|
||||
Priorities are handled the following way.
|
||||
Prios from 0-63 are allowed.
|
||||
all 5 numbers the throughput i halved.
|
||||
E.g. in the same time 100 prio0 messages are send, only 50 prio5, 25 prio10, 12 prio15 or 6 prio20 messages are send.
|
||||
Note: TODO: prio0 will be send immeadiatly when found!
|
||||
*/
|
||||
//!Priorities are handled the following way.
|
||||
//!Prios from 0-63 are allowed.
|
||||
//!all 5 numbers the throughput is halved.
|
||||
//!E.g. in the same time 100 prio0 messages are send, only 50 prio5, 25 prio10,
|
||||
//! 12 prio15 or 6 prio20 messages are send. Note: TODO: prio0 will be send
|
||||
//! immeadiatly when found!
|
||||
|
||||
use crate::{
|
||||
message::OutGoingMessage,
|
||||
|
@ -3,13 +3,34 @@ use rand::Rng;
|
||||
pub type Mid = u64;
|
||||
pub type Cid = u64;
|
||||
pub type Prio = u8;
|
||||
/// use promises to modify the behavior of [`Streams`].
|
||||
/// available promises are:
|
||||
/// * [`PROMISES_NONE`]
|
||||
/// * [`PROMISES_ORDERED`]
|
||||
/// * [`PROMISES_CONSISTENCY`]
|
||||
/// * [`PROMISES_GUARANTEED_DELIVERY`]
|
||||
/// * [`PROMISES_COMPRESSED`]
|
||||
/// * [`PROMISES_ENCRYPTED`]
|
||||
///
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
pub type Promises = u8;
|
||||
|
||||
/// use for no special promises on this [`Stream`](crate::api::Stream).
|
||||
pub const PROMISES_NONE: Promises = 0;
|
||||
/// this will guarantee that the order of messages which are send on one side,
|
||||
/// is the same when received on the other.
|
||||
pub const PROMISES_ORDERED: Promises = 1;
|
||||
/// this will guarantee that messages received haven't been altered by errors,
|
||||
/// like bit flips, this is done with a checksum.
|
||||
pub const PROMISES_CONSISTENCY: Promises = 2;
|
||||
/// this will guarantee that the other side will receive every message exactly
|
||||
/// once no messages are droped
|
||||
pub const PROMISES_GUARANTEED_DELIVERY: Promises = 4;
|
||||
/// this will enable the internal compression on this
|
||||
/// [`Stream`](crate::api::Stream)
|
||||
pub const PROMISES_COMPRESSED: Promises = 8;
|
||||
/// this will enable the internal encryption on this
|
||||
/// [`Stream`](crate::api::Stream)
|
||||
pub const PROMISES_ENCRYPTED: Promises = 16;
|
||||
|
||||
pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = [86, 69, 76, 79, 82, 69, 78]; //VELOREN
|
||||
@ -17,6 +38,11 @@ pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 2, 0];
|
||||
pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0);
|
||||
pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2);
|
||||
|
||||
/// Support struct used for uniquely identifying [`Participant`] over the
|
||||
/// [`Network`].
|
||||
///
|
||||
/// [`Participant`]: crate::api::Participant
|
||||
/// [`Network`]: crate::api::Network
|
||||
#[derive(PartialEq, Eq, Hash, Clone, Copy)]
|
||||
pub struct Pid {
|
||||
internal: u128,
|
||||
@ -101,6 +127,16 @@ pub(crate) enum Requestor {
|
||||
}
|
||||
|
||||
impl Pid {
|
||||
/// create a new Pid with a random interior value
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::Network;
|
||||
///
|
||||
/// let pid = Pid::new();
|
||||
/// let _network = Network::new(pid, ThreadPoolBuilder::new().build(), None);
|
||||
/// ```
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
internal: rand::thread_rng().gen(),
|
||||
@ -108,8 +144,9 @@ impl Pid {
|
||||
}
|
||||
|
||||
/// don't use fake! just for testing!
|
||||
/// This will panic if pid i greater than 7, as i do not want you to use
|
||||
/// This will panic if pid i greater than 7, as I do not want you to use
|
||||
/// this in production!
|
||||
#[doc(hidden)]
|
||||
pub fn fake(pid: u8) -> Self {
|
||||
assert!(pid < 8);
|
||||
Self {
|
||||
|
Loading…
Reference in New Issue
Block a user