get rid of async_std::channel

Switch to the `tokio` and `async_channel` crates.
I wanted to use tokio alone, but it does not feature `Sender::close()`, so I included `async_channel` as well.
Got rid of `futures`; only `futures_core` and `futures_util` are needed now.
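
A toy illustration of the difference (not code from this diff; the channels and values are made up):

```rust
// Toy illustration, not code from this commit: async_channel lets the
// sending half close the channel explicitly.
#[tokio::main]
async fn main() {
    let (tx, rx) = async_channel::unbounded::<u32>();
    tx.send(1).await.unwrap();
    tx.close(); // no equivalent exists on tokio's mpsc senders
    assert_eq!(rx.recv().await.unwrap(), 1); // buffered messages still drain
    assert!(rx.recv().await.is_err()); // then the channel reports closed

    // tokio::sync::mpsc: only the *receiving* half can be closed.
    let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::<u32>();
    trx.close();
    assert!(ttx.send(1).is_err());
}
```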

Tokio does not provide `Stream` and `StreamExt`, so for now I need to use `tokio-stream`; I think this will land in `std` in the future.
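
For reference, the wrapping pattern used throughout the diff below, reduced to a self-contained toy example (channel and values invented here):

```rust
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<u32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);

    // The wrapper implements futures_core::Stream, so StreamExt combinators
    // like for_each_concurrent become available again.
    UnboundedReceiverStream::new(rx)
        .for_each_concurrent(None, |n| async move {
            println!("got {}", n);
        })
        .await;
}
```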

Created `b2b_close_stream_opened_sender_r`, as the shutdown procedure does not need a copy of a Sender, it just needs to stop it.
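
A rough sketch of that shutdown pattern with invented names, not the actual `veloren_network` types: the shutdown side holds only a `oneshot::Sender<()>`, and the manager that owns the real sender drops it when the signal fires.

```rust
use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot};

async fn manager(
    mut work_r: mpsc::UnboundedReceiver<u32>,
    close_r: oneshot::Receiver<()>,
    opened_s: mpsc::UnboundedSender<u32>,
) {
    // Keep the sender in an Option so the close signal can drop it.
    let mut opened_s = Some(opened_s);
    let mut close_r = close_r.fuse();
    loop {
        tokio::select! {
            item = work_r.recv() => match item {
                Some(v) => {
                    if let Some(s) = &opened_s {
                        let _ = s.send(v);
                    }
                },
                None => break,
            },
            _ = &mut close_r => {
                // The shutdown side only needed to *stop* this sender,
                // not to send through a clone of it.
                opened_s = None;
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (work_s, work_r) = mpsc::unbounded_channel();
    let (close_s, close_r) = oneshot::channel();
    let (opened_s, mut opened_r) = mpsc::unbounded_channel();

    let mgr = tokio::spawn(manager(work_r, close_r, opened_s));
    work_s.send(1).unwrap();
    assert_eq!(opened_r.recv().await, Some(1));
    let _ = close_s.send(()); // stop the opened-stream sender
    drop(work_s);             // lets the manager loop end
    mgr.await.unwrap();
}
```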

Various adjustments, e.g. for `select!`, which now requires a `&mut` for oneshots.
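
A minimal toy example of that `select!` change (names invented, not taken from the diff): the oneshot receiver is fused and then polled through `&mut` so the loop keeps ownership of it across iterations.

```rust
use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot};

async fn run(mut jobs_r: mpsc::UnboundedReceiver<u32>, shutdown_r: oneshot::Receiver<()>) {
    let mut shutdown_r = shutdown_r.fuse();
    while let Some(job) = tokio::select! {
        next = jobs_r.recv().fuse() => next,
        // `shutdown_r` by value would move the receiver into the first
        // `select!`; `&mut shutdown_r` lets the loop keep polling it.
        _ = &mut shutdown_r => None,
    } {
        println!("handling job {}", job);
    }
}

#[tokio::main]
async fn main() {
    let (jobs_s, jobs_r) = mpsc::unbounded_channel();
    let (shutdown_s, shutdown_r) = oneshot::channel();
    jobs_s.send(7).unwrap();
    let handle = tokio::spawn(run(jobs_r, shutdown_r));
    let _ = shutdown_s.send(());
    handle.await.unwrap();
}
```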

Future things to do:
 - Use better signalling than `oneshot<()>` in some cases.
 - Use a `Watch` for the Prio propagation (see the sketch after this list; it still has to be implemented, of course).
 - Use bounded channels in order to improve performance.
 - Adjust the test code.
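
The `Watch` idea could look roughly like the sketch below with `tokio::sync::watch`; none of this is implemented yet and all names are made up.

```rust
use tokio::sync::watch;

// Hypothetical: the scheduler publishes a frame budget, and BParticipants just
// read the latest value instead of draining a statistics mpsc channel.
#[tokio::main]
async fn main() {
    let (prio_s, mut prio_r) = watch::channel(0u64);

    let reader = tokio::spawn(async move {
        while prio_r.changed().await.is_ok() {
            let allowed_frames = *prio_r.borrow();
            println!("new frame budget: {}", allowed_frames);
        }
    });

    prio_s.send(10_000).unwrap();
    prio_s.send(25_000).unwrap();
    drop(prio_s); // ends the reader loop
    reader.await.unwrap();
}
```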

Bring the tests back to a working state.
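
The recurring test change in the diff boils down to this pattern (a simplified stand-in, not an actual test from the repo):

```rust
use tokio::runtime::Runtime;

fn main() {
    // Before: futures::executor::block_on(some_future())
    // After: a tokio Runtime drives the future, because the code under test
    // now depends on tokio primitives (channels, time, spawn).
    let answer = Runtime::new().unwrap().block_on(async { 40 + 2 });
    assert_eq!(answer, 42);
}
```
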
Marcel Märtens 2021-01-15 14:04:32 +01:00
parent 1b77b6dc41
commit 5aa1940ef8
24 changed files with 571 additions and 470 deletions

Cargo.lock generated

@ -248,6 +248,17 @@ dependencies = [
"serde_json",
]
[[package]]
name = "async-channel"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "atom"
version = "0.3.6"
@ -452,6 +463,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "calloop"
version = "0.6.5"
@ -712,6 +729,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "conrod_core"
version = "0.63.0"
@ -1566,6 +1592,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@ -5119,6 +5151,17 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1981ad97df782ab506a1f43bf82c967326960d278acf3bf8279809648c3ff3ea"
dependencies = [
"futures-core",
"pin-project-lite 0.2.4",
"tokio 1.2.0",
]
[[package]]
name = "tokio-util"
version = "0.3.1"
@ -5462,17 +5505,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "uvth"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e5910f9106b96334c6cae1f1d77a764bda66ac4ca9f507f73259f184fe1bb6b"
dependencies = [
"crossbeam-channel 0.3.9",
"log",
"num_cpus",
]
[[package]]
name = "vcpkg"
version = "0.2.11"
@ -5543,7 +5575,7 @@ dependencies = [
"tokio 1.2.0",
"tracing",
"tracing-subscriber",
"uvth 3.1.1",
"uvth",
"vek 0.12.0",
"veloren-common",
"veloren-common-net",
@ -5688,7 +5720,7 @@ dependencies = [
"tiny_http",
"tokio 1.2.0",
"tracing",
"uvth 3.1.1",
"uvth",
"vek 0.12.0",
"veloren-common",
"veloren-common-net",
@ -5774,7 +5806,7 @@ dependencies = [
"tracing-subscriber",
"tracing-tracy",
"treeculler",
"uvth 3.1.1",
"uvth",
"vek 0.12.0",
"veloren-client",
"veloren-common",
@ -5836,11 +5868,13 @@ dependencies = [
name = "veloren_network"
version = "0.3.0"
dependencies = [
"async-channel",
"bincode",
"bitflags",
"clap",
"crossbeam-channel 0.5.0",
"futures",
"futures-core",
"futures-util",
"lazy_static",
"lz-fear",
"prometheus",
@ -5849,10 +5883,10 @@ dependencies = [
"shellexpand",
"tiny_http",
"tokio 1.2.0",
"tokio-stream",
"tracing",
"tracing-futures",
"tracing-subscriber",
"uvth 4.0.1",
]
[[package]]


@ -3,7 +3,14 @@
#![deny(clippy::clone_on_ref_ptr)]
use common::{clock::Clock, comp};
use std::{io, net::ToSocketAddrs, sync::mpsc, thread, time::Duration};
use std::{
io,
net::ToSocketAddrs,
sync::{mpsc, Arc},
thread,
time::Duration,
};
use tokio::runtime::Runtime;
use tracing::{error, info};
use veloren_client::{Client, Event};
@ -37,6 +44,8 @@ fn main() {
println!("Enter your password");
let password = read_input();
let runtime = Arc::new(Runtime::new().unwrap());
// Create a client.
let mut client = Client::new(
server_addr
@ -45,6 +54,7 @@ fn main() {
.next()
.unwrap(),
None,
runtime,
)
.expect("Failed to create client instance");


@ -63,8 +63,8 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tracing::{debug, error, trace, warn};
use tokio::runtime::Runtime;
use tracing::{debug, error, trace, warn};
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*;
@ -187,7 +187,11 @@ pub struct CharacterList {
impl Client {
/// Create a new `Client`.
pub fn new<A: Into<SocketAddr>>(addr: A, view_distance: Option<u32>, runtime: Arc<Runtime>) -> Result<Self, Error> {
pub fn new<A: Into<SocketAddr>>(
addr: A,
view_distance: Option<u32>,
runtime: Arc<Runtime>,
) -> Result<Self, Error> {
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build();


@ -20,12 +20,15 @@ serde = { version = "1.0" }
#sending
crossbeam-channel = "0.5"
tokio = { version = "1.2", default-features = false, features = ["io-util", "macros", "rt", "net", "time"] }
tokio-stream = { version = "0.1.2", default-features = false }
#tracing and metrics
tracing = { version = "0.1", default-features = false }
tracing-futures = "0.2"
prometheus = { version = "0.11", default-features = false, optional = true }
#async
futures = { version = "0.3", features = ["thread-pool"] }
futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
async-channel = "1.5.1" #use for .close() channels
#mpsc channel registry
lazy_static = { version = "1.4", default-features = false }
rand = { version = "0.8" }
@ -35,8 +38,8 @@ lz-fear = { version = "0.1.1", optional = true }
[dev-dependencies]
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }
# `uvth` needed for doc tests
uvth = { version = ">= 3.0, <= 4.0", default-features = false }
tokio = { version = "1.0.1", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
clap = { version = "2.33", default-features = false }
shellexpand = "2.0.0"
tiny_http = "0.8.0"


@ -3,10 +3,9 @@
//! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006
//! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006 --mode=client
//! ```
use async_std::{io, sync::RwLock};
use clap::{App, Arg};
use futures::executor::{block_on, ThreadPool};
use std::{sync::Arc, thread, time::Duration};
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock};
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr};
@ -100,18 +99,17 @@ fn main() {
}
fn server(address: ProtocolAddr) {
let (server, f) = Network::new(Pid::new());
let r = Arc::new(Runtime::new().unwrap());
let server = Network::new(Pid::new(), Arc::clone(&r));
let server = Arc::new(server);
std::thread::spawn(f);
let pool = ThreadPool::new().unwrap();
let participants = Arc::new(RwLock::new(Vec::new()));
block_on(async {
r.block_on(async {
server.listen(address).await.unwrap();
loop {
let p1 = Arc::new(server.connected().await.unwrap());
let server1 = server.clone();
participants.write().await.push(p1.clone());
pool.spawn_ok(client_connection(server1, p1, participants.clone()));
tokio::spawn(client_connection(server1, p1, participants.clone()));
}
});
}
@ -144,27 +142,27 @@ async fn client_connection(
}
fn client(address: ProtocolAddr) {
let (client, f) = Network::new(Pid::new());
std::thread::spawn(f);
let pool = ThreadPool::new().unwrap();
let r = Arc::new(Runtime::new().unwrap());
let client = Network::new(Pid::new(), Arc::clone(&r));
block_on(async {
r.block_on(async {
let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1
let mut s1 = p1
.open(16, Promises::ORDERED | Promises::CONSISTENCY)
.await
.unwrap(); //remote representation of s1
let mut input_lines = io::BufReader::new(io::stdin());
println!("Enter your username:");
let mut username = String::new();
io::stdin().read_line(&mut username).await.unwrap();
input_lines.read_line(&mut username).await.unwrap();
username = username.split_whitespace().collect();
println!("Your username is: {}", username);
println!("write /quit to close");
pool.spawn_ok(read_messages(p1));
tokio::spawn(read_messages(p1));
s1.send(username).unwrap();
loop {
let mut line = String::new();
io::stdin().read_line(&mut line).await.unwrap();
input_lines.read_line(&mut line).await.unwrap();
line = line.split_whitespace().collect();
if line.as_str() == "/quit" {
println!("goodbye");


@ -1,9 +1,7 @@
use async_std::{
fs,
path::{Path, PathBuf},
};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use veloren_network::{Participant, ProtocolAddr, Stream};
use std::collections::HashMap;


@ -4,14 +4,9 @@
//! --profile=release -Z unstable-options -- --trace=info --port 15006)
//! (cd network/examples/fileshare && RUST_BACKTRACE=1 cargo run
//! --profile=release -Z unstable-options -- --trace=info --port 15007) ```
use async_std::{io, path::PathBuf};
use clap::{App, Arg, SubCommand};
use futures::{
channel::mpsc,
executor::{block_on, ThreadPool},
sink::SinkExt,
};
use std::{thread, time::Duration};
use std::{path::PathBuf, sync::Arc, thread, time::Duration};
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::mpsc};
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::ProtocolAddr;
@ -56,14 +51,14 @@ fn main() {
let port: u16 = matches.value_of("port").unwrap().parse().unwrap();
let address = ProtocolAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap());
let runtime = Arc::new(Runtime::new().unwrap());
let (server, cmd_sender) = Server::new();
let pool = ThreadPool::new().unwrap();
pool.spawn_ok(server.run(address));
let (server, cmd_sender) = Server::new(Arc::clone(&runtime));
runtime.spawn(server.run(address));
thread::sleep(Duration::from_millis(50)); //just for trace
block_on(client(cmd_sender));
runtime.block_on(client(cmd_sender));
}
fn file_exists(file: String) -> Result<(), String> {
@ -130,14 +125,15 @@ fn get_options<'a, 'b>() -> App<'a, 'b> {
)
}
async fn client(mut cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
async fn client(cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
use std::io::Write;
loop {
let mut line = String::new();
let mut input_lines = io::BufReader::new(io::stdin());
print!("==> ");
std::io::stdout().flush().unwrap();
io::stdin().read_line(&mut line).await.unwrap();
input_lines.read_line(&mut line).await.unwrap();
let matches = match get_options().get_matches_from_safe(line.split_whitespace()) {
Err(e) => {
println!("{}", e.message);
@ -148,12 +144,12 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
match matches.subcommand() {
("quit", _) => {
cmd_sender.send(LocalCommand::Shutdown).await.unwrap();
cmd_sender.send(LocalCommand::Shutdown).unwrap();
println!("goodbye");
break;
},
("disconnect", _) => {
cmd_sender.send(LocalCommand::Disconnect).await.unwrap();
cmd_sender.send(LocalCommand::Disconnect).unwrap();
},
("connect", Some(connect_matches)) => {
let socketaddr = connect_matches
@ -163,7 +159,6 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
.unwrap();
cmd_sender
.send(LocalCommand::Connect(ProtocolAddr::Tcp(socketaddr)))
.await
.unwrap();
},
("t", _) => {
@ -171,28 +166,23 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender<LocalCommand>) {
.send(LocalCommand::Connect(ProtocolAddr::Tcp(
"127.0.0.1:1231".parse().unwrap(),
)))
.await
.unwrap();
},
("serve", Some(serve_matches)) => {
let path = shellexpand::tilde(serve_matches.value_of("file").unwrap());
let path: PathBuf = path.parse().unwrap();
if let Some(fileinfo) = FileInfo::new(&path).await {
cmd_sender
.send(LocalCommand::Serve(fileinfo))
.await
.unwrap();
cmd_sender.send(LocalCommand::Serve(fileinfo)).unwrap();
}
},
("list", _) => {
cmd_sender.send(LocalCommand::List).await.unwrap();
cmd_sender.send(LocalCommand::List).unwrap();
},
("get", Some(get_matches)) => {
let id: u32 = get_matches.value_of("id").unwrap().parse().unwrap();
let file = get_matches.value_of("file");
cmd_sender
.send(LocalCommand::Get(id, file.map(|s| s.to_string())))
.await
.unwrap();
},


@ -1,11 +1,12 @@
use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo};
use async_std::{
fs,
path::PathBuf,
sync::{Mutex, RwLock},
use futures_util::{FutureExt, StreamExt};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
fs, join,
runtime::Runtime,
sync::{mpsc, Mutex, RwLock},
};
use futures::{channel::mpsc, future::FutureExt, stream::StreamExt};
use std::{collections::HashMap, sync::Arc};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
@ -23,11 +24,10 @@ pub struct Server {
}
impl Server {
pub fn new() -> (Self, mpsc::UnboundedSender<LocalCommand>) {
let (command_sender, command_receiver) = mpsc::unbounded();
pub fn new(runtime: Arc<Runtime>) -> (Self, mpsc::UnboundedSender<LocalCommand>) {
let (command_sender, command_receiver) = mpsc::unbounded_channel();
let (network, f) = Network::new(Pid::new());
std::thread::spawn(f);
let network = Network::new(Pid::new(), runtime);
let run_channels = Some(ControlChannels { command_receiver });
(
@ -47,7 +47,7 @@ impl Server {
self.network.listen(address).await.unwrap();
futures::join!(
join!(
self.command_manager(run_channels.command_receiver,),
self.connect_manager(),
);
@ -55,6 +55,7 @@ impl Server {
async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver<LocalCommand>) {
trace!("Start command_manager");
let command_receiver = UnboundedReceiverStream::new(command_receiver);
command_receiver
.for_each_concurrent(None, async move |cmd| {
match cmd {
@ -106,7 +107,7 @@ impl Server {
async fn connect_manager(&self) {
trace!("Start connect_manager");
let iter = futures::stream::unfold((), |_| {
let iter = futures_util::stream::unfold((), |_| {
self.network.connected().map(|r| r.ok().map(|v| (v, ())))
});
@ -129,7 +130,7 @@ impl Server {
let id = p.remote_pid();
let ri = Arc::new(Mutex::new(RemoteInfo::new(cmd_out, file_out, p)));
self.remotes.write().await.insert(id, ri.clone());
futures::join!(
join!(
self.handle_remote_cmd(cmd_in, ri.clone()),
self.handle_files(file_in, ri.clone()),
);


@ -6,12 +6,13 @@
mod metrics;
use clap::{App, Arg};
use futures::executor::block_on;
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
thread,
time::{Duration, Instant},
};
use tokio::runtime::Runtime;
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::{Message, Network, Pid, Promises, ProtocolAddr};
@ -101,14 +102,16 @@ fn main() {
};
let mut background = None;
let runtime = Arc::new(Runtime::new().unwrap());
match matches.value_of("mode") {
Some("server") => server(address),
Some("client") => client(address),
Some("server") => server(address, Arc::clone(&runtime)),
Some("client") => client(address, Arc::clone(&runtime)),
Some("both") => {
let address1 = address.clone();
background = Some(thread::spawn(|| server(address1)));
let runtime2 = Arc::clone(&runtime);
background = Some(thread::spawn(|| server(address1, runtime2)));
thread::sleep(Duration::from_millis(200)); //start client after server
client(address);
client(address, Arc::clone(&runtime));
},
_ => panic!("Invalid mode, run --help!"),
};
@ -117,18 +120,17 @@ fn main() {
}
}
fn server(address: ProtocolAddr) {
fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
let mut metrics = metrics::SimpleMetrics::new();
let (server, f) = Network::new_with_registry(Pid::new(), metrics.registry());
std::thread::spawn(f);
let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry());
metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap();
block_on(server.listen(address)).unwrap();
runtime.block_on(server.listen(address)).unwrap();
loop {
info!("Waiting for participant to connect");
let p1 = block_on(server.connected()).unwrap(); //remote representation of p1
let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
block_on(async {
let p1 = runtime.block_on(server.connected()).unwrap(); //remote representation of p1
let mut s1 = runtime.block_on(p1.opened()).unwrap(); //remote representation of s1
runtime.block_on(async {
let mut last = Instant::now();
let mut id = 0u64;
while let Ok(_msg) = s1.recv_raw().await {
@ -145,14 +147,15 @@ fn server(address: ProtocolAddr) {
}
}
fn client(address: ProtocolAddr) {
fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
let mut metrics = metrics::SimpleMetrics::new();
let (client, f) = Network::new_with_registry(Pid::new(), metrics.registry());
std::thread::spawn(f);
let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry());
metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap();
let p1 = block_on(client.connect(address)).unwrap(); //remote representation of p1
let mut s1 = block_on(p1.open(16, Promises::ORDERED | Promises::CONSISTENCY)).unwrap(); //remote representation of s1
let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1
let mut s1 = runtime
.block_on(p1.open(16, Promises::ORDERED | Promises::CONSISTENCY))
.unwrap(); //remote representation of s1
let mut last = Instant::now();
let mut id = 0u64;
let raw_msg = Message::serialize(
@ -180,7 +183,7 @@ fn client(address: ProtocolAddr) {
drop(s1);
std::thread::sleep(std::time::Duration::from_millis(5000));
info!("Closing participant");
block_on(p1.disconnect()).unwrap();
runtime.block_on(p1.disconnect()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(25000));
info!("DROPPING! client");
drop(client);


@ -8,13 +8,6 @@ use crate::{
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Sid},
};
use tokio::{io, sync::Mutex};
use tokio::runtime::Runtime;
use futures::{
channel::{mpsc, oneshot},
sink::SinkExt,
stream::StreamExt,
};
#[cfg(feature = "compression")]
use lz_fear::raw::DecodeError;
#[cfg(feature = "metrics")]
@ -28,6 +21,11 @@ use std::{
Arc,
},
};
use tokio::{
io,
runtime::Runtime,
sync::{mpsc, oneshot, Mutex},
};
use tracing::*;
use tracing_futures::Instrument;
@ -78,9 +76,8 @@ pub struct Stream {
prio: Prio,
promises: Promises,
send_closed: Arc<AtomicBool>,
runtime: Arc<Runtime>,
a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: Option<mpsc::UnboundedReceiver<IncomingMessage>>,
b2a_msg_recv_r: Option<async_channel::Receiver<IncomingMessage>>,
a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}
@ -169,7 +166,8 @@ impl Network {
/// # Arguments
/// * `participant_id` - provide it by calling [`Pid::new()`], usually you
/// don't want to reuse a Pid for 2 `Networks`
/// * `runtime` - provide a tokio::Runtime, it's used to internally spawn tasks
/// * `runtime` - provide a tokio::Runtime, it's used to internally spawn
/// tasks. It is necessary to clean up in the non-async `Drop`.
///
/// # Result
/// * `Self` - returns a `Network` which can be `Send` to multiple areas of
@ -178,22 +176,15 @@ impl Network {
///
/// # Examples
/// ```rust
/// //Example with uvth
/// use uvth::ThreadPoolBuilder;
/// //Example with tokio
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// let pool = ThreadPoolBuilder::new().build();
/// let (network, f) = Network::new(Pid::new());
/// pool.execute(f);
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), Arc::new(runtime));
/// ```
///
/// ```rust
/// //Example with std::thread
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// let (network, f) = Network::new(Pid::new());
/// std::thread::spawn(f);
/// ```
///
/// Usually you only create a single `Network` for an application,
/// except when client and server are in the same application, then you
@ -252,20 +243,18 @@ impl Network {
#[cfg(feature = "metrics")]
registry,
);
runtime.spawn(
async move {
trace!(?p, "Starting scheduler in own thread");
let _handle = tokio::spawn(
scheduler
.run()
.instrument(tracing::info_span!("scheduler", ?p)),
);
trace!(?p, "Stopping scheduler and his own thread");
}
);
runtime.spawn(async move {
trace!(?p, "Starting scheduler in own thread");
let _handle = tokio::spawn(
scheduler
.run()
.instrument(tracing::info_span!("scheduler", ?p)),
);
trace!(?p, "Stopping scheduler and his own thread");
});
Self {
local_pid: participant_id,
runtime: runtime,
runtime,
participant_disconnect_sender: Mutex::new(HashMap::new()),
listen_sender: Mutex::new(listen_sender),
connect_sender: Mutex::new(connect_sender),
@ -309,8 +298,7 @@ impl Network {
self.listen_sender
.lock()
.await
.send((address, s2a_result_s))
.await?;
.send((address, s2a_result_s))?;
match s2a_result_r.await? {
//waiting guarantees that we either listened successfully or get an error like port in
// use
@ -365,8 +353,7 @@ impl Network {
self.connect_sender
.lock()
.await
.send((address, pid_sender))
.await?;
.send((address, pid_sender))?;
let participant = match pid_receiver.await? {
Ok(p) => p,
Err(e) => return Err(NetworkError::ConnectFailed(e)),
@ -417,7 +404,7 @@ impl Network {
/// [`Streams`]: crate::api::Stream
/// [`listen`]: crate::api::Network::listen
pub async fn connected(&self) -> Result<Participant, NetworkError> {
let participant = self.connected_receiver.lock().await.next().await?;
let participant = self.connected_receiver.lock().await.recv().await?;
self.participant_disconnect_sender.lock().await.insert(
participant.remote_pid,
Arc::clone(&participant.a2s_disconnect_s),
@ -489,12 +476,11 @@ impl Participant {
/// [`Streams`]: crate::api::Stream
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
if let Err(e) = self
.a2b_stream_open_s
.lock()
.await
.send((prio, promises, p2a_return_stream_s))
.await
if let Err(e) =
self.a2b_stream_open_s
.lock()
.await
.send((prio, promises, p2a_return_stream_s))
{
debug!(?e, "bParticipant is already closed, notifying");
return Err(ParticipantError::ParticipantDisconnected);
@ -546,7 +532,7 @@ impl Participant {
/// [`connected`]: Network::connected
/// [`open`]: Participant::open
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
match self.b2a_stream_opened_r.lock().await.next().await {
match self.b2a_stream_opened_r.lock().await.recv().await {
Some(stream) => {
let sid = stream.sid;
debug!(?sid, ?self.remote_pid, "Receive opened stream");
@ -609,13 +595,12 @@ impl Participant {
//Streams will be closed by BParticipant
match self.a2s_disconnect_s.lock().await.take() {
Some(mut a2s_disconnect_s) => {
Some(a2s_disconnect_s) => {
let (finished_sender, finished_receiver) = oneshot::channel();
// Participant is connecting to Scheduler here, not as usual
// Participant<->BParticipant
a2s_disconnect_s
.send((pid, finished_sender))
.await
.expect("Something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(res) => {
@ -661,9 +646,8 @@ impl Stream {
prio: Prio,
promises: Promises,
send_closed: Arc<AtomicBool>,
runtime: Arc<Runtime>,
a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
b2a_msg_recv_r: async_channel::Receiver<IncomingMessage>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) -> Self {
Self {
@ -673,7 +657,6 @@ impl Stream {
prio,
promises,
send_closed,
runtime,
a2b_msg_s,
b2a_msg_recv_r: Some(b2a_msg_recv_r),
a2b_close_stream_s: Some(a2b_close_stream_s),
@ -877,13 +860,13 @@ impl Stream {
pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
match &mut self.b2a_msg_recv_r {
Some(b2a_msg_recv_r) => {
match b2a_msg_recv_r.next().await {
Some(msg) => Ok(Message {
match b2a_msg_recv_r.recv().await {
Ok(msg) => Ok(Message {
buffer: Arc::new(msg.buffer),
#[cfg(feature = "compression")]
compressed: self.promises.contains(Promises::COMPRESSED),
}),
None => {
Err(_) => {
self.b2a_msg_recv_r = None; //prevent panic
Err(StreamError::StreamClosed)
},
@ -929,13 +912,8 @@ impl Stream {
#[inline]
pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
match &mut self.b2a_msg_recv_r {
Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_next() {
Err(_) => Ok(None),
Ok(None) => {
self.b2a_msg_recv_r = None; //prevent panic
Err(StreamError::StreamClosed)
},
Ok(Some(msg)) => Ok(Some(
Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_recv() {
Ok(msg) => Ok(Some(
Message {
buffer: Arc::new(msg.buffer),
#[cfg(feature = "compression")]
@ -943,6 +921,11 @@ impl Stream {
}
.deserialize()?,
)),
Err(async_channel::TryRecvError::Empty) => Ok(None),
Err(async_channel::TryRecvError::Closed) => {
self.b2a_msg_recv_r = None; //prevent panic
Err(StreamError::StreamClosed)
},
},
None => Err(StreamError::StreamClosed),
}
@ -975,16 +958,13 @@ impl Drop for Network {
self.participant_disconnect_sender.lock().await.drain()
{
match a2s_disconnect_s.lock().await.take() {
Some(mut a2s_disconnect_s) => {
Some(a2s_disconnect_s) => {
trace!(?remote_pid, "Participants will be closed");
let (finished_sender, finished_receiver) = oneshot::channel();
finished_receiver_list.push((remote_pid, finished_receiver));
a2s_disconnect_s
.send((remote_pid, finished_sender))
.await
.expect(
"Scheduler is closed, but nobody other should be able to close it",
);
a2s_disconnect_s.send((remote_pid, finished_sender)).expect(
"Scheduler is closed, but nobody other should be able to close it",
);
},
None => trace!(?remote_pid, "Participant already disconnected gracefully"),
}
@ -1026,13 +1006,12 @@ impl Drop for Participant {
?pid,
"Participant has been shutdown cleanly, no further waiting is required!"
),
Some(mut a2s_disconnect_s) => {
Some(a2s_disconnect_s) => {
debug!(?pid, "Disconnect from Scheduler");
self.runtime.block_on(async {
let (finished_sender, finished_receiver) = oneshot::channel();
a2s_disconnect_s
.send((self.remote_pid, finished_sender))
.await
.expect("Something is wrong in internal scheduler coding");
if let Err(e) = finished_receiver
.await
@ -1059,7 +1038,10 @@ impl Drop for Stream {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "Shutting down Stream");
self.runtime.block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid))
self.a2b_close_stream_s
.take()
.unwrap()
.send(self.sid)
.expect("bparticipant part of a gracefully shutdown must have crashed");
} else {
let sid = self.sid;
@ -1096,12 +1078,16 @@ impl From<std::option::NoneError> for NetworkError {
fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed }
}
impl From<mpsc::SendError> for NetworkError {
fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed }
impl<T> From<mpsc::error::SendError<T>> for NetworkError {
fn from(_err: mpsc::error::SendError<T>) -> Self { NetworkError::NetworkClosed }
}
impl From<oneshot::Canceled> for NetworkError {
fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed }
impl From<oneshot::error::RecvError> for NetworkError {
fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed }
}
impl From<std::io::Error> for NetworkError {
fn from(_err: std::io::Error) -> Self { NetworkError::NetworkClosed }
}
impl From<Box<bincode::ErrorKind>> for StreamError {


@ -8,14 +8,16 @@ use crate::{
VELOREN_NETWORK_VERSION,
},
};
use futures::{
channel::{mpsc, oneshot},
join,
sink::SinkExt,
stream::StreamExt,
use futures_core::task::Poll;
use futures_util::{
task::{noop_waker, Context},
FutureExt,
};
#[cfg(feature = "metrics")] use std::sync::Arc;
use tokio::{
join,
sync::{mpsc, oneshot},
};
use tracing::*;
pub(crate) struct Channel {
@ -26,7 +28,7 @@ pub(crate) struct Channel {
impl Channel {
pub fn new(cid: u64) -> (Self, mpsc::UnboundedSender<Frame>, oneshot::Sender<()>) {
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::<Frame>();
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded_channel::<Frame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
(
Self {
@ -52,7 +54,7 @@ impl Channel {
let cnt = leftover_cid_frame.len();
trace!(?cnt, "Reapplying leftovers");
for cid_frame in leftover_cid_frame.drain(..) {
w2c_cid_frame_s.send(cid_frame).await.unwrap();
w2c_cid_frame_s.send(cid_frame).unwrap();
}
trace!(?cnt, "All leftovers reapplied");
@ -115,8 +117,8 @@ impl Handshake {
}
pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec<C2pFrame>), ()> {
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::<Frame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded_channel::<Frame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded_channel::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let handler_future =
@ -142,8 +144,10 @@ impl Handshake {
match res {
Ok(res) => {
let fake_waker = noop_waker();
let mut ctx = Context::from_waker(&fake_waker);
let mut leftover_frames = vec![];
while let Ok(Some(cid_frame)) = w2c_cid_frame_r.try_next() {
while let Poll::Ready(Some(cid_frame)) = w2c_cid_frame_r.poll_recv(&mut ctx) {
leftover_frames.push(cid_frame);
}
let cnt = leftover_frames.len();
@ -175,7 +179,7 @@ impl Handshake {
self.send_handshake(&mut c2w_frame_s).await;
}
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
let frame = w2c_cid_frame_r.recv().await.map(|(_cid, frame)| frame);
#[cfg(feature = "metrics")]
{
if let Some(Ok(ref frame)) = frame {
@ -254,7 +258,7 @@ impl Handshake {
return Err(());
}
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
let frame = w2c_cid_frame_r.recv().await.map(|(_cid, frame)| frame);
let r = match frame {
Some(Ok(Frame::Init { pid, secret })) => {
debug!(?pid, "Participant send their ID");
@ -315,7 +319,6 @@ impl Handshake {
magic_number: VELOREN_MAGIC_NUMBER,
version: VELOREN_NETWORK_VERSION,
})
.await
.unwrap();
}
@ -330,7 +333,6 @@ impl Handshake {
pid: self.local_pid,
secret: self.secret,
})
.await
.unwrap();
}
@ -353,7 +355,7 @@ impl Handshake {
.with_label_values(&[&cid_string, "Shutdown"])
.inc();
}
c2w_frame_s.send(Frame::Raw(data)).await.unwrap();
c2w_frame_s.send(Frame::Shutdown).await.unwrap();
c2w_frame_s.send(Frame::Raw(data)).unwrap();
c2w_frame_s.send(Frame::Shutdown).unwrap();
}
}


@ -39,8 +39,8 @@
//!
//! # Examples
//! ```rust
//! use tokio::task::sleep;
//! use futures::{executor::block_on, join};
//! use tokio::task::sleep;
//! use veloren_network::{Network, Pid, Promises, ProtocolAddr};
//!
//! // Client


@ -256,8 +256,8 @@ impl std::fmt::Debug for MessageBuffer {
#[cfg(test)]
mod tests {
use crate::{api::Stream, message::*};
use futures::channel::mpsc;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::mpsc;
fn stub_stream(compressed: bool) -> Stream {
use crate::{api::*, types::*};
@ -273,8 +273,8 @@ mod tests {
let promises = Promises::empty();
let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded();
let (_b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded();
let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded();
let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded();
let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel();
Stream::new(
Pid::fake(0),


@ -8,15 +8,7 @@ use crate::{
protocols::Protocols,
types::{Cid, Frame, Pid, Prio, Promises, Sid},
};
use tokio::sync::{Mutex, RwLock};
use tokio::runtime::Runtime;
use futures::{
channel::{mpsc, oneshot},
future::FutureExt,
select,
sink::SinkExt,
stream::StreamExt,
};
use futures_util::{FutureExt, StreamExt};
use std::{
collections::{HashMap, VecDeque},
sync::{
@ -25,6 +17,12 @@ use std::{
},
time::{Duration, Instant},
};
use tokio::{
runtime::Runtime,
select,
sync::{mpsc, oneshot, Mutex, RwLock},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
use tracing_futures::Instrument;
@ -47,13 +45,14 @@ struct StreamInfo {
prio: Prio,
promises: Promises,
send_closed: Arc<AtomicBool>,
b2a_msg_recv_s: Mutex<mpsc::UnboundedSender<IncomingMessage>>,
b2a_msg_recv_s: Mutex<async_channel::Sender<IncomingMessage>>,
}
#[derive(Debug)]
struct ControlChannels {
a2b_stream_open_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
b2b_close_stream_opened_sender_r: oneshot::Receiver<()>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
@ -63,7 +62,7 @@ struct ControlChannels {
#[derive(Debug)]
struct ShutdownInfo {
//a2b_stream_open_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
b2b_close_stream_opened_sender_s: Option<oneshot::Sender<()>>,
error: Option<ParticipantError>,
}
@ -84,6 +83,12 @@ pub struct BParticipant {
}
impl BParticipant {
const BANDWIDTH: u64 = 25_000_000;
const FRAMES_PER_TICK: u64 = Self::BANDWIDTH * Self::TICK_TIME_MS / 1000 / 1400 /*TCP FRAME*/;
const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS);
//in bit/s
const TICK_TIME_MS: u64 = 10;
#[allow(clippy::type_complexity)]
pub(crate) fn new(
remote_pid: Pid,
@ -97,21 +102,24 @@ impl BParticipant {
mpsc::UnboundedSender<S2bCreateChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
) {
let (a2b_steam_open_s, a2b_stream_open_r) = mpsc::unbounded::<A2bStreamOpen>();
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::<Stream>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded();
let (a2b_steam_open_s, a2b_stream_open_r) = mpsc::unbounded_channel::<A2bStreamOpen>();
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::<Stream>();
let (b2b_close_stream_opened_sender_s, b2b_close_stream_opened_sender_r) =
oneshot::channel();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel();
let shutdown_info = RwLock::new(ShutdownInfo {
//a2b_stream_open_r: a2b_stream_open_r.clone(),
b2a_stream_opened_s: b2a_stream_opened_s.clone(),
b2b_close_stream_opened_sender_s: Some(b2b_close_stream_opened_sender_s),
error: None,
});
let run_channels = Some(ControlChannels {
a2b_stream_open_r,
b2a_stream_opened_s,
b2b_close_stream_opened_sender_r,
s2b_create_channel_r,
a2b_close_stream_r,
a2b_close_stream_s,
@ -147,7 +155,7 @@ impl BParticipant {
let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) =
oneshot::channel();
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<C2pFrame>();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded_channel::<C2pFrame>();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new(
#[cfg(feature = "metrics")]
Arc::clone(&self.metrics),
@ -155,7 +163,7 @@ impl BParticipant {
);
let run_channels = self.run_channels.take().unwrap();
futures::join!(
tokio::join!(
self.open_mgr(
run_channels.a2b_stream_open_r,
run_channels.a2b_close_stream_s.clone(),
@ -165,6 +173,7 @@ impl BParticipant {
self.handle_frames_mgr(
w2b_frames_r,
run_channels.b2a_stream_opened_s,
run_channels.b2b_close_stream_opened_sender_r,
run_channels.a2b_close_stream_s,
a2p_msg_s.clone(),
),
@ -188,13 +197,11 @@ impl BParticipant {
&self,
mut prios: PrioManager,
mut shutdown_send_mgr_receiver: oneshot::Receiver<oneshot::Sender<()>>,
mut b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
// make it configurable or switch to await E.g. Prio 0 = await, prio 50
// wait for more messages
const TICK_TIME: Duration = Duration::from_millis(10);
const FRAMES_PER_TICK: usize = 10005;
self.running_mgr.fetch_add(1, Ordering::Relaxed);
let mut b2b_prios_flushed_s = None; //closing up
trace!("Start send_mgr");
@ -203,7 +210,9 @@ impl BParticipant {
let mut i: u64 = 0;
loop {
let mut frames = VecDeque::new();
prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
prios
.fill_frames(Self::FRAMES_PER_TICK as usize, &mut frames)
.await;
let len = frames.len();
for (_, frame) in frames {
self.send_frame(
@ -215,9 +224,8 @@ impl BParticipant {
}
b2s_prio_statistic_s
.send((self.remote_pid, len as u64, /* */ 0))
.await
.unwrap();
tokio::time::sleep(TICK_TIME).await;
tokio::time::sleep(Self::TICK_TIME).await;
i += 1;
if i.rem_euclid(1000) == 0 {
trace!("Did 1000 ticks");
@ -229,7 +237,7 @@ impl BParticipant {
break;
}
if b2b_prios_flushed_s.is_none() {
if let Some(prios_flushed_s) = shutdown_send_mgr_receiver.try_recv().unwrap() {
if let Ok(prios_flushed_s) = shutdown_send_mgr_receiver.try_recv() {
b2b_prios_flushed_s = Some(prios_flushed_s);
}
}
@ -252,8 +260,9 @@ impl BParticipant {
) -> bool {
let mut drop_cid = None;
// TODO: find out ideal channel here
let res = if let Some(ci) = self.channels.read().await.values().next() {
let mut ci = ci.lock().await;
let ci = ci.lock().await;
//we are increasing metrics without checking the result to please
// borrow_checker. otherwise we would need to close `frame` what we
// dont want!
@ -261,7 +270,7 @@ impl BParticipant {
frames_out_total_cache
.with_label_values(ci.cid, &frame)
.inc();
if let Err(e) = ci.b2w_frame_s.send(frame).await {
if let Err(e) = ci.b2w_frame_s.send(frame) {
let cid = ci.cid;
info!(?e, ?cid, "channel no longer available");
drop_cid = Some(cid);
@ -294,7 +303,6 @@ impl BParticipant {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
trace!(?cid, ?e, "seems like was already shut down");
}
ci.b2w_frame_s.close_channel();
}
//TODO FIXME tags: takeover channel multiple
info!(
@ -311,7 +319,8 @@ impl BParticipant {
async fn handle_frames_mgr(
&self,
mut w2b_frames_r: mpsc::UnboundedReceiver<C2pFrame>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
b2b_close_stream_opened_sender_r: oneshot::Receiver<()>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
) {
@ -323,21 +332,24 @@ impl BParticipant {
let mut dropped_instant = Instant::now();
let mut dropped_cnt = 0u64;
let mut dropped_sid = Sid::new(0);
let mut b2a_stream_opened_s = Some(b2a_stream_opened_s);
let mut b2b_close_stream_opened_sender_r = b2b_close_stream_opened_sender_r.fuse();
while let Some((cid, result_frame)) = w2b_frames_r.next().await {
while let Some((cid, result_frame)) = select!(
next = w2b_frames_r.recv().fuse() => next,
_ = &mut b2b_close_stream_opened_sender_r => {
b2a_stream_opened_s = None;
None
},
) {
//trace!(?result_frame, "handling frame");
let frame = match result_frame {
Ok(frame) => frame,
Err(()) => {
// The read protocol stopped, i need to make sure that write gets stopped
debug!("read protocol was closed. Stopping write protocol");
if let Some(ci) = self.channels.read().await.get(&cid) {
let mut ci = ci.lock().await;
ci.b2w_frame_s
.close()
.await
.expect("couldn't stop write protocol");
}
// The read protocol stopped, i need to make sure that write gets stopped, can
// drop channel as it's dead anyway
debug!("read protocol was closed. Stopping channel");
self.channels.write().await.remove(&cid);
continue;
},
};
@ -360,13 +372,18 @@ impl BParticipant {
let stream = self
.create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s)
.await;
if let Err(e) = b2a_stream_opened_s.send(stream).await {
warn!(
?e,
?sid,
"couldn't notify api::Participant that a stream got opened. Is the \
participant already dropped?"
);
match &b2a_stream_opened_s {
None => debug!("dropping openStream as Channel is already closing"),
Some(s) => {
if let Err(e) = s.send(stream) {
warn!(
?e,
?sid,
"couldn't notify api::Participant that a stream got opened. \
Is the participant already dropped?"
);
}
},
}
},
Frame::CloseStream { sid } => {
@ -465,6 +482,7 @@ impl BParticipant {
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start create_channel_mgr");
let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r);
s2b_create_channel_r
.for_each_concurrent(
None,
@ -549,8 +567,8 @@ impl BParticipant {
let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
//from api or shutdown signal
while let Some((prio, promises, p2a_return_stream)) = select! {
next = a2b_stream_open_r.next().fuse() => next,
_ = shutdown_open_mgr_receiver => None,
next = a2b_stream_open_r.recv().fuse() => next,
_ = &mut shutdown_open_mgr_receiver => None,
} {
debug!(?prio, ?promises, "Got request to open a new steam");
//TODO: a2b_stream_open_r isn't closed on api_close yet. This needs to change.
@ -657,7 +675,6 @@ impl BParticipant {
itself, ignoring"
);
};
ci.b2w_frame_s.close_channel();
}
//Wait for other bparticipants mgr to close via AtomicUsize
@ -712,8 +729,8 @@ impl BParticipant {
//from api or shutdown signal
while let Some(sid) = select! {
next = a2b_close_stream_r.next().fuse() => next,
sender = shutdown_stream_close_mgr_receiver => {
next = a2b_close_stream_r.recv().fuse() => next,
sender = &mut shutdown_stream_close_mgr_receiver => {
b2b_stream_close_shutdown_confirmed_s = Some(sender.unwrap());
None
}
@ -779,7 +796,7 @@ impl BParticipant {
match self.streams.read().await.get(&sid) {
Some(si) => {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.lock().await.close_channel();
si.b2a_msg_recv_s.lock().await.close();
},
None => trace!(
"Couldn't find the stream, might be simultaneous close from local/remote"
@ -828,7 +845,7 @@ impl BParticipant {
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
) -> Stream {
let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::<IncomingMessage>();
let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::<IncomingMessage>();
let send_closed = Arc::new(AtomicBool::new(false));
self.streams.write().await.insert(sid, StreamInfo {
prio,
@ -847,7 +864,6 @@ impl BParticipant {
prio,
promises,
send_closed,
Arc::clone(&self.runtime),
a2p_msg_s,
b2a_msg_recv_r,
a2b_close_stream_s.clone(),
@ -860,7 +876,9 @@ impl BParticipant {
if let Some(r) = reason {
lock.error = Some(r);
}
lock.b2a_stream_opened_s.close_channel();
lock.b2b_close_stream_opened_sender_s
.take()
.map(|s| s.send(()));
debug!("Closing all streams for write");
for (sid, si) in self.streams.read().await.iter() {
@ -876,7 +894,7 @@ impl BParticipant {
debug!("Closing all streams");
for (sid, si) in self.streams.read().await.iter() {
trace!(?sid, "Shutting down Stream");
si.b2a_msg_recv_s.lock().await.close_channel();
si.b2a_msg_recv_s.lock().await.close();
}
}
}


@ -11,9 +11,9 @@ use crate::{
types::{Frame, Prio, Sid},
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::channel::oneshot;
use std::collections::{HashMap, HashSet, VecDeque};
#[cfg(feature = "metrics")] use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::trace;
const PRIO_MAX: usize = 64;
@ -289,8 +289,8 @@ mod tests {
types::{Frame, Pid, Prio, Sid},
};
use crossbeam_channel::Sender;
use futures::{channel::oneshot, executor::block_on};
use std::{collections::VecDeque, sync::Arc};
use tokio::{runtime::Runtime, sync::oneshot};
const SIZE: u64 = OutgoingMessage::FRAME_DATA_SIZE;
const USIZE: usize = OutgoingMessage::FRAME_DATA_SIZE as usize;
@ -366,7 +366,9 @@ mod tests {
let (mut mgr, msg_tx, _flush_tx) = mock_new();
msg_tx.send(mock_out(16, 1337)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1337, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -380,7 +382,9 @@ mod tests {
msg_tx.send(mock_out(20, 42)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1337, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
assert_header(&mut frames, 42, 3);
@ -394,7 +398,9 @@ mod tests {
msg_tx.send(mock_out(20, 42)).unwrap();
msg_tx.send(mock_out(16, 1337)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1337, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -420,7 +426,9 @@ mod tests {
msg_tx.send(mock_out(16, 11)).unwrap();
msg_tx.send(mock_out(20, 13)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
for i in 1..14 {
assert_header(&mut frames, i, 3);
@ -447,13 +455,17 @@ mod tests {
msg_tx.send(mock_out(20, 13)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(3, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(3, &mut frames));
for i in 1..4 {
assert_header(&mut frames, i, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
}
assert!(frames.is_empty());
block_on(mgr.fill_frames(11, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(11, &mut frames));
for i in 4..14 {
assert_header(&mut frames, i, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -466,7 +478,9 @@ mod tests {
let (mut mgr, msg_tx, _flush_tx) = mock_new();
msg_tx.send(mock_out_large(16, 1)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
@ -481,7 +495,9 @@ mod tests {
msg_tx.send(mock_out_large(16, 1)).unwrap();
msg_tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
@ -500,14 +516,18 @@ mod tests {
msg_tx.send(mock_out_large(16, 1)).unwrap();
msg_tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
msg_tx.send(mock_out(0, 3)).unwrap();
block_on(mgr.fill_frames(100, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 3, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -530,7 +550,9 @@ mod tests {
msg_tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -549,13 +571,17 @@ mod tests {
msg_tx.send(mock_out(16, 2)).unwrap();
}
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2000, &mut frames));
//^unimportant frames, gonna be dropped
msg_tx.send(mock_out(20, 1)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2000, &mut frames));
//important in that test is, that after the first frames got cleared i reset
// the Points even though 998 prio 16 messages have been send at this
@ -589,7 +615,9 @@ mod tests {
.unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 7000);
assert_data(&mut frames, 0, vec![1; USIZE]);
@ -619,7 +647,9 @@ mod tests {
msg_tx.send(mock_out(16, 8)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 7000);
assert_data(&mut frames, 0, vec![1; USIZE]);
@ -651,7 +681,9 @@ mod tests {
msg_tx.send(mock_out(20, 8)).unwrap();
let mut frames = VecDeque::new();
block_on(mgr.fill_frames(2000, &mut frames));
Runtime::new()
.unwrap()
.block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 7000);
assert_data(&mut frames, 0, vec![1; USIZE]);


@ -4,19 +4,14 @@ use crate::{
participant::C2pFrame,
types::{Cid, Frame},
};
use futures_util::{future::Fuse, FutureExt};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpStream, UdpSocket},
select,
sync::{mpsc, oneshot, Mutex},
};
use futures::{
channel::{mpsc, oneshot},
future::{Fuse, FutureExt},
lock::Mutex,
select,
sink::SinkExt,
stream::StreamExt,
};
use std::{convert::TryFrom, net::SocketAddr, sync::Arc};
use tracing::*;
@ -75,7 +70,7 @@ impl TcpProtocol {
async fn read_frame<R: AsyncReadExt + std::marker::Unpin>(
r: &mut R,
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
end_receiver: &mut Fuse<oneshot::Receiver<()>>,
) -> Result<Frame, Option<std::io::Error>> {
let handle = |read_result| match read_result {
Ok(_) => Ok(()),
@ -190,7 +185,6 @@ impl TcpProtocol {
}
w2c_cid_frame_s
.send((cid, Ok(frame)))
.await
.expect("Channel or Participant seems no longer to exist");
},
Err(e_option) => {
@ -201,7 +195,6 @@ impl TcpProtocol {
// need a explicit STOP here
w2c_cid_frame_s
.send((cid, Err(())))
.await
.expect("Channel or Participant seems no longer to exist");
}
//None is clean shutdown
@ -284,7 +277,7 @@ impl TcpProtocol {
#[cfg(not(feature = "metrics"))]
let _cid = cid;
while let Some(frame) = c2w_frame_r.next().await {
while let Some(frame) = c2w_frame_r.recv().await {
#[cfg(feature = "metrics")]
{
metrics_cache.with_label_values(&frame).inc();
@ -343,15 +336,15 @@ impl UdpProtocol {
let mut data_in = self.data_in.lock().await;
let mut end_r = end_r.fuse();
while let Some(bytes) = select! {
r = data_in.next().fuse() => match r {
r = data_in.recv().fuse() => match r {
Some(r) => Some(r),
None => {
info!("Udp read ended");
w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist");
w2c_cid_frame_s.send((cid, Err(()))).expect("Channel or Participant seems no longer to exist");
None
}
},
_ = end_r => None,
_ = &mut end_r => None,
} {
trace!("Got raw UDP message with len: {}", bytes.len());
let frame_no = bytes[0];
@ -389,7 +382,7 @@ impl UdpProtocol {
};
#[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap();
w2c_cid_frame_s.send((cid, Ok(frame))).unwrap();
}
trace!("Shutting down udp read()");
}
@ -406,7 +399,7 @@ impl UdpProtocol {
.with_label_values(&[&cid.to_string()]);
#[cfg(not(feature = "metrics"))]
let _cid = cid;
while let Some(frame) = c2w_frame_r.next().await {
while let Some(frame) = c2w_frame_r.recv().await {
#[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc();
let len = match frame {
@ -501,9 +494,8 @@ impl UdpProtocol {
mod tests {
use super::*;
use crate::{metrics::NetworkMetrics, types::Pid};
use tokio::net;
use futures::{executor::block_on, stream::StreamExt};
use std::sync::Arc;
use tokio::{net, runtime::Runtime, sync::mpsc};
#[test]
fn tcp_read_handshake() {
@ -511,11 +503,11 @@ mod tests {
let cid = 80085;
let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap());
let addr = std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 50500);
block_on(async {
Runtime::new().unwrap().block_on(async {
let server = net::TcpListener::bind(addr).await.unwrap();
let mut client = net::TcpStream::connect(addr).await.unwrap();
let s_stream = server.incoming().next().await.unwrap().unwrap();
let (s_stream, _) = server.accept().await.unwrap();
let prot = TcpProtocol::new(s_stream, metrics);
//Send Handshake
@ -524,21 +516,21 @@ mod tests {
client.write_all(&1337u32.to_le_bytes()).await.unwrap();
client.write_all(&0u32.to_le_bytes()).await.unwrap();
client.write_all(&42u32.to_le_bytes()).await.unwrap();
client.flush();
client.flush().await.unwrap();
//handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded_channel::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid;
let t = std::thread::spawn(move || {
block_on(async {
Runtime::new().unwrap().block_on(async {
prot.read_from_wire(cid2, &mut w2c_cid_frame_s, read_stop_receiver)
.await;
})
});
// Assert than we get some value back! Its a Handshake!
//tokio::task::sleep(std::time::Duration::from_millis(1000));
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
let (cid_r, frame) = w2c_cid_frame_r.recv().await.unwrap();
assert_eq!(cid, cid_r);
if let Ok(Frame::Handshake {
magic_number,
@ -561,11 +553,11 @@ mod tests {
let cid = 80085;
let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap());
let addr = std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 50501);
block_on(async {
Runtime::new().unwrap().block_on(async {
let server = net::TcpListener::bind(addr).await.unwrap();
let mut client = net::TcpStream::connect(addr).await.unwrap();
let s_stream = server.incoming().next().await.unwrap().unwrap();
let (s_stream, _) = server.accept().await.unwrap();
let prot = TcpProtocol::new(s_stream, metrics);
//Send Handshake
@ -573,19 +565,19 @@ mod tests {
.write_all("x4hrtzsektfhxugzdtz5r78gzrtzfhxfdthfthuzhfzzufasgasdfg".as_bytes())
.await
.unwrap();
client.flush();
client.flush().await.unwrap();
//handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded_channel::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid;
let t = std::thread::spawn(move || {
block_on(async {
Runtime::new().unwrap().block_on(async {
prot.read_from_wire(cid2, &mut w2c_cid_frame_s, read_stop_receiver)
.await;
})
});
// Assert than we get some value back! Its a Raw!
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
let (cid_r, frame) = w2c_cid_frame_r.recv().await.unwrap();
assert_eq!(cid, cid_r);
if let Ok(Frame::Raw(data)) = frame {
assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf");


@ -7,15 +7,7 @@ use crate::{
protocols::{Protocols, TcpProtocol, UdpProtocol},
types::Pid,
};
use tokio::{io, net, sync::Mutex};
use tokio::runtime::Runtime;
use futures::{
channel::{mpsc, oneshot},
future::FutureExt,
select,
sink::SinkExt,
stream::StreamExt,
};
use futures_util::{FutureExt, StreamExt};
#[cfg(feature = "metrics")]
use prometheus::Registry;
use rand::Rng;
@ -26,6 +18,13 @@ use std::{
Arc,
},
};
use tokio::{
io, net,
runtime::Runtime,
select,
sync::{mpsc, oneshot, Mutex},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
use tracing_futures::Instrument;
@ -92,12 +91,13 @@ impl Scheduler {
mpsc::UnboundedReceiver<Participant>,
oneshot::Sender<()>,
) {
let (a2s_listen_s, a2s_listen_r) = mpsc::unbounded::<A2sListen>();
let (a2s_connect_s, a2s_connect_r) = mpsc::unbounded::<A2sConnect>();
let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::<Participant>();
let (a2s_listen_s, a2s_listen_r) = mpsc::unbounded_channel::<A2sListen>();
let (a2s_connect_s, a2s_connect_r) = mpsc::unbounded_channel::<A2sConnect>();
let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded_channel::<Participant>();
let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>();
let (a2s_disconnect_s, a2s_disconnect_r) = mpsc::unbounded::<A2sDisconnect>();
let (b2s_prio_statistic_s, b2s_prio_statistic_r) = mpsc::unbounded::<B2sPrioStatistic>();
let (a2s_disconnect_s, a2s_disconnect_r) = mpsc::unbounded_channel::<A2sDisconnect>();
let (b2s_prio_statistic_s, b2s_prio_statistic_r) =
mpsc::unbounded_channel::<B2sPrioStatistic>();
let run_channels = Some(ControlChannels {
a2s_listen_r,
@ -150,7 +150,7 @@ impl Scheduler {
pub async fn run(mut self) {
let run_channels = self.run_channels.take().unwrap();
futures::join!(
tokio::join!(
self.listen_mgr(run_channels.a2s_listen_r),
self.connect_mgr(run_channels.a2s_connect_r),
self.disconnect_mgr(run_channels.a2s_disconnect_r),
@ -161,6 +161,7 @@ impl Scheduler {
async fn listen_mgr(&self, a2s_listen_r: mpsc::UnboundedReceiver<A2sListen>) {
trace!("Start listen_mgr");
let a2s_listen_r = UnboundedReceiverStream::new(a2s_listen_r);
a2s_listen_r
.for_each_concurrent(None, |(address, s2a_listen_result_s)| {
let address = address;
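listen_mgr now wraps the plain receiver in UnboundedReceiverStream before calling for_each_concurrent; a hedged sketch of that shape, with a placeholder message type instead of A2sListen:

    use futures_util::StreamExt;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    async fn listen_mgr(a2s_listen_r: mpsc::UnboundedReceiver<String>) {
        // Wrap the receiver so the StreamExt combinators are available again.
        let a2s_listen_r = UnboundedReceiverStream::new(a2s_listen_r);
        a2s_listen_r
            .for_each_concurrent(None, |address| async move {
                // handle each listen request concurrently
                println!("listening on {}", address);
            })
            .await;
    }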
@ -197,7 +198,7 @@ impl Scheduler {
)>,
) {
trace!("Start connect_mgr");
while let Some((addr, pid_sender)) = a2s_connect_r.next().await {
while let Some((addr, pid_sender)) = a2s_connect_r.recv().await {
let (protocol, handshake) = match addr {
ProtocolAddr::Tcp(addr) => {
#[cfg(feature = "metrics")]
@ -240,7 +241,7 @@ impl Scheduler {
continue;
};
info!("Connecting Udp to: {}", addr);
let (udp_data_sender, udp_data_receiver) = mpsc::unbounded::<Vec<u8>>();
let (udp_data_sender, udp_data_receiver) = mpsc::unbounded_channel::<Vec<u8>>();
let protocol = UdpProtocol::new(
Arc::clone(&socket),
addr,
@ -264,7 +265,7 @@ impl Scheduler {
async fn disconnect_mgr(&self, mut a2s_disconnect_r: mpsc::UnboundedReceiver<A2sDisconnect>) {
trace!("Start disconnect_mgr");
while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await {
while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.recv().await {
// Closing Participants is done the following way:
// 1. We drop our senders and receivers
// 2. We need to close BParticipant, which will then drop its senders and receivers
@ -299,7 +300,7 @@ impl Scheduler {
mut b2s_prio_statistic_r: mpsc::UnboundedReceiver<B2sPrioStatistic>,
) {
trace!("Start prio_adj_mgr");
while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.next().await {
while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.recv().await {
//TODO adjust prios in participants here!
}
@ -381,7 +382,7 @@ impl Scheduler {
let mut end_receiver = s2s_stop_listening_r.fuse();
while let Some(data) = select! {
next = listener.accept().fuse() => Some(next),
_ = end_receiver => None,
_ = &mut end_receiver => None,
} {
let (stream, remote_addr) = match data {
Ok((s, p)) => (s, p),
@ -425,7 +426,7 @@ impl Scheduler {
let mut data = [0u8; UDP_MAXIMUM_SINGLE_PACKET_SIZE_EVER];
while let Ok((size, remote_addr)) = select! {
next = socket.recv_from(&mut data).fuse() => next,
_ = end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")),
_ = &mut end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")),
} {
let mut datavec = Vec::with_capacity(size);
datavec.extend_from_slice(&data[0..size]);
@ -434,7 +435,8 @@ impl Scheduler {
#[allow(clippy::map_entry)]
if !listeners.contains_key(&remote_addr) {
info!("Accepting Udp from: {}", &remote_addr);
let (udp_data_sender, udp_data_receiver) = mpsc::unbounded::<Vec<u8>>();
let (udp_data_sender, udp_data_receiver) =
mpsc::unbounded_channel::<Vec<u8>>();
listeners.insert(remote_addr, udp_data_sender);
let protocol = UdpProtocol::new(
Arc::clone(&socket),
@ -447,7 +449,7 @@ impl Scheduler {
.await;
}
let udp_data_sender = listeners.get_mut(&remote_addr).unwrap();
udp_data_sender.send(datavec).await.unwrap();
udp_data_sender.send(datavec).unwrap();
}
},
_ => unimplemented!(),
@ -457,7 +459,7 @@ impl Scheduler {
async fn udp_single_channel_connect(
socket: Arc<net::UdpSocket>,
mut w2p_udp_package_s: mpsc::UnboundedSender<Vec<u8>>,
w2p_udp_package_s: mpsc::UnboundedSender<Vec<u8>>,
) {
let addr = socket.local_addr();
trace!(?addr, "Start udp_single_channel_connect");
@ -470,11 +472,11 @@ impl Scheduler {
let mut data = [0u8; 9216];
while let Ok(size) = select! {
next = socket.recv(&mut data).fuse() => next,
_ = end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")),
_ = &mut end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")),
} {
let mut datavec = Vec::with_capacity(size);
datavec.extend_from_slice(&data[0..size]);
w2p_udp_package_s.send(datavec).await.unwrap();
w2p_udp_package_s.send(datavec).unwrap();
}
trace!(?addr, "Stop udp_single_channel_connect");
}
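The `&mut end_receiver` changes above exist because an oneshot::Receiver is consumed when polled by value; borrowing it mutably lets the loop keep reusing it as a stop signal. A small sketch under that assumption, with plain tokio types and no veloren protocol:

    use futures_util::FutureExt;
    use tokio::{net::UdpSocket, select, sync::oneshot};

    async fn recv_loop(socket: UdpSocket, end_receiver: oneshot::Receiver<()>) {
        let mut end_receiver = end_receiver.fuse();
        let mut data = [0u8; 1500];
        while let Ok(size) = select! {
            next = socket.recv(&mut data).fuse() => next,
            // &mut keeps the oneshot usable across loop iterations instead of moving it.
            _ = &mut end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "stop")),
        } {
            println!("got {} bytes", size);
        }
    }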
@ -491,7 +493,7 @@ impl Scheduler {
Contra: - DoS possibility because we answer first
- Speed, because otherwise the message can be sent with the creation
*/
let mut participant_channels = self.participant_channels.lock().await.clone().unwrap();
let participant_channels = self.participant_channels.lock().await.clone().unwrap();
// spawn is needed here, e.g. for TCP connect it would mean that only 1
// participant could ever be in the handshake phase! Someone could easily deadlock
// the whole server for new clients. UDP doesn't work at all, as
@ -533,7 +535,7 @@ impl Scheduler {
bparticipant,
a2b_stream_open_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
) = BParticipant::new(
pid,
@ -578,7 +580,6 @@ impl Scheduler {
leftover_cid_frame,
b2s_create_channel_done_s,
))
.await
.unwrap();
b2s_create_channel_done_r.await.unwrap();
if let Some(pid_oneshot) = s2a_return_pid_s {
@ -589,7 +590,6 @@ impl Scheduler {
participant_channels
.s2a_connected_s
.send(participant)
.await
.unwrap();
}
} else {
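The spawn comment above boils down to: never run the handshake on the accept loop itself. A hedged sketch of that idea with a plain TcpListener; the actual handshake and participant setup are omitted:

    use tokio::net::TcpListener;

    async fn accept_loop(listener: TcpListener) {
        loop {
            let (stream, remote_addr) = match listener.accept().await {
                Ok(x) => x,
                Err(_) => break,
            };
            // Spawning keeps the accept loop free; without it only one peer could
            // ever be in the handshake phase, which makes the server trivial to stall.
            tokio::spawn(async move {
                let _ = (stream, remote_addr);
                // ... run the handshake and create the participant here ...
            });
        }
    }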

View File

@ -18,8 +18,8 @@
//! - You sometimes see sleep(1000ms); this is used when we rely on the
//!   underlying TCP functionality, as this simulates client and server
use async_std::task;
use task::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;
use veloren_network::{Network, ParticipantError, Pid, Promises, StreamError};
mod helper;
use helper::{network_participant_stream, tcp};
@ -27,26 +27,26 @@ use helper::{network_participant_stream, tcp};
#[test]
fn close_network() {
let (_, _) = helper::setup(false, 0);
let (_, _p1_a, mut s1_a, _, _p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _, _p1_a, mut s1_a, _, _p1_b, mut s1_b) = network_participant_stream(tcp());
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
let msg1: Result<String, _> = block_on(s1_b.recv());
let msg1: Result<String, _> = r.block_on(s1_b.recv());
assert_eq!(msg1, Err(StreamError::StreamClosed));
}
#[test]
fn close_participant() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = network_participant_stream(tcp());
block_on(p1_a.disconnect()).unwrap();
block_on(p1_b.disconnect()).unwrap();
r.block_on(p1_a.disconnect()).unwrap();
r.block_on(p1_b.disconnect()).unwrap();
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(
block_on(s1_b.recv::<String>()),
r.block_on(s1_b.recv::<String>()),
Err(StreamError::StreamClosed)
);
}
@ -54,14 +54,14 @@ fn close_participant() {
#[test]
fn close_stream() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, _) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _, mut s1_a, _n_b, _, _) = network_participant_stream(tcp());
// s1_b is dropped directly while s1_a isn't
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(
block_on(s1_a.recv::<String>()),
r.block_on(s1_a.recv::<String>()),
Err(StreamError::StreamClosed)
);
}
@ -72,8 +72,8 @@ fn close_stream() {
#[test]
fn close_streams_in_block_on() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = block_on(network_participant_stream(tcp()));
block_on(async {
let (r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = network_participant_stream(tcp());
r.block_on(async {
//make it locally so that they are dropped later
let mut s1_a = s1_a;
let mut s1_b = s1_b;
@ -86,14 +86,14 @@ fn close_streams_in_block_on() {
#[test]
fn stream_simple_3msg_then_close() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(1u8).unwrap();
s1_a.send(42).unwrap();
s1_a.send("3rdMessage").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok(1u8));
assert_eq!(block_on(s1_b.recv()), Ok(42));
assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok(1u8));
assert_eq!(r.block_on(s1_b.recv()), Ok(42));
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
drop(s1_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(s1_b.send("Hello World"), Err(StreamError::StreamClosed));
@ -103,43 +103,43 @@ fn stream_simple_3msg_then_close() {
fn stream_send_first_then_receive() {
// recv should still be possible even if the stream got closed, as long as messages are still in the queue
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(1u8).unwrap();
s1_a.send(42).unwrap();
s1_a.send("3rdMessage").unwrap();
drop(s1_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(block_on(s1_b.recv()), Ok(1u8));
assert_eq!(block_on(s1_b.recv()), Ok(42));
assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok(1u8));
assert_eq!(r.block_on(s1_b.recv()), Ok(42));
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
assert_eq!(s1_b.send("Hello World"), Err(StreamError::StreamClosed));
}
#[test]
fn stream_send_1_then_close_stream() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("this message must be received, even if stream is closed already!")
.unwrap();
drop(s1_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
let exp = Ok("this message must be received, even if stream is closed already!".to_string());
assert_eq!(block_on(s1_b.recv()), exp);
assert_eq!(r.block_on(s1_b.recv()), exp);
println!("all received and done");
}
#[test]
fn stream_send_100000_then_close_stream() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(s1_a);
let exp = Ok("woop_PARTY_HARD_woop".to_string());
println!("start receiving");
block_on(async {
r.block_on(async {
for _ in 0..100000 {
assert_eq!(s1_b.recv().await, exp);
}
@ -150,7 +150,7 @@ fn stream_send_100000_then_close_stream() {
#[test]
fn stream_send_100000_then_close_stream_remote() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -162,7 +162,7 @@ fn stream_send_100000_then_close_stream_remote() {
#[test]
fn stream_send_100000_then_close_stream_remote2() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -175,7 +175,7 @@ fn stream_send_100000_then_close_stream_remote2() {
#[test]
fn stream_send_100000_then_close_stream_remote3() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -188,7 +188,7 @@ fn stream_send_100000_then_close_stream_remote3() {
#[test]
fn close_part_then_network() {
let (_, _) = helper::setup(false, 0);
let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -201,7 +201,7 @@ fn close_part_then_network() {
#[test]
fn close_network_then_part() {
let (_, _) = helper::setup(false, 0);
let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -214,39 +214,39 @@ fn close_network_then_part() {
#[test]
fn close_network_then_disconnect_part() {
let (_, _) = helper::setup(false, 0);
let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
let (r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(n_a);
assert!(block_on(p_a.disconnect()).is_err());
assert!(r.block_on(p_a.disconnect()).is_err());
std::thread::sleep(std::time::Duration::from_millis(1000));
}
#[test]
fn opened_stream_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap();
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(10, Promises::empty())).unwrap();
s2_a.send("HelloWorld").unwrap();
let mut s2_b = block_on(p_b.opened()).unwrap();
let mut s2_b = r.block_on(p_b.opened()).unwrap();
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
}
#[test]
fn opened_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap();
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(10, Promises::empty())).unwrap();
s2_a.send("HelloWorld").unwrap();
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
let mut s2_b = block_on(p_b.opened()).unwrap();
assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
let mut s2_b = r.block_on(p_b.opened()).unwrap();
assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
assert_eq!(
block_on(p_b.opened()).unwrap_err(),
r.block_on(p_b.opened()).unwrap_err(),
ParticipantError::ParticipantDisconnected
);
}
@ -254,15 +254,15 @@ fn opened_stream_after_remote_part_is_closed() {
#[test]
fn open_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap();
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(10, Promises::empty())).unwrap();
s2_a.send("HelloWorld").unwrap();
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
let mut s2_b = block_on(p_b.opened()).unwrap();
assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
let mut s2_b = r.block_on(p_b.opened()).unwrap();
assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
assert_eq!(
block_on(p_b.open(20, Promises::empty())).unwrap_err(),
r.block_on(p_b.open(20, Promises::empty())).unwrap_err(),
ParticipantError::ParticipantDisconnected
);
}
@ -270,11 +270,11 @@ fn open_stream_after_remote_part_is_closed() {
#[test]
fn failed_stream_open_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(
block_on(p_b.opened()).unwrap_err(),
r.block_on(p_b.opened()).unwrap_err(),
ParticipantError::ParticipantDisconnected
);
}
@ -282,72 +282,69 @@ fn failed_stream_open_after_remote_part_is_closed() {
#[test]
fn open_participant_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f);
let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), Arc::clone(&r));
let n_b = Network::new(Pid::fake(1), Arc::clone(&r));
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap();
r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap();
let mut s1_b = r.block_on(p_b.open(10, Promises::empty())).unwrap();
s1_b.send("HelloWorld").unwrap();
let p_a = block_on(n_a.connected()).unwrap();
let p_a = r.block_on(n_a.connected()).unwrap();
drop(s1_b);
drop(p_b);
drop(n_b);
std::thread::sleep(std::time::Duration::from_millis(1000));
let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
let mut s1_a = r.block_on(p_a.opened()).unwrap();
assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
}
#[test]
fn open_participant_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f);
let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), Arc::clone(&r));
let n_b = Network::new(Pid::fake(1), Arc::clone(&r));
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap();
r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap();
let mut s1_b = r.block_on(p_b.open(10, Promises::empty())).unwrap();
s1_b.send("HelloWorld").unwrap();
drop(s1_b);
drop(p_b);
drop(n_b);
std::thread::sleep(std::time::Duration::from_millis(1000));
let p_a = block_on(n_a.connected()).unwrap();
let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
let p_a = r.block_on(n_a.connected()).unwrap();
let mut s1_a = r.block_on(p_a.opened()).unwrap();
assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
}
#[test]
fn close_network_scheduler_completely() {
let (_, _) = helper::setup(false, 0);
let (n_a, f) = Network::new(Pid::fake(0));
let ha = std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(1));
let hb = std::thread::spawn(f);
let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), Arc::clone(&r));
let n_b = Network::new(Pid::fake(1), Arc::clone(&r));
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
let p_b = block_on(n_b.connect(addr)).unwrap();
let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap();
r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap();
let mut s1_b = r.block_on(p_b.open(10, Promises::empty())).unwrap();
s1_b.send("HelloWorld").unwrap();
let p_a = block_on(n_a.connected()).unwrap();
let mut s1_a = block_on(p_a.opened()).unwrap();
assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
let p_a = r.block_on(n_a.connected()).unwrap();
let mut s1_a = r.block_on(p_a.opened()).unwrap();
assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
drop(n_a);
drop(n_b);
std::thread::sleep(std::time::Duration::from_millis(1000));
ha.join().unwrap();
hb.join().unwrap();
let runtime = Arc::try_unwrap(r).expect("runtime is not alone, there still exists a reference");
runtime.shutdown_timeout(std::time::Duration::from_secs(300));
}
#[test]
fn dont_panic_on_multiply_recv_after_close() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap();
drop(s1_a);
@ -362,7 +359,7 @@ fn dont_panic_on_multiply_recv_after_close() {
#[test]
fn dont_panic_on_recv_send_after_close() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap();
drop(s1_a);
@ -375,7 +372,7 @@ fn dont_panic_on_recv_send_after_close() {
#[test]
fn dont_panic_on_multiple_send_after_close() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap();
drop(s1_a);
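The tests now get the runtime back from the helper, and close_network_scheduler_completely also shows how it is torn down at the end. A reduced sketch of that pattern, assuming tokio 1.x and leaving out the veloren types:

    use std::sync::Arc;
    use tokio::runtime::Runtime;

    fn main() {
        let r = Arc::new(Runtime::new().unwrap());
        // ... clones of `r` would be handed to Network::new(..) and dropped again ...
        r.block_on(async { /* drive the test here */ });
        // Once every other clone is gone, reclaim the runtime and shut it down with a
        // timeout instead of blocking forever on tasks that never finish.
        let runtime = Arc::try_unwrap(r).expect("runtime is not alone, there still exists a reference");
        runtime.shutdown_timeout(std::time::Duration::from_secs(300));
    }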

View File

@ -1,10 +1,14 @@
use lazy_static::*;
use std::{
net::SocketAddr,
sync::atomic::{AtomicU16, Ordering},
sync::{
atomic::{AtomicU16, Ordering},
Arc,
},
thread,
time::Duration,
};
use tokio::runtime::Runtime;
use tracing::*;
use tracing_subscriber::EnvFilter;
use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream};
@ -43,22 +47,32 @@ pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
}
#[allow(dead_code)]
pub async fn network_participant_stream(
pub fn network_participant_stream(
addr: ProtocolAddr,
) -> (Network, Participant, Stream, Network, Participant, Stream) {
let (n_a, f_a) = Network::new(Pid::fake(0));
std::thread::spawn(f_a);
let (n_b, f_b) = Network::new(Pid::fake(1));
std::thread::spawn(f_b);
) -> (
Arc<Runtime>,
Network,
Participant,
Stream,
Network,
Participant,
Stream,
) {
let runtime = Arc::new(Runtime::new().unwrap());
let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async {
let n_a = Network::new(Pid::fake(0), Arc::clone(&runtime));
let n_b = Network::new(Pid::fake(1), Arc::clone(&runtime));
n_a.listen(addr.clone()).await.unwrap();
let p1_b = n_b.connect(addr).await.unwrap();
let p1_a = n_a.connected().await.unwrap();
n_a.listen(addr.clone()).await.unwrap();
let p1_b = n_b.connect(addr).await.unwrap();
let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(10, Promises::empty()).await.unwrap();
let s1_b = p1_b.opened().await.unwrap();
let s1_a = p1_a.open(10, Promises::empty()).await.unwrap();
let s1_b = p1_b.opened().await.unwrap();
(n_a, p1_a, s1_a, n_b, p1_b, s1_b)
(n_a, p1_a, s1_a, n_b, p1_b, s1_b)
});
(runtime, n_a, p1_a, s1_a, n_b, p1_b, s1_b)
}
#[allow(dead_code)]

View File

@ -1,5 +1,5 @@
use async_std::task;
use task::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;
use veloren_network::{NetworkError, StreamError};
mod helper;
use helper::{network_participant_stream, tcp, udp};
@ -10,23 +10,23 @@ use veloren_network::{Network, Pid, Promises, ProtocolAddr};
#[ignore]
fn network_20s() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, _, _n_b, _, _) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _, _, _n_b, _, _) = network_participant_stream(tcp());
std::thread::sleep(std::time::Duration::from_secs(30));
}
#[test]
fn stream_simple() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("Hello World").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
}
#[test]
fn stream_try_recv() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(4242u32).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
@ -36,47 +36,46 @@ fn stream_try_recv() {
#[test]
fn stream_simple_3msg() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(block_on(s1_b.recv()), Ok(1337));
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
s1_a.send("3rdMessage").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
}
#[test]
fn stream_simple_udp() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
s1_a.send("Hello World").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
}
#[test]
fn stream_simple_udp_3msg() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(block_on(s1_b.recv()), Ok(1337));
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
s1_a.send("3rdMessage").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
}
#[test]
#[ignore]
fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_, _) = helper::setup(false, 0);
let (network, f) = Network::new(Pid::new());
let (remote, fr) = Network::new(Pid::new());
std::thread::spawn(f);
std::thread::spawn(fr);
block_on(async {
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let remote = Network::new(Pid::new(), Arc::clone(&r));
r.block_on(async {
remote
.listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
.await?;
@ -97,18 +96,17 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
#[test]
fn failed_listen_on_used_ports() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_, _) = helper::setup(false, 0);
let (network, f) = Network::new(Pid::new());
std::thread::spawn(f);
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let udp1 = udp();
let tcp1 = tcp();
block_on(network.listen(udp1.clone()))?;
block_on(network.listen(tcp1.clone()))?;
r.block_on(network.listen(udp1.clone()))?;
r.block_on(network.listen(tcp1.clone()))?;
std::thread::sleep(std::time::Duration::from_millis(200));
let (network2, f2) = Network::new(Pid::new());
std::thread::spawn(f2);
let e1 = block_on(network2.listen(udp1));
let e2 = block_on(network2.listen(tcp1));
let network2 = Network::new(Pid::new(), Arc::clone(&r));
let e1 = r.block_on(network2.listen(udp1));
let e2 = r.block_on(network2.listen(tcp1));
match e1 {
Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (),
_ => panic!(),
@ -130,11 +128,10 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
let (_, _) = helper::setup(false, 0);
// Create a Network, listen on Port `1200` and wait for a Stream to be opened,
// then answer `Hello World`
let (network, f) = Network::new(Pid::new());
let (remote, fr) = Network::new(Pid::new());
std::thread::spawn(f);
std::thread::spawn(fr);
block_on(async {
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let remote = Network::new(Pid::new(), Arc::clone(&r));
r.block_on(async {
network
.listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
.await?;
@ -158,11 +155,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
let (_, _) = helper::setup(false, 0);
// Create a Network, listen on Port `1220` and wait for a Stream to be opened,
// then listen on it
let (network, f) = Network::new(Pid::new());
let (remote, fr) = Network::new(Pid::new());
std::thread::spawn(f);
std::thread::spawn(fr);
block_on(async {
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let remote = Network::new(Pid::new(), Arc::clone(&r));
r.block_on(async {
network
.listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
.await?;
@ -184,10 +180,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
#[test]
fn wrong_parse() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(1337).unwrap();
match block_on(s1_b.recv::<String>()) {
match r.block_on(s1_b.recv::<String>()) {
Err(StreamError::Deserialize(_)) => (),
_ => panic!("this should fail, but it doesn't!"),
}
@ -196,7 +192,7 @@ fn wrong_parse() {
#[test]
fn multiple_try_recv() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("asd").unwrap();
s1_a.send(11u32).unwrap();
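failed_listen_on_used_ports relies on the second bind reporting AddrInUse; the same check with plain tokio looks roughly like this (the port number is an arbitrary pick for the sketch):

    use std::io::ErrorKind;
    use tokio::{net::TcpListener, runtime::Runtime};

    fn main() {
        let r = Runtime::new().unwrap();
        r.block_on(async {
            let _first = TcpListener::bind("127.0.0.1:51234").await.unwrap();
            // Binding the same address a second time must fail with AddrInUse.
            match TcpListener::bind("127.0.0.1:51234").await {
                Err(e) if e.kind() == ErrorKind::AddrInUse => (),
                _ => panic!("expected AddrInUse"),
            }
        });
    }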

View File

@ -129,9 +129,19 @@ fn main() -> io::Result<()> {
let server_port = &server_settings.gameserver_address.port();
let metrics_port = &server_settings.metrics_address.port();
// Create server
let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
let mut server = Server::new(server_settings, editable_settings, &server_data_dir, runtime)
.expect("Failed to create server instance!");
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
let mut server = Server::new(
server_settings,
editable_settings,
&server_data_dir,
runtime,
)
.expect("Failed to create server instance!");
info!(
?server_port,

View File

@ -91,8 +91,8 @@ use std::{
};
#[cfg(not(feature = "worldgen"))]
use test_world::{IndexOwned, World};
use tracing::{debug, error, info, trace};
use tokio::runtime::Runtime;
use tracing::{debug, error, info, trace};
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*;
@ -121,7 +121,7 @@ pub struct Server {
connection_handler: ConnectionHandler,
runtime: Arc<Runtime>,
_runtime: Arc<Runtime>,
thread_pool: ThreadPool,
metrics: ServerMetrics,
@ -367,7 +367,8 @@ impl Server {
let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string())
.build();
let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry());
let network =
Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry());
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
@ -388,7 +389,7 @@ impl Server {
connection_handler,
runtime,
_runtime: runtime,
thread_pool,
metrics,

View File

@ -72,7 +72,13 @@ impl ClientInit {
let mut last_err = None;
let cores = num_cpus::get();
let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap());
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.build()
.unwrap(),
);
const FOUR_MINUTES_RETRIES: u64 = 48;
'tries: for _ in 0..FOUR_MINUTES_RETRIES {

View File

@ -83,7 +83,13 @@ impl Singleplayer {
let thread_pool = client.map(|c| c.thread_pool().clone());
let cores = num_cpus::get();
let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap());
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.build()
.unwrap(),
);
let settings2 = settings.clone();
let paused = Arc::new(AtomicBool::new(false));
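The same runtime construction appears in the server binary, the client init and the singleplayer path above; a hedged sketch of that shared shape, assuming the num_cpus crate and tokio's rt-multi-thread feature:

    use std::sync::Arc;

    fn build_runtime() -> Arc<tokio::runtime::Runtime> {
        let cores = num_cpus::get();
        Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                // Leave a core for the rest of the game when enough are available.
                .worker_threads(if cores > 4 { cores - 1 } else { cores })
                .build()
                .unwrap(),
        )
    }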