mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
protocoladdr change for listen and connect
(remove a loop in quic protocol which wasnt a actual loop)
This commit is contained in:
parent
9f0aceba4c
commit
760c382ed9
33
Cargo.lock
generated
33
Cargo.lock
generated
@ -3631,6 +3631,17 @@ version = "0.1.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pem"
|
||||||
|
version = "0.8.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"once_cell",
|
||||||
|
"regex",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "percent-encoding"
|
name = "percent-encoding"
|
||||||
version = "2.1.0"
|
version = "2.1.0"
|
||||||
@ -4061,6 +4072,18 @@ dependencies = [
|
|||||||
"num_cpus",
|
"num_cpus",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rcgen"
|
||||||
|
version = "0.8.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4e80a701a04edd9cab874a3d59323bebe24c9a92dd602088c78da83732066d1b"
|
||||||
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
|
"pem",
|
||||||
|
"ring",
|
||||||
|
"yasna",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "redox_syscall"
|
name = "redox_syscall"
|
||||||
version = "0.1.57"
|
version = "0.1.57"
|
||||||
@ -5628,6 +5651,7 @@ dependencies = [
|
|||||||
"prometheus-hyper",
|
"prometheus-hyper",
|
||||||
"quinn",
|
"quinn",
|
||||||
"rand 0.8.3",
|
"rand 0.8.3",
|
||||||
|
"rcgen",
|
||||||
"serde",
|
"serde",
|
||||||
"shellexpand",
|
"shellexpand",
|
||||||
"tokio",
|
"tokio",
|
||||||
@ -6638,3 +6662,12 @@ name = "xml-rs"
|
|||||||
version = "0.8.3"
|
version = "0.8.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
|
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "yasna"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0de7bff972b4f2a06c85f6d8454b09df153af7e3a4ec2aac81db1b105b684ddb"
|
||||||
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
|
]
|
||||||
|
@ -61,7 +61,7 @@ use comp::BuffKind;
|
|||||||
use futures_util::FutureExt;
|
use futures_util::FutureExt;
|
||||||
use hashbrown::{HashMap, HashSet};
|
use hashbrown::{HashMap, HashSet};
|
||||||
use image::DynamicImage;
|
use image::DynamicImage;
|
||||||
use network::{Network, Participant, Pid, ProtocolAddr, Stream};
|
use network::{ConnectAddr, Network, Participant, Pid, Stream};
|
||||||
use num::traits::FloatConst;
|
use num::traits::FloatConst;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use specs::Component;
|
use specs::Component;
|
||||||
@ -217,7 +217,7 @@ impl Client {
|
|||||||
// Try to connect to all IP's and return the first that works
|
// Try to connect to all IP's and return the first that works
|
||||||
let mut participant = None;
|
let mut participant = None;
|
||||||
for addr in addrs {
|
for addr in addrs {
|
||||||
match network.connect(ProtocolAddr::Tcp(addr)).await {
|
match network.connect(ConnectAddr::Tcp(addr)).await {
|
||||||
Ok(p) => {
|
Ok(p) => {
|
||||||
participant = Some(Ok(p));
|
participant = Some(Ok(p));
|
||||||
break;
|
break;
|
||||||
@ -228,7 +228,7 @@ impl Client {
|
|||||||
participant
|
participant
|
||||||
.unwrap_or_else(|| Err(Error::Other("No Ip Addr provided".to_string())))?
|
.unwrap_or_else(|| Err(Error::Other("No Ip Addr provided".to_string())))?
|
||||||
},
|
},
|
||||||
ConnectionArgs::Mpsc(id) => network.connect(ProtocolAddr::Mpsc(id)).await?,
|
ConnectionArgs::Mpsc(id) => network.connect(ConnectAddr::Mpsc(id)).await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream = participant.opened().await?;
|
let stream = participant.opened().await?;
|
||||||
|
@ -52,6 +52,8 @@ shellexpand = "2.0.0"
|
|||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
prometheus-hyper = "0.1.2"
|
prometheus-hyper = "0.1.2"
|
||||||
criterion = { version = "0.3.4", features = ["default", "async_tokio"] }
|
criterion = { version = "0.3.4", features = ["default", "async_tokio"] }
|
||||||
|
#quic
|
||||||
|
rcgen = { version = "0.8.10"}
|
||||||
|
|
||||||
[[bench]]
|
[[bench]]
|
||||||
name = "speed"
|
name = "speed"
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||||
use std::{net::SocketAddr, sync::Arc};
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
use tokio::{runtime::Runtime, sync::Mutex};
|
use tokio::{runtime::Runtime, sync::Mutex};
|
||||||
use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream};
|
use veloren_network::{
|
||||||
|
ConnectAddr, ListenAddr, Message, Network, Participant, Pid, Promises, Stream,
|
||||||
|
};
|
||||||
|
|
||||||
fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); }
|
fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); }
|
||||||
|
|
||||||
@ -30,7 +32,7 @@ fn criterion_util(c: &mut Criterion) {
|
|||||||
c.significance_level(0.1).sample_size(100);
|
c.significance_level(0.1).sample_size(100);
|
||||||
|
|
||||||
let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) =
|
let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) =
|
||||||
network_participant_stream(ProtocolAddr::Mpsc(5000));
|
network_participant_stream((ListenAddr::Mpsc(5000), ConnectAddr::Mpsc(5000)));
|
||||||
let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED, 0)).unwrap();
|
let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED, 0)).unwrap();
|
||||||
|
|
||||||
c.throughput(Throughput::Bytes(1000))
|
c.throughput(Throughput::Bytes(1000))
|
||||||
@ -50,7 +52,7 @@ fn criterion_mpsc(c: &mut Criterion) {
|
|||||||
c.significance_level(0.1).sample_size(10);
|
c.significance_level(0.1).sample_size(10);
|
||||||
|
|
||||||
let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
|
let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
|
||||||
network_participant_stream(ProtocolAddr::Mpsc(5000));
|
network_participant_stream((ListenAddr::Mpsc(5000), ConnectAddr::Mpsc(5000)));
|
||||||
let s1_a = Arc::new(Mutex::new(s1_a));
|
let s1_a = Arc::new(Mutex::new(s1_a));
|
||||||
let s1_b = Arc::new(Mutex::new(s1_b));
|
let s1_b = Arc::new(Mutex::new(s1_b));
|
||||||
|
|
||||||
@ -82,8 +84,9 @@ fn criterion_tcp(c: &mut Criterion) {
|
|||||||
let mut c = c.benchmark_group("net_tcp");
|
let mut c = c.benchmark_group("net_tcp");
|
||||||
c.significance_level(0.1).sample_size(10);
|
c.significance_level(0.1).sample_size(10);
|
||||||
|
|
||||||
|
let socket_addr = SocketAddr::from(([127, 0, 0, 1], 5000));
|
||||||
let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
|
let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
|
||||||
network_participant_stream(ProtocolAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], 5000))));
|
network_participant_stream((ListenAddr::Tcp(socket_addr), ConnectAddr::Tcp(socket_addr)));
|
||||||
let s1_a = Arc::new(Mutex::new(s1_a));
|
let s1_a = Arc::new(Mutex::new(s1_a));
|
||||||
let s1_b = Arc::new(Mutex::new(s1_b));
|
let s1_b = Arc::new(Mutex::new(s1_b));
|
||||||
|
|
||||||
@ -115,7 +118,7 @@ criterion_group!(benches, criterion_util, criterion_mpsc, criterion_tcp);
|
|||||||
criterion_main!(benches);
|
criterion_main!(benches);
|
||||||
|
|
||||||
pub fn network_participant_stream(
|
pub fn network_participant_stream(
|
||||||
addr: ProtocolAddr,
|
addr: (ListenAddr, ConnectAddr),
|
||||||
) -> (
|
) -> (
|
||||||
Runtime,
|
Runtime,
|
||||||
Network,
|
Network,
|
||||||
@ -130,8 +133,8 @@ pub fn network_participant_stream(
|
|||||||
let n_a = Network::new(Pid::fake(0), &runtime);
|
let n_a = Network::new(Pid::fake(0), &runtime);
|
||||||
let n_b = Network::new(Pid::fake(1), &runtime);
|
let n_b = Network::new(Pid::fake(1), &runtime);
|
||||||
|
|
||||||
n_a.listen(addr.clone()).await.unwrap();
|
n_a.listen(addr.0).await.unwrap();
|
||||||
let p1_b = n_b.connect(addr).await.unwrap();
|
let p1_b = n_b.connect(addr.1).await.unwrap();
|
||||||
let p1_a = n_a.connected().await.unwrap();
|
let p1_a = n_a.connected().await.unwrap();
|
||||||
|
|
||||||
let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();
|
let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();
|
||||||
|
@ -8,7 +8,7 @@ use std::{sync::Arc, thread, time::Duration};
|
|||||||
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock};
|
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock};
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr};
|
use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises};
|
||||||
|
|
||||||
///This example contains a simple chatserver, that allows to send messages
|
///This example contains a simple chatserver, that allows to send messages
|
||||||
/// between participants, it's neither pretty nor perfect, but it should show
|
/// between participants, it's neither pretty nor perfect, but it should show
|
||||||
@ -75,21 +75,27 @@ fn main() {
|
|||||||
|
|
||||||
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
||||||
let ip: &str = matches.value_of("ip").unwrap();
|
let ip: &str = matches.value_of("ip").unwrap();
|
||||||
let address = match matches.value_of("protocol") {
|
let addresses = match matches.value_of("protocol") {
|
||||||
Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
Some("tcp") => (
|
||||||
Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
ListenAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
ConnectAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
),
|
||||||
|
Some("udp") => (
|
||||||
|
ListenAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
ConnectAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
),
|
||||||
_ => panic!("invalid mode, run --help!"),
|
_ => panic!("invalid mode, run --help!"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut background = None;
|
let mut background = None;
|
||||||
match matches.value_of("mode") {
|
match matches.value_of("mode") {
|
||||||
Some("server") => server(address),
|
Some("server") => server(addresses.0),
|
||||||
Some("client") => client(address),
|
Some("client") => client(addresses.1),
|
||||||
Some("both") => {
|
Some("both") => {
|
||||||
let address1 = address.clone();
|
let s = addresses.0;
|
||||||
background = Some(thread::spawn(|| server(address1)));
|
background = Some(thread::spawn(|| server(s)));
|
||||||
thread::sleep(Duration::from_millis(200)); //start client after server
|
thread::sleep(Duration::from_millis(200)); //start client after server
|
||||||
client(address)
|
client(addresses.1)
|
||||||
},
|
},
|
||||||
_ => panic!("invalid mode, run --help!"),
|
_ => panic!("invalid mode, run --help!"),
|
||||||
};
|
};
|
||||||
@ -98,7 +104,7 @@ fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn server(address: ProtocolAddr) {
|
fn server(address: ListenAddr) {
|
||||||
let r = Arc::new(Runtime::new().unwrap());
|
let r = Arc::new(Runtime::new().unwrap());
|
||||||
let server = Network::new(Pid::new(), &r);
|
let server = Network::new(Pid::new(), &r);
|
||||||
let server = Arc::new(server);
|
let server = Arc::new(server);
|
||||||
@ -144,7 +150,7 @@ async fn client_connection(
|
|||||||
println!("[{}] disconnected", username);
|
println!("[{}] disconnected", username);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn client(address: ProtocolAddr) {
|
fn client(address: ConnectAddr) {
|
||||||
let r = Arc::new(Runtime::new().unwrap());
|
let r = Arc::new(Runtime::new().unwrap());
|
||||||
let client = Network::new(Pid::new(), &r);
|
let client = Network::new(Pid::new(), &r);
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ use rand::Rng;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use veloren_network::{Participant, ProtocolAddr, Stream};
|
use veloren_network::{ConnectAddr, Participant, Stream};
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
@ -10,7 +10,7 @@ use std::collections::HashMap;
|
|||||||
pub enum LocalCommand {
|
pub enum LocalCommand {
|
||||||
Shutdown,
|
Shutdown,
|
||||||
Disconnect,
|
Disconnect,
|
||||||
Connect(ProtocolAddr),
|
Connect(ConnectAddr),
|
||||||
List,
|
List,
|
||||||
Serve(FileInfo),
|
Serve(FileInfo),
|
||||||
Get(u32, Option<String>),
|
Get(u32, Option<String>),
|
||||||
|
@ -9,7 +9,7 @@ use std::{path::PathBuf, sync::Arc, thread, time::Duration};
|
|||||||
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::mpsc};
|
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::mpsc};
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
use veloren_network::ProtocolAddr;
|
use veloren_network::{ConnectAddr, ListenAddr};
|
||||||
mod commands;
|
mod commands;
|
||||||
mod server;
|
mod server;
|
||||||
use commands::{FileInfo, LocalCommand};
|
use commands::{FileInfo, LocalCommand};
|
||||||
@ -50,7 +50,7 @@ fn main() {
|
|||||||
.init();
|
.init();
|
||||||
|
|
||||||
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
||||||
let address = ProtocolAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap());
|
let address = ListenAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap());
|
||||||
let runtime = Arc::new(Runtime::new().unwrap());
|
let runtime = Arc::new(Runtime::new().unwrap());
|
||||||
|
|
||||||
let (server, cmd_sender) = Server::new(Arc::clone(&runtime));
|
let (server, cmd_sender) = Server::new(Arc::clone(&runtime));
|
||||||
@ -158,12 +158,12 @@ async fn client(cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
|
|||||||
.parse()
|
.parse()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
cmd_sender
|
cmd_sender
|
||||||
.send(LocalCommand::Connect(ProtocolAddr::Tcp(socketaddr)))
|
.send(LocalCommand::Connect(ConnectAddr::Tcp(socketaddr)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
},
|
},
|
||||||
("t", _) => {
|
("t", _) => {
|
||||||
cmd_sender
|
cmd_sender
|
||||||
.send(LocalCommand::Connect(ProtocolAddr::Tcp(
|
.send(LocalCommand::Connect(ConnectAddr::Tcp(
|
||||||
"127.0.0.1:1231".parse().unwrap(),
|
"127.0.0.1:1231".parse().unwrap(),
|
||||||
)))
|
)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -8,7 +8,7 @@ use tokio::{
|
|||||||
};
|
};
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
|
use veloren_network::{ListenAddr, Network, Participant, Pid, Promises, Stream};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct ControlChannels {
|
struct ControlChannels {
|
||||||
@ -42,7 +42,7 @@ impl Server {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(mut self, address: ProtocolAddr) {
|
pub async fn run(mut self, address: ListenAddr) {
|
||||||
let run_channels = self.run_channels.take().unwrap();
|
let run_channels = self.run_channels.take().unwrap();
|
||||||
|
|
||||||
self.network.listen(address).await.unwrap();
|
self.network.listen(address).await.unwrap();
|
||||||
|
@ -16,7 +16,7 @@ use std::{
|
|||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
use veloren_network::{Message, Network, Pid, Promises, ProtocolAddr};
|
use veloren_network::{ConnectAddr, ListenAddr, Message, Network, Pid, Promises};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
enum Msg {
|
enum Msg {
|
||||||
@ -96,23 +96,29 @@ fn main() {
|
|||||||
|
|
||||||
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
|
||||||
let ip: &str = matches.value_of("ip").unwrap();
|
let ip: &str = matches.value_of("ip").unwrap();
|
||||||
let address = match matches.value_of("protocol") {
|
let addresses = match matches.value_of("protocol") {
|
||||||
Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
Some("tcp") => (
|
||||||
Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
ListenAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
_ => panic!("Invalid mode, run --help!"),
|
ConnectAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
),
|
||||||
|
Some("udp") => (
|
||||||
|
ListenAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
ConnectAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()),
|
||||||
|
),
|
||||||
|
_ => panic!("invalid mode, run --help!"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut background = None;
|
let mut background = None;
|
||||||
let runtime = Arc::new(Runtime::new().unwrap());
|
let runtime = Arc::new(Runtime::new().unwrap());
|
||||||
match matches.value_of("mode") {
|
match matches.value_of("mode") {
|
||||||
Some("server") => server(address, Arc::clone(&runtime)),
|
Some("server") => server(addresses.0, Arc::clone(&runtime)),
|
||||||
Some("client") => client(address, Arc::clone(&runtime)),
|
Some("client") => client(addresses.1, Arc::clone(&runtime)),
|
||||||
Some("both") => {
|
Some("both") => {
|
||||||
let address1 = address.clone();
|
let s = addresses.0;
|
||||||
let runtime2 = Arc::clone(&runtime);
|
let runtime2 = Arc::clone(&runtime);
|
||||||
background = Some(thread::spawn(|| server(address1, runtime2)));
|
background = Some(thread::spawn(|| server(s, runtime2)));
|
||||||
thread::sleep(Duration::from_millis(200)); //start client after server
|
thread::sleep(Duration::from_millis(200)); //start client after server
|
||||||
client(address, Arc::clone(&runtime));
|
client(addresses.1, Arc::clone(&runtime));
|
||||||
},
|
},
|
||||||
_ => panic!("Invalid mode, run --help!"),
|
_ => panic!("Invalid mode, run --help!"),
|
||||||
};
|
};
|
||||||
@ -121,7 +127,7 @@ fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
|
fn server(address: ListenAddr, runtime: Arc<Runtime>) {
|
||||||
let registry = Arc::new(Registry::new());
|
let registry = Arc::new(Registry::new());
|
||||||
let server = Network::new_with_registry(Pid::new(), &runtime, ®istry);
|
let server = Network::new_with_registry(Pid::new(), &runtime, ®istry);
|
||||||
runtime.spawn(Server::run(
|
runtime.spawn(Server::run(
|
||||||
@ -153,7 +159,7 @@ fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
|
fn client(address: ConnectAddr, runtime: Arc<Runtime>) {
|
||||||
let registry = Arc::new(Registry::new());
|
let registry = Arc::new(Registry::new());
|
||||||
let client = Network::new_with_registry(Pid::new(), &runtime, ®istry);
|
let client = Network::new_with_registry(Pid::new(), &runtime, ®istry);
|
||||||
runtime.spawn(Server::run(
|
runtime.spawn(Server::run(
|
||||||
|
@ -285,10 +285,12 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (id, (_, buffer)) in self.reliable_buffers.data.iter_mut().enumerate() {
|
for (id, (_, buffer)) in self.reliable_buffers.data.iter_mut().enumerate() {
|
||||||
|
if !buffer.is_empty() {
|
||||||
self.drain
|
self.drain
|
||||||
.send(QuicDataFormat::with_reliable(buffer, id as u64))
|
.send(QuicDataFormat::with_reliable(buffer, id as u64))
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
self.metrics
|
self.metrics
|
||||||
.sdata_frames_b(data_frames, data_bandwidth as u64);
|
.sdata_frames_b(data_frames, data_bandwidth as u64);
|
||||||
|
|
||||||
@ -340,7 +342,6 @@ where
|
|||||||
{
|
{
|
||||||
async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError> {
|
async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError> {
|
||||||
'outer: loop {
|
'outer: loop {
|
||||||
loop {
|
|
||||||
match ITFrame::read_frame(&mut self.main_buffer) {
|
match ITFrame::read_frame(&mut self.main_buffer) {
|
||||||
Ok(Some(frame)) => {
|
Ok(Some(frame)) => {
|
||||||
#[cfg(feature = "trace_pedantic")]
|
#[cfg(feature = "trace_pedantic")]
|
||||||
@ -374,10 +375,9 @@ where
|
|||||||
_ => break 'outer Err(ProtocolError::Violated),
|
_ => break 'outer Err(ProtocolError::Violated),
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
Ok(None) => break, //inner => read more data
|
Ok(None) => {},
|
||||||
Err(()) => return Err(ProtocolError::Violated),
|
Err(()) => return Err(ProtocolError::Violated),
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// try to order pending
|
// try to order pending
|
||||||
let mut pending_violated = false;
|
let mut pending_violated = false;
|
||||||
@ -401,6 +401,7 @@ where
|
|||||||
Ok(None) => false,
|
Ok(None) => false,
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if pending_violated {
|
if pending_violated {
|
||||||
break 'outer Err(ProtocolError::Violated);
|
break 'outer Err(ProtocolError::Violated);
|
||||||
}
|
}
|
||||||
@ -436,8 +437,8 @@ where
|
|||||||
if reliable {
|
if reliable {
|
||||||
info!(
|
info!(
|
||||||
?mid,
|
?mid,
|
||||||
"protocol violation by remote side: send Data before \
|
"protocol violation by remote side: send Data \
|
||||||
Header"
|
before Header"
|
||||||
);
|
);
|
||||||
break 'outer Err(ProtocolError::Violated);
|
break 'outer Err(ProtocolError::Violated);
|
||||||
} else {
|
} else {
|
||||||
|
@ -28,9 +28,19 @@ use tracing::*;
|
|||||||
|
|
||||||
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
|
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
|
||||||
|
|
||||||
/// Represents a Tcp or Udp or Mpsc address
|
/// Represents a Tcp, Quic, Udp or Mpsc connection address
|
||||||
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum ProtocolAddr {
|
pub enum ConnectAddr {
|
||||||
|
Tcp(SocketAddr),
|
||||||
|
Udp(SocketAddr),
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
|
Quic(SocketAddr, quinn::ClientConfig, String),
|
||||||
|
Mpsc(u64),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents a Tcp, Quic, Udp or Mpsc listen address
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub enum ListenAddr {
|
||||||
Tcp(SocketAddr),
|
Tcp(SocketAddr),
|
||||||
Udp(SocketAddr),
|
Udp(SocketAddr),
|
||||||
#[cfg(feature = "quic")]
|
#[cfg(feature = "quic")]
|
||||||
@ -135,8 +145,8 @@ pub struct StreamParams {
|
|||||||
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
|
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
|
||||||
///
|
///
|
||||||
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
|
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
|
||||||
/// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`]
|
/// via their [`ProtocolConnectAddr`], or [`listen`] passively for [`connected`]
|
||||||
/// [`Participants`].
|
/// [`Participants`] via [`ProtocolListenAddr`].
|
||||||
///
|
///
|
||||||
/// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before
|
/// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before
|
||||||
/// the Network.
|
/// the Network.
|
||||||
@ -144,7 +154,7 @@ pub struct StreamParams {
|
|||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
|
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
|
||||||
@ -153,9 +163,9 @@ pub struct StreamParams {
|
|||||||
/// runtime.block_on(async{
|
/// runtime.block_on(async{
|
||||||
/// # //setup pseudo database!
|
/// # //setup pseudo database!
|
||||||
/// # let database = Network::new(Pid::new(), &runtime);
|
/// # let database = Network::new(Pid::new(), &runtime);
|
||||||
/// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
/// # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
||||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
|
/// network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
|
||||||
/// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
/// let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
|
||||||
/// drop(network);
|
/// drop(network);
|
||||||
/// # drop(database);
|
/// # drop(database);
|
||||||
/// # Ok(())
|
/// # Ok(())
|
||||||
@ -171,7 +181,7 @@ pub struct StreamParams {
|
|||||||
pub struct Network {
|
pub struct Network {
|
||||||
local_pid: Pid,
|
local_pid: Pid,
|
||||||
participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
|
participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
|
||||||
listen_sender: Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>>,
|
listen_sender: Mutex<mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>>,
|
||||||
connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
|
connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
|
||||||
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
|
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
|
||||||
shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
|
shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
|
||||||
@ -197,7 +207,7 @@ impl Network {
|
|||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
/// use veloren_network::{Network, Pid};
|
||||||
///
|
///
|
||||||
/// let runtime = Runtime::new().unwrap();
|
/// let runtime = Runtime::new().unwrap();
|
||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
@ -230,7 +240,7 @@ impl Network {
|
|||||||
/// ```rust
|
/// ```rust
|
||||||
/// use prometheus::Registry;
|
/// use prometheus::Registry;
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
/// use veloren_network::{Network, Pid};
|
||||||
///
|
///
|
||||||
/// let runtime = Runtime::new().unwrap();
|
/// let runtime = Runtime::new().unwrap();
|
||||||
/// let registry = Registry::new();
|
/// let registry = Registry::new();
|
||||||
@ -283,7 +293,7 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// starts listening on an [`ProtocolAddr`].
|
/// starts listening on an [`ProtocolListenAddr`].
|
||||||
/// When the method returns the `Network` is ready to listen for incoming
|
/// When the method returns the `Network` is ready to listen for incoming
|
||||||
/// connections OR has returned a [`NetworkError`] (e.g. port already used).
|
/// connections OR has returned a [`NetworkError`] (e.g. port already used).
|
||||||
/// You can call [`connected`] to asynchrony wait for a [`Participant`] to
|
/// You can call [`connected`] to asynchrony wait for a [`Participant`] to
|
||||||
@ -293,7 +303,7 @@ impl Network {
|
|||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
/// use veloren_network::{Network, Pid, ProtocolListenAddr};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
|
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
|
||||||
@ -301,10 +311,10 @@ impl Network {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network
|
/// network
|
||||||
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
/// .listen(ProtocolListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// network
|
/// network
|
||||||
/// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
/// .listen(ProtocolListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// drop(network);
|
/// drop(network);
|
||||||
/// # Ok(())
|
/// # Ok(())
|
||||||
@ -314,7 +324,7 @@ impl Network {
|
|||||||
///
|
///
|
||||||
/// [`connected`]: Network::connected
|
/// [`connected`]: Network::connected
|
||||||
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
|
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
|
||||||
pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> {
|
pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> {
|
||||||
let (s2a_result_s, s2a_result_r) = oneshot::channel::<tokio::io::Result<()>>();
|
let (s2a_result_s, s2a_result_r) = oneshot::channel::<tokio::io::Result<()>>();
|
||||||
debug!(?address, "listening on address");
|
debug!(?address, "listening on address");
|
||||||
self.listen_sender
|
self.listen_sender
|
||||||
@ -329,13 +339,13 @@ impl Network {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// starts connection to an [`ProtocolAddr`].
|
/// starts connection to an [`ProtocolConnectAddr`].
|
||||||
/// When the method returns the Network either returns a [`Participant`]
|
/// When the method returns the Network either returns a [`Participant`]
|
||||||
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
|
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
|
||||||
/// can't connect, or invalid Handshake) # Examples
|
/// can't connect, or invalid Handshake) # Examples
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
/// use veloren_network::{Network, Pid, ProtocolListenAddr, ProtocolConnectAddr};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
|
/// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
|
||||||
@ -343,16 +353,16 @@ impl Network {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
|
/// # remote.listen(ProtocolListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
|
||||||
/// # remote.listen(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
|
/// # remote.listen(ProtocolListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
|
||||||
/// let p1 = network
|
/// let p1 = network
|
||||||
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
|
/// .connect(ProtocolConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// # //this doesn't work yet, so skip the test
|
/// # //this doesn't work yet, so skip the test
|
||||||
/// # //TODO fixme!
|
/// # //TODO fixme!
|
||||||
/// # return Ok(());
|
/// # return Ok(());
|
||||||
/// let p2 = network
|
/// let p2 = network
|
||||||
/// .connect(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap()))
|
/// .connect(ProtocolConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// assert_eq!(&p1, &p2);
|
/// assert_eq!(&p1, &p2);
|
||||||
/// # Ok(())
|
/// # Ok(())
|
||||||
@ -364,15 +374,15 @@ impl Network {
|
|||||||
/// ```
|
/// ```
|
||||||
/// Usually the `Network` guarantees that a operation on a [`Participant`]
|
/// Usually the `Network` guarantees that a operation on a [`Participant`]
|
||||||
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
|
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
|
||||||
/// disconnecting from the remote. If 2 [`ProtocolAddres`] you `connect` to
|
/// disconnecting from the remote. If 2 [`ProtocolConnectAddres`] you
|
||||||
/// belongs to the same [`Participant`], you get the same [`Participant`] as
|
/// `connect` to belongs to the same [`Participant`], you get the same
|
||||||
/// a result. This is useful e.g. by connecting to the same
|
/// [`Participant`] as a result. This is useful e.g. by connecting to
|
||||||
/// [`Participant`] via multiple Protocols.
|
/// the same [`Participant`] via multiple Protocols.
|
||||||
///
|
///
|
||||||
/// [`Streams`]: crate::api::Stream
|
/// [`Streams`]: crate::api::Stream
|
||||||
/// [`ProtocolAddres`]: crate::api::ProtocolAddr
|
/// [`ProtocolConnectAddres`]: crate::api::ProtocolConnectAddr
|
||||||
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
|
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
|
||||||
pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
|
pub async fn connect(&self, address: ConnectAddr) -> Result<Participant, NetworkError> {
|
||||||
let (pid_sender, pid_receiver) =
|
let (pid_sender, pid_receiver) =
|
||||||
oneshot::channel::<Result<Participant, NetworkConnectError>>();
|
oneshot::channel::<Result<Participant, NetworkConnectError>>();
|
||||||
debug!(?address, "Connect to address");
|
debug!(?address, "Connect to address");
|
||||||
@ -393,15 +403,15 @@ impl Network {
|
|||||||
Ok(participant)
|
Ok(participant)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// returns a [`Participant`] created from a [`ProtocolAddr`] you called
|
/// returns a [`Participant`] created from a [`ProtocolListenAddr`] you
|
||||||
/// [`listen`] on before. This function will either return a working
|
/// called [`listen`] on before. This function will either return a
|
||||||
/// [`Participant`] ready to open [`Streams`] on OR has returned a
|
/// working [`Participant`] ready to open [`Streams`] on OR has returned
|
||||||
/// [`NetworkError`] (e.g. Network got closed)
|
/// a [`NetworkError`] (e.g. Network got closed)
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
/// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on port `2020` TCP and opens returns their Pid
|
/// // Create a Network, listen on port `2020` TCP and opens returns their Pid
|
||||||
@ -410,9 +420,9 @@ impl Network {
|
|||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network
|
/// network
|
||||||
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
|
/// .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// # remote.connect(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
|
/// # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
|
||||||
/// while let Ok(participant) = network.connected().await {
|
/// while let Ok(participant) = network.connected().await {
|
||||||
/// println!("Participant connected: {}", participant.remote_pid());
|
/// println!("Participant connected: {}", participant.remote_pid());
|
||||||
/// # //skip test here as it would be a endless loop
|
/// # //skip test here as it would be a endless loop
|
||||||
@ -530,7 +540,7 @@ impl Participant {
|
|||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, Promises, ProtocolAddr};
|
/// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, connect on port 2100 and open a stream
|
/// // Create a Network, connect on port 2100 and open a stream
|
||||||
@ -538,9 +548,9 @@ impl Participant {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
|
/// # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
|
||||||
/// let p1 = network
|
/// let p1 = network
|
||||||
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
|
/// .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// let _s1 = p1
|
/// let _s1 = p1
|
||||||
/// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
|
/// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
|
||||||
@ -597,7 +607,7 @@ impl Participant {
|
|||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr, Promises};
|
/// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, connect on port 2110 and wait for the other side to open a stream
|
/// // Create a Network, connect on port 2110 and wait for the other side to open a stream
|
||||||
@ -606,8 +616,8 @@ impl Participant {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
|
/// # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
|
||||||
/// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
|
/// let p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
|
||||||
/// # let p2 = remote.connected().await?;
|
/// # let p2 = remote.connected().await?;
|
||||||
/// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// let _s1 = p1.opened().await?;
|
/// let _s1 = p1.opened().await?;
|
||||||
@ -654,7 +664,7 @@ impl Participant {
|
|||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, Pid, ProtocolAddr};
|
/// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
|
/// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
|
||||||
@ -663,9 +673,9 @@ impl Participant {
|
|||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// let err = runtime.block_on(async {
|
/// let err = runtime.block_on(async {
|
||||||
/// network
|
/// network
|
||||||
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
|
/// .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
|
||||||
/// .await?;
|
/// .await?;
|
||||||
/// # let keep_alive = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
|
/// # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
|
||||||
/// while let Ok(participant) = network.connected().await {
|
/// while let Ok(participant) = network.connected().await {
|
||||||
/// println!("Participant connected: {}", participant.remote_pid());
|
/// println!("Participant connected: {}", participant.remote_pid());
|
||||||
/// participant.disconnect().await?;
|
/// participant.disconnect().await?;
|
||||||
@ -790,7 +800,7 @@ impl Stream {
|
|||||||
/// ```
|
/// ```
|
||||||
/// # use veloren_network::Promises;
|
/// # use veloren_network::Promises;
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
|
/// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
|
||||||
@ -798,8 +808,8 @@ impl Stream {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
/// network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
||||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
|
||||||
/// # // keep it alive
|
/// # // keep it alive
|
||||||
/// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// let participant_a = network.connected().await?;
|
/// let participant_a = network.connected().await?;
|
||||||
@ -832,7 +842,7 @@ impl Stream {
|
|||||||
/// # use veloren_network::Promises;
|
/// # use veloren_network::Promises;
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use bincode;
|
/// use bincode;
|
||||||
/// use veloren_network::{Network, ProtocolAddr, Pid, Message};
|
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// let runtime = Runtime::new().unwrap();
|
/// let runtime = Runtime::new().unwrap();
|
||||||
@ -840,9 +850,9 @@ impl Stream {
|
|||||||
/// # let remote1 = Network::new(Pid::new(), &runtime);
|
/// # let remote1 = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote2 = Network::new(Pid::new(), &runtime);
|
/// # let remote2 = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
/// network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||||
/// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
/// # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||||
/// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
/// # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
|
||||||
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
|
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
|
||||||
/// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
@ -891,7 +901,7 @@ impl Stream {
|
|||||||
/// ```
|
/// ```
|
||||||
/// # use veloren_network::Promises;
|
/// # use veloren_network::Promises;
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
|
/// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
|
||||||
@ -899,8 +909,8 @@ impl Stream {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
/// network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
||||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
|
||||||
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// # stream_p.send("Hello World");
|
/// # stream_p.send("Hello World");
|
||||||
/// let participant_a = network.connected().await?;
|
/// let participant_a = network.connected().await?;
|
||||||
@ -925,7 +935,7 @@ impl Stream {
|
|||||||
/// ```
|
/// ```
|
||||||
/// # use veloren_network::Promises;
|
/// # use veloren_network::Promises;
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
|
/// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
|
||||||
@ -933,8 +943,8 @@ impl Stream {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
|
/// network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
|
||||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
|
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
|
||||||
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// # stream_p.send("Hello World");
|
/// # stream_p.send("Hello World");
|
||||||
/// let participant_a = network.connected().await?;
|
/// let participant_a = network.connected().await?;
|
||||||
@ -981,7 +991,7 @@ impl Stream {
|
|||||||
/// ```
|
/// ```
|
||||||
/// # use veloren_network::Promises;
|
/// # use veloren_network::Promises;
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use veloren_network::{Network, ProtocolAddr, Pid};
|
/// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
|
||||||
///
|
///
|
||||||
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
/// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
|
/// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
|
||||||
@ -989,8 +999,8 @@ impl Stream {
|
|||||||
/// let network = Network::new(Pid::new(), &runtime);
|
/// let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// runtime.block_on(async {
|
/// runtime.block_on(async {
|
||||||
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
|
/// network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
|
||||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
|
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
|
||||||
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// # stream_p.send("Hello World");
|
/// # stream_p.send("Hello World");
|
||||||
/// # std::thread::sleep(std::time::Duration::from_secs(1));
|
/// # std::thread::sleep(std::time::Duration::from_secs(1));
|
||||||
|
@ -1,19 +1,20 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use network_protocol::{
|
use network_protocol::{
|
||||||
QuicDataFormat, QuicDataFormatStream, QuicSendProtocol, QuicRecvProtocol,
|
|
||||||
Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid,
|
Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid,
|
||||||
ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol,
|
ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, QuicDataFormat,
|
||||||
|
QuicDataFormatStream, QuicRecvProtocol, QuicSendProtocol, Sid, TcpRecvProtocol,
|
||||||
TcpSendProtocol, UnreliableDrain, UnreliableSink,
|
TcpSendProtocol, UnreliableDrain, UnreliableSink,
|
||||||
};
|
};
|
||||||
#[cfg(feature = "quic")] use quinn::*;
|
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
|
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
|
||||||
sync::mpsc,
|
sync::mpsc,
|
||||||
};
|
};
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
|
||||||
|
#[allow(clippy::large_enum_variant)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum Protocols {
|
pub(crate) enum Protocols {
|
||||||
Tcp((TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>)),
|
Tcp((TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>)),
|
||||||
@ -35,7 +36,7 @@ pub(crate) enum RecvProtocols {
|
|||||||
Tcp(TcpRecvProtocol<TcpSink>),
|
Tcp(TcpRecvProtocol<TcpSink>),
|
||||||
Mpsc(MpscRecvProtocol<MpscSink>),
|
Mpsc(MpscRecvProtocol<MpscSink>),
|
||||||
#[cfg(feature = "quic")]
|
#[cfg(feature = "quic")]
|
||||||
Quic(QuicSendProtocol<QuicDrain>),
|
Quic(QuicRecvProtocol<QuicSink>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Protocols {
|
impl Protocols {
|
||||||
@ -73,26 +74,39 @@ impl Protocols {
|
|||||||
|
|
||||||
#[cfg(feature = "quic")]
|
#[cfg(feature = "quic")]
|
||||||
pub(crate) async fn new_quic(
|
pub(crate) async fn new_quic(
|
||||||
connection: quinn::NewConnection,
|
mut connection: quinn::NewConnection,
|
||||||
|
listen: bool,
|
||||||
cid: Cid,
|
cid: Cid,
|
||||||
metrics: Arc<ProtocolMetrics>,
|
metrics: Arc<ProtocolMetrics>,
|
||||||
) -> Result<Self, quinn::ConnectionError> {
|
) -> Result<Self, quinn::ConnectionError> {
|
||||||
let metrics = ProtocolMetricCache::new(&cid.to_string(), metrics);
|
let metrics = ProtocolMetricCache::new(&cid.to_string(), metrics);
|
||||||
|
|
||||||
let (sendstream, recvstream) = connection.connection.open_bi().await?;
|
let (sendstream, recvstream) = if listen {
|
||||||
|
connection.connection.open_bi().await?
|
||||||
|
} else {
|
||||||
let sp = QuicSendProtocol::new(QuicDrain {
|
connection.bi_streams.next().await.expect("none").expect("dasdasd")
|
||||||
|
};
|
||||||
|
let (streams_s,streams_r) = mpsc::unbounded_channel();
|
||||||
|
let streams_s_clone = streams_s.clone();
|
||||||
|
let sp = QuicSendProtocol::new(
|
||||||
|
QuicDrain {
|
||||||
con: connection.connection.clone(),
|
con: connection.connection.clone(),
|
||||||
main: sendstream,
|
main: sendstream,
|
||||||
reliables: vec!(),
|
reliables: std::collections::HashMap::new(),
|
||||||
}, metrics.clone());
|
streams_s: streams_s_clone,
|
||||||
let rp = QuicRecvProtocol::new(QuicSink {
|
},
|
||||||
|
metrics.clone(),
|
||||||
|
);
|
||||||
|
spawn_new(recvstream, None, &streams_s);
|
||||||
|
let rp = QuicRecvProtocol::new(
|
||||||
|
QuicSink {
|
||||||
con: connection.connection,
|
con: connection.connection,
|
||||||
main: recvstream,
|
bi: connection.bi_streams,
|
||||||
reliables: vec!(),
|
streams_r,
|
||||||
buffer: BytesMut::new(),
|
streams_s,
|
||||||
}, metrics);
|
},
|
||||||
|
metrics,
|
||||||
|
);
|
||||||
Ok(Protocols::Quic((sp, rp)))
|
Ok(Protocols::Quic((sp, rp)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,50 +257,128 @@ impl UnreliableSink for MpscSink {
|
|||||||
|
|
||||||
///////////////////////////////////////
|
///////////////////////////////////////
|
||||||
//// QUIC
|
//// QUIC
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
|
type QuicStream = (BytesMut, Result<Option<usize>, quinn::ReadError>, quinn::RecvStream, Option<u64>);
|
||||||
|
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct QuicDrain {
|
pub struct QuicDrain {
|
||||||
con: quinn::Connection,
|
con: quinn::Connection,
|
||||||
main: quinn::SendStream,
|
main: quinn::SendStream,
|
||||||
reliables: Vec<quinn::SendStream>,
|
reliables: std::collections::HashMap<u64, quinn::SendStream>,
|
||||||
|
streams_s: mpsc::UnboundedSender<QuicStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct QuicSink {
|
pub struct QuicSink {
|
||||||
con: quinn::Connection,
|
con: quinn::Connection,
|
||||||
main: quinn::RecvStream,
|
bi: quinn::IncomingBiStreams,
|
||||||
reliables: Vec<quinn::RecvStream>,
|
streams_r: mpsc::UnboundedReceiver<QuicStream>,
|
||||||
buffer: BytesMut,
|
streams_s: mpsc::UnboundedSender<QuicStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
|
fn spawn_new(mut recvstream: quinn::RecvStream, id: Option<u64>, streams_s: &mpsc::UnboundedSender<QuicStream>) {
|
||||||
|
let streams_s_clone = streams_s.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut buffer = BytesMut::new();
|
||||||
|
buffer.resize(1500, 0u8);
|
||||||
|
let r = recvstream.read(&mut buffer).await;
|
||||||
|
let _ = streams_s_clone.send((buffer, r, recvstream, id));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl UnreliableDrain for QuicDrain {
|
impl UnreliableDrain for QuicDrain {
|
||||||
type DataFormat = QuicDataFormat;
|
type DataFormat = QuicDataFormat;
|
||||||
|
|
||||||
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
|
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
|
||||||
match match data.stream {
|
match match data.stream {
|
||||||
QuicDataFormatStream::Main => self.main.write_all(&data.data),
|
QuicDataFormatStream::Main => {
|
||||||
|
self.main.write_all(&data.data).await
|
||||||
|
},
|
||||||
QuicDataFormatStream::Unreliable => unimplemented!(),
|
QuicDataFormatStream::Unreliable => unimplemented!(),
|
||||||
QuicDataFormatStream::Reliable(id) => self.reliables.get_mut(id as usize).ok_or(ProtocolError::Closed)?.write_all(&data.data),
|
QuicDataFormatStream::Reliable(id) => {
|
||||||
}.await {
|
use std::collections::hash_map::Entry;
|
||||||
|
match self.reliables.entry(id) {
|
||||||
|
Entry::Occupied(mut occupied) => {
|
||||||
|
occupied.get_mut().write_all(&data.data).await
|
||||||
|
},
|
||||||
|
Entry::Vacant(vacant) => {
|
||||||
|
match self.con.open_bi().await {
|
||||||
|
Ok((sendstream, recvstream)) => {
|
||||||
|
let id = Some(0); //TODO FIXME
|
||||||
|
spawn_new(recvstream, id, &self.streams_s);
|
||||||
|
vacant.insert(sendstream).write_all(&data.data).await
|
||||||
|
},
|
||||||
|
Err(_) => return Err(ProtocolError::Closed),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
{
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(_) => Err(ProtocolError::Closed),
|
Err(_) => Err(ProtocolError::Closed),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl UnreliableSink for QuicSink {
|
impl UnreliableSink for QuicSink {
|
||||||
type DataFormat = QuicDataFormat;
|
type DataFormat = QuicDataFormat;
|
||||||
|
|
||||||
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
|
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
|
||||||
self.buffer.resize(1500, 0u8);
|
let (mut buffer, result, mut recvstream, id) = loop {
|
||||||
//TODO improve
|
use futures_util::FutureExt;
|
||||||
match self.main.read(&mut self.buffer).await {
|
// first handle all bi streams!
|
||||||
|
let (a, b) = tokio::select! {
|
||||||
|
biased;
|
||||||
|
Some(n) = self.bi.next().fuse() => (Some(n), None),
|
||||||
|
Some(n) = self.streams_r.recv().fuse() => (None, Some(n)),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(remote_stream) = a {
|
||||||
|
match remote_stream {
|
||||||
|
Ok((sendstream, recvstream)) => {
|
||||||
|
//FIXME TODO
|
||||||
|
let id = Some(0); // get real ID
|
||||||
|
drop(sendstream); // not drop it!
|
||||||
|
spawn_new(recvstream, id, &self.streams_s);
|
||||||
|
},
|
||||||
|
Err(_) => return Err(ProtocolError::Closed),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(data) = b {
|
||||||
|
break data;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let r = match result {
|
||||||
Ok(Some(0)) => Err(ProtocolError::Closed),
|
Ok(Some(0)) => Err(ProtocolError::Closed),
|
||||||
Ok(Some(n)) => Ok(QuicDataFormat{stream: QuicDataFormatStream::Main, data: self.buffer.split_to(n)}),
|
Ok(Some(n)) => Ok(QuicDataFormat {
|
||||||
|
stream: match id {
|
||||||
|
Some(id) => QuicDataFormatStream::Reliable(id),
|
||||||
|
None => QuicDataFormatStream::Main,
|
||||||
|
},
|
||||||
|
data: buffer.split_to(n),
|
||||||
|
}),
|
||||||
Ok(None) => Err(ProtocolError::Closed),
|
Ok(None) => Err(ProtocolError::Closed),
|
||||||
Err(_) => Err(ProtocolError::Closed),
|
Err(_) => Err(ProtocolError::Closed),
|
||||||
}
|
}?;
|
||||||
|
|
||||||
|
|
||||||
|
let streams_s_clone = self.streams_s.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
buffer.resize(1500, 0u8);
|
||||||
|
let r = recvstream.read(&mut buffer).await;
|
||||||
|
let _ = streams_s_clone.send((buffer, r, recvstream, id));
|
||||||
|
});
|
||||||
|
Ok(r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,14 +13,14 @@
|
|||||||
//! Say you have an application that wants to communicate with other application
|
//! Say you have an application that wants to communicate with other application
|
||||||
//! over a Network or on the same computer. Now each application instances the
|
//! over a Network or on the same computer. Now each application instances the
|
||||||
//! struct [`Network`] once with a new [`Pid`]. The Pid is necessary to identify
|
//! struct [`Network`] once with a new [`Pid`]. The Pid is necessary to identify
|
||||||
//! other [`Networks`] over the network protocols (e.g. TCP, UDP)
|
//! other [`Networks`] over the network protocols (e.g. TCP, UDP, QUIC, MPSC)
|
||||||
//!
|
//!
|
||||||
//! To connect to another application, you must know it's [`ProtocolAddr`]. One
|
//! To connect to another application, you must know it's [`ConnectAddr`]. One
|
||||||
//! side will call [`connect`], the other [`connected`]. If successful both
|
//! side will call [`connect`], the other [`connected`]. If successful both
|
||||||
//! applications will now get a [`Participant`].
|
//! applications will now get a [`Participant`].
|
||||||
//!
|
//!
|
||||||
//! This [`Participant`] represents the connection between those 2 applications.
|
//! This [`Participant`] represents the connection between those 2 applications.
|
||||||
//! over the respective [`ProtocolAddr`] and with it the chosen network
|
//! over the respective [`ConnectAddr`] and with it the chosen network
|
||||||
//! protocol. However messages can't be send directly via [`Participants`],
|
//! protocol. However messages can't be send directly via [`Participants`],
|
||||||
//! instead you must open a [`Stream`] on it. Like above, one side has to call
|
//! instead you must open a [`Stream`] on it. Like above, one side has to call
|
||||||
//! [`open`], the other [`opened`]. [`Streams`] can have a different priority
|
//! [`open`], the other [`opened`]. [`Streams`] can have a different priority
|
||||||
@ -41,14 +41,14 @@
|
|||||||
//! ```rust
|
//! ```rust
|
||||||
//! use std::sync::Arc;
|
//! use std::sync::Arc;
|
||||||
//! use tokio::{join, runtime::Runtime, time::sleep};
|
//! use tokio::{join, runtime::Runtime, time::sleep};
|
||||||
//! use veloren_network::{Network, Pid, Promises, ProtocolAddr};
|
//! use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
|
||||||
//!
|
//!
|
||||||
//! // Client
|
//! // Client
|
||||||
//! async fn client(runtime: &Runtime) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
//! async fn client(runtime: &Runtime) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
//! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen`
|
//! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen`
|
||||||
//! let client_network = Network::new(Pid::new(), runtime);
|
//! let client_network = Network::new(Pid::new(), runtime);
|
||||||
//! let server = client_network
|
//! let server = client_network
|
||||||
//! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
|
//! .connect(ConnectAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||||
//! .await?;
|
//! .await?;
|
||||||
//! let mut stream = server
|
//! let mut stream = server
|
||||||
//! .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
|
//! .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
|
||||||
@ -61,7 +61,7 @@
|
|||||||
//! async fn server(runtime: &Runtime) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
//! async fn server(runtime: &Runtime) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
//! let server_network = Network::new(Pid::new(), runtime);
|
//! let server_network = Network::new(Pid::new(), runtime);
|
||||||
//! server_network
|
//! server_network
|
||||||
//! .listen(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
|
//! .listen(ListenAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
|
||||||
//! .await?;
|
//! .await?;
|
||||||
//! let client = server_network.connected().await?;
|
//! let client = server_network.connected().await?;
|
||||||
//! let mut stream = client.opened().await?;
|
//! let mut stream = client.opened().await?;
|
||||||
@ -95,7 +95,8 @@
|
|||||||
//! [`send`]: crate::api::Stream::send
|
//! [`send`]: crate::api::Stream::send
|
||||||
//! [`recv`]: crate::api::Stream::recv
|
//! [`recv`]: crate::api::Stream::recv
|
||||||
//! [`Pid`]: network_protocol::Pid
|
//! [`Pid`]: network_protocol::Pid
|
||||||
//! [`ProtocolAddr`]: crate::api::ProtocolAddr
|
//! [`ListenAddr`]: crate::api::ListenAddr
|
||||||
|
//! [`ConnectAddr`]: crate::api::ConnectAddr
|
||||||
//! [`Promises`]: network_protocol::Promises
|
//! [`Promises`]: network_protocol::Promises
|
||||||
|
|
||||||
mod api;
|
mod api;
|
||||||
@ -107,8 +108,8 @@ mod scheduler;
|
|||||||
mod util;
|
mod util;
|
||||||
|
|
||||||
pub use api::{
|
pub use api::{
|
||||||
Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr,
|
ConnectAddr, ListenAddr, Network, NetworkConnectError, NetworkError, Participant,
|
||||||
Stream, StreamError, StreamParams,
|
ParticipantError, Stream, StreamError, StreamParams,
|
||||||
};
|
};
|
||||||
pub use message::Message;
|
pub use message::Message;
|
||||||
pub use network_protocol::{InitProtocolError, Pid, Promises};
|
pub use network_protocol::{InitProtocolError, Pid, Promises};
|
||||||
|
@ -70,7 +70,7 @@ impl Message {
|
|||||||
///
|
///
|
||||||
/// # Example
|
/// # Example
|
||||||
/// ```
|
/// ```
|
||||||
/// # use veloren_network::{Network, ProtocolAddr, Pid};
|
/// # use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
|
||||||
/// # use veloren_network::Promises;
|
/// # use veloren_network::Promises;
|
||||||
/// # use tokio::runtime::Runtime;
|
/// # use tokio::runtime::Runtime;
|
||||||
/// # use std::sync::Arc;
|
/// # use std::sync::Arc;
|
||||||
@ -81,8 +81,8 @@ impl Message {
|
|||||||
/// # let network = Network::new(Pid::new(), &runtime);
|
/// # let network = Network::new(Pid::new(), &runtime);
|
||||||
/// # let remote = Network::new(Pid::new(), &runtime);
|
/// # let remote = Network::new(Pid::new(), &runtime);
|
||||||
/// # runtime.block_on(async {
|
/// # runtime.block_on(async {
|
||||||
/// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
|
/// # network.listen(ListenAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
|
||||||
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
|
/// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
|
||||||
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
|
||||||
/// # stream_p.send("Hello World");
|
/// # stream_p.send("Hello World");
|
||||||
/// # let participant_a = network.connected().await?;
|
/// # let participant_a = network.connected().await?;
|
||||||
|
@ -1,8 +1,29 @@
|
|||||||
use crate::api::ProtocolAddr;
|
use crate::api::{ConnectAddr, ListenAddr};
|
||||||
use network_protocol::{Cid, Pid};
|
use network_protocol::{Cid, Pid};
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
|
use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
|
||||||
use std::error::Error;
|
use std::{error::Error, net::SocketAddr};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||||
|
pub(crate) enum ProtocolInfo {
|
||||||
|
Tcp(SocketAddr),
|
||||||
|
Udp(SocketAddr),
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
|
Quic(SocketAddr),
|
||||||
|
Mpsc(u64),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ListenAddr> for ProtocolInfo {
|
||||||
|
fn from(other: ListenAddr) -> ProtocolInfo {
|
||||||
|
match other {
|
||||||
|
ListenAddr::Tcp(s) => ProtocolInfo::Tcp(s),
|
||||||
|
ListenAddr::Udp(s) => ProtocolInfo::Udp(s),
|
||||||
|
#[cfg(feature = "quic")]
|
||||||
|
ListenAddr::Quic(s, _) => ProtocolInfo::Quic(s),
|
||||||
|
ListenAddr::Mpsc(s) => ProtocolInfo::Mpsc(s),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// 1:1 relation between NetworkMetrics and Network
|
/// 1:1 relation between NetworkMetrics and Network
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
@ -154,9 +175,9 @@ impl NetworkMetrics {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn connect_requests_cache(&self, protocol: &ProtocolAddr) -> prometheus::IntCounter {
|
pub(crate) fn connect_requests_cache(&self, protocol: &ListenAddr) -> prometheus::IntCounter {
|
||||||
self.incoming_connections_total
|
self.incoming_connections_total
|
||||||
.with_label_values(&[protocol_name(protocol)])
|
.with_label_values(&[protocollisten_name(protocol)])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn channels_connected(&self, remote_p: &str, no: usize, cid: Cid) {
|
pub(crate) fn channels_connected(&self, remote_p: &str, no: usize, cid: Cid) {
|
||||||
@ -192,15 +213,15 @@ impl NetworkMetrics {
|
|||||||
.inc();
|
.inc();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) {
|
pub(crate) fn listen_request(&self, protocol: &ListenAddr) {
|
||||||
self.listen_requests_total
|
self.listen_requests_total
|
||||||
.with_label_values(&[protocol_name(protocol)])
|
.with_label_values(&[protocollisten_name(protocol)])
|
||||||
.inc();
|
.inc();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) {
|
pub(crate) fn connect_request(&self, protocol: &ConnectAddr) {
|
||||||
self.connect_requests_total
|
self.connect_requests_total
|
||||||
.with_label_values(&[protocol_name(protocol)])
|
.with_label_values(&[protocolconnect_name(protocol)])
|
||||||
.inc();
|
.inc();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -225,11 +246,22 @@ impl NetworkMetrics {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
fn protocol_name(protocol: &ProtocolAddr) -> &str {
|
fn protocolconnect_name(protocol: &ConnectAddr) -> &str {
|
||||||
match protocol {
|
match protocol {
|
||||||
ProtocolAddr::Tcp(_) => "tcp",
|
ConnectAddr::Tcp(_) => "tcp",
|
||||||
ProtocolAddr::Udp(_) => "udp",
|
ConnectAddr::Udp(_) => "udp",
|
||||||
ProtocolAddr::Mpsc(_) => "mpsc",
|
ConnectAddr::Mpsc(_) => "mpsc",
|
||||||
|
ConnectAddr::Quic(_, _, _) => "quic",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "metrics")]
|
||||||
|
fn protocollisten_name(protocol: &ListenAddr) -> &str {
|
||||||
|
match protocol {
|
||||||
|
ListenAddr::Tcp(_) => "tcp",
|
||||||
|
ListenAddr::Udp(_) => "udp",
|
||||||
|
ListenAddr::Mpsc(_) => "mpsc",
|
||||||
|
ListenAddr::Quic(_, _) => "quic",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,9 +279,9 @@ impl NetworkMetrics {
|
|||||||
|
|
||||||
pub(crate) fn streams_closed(&self, _remote_p: &str) {}
|
pub(crate) fn streams_closed(&self, _remote_p: &str) {}
|
||||||
|
|
||||||
pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {}
|
pub(crate) fn listen_request(&self, _protocol: &ListenAddr) {}
|
||||||
|
|
||||||
pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {}
|
pub(crate) fn connect_request(&self, _protocol: &ConnectAddr) {}
|
||||||
|
|
||||||
pub(crate) fn cleanup_participant(&self, _remote_p: &str) {}
|
pub(crate) fn cleanup_participant(&self, _remote_p: &str) {}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
api::{NetworkConnectError, Participant, ProtocolAddr},
|
api::{ConnectAddr, ListenAddr, NetworkConnectError, Participant},
|
||||||
channel::Protocols,
|
channel::Protocols,
|
||||||
metrics::NetworkMetrics,
|
metrics::{NetworkMetrics, ProtocolInfo},
|
||||||
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant},
|
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant},
|
||||||
};
|
};
|
||||||
use futures_util::{FutureExt, StreamExt};
|
use futures_util::{FutureExt, StreamExt};
|
||||||
@ -46,9 +46,9 @@ struct ParticipantInfo {
|
|||||||
s2b_shutdown_bparticipant_s: Option<oneshot::Sender<S2bShutdownBparticipant>>,
|
s2b_shutdown_bparticipant_s: Option<oneshot::Sender<S2bShutdownBparticipant>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type A2sListen = (ProtocolAddr, oneshot::Sender<io::Result<()>>);
|
type A2sListen = (ListenAddr, oneshot::Sender<io::Result<()>>);
|
||||||
pub(crate) type A2sConnect = (
|
pub(crate) type A2sConnect = (
|
||||||
ProtocolAddr,
|
ConnectAddr,
|
||||||
oneshot::Sender<Result<Participant, NetworkConnectError>>,
|
oneshot::Sender<Result<Participant, NetworkConnectError>>,
|
||||||
);
|
);
|
||||||
type A2sDisconnect = (Pid, S2bShutdownBparticipant);
|
type A2sDisconnect = (Pid, S2bShutdownBparticipant);
|
||||||
@ -82,7 +82,7 @@ pub struct Scheduler {
|
|||||||
participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
|
participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
|
||||||
participants: Arc<Mutex<HashMap<Pid, ParticipantInfo>>>,
|
participants: Arc<Mutex<HashMap<Pid, ParticipantInfo>>>,
|
||||||
channel_ids: Arc<AtomicU64>,
|
channel_ids: Arc<AtomicU64>,
|
||||||
channel_listener: Mutex<HashMap<ProtocolAddr, oneshot::Sender<()>>>,
|
channel_listener: Mutex<HashMap<ProtocolInfo, oneshot::Sender<()>>>,
|
||||||
metrics: Arc<NetworkMetrics>,
|
metrics: Arc<NetworkMetrics>,
|
||||||
protocol_metrics: Arc<ProtocolMetrics>,
|
protocol_metrics: Arc<ProtocolMetrics>,
|
||||||
}
|
}
|
||||||
@ -182,7 +182,7 @@ impl Scheduler {
|
|||||||
self.channel_listener
|
self.channel_listener
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.insert(address.clone(), end_sender);
|
.insert(address.clone().into(), end_sender);
|
||||||
self.channel_creator(address, end_receiver, s2a_listen_result_s)
|
self.channel_creator(address, end_receiver, s2a_listen_result_s)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@ -198,7 +198,7 @@ impl Scheduler {
|
|||||||
let metrics = Arc::clone(&self.protocol_metrics);
|
let metrics = Arc::clone(&self.protocol_metrics);
|
||||||
self.metrics.connect_request(&addr);
|
self.metrics.connect_request(&addr);
|
||||||
let (protocol, handshake) = match addr {
|
let (protocol, handshake) = match addr {
|
||||||
ProtocolAddr::Tcp(addr) => {
|
ConnectAddr::Tcp(addr) => {
|
||||||
let stream = match net::TcpStream::connect(addr).await {
|
let stream = match net::TcpStream::connect(addr).await {
|
||||||
Ok(stream) => stream,
|
Ok(stream) => stream,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -209,7 +209,21 @@ impl Scheduler {
|
|||||||
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
|
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
|
||||||
(Protocols::new_tcp(stream, cid, metrics), false)
|
(Protocols::new_tcp(stream, cid, metrics), false)
|
||||||
},
|
},
|
||||||
ProtocolAddr::Mpsc(addr) => {
|
#[cfg(feature = "quic")]
|
||||||
|
ConnectAddr::Quic(addr, ref config, name) => {
|
||||||
|
let config = config.clone();
|
||||||
|
let endpoint = quinn::Endpoint::builder();
|
||||||
|
let (endpoint, _) = endpoint.bind(&"[::]:0".parse().unwrap()).expect("FIXME");
|
||||||
|
|
||||||
|
let connecting = endpoint.connect_with(config, &addr, &name).expect("FIXME");
|
||||||
|
let connection = connecting.await.expect("FIXME");
|
||||||
|
(
|
||||||
|
Protocols::new_quic(connection, false, cid, metrics).await.unwrap(),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
//pid_sender.send(Ok(())).unwrap();
|
||||||
|
},
|
||||||
|
ConnectAddr::Mpsc(addr) => {
|
||||||
let mpsc_s = match MPSC_POOL.lock().await.get(&addr) {
|
let mpsc_s = match MPSC_POOL.lock().await.get(&addr) {
|
||||||
Some(s) => s.clone(),
|
Some(s) => s.clone(),
|
||||||
None => {
|
None => {
|
||||||
@ -236,7 +250,7 @@ impl Scheduler {
|
|||||||
)
|
)
|
||||||
},
|
},
|
||||||
/* */
|
/* */
|
||||||
//ProtocolAddr::Udp(addr) => {
|
//ProtocolConnectAddr::Udp(addr) => {
|
||||||
//#[cfg(feature = "metrics")]
|
//#[cfg(feature = "metrics")]
|
||||||
//self.metrics
|
//self.metrics
|
||||||
//.connect_requests_total
|
//.connect_requests_total
|
||||||
@ -386,7 +400,7 @@ impl Scheduler {
|
|||||||
|
|
||||||
async fn channel_creator(
|
async fn channel_creator(
|
||||||
&self,
|
&self,
|
||||||
addr: ProtocolAddr,
|
addr: ListenAddr,
|
||||||
s2s_stop_listening_r: oneshot::Receiver<()>,
|
s2s_stop_listening_r: oneshot::Receiver<()>,
|
||||||
s2a_listen_result_s: oneshot::Sender<io::Result<()>>,
|
s2a_listen_result_s: oneshot::Sender<io::Result<()>>,
|
||||||
) {
|
) {
|
||||||
@ -394,7 +408,7 @@ impl Scheduler {
|
|||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
let mcache = self.metrics.connect_requests_cache(&addr);
|
let mcache = self.metrics.connect_requests_cache(&addr);
|
||||||
match addr {
|
match addr {
|
||||||
ProtocolAddr::Tcp(addr) => {
|
ListenAddr::Tcp(addr) => {
|
||||||
let listener = match net::TcpListener::bind(addr).await {
|
let listener = match net::TcpListener::bind(addr).await {
|
||||||
Ok(listener) => {
|
Ok(listener) => {
|
||||||
s2a_listen_result_s.send(Ok(())).unwrap();
|
s2a_listen_result_s.send(Ok(())).unwrap();
|
||||||
@ -432,10 +446,10 @@ impl Scheduler {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
#[cfg(feature = "quic")]
|
#[cfg(feature = "quic")]
|
||||||
ProtocolAddr::Quic(addr, server_config) => {
|
ListenAddr::Quic(addr, ref server_config) => {
|
||||||
let mut endpoint = quinn::Endpoint::builder();
|
let mut endpoint = quinn::Endpoint::builder();
|
||||||
endpoint.listen(server_config);
|
endpoint.listen(server_config.clone());
|
||||||
let (endpoint, mut listener) = match endpoint.bind(&addr) {
|
let (_endpoint, mut listener) = match endpoint.bind(&addr) {
|
||||||
Ok((endpoint, listener)) => {
|
Ok((endpoint, listener)) => {
|
||||||
s2a_listen_result_s.send(Ok(())).unwrap();
|
s2a_listen_result_s.send(Ok(())).unwrap();
|
||||||
(endpoint, listener)
|
(endpoint, listener)
|
||||||
@ -468,11 +482,18 @@ impl Scheduler {
|
|||||||
mcache.inc();
|
mcache.inc();
|
||||||
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
|
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
|
||||||
info!(?remote_addr, ?cid, "Accepting Quic from");
|
info!(?remote_addr, ?cid, "Accepting Quic from");
|
||||||
self.init_protocol(Protocols::new_quic(connection, cid, Arc::clone(&self.protocol_metrics)), cid, None, true)
|
let quic = match Protocols::new_quic(connection, true, cid, Arc::clone(&self.protocol_metrics)).await {
|
||||||
|
Ok(quic) => quic,
|
||||||
|
Err(e) => {
|
||||||
|
trace!(?e, "failed to start quic");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.init_protocol(quic, cid, None, true)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ProtocolAddr::Mpsc(addr) => {
|
ListenAddr::Mpsc(addr) => {
|
||||||
let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel();
|
let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel();
|
||||||
MPSC_POOL.lock().await.insert(addr, mpsc_s);
|
MPSC_POOL.lock().await.insert(addr, mpsc_s);
|
||||||
s2a_listen_result_s.send(Ok(())).unwrap();
|
s2a_listen_result_s.send(Ok(())).unwrap();
|
||||||
@ -494,7 +515,7 @@ impl Scheduler {
|
|||||||
}
|
}
|
||||||
warn!("MpscStream Failed, stopping");
|
warn!("MpscStream Failed, stopping");
|
||||||
},/*
|
},/*
|
||||||
ProtocolAddr::Udp(addr) => {
|
ProtocolListenAddr::Udp(addr) => {
|
||||||
let socket = match net::UdpSocket::bind(addr).await {
|
let socket = match net::UdpSocket::bind(addr).await {
|
||||||
Ok(socket) => {
|
Ok(socket) => {
|
||||||
s2a_listen_result_s.send(Ok(())).unwrap();
|
s2a_listen_result_s.send(Ok(())).unwrap();
|
||||||
|
@ -347,8 +347,8 @@ fn open_participant_before_remote_part_is_closed() {
|
|||||||
let n_a = Network::new(Pid::fake(0), &r);
|
let n_a = Network::new(Pid::fake(0), &r);
|
||||||
let n_b = Network::new(Pid::fake(1), &r);
|
let n_b = Network::new(Pid::fake(1), &r);
|
||||||
let addr = tcp();
|
let addr = tcp();
|
||||||
r.block_on(n_a.listen(addr.clone())).unwrap();
|
r.block_on(n_a.listen(addr.0)).unwrap();
|
||||||
let p_b = r.block_on(n_b.connect(addr)).unwrap();
|
let p_b = r.block_on(n_b.connect(addr.1)).unwrap();
|
||||||
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
|
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
|
||||||
s1_b.send("HelloWorld").unwrap();
|
s1_b.send("HelloWorld").unwrap();
|
||||||
let p_a = r.block_on(n_a.connected()).unwrap();
|
let p_a = r.block_on(n_a.connected()).unwrap();
|
||||||
@ -367,8 +367,8 @@ fn open_participant_after_remote_part_is_closed() {
|
|||||||
let n_a = Network::new(Pid::fake(0), &r);
|
let n_a = Network::new(Pid::fake(0), &r);
|
||||||
let n_b = Network::new(Pid::fake(1), &r);
|
let n_b = Network::new(Pid::fake(1), &r);
|
||||||
let addr = tcp();
|
let addr = tcp();
|
||||||
r.block_on(n_a.listen(addr.clone())).unwrap();
|
r.block_on(n_a.listen(addr.0)).unwrap();
|
||||||
let p_b = r.block_on(n_b.connect(addr)).unwrap();
|
let p_b = r.block_on(n_b.connect(addr.1)).unwrap();
|
||||||
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
|
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
|
||||||
s1_b.send("HelloWorld").unwrap();
|
s1_b.send("HelloWorld").unwrap();
|
||||||
drop(s1_b);
|
drop(s1_b);
|
||||||
@ -387,8 +387,8 @@ fn close_network_scheduler_completely() {
|
|||||||
let n_a = Network::new(Pid::fake(0), &r);
|
let n_a = Network::new(Pid::fake(0), &r);
|
||||||
let n_b = Network::new(Pid::fake(1), &r);
|
let n_b = Network::new(Pid::fake(1), &r);
|
||||||
let addr = tcp();
|
let addr = tcp();
|
||||||
r.block_on(n_a.listen(addr.clone())).unwrap();
|
r.block_on(n_a.listen(addr.0)).unwrap();
|
||||||
let p_b = r.block_on(n_b.connect(addr)).unwrap();
|
let p_b = r.block_on(n_b.connect(addr.1)).unwrap();
|
||||||
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
|
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
|
||||||
s1_b.send("HelloWorld").unwrap();
|
s1_b.send("HelloWorld").unwrap();
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ use std::{
|
|||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
|
use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises, Stream};
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
|
pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
|
||||||
@ -47,7 +47,7 @@ pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn network_participant_stream(
|
pub fn network_participant_stream(
|
||||||
addr: ProtocolAddr,
|
addr: (ListenAddr, ConnectAddr),
|
||||||
) -> (
|
) -> (
|
||||||
Arc<Runtime>,
|
Arc<Runtime>,
|
||||||
Network,
|
Network,
|
||||||
@ -62,11 +62,11 @@ pub fn network_participant_stream(
|
|||||||
let n_a = Network::new(Pid::fake(0), &runtime);
|
let n_a = Network::new(Pid::fake(0), &runtime);
|
||||||
let n_b = Network::new(Pid::fake(1), &runtime);
|
let n_b = Network::new(Pid::fake(1), &runtime);
|
||||||
|
|
||||||
n_a.listen(addr.clone()).await.unwrap();
|
n_a.listen(addr.0).await.unwrap();
|
||||||
let p1_b = n_b.connect(addr).await.unwrap();
|
let p1_b = n_b.connect(addr.1).await.unwrap();
|
||||||
let p1_a = n_a.connected().await.unwrap();
|
let p1_a = n_a.connected().await.unwrap();
|
||||||
|
|
||||||
let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();
|
let s1_a = p1_a.open(4, Promises::ORDERED, 0).await.unwrap();
|
||||||
let s1_b = p1_b.opened().await.unwrap();
|
let s1_b = p1_b.opened().await.unwrap();
|
||||||
|
|
||||||
(n_a, p1_a, s1_a, n_b, p1_b, s1_b)
|
(n_a, p1_a, s1_a, n_b, p1_b, s1_b)
|
||||||
@ -75,28 +75,76 @@ pub fn network_participant_stream(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn tcp() -> ProtocolAddr {
|
pub fn tcp() -> (ListenAddr, ConnectAddr) {
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref PORTS: AtomicU16 = AtomicU16::new(5000);
|
static ref PORTS: AtomicU16 = AtomicU16::new(5000);
|
||||||
}
|
}
|
||||||
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
||||||
ProtocolAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port)))
|
(
|
||||||
|
ListenAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port))),
|
||||||
|
ConnectAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port))),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref UDP_PORTS: AtomicU16 = AtomicU16::new(5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn udp() -> ProtocolAddr {
|
pub fn quic() -> (ListenAddr, ConnectAddr) {
|
||||||
lazy_static! {
|
const LOCALHOST: &str = "localhost";
|
||||||
static ref PORTS: AtomicU16 = AtomicU16::new(5000);
|
let port = UDP_PORTS.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
|
||||||
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
let transport_config = quinn::TransportConfig::default();
|
||||||
ProtocolAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port)))
|
let mut server_config = quinn::ServerConfig::default();
|
||||||
|
server_config.transport = Arc::new(transport_config);
|
||||||
|
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
|
||||||
|
server_config.protocols(&[b"veloren"]);
|
||||||
|
|
||||||
|
trace!("generating self-signed certificate");
|
||||||
|
let cert = rcgen::generate_simple_self_signed(vec![LOCALHOST.into()]).unwrap();
|
||||||
|
let key = cert.serialize_private_key_der();
|
||||||
|
let cert = cert.serialize_der().unwrap();
|
||||||
|
|
||||||
|
let key = quinn::PrivateKey::from_der(&key).expect("private key failed");
|
||||||
|
let cert = quinn::Certificate::from_der(&cert).expect("cert failed");
|
||||||
|
server_config
|
||||||
|
.certificate(quinn::CertificateChain::from_certs(vec![cert.clone()]), key)
|
||||||
|
.expect("set cert failed");
|
||||||
|
|
||||||
|
let server_config = server_config.build();
|
||||||
|
|
||||||
|
let mut client_config = quinn::ClientConfigBuilder::default();
|
||||||
|
client_config.protocols(&[b"veloren"]);
|
||||||
|
client_config
|
||||||
|
.add_certificate_authority(cert)
|
||||||
|
.expect("adding certificate failed");
|
||||||
|
|
||||||
|
let client_config = client_config.build();
|
||||||
|
(
|
||||||
|
ListenAddr::Quic(SocketAddr::from(([127, 0, 0, 1], port)), server_config),
|
||||||
|
ConnectAddr::Quic(
|
||||||
|
SocketAddr::from(([127, 0, 0, 1], port)),
|
||||||
|
client_config,
|
||||||
|
LOCALHOST.to_owned(),
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn mpsc() -> ProtocolAddr {
|
pub fn udp() -> (ListenAddr, ConnectAddr) {
|
||||||
|
let port = UDP_PORTS.fetch_add(1, Ordering::Relaxed);
|
||||||
|
(
|
||||||
|
ListenAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port))),
|
||||||
|
ConnectAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port))),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn mpsc() -> (ListenAddr, ConnectAddr) {
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref PORTS: AtomicU64 = AtomicU64::new(5000);
|
static ref PORTS: AtomicU64 = AtomicU64::new(5000);
|
||||||
}
|
}
|
||||||
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
let port = PORTS.fetch_add(1, Ordering::Relaxed);
|
||||||
ProtocolAddr::Mpsc(port)
|
(ListenAddr::Mpsc(port), ConnectAddr::Mpsc(port))
|
||||||
}
|
}
|
||||||
|
@ -2,9 +2,9 @@ use std::sync::Arc;
|
|||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
use veloren_network::{NetworkError, StreamError};
|
use veloren_network::{NetworkError, StreamError};
|
||||||
mod helper;
|
mod helper;
|
||||||
use helper::{mpsc, network_participant_stream, tcp, udp};
|
use helper::{mpsc, network_participant_stream, quic, tcp, udp};
|
||||||
use std::io::ErrorKind;
|
use std::io::ErrorKind;
|
||||||
use veloren_network::{Network, Pid, Promises, ProtocolAddr};
|
use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
@ -73,6 +73,30 @@ fn stream_simple_mpsc_3msg() {
|
|||||||
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_simple_quic() {
|
||||||
|
let (_, _) = helper::setup(false, 0);
|
||||||
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic());
|
||||||
|
|
||||||
|
s1_a.send("Hello World").unwrap();
|
||||||
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
||||||
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_simple_quic_3msg() {
|
||||||
|
let (_, _) = helper::setup(true, 0);
|
||||||
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic());
|
||||||
|
|
||||||
|
s1_a.send("Hello World").unwrap();
|
||||||
|
s1_a.send(1337).unwrap();
|
||||||
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
||||||
|
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
|
||||||
|
s1_a.send("3rdMessage").unwrap();
|
||||||
|
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
|
||||||
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn stream_simple_udp() {
|
fn stream_simple_udp() {
|
||||||
@ -110,16 +134,16 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
|
|||||||
let network = network;
|
let network = network;
|
||||||
let remote = remote;
|
let remote = remote;
|
||||||
remote
|
remote
|
||||||
.listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
.listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
remote
|
remote
|
||||||
.listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
.listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
let p1 = network
|
let p1 = network
|
||||||
.connect(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
.connect(ConnectAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
let p2 = network
|
let p2 = network
|
||||||
.connect(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
.connect(ConnectAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
assert_eq!(&p1, &p2);
|
assert_eq!(&p1, &p2);
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -134,13 +158,13 @@ fn failed_listen_on_used_ports() -> std::result::Result<(), Box<dyn std::error::
|
|||||||
let network = Network::new(Pid::new(), &r);
|
let network = Network::new(Pid::new(), &r);
|
||||||
let udp1 = udp();
|
let udp1 = udp();
|
||||||
let tcp1 = tcp();
|
let tcp1 = tcp();
|
||||||
r.block_on(network.listen(udp1.clone()))?;
|
r.block_on(network.listen(udp1.0.clone()))?;
|
||||||
r.block_on(network.listen(tcp1.clone()))?;
|
r.block_on(network.listen(tcp1.0.clone()))?;
|
||||||
std::thread::sleep(std::time::Duration::from_millis(200));
|
std::thread::sleep(std::time::Duration::from_millis(200));
|
||||||
|
|
||||||
let network2 = Network::new(Pid::new(), &r);
|
let network2 = Network::new(Pid::new(), &r);
|
||||||
let e1 = r.block_on(network2.listen(udp1));
|
let e1 = r.block_on(network2.listen(udp1.0));
|
||||||
let e2 = r.block_on(network2.listen(tcp1));
|
let e2 = r.block_on(network2.listen(tcp1.0));
|
||||||
match e1 {
|
match e1 {
|
||||||
Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (),
|
Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (),
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
@ -170,10 +194,10 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
|
|||||||
let network = network;
|
let network = network;
|
||||||
let remote = remote;
|
let remote = remote;
|
||||||
network
|
network
|
||||||
.listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
.listen(ListenAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
let remote_p = remote
|
let remote_p = remote
|
||||||
.connect(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
.connect(ConnectAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
// keep it alive
|
// keep it alive
|
||||||
let _stream_p = remote_p
|
let _stream_p = remote_p
|
||||||
@ -199,10 +223,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
|
|||||||
let network = network;
|
let network = network;
|
||||||
let remote = remote;
|
let remote = remote;
|
||||||
network
|
network
|
||||||
.listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
.listen(ListenAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
let remote_p = remote
|
let remote_p = remote
|
||||||
.connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
.connect(ConnectAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
||||||
.await?;
|
.await?;
|
||||||
let mut stream_p = remote_p
|
let mut stream_p = remote_p
|
||||||
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
|
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
|
||||||
|
@ -83,7 +83,7 @@ use common_state::plugin::PluginMgr;
|
|||||||
use common_state::{BuildAreas, State};
|
use common_state::{BuildAreas, State};
|
||||||
use common_systems::add_local_systems;
|
use common_systems::add_local_systems;
|
||||||
use metrics::{EcsSystemMetrics, PhysicsMetrics, TickMetrics};
|
use metrics::{EcsSystemMetrics, PhysicsMetrics, TickMetrics};
|
||||||
use network::{Network, Pid, ProtocolAddr};
|
use network::{ListenAddr, Network, Pid};
|
||||||
use persistence::{
|
use persistence::{
|
||||||
character_loader::{CharacterLoader, CharacterLoaderResponseKind},
|
character_loader::{CharacterLoader, CharacterLoaderResponseKind},
|
||||||
character_updater::CharacterUpdater,
|
character_updater::CharacterUpdater,
|
||||||
@ -386,8 +386,8 @@ impl Server {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
runtime.block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
|
runtime.block_on(network.listen(ListenAddr::Tcp(settings.gameserver_address)))?;
|
||||||
runtime.block_on(network.listen(ProtocolAddr::Mpsc(14004)))?;
|
runtime.block_on(network.listen(ListenAddr::Mpsc(14004)))?;
|
||||||
let connection_handler = ConnectionHandler::new(network, &runtime);
|
let connection_handler = ConnectionHandler::new(network, &runtime);
|
||||||
|
|
||||||
// Initiate real-time world simulation
|
// Initiate real-time world simulation
|
||||||
|
Loading…
Reference in New Issue
Block a user