diff --git a/Cargo.lock b/Cargo.lock index 56d9e2a87a..69b549cd2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -248,6 +248,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-channel" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "atom" version = "0.3.6" @@ -452,6 +463,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "calloop" version = "0.6.5" @@ -712,6 +729,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "conrod_core" version = "0.63.0" @@ -1566,6 +1592,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -5119,6 +5151,17 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1981ad97df782ab506a1f43bf82c967326960d278acf3bf8279809648c3ff3ea" +dependencies = [ + "futures-core", + "pin-project-lite 0.2.4", + "tokio 1.2.0", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -5462,17 +5505,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "uvth" -version = "4.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e5910f9106b96334c6cae1f1d77a764bda66ac4ca9f507f73259f184fe1bb6b" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - [[package]] name = "vcpkg" version = "0.2.11" @@ -5543,7 +5575,7 @@ dependencies = [ "tokio 1.2.0", "tracing", "tracing-subscriber", - "uvth 3.1.1", + "uvth", "vek 0.12.0", "veloren-common", "veloren-common-net", @@ -5688,7 +5720,7 @@ dependencies = [ "tiny_http", "tokio 1.2.0", "tracing", - "uvth 3.1.1", + "uvth", "vek 0.12.0", "veloren-common", "veloren-common-net", @@ -5774,7 +5806,7 @@ dependencies = [ "tracing-subscriber", "tracing-tracy", "treeculler", - "uvth 3.1.1", + "uvth", "vek 0.12.0", "veloren-client", "veloren-common", @@ -5836,11 +5868,13 @@ dependencies = [ name = "veloren_network" version = "0.3.0" dependencies = [ + "async-channel", "bincode", "bitflags", "clap", "crossbeam-channel 0.5.0", - "futures", + "futures-core", + "futures-util", "lazy_static", "lz-fear", "prometheus", @@ -5849,10 +5883,10 @@ dependencies = [ "shellexpand", "tiny_http", "tokio 1.2.0", + "tokio-stream", "tracing", "tracing-futures", "tracing-subscriber", - "uvth 4.0.1", ] [[package]] diff --git a/client/examples/chat-cli/main.rs b/client/examples/chat-cli/main.rs index 3ffaa7d5b9..115d9ae50c 100644 --- a/client/examples/chat-cli/main.rs +++ b/client/examples/chat-cli/main.rs @@ -3,7 +3,14 @@ #![deny(clippy::clone_on_ref_ptr)] use common::{clock::Clock, comp}; -use std::{io, net::ToSocketAddrs, sync::mpsc, thread, time::Duration}; +use std::{ + io, + net::ToSocketAddrs, + sync::{mpsc, Arc}, + thread, + time::Duration, +}; +use tokio::runtime::Runtime; use tracing::{error, info}; use veloren_client::{Client, Event}; @@ -37,6 +44,8 @@ fn main() { println!("Enter your password"); let password = read_input(); + let runtime = Arc::new(Runtime::new().unwrap()); + // Create a client. let mut client = Client::new( server_addr @@ -45,6 +54,7 @@ fn main() { .next() .unwrap(), None, + runtime, ) .expect("Failed to create client instance"); diff --git a/client/src/lib.rs b/client/src/lib.rs index 9b13ed80f1..b48fa7335b 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -63,8 +63,8 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tracing::{debug, error, trace, warn}; use tokio::runtime::Runtime; +use tracing::{debug, error, trace, warn}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -187,7 +187,11 @@ pub struct CharacterList { impl Client { /// Create a new `Client`. - pub fn new>(addr: A, view_distance: Option, runtime: Arc) -> Result { + pub fn new>( + addr: A, + view_distance: Option, + runtime: Arc, + ) -> Result { let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) .build(); diff --git a/network/Cargo.toml b/network/Cargo.toml index d477be73e1..0a540ca6dc 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -20,12 +20,15 @@ serde = { version = "1.0" } #sending crossbeam-channel = "0.5" tokio = { version = "1.2", default-features = false, features = ["io-util", "macros", "rt", "net", "time"] } +tokio-stream = { version = "0.1.2", default-features = false } #tracing and metrics tracing = { version = "0.1", default-features = false } tracing-futures = "0.2" prometheus = { version = "0.11", default-features = false, optional = true } #async -futures = { version = "0.3", features = ["thread-pool"] } +futures-core = { version = "0.3", default-features = false } +futures-util = { version = "0.3", default-features = false, features = ["std"] } +async-channel = "1.5.1" #use for .close() channels #mpsc channel registry lazy_static = { version = "1.4", default-features = false } rand = { version = "0.8" } @@ -35,8 +38,8 @@ lz-fear = { version = "0.1.1", optional = true } [dev-dependencies] tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } -# `uvth` needed for doc tests -uvth = { version = ">= 3.0, <= 4.0", default-features = false } +tokio = { version = "1.0.1", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] } +futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } clap = { version = "2.33", default-features = false } shellexpand = "2.0.0" tiny_http = "0.8.0" diff --git a/network/examples/chat.rs b/network/examples/chat.rs index 91fcdea733..a1a3f09cf0 100644 --- a/network/examples/chat.rs +++ b/network/examples/chat.rs @@ -3,10 +3,9 @@ //! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006 //! RUST_BACKTRACE=1 cargo run --example chat -- --trace=info --port 15006 --mode=client //! ``` -use async_std::{io, sync::RwLock}; use clap::{App, Arg}; -use futures::executor::{block_on, ThreadPool}; use std::{sync::Arc, thread, time::Duration}; +use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock}; use tracing::*; use tracing_subscriber::EnvFilter; use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr}; @@ -100,18 +99,17 @@ fn main() { } fn server(address: ProtocolAddr) { - let (server, f) = Network::new(Pid::new()); + let r = Arc::new(Runtime::new().unwrap()); + let server = Network::new(Pid::new(), Arc::clone(&r)); let server = Arc::new(server); - std::thread::spawn(f); - let pool = ThreadPool::new().unwrap(); let participants = Arc::new(RwLock::new(Vec::new())); - block_on(async { + r.block_on(async { server.listen(address).await.unwrap(); loop { let p1 = Arc::new(server.connected().await.unwrap()); let server1 = server.clone(); participants.write().await.push(p1.clone()); - pool.spawn_ok(client_connection(server1, p1, participants.clone())); + tokio::spawn(client_connection(server1, p1, participants.clone())); } }); } @@ -144,27 +142,27 @@ async fn client_connection( } fn client(address: ProtocolAddr) { - let (client, f) = Network::new(Pid::new()); - std::thread::spawn(f); - let pool = ThreadPool::new().unwrap(); + let r = Arc::new(Runtime::new().unwrap()); + let client = Network::new(Pid::new(), Arc::clone(&r)); - block_on(async { + r.block_on(async { let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1 let mut s1 = p1 .open(16, Promises::ORDERED | Promises::CONSISTENCY) .await .unwrap(); //remote representation of s1 + let mut input_lines = io::BufReader::new(io::stdin()); println!("Enter your username:"); let mut username = String::new(); - io::stdin().read_line(&mut username).await.unwrap(); + input_lines.read_line(&mut username).await.unwrap(); username = username.split_whitespace().collect(); println!("Your username is: {}", username); println!("write /quit to close"); - pool.spawn_ok(read_messages(p1)); + tokio::spawn(read_messages(p1)); s1.send(username).unwrap(); loop { let mut line = String::new(); - io::stdin().read_line(&mut line).await.unwrap(); + input_lines.read_line(&mut line).await.unwrap(); line = line.split_whitespace().collect(); if line.as_str() == "/quit" { println!("goodbye"); diff --git a/network/examples/fileshare/commands.rs b/network/examples/fileshare/commands.rs index 3967631a56..a18c90b38e 100644 --- a/network/examples/fileshare/commands.rs +++ b/network/examples/fileshare/commands.rs @@ -1,9 +1,7 @@ -use async_std::{ - fs, - path::{Path, PathBuf}, -}; use rand::Rng; use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use tokio::fs; use veloren_network::{Participant, ProtocolAddr, Stream}; use std::collections::HashMap; diff --git a/network/examples/fileshare/main.rs b/network/examples/fileshare/main.rs index d4b7b832e7..f000f371e0 100644 --- a/network/examples/fileshare/main.rs +++ b/network/examples/fileshare/main.rs @@ -4,14 +4,9 @@ //! --profile=release -Z unstable-options -- --trace=info --port 15006) //! (cd network/examples/fileshare && RUST_BACKTRACE=1 cargo run //! --profile=release -Z unstable-options -- --trace=info --port 15007) ``` -use async_std::{io, path::PathBuf}; use clap::{App, Arg, SubCommand}; -use futures::{ - channel::mpsc, - executor::{block_on, ThreadPool}, - sink::SinkExt, -}; -use std::{thread, time::Duration}; +use std::{path::PathBuf, sync::Arc, thread, time::Duration}; +use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::mpsc}; use tracing::*; use tracing_subscriber::EnvFilter; use veloren_network::ProtocolAddr; @@ -56,14 +51,14 @@ fn main() { let port: u16 = matches.value_of("port").unwrap().parse().unwrap(); let address = ProtocolAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap()); + let runtime = Arc::new(Runtime::new().unwrap()); - let (server, cmd_sender) = Server::new(); - let pool = ThreadPool::new().unwrap(); - pool.spawn_ok(server.run(address)); + let (server, cmd_sender) = Server::new(Arc::clone(&runtime)); + runtime.spawn(server.run(address)); thread::sleep(Duration::from_millis(50)); //just for trace - block_on(client(cmd_sender)); + runtime.block_on(client(cmd_sender)); } fn file_exists(file: String) -> Result<(), String> { @@ -130,14 +125,15 @@ fn get_options<'a, 'b>() -> App<'a, 'b> { ) } -async fn client(mut cmd_sender: mpsc::UnboundedSender) { +async fn client(cmd_sender: mpsc::UnboundedSender) { use std::io::Write; loop { let mut line = String::new(); + let mut input_lines = io::BufReader::new(io::stdin()); print!("==> "); std::io::stdout().flush().unwrap(); - io::stdin().read_line(&mut line).await.unwrap(); + input_lines.read_line(&mut line).await.unwrap(); let matches = match get_options().get_matches_from_safe(line.split_whitespace()) { Err(e) => { println!("{}", e.message); @@ -148,12 +144,12 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender) { match matches.subcommand() { ("quit", _) => { - cmd_sender.send(LocalCommand::Shutdown).await.unwrap(); + cmd_sender.send(LocalCommand::Shutdown).unwrap(); println!("goodbye"); break; }, ("disconnect", _) => { - cmd_sender.send(LocalCommand::Disconnect).await.unwrap(); + cmd_sender.send(LocalCommand::Disconnect).unwrap(); }, ("connect", Some(connect_matches)) => { let socketaddr = connect_matches @@ -163,7 +159,6 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender) { .unwrap(); cmd_sender .send(LocalCommand::Connect(ProtocolAddr::Tcp(socketaddr))) - .await .unwrap(); }, ("t", _) => { @@ -171,28 +166,23 @@ async fn client(mut cmd_sender: mpsc::UnboundedSender) { .send(LocalCommand::Connect(ProtocolAddr::Tcp( "127.0.0.1:1231".parse().unwrap(), ))) - .await .unwrap(); }, ("serve", Some(serve_matches)) => { let path = shellexpand::tilde(serve_matches.value_of("file").unwrap()); let path: PathBuf = path.parse().unwrap(); if let Some(fileinfo) = FileInfo::new(&path).await { - cmd_sender - .send(LocalCommand::Serve(fileinfo)) - .await - .unwrap(); + cmd_sender.send(LocalCommand::Serve(fileinfo)).unwrap(); } }, ("list", _) => { - cmd_sender.send(LocalCommand::List).await.unwrap(); + cmd_sender.send(LocalCommand::List).unwrap(); }, ("get", Some(get_matches)) => { let id: u32 = get_matches.value_of("id").unwrap().parse().unwrap(); let file = get_matches.value_of("file"); cmd_sender .send(LocalCommand::Get(id, file.map(|s| s.to_string()))) - .await .unwrap(); }, diff --git a/network/examples/fileshare/server.rs b/network/examples/fileshare/server.rs index 080b85fff5..5db8345d46 100644 --- a/network/examples/fileshare/server.rs +++ b/network/examples/fileshare/server.rs @@ -1,11 +1,12 @@ use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo}; -use async_std::{ - fs, - path::PathBuf, - sync::{Mutex, RwLock}, +use futures_util::{FutureExt, StreamExt}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use tokio::{ + fs, join, + runtime::Runtime, + sync::{mpsc, Mutex, RwLock}, }; -use futures::{channel::mpsc, future::FutureExt, stream::StreamExt}; -use std::{collections::HashMap, sync::Arc}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; @@ -23,11 +24,10 @@ pub struct Server { } impl Server { - pub fn new() -> (Self, mpsc::UnboundedSender) { - let (command_sender, command_receiver) = mpsc::unbounded(); + pub fn new(runtime: Arc) -> (Self, mpsc::UnboundedSender) { + let (command_sender, command_receiver) = mpsc::unbounded_channel(); - let (network, f) = Network::new(Pid::new()); - std::thread::spawn(f); + let network = Network::new(Pid::new(), runtime); let run_channels = Some(ControlChannels { command_receiver }); ( @@ -47,7 +47,7 @@ impl Server { self.network.listen(address).await.unwrap(); - futures::join!( + join!( self.command_manager(run_channels.command_receiver,), self.connect_manager(), ); @@ -55,6 +55,7 @@ impl Server { async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver) { trace!("Start command_manager"); + let command_receiver = UnboundedReceiverStream::new(command_receiver); command_receiver .for_each_concurrent(None, async move |cmd| { match cmd { @@ -106,7 +107,7 @@ impl Server { async fn connect_manager(&self) { trace!("Start connect_manager"); - let iter = futures::stream::unfold((), |_| { + let iter = futures_util::stream::unfold((), |_| { self.network.connected().map(|r| r.ok().map(|v| (v, ()))) }); @@ -129,7 +130,7 @@ impl Server { let id = p.remote_pid(); let ri = Arc::new(Mutex::new(RemoteInfo::new(cmd_out, file_out, p))); self.remotes.write().await.insert(id, ri.clone()); - futures::join!( + join!( self.handle_remote_cmd(cmd_in, ri.clone()), self.handle_files(file_in, ri.clone()), ); diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index 5f0617ec68..9814cec998 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -6,12 +6,13 @@ mod metrics; use clap::{App, Arg}; -use futures::executor::block_on; use serde::{Deserialize, Serialize}; use std::{ + sync::Arc, thread, time::{Duration, Instant}, }; +use tokio::runtime::Runtime; use tracing::*; use tracing_subscriber::EnvFilter; use veloren_network::{Message, Network, Pid, Promises, ProtocolAddr}; @@ -101,14 +102,16 @@ fn main() { }; let mut background = None; + let runtime = Arc::new(Runtime::new().unwrap()); match matches.value_of("mode") { - Some("server") => server(address), - Some("client") => client(address), + Some("server") => server(address, Arc::clone(&runtime)), + Some("client") => client(address, Arc::clone(&runtime)), Some("both") => { let address1 = address.clone(); - background = Some(thread::spawn(|| server(address1))); + let runtime2 = Arc::clone(&runtime); + background = Some(thread::spawn(|| server(address1, runtime2))); thread::sleep(Duration::from_millis(200)); //start client after server - client(address); + client(address, Arc::clone(&runtime)); }, _ => panic!("Invalid mode, run --help!"), }; @@ -117,18 +120,17 @@ fn main() { } } -fn server(address: ProtocolAddr) { +fn server(address: ProtocolAddr, runtime: Arc) { let mut metrics = metrics::SimpleMetrics::new(); - let (server, f) = Network::new_with_registry(Pid::new(), metrics.registry()); - std::thread::spawn(f); + let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry()); metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap(); - block_on(server.listen(address)).unwrap(); + runtime.block_on(server.listen(address)).unwrap(); loop { info!("Waiting for participant to connect"); - let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 - let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 - block_on(async { + let p1 = runtime.block_on(server.connected()).unwrap(); //remote representation of p1 + let mut s1 = runtime.block_on(p1.opened()).unwrap(); //remote representation of s1 + runtime.block_on(async { let mut last = Instant::now(); let mut id = 0u64; while let Ok(_msg) = s1.recv_raw().await { @@ -145,14 +147,15 @@ fn server(address: ProtocolAddr) { } } -fn client(address: ProtocolAddr) { +fn client(address: ProtocolAddr, runtime: Arc) { let mut metrics = metrics::SimpleMetrics::new(); - let (client, f) = Network::new_with_registry(Pid::new(), metrics.registry()); - std::thread::spawn(f); + let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry()); metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap(); - let p1 = block_on(client.connect(address)).unwrap(); //remote representation of p1 - let mut s1 = block_on(p1.open(16, Promises::ORDERED | Promises::CONSISTENCY)).unwrap(); //remote representation of s1 + let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1 + let mut s1 = runtime + .block_on(p1.open(16, Promises::ORDERED | Promises::CONSISTENCY)) + .unwrap(); //remote representation of s1 let mut last = Instant::now(); let mut id = 0u64; let raw_msg = Message::serialize( @@ -180,7 +183,7 @@ fn client(address: ProtocolAddr) { drop(s1); std::thread::sleep(std::time::Duration::from_millis(5000)); info!("Closing participant"); - block_on(p1.disconnect()).unwrap(); + runtime.block_on(p1.disconnect()).unwrap(); std::thread::sleep(std::time::Duration::from_millis(25000)); info!("DROPPING! client"); drop(client); diff --git a/network/src/api.rs b/network/src/api.rs index 8baaa72581..1b349f3248 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -8,13 +8,6 @@ use crate::{ scheduler::Scheduler, types::{Mid, Pid, Prio, Promises, Sid}, }; -use tokio::{io, sync::Mutex}; -use tokio::runtime::Runtime; -use futures::{ - channel::{mpsc, oneshot}, - sink::SinkExt, - stream::StreamExt, -}; #[cfg(feature = "compression")] use lz_fear::raw::DecodeError; #[cfg(feature = "metrics")] @@ -28,6 +21,11 @@ use std::{ Arc, }, }; +use tokio::{ + io, + runtime::Runtime, + sync::{mpsc, oneshot, Mutex}, +}; use tracing::*; use tracing_futures::Instrument; @@ -78,9 +76,8 @@ pub struct Stream { prio: Prio, promises: Promises, send_closed: Arc, - runtime: Arc, a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, - b2a_msg_recv_r: Option>, + b2a_msg_recv_r: Option>, a2b_close_stream_s: Option>, } @@ -169,7 +166,8 @@ impl Network { /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` - /// * `runtime` - provide a tokio::Runtime, it's used to internally spawn tasks + /// * `runtime` - provide a tokio::Runtime, it's used to internally spawn + /// tasks. It is necessary to clean up in the non-async `Drop`. /// /// # Result /// * `Self` - returns a `Network` which can be `Send` to multiple areas of @@ -178,22 +176,15 @@ impl Network { /// /// # Examples /// ```rust - /// //Example with uvth - /// use uvth::ThreadPoolBuilder; + /// //Example with tokio + /// use std::sync::Arc; + /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// - /// let pool = ThreadPoolBuilder::new().build(); - /// let (network, f) = Network::new(Pid::new()); - /// pool.execute(f); + /// let runtime = Runtime::new(); + /// let network = Network::new(Pid::new(), Arc::new(runtime)); /// ``` /// - /// ```rust - /// //Example with std::thread - /// use veloren_network::{Network, Pid, ProtocolAddr}; - /// - /// let (network, f) = Network::new(Pid::new()); - /// std::thread::spawn(f); - /// ``` /// /// Usually you only create a single `Network` for an application, /// except when client and server are in the same application, then you @@ -252,20 +243,18 @@ impl Network { #[cfg(feature = "metrics")] registry, ); - runtime.spawn( - async move { - trace!(?p, "Starting scheduler in own thread"); - let _handle = tokio::spawn( - scheduler - .run() - .instrument(tracing::info_span!("scheduler", ?p)), - ); - trace!(?p, "Stopping scheduler and his own thread"); - } - ); + runtime.spawn(async move { + trace!(?p, "Starting scheduler in own thread"); + let _handle = tokio::spawn( + scheduler + .run() + .instrument(tracing::info_span!("scheduler", ?p)), + ); + trace!(?p, "Stopping scheduler and his own thread"); + }); Self { local_pid: participant_id, - runtime: runtime, + runtime, participant_disconnect_sender: Mutex::new(HashMap::new()), listen_sender: Mutex::new(listen_sender), connect_sender: Mutex::new(connect_sender), @@ -309,8 +298,7 @@ impl Network { self.listen_sender .lock() .await - .send((address, s2a_result_s)) - .await?; + .send((address, s2a_result_s))?; match s2a_result_r.await? { //waiting guarantees that we either listened successfully or get an error like port in // use @@ -365,8 +353,7 @@ impl Network { self.connect_sender .lock() .await - .send((address, pid_sender)) - .await?; + .send((address, pid_sender))?; let participant = match pid_receiver.await? { Ok(p) => p, Err(e) => return Err(NetworkError::ConnectFailed(e)), @@ -417,7 +404,7 @@ impl Network { /// [`Streams`]: crate::api::Stream /// [`listen`]: crate::api::Network::listen pub async fn connected(&self) -> Result { - let participant = self.connected_receiver.lock().await.next().await?; + let participant = self.connected_receiver.lock().await.recv().await?; self.participant_disconnect_sender.lock().await.insert( participant.remote_pid, Arc::clone(&participant.a2s_disconnect_s), @@ -489,12 +476,11 @@ impl Participant { /// [`Streams`]: crate::api::Stream pub async fn open(&self, prio: u8, promises: Promises) -> Result { let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel(); - if let Err(e) = self - .a2b_stream_open_s - .lock() - .await - .send((prio, promises, p2a_return_stream_s)) - .await + if let Err(e) = + self.a2b_stream_open_s + .lock() + .await + .send((prio, promises, p2a_return_stream_s)) { debug!(?e, "bParticipant is already closed, notifying"); return Err(ParticipantError::ParticipantDisconnected); @@ -546,7 +532,7 @@ impl Participant { /// [`connected`]: Network::connected /// [`open`]: Participant::open pub async fn opened(&self) -> Result { - match self.b2a_stream_opened_r.lock().await.next().await { + match self.b2a_stream_opened_r.lock().await.recv().await { Some(stream) => { let sid = stream.sid; debug!(?sid, ?self.remote_pid, "Receive opened stream"); @@ -609,13 +595,12 @@ impl Participant { //Streams will be closed by BParticipant match self.a2s_disconnect_s.lock().await.take() { - Some(mut a2s_disconnect_s) => { + Some(a2s_disconnect_s) => { let (finished_sender, finished_receiver) = oneshot::channel(); // Participant is connecting to Scheduler here, not as usual // Participant<->BParticipant a2s_disconnect_s .send((pid, finished_sender)) - .await .expect("Something is wrong in internal scheduler coding"); match finished_receiver.await { Ok(res) => { @@ -661,9 +646,8 @@ impl Stream { prio: Prio, promises: Promises, send_closed: Arc, - runtime: Arc, a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, - b2a_msg_recv_r: mpsc::UnboundedReceiver, + b2a_msg_recv_r: async_channel::Receiver, a2b_close_stream_s: mpsc::UnboundedSender, ) -> Self { Self { @@ -673,7 +657,6 @@ impl Stream { prio, promises, send_closed, - runtime, a2b_msg_s, b2a_msg_recv_r: Some(b2a_msg_recv_r), a2b_close_stream_s: Some(a2b_close_stream_s), @@ -877,13 +860,13 @@ impl Stream { pub async fn recv_raw(&mut self) -> Result { match &mut self.b2a_msg_recv_r { Some(b2a_msg_recv_r) => { - match b2a_msg_recv_r.next().await { - Some(msg) => Ok(Message { + match b2a_msg_recv_r.recv().await { + Ok(msg) => Ok(Message { buffer: Arc::new(msg.buffer), #[cfg(feature = "compression")] compressed: self.promises.contains(Promises::COMPRESSED), }), - None => { + Err(_) => { self.b2a_msg_recv_r = None; //prevent panic Err(StreamError::StreamClosed) }, @@ -929,13 +912,8 @@ impl Stream { #[inline] pub fn try_recv(&mut self) -> Result, StreamError> { match &mut self.b2a_msg_recv_r { - Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_next() { - Err(_) => Ok(None), - Ok(None) => { - self.b2a_msg_recv_r = None; //prevent panic - Err(StreamError::StreamClosed) - }, - Ok(Some(msg)) => Ok(Some( + Some(b2a_msg_recv_r) => match b2a_msg_recv_r.try_recv() { + Ok(msg) => Ok(Some( Message { buffer: Arc::new(msg.buffer), #[cfg(feature = "compression")] @@ -943,6 +921,11 @@ impl Stream { } .deserialize()?, )), + Err(async_channel::TryRecvError::Empty) => Ok(None), + Err(async_channel::TryRecvError::Closed) => { + self.b2a_msg_recv_r = None; //prevent panic + Err(StreamError::StreamClosed) + }, }, None => Err(StreamError::StreamClosed), } @@ -975,16 +958,13 @@ impl Drop for Network { self.participant_disconnect_sender.lock().await.drain() { match a2s_disconnect_s.lock().await.take() { - Some(mut a2s_disconnect_s) => { + Some(a2s_disconnect_s) => { trace!(?remote_pid, "Participants will be closed"); let (finished_sender, finished_receiver) = oneshot::channel(); finished_receiver_list.push((remote_pid, finished_receiver)); - a2s_disconnect_s - .send((remote_pid, finished_sender)) - .await - .expect( - "Scheduler is closed, but nobody other should be able to close it", - ); + a2s_disconnect_s.send((remote_pid, finished_sender)).expect( + "Scheduler is closed, but nobody other should be able to close it", + ); }, None => trace!(?remote_pid, "Participant already disconnected gracefully"), } @@ -1026,13 +1006,12 @@ impl Drop for Participant { ?pid, "Participant has been shutdown cleanly, no further waiting is required!" ), - Some(mut a2s_disconnect_s) => { + Some(a2s_disconnect_s) => { debug!(?pid, "Disconnect from Scheduler"); self.runtime.block_on(async { let (finished_sender, finished_receiver) = oneshot::channel(); a2s_disconnect_s .send((self.remote_pid, finished_sender)) - .await .expect("Something is wrong in internal scheduler coding"); if let Err(e) = finished_receiver .await @@ -1059,7 +1038,10 @@ impl Drop for Stream { let sid = self.sid; let pid = self.pid; debug!(?pid, ?sid, "Shutting down Stream"); - self.runtime.block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)) + self.a2b_close_stream_s + .take() + .unwrap() + .send(self.sid) .expect("bparticipant part of a gracefully shutdown must have crashed"); } else { let sid = self.sid; @@ -1096,12 +1078,16 @@ impl From for NetworkError { fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed } } -impl From for NetworkError { - fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed } +impl From> for NetworkError { + fn from(_err: mpsc::error::SendError) -> Self { NetworkError::NetworkClosed } } -impl From for NetworkError { - fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed } +impl From for NetworkError { + fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed } +} + +impl From for NetworkError { + fn from(_err: std::io::Error) -> Self { NetworkError::NetworkClosed } } impl From> for StreamError { diff --git a/network/src/channel.rs b/network/src/channel.rs index c591fb0b88..7928337bd1 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -8,14 +8,16 @@ use crate::{ VELOREN_NETWORK_VERSION, }, }; -use futures::{ - channel::{mpsc, oneshot}, - join, - sink::SinkExt, - stream::StreamExt, +use futures_core::task::Poll; +use futures_util::{ + task::{noop_waker, Context}, FutureExt, }; #[cfg(feature = "metrics")] use std::sync::Arc; +use tokio::{ + join, + sync::{mpsc, oneshot}, +}; use tracing::*; pub(crate) struct Channel { @@ -26,7 +28,7 @@ pub(crate) struct Channel { impl Channel { pub fn new(cid: u64) -> (Self, mpsc::UnboundedSender, oneshot::Sender<()>) { - let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::(); + let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded_channel::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); ( Self { @@ -52,7 +54,7 @@ impl Channel { let cnt = leftover_cid_frame.len(); trace!(?cnt, "Reapplying leftovers"); for cid_frame in leftover_cid_frame.drain(..) { - w2c_cid_frame_s.send(cid_frame).await.unwrap(); + w2c_cid_frame_s.send(cid_frame).unwrap(); } trace!(?cnt, "All leftovers reapplied"); @@ -115,8 +117,8 @@ impl Handshake { } pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec), ()> { - let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::(); - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); + let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded_channel::(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded_channel::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let handler_future = @@ -142,8 +144,10 @@ impl Handshake { match res { Ok(res) => { + let fake_waker = noop_waker(); + let mut ctx = Context::from_waker(&fake_waker); let mut leftover_frames = vec![]; - while let Ok(Some(cid_frame)) = w2c_cid_frame_r.try_next() { + while let Poll::Ready(Some(cid_frame)) = w2c_cid_frame_r.poll_recv(&mut ctx) { leftover_frames.push(cid_frame); } let cnt = leftover_frames.len(); @@ -175,7 +179,7 @@ impl Handshake { self.send_handshake(&mut c2w_frame_s).await; } - let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); + let frame = w2c_cid_frame_r.recv().await.map(|(_cid, frame)| frame); #[cfg(feature = "metrics")] { if let Some(Ok(ref frame)) = frame { @@ -254,7 +258,7 @@ impl Handshake { return Err(()); } - let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); + let frame = w2c_cid_frame_r.recv().await.map(|(_cid, frame)| frame); let r = match frame { Some(Ok(Frame::Init { pid, secret })) => { debug!(?pid, "Participant send their ID"); @@ -315,7 +319,6 @@ impl Handshake { magic_number: VELOREN_MAGIC_NUMBER, version: VELOREN_NETWORK_VERSION, }) - .await .unwrap(); } @@ -330,7 +333,6 @@ impl Handshake { pid: self.local_pid, secret: self.secret, }) - .await .unwrap(); } @@ -353,7 +355,7 @@ impl Handshake { .with_label_values(&[&cid_string, "Shutdown"]) .inc(); } - c2w_frame_s.send(Frame::Raw(data)).await.unwrap(); - c2w_frame_s.send(Frame::Shutdown).await.unwrap(); + c2w_frame_s.send(Frame::Raw(data)).unwrap(); + c2w_frame_s.send(Frame::Shutdown).unwrap(); } } diff --git a/network/src/lib.rs b/network/src/lib.rs index 69bd5f07c0..ffba192643 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -39,8 +39,8 @@ //! //! # Examples //! ```rust -//! use tokio::task::sleep; //! use futures::{executor::block_on, join}; +//! use tokio::task::sleep; //! use veloren_network::{Network, Pid, Promises, ProtocolAddr}; //! //! // Client diff --git a/network/src/message.rs b/network/src/message.rs index ad668908b6..9ab9941599 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -256,8 +256,8 @@ impl std::fmt::Debug for MessageBuffer { #[cfg(test)] mod tests { use crate::{api::Stream, message::*}; - use futures::channel::mpsc; use std::sync::{atomic::AtomicBool, Arc}; + use tokio::sync::mpsc; fn stub_stream(compressed: bool) -> Stream { use crate::{api::*, types::*}; @@ -273,8 +273,8 @@ mod tests { let promises = Promises::empty(); let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded(); - let (_b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded(); - let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded(); + let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded(); + let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel(); Stream::new( Pid::fake(0), diff --git a/network/src/participant.rs b/network/src/participant.rs index 78d1dacd41..764f407cdf 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -8,15 +8,7 @@ use crate::{ protocols::Protocols, types::{Cid, Frame, Pid, Prio, Promises, Sid}, }; -use tokio::sync::{Mutex, RwLock}; -use tokio::runtime::Runtime; -use futures::{ - channel::{mpsc, oneshot}, - future::FutureExt, - select, - sink::SinkExt, - stream::StreamExt, -}; +use futures_util::{FutureExt, StreamExt}; use std::{ collections::{HashMap, VecDeque}, sync::{ @@ -25,6 +17,12 @@ use std::{ }, time::{Duration, Instant}, }; +use tokio::{ + runtime::Runtime, + select, + sync::{mpsc, oneshot, Mutex, RwLock}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; use tracing_futures::Instrument; @@ -47,13 +45,14 @@ struct StreamInfo { prio: Prio, promises: Promises, send_closed: Arc, - b2a_msg_recv_s: Mutex>, + b2a_msg_recv_s: Mutex>, } #[derive(Debug)] struct ControlChannels { a2b_stream_open_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, + b2b_close_stream_opened_sender_r: oneshot::Receiver<()>, s2b_create_channel_r: mpsc::UnboundedReceiver, a2b_close_stream_r: mpsc::UnboundedReceiver, a2b_close_stream_s: mpsc::UnboundedSender, @@ -63,7 +62,7 @@ struct ControlChannels { #[derive(Debug)] struct ShutdownInfo { //a2b_stream_open_r: mpsc::UnboundedReceiver, - b2a_stream_opened_s: mpsc::UnboundedSender, + b2b_close_stream_opened_sender_s: Option>, error: Option, } @@ -84,6 +83,12 @@ pub struct BParticipant { } impl BParticipant { + const BANDWIDTH: u64 = 25_000_000; + const FRAMES_PER_TICK: u64 = Self::BANDWIDTH * Self::TICK_TIME_MS / 1000 / 1400 /*TCP FRAME*/; + const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS); + //in bit/s + const TICK_TIME_MS: u64 = 10; + #[allow(clippy::type_complexity)] pub(crate) fn new( remote_pid: Pid, @@ -97,21 +102,24 @@ impl BParticipant { mpsc::UnboundedSender, oneshot::Sender, ) { - let (a2b_steam_open_s, a2b_stream_open_r) = mpsc::unbounded::(); - let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::(); - let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded(); + let (a2b_steam_open_s, a2b_stream_open_r) = mpsc::unbounded_channel::(); + let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); + let (b2b_close_stream_opened_sender_s, b2b_close_stream_opened_sender_r) = + oneshot::channel(); + let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); - let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded(); + let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); let shutdown_info = RwLock::new(ShutdownInfo { //a2b_stream_open_r: a2b_stream_open_r.clone(), - b2a_stream_opened_s: b2a_stream_opened_s.clone(), + b2b_close_stream_opened_sender_s: Some(b2b_close_stream_opened_sender_s), error: None, }); let run_channels = Some(ControlChannels { a2b_stream_open_r, b2a_stream_opened_s, + b2b_close_stream_opened_sender_r, s2b_create_channel_r, a2b_close_stream_r, a2b_close_stream_s, @@ -147,7 +155,7 @@ impl BParticipant { let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) = oneshot::channel(); let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel(); - let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::(); + let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded_channel::(); let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new( #[cfg(feature = "metrics")] Arc::clone(&self.metrics), @@ -155,7 +163,7 @@ impl BParticipant { ); let run_channels = self.run_channels.take().unwrap(); - futures::join!( + tokio::join!( self.open_mgr( run_channels.a2b_stream_open_r, run_channels.a2b_close_stream_s.clone(), @@ -165,6 +173,7 @@ impl BParticipant { self.handle_frames_mgr( w2b_frames_r, run_channels.b2a_stream_opened_s, + run_channels.b2b_close_stream_opened_sender_r, run_channels.a2b_close_stream_s, a2p_msg_s.clone(), ), @@ -188,13 +197,11 @@ impl BParticipant { &self, mut prios: PrioManager, mut shutdown_send_mgr_receiver: oneshot::Receiver>, - mut b2s_prio_statistic_s: mpsc::UnboundedSender, + b2s_prio_statistic_s: mpsc::UnboundedSender, ) { //This time equals the MINIMUM Latency in average, so keep it down and //Todo: // make it configurable or switch to await E.g. Prio 0 = await, prio 50 // wait for more messages - const TICK_TIME: Duration = Duration::from_millis(10); - const FRAMES_PER_TICK: usize = 10005; self.running_mgr.fetch_add(1, Ordering::Relaxed); let mut b2b_prios_flushed_s = None; //closing up trace!("Start send_mgr"); @@ -203,7 +210,9 @@ impl BParticipant { let mut i: u64 = 0; loop { let mut frames = VecDeque::new(); - prios.fill_frames(FRAMES_PER_TICK, &mut frames).await; + prios + .fill_frames(Self::FRAMES_PER_TICK as usize, &mut frames) + .await; let len = frames.len(); for (_, frame) in frames { self.send_frame( @@ -215,9 +224,8 @@ impl BParticipant { } b2s_prio_statistic_s .send((self.remote_pid, len as u64, /* */ 0)) - .await .unwrap(); - tokio::time::sleep(TICK_TIME).await; + tokio::time::sleep(Self::TICK_TIME).await; i += 1; if i.rem_euclid(1000) == 0 { trace!("Did 1000 ticks"); @@ -229,7 +237,7 @@ impl BParticipant { break; } if b2b_prios_flushed_s.is_none() { - if let Some(prios_flushed_s) = shutdown_send_mgr_receiver.try_recv().unwrap() { + if let Ok(prios_flushed_s) = shutdown_send_mgr_receiver.try_recv() { b2b_prios_flushed_s = Some(prios_flushed_s); } } @@ -252,8 +260,9 @@ impl BParticipant { ) -> bool { let mut drop_cid = None; // TODO: find out ideal channel here + let res = if let Some(ci) = self.channels.read().await.values().next() { - let mut ci = ci.lock().await; + let ci = ci.lock().await; //we are increasing metrics without checking the result to please // borrow_checker. otherwise we would need to close `frame` what we // dont want! @@ -261,7 +270,7 @@ impl BParticipant { frames_out_total_cache .with_label_values(ci.cid, &frame) .inc(); - if let Err(e) = ci.b2w_frame_s.send(frame).await { + if let Err(e) = ci.b2w_frame_s.send(frame) { let cid = ci.cid; info!(?e, ?cid, "channel no longer available"); drop_cid = Some(cid); @@ -294,7 +303,6 @@ impl BParticipant { if let Err(e) = ci.b2r_read_shutdown.send(()) { trace!(?cid, ?e, "seems like was already shut down"); } - ci.b2w_frame_s.close_channel(); } //TODO FIXME tags: takeover channel multiple info!( @@ -311,7 +319,8 @@ impl BParticipant { async fn handle_frames_mgr( &self, mut w2b_frames_r: mpsc::UnboundedReceiver, - mut b2a_stream_opened_s: mpsc::UnboundedSender, + b2a_stream_opened_s: mpsc::UnboundedSender, + b2b_close_stream_opened_sender_r: oneshot::Receiver<()>, a2b_close_stream_s: mpsc::UnboundedSender, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, ) { @@ -323,21 +332,24 @@ impl BParticipant { let mut dropped_instant = Instant::now(); let mut dropped_cnt = 0u64; let mut dropped_sid = Sid::new(0); + let mut b2a_stream_opened_s = Some(b2a_stream_opened_s); + let mut b2b_close_stream_opened_sender_r = b2b_close_stream_opened_sender_r.fuse(); - while let Some((cid, result_frame)) = w2b_frames_r.next().await { + while let Some((cid, result_frame)) = select!( + next = w2b_frames_r.recv().fuse() => next, + _ = &mut b2b_close_stream_opened_sender_r => { + b2a_stream_opened_s = None; + None + }, + ) { //trace!(?result_frame, "handling frame"); let frame = match result_frame { Ok(frame) => frame, Err(()) => { - // The read protocol stopped, i need to make sure that write gets stopped - debug!("read protocol was closed. Stopping write protocol"); - if let Some(ci) = self.channels.read().await.get(&cid) { - let mut ci = ci.lock().await; - ci.b2w_frame_s - .close() - .await - .expect("couldn't stop write protocol"); - } + // The read protocol stopped, i need to make sure that write gets stopped, can + // drop channel as it's dead anyway + debug!("read protocol was closed. Stopping channel"); + self.channels.write().await.remove(&cid); continue; }, }; @@ -360,13 +372,18 @@ impl BParticipant { let stream = self .create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s) .await; - if let Err(e) = b2a_stream_opened_s.send(stream).await { - warn!( - ?e, - ?sid, - "couldn't notify api::Participant that a stream got opened. Is the \ - participant already dropped?" - ); + match &b2a_stream_opened_s { + None => debug!("dropping openStream as Channel is already closing"), + Some(s) => { + if let Err(e) = s.send(stream) { + warn!( + ?e, + ?sid, + "couldn't notify api::Participant that a stream got opened. \ + Is the participant already dropped?" + ); + } + }, } }, Frame::CloseStream { sid } => { @@ -465,6 +482,7 @@ impl BParticipant { ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start create_channel_mgr"); + let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r); s2b_create_channel_r .for_each_concurrent( None, @@ -549,8 +567,8 @@ impl BParticipant { let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse(); //from api or shutdown signal while let Some((prio, promises, p2a_return_stream)) = select! { - next = a2b_stream_open_r.next().fuse() => next, - _ = shutdown_open_mgr_receiver => None, + next = a2b_stream_open_r.recv().fuse() => next, + _ = &mut shutdown_open_mgr_receiver => None, } { debug!(?prio, ?promises, "Got request to open a new steam"); //TODO: a2b_stream_open_r isn't closed on api_close yet. This needs to change. @@ -657,7 +675,6 @@ impl BParticipant { itself, ignoring" ); }; - ci.b2w_frame_s.close_channel(); } //Wait for other bparticipants mgr to close via AtomicUsize @@ -712,8 +729,8 @@ impl BParticipant { //from api or shutdown signal while let Some(sid) = select! { - next = a2b_close_stream_r.next().fuse() => next, - sender = shutdown_stream_close_mgr_receiver => { + next = a2b_close_stream_r.recv().fuse() => next, + sender = &mut shutdown_stream_close_mgr_receiver => { b2b_stream_close_shutdown_confirmed_s = Some(sender.unwrap()); None } @@ -779,7 +796,7 @@ impl BParticipant { match self.streams.read().await.get(&sid) { Some(si) => { si.send_closed.store(true, Ordering::Relaxed); - si.b2a_msg_recv_s.lock().await.close_channel(); + si.b2a_msg_recv_s.lock().await.close(); }, None => trace!( "Couldn't find the stream, might be simultaneous close from local/remote" @@ -828,7 +845,7 @@ impl BParticipant { a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, a2b_close_stream_s: &mpsc::UnboundedSender, ) -> Stream { - let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::(); + let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::(); let send_closed = Arc::new(AtomicBool::new(false)); self.streams.write().await.insert(sid, StreamInfo { prio, @@ -847,7 +864,6 @@ impl BParticipant { prio, promises, send_closed, - Arc::clone(&self.runtime), a2p_msg_s, b2a_msg_recv_r, a2b_close_stream_s.clone(), @@ -860,7 +876,9 @@ impl BParticipant { if let Some(r) = reason { lock.error = Some(r); } - lock.b2a_stream_opened_s.close_channel(); + lock.b2b_close_stream_opened_sender_s + .take() + .map(|s| s.send(())); debug!("Closing all streams for write"); for (sid, si) in self.streams.read().await.iter() { @@ -876,7 +894,7 @@ impl BParticipant { debug!("Closing all streams"); for (sid, si) in self.streams.read().await.iter() { trace!(?sid, "Shutting down Stream"); - si.b2a_msg_recv_s.lock().await.close_channel(); + si.b2a_msg_recv_s.lock().await.close(); } } } diff --git a/network/src/prios.rs b/network/src/prios.rs index 46d31024b5..a544a31241 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -11,9 +11,9 @@ use crate::{ types::{Frame, Prio, Sid}, }; use crossbeam_channel::{unbounded, Receiver, Sender}; -use futures::channel::oneshot; use std::collections::{HashMap, HashSet, VecDeque}; #[cfg(feature = "metrics")] use std::sync::Arc; +use tokio::sync::oneshot; use tracing::trace; const PRIO_MAX: usize = 64; @@ -289,8 +289,8 @@ mod tests { types::{Frame, Pid, Prio, Sid}, }; use crossbeam_channel::Sender; - use futures::{channel::oneshot, executor::block_on}; use std::{collections::VecDeque, sync::Arc}; + use tokio::{runtime::Runtime, sync::oneshot}; const SIZE: u64 = OutgoingMessage::FRAME_DATA_SIZE; const USIZE: usize = OutgoingMessage::FRAME_DATA_SIZE as usize; @@ -366,7 +366,9 @@ mod tests { let (mut mgr, msg_tx, _flush_tx) = mock_new(); msg_tx.send(mock_out(16, 1337)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); assert_header(&mut frames, 1337, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); @@ -380,7 +382,9 @@ mod tests { msg_tx.send(mock_out(20, 42)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); assert_header(&mut frames, 1337, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); assert_header(&mut frames, 42, 3); @@ -394,7 +398,9 @@ mod tests { msg_tx.send(mock_out(20, 42)).unwrap(); msg_tx.send(mock_out(16, 1337)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); assert_header(&mut frames, 1337, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); @@ -420,7 +426,9 @@ mod tests { msg_tx.send(mock_out(16, 11)).unwrap(); msg_tx.send(mock_out(20, 13)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); for i in 1..14 { assert_header(&mut frames, i, 3); @@ -447,13 +455,17 @@ mod tests { msg_tx.send(mock_out(20, 13)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(3, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(3, &mut frames)); for i in 1..4 { assert_header(&mut frames, i, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); } assert!(frames.is_empty()); - block_on(mgr.fill_frames(11, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(11, &mut frames)); for i in 4..14 { assert_header(&mut frames, i, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); @@ -466,7 +478,9 @@ mod tests { let (mut mgr, msg_tx, _flush_tx) = mock_new(); msg_tx.send(mock_out_large(16, 1)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); assert_header(&mut frames, 1, SIZE * 2 + 20); assert_data(&mut frames, 0, vec![48; USIZE]); @@ -481,7 +495,9 @@ mod tests { msg_tx.send(mock_out_large(16, 1)).unwrap(); msg_tx.send(mock_out_large(16, 2)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); assert_header(&mut frames, 1, SIZE * 2 + 20); assert_data(&mut frames, 0, vec![48; USIZE]); @@ -500,14 +516,18 @@ mod tests { msg_tx.send(mock_out_large(16, 1)).unwrap(); msg_tx.send(mock_out_large(16, 2)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2, &mut frames)); assert_header(&mut frames, 1, SIZE * 2 + 20); assert_data(&mut frames, 0, vec![48; USIZE]); assert_data(&mut frames, SIZE, vec![49; USIZE]); msg_tx.send(mock_out(0, 3)).unwrap(); - block_on(mgr.fill_frames(100, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(100, &mut frames)); assert_header(&mut frames, 3, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); @@ -530,7 +550,9 @@ mod tests { msg_tx.send(mock_out(16, 2)).unwrap(); msg_tx.send(mock_out(16, 2)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2000, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2000, &mut frames)); assert_header(&mut frames, 2, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); @@ -549,13 +571,17 @@ mod tests { msg_tx.send(mock_out(16, 2)).unwrap(); } let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2000, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2000, &mut frames)); //^unimportant frames, gonna be dropped msg_tx.send(mock_out(20, 1)).unwrap(); msg_tx.send(mock_out(16, 2)).unwrap(); msg_tx.send(mock_out(16, 2)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2000, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2000, &mut frames)); //important in that test is, that after the first frames got cleared i reset // the Points even though 998 prio 16 messages have been send at this @@ -589,7 +615,9 @@ mod tests { .unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2000, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2000, &mut frames)); assert_header(&mut frames, 2, 7000); assert_data(&mut frames, 0, vec![1; USIZE]); @@ -619,7 +647,9 @@ mod tests { msg_tx.send(mock_out(16, 8)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2000, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2000, &mut frames)); assert_header(&mut frames, 2, 7000); assert_data(&mut frames, 0, vec![1; USIZE]); @@ -651,7 +681,9 @@ mod tests { msg_tx.send(mock_out(20, 8)).unwrap(); let mut frames = VecDeque::new(); - block_on(mgr.fill_frames(2000, &mut frames)); + Runtime::new() + .unwrap() + .block_on(mgr.fill_frames(2000, &mut frames)); assert_header(&mut frames, 2, 7000); assert_data(&mut frames, 0, vec![1; USIZE]); diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 771ea649e5..b92ef27eee 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -4,19 +4,14 @@ use crate::{ participant::C2pFrame, types::{Cid, Frame}, }; +use futures_util::{future::Fuse, FutureExt}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpStream, UdpSocket}, + select, + sync::{mpsc, oneshot, Mutex}, }; -use futures::{ - channel::{mpsc, oneshot}, - future::{Fuse, FutureExt}, - lock::Mutex, - select, - sink::SinkExt, - stream::StreamExt, -}; use std::{convert::TryFrom, net::SocketAddr, sync::Arc}; use tracing::*; @@ -75,7 +70,7 @@ impl TcpProtocol { async fn read_frame( r: &mut R, - mut end_receiver: &mut Fuse>, + end_receiver: &mut Fuse>, ) -> Result> { let handle = |read_result| match read_result { Ok(_) => Ok(()), @@ -190,7 +185,6 @@ impl TcpProtocol { } w2c_cid_frame_s .send((cid, Ok(frame))) - .await .expect("Channel or Participant seems no longer to exist"); }, Err(e_option) => { @@ -201,7 +195,6 @@ impl TcpProtocol { // need a explicit STOP here w2c_cid_frame_s .send((cid, Err(()))) - .await .expect("Channel or Participant seems no longer to exist"); } //None is clean shutdown @@ -284,7 +277,7 @@ impl TcpProtocol { #[cfg(not(feature = "metrics"))] let _cid = cid; - while let Some(frame) = c2w_frame_r.next().await { + while let Some(frame) = c2w_frame_r.recv().await { #[cfg(feature = "metrics")] { metrics_cache.with_label_values(&frame).inc(); @@ -343,15 +336,15 @@ impl UdpProtocol { let mut data_in = self.data_in.lock().await; let mut end_r = end_r.fuse(); while let Some(bytes) = select! { - r = data_in.next().fuse() => match r { + r = data_in.recv().fuse() => match r { Some(r) => Some(r), None => { info!("Udp read ended"); - w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist"); + w2c_cid_frame_s.send((cid, Err(()))).expect("Channel or Participant seems no longer to exist"); None } }, - _ = end_r => None, + _ = &mut end_r => None, } { trace!("Got raw UDP message with len: {}", bytes.len()); let frame_no = bytes[0]; @@ -389,7 +382,7 @@ impl UdpProtocol { }; #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); - w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap(); + w2c_cid_frame_s.send((cid, Ok(frame))).unwrap(); } trace!("Shutting down udp read()"); } @@ -406,7 +399,7 @@ impl UdpProtocol { .with_label_values(&[&cid.to_string()]); #[cfg(not(feature = "metrics"))] let _cid = cid; - while let Some(frame) = c2w_frame_r.next().await { + while let Some(frame) = c2w_frame_r.recv().await { #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); let len = match frame { @@ -501,9 +494,8 @@ impl UdpProtocol { mod tests { use super::*; use crate::{metrics::NetworkMetrics, types::Pid}; - use tokio::net; - use futures::{executor::block_on, stream::StreamExt}; use std::sync::Arc; + use tokio::{net, runtime::Runtime, sync::mpsc}; #[test] fn tcp_read_handshake() { @@ -511,11 +503,11 @@ mod tests { let cid = 80085; let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap()); let addr = std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 50500); - block_on(async { + Runtime::new().unwrap().block_on(async { let server = net::TcpListener::bind(addr).await.unwrap(); let mut client = net::TcpStream::connect(addr).await.unwrap(); - let s_stream = server.incoming().next().await.unwrap().unwrap(); + let (s_stream, _) = server.accept().await.unwrap(); let prot = TcpProtocol::new(s_stream, metrics); //Send Handshake @@ -524,21 +516,21 @@ mod tests { client.write_all(&1337u32.to_le_bytes()).await.unwrap(); client.write_all(&0u32.to_le_bytes()).await.unwrap(); client.write_all(&42u32.to_le_bytes()).await.unwrap(); - client.flush(); + client.flush().await.unwrap(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded_channel::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { - block_on(async { + Runtime::new().unwrap().block_on(async { prot.read_from_wire(cid2, &mut w2c_cid_frame_s, read_stop_receiver) .await; }) }); // Assert than we get some value back! Its a Handshake! //tokio::task::sleep(std::time::Duration::from_millis(1000)); - let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); + let (cid_r, frame) = w2c_cid_frame_r.recv().await.unwrap(); assert_eq!(cid, cid_r); if let Ok(Frame::Handshake { magic_number, @@ -561,11 +553,11 @@ mod tests { let cid = 80085; let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap()); let addr = std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 50501); - block_on(async { + Runtime::new().unwrap().block_on(async { let server = net::TcpListener::bind(addr).await.unwrap(); let mut client = net::TcpStream::connect(addr).await.unwrap(); - let s_stream = server.incoming().next().await.unwrap().unwrap(); + let (s_stream, _) = server.accept().await.unwrap(); let prot = TcpProtocol::new(s_stream, metrics); //Send Handshake @@ -573,19 +565,19 @@ mod tests { .write_all("x4hrtzsektfhxugzdtz5r78gzrtzfhxfdthfthuzhfzzufasgasdfg".as_bytes()) .await .unwrap(); - client.flush(); + client.flush().await.unwrap(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded_channel::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { - block_on(async { + Runtime::new().unwrap().block_on(async { prot.read_from_wire(cid2, &mut w2c_cid_frame_s, read_stop_receiver) .await; }) }); // Assert than we get some value back! Its a Raw! - let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); + let (cid_r, frame) = w2c_cid_frame_r.recv().await.unwrap(); assert_eq!(cid, cid_r); if let Ok(Frame::Raw(data)) = frame { assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf"); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index e0c3b0ef84..f648d48a15 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -7,15 +7,7 @@ use crate::{ protocols::{Protocols, TcpProtocol, UdpProtocol}, types::Pid, }; -use tokio::{io, net, sync::Mutex}; -use tokio::runtime::Runtime; -use futures::{ - channel::{mpsc, oneshot}, - future::FutureExt, - select, - sink::SinkExt, - stream::StreamExt, -}; +use futures_util::{FutureExt, StreamExt}; #[cfg(feature = "metrics")] use prometheus::Registry; use rand::Rng; @@ -26,6 +18,13 @@ use std::{ Arc, }, }; +use tokio::{ + io, net, + runtime::Runtime, + select, + sync::{mpsc, oneshot, Mutex}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; use tracing_futures::Instrument; @@ -92,12 +91,13 @@ impl Scheduler { mpsc::UnboundedReceiver, oneshot::Sender<()>, ) { - let (a2s_listen_s, a2s_listen_r) = mpsc::unbounded::(); - let (a2s_connect_s, a2s_connect_r) = mpsc::unbounded::(); - let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::(); + let (a2s_listen_s, a2s_listen_r) = mpsc::unbounded_channel::(); + let (a2s_connect_s, a2s_connect_r) = mpsc::unbounded_channel::(); + let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded_channel::(); let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>(); - let (a2s_disconnect_s, a2s_disconnect_r) = mpsc::unbounded::(); - let (b2s_prio_statistic_s, b2s_prio_statistic_r) = mpsc::unbounded::(); + let (a2s_disconnect_s, a2s_disconnect_r) = mpsc::unbounded_channel::(); + let (b2s_prio_statistic_s, b2s_prio_statistic_r) = + mpsc::unbounded_channel::(); let run_channels = Some(ControlChannels { a2s_listen_r, @@ -150,7 +150,7 @@ impl Scheduler { pub async fn run(mut self) { let run_channels = self.run_channels.take().unwrap(); - futures::join!( + tokio::join!( self.listen_mgr(run_channels.a2s_listen_r), self.connect_mgr(run_channels.a2s_connect_r), self.disconnect_mgr(run_channels.a2s_disconnect_r), @@ -161,6 +161,7 @@ impl Scheduler { async fn listen_mgr(&self, a2s_listen_r: mpsc::UnboundedReceiver) { trace!("Start listen_mgr"); + let a2s_listen_r = UnboundedReceiverStream::new(a2s_listen_r); a2s_listen_r .for_each_concurrent(None, |(address, s2a_listen_result_s)| { let address = address; @@ -197,7 +198,7 @@ impl Scheduler { )>, ) { trace!("Start connect_mgr"); - while let Some((addr, pid_sender)) = a2s_connect_r.next().await { + while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { let (protocol, handshake) = match addr { ProtocolAddr::Tcp(addr) => { #[cfg(feature = "metrics")] @@ -240,7 +241,7 @@ impl Scheduler { continue; }; info!("Connecting Udp to: {}", addr); - let (udp_data_sender, udp_data_receiver) = mpsc::unbounded::>(); + let (udp_data_sender, udp_data_receiver) = mpsc::unbounded_channel::>(); let protocol = UdpProtocol::new( Arc::clone(&socket), addr, @@ -264,7 +265,7 @@ impl Scheduler { async fn disconnect_mgr(&self, mut a2s_disconnect_r: mpsc::UnboundedReceiver) { trace!("Start disconnect_mgr"); - while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await { + while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.recv().await { //Closing Participants is done the following way: // 1. We drop our senders and receivers // 2. we need to close BParticipant, this will drop its senderns and receivers @@ -299,7 +300,7 @@ impl Scheduler { mut b2s_prio_statistic_r: mpsc::UnboundedReceiver, ) { trace!("Start prio_adj_mgr"); - while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.next().await { + while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.recv().await { //TODO adjust prios in participants here! } @@ -381,7 +382,7 @@ impl Scheduler { let mut end_receiver = s2s_stop_listening_r.fuse(); while let Some(data) = select! { next = listener.accept().fuse() => Some(next), - _ = end_receiver => None, + _ = &mut end_receiver => None, } { let (stream, remote_addr) = match data { Ok((s, p)) => (s, p), @@ -425,7 +426,7 @@ impl Scheduler { let mut data = [0u8; UDP_MAXIMUM_SINGLE_PACKET_SIZE_EVER]; while let Ok((size, remote_addr)) = select! { next = socket.recv_from(&mut data).fuse() => next, - _ = end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")), + _ = &mut end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")), } { let mut datavec = Vec::with_capacity(size); datavec.extend_from_slice(&data[0..size]); @@ -434,7 +435,8 @@ impl Scheduler { #[allow(clippy::map_entry)] if !listeners.contains_key(&remote_addr) { info!("Accepting Udp from: {}", &remote_addr); - let (udp_data_sender, udp_data_receiver) = mpsc::unbounded::>(); + let (udp_data_sender, udp_data_receiver) = + mpsc::unbounded_channel::>(); listeners.insert(remote_addr, udp_data_sender); let protocol = UdpProtocol::new( Arc::clone(&socket), @@ -447,7 +449,7 @@ impl Scheduler { .await; } let udp_data_sender = listeners.get_mut(&remote_addr).unwrap(); - udp_data_sender.send(datavec).await.unwrap(); + udp_data_sender.send(datavec).unwrap(); } }, _ => unimplemented!(), @@ -457,7 +459,7 @@ impl Scheduler { async fn udp_single_channel_connect( socket: Arc, - mut w2p_udp_package_s: mpsc::UnboundedSender>, + w2p_udp_package_s: mpsc::UnboundedSender>, ) { let addr = socket.local_addr(); trace!(?addr, "Start udp_single_channel_connect"); @@ -470,11 +472,11 @@ impl Scheduler { let mut data = [0u8; 9216]; while let Ok(size) = select! { next = socket.recv(&mut data).fuse() => next, - _ = end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")), + _ = &mut end_receiver => Err(std::io::Error::new(std::io::ErrorKind::Other, "")), } { let mut datavec = Vec::with_capacity(size); datavec.extend_from_slice(&data[0..size]); - w2p_udp_package_s.send(datavec).await.unwrap(); + w2p_udp_package_s.send(datavec).unwrap(); } trace!(?addr, "Stop udp_single_channel_connect"); } @@ -491,7 +493,7 @@ impl Scheduler { Contra: - DOS possibility because we answer first - Speed, because otherwise the message can be send with the creation */ - let mut participant_channels = self.participant_channels.lock().await.clone().unwrap(); + let participant_channels = self.participant_channels.lock().await.clone().unwrap(); // spawn is needed here, e.g. for TCP connect it would mean that only 1 // participant can be in handshake phase ever! Someone could deadlock // the whole server easily for new clients UDP doesnt work at all, as @@ -533,7 +535,7 @@ impl Scheduler { bparticipant, a2b_stream_open_s, b2a_stream_opened_r, - mut s2b_create_channel_s, + s2b_create_channel_s, s2b_shutdown_bparticipant_s, ) = BParticipant::new( pid, @@ -578,7 +580,6 @@ impl Scheduler { leftover_cid_frame, b2s_create_channel_done_s, )) - .await .unwrap(); b2s_create_channel_done_r.await.unwrap(); if let Some(pid_oneshot) = s2a_return_pid_s { @@ -589,7 +590,6 @@ impl Scheduler { participant_channels .s2a_connected_s .send(participant) - .await .unwrap(); } } else { diff --git a/network/tests/closing.rs b/network/tests/closing.rs index fac118ff5a..e3abb25533 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -18,8 +18,8 @@ //! - You sometimes see sleep(1000ms) this is used when we rely on the //! underlying TCP functionality, as this simulates client and server -use async_std::task; -use task::block_on; +use std::sync::Arc; +use tokio::runtime::Runtime; use veloren_network::{Network, ParticipantError, Pid, Promises, StreamError}; mod helper; use helper::{network_participant_stream, tcp}; @@ -27,26 +27,26 @@ use helper::{network_participant_stream, tcp}; #[test] fn close_network() { let (_, _) = helper::setup(false, 0); - let (_, _p1_a, mut s1_a, _, _p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _, _p1_a, mut s1_a, _, _p1_b, mut s1_b) = network_participant_stream(tcp()); std::thread::sleep(std::time::Duration::from_millis(1000)); assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); - let msg1: Result = block_on(s1_b.recv()); + let msg1: Result = r.block_on(s1_b.recv()); assert_eq!(msg1, Err(StreamError::StreamClosed)); } #[test] fn close_participant() { let (_, _) = helper::setup(false, 0); - let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = network_participant_stream(tcp()); - block_on(p1_a.disconnect()).unwrap(); - block_on(p1_b.disconnect()).unwrap(); + r.block_on(p1_a.disconnect()).unwrap(); + r.block_on(p1_b.disconnect()).unwrap(); assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!( - block_on(s1_b.recv::()), + r.block_on(s1_b.recv::()), Err(StreamError::StreamClosed) ); } @@ -54,14 +54,14 @@ fn close_participant() { #[test] fn close_stream() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, _) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _, mut s1_a, _n_b, _, _) = network_participant_stream(tcp()); // s1_b is dropped directly while s1_a isn't std::thread::sleep(std::time::Duration::from_millis(1000)); assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!( - block_on(s1_a.recv::()), + r.block_on(s1_a.recv::()), Err(StreamError::StreamClosed) ); } @@ -72,8 +72,8 @@ fn close_stream() { #[test] fn close_streams_in_block_on() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = block_on(network_participant_stream(tcp())); - block_on(async { + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = network_participant_stream(tcp()); + r.block_on(async { //make it locally so that they are dropped later let mut s1_a = s1_a; let mut s1_b = s1_b; @@ -86,14 +86,14 @@ fn close_streams_in_block_on() { #[test] fn stream_simple_3msg_then_close() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(1u8).unwrap(); s1_a.send(42).unwrap(); s1_a.send("3rdMessage").unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok(1u8)); - assert_eq!(block_on(s1_b.recv()), Ok(42)); - assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok(1u8)); + assert_eq!(r.block_on(s1_b.recv()), Ok(42)); + assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string())); drop(s1_a); std::thread::sleep(std::time::Duration::from_millis(1000)); assert_eq!(s1_b.send("Hello World"), Err(StreamError::StreamClosed)); @@ -103,43 +103,43 @@ fn stream_simple_3msg_then_close() { fn stream_send_first_then_receive() { // recv should still be possible even if stream got closed if they are in queue let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(1u8).unwrap(); s1_a.send(42).unwrap(); s1_a.send("3rdMessage").unwrap(); drop(s1_a); std::thread::sleep(std::time::Duration::from_millis(1000)); - assert_eq!(block_on(s1_b.recv()), Ok(1u8)); - assert_eq!(block_on(s1_b.recv()), Ok(42)); - assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok(1u8)); + assert_eq!(r.block_on(s1_b.recv()), Ok(42)); + assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string())); assert_eq!(s1_b.send("Hello World"), Err(StreamError::StreamClosed)); } #[test] fn stream_send_1_then_close_stream() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("this message must be received, even if stream is closed already!") .unwrap(); drop(s1_a); std::thread::sleep(std::time::Duration::from_millis(1000)); let exp = Ok("this message must be received, even if stream is closed already!".to_string()); - assert_eq!(block_on(s1_b.recv()), exp); + assert_eq!(r.block_on(s1_b.recv()), exp); println!("all received and done"); } #[test] fn stream_send_100000_then_close_stream() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } drop(s1_a); let exp = Ok("woop_PARTY_HARD_woop".to_string()); println!("start receiving"); - block_on(async { + r.block_on(async { for _ in 0..100000 { assert_eq!(s1_b.recv().await, exp); } @@ -150,7 +150,7 @@ fn stream_send_100000_then_close_stream() { #[test] fn stream_send_100000_then_close_stream_remote() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -162,7 +162,7 @@ fn stream_send_100000_then_close_stream_remote() { #[test] fn stream_send_100000_then_close_stream_remote2() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -175,7 +175,7 @@ fn stream_send_100000_then_close_stream_remote2() { #[test] fn stream_send_100000_then_close_stream_remote3() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -188,7 +188,7 @@ fn stream_send_100000_then_close_stream_remote3() { #[test] fn close_part_then_network() { let (_, _) = helper::setup(false, 0); - let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -201,7 +201,7 @@ fn close_part_then_network() { #[test] fn close_network_then_part() { let (_, _) = helper::setup(false, 0); - let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -214,39 +214,39 @@ fn close_network_then_part() { #[test] fn close_network_then_disconnect_part() { let (_, _) = helper::setup(false, 0); - let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + let (r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } drop(n_a); - assert!(block_on(p_a.disconnect()).is_err()); + assert!(r.block_on(p_a.disconnect()).is_err()); std::thread::sleep(std::time::Duration::from_millis(1000)); } #[test] fn opened_stream_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); - let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap(); + let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); + let mut s2_a = r.block_on(p_a.open(10, Promises::empty())).unwrap(); s2_a.send("HelloWorld").unwrap(); - let mut s2_b = block_on(p_b.opened()).unwrap(); + let mut s2_b = r.block_on(p_b.opened()).unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); - assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string())); + assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string())); } #[test] fn opened_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); - let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap(); + let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); + let mut s2_a = r.block_on(p_a.open(10, Promises::empty())).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); - let mut s2_b = block_on(p_b.opened()).unwrap(); - assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string())); + let mut s2_b = r.block_on(p_b.opened()).unwrap(); + assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string())); assert_eq!( - block_on(p_b.opened()).unwrap_err(), + r.block_on(p_b.opened()).unwrap_err(), ParticipantError::ParticipantDisconnected ); } @@ -254,15 +254,15 @@ fn opened_stream_after_remote_part_is_closed() { #[test] fn open_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); - let mut s2_a = block_on(p_a.open(10, Promises::empty())).unwrap(); + let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); + let mut s2_a = r.block_on(p_a.open(10, Promises::empty())).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); - let mut s2_b = block_on(p_b.opened()).unwrap(); - assert_eq!(block_on(s2_b.recv()), Ok("HelloWorld".to_string())); + let mut s2_b = r.block_on(p_b.opened()).unwrap(); + assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string())); assert_eq!( - block_on(p_b.open(20, Promises::empty())).unwrap_err(), + r.block_on(p_b.open(20, Promises::empty())).unwrap_err(), ParticipantError::ParticipantDisconnected ); } @@ -270,11 +270,11 @@ fn open_stream_after_remote_part_is_closed() { #[test] fn failed_stream_open_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); + let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); assert_eq!( - block_on(p_b.opened()).unwrap_err(), + r.block_on(p_b.opened()).unwrap_err(), ParticipantError::ParticipantDisconnected ); } @@ -282,72 +282,69 @@ fn failed_stream_open_after_remote_part_is_closed() { #[test] fn open_participant_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (n_a, f) = Network::new(Pid::fake(0)); - std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(1)); - std::thread::spawn(f); + let r = Arc::new(Runtime::new().unwrap()); + let n_a = Network::new(Pid::fake(0), Arc::clone(&r)); + let n_b = Network::new(Pid::fake(1), Arc::clone(&r)); let addr = tcp(); - block_on(n_a.listen(addr.clone())).unwrap(); - let p_b = block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap(); + r.block_on(n_a.listen(addr.clone())).unwrap(); + let p_b = r.block_on(n_b.connect(addr)).unwrap(); + let mut s1_b = r.block_on(p_b.open(10, Promises::empty())).unwrap(); s1_b.send("HelloWorld").unwrap(); - let p_a = block_on(n_a.connected()).unwrap(); + let p_a = r.block_on(n_a.connected()).unwrap(); drop(s1_b); drop(p_b); drop(n_b); std::thread::sleep(std::time::Duration::from_millis(1000)); - let mut s1_a = block_on(p_a.opened()).unwrap(); - assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); + let mut s1_a = r.block_on(p_a.opened()).unwrap(); + assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string())); } #[test] fn open_participant_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (n_a, f) = Network::new(Pid::fake(0)); - std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(1)); - std::thread::spawn(f); + let r = Arc::new(Runtime::new().unwrap()); + let n_a = Network::new(Pid::fake(0), Arc::clone(&r)); + let n_b = Network::new(Pid::fake(1), Arc::clone(&r)); let addr = tcp(); - block_on(n_a.listen(addr.clone())).unwrap(); - let p_b = block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap(); + r.block_on(n_a.listen(addr.clone())).unwrap(); + let p_b = r.block_on(n_b.connect(addr)).unwrap(); + let mut s1_b = r.block_on(p_b.open(10, Promises::empty())).unwrap(); s1_b.send("HelloWorld").unwrap(); drop(s1_b); drop(p_b); drop(n_b); std::thread::sleep(std::time::Duration::from_millis(1000)); - let p_a = block_on(n_a.connected()).unwrap(); - let mut s1_a = block_on(p_a.opened()).unwrap(); - assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); + let p_a = r.block_on(n_a.connected()).unwrap(); + let mut s1_a = r.block_on(p_a.opened()).unwrap(); + assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string())); } #[test] fn close_network_scheduler_completely() { let (_, _) = helper::setup(false, 0); - let (n_a, f) = Network::new(Pid::fake(0)); - let ha = std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(1)); - let hb = std::thread::spawn(f); + let r = Arc::new(Runtime::new().unwrap()); + let n_a = Network::new(Pid::fake(0), Arc::clone(&r)); + let n_b = Network::new(Pid::fake(1), Arc::clone(&r)); let addr = tcp(); - block_on(n_a.listen(addr.clone())).unwrap(); - let p_b = block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = block_on(p_b.open(10, Promises::empty())).unwrap(); + r.block_on(n_a.listen(addr.clone())).unwrap(); + let p_b = r.block_on(n_b.connect(addr)).unwrap(); + let mut s1_b = r.block_on(p_b.open(10, Promises::empty())).unwrap(); s1_b.send("HelloWorld").unwrap(); - let p_a = block_on(n_a.connected()).unwrap(); - let mut s1_a = block_on(p_a.opened()).unwrap(); - assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); + let p_a = r.block_on(n_a.connected()).unwrap(); + let mut s1_a = r.block_on(p_a.opened()).unwrap(); + assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string())); drop(n_a); drop(n_b); std::thread::sleep(std::time::Duration::from_millis(1000)); - ha.join().unwrap(); - hb.join().unwrap(); + let runtime = Arc::try_unwrap(r).expect("runtime is not alone, there still exist a reference"); + runtime.shutdown_timeout(std::time::Duration::from_secs(300)); } #[test] fn dont_panic_on_multiply_recv_after_close() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); @@ -362,7 +359,7 @@ fn dont_panic_on_multiply_recv_after_close() { #[test] fn dont_panic_on_recv_send_after_close() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); @@ -375,7 +372,7 @@ fn dont_panic_on_recv_send_after_close() { #[test] fn dont_panic_on_multiple_send_after_close() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 93ee64a1e6..64c65b0e91 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -1,10 +1,14 @@ use lazy_static::*; use std::{ net::SocketAddr, - sync::atomic::{AtomicU16, Ordering}, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, + }, thread, time::Duration, }; +use tokio::runtime::Runtime; use tracing::*; use tracing_subscriber::EnvFilter; use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; @@ -43,22 +47,32 @@ pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) { } #[allow(dead_code)] -pub async fn network_participant_stream( +pub fn network_participant_stream( addr: ProtocolAddr, -) -> (Network, Participant, Stream, Network, Participant, Stream) { - let (n_a, f_a) = Network::new(Pid::fake(0)); - std::thread::spawn(f_a); - let (n_b, f_b) = Network::new(Pid::fake(1)); - std::thread::spawn(f_b); +) -> ( + Arc, + Network, + Participant, + Stream, + Network, + Participant, + Stream, +) { + let runtime = Arc::new(Runtime::new().unwrap()); + let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { + let n_a = Network::new(Pid::fake(0), Arc::clone(&runtime)); + let n_b = Network::new(Pid::fake(1), Arc::clone(&runtime)); - n_a.listen(addr.clone()).await.unwrap(); - let p1_b = n_b.connect(addr).await.unwrap(); - let p1_a = n_a.connected().await.unwrap(); + n_a.listen(addr.clone()).await.unwrap(); + let p1_b = n_b.connect(addr).await.unwrap(); + let p1_a = n_a.connected().await.unwrap(); - let s1_a = p1_a.open(10, Promises::empty()).await.unwrap(); - let s1_b = p1_b.opened().await.unwrap(); + let s1_a = p1_a.open(10, Promises::empty()).await.unwrap(); + let s1_b = p1_b.opened().await.unwrap(); - (n_a, p1_a, s1_a, n_b, p1_b, s1_b) + (n_a, p1_a, s1_a, n_b, p1_b, s1_b) + }); + (runtime, n_a, p1_a, s1_a, n_b, p1_b, s1_b) } #[allow(dead_code)] diff --git a/network/tests/integration.rs b/network/tests/integration.rs index f4c8367841..b83f50b570 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -1,5 +1,5 @@ -use async_std::task; -use task::block_on; +use std::sync::Arc; +use tokio::runtime::Runtime; use veloren_network::{NetworkError, StreamError}; mod helper; use helper::{network_participant_stream, tcp, udp}; @@ -10,23 +10,23 @@ use veloren_network::{Network, Pid, Promises, ProtocolAddr}; #[ignore] fn network_20s() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, _, _n_b, _, _) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _, _, _n_b, _, _) = network_participant_stream(tcp()); std::thread::sleep(std::time::Duration::from_secs(30)); } #[test] fn stream_simple() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("Hello World").unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); } #[test] fn stream_try_recv() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(4242u32).unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); @@ -36,47 +36,46 @@ fn stream_try_recv() { #[test] fn stream_simple_3msg() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); - assert_eq!(block_on(s1_b.recv()), Ok(1337)); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok(1337)); s1_a.send("3rdMessage").unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string())); } #[test] fn stream_simple_udp() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); s1_a.send("Hello World").unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); } #[test] fn stream_simple_udp_3msg() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); - assert_eq!(block_on(s1_b.recv()), Ok(1337)); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok(1337)); s1_a.send("3rdMessage").unwrap(); - assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string())); } #[test] #[ignore] fn tcp_and_udp_2_connections() -> std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); - let (network, f) = Network::new(Pid::new()); - let (remote, fr) = Network::new(Pid::new()); - std::thread::spawn(f); - std::thread::spawn(fr); - block_on(async { + let r = Arc::new(Runtime::new().unwrap()); + let network = Network::new(Pid::new(), Arc::clone(&r)); + let remote = Network::new(Pid::new(), Arc::clone(&r)); + r.block_on(async { remote .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap())) .await?; @@ -97,18 +96,17 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); - let (network, f) = Network::new(Pid::new()); - std::thread::spawn(f); + let r = Arc::new(Runtime::new().unwrap()); + let network = Network::new(Pid::new(), Arc::clone(&r)); let udp1 = udp(); let tcp1 = tcp(); - block_on(network.listen(udp1.clone()))?; - block_on(network.listen(tcp1.clone()))?; + r.block_on(network.listen(udp1.clone()))?; + r.block_on(network.listen(tcp1.clone()))?; std::thread::sleep(std::time::Duration::from_millis(200)); - let (network2, f2) = Network::new(Pid::new()); - std::thread::spawn(f2); - let e1 = block_on(network2.listen(udp1)); - let e2 = block_on(network2.listen(tcp1)); + let network2 = Network::new(Pid::new(), Arc::clone(&r)); + let e1 = r.block_on(network2.listen(udp1)); + let e2 = r.block_on(network2.listen(tcp1)); match e1 { Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (), _ => panic!(), @@ -130,11 +128,10 @@ fn api_stream_send_main() -> std::result::Result<(), Box> let (_, _) = helper::setup(false, 0); // Create a Network, listen on Port `1200` and wait for a Stream to be opened, // then answer `Hello World` - let (network, f) = Network::new(Pid::new()); - let (remote, fr) = Network::new(Pid::new()); - std::thread::spawn(f); - std::thread::spawn(fr); - block_on(async { + let r = Arc::new(Runtime::new().unwrap()); + let network = Network::new(Pid::new(), Arc::clone(&r)); + let remote = Network::new(Pid::new(), Arc::clone(&r)); + r.block_on(async { network .listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap())) .await?; @@ -158,11 +155,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> let (_, _) = helper::setup(false, 0); // Create a Network, listen on Port `1220` and wait for a Stream to be opened, // then listen on it - let (network, f) = Network::new(Pid::new()); - let (remote, fr) = Network::new(Pid::new()); - std::thread::spawn(f); - std::thread::spawn(fr); - block_on(async { + let r = Arc::new(Runtime::new().unwrap()); + let network = Network::new(Pid::new(), Arc::clone(&r)); + let remote = Network::new(Pid::new(), Arc::clone(&r)); + r.block_on(async { network .listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .await?; @@ -184,10 +180,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> #[test] fn wrong_parse() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(1337).unwrap(); - match block_on(s1_b.recv::()) { + match r.block_on(s1_b.recv::()) { Err(StreamError::Deserialize(_)) => (), _ => panic!("this should fail, but it doesnt!"), } @@ -196,7 +192,7 @@ fn wrong_parse() { #[test] fn multiple_try_recv() { let (_, _) = helper::setup(false, 0); - let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("asd").unwrap(); s1_a.send(11u32).unwrap(); diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index 18b3390199..b930bd9de9 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -129,9 +129,19 @@ fn main() -> io::Result<()> { let server_port = &server_settings.gameserver_address.port(); let metrics_port = &server_settings.metrics_address.port(); // Create server - let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); - let mut server = Server::new(server_settings, editable_settings, &server_data_dir, runtime) - .expect("Failed to create server instance!"); + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let mut server = Server::new( + server_settings, + editable_settings, + &server_data_dir, + runtime, + ) + .expect("Failed to create server instance!"); info!( ?server_port, diff --git a/server/src/lib.rs b/server/src/lib.rs index fe389ae266..a8ac9487fe 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -91,8 +91,8 @@ use std::{ }; #[cfg(not(feature = "worldgen"))] use test_world::{IndexOwned, World}; -use tracing::{debug, error, info, trace}; use tokio::runtime::Runtime; +use tracing::{debug, error, info, trace}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -121,7 +121,7 @@ pub struct Server { connection_handler: ConnectionHandler, - runtime: Arc, + _runtime: Arc, thread_pool: ThreadPool, metrics: ServerMetrics, @@ -367,7 +367,8 @@ impl Server { let thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".to_string()) .build(); - let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry()); + let network = + Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry()); metrics .run(settings.metrics_address) .expect("Failed to initialize server metrics submodule."); @@ -388,7 +389,7 @@ impl Server { connection_handler, - runtime, + _runtime: runtime, thread_pool, metrics, diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 010071cd56..2297bf9232 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -72,7 +72,13 @@ impl ClientInit { let mut last_err = None; let cores = num_cpus::get(); - let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap()); + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(if cores > 4 { cores - 1 } else { cores }) + .build() + .unwrap(), + ); const FOUR_MINUTES_RETRIES: u64 = 48; 'tries: for _ in 0..FOUR_MINUTES_RETRIES { diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index 32368a19fc..bda1c1fd2b 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -83,7 +83,13 @@ impl Singleplayer { let thread_pool = client.map(|c| c.thread_pool().clone()); let cores = num_cpus::get(); - let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap()); + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(if cores > 4 { cores - 1 } else { cores }) + .build() + .unwrap(), + ); let settings2 = settings.clone(); let paused = Arc::new(AtomicBool::new(false));