mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
proper voxygen connect and code cleanups:
- voxygen abort when the server has a invalid veloren_network handshake, e.g. by outdated version instead of try again - rename Network `Address` to `ProtocolAddr` as sugested by zest as it's a combination of Protocol and std::io::Addr - remove the manual byte arrays in `protocols.rs` with something more nice
This commit is contained in:
parent
041349be48
commit
9d32e3f884
@ -35,7 +35,9 @@ use futures_timer::Delay;
|
||||
use futures_util::{select, FutureExt};
|
||||
use hashbrown::HashMap;
|
||||
use image::DynamicImage;
|
||||
use network::{Address, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use network::{
|
||||
Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED,
|
||||
};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
net::SocketAddr,
|
||||
@ -117,7 +119,7 @@ impl Client {
|
||||
let (network, f) = Network::new(Pid::new(), None);
|
||||
thread_pool.execute(f);
|
||||
|
||||
let participant = block_on(network.connect(Address::Tcp(addr.into())))?;
|
||||
let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?;
|
||||
let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?;
|
||||
|
||||
// Wait for initial sync
|
||||
|
@ -7,7 +7,7 @@ use async_std::io;
|
||||
use async_std::sync::RwLock;
|
||||
use clap::{App, Arg};
|
||||
use futures::executor::{block_on, ThreadPool};
|
||||
use network::{Address, Network, Participant, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use network::{ProtocolAddr, Network, Participant, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use std::{sync::Arc, thread, time::Duration, collections::HashMap};
|
||||
use tracing::*;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
@ -77,8 +77,8 @@ fn main() {
|
||||
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
||||
let ip: &str = matches.value_of("ip").unwrap();
|
||||
let address = match matches.value_of("protocol") {
|
||||
Some("tcp") => Address::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
Some("udp") => Address::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
_ => panic!("invalid mode, run --help!"),
|
||||
};
|
||||
|
||||
@ -99,7 +99,7 @@ fn main() {
|
||||
}
|
||||
}
|
||||
|
||||
fn server(address: Address) {
|
||||
fn server(address: ProtocolAddr) {
|
||||
let (server, f) = Network::new(Pid::new(), None);
|
||||
let server = Arc::new(server);
|
||||
std::thread::spawn(f);
|
||||
@ -143,7 +143,7 @@ async fn client_connection(_network: Arc<Network>, participant: Arc<Participant>
|
||||
println!("[{}] disconnected", username);
|
||||
}
|
||||
|
||||
fn client(address: Address) {
|
||||
fn client(address: ProtocolAddr) {
|
||||
let (client, f) = Network::new(Pid::new(), None);
|
||||
std::thread::spawn(f);
|
||||
let pool = ThreadPool::new().unwrap();
|
||||
|
@ -2,7 +2,7 @@ use async_std::{
|
||||
fs,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use network::{Address, Participant, Stream};
|
||||
use network::{ProtocolAddr, Participant, Stream};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -12,7 +12,7 @@ use std::collections::HashMap;
|
||||
pub enum LocalCommand {
|
||||
Shutdown,
|
||||
Disconnect,
|
||||
Connect(Address),
|
||||
Connect(ProtocolAddr),
|
||||
List,
|
||||
Serve(FileInfo),
|
||||
Get(u32, Option<String>),
|
||||
|
@ -10,7 +10,7 @@ use futures::{
|
||||
executor::{block_on, ThreadPool},
|
||||
sink::SinkExt,
|
||||
};
|
||||
use network::Address;
|
||||
use network::ProtocolAddr;
|
||||
use std::{thread, time::Duration};
|
||||
use tracing::*;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
@ -54,7 +54,7 @@ fn main() {
|
||||
.init();
|
||||
|
||||
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
||||
let address = Address::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap());
|
||||
let address = ProtocolAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap());
|
||||
|
||||
let (server, cmd_sender) = Server::new();
|
||||
let pool = ThreadPool::new().unwrap();
|
||||
@ -157,13 +157,13 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
|
||||
("connect", Some(connect_matches)) => {
|
||||
let socketaddr = connect_matches.value_of("ip:port").unwrap().parse().unwrap();
|
||||
cmd_sender
|
||||
.send(LocalCommand::Connect(Address::Tcp(socketaddr)))
|
||||
.send(LocalCommand::Connect(ProtocolAddr::Tcp(socketaddr)))
|
||||
.await
|
||||
.unwrap();
|
||||
},
|
||||
("t", _) => {
|
||||
cmd_sender
|
||||
.send(LocalCommand::Connect(Address::Tcp(
|
||||
.send(LocalCommand::Connect(ProtocolAddr::Tcp(
|
||||
"127.0.0.1:1231".parse().unwrap(),
|
||||
)))
|
||||
.await
|
||||
|
@ -5,7 +5,7 @@ use async_std::{
|
||||
sync::{Mutex, RwLock},
|
||||
};
|
||||
use futures::{channel::mpsc, future::FutureExt, stream::StreamExt};
|
||||
use network::{Address, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use network::{ProtocolAddr, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tracing::*;
|
||||
|
||||
@ -42,7 +42,7 @@ impl Server {
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn run(mut self, address: Address) {
|
||||
pub async fn run(mut self, address: ProtocolAddr) {
|
||||
let run_channels = self.run_channels.take().unwrap();
|
||||
|
||||
self.network.listen(address).await.unwrap();
|
||||
|
@ -7,7 +7,7 @@ mod metrics;
|
||||
|
||||
use clap::{App, Arg};
|
||||
use futures::executor::block_on;
|
||||
use network::{Address, MessageBuffer, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use network::{ProtocolAddr, MessageBuffer, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
@ -96,8 +96,8 @@ fn main() {
|
||||
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
||||
let ip: &str = matches.value_of("ip").unwrap();
|
||||
let address = match matches.value_of("protocol") {
|
||||
Some("tcp") => Address::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
Some("udp") => Address::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||
_ => panic!("Invalid mode, run --help!"),
|
||||
};
|
||||
|
||||
@ -118,7 +118,7 @@ fn main() {
|
||||
}
|
||||
}
|
||||
|
||||
fn server(address: Address) {
|
||||
fn server(address: ProtocolAddr) {
|
||||
let mut metrics = metrics::SimpleMetrics::new();
|
||||
let (server, f) = Network::new(Pid::new(), Some(metrics.registry()));
|
||||
std::thread::spawn(f);
|
||||
@ -146,7 +146,7 @@ fn server(address: Address) {
|
||||
}
|
||||
}
|
||||
|
||||
fn client(address: Address) {
|
||||
fn client(address: ProtocolAddr) {
|
||||
let mut metrics = metrics::SimpleMetrics::new();
|
||||
let (client, f) = Network::new(Pid::new(), Some(metrics.registry()));
|
||||
std::thread::spawn(f);
|
||||
|
@ -31,7 +31,7 @@ type ParticipantCloseChannel =
|
||||
|
||||
/// Represents a Tcp or Udp or Mpsc address
|
||||
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||
pub enum Address {
|
||||
pub enum ProtocolAddr {
|
||||
Tcp(SocketAddr),
|
||||
Udp(SocketAddr),
|
||||
Mpsc(u64),
|
||||
@ -111,12 +111,12 @@ pub enum StreamError {
|
||||
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
|
||||
///
|
||||
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
|
||||
/// via their [`Address`], or [`listen`] passively for [`connected`]
|
||||
/// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`]
|
||||
/// [`Participants`].
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use veloren_network::{Network, Address, Pid};
|
||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
||||
/// use futures::executor::block_on;
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
@ -127,9 +127,9 @@ pub enum StreamError {
|
||||
/// # //setup pseudo database!
|
||||
/// # let (database, fd) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fd);
|
||||
/// # database.listen(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
||||
/// network.listen(Address::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
|
||||
/// let database = network.connect(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
||||
/// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
|
||||
/// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
||||
/// # Ok(())
|
||||
/// })
|
||||
/// # }
|
||||
@ -144,9 +144,9 @@ pub struct Network {
|
||||
participant_disconnect_sender:
|
||||
RwLock<HashMap<Pid, Arc<std::sync::Mutex<Option<ParticipantCloseChannel>>>>>,
|
||||
listen_sender:
|
||||
RwLock<mpsc::UnboundedSender<(Address, oneshot::Sender<async_std::io::Result<()>>)>>,
|
||||
RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<async_std::io::Result<()>>)>>,
|
||||
connect_sender:
|
||||
RwLock<mpsc::UnboundedSender<(Address, oneshot::Sender<io::Result<Participant>>)>>,
|
||||
RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
|
||||
connected_receiver: RwLock<mpsc::UnboundedReceiver<Participant>>,
|
||||
shutdown_sender: Option<oneshot::Sender<()>>,
|
||||
}
|
||||
@ -176,7 +176,7 @@ impl Network {
|
||||
/// ```rust
|
||||
/// //Example with uvth
|
||||
/// use uvth::ThreadPoolBuilder;
|
||||
/// use veloren_network::{Address, Network, Pid};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
||||
///
|
||||
/// let pool = ThreadPoolBuilder::new().build();
|
||||
/// let (network, f) = Network::new(Pid::new(), None);
|
||||
@ -185,7 +185,7 @@ impl Network {
|
||||
///
|
||||
/// ```rust
|
||||
/// //Example with std::thread
|
||||
/// use veloren_network::{Address, Network, Pid};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
||||
///
|
||||
/// let (network, f) = Network::new(Pid::new(), None);
|
||||
/// std::thread::spawn(f);
|
||||
@ -228,7 +228,7 @@ impl Network {
|
||||
)
|
||||
}
|
||||
|
||||
/// starts listening on an [`Address`].
|
||||
/// starts listening on an [`ProtocolAddr`].
|
||||
/// When the method returns the `Network` is ready to listen for incoming
|
||||
/// connections OR has returned a [`NetworkError`] (e.g. port already used).
|
||||
/// You can call [`connected`] to asynchrony wait for a [`Participant`] to
|
||||
@ -238,7 +238,7 @@ impl Network {
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Address, Network, Pid};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
|
||||
@ -246,10 +246,10 @@ impl Network {
|
||||
/// std::thread::spawn(f);
|
||||
/// block_on(async {
|
||||
/// network
|
||||
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
/// .listen(ProtocolAddr::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// network
|
||||
/// .listen(Address::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||
/// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// # Ok(())
|
||||
/// })
|
||||
@ -257,7 +257,7 @@ impl Network {
|
||||
/// ```
|
||||
///
|
||||
/// [`connected`]: Network::connected
|
||||
pub async fn listen(&self, address: Address) -> Result<(), NetworkError> {
|
||||
pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> {
|
||||
let (s2a_result_s, s2a_result_r) = oneshot::channel::<async_std::io::Result<()>>();
|
||||
debug!(?address, "listening on address");
|
||||
self.listen_sender
|
||||
@ -273,13 +273,13 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
/// starts connectiong to an [`Address`].
|
||||
/// starts connectiong to an [`ProtocolAddr`].
|
||||
/// When the method returns the Network either returns a [`Participant`]
|
||||
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
|
||||
/// can't connect, or invalid Handshake) # Examples
|
||||
/// ```rust
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Address, Network, Pid};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
/// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
|
||||
@ -288,16 +288,16 @@ impl Network {
|
||||
/// # let (remote, fr) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// # remote.listen(Address::Tcp("0.0.0.0:2010".parse().unwrap())).await?;
|
||||
/// # remote.listen(Address::Udp("0.0.0.0:2011".parse().unwrap())).await?;
|
||||
/// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2010".parse().unwrap())).await?;
|
||||
/// # remote.listen(ProtocolAddr::Udp("0.0.0.0:2011".parse().unwrap())).await?;
|
||||
/// let p1 = network
|
||||
/// .connect(Address::Tcp("127.0.0.1:2010".parse().unwrap()))
|
||||
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// # //this doesn't work yet, so skip the test
|
||||
/// # //TODO fixme!
|
||||
/// # return Ok(());
|
||||
/// let p2 = network
|
||||
/// .connect(Address::Udp("127.0.0.1:2011".parse().unwrap()))
|
||||
/// .connect(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// assert_eq!(&p1, &p2);
|
||||
/// # Ok(())
|
||||
@ -306,14 +306,14 @@ impl Network {
|
||||
/// ```
|
||||
/// Usually the `Network` guarantees that a operation on a [`Participant`]
|
||||
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
|
||||
/// disconnecting from the remote. If 2 [`Addresses`] you `connect` to
|
||||
/// disconnecting from the remote. If 2 [`ProtocolAddres`] you `connect` to
|
||||
/// belongs to the same [`Participant`], you get the same [`Participant`] as
|
||||
/// a result. This is useful e.g. by connecting to the same
|
||||
/// [`Participant`] via multiple Protocols.
|
||||
///
|
||||
/// [`Streams`]: crate::api::Stream
|
||||
/// [`Addresses`]: crate::api::Address
|
||||
pub async fn connect(&self, address: Address) -> Result<Participant, NetworkError> {
|
||||
/// [`ProtocolAddres`]: crate::api::ProtocolAddr
|
||||
pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
|
||||
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
|
||||
debug!(?address, "Connect to address");
|
||||
self.connect_sender
|
||||
@ -337,7 +337,7 @@ impl Network {
|
||||
Ok(participant)
|
||||
}
|
||||
|
||||
/// returns a [`Participant`] created from a [`Address`] you called
|
||||
/// returns a [`Participant`] created from a [`ProtocolAddr`] you called
|
||||
/// [`listen`] on before. This function will either return a working
|
||||
/// [`Participant`] ready to open [`Streams`] on OR has returned a
|
||||
/// [`NetworkError`] (e.g. Network got closed)
|
||||
@ -345,7 +345,7 @@ impl Network {
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Address, Network, Pid};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
/// // Create a Network, listen on port `2020` TCP and opens returns their Pid
|
||||
@ -355,9 +355,9 @@ impl Network {
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// network
|
||||
/// .listen(Address::Tcp("0.0.0.0:2020".parse().unwrap()))
|
||||
/// .listen(ProtocolAddr::Tcp("0.0.0.0:2020".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// # remote.connect(Address::Tcp("0.0.0.0:2020".parse().unwrap())).await?;
|
||||
/// # remote.connect(ProtocolAddr::Tcp("0.0.0.0:2020".parse().unwrap())).await?;
|
||||
/// while let Ok(participant) = network.connected().await {
|
||||
/// println!("Participant connected: {}", participant.remote_pid());
|
||||
/// # //skip test here as it would be a endless loop
|
||||
@ -418,7 +418,7 @@ impl Participant {
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
/// // Create a Network, connect on port 2100 and open a stream
|
||||
@ -427,9 +427,9 @@ impl Participant {
|
||||
/// # let (remote, fr) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// # remote.listen(Address::Tcp("0.0.0.0:2100".parse().unwrap())).await?;
|
||||
/// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2100".parse().unwrap())).await?;
|
||||
/// let p1 = network
|
||||
/// .connect(Address::Tcp("127.0.0.1:2100".parse().unwrap()))
|
||||
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
|
||||
/// # Ok(())
|
||||
@ -479,7 +479,7 @@ impl Participant {
|
||||
///
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use veloren_network::{Network, Pid, Address, PROMISES_ORDERED, PROMISES_CONSISTENCY};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_ORDERED, PROMISES_CONSISTENCY};
|
||||
/// use futures::executor::block_on;
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
@ -490,8 +490,8 @@ impl Participant {
|
||||
/// # let (remote, fr) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// # remote.listen(Address::Tcp("0.0.0.0:2110".parse().unwrap())).await?;
|
||||
/// let p1 = network.connect(Address::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
|
||||
/// # remote.listen(ProtocolAddr::Tcp("0.0.0.0:2110".parse().unwrap())).await?;
|
||||
/// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
|
||||
/// # let p2 = remote.connected().await?;
|
||||
/// # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
|
||||
/// let _s1 = p1.opened().await?;
|
||||
@ -543,7 +543,7 @@ impl Participant {
|
||||
/// # Examples
|
||||
/// ```rust
|
||||
/// use futures::executor::block_on;
|
||||
/// use veloren_network::{Address, Network, Pid};
|
||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
||||
///
|
||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
/// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
|
||||
@ -553,9 +553,9 @@ impl Participant {
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// network
|
||||
/// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap()))
|
||||
/// .listen(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap()))
|
||||
/// .await?;
|
||||
/// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
|
||||
/// # remote.connect(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
|
||||
/// while let Ok(participant) = network.connected().await {
|
||||
/// println!("Participant connected: {}", participant.remote_pid());
|
||||
/// participant.disconnect().await?;
|
||||
@ -674,7 +674,7 @@ impl Stream {
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use veloren_network::{Network, Address, Pid};
|
||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
||||
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
|
||||
/// use futures::executor::block_on;
|
||||
///
|
||||
@ -685,8 +685,8 @@ impl Stream {
|
||||
/// # let (remote, fr) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// network.listen(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
||||
/// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
||||
/// # // keep it alive
|
||||
/// # let _stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
|
||||
/// let participant_a = network.connected().await?;
|
||||
@ -713,7 +713,7 @@ impl Stream {
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use veloren_network::{Network, Address, Pid, MessageBuffer};
|
||||
/// use veloren_network::{Network, ProtocolAddr, Pid, MessageBuffer};
|
||||
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
|
||||
/// use futures::executor::block_on;
|
||||
/// use bincode;
|
||||
@ -727,9 +727,9 @@ impl Stream {
|
||||
/// # let (remote2, fr2) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fr2);
|
||||
/// block_on(async {
|
||||
/// network.listen(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||
/// # let remote1_p = remote1.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||
/// # let remote2_p = remote2.connect(Address::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||
/// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||
/// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
|
||||
/// # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
|
||||
/// # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
|
||||
@ -779,7 +779,7 @@ impl Stream {
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use veloren_network::{Network, Address, Pid};
|
||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
||||
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
|
||||
/// use futures::executor::block_on;
|
||||
///
|
||||
@ -790,8 +790,8 @@ impl Stream {
|
||||
/// # let (remote, fr) = Network::new(Pid::new(), None);
|
||||
/// # std::thread::spawn(fr);
|
||||
/// block_on(async {
|
||||
/// network.listen(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
||||
/// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
||||
/// # let mut stream_p = remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
|
||||
/// # stream_p.send("Hello World");
|
||||
/// let participant_a = network.connected().await?;
|
||||
|
@ -14,16 +14,16 @@
|
||||
//! struct [`Network`] once with a new [`Pid`]. The Pid is necessary to identify
|
||||
//! other [`Networks`] over the network protocols (e.g. TCP, UDP)
|
||||
//!
|
||||
//! To connect to another application, you must know it's [`Address`]. One side
|
||||
//! will call [`connect`], the other [`connected`]. If successfull both
|
||||
//! To connect to another application, you must know it's [`ProtocolAddr`]. One
|
||||
//! side will call [`connect`], the other [`connected`]. If successfull both
|
||||
//! applications will now get a [`Participant`].
|
||||
//!
|
||||
//! This [`Participant`] represents the connection between those 2 applications.
|
||||
//! over the respective [`Address`] and with it the choosen network protocol.
|
||||
//! However messages can't be send directly via [`Participants`], instead you
|
||||
//! must open a [`Stream`] on it. Like above, one side has to call [`open`], the
|
||||
//! other [`opened`]. [`Streams`] can have a different priority and
|
||||
//! [`Promises`].
|
||||
//! over the respective [`ProtocolAddr`] and with it the choosen network
|
||||
//! protocol. However messages can't be send directly via [`Participants`],
|
||||
//! instead you must open a [`Stream`] on it. Like above, one side has to call
|
||||
//! [`open`], the other [`opened`]. [`Streams`] can have a different priority
|
||||
//! and [`Promises`].
|
||||
//!
|
||||
//! You can now use the [`Stream`] to [`send`] and [`recv`] in both directions.
|
||||
//! You can send all kind of messages that implement [`serde`].
|
||||
@ -40,7 +40,7 @@
|
||||
//! ```rust
|
||||
//! use async_std::task::sleep;
|
||||
//! use futures::{executor::block_on, join};
|
||||
//! use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
//! use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
//!
|
||||
//! // Client
|
||||
//! async fn client() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
@ -48,7 +48,7 @@
|
||||
//! let (client_network, f) = Network::new(Pid::new(), None);
|
||||
//! std::thread::spawn(f);
|
||||
//! let server = client_network
|
||||
//! .connect(Address::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||
//! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||
//! .await?;
|
||||
//! let mut stream = server
|
||||
//! .open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY)
|
||||
@ -62,7 +62,7 @@
|
||||
//! let (server_network, f) = Network::new(Pid::new(), None);
|
||||
//! std::thread::spawn(f);
|
||||
//! server_network
|
||||
//! .listen(Address::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||
//! .listen(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||
//! .await?;
|
||||
//! let client = server_network.connected().await?;
|
||||
//! let mut stream = client.opened().await?;
|
||||
@ -95,7 +95,7 @@
|
||||
//! [`send`]: crate::api::Stream::send
|
||||
//! [`recv`]: crate::api::Stream::recv
|
||||
//! [`Pid`]: crate::types::Pid
|
||||
//! [`Address`]: crate::api::Address
|
||||
//! [`ProtocolAddr`]: crate::api::ProtocolAddr
|
||||
//! [`Promises`]: crate::types::Promises
|
||||
|
||||
mod api;
|
||||
@ -109,7 +109,9 @@ mod scheduler;
|
||||
#[macro_use]
|
||||
mod types;
|
||||
|
||||
pub use api::{Address, Network, NetworkError, Participant, ParticipantError, Stream, StreamError};
|
||||
pub use api::{
|
||||
Network, NetworkError, Participant, ParticipantError, ProtocolAddr, Stream, StreamError,
|
||||
};
|
||||
pub use message::MessageBuffer;
|
||||
pub use types::{
|
||||
Pid, Promises, PROMISES_COMPRESSED, PROMISES_CONSISTENCY, PROMISES_ENCRYPTED,
|
||||
|
@ -1,6 +1,6 @@
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
//use std::collections::VecDeque;
|
||||
use crate::types::{Mid, Sid};
|
||||
use crate::types::{Frame, Mid, Sid};
|
||||
use std::{io, sync::Arc};
|
||||
|
||||
//Todo: Evaluate switching to VecDeque for quickly adding and removing data
|
||||
@ -52,6 +52,38 @@ pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) -> bincode
|
||||
bincode::deserialize(span.as_slice())
|
||||
}
|
||||
|
||||
impl OutgoingMessage {
|
||||
pub(crate) const FRAME_DATA_SIZE: u64 = 1400;
|
||||
|
||||
/// returns if msg is empty
|
||||
pub(crate) fn fill_next<E: Extend<(Sid, Frame)>>(
|
||||
&mut self,
|
||||
msg_sid: Sid,
|
||||
frames: &mut E,
|
||||
) -> bool {
|
||||
let to_send = std::cmp::min(
|
||||
self.buffer.data[self.cursor as usize..].len() as u64,
|
||||
Self::FRAME_DATA_SIZE,
|
||||
);
|
||||
if to_send > 0 {
|
||||
if self.cursor == 0 {
|
||||
frames.extend(std::iter::once((msg_sid, Frame::DataHeader {
|
||||
mid: self.mid,
|
||||
sid: self.sid,
|
||||
length: self.buffer.data.len() as u64,
|
||||
})));
|
||||
}
|
||||
frames.extend(std::iter::once((msg_sid, Frame::Data {
|
||||
mid: self.mid,
|
||||
start: self.cursor,
|
||||
data: self.buffer.data[self.cursor as usize..][..to_send as usize].to_vec(),
|
||||
})));
|
||||
};
|
||||
self.cursor += to_send;
|
||||
self.cursor >= self.buffer.data.len() as u64
|
||||
}
|
||||
}
|
||||
|
||||
///wouldn't trust this aaaassss much, fine for tests
|
||||
pub(crate) fn partial_eq_io_error(first: &io::Error, second: &io::Error) -> bool {
|
||||
if let Some(f) = first.raw_os_error() {
|
||||
|
@ -40,7 +40,6 @@ pub(crate) struct PrioManager {
|
||||
}
|
||||
|
||||
impl PrioManager {
|
||||
const FRAME_DATA_SIZE: u64 = 1400;
|
||||
const PRIOS: [u32; PRIO_MAX] = [
|
||||
100, 115, 132, 152, 174, 200, 230, 264, 303, 348, 400, 459, 528, 606, 696, 800, 919, 1056,
|
||||
1213, 1393, 1600, 1838, 2111, 2425, 2786, 3200, 3676, 4222, 4850, 5572, 6400, 7352, 8445,
|
||||
@ -201,34 +200,6 @@ impl PrioManager {
|
||||
.min_by_key(|&n| self.points[*n as usize]).cloned()*/
|
||||
}
|
||||
|
||||
/// returns if msg is empty
|
||||
fn tick_msg<E: Extend<(Sid, Frame)>>(
|
||||
msg: &mut OutgoingMessage,
|
||||
msg_sid: Sid,
|
||||
frames: &mut E,
|
||||
) -> bool {
|
||||
let to_send = std::cmp::min(
|
||||
msg.buffer.data[msg.cursor as usize..].len() as u64,
|
||||
Self::FRAME_DATA_SIZE,
|
||||
);
|
||||
if to_send > 0 {
|
||||
if msg.cursor == 0 {
|
||||
frames.extend(std::iter::once((msg_sid, Frame::DataHeader {
|
||||
mid: msg.mid,
|
||||
sid: msg.sid,
|
||||
length: msg.buffer.data.len() as u64,
|
||||
})));
|
||||
}
|
||||
frames.extend(std::iter::once((msg_sid, Frame::Data {
|
||||
mid: msg.mid,
|
||||
start: msg.cursor,
|
||||
data: msg.buffer.data[msg.cursor as usize..][..to_send as usize].to_vec(),
|
||||
})));
|
||||
};
|
||||
msg.cursor += to_send;
|
||||
msg.cursor >= msg.buffer.data.len() as u64
|
||||
}
|
||||
|
||||
/// no_of_frames = frames.len()
|
||||
/// Your goal is to try to find a realistic no_of_frames!
|
||||
/// no_of_frames should be choosen so, that all Frames can be send out till
|
||||
@ -257,7 +228,7 @@ impl PrioManager {
|
||||
// => messages with same prio get a fair chance :)
|
||||
//TODO: evalaute not poping every time
|
||||
let (sid, mut msg) = self.messages[prio as usize].pop_front().unwrap();
|
||||
if Self::tick_msg(&mut msg, sid, frames) {
|
||||
if msg.fill_next(sid, frames) {
|
||||
//trace!(?m.mid, "finish message");
|
||||
//check if prio is empty
|
||||
if self.messages[prio as usize].is_empty() {
|
||||
@ -314,8 +285,8 @@ mod tests {
|
||||
use futures::{channel::oneshot, executor::block_on};
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
const SIZE: u64 = PrioManager::FRAME_DATA_SIZE;
|
||||
const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize;
|
||||
const SIZE: u64 = OutgoingMessage::FRAME_DATA_SIZE;
|
||||
const USIZE: usize = OutgoingMessage::FRAME_DATA_SIZE as usize;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn mock_new() -> (
|
||||
|
@ -14,7 +14,7 @@ use futures::{
|
||||
sink::SinkExt,
|
||||
stream::StreamExt,
|
||||
};
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::{convert::TryFrom, net::SocketAddr, sync::Arc};
|
||||
use tracing::*;
|
||||
|
||||
// Reserving bytes 0, 10, 13 as i have enough space and want to make it easy to
|
||||
@ -108,15 +108,13 @@ impl TcpProtocol {
|
||||
FRAME_HANDSHAKE => {
|
||||
let mut bytes = [0u8; 19];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let magic_number = [
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
|
||||
];
|
||||
let magic_number = *<&[u8; 7]>::try_from(&bytes[0..7]).unwrap();
|
||||
Frame::Handshake {
|
||||
magic_number,
|
||||
version: [
|
||||
u32::from_le_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]),
|
||||
u32::from_le_bytes([bytes[11], bytes[12], bytes[13], bytes[14]]),
|
||||
u32::from_le_bytes([bytes[15], bytes[16], bytes[17], bytes[18]]),
|
||||
u32::from_le_bytes(*<&[u8; 4]>::try_from(&bytes[7..11]).unwrap()),
|
||||
u32::from_le_bytes(*<&[u8; 4]>::try_from(&bytes[11..15]).unwrap()),
|
||||
u32::from_le_bytes(*<&[u8; 4]>::try_from(&bytes[15..19]).unwrap()),
|
||||
],
|
||||
}
|
||||
},
|
||||
@ -124,7 +122,7 @@ impl TcpProtocol {
|
||||
let mut bytes = [0u8; 16];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let pid = Pid::from_le_bytes(bytes);
|
||||
stream.read_exact(&mut bytes).await.unwrap();
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let secret = u128::from_le_bytes(bytes);
|
||||
Frame::Init { pid, secret }
|
||||
},
|
||||
@ -132,10 +130,7 @@ impl TcpProtocol {
|
||||
FRAME_OPEN_STREAM => {
|
||||
let mut bytes = [0u8; 10];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let sid = Sid::from_le_bytes([
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
|
||||
bytes[7],
|
||||
]);
|
||||
let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
let prio = bytes[8];
|
||||
let promises = bytes[9];
|
||||
Frame::OpenStream {
|
||||
@ -147,41 +142,23 @@ impl TcpProtocol {
|
||||
FRAME_CLOSE_STREAM => {
|
||||
let mut bytes = [0u8; 8];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let sid = Sid::from_le_bytes([
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
|
||||
bytes[7],
|
||||
]);
|
||||
let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
Frame::CloseStream { sid }
|
||||
},
|
||||
FRAME_DATA_HEADER => {
|
||||
let mut bytes = [0u8; 24];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let mid = Mid::from_le_bytes([
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
|
||||
bytes[7],
|
||||
]);
|
||||
let sid = Sid::from_le_bytes([
|
||||
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
|
||||
bytes[15],
|
||||
]);
|
||||
let length = u64::from_le_bytes([
|
||||
bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21],
|
||||
bytes[22], bytes[23],
|
||||
]);
|
||||
let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
let sid = Sid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap());
|
||||
let length = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[16..24]).unwrap());
|
||||
Frame::DataHeader { mid, sid, length }
|
||||
},
|
||||
FRAME_DATA => {
|
||||
let mut bytes = [0u8; 18];
|
||||
Self::read_except_or_close(cid, &stream, &mut bytes, w2c_cid_frame_s).await;
|
||||
let mid = Mid::from_le_bytes([
|
||||
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
|
||||
bytes[7],
|
||||
]);
|
||||
let start = u64::from_le_bytes([
|
||||
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
|
||||
bytes[15],
|
||||
]);
|
||||
let length = u16::from_le_bytes([bytes[16], bytes[17]]);
|
||||
let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[0..8]).unwrap());
|
||||
let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&bytes[8..16]).unwrap());
|
||||
let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&bytes[16..18]).unwrap());
|
||||
let mut data = vec![0; length as usize];
|
||||
throughput_cache.inc_by(length as i64);
|
||||
Self::read_except_or_close(cid, &stream, &mut data, w2c_cid_frame_s).await;
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
api::{Address, Participant},
|
||||
api::{Participant, ProtocolAddr},
|
||||
channel::Handshake,
|
||||
metrics::NetworkMetrics,
|
||||
participant::BParticipant,
|
||||
@ -50,8 +50,9 @@ struct ParticipantInfo {
|
||||
/// - c: channel/handshake
|
||||
#[derive(Debug)]
|
||||
struct ControlChannels {
|
||||
a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
|
||||
a2s_connect_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<Participant>>)>,
|
||||
a2s_listen_r: mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>,
|
||||
a2s_connect_r:
|
||||
mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>,
|
||||
a2s_scheduler_shutdown_r: oneshot::Receiver<()>,
|
||||
a2s_disconnect_r: mpsc::UnboundedReceiver<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
|
||||
b2s_prio_statistic_r: mpsc::UnboundedReceiver<(Pid, u64, u64)>,
|
||||
@ -74,7 +75,7 @@ pub struct Scheduler {
|
||||
participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
|
||||
participants: Arc<RwLock<HashMap<Pid, ParticipantInfo>>>,
|
||||
channel_ids: Arc<AtomicU64>,
|
||||
channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
|
||||
channel_listener: RwLock<HashMap<ProtocolAddr, oneshot::Sender<()>>>,
|
||||
metrics: Arc<NetworkMetrics>,
|
||||
}
|
||||
|
||||
@ -85,15 +86,15 @@ impl Scheduler {
|
||||
registry: Option<&Registry>,
|
||||
) -> (
|
||||
Self,
|
||||
mpsc::UnboundedSender<(Address, oneshot::Sender<io::Result<()>>)>,
|
||||
mpsc::UnboundedSender<(Address, oneshot::Sender<io::Result<Participant>>)>,
|
||||
mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>,
|
||||
mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>,
|
||||
mpsc::UnboundedReceiver<Participant>,
|
||||
oneshot::Sender<()>,
|
||||
) {
|
||||
let (a2s_listen_s, a2s_listen_r) =
|
||||
mpsc::unbounded::<(Address, oneshot::Sender<io::Result<()>>)>();
|
||||
mpsc::unbounded::<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>();
|
||||
let (a2s_connect_s, a2s_connect_r) =
|
||||
mpsc::unbounded::<(Address, oneshot::Sender<io::Result<Participant>>)>();
|
||||
mpsc::unbounded::<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>();
|
||||
let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::<Participant>();
|
||||
let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>();
|
||||
let (a2s_disconnect_s, a2s_disconnect_r) =
|
||||
@ -156,7 +157,7 @@ impl Scheduler {
|
||||
|
||||
async fn listen_mgr(
|
||||
&self,
|
||||
a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
|
||||
a2s_listen_r: mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>,
|
||||
) {
|
||||
trace!("Start listen_mgr");
|
||||
a2s_listen_r
|
||||
@ -168,9 +169,9 @@ impl Scheduler {
|
||||
self.metrics
|
||||
.listen_requests_total
|
||||
.with_label_values(&[match address {
|
||||
Address::Tcp(_) => "tcp",
|
||||
Address::Udp(_) => "udp",
|
||||
Address::Mpsc(_) => "mpsc",
|
||||
ProtocolAddr::Tcp(_) => "tcp",
|
||||
ProtocolAddr::Udp(_) => "udp",
|
||||
ProtocolAddr::Mpsc(_) => "mpsc",
|
||||
}])
|
||||
.inc();
|
||||
let (end_sender, end_receiver) = oneshot::channel::<()>();
|
||||
@ -189,14 +190,14 @@ impl Scheduler {
|
||||
async fn connect_mgr(
|
||||
&self,
|
||||
mut a2s_connect_r: mpsc::UnboundedReceiver<(
|
||||
Address,
|
||||
ProtocolAddr,
|
||||
oneshot::Sender<io::Result<Participant>>,
|
||||
)>,
|
||||
) {
|
||||
trace!("Start connect_mgr");
|
||||
while let Some((addr, pid_sender)) = a2s_connect_r.next().await {
|
||||
let (protocol, handshake) = match addr {
|
||||
Address::Tcp(addr) => {
|
||||
ProtocolAddr::Tcp(addr) => {
|
||||
self.metrics
|
||||
.connect_requests_total
|
||||
.with_label_values(&["tcp"])
|
||||
@ -214,7 +215,7 @@ impl Scheduler {
|
||||
false,
|
||||
)
|
||||
},
|
||||
Address::Udp(addr) => {
|
||||
ProtocolAddr::Udp(addr) => {
|
||||
self.metrics
|
||||
.connect_requests_total
|
||||
.with_label_values(&["udp"])
|
||||
@ -338,13 +339,13 @@ impl Scheduler {
|
||||
|
||||
async fn channel_creator(
|
||||
&self,
|
||||
addr: Address,
|
||||
addr: ProtocolAddr,
|
||||
s2s_stop_listening_r: oneshot::Receiver<()>,
|
||||
s2a_listen_result_s: oneshot::Sender<io::Result<()>>,
|
||||
) {
|
||||
trace!(?addr, "Start up channel creator");
|
||||
match addr {
|
||||
Address::Tcp(addr) => {
|
||||
ProtocolAddr::Tcp(addr) => {
|
||||
let listener = match net::TcpListener::bind(addr).await {
|
||||
Ok(listener) => {
|
||||
s2a_listen_result_s.send(Ok(())).unwrap();
|
||||
@ -374,7 +375,7 @@ impl Scheduler {
|
||||
.await;
|
||||
}
|
||||
},
|
||||
Address::Udp(addr) => {
|
||||
ProtocolAddr::Udp(addr) => {
|
||||
let socket = match net::UdpSocket::bind(addr).await {
|
||||
Ok(socket) => {
|
||||
s2a_listen_result_s.send(Ok(())).unwrap();
|
||||
|
@ -7,7 +7,7 @@ use std::{
|
||||
};
|
||||
use tracing::*;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use veloren_network::{Address, Network, Participant, Pid, Stream, PROMISES_NONE};
|
||||
use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE};
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
|
||||
@ -47,7 +47,7 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn network_participant_stream(
|
||||
addr: Address,
|
||||
addr: ProtocolAddr,
|
||||
) -> (Network, Participant, Stream, Network, Participant, Stream) {
|
||||
let (n_a, f_a) = Network::new(Pid::fake(1), None);
|
||||
std::thread::spawn(f_a);
|
||||
@ -65,19 +65,19 @@ pub async fn network_participant_stream(
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn tcp() -> veloren_network::Address {
|
||||
pub fn tcp() -> veloren_network::ProtocolAddr {
|
||||
lazy_static! {
|
||||
static ref PORTS: AtomicU16 = AtomicU16::new(5000);
|
||||
}
|
||||
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
||||
veloren_network::Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port)))
|
||||
veloren_network::ProtocolAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port)))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn udp() -> veloren_network::Address {
|
||||
pub fn udp() -> veloren_network::ProtocolAddr {
|
||||
lazy_static! {
|
||||
static ref PORTS: AtomicU16 = AtomicU16::new(5000);
|
||||
}
|
||||
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
||||
veloren_network::Address::Udp(SocketAddr::from(([127, 0, 0, 1], port)))
|
||||
veloren_network::ProtocolAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port)))
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ use veloren_network::{NetworkError, StreamError};
|
||||
mod helper;
|
||||
use helper::{network_participant_stream, tcp, udp};
|
||||
use std::io::ErrorKind;
|
||||
use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
use veloren_network::{Network, Pid, ProtocolAddr, PROMISES_CONSISTENCY, PROMISES_ORDERED};
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
@ -68,16 +68,16 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
|
||||
std::thread::spawn(fr);
|
||||
block_on(async {
|
||||
remote
|
||||
.listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
.listen(ProtocolAddr::Tcp("0.0.0.0:2000".parse().unwrap()))
|
||||
.await?;
|
||||
remote
|
||||
.listen(Address::Udp("0.0.0.0:2001".parse().unwrap()))
|
||||
.listen(ProtocolAddr::Udp("0.0.0.0:2001".parse().unwrap()))
|
||||
.await?;
|
||||
let p1 = network
|
||||
.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||
.connect(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||
.await?;
|
||||
let p2 = network
|
||||
.connect(Address::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||
.connect(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||
.await?;
|
||||
assert_eq!(&p1, &p2);
|
||||
Ok(())
|
||||
@ -126,10 +126,10 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
|
||||
std::thread::spawn(fr);
|
||||
block_on(async {
|
||||
network
|
||||
.listen(Address::Tcp("127.0.0.1:1200".parse().unwrap()))
|
||||
.listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
||||
.await?;
|
||||
let remote_p = remote
|
||||
.connect(Address::Tcp("127.0.0.1:1200".parse().unwrap()))
|
||||
.connect(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
||||
.await?;
|
||||
// keep it alive
|
||||
let _stream_p = remote_p
|
||||
@ -154,10 +154,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
|
||||
std::thread::spawn(fr);
|
||||
block_on(async {
|
||||
network
|
||||
.listen(Address::Tcp("127.0.0.1:1220".parse().unwrap()))
|
||||
.listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
||||
.await?;
|
||||
let remote_p = remote
|
||||
.connect(Address::Tcp("127.0.0.1:1220".parse().unwrap()))
|
||||
.connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
||||
.await?;
|
||||
let mut stream_p = remote_p
|
||||
.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY)
|
||||
|
@ -41,7 +41,7 @@ use futures_executor::block_on;
|
||||
use futures_timer::Delay;
|
||||
use futures_util::{select, FutureExt};
|
||||
use metrics::{ServerMetrics, TickMetrics};
|
||||
use network::{Address, Network, Pid};
|
||||
use network::{Network, Pid, ProtocolAddr};
|
||||
use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater};
|
||||
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
|
||||
use std::{
|
||||
@ -241,7 +241,7 @@ impl Server {
|
||||
.build();
|
||||
let (network, f) = Network::new(Pid::new(), None);
|
||||
thread_pool.execute(f);
|
||||
block_on(network.listen(Address::Tcp(settings.gameserver_address)))?;
|
||||
block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
|
||||
|
||||
let this = Self {
|
||||
state,
|
||||
|
@ -12,7 +12,7 @@ use std::{
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::debug;
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
@ -81,7 +81,7 @@ impl ClientInit {
|
||||
{
|
||||
match Client::new(socket_addr, view_distance) {
|
||||
Ok(mut client) => {
|
||||
if let Err(err) =
|
||||
if let Err(e) =
|
||||
client.register(username, password, |auth_server| {
|
||||
let _ = tx
|
||||
.send(Msg::IsAuthTrusted(auth_server.to_string()));
|
||||
@ -93,29 +93,35 @@ impl ClientInit {
|
||||
.unwrap_or(false)
|
||||
})
|
||||
{
|
||||
last_err = Some(Error::ClientError(err));
|
||||
last_err = Some(Error::ClientError(e));
|
||||
break 'tries;
|
||||
}
|
||||
let _ = tx.send(Msg::Done(Ok(client)));
|
||||
return;
|
||||
},
|
||||
Err(err) => {
|
||||
match err {
|
||||
ClientError::NetworkErr(NetworkError::ConnectFailed(
|
||||
..,
|
||||
)) => {
|
||||
Err(ClientError::NetworkErr(NetworkError::ConnectFailed(e))) => {
|
||||
if e.kind() == std::io::ErrorKind::PermissionDenied {
|
||||
warn!(
|
||||
?e,
|
||||
"You can't connect to the server, you are running a \
|
||||
incompatible version than the server"
|
||||
);
|
||||
last_err = Some(Error::ClientError(
|
||||
ClientError::NetworkErr(NetworkError::ConnectFailed(e)),
|
||||
));
|
||||
break 'tries;
|
||||
} else {
|
||||
debug!(
|
||||
"can't reach the server, going to retry in a few \
|
||||
seconds"
|
||||
);
|
||||
},
|
||||
// Non-connection error, stop attempts
|
||||
err => {
|
||||
last_err = Some(Error::ClientError(err));
|
||||
break 'tries;
|
||||
},
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
trace!(?e, "stopping connecting to server, due to error");
|
||||
last_err = Some(Error::ClientError(e));
|
||||
break 'tries;
|
||||
},
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_secs(5));
|
||||
|
Loading…
Reference in New Issue
Block a user