From 1b77b6dc41841a867220dd94bae7fb9058ff05e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 13 Jan 2021 14:16:22 +0100 Subject: [PATCH] Initial switch to tokio for network, minimum working example. --- Cargo.lock | 143 ++++++++------------------- client/Cargo.toml | 1 + client/src/lib.rs | 10 +- network/Cargo.toml | 5 +- network/src/api.rs | 68 +++++++------ network/src/lib.rs | 2 +- network/src/participant.rs | 13 ++- network/src/protocols.rs | 27 ++--- network/src/scheduler.rs | 37 +++---- server-cli/Cargo.toml | 1 + server-cli/src/main.rs | 3 +- server/Cargo.toml | 1 + server/src/lib.rs | 7 +- voxygen/Cargo.toml | 2 + voxygen/src/menu/main/client_init.rs | 5 +- voxygen/src/singleplayer.rs | 4 +- 16 files changed, 149 insertions(+), 180 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2595f8d09..56d9e2a87a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -248,41 +248,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "async-std" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267" -dependencies = [ - "async-task", - "crossbeam-channel 0.4.4", - "crossbeam-deque 0.7.3", - "crossbeam-utils 0.7.2", - "futures-core", - "futures-io", - "futures-timer 2.0.2", - "kv-log-macro", - "log", - "memchr", - "mio 0.6.23", - "mio-uds", - "num_cpus", - "once_cell", - "pin-project-lite 0.1.11", - "pin-utils", - "slab", -] - -[[package]] -name = "async-task" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ac2c016b079e771204030951c366db398864f5026f84a44dafb0ff20f02085d" -dependencies = [ - "libc", - "winapi 0.3.9", -] - [[package]] name = "atom" version = "0.3.6" @@ -1114,16 +1079,6 @@ dependencies = [ "crossbeam-utils 0.6.6", ] -[[package]] -name = "crossbeam-channel" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" -dependencies = [ - "crossbeam-utils 0.7.2", - "maybe-uninit", -] - [[package]] name = "crossbeam-channel" version = "0.5.0" @@ -1290,16 +1245,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ctor" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8f45d9ad417bcef4817d614a501ab55cdd96a6fdb24f49aab89a54acfd66b19" -dependencies = [ - "quote 1.0.9", - "syn 1.0.60", -] - [[package]] name = "daggy" version = "0.5.0" @@ -1861,12 +1806,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "futures-timer" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" - [[package]] name = "futures-timer" version = "3.0.2" @@ -2242,7 +2181,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", + "tokio 0.2.25", "tokio-util", "tracing", "tracing-futures", @@ -2393,7 +2332,7 @@ dependencies = [ "itoa", "pin-project 1.0.5", "socket2", - "tokio", + "tokio 0.2.25", "tower-service", "tracing", "want", @@ -2410,7 +2349,7 @@ dependencies = [ "hyper", "log", "rustls 0.18.1", - "tokio", + "tokio 0.2.25", "tokio-rustls", "webpki", ] @@ -2687,15 +2626,6 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lazy-bytes-cast" version = "5.0.1" @@ -2863,7 +2793,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ "cfg-if 1.0.0", - "value-bag", ] [[package]] @@ -3088,17 +3017,6 @@ dependencies = [ "slab", ] -[[package]] -name = "mio-uds" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" -dependencies = [ - "iovec", - "libc", - "mio 0.6.23", -] - [[package]] name = "miow" version = "0.2.2" @@ -4282,7 +4200,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio", + "tokio 0.2.25", "tokio-rustls", "url", "wasm-bindgen", @@ -5162,6 +5080,33 @@ dependencies = [ "slab", ] +[[package]] +name = "tokio" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8190d04c665ea9e6b6a0dc45523ade572c088d2e6566244c1122671dbf4ae3a" +dependencies = [ + "autocfg", + "bytes 1.0.1", + "libc", + "memchr", + "mio 0.7.7", + "num_cpus", + "pin-project-lite 0.2.4", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.60", +] + [[package]] name = "tokio-rustls" version = "0.14.1" @@ -5170,7 +5115,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a" dependencies = [ "futures-core", "rustls 0.18.1", - "tokio", + "tokio 0.2.25", "webpki", ] @@ -5185,7 +5130,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.11", - "tokio", + "tokio 0.2.25", ] [[package]] @@ -5528,15 +5473,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "value-bag" -version = "1.0.0-alpha.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b676010e055c99033117c2343b33a40a30b91fecd6c49055ac9cd2d6c305ab1" -dependencies = [ - "ctor", -] - [[package]] name = "vcpkg" version = "0.2.11" @@ -5596,7 +5532,7 @@ dependencies = [ "authc", "byteorder", "futures-executor", - "futures-timer 3.0.2", + "futures-timer", "futures-util", "hashbrown 0.9.1", "image", @@ -5604,6 +5540,7 @@ dependencies = [ "num_cpus", "rayon", "specs", + "tokio 1.2.0", "tracing", "tracing-subscriber", "uvth 3.1.1", @@ -5731,7 +5668,7 @@ dependencies = [ "dotenv", "futures-channel", "futures-executor", - "futures-timer 3.0.2", + "futures-timer", "futures-util", "hashbrown 0.9.1", "itertools 0.9.0", @@ -5749,6 +5686,7 @@ dependencies = [ "specs", "specs-idvs", "tiny_http", + "tokio 1.2.0", "tracing", "uvth 3.1.1", "vek 0.12.0", @@ -5772,6 +5710,7 @@ dependencies = [ "serde", "signal-hook 0.2.3", "termcolor", + "tokio 1.2.0", "tracing", "tracing-subscriber", "tracing-tracy", @@ -5818,6 +5757,7 @@ dependencies = [ "lazy_static", "native-dialog", "num 0.3.1", + "num_cpus", "old_school_gfx_glutin_ext", "ordered-float 2.1.1", "rand 0.8.3", @@ -5827,6 +5767,7 @@ dependencies = [ "specs", "specs-idvs", "termcolor", + "tokio 1.2.0", "tracing", "tracing-appender", "tracing-log", @@ -5893,9 +5834,8 @@ dependencies = [ [[package]] name = "veloren_network" -version = "0.2.0" +version = "0.3.0" dependencies = [ - "async-std", "bincode", "bitflags", "clap", @@ -5908,6 +5848,7 @@ dependencies = [ "serde", "shellexpand", "tiny_http", + "tokio 1.2.0", "tracing", "tracing-futures", "tracing-subscriber", diff --git a/client/Cargo.toml b/client/Cargo.toml index c775dcfc79..b2ebcfead1 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -21,6 +21,7 @@ uvth = "3.1.1" futures-util = "0.3.7" futures-executor = "0.3" futures-timer = "3.0" +tokio = { version = "1.0.1", default-features = false, features = ["rt"] } image = { version = "0.23.12", default-features = false, features = ["png"] } num = "0.3.1" num_cpus = "1.10.1" diff --git a/client/src/lib.rs b/client/src/lib.rs index 1f15a66bfe..9b13ed80f1 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -64,6 +64,7 @@ use std::{ time::{Duration, Instant}, }; use tracing::{debug, error, trace, warn}; +use tokio::runtime::Runtime; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -129,6 +130,7 @@ impl WorldData { pub struct Client { registered: bool, presence: Option, + runtime: Arc, thread_pool: ThreadPool, server_info: ServerInfo, world_data: WorldData, @@ -185,15 +187,14 @@ pub struct CharacterList { impl Client { /// Create a new `Client`. - pub fn new>(addr: A, view_distance: Option) -> Result { + pub fn new>(addr: A, view_distance: Option, runtime: Arc) -> Result { let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) .build(); // We reduce the thread count by 1 to keep rendering smooth thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); - let (network, scheduler) = Network::new(Pid::new()); - thread_pool.execute(scheduler); + let network = Network::new(Pid::new(), Arc::clone(&runtime)); let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?; let stream = block_on(participant.opened())?; @@ -417,6 +418,7 @@ impl Client { Ok(Self { registered: false, presence: None, + runtime, thread_pool, server_info, world_data: WorldData { @@ -1733,6 +1735,8 @@ impl Client { /// exempt). pub fn thread_pool(&self) -> &ThreadPool { &self.thread_pool } + pub fn runtime(&self) -> &Arc { &self.runtime } + /// Get a reference to the client's game state. pub fn state(&self) -> &State { &self.state } diff --git a/network/Cargo.toml b/network/Cargo.toml index 49caa4d62d..d477be73e1 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "veloren_network" -version = "0.2.0" +version = "0.3.0" authors = ["Marcel Märtens "] edition = "2018" @@ -19,8 +19,7 @@ bincode = "1.3.1" serde = { version = "1.0" } #sending crossbeam-channel = "0.5" -# NOTE: Upgrading async-std can trigger spontanious crashes for `network`ing. Consider elaborate tests before upgrading -async-std = { version = "~1.5", default-features = false, features = ["std", "async-task", "default"] } +tokio = { version = "1.2", default-features = false, features = ["io-util", "macros", "rt", "net", "time"] } #tracing and metrics tracing = { version = "0.1", default-features = false } tracing-futures = "0.2" diff --git a/network/src/api.rs b/network/src/api.rs index 66ffa82096..8baaa72581 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -8,7 +8,8 @@ use crate::{ scheduler::Scheduler, types::{Mid, Pid, Prio, Promises, Sid}, }; -use async_std::{io, sync::Mutex, task}; +use tokio::{io, sync::Mutex}; +use tokio::runtime::Runtime; use futures::{ channel::{mpsc, oneshot}, sink::SinkExt, @@ -50,6 +51,7 @@ pub enum ProtocolAddr { pub struct Participant { local_pid: Pid, remote_pid: Pid, + runtime: Arc, a2b_stream_open_s: Mutex>, b2a_stream_opened_r: Mutex>, a2s_disconnect_s: A2sDisconnect, @@ -76,6 +78,7 @@ pub struct Stream { prio: Prio, promises: Promises, send_closed: Arc, + runtime: Arc, a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, b2a_msg_recv_r: Option>, a2b_close_stream_s: Option>, @@ -150,9 +153,10 @@ pub enum StreamError { /// [`connected`]: Network::connected pub struct Network { local_pid: Pid, + runtime: Arc, participant_disconnect_sender: Mutex>, listen_sender: - Mutex>)>>, + Mutex>)>>, connect_sender: Mutex>)>>, connected_receiver: Mutex>, @@ -165,17 +169,12 @@ impl Network { /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` + /// * `runtime` - provide a tokio::Runtime, it's used to internally spawn tasks /// /// # Result /// * `Self` - returns a `Network` which can be `Send` to multiple areas of /// your code, including multiple threads. This is the base strct of this /// crate. - /// * `FnOnce` - you need to run the returning FnOnce exactly once, probably - /// in it's own thread. this is NOT done internally, so that you are free - /// to choose the threadpool implementation of your choice. We recommend - /// using [`ThreadPool`] from [`uvth`] crate. This fn will run the - /// Scheduler to handle all `Network` internals. Additional threads will - /// be allocated on an internal async-aware threadpool /// /// # Examples /// ```rust @@ -204,9 +203,10 @@ impl Network { /// [`Pid::new()`]: crate::types::Pid::new /// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html /// [`uvth`]: https://docs.rs/uvth - pub fn new(participant_id: Pid) -> (Self, impl std::ops::FnOnce()) { + pub fn new(participant_id: Pid, runtime: Arc) -> Self { Self::internal_new( participant_id, + runtime, #[cfg(feature = "metrics")] None, ) @@ -232,42 +232,46 @@ impl Network { #[cfg(feature = "metrics")] pub fn new_with_registry( participant_id: Pid, + runtime: Arc, registry: &Registry, - ) -> (Self, impl std::ops::FnOnce()) { - Self::internal_new(participant_id, Some(registry)) + ) -> Self { + Self::internal_new(participant_id, runtime, Some(registry)) } fn internal_new( participant_id: Pid, + runtime: Arc, #[cfg(feature = "metrics")] registry: Option<&Registry>, - ) -> (Self, impl std::ops::FnOnce()) { + ) -> Self { let p = participant_id; debug!(?p, "Starting Network"); let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = Scheduler::new( participant_id, + Arc::clone(&runtime), #[cfg(feature = "metrics")] registry, ); - ( - Self { - local_pid: participant_id, - participant_disconnect_sender: Mutex::new(HashMap::new()), - listen_sender: Mutex::new(listen_sender), - connect_sender: Mutex::new(connect_sender), - connected_receiver: Mutex::new(connected_receiver), - shutdown_sender: Some(shutdown_sender), - }, - move || { + runtime.spawn( + async move { trace!(?p, "Starting scheduler in own thread"); - let _handle = task::block_on( + let _handle = tokio::spawn( scheduler .run() .instrument(tracing::info_span!("scheduler", ?p)), ); trace!(?p, "Stopping scheduler and his own thread"); - }, - ) + } + ); + Self { + local_pid: participant_id, + runtime: runtime, + participant_disconnect_sender: Mutex::new(HashMap::new()), + listen_sender: Mutex::new(listen_sender), + connect_sender: Mutex::new(connect_sender), + connected_receiver: Mutex::new(connected_receiver), + shutdown_sender: Some(shutdown_sender), + } } /// starts listening on an [`ProtocolAddr`]. @@ -300,7 +304,7 @@ impl Network { /// /// [`connected`]: Network::connected pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> { - let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); + let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); debug!(?address, "listening on address"); self.listen_sender .lock() @@ -426,6 +430,7 @@ impl Participant { pub(crate) fn new( local_pid: Pid, remote_pid: Pid, + runtime: Arc, a2b_stream_open_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, @@ -433,6 +438,7 @@ impl Participant { Self { local_pid, remote_pid, + runtime, a2b_stream_open_s: Mutex::new(a2b_stream_open_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), @@ -655,6 +661,7 @@ impl Stream { prio: Prio, promises: Promises, send_closed: Arc, + runtime: Arc, a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, b2a_msg_recv_r: mpsc::UnboundedReceiver, a2b_close_stream_s: mpsc::UnboundedSender, @@ -666,6 +673,7 @@ impl Stream { prio, promises, send_closed, + runtime, a2b_msg_s, b2a_msg_recv_r: Some(b2a_msg_recv_r), a2b_close_stream_s: Some(a2b_close_stream_s), @@ -960,7 +968,7 @@ impl Drop for Network { "Shutting down Participants of Network, while we still have metrics" ); let mut finished_receiver_list = vec![]; - task::block_on(async { + self.runtime.block_on(async { // we MUST avoid nested block_on, good that Network::Drop no longer triggers // Participant::Drop directly but just the BParticipant for (remote_pid, a2s_disconnect_s) in @@ -1013,14 +1021,14 @@ impl Drop for Participant { let pid = self.remote_pid; debug!(?pid, "Shutting down Participant"); - match task::block_on(self.a2s_disconnect_s.lock()).take() { + match self.runtime.block_on(self.a2s_disconnect_s.lock()).take() { None => trace!( ?pid, "Participant has been shutdown cleanly, no further waiting is required!" ), Some(mut a2s_disconnect_s) => { debug!(?pid, "Disconnect from Scheduler"); - task::block_on(async { + self.runtime.block_on(async { let (finished_sender, finished_receiver) = oneshot::channel(); a2s_disconnect_s .send((self.remote_pid, finished_sender)) @@ -1051,7 +1059,7 @@ impl Drop for Stream { let sid = self.sid; let pid = self.pid; debug!(?pid, ?sid, "Shutting down Stream"); - task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)) + self.runtime.block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)) .expect("bparticipant part of a gracefully shutdown must have crashed"); } else { let sid = self.sid; diff --git a/network/src/lib.rs b/network/src/lib.rs index bb14782a69..69bd5f07c0 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -39,7 +39,7 @@ //! //! # Examples //! ```rust -//! use async_std::task::sleep; +//! use tokio::task::sleep; //! use futures::{executor::block_on, join}; //! use veloren_network::{Network, Pid, Promises, ProtocolAddr}; //! diff --git a/network/src/participant.rs b/network/src/participant.rs index 8e0d0a1904..78d1dacd41 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -8,7 +8,8 @@ use crate::{ protocols::Protocols, types::{Cid, Frame, Pid, Prio, Promises, Sid}, }; -use async_std::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock}; +use tokio::runtime::Runtime; use futures::{ channel::{mpsc, oneshot}, future::FutureExt, @@ -71,6 +72,7 @@ pub struct BParticipant { remote_pid: Pid, remote_pid_string: String, //optimisation offset_sid: Sid, + runtime: Arc, channels: Arc>>>, streams: RwLock>, running_mgr: AtomicUsize, @@ -86,6 +88,7 @@ impl BParticipant { pub(crate) fn new( remote_pid: Pid, offset_sid: Sid, + runtime: Arc, #[cfg(feature = "metrics")] metrics: Arc, ) -> ( Self, @@ -120,6 +123,7 @@ impl BParticipant { remote_pid, remote_pid_string: remote_pid.to_string(), offset_sid, + runtime, channels: Arc::new(RwLock::new(HashMap::new())), streams: RwLock::new(HashMap::new()), running_mgr: AtomicUsize::new(0), @@ -213,7 +217,7 @@ impl BParticipant { .send((self.remote_pid, len as u64, /* */ 0)) .await .unwrap(); - async_std::task::sleep(TICK_TIME).await; + tokio::time::sleep(TICK_TIME).await; i += 1; if i.rem_euclid(1000) == 0 { trace!("Did 1000 ticks"); @@ -659,7 +663,7 @@ impl BParticipant { //Wait for other bparticipants mgr to close via AtomicUsize const SLEEP_TIME: Duration = Duration::from_millis(5); const ALLOWED_MANAGER: usize = 1; - async_std::task::sleep(SLEEP_TIME).await; + tokio::time::sleep(SLEEP_TIME).await; let mut i: u32 = 1; while self.running_mgr.load(Ordering::Relaxed) > ALLOWED_MANAGER { i += 1; @@ -670,7 +674,7 @@ impl BParticipant { self.running_mgr.load(Ordering::Relaxed) - ALLOWED_MANAGER ); } - async_std::task::sleep(SLEEP_TIME * i).await; + tokio::time::sleep(SLEEP_TIME * i).await; } trace!("All BParticipant mgr (except me) are shut down now"); @@ -843,6 +847,7 @@ impl BParticipant { prio, promises, send_closed, + Arc::clone(&self.runtime), a2p_msg_s, b2a_msg_recv_r, a2b_close_stream_s.clone(), diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 7b0b8651b6..771ea649e5 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -4,8 +4,8 @@ use crate::{ participant::C2pFrame, types::{Cid, Frame}, }; -use async_std::{ - io::prelude::*, +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, net::{TcpStream, UdpSocket}, }; @@ -43,7 +43,8 @@ pub(crate) enum Protocols { #[derive(Debug)] pub(crate) struct TcpProtocol { - stream: TcpStream, + read_stream: tokio::sync::Mutex, + write_stream: tokio::sync::Mutex, #[cfg(feature = "metrics")] metrics: Arc, } @@ -63,14 +64,16 @@ impl TcpProtocol { stream: TcpStream, #[cfg(feature = "metrics")] metrics: Arc, ) -> Self { + let (read_stream, write_stream) = stream.into_split(); Self { - stream, + read_stream: tokio::sync::Mutex::new(read_stream), + write_stream: tokio::sync::Mutex::new(write_stream), #[cfg(feature = "metrics")] metrics, } } - async fn read_frame( + async fn read_frame( r: &mut R, mut end_receiver: &mut Fuse>, ) -> Result> { @@ -167,11 +170,11 @@ impl TcpProtocol { .metrics .wire_in_throughput .with_label_values(&[&cid.to_string()]); - let mut stream = self.stream.clone(); + let mut read_stream = self.read_stream.lock().await; let mut end_r = end_r.fuse(); loop { - match Self::read_frame(&mut stream, &mut end_r).await { + match Self::read_frame(&mut *read_stream, &mut end_r).await { Ok(frame) => { #[cfg(feature = "metrics")] { @@ -209,7 +212,7 @@ impl TcpProtocol { trace!("Shutting down tcp read()"); } - pub async fn write_frame( + pub async fn write_frame( w: &mut W, frame: Frame, ) -> Result<(), std::io::Error> { @@ -270,7 +273,7 @@ impl TcpProtocol { pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver) { trace!("Starting up tcp write()"); - let mut stream = self.stream.clone(); + let mut write_stream = self.write_stream.lock().await; #[cfg(feature = "metrics")] let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid); #[cfg(feature = "metrics")] @@ -294,7 +297,7 @@ impl TcpProtocol { throughput_cache.inc_by(data.len() as u64); } } - if let Err(e) = Self::write_frame(&mut stream, frame).await { + if let Err(e) = Self::write_frame(&mut *write_stream, frame).await { info!( ?e, "Got an error writing to tcp, going to close this channel" @@ -498,7 +501,7 @@ impl UdpProtocol { mod tests { use super::*; use crate::{metrics::NetworkMetrics, types::Pid}; - use async_std::net; + use tokio::net; use futures::{executor::block_on, stream::StreamExt}; use std::sync::Arc; @@ -534,7 +537,7 @@ mod tests { }) }); // Assert than we get some value back! Its a Handshake! - //async_std::task::sleep(std::time::Duration::from_millis(1000)); + //tokio::task::sleep(std::time::Duration::from_millis(1000)); let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); if let Ok(Frame::Handshake { diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 33cb4ed054..e0c3b0ef84 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -7,10 +7,10 @@ use crate::{ protocols::{Protocols, TcpProtocol, UdpProtocol}, types::Pid, }; -use async_std::{io, net, sync::Mutex}; +use tokio::{io, net, sync::Mutex}; +use tokio::runtime::Runtime; use futures::{ channel::{mpsc, oneshot}, - executor::ThreadPool, future::FutureExt, select, sink::SinkExt, @@ -68,9 +68,9 @@ struct ParticipantChannels { #[derive(Debug)] pub struct Scheduler { local_pid: Pid, + runtime: Arc, local_secret: u128, closed: AtomicBool, - pool: Arc, run_channels: Option, participant_channels: Arc>>, participants: Arc>>, @@ -83,6 +83,7 @@ pub struct Scheduler { impl Scheduler { pub fn new( local_pid: Pid, + runtime: Arc, #[cfg(feature = "metrics")] registry: Option<&Registry>, ) -> ( Self, @@ -128,9 +129,9 @@ impl Scheduler { ( Self { local_pid, + runtime, local_secret, closed: AtomicBool::new(false), - pool: Arc::new(ThreadPool::new().unwrap()), run_channels, participant_channels: Arc::new(Mutex::new(Some(participant_channels))), participants: Arc::new(Mutex::new(HashMap::new())), @@ -247,7 +248,7 @@ impl Scheduler { Arc::clone(&self.metrics), udp_data_receiver, ); - self.pool.spawn_ok( + self.runtime.spawn( Self::udp_single_channel_connect(Arc::clone(&socket), udp_data_sender) .instrument(tracing::info_span!("udp", ?addr)), ); @@ -377,27 +378,19 @@ impl Scheduler { }, }; trace!(?addr, "Listener bound"); - let mut incoming = listener.incoming(); let mut end_receiver = s2s_stop_listening_r.fuse(); - while let Some(stream) = select! { - next = incoming.next().fuse() => next, + while let Some(data) = select! { + next = listener.accept().fuse() => Some(next), _ = end_receiver => None, } { - let stream = match stream { - Ok(s) => s, + let (stream, remote_addr) = match data { + Ok((s, p)) => (s, p), Err(e) => { warn!(?e, "TcpStream Error, ignoring connection attempt"); continue; }, }; - let peer_addr = match stream.peer_addr() { - Ok(s) => s, - Err(e) => { - warn!(?e, "TcpStream Error, ignoring connection attempt"); - continue; - }, - }; - info!("Accepting Tcp from: {}", peer_addr); + info!("Accepting Tcp from: {}", remote_addr); let protocol = TcpProtocol::new( stream, #[cfg(feature = "metrics")] @@ -505,13 +498,13 @@ impl Scheduler { // the UDP listening is done in another place. let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); let participants = Arc::clone(&self.participants); + let runtime = Arc::clone(&self.runtime); #[cfg(feature = "metrics")] let metrics = Arc::clone(&self.metrics); - let pool = Arc::clone(&self.pool); let local_pid = self.local_pid; let local_secret = self.local_secret; // this is necessary for UDP to work at all and to remove code duplication - self.pool.spawn_ok( + self.runtime.spawn( async move { trace!(?cid, "Open channel and be ready for Handshake"); let handshake = Handshake::new( @@ -545,6 +538,7 @@ impl Scheduler { ) = BParticipant::new( pid, sid, + Arc::clone(&runtime), #[cfg(feature = "metrics")] Arc::clone(&metrics), ); @@ -552,6 +546,7 @@ impl Scheduler { let participant = Participant::new( local_pid, pid, + Arc::clone(&runtime), a2b_stream_open_s, b2a_stream_opened_r, participant_channels.a2s_disconnect_s, @@ -566,7 +561,7 @@ impl Scheduler { }); drop(participants); trace!("dropped participants lock"); - pool.spawn_ok( + runtime.spawn( bparticipant .run(participant_channels.b2s_prio_statistic_s) .instrument(tracing::info_span!("participant", ?pid)), diff --git a/server-cli/Cargo.toml b/server-cli/Cargo.toml index 6269f04932..4d8a3cf866 100644 --- a/server-cli/Cargo.toml +++ b/server-cli/Cargo.toml @@ -15,6 +15,7 @@ server = { package = "veloren-server", path = "../server", default-features = fa common = { package = "veloren-common", path = "../common" } common-net = { package = "veloren-common-net", path = "../common/net" } +tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] } ansi-parser = "0.7" clap = "2.33" crossterm = "0.18" diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index ba83e1d67f..18b3390199 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -129,7 +129,8 @@ fn main() -> io::Result<()> { let server_port = &server_settings.gameserver_address.port(); let metrics_port = &server_settings.metrics_address.port(); // Create server - let mut server = Server::new(server_settings, editable_settings, &server_data_dir) + let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); + let mut server = Server::new(server_settings, editable_settings, &server_data_dir, runtime) .expect("Failed to create server instance!"); info!( diff --git a/server/Cargo.toml b/server/Cargo.toml index e1f4b7c7e3..f997749565 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,6 +28,7 @@ futures-util = "0.3.7" futures-executor = "0.3" futures-timer = "3.0" futures-channel = "0.3" +tokio = { version = "1.0.1", default-features = false, features = ["rt"] } itertools = "0.9" lazy_static = "1.4.0" scan_fmt = { git = "https://github.com/Imberflur/scan_fmt" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 93c5916d72..fe389ae266 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -92,6 +92,7 @@ use std::{ #[cfg(not(feature = "worldgen"))] use test_world::{IndexOwned, World}; use tracing::{debug, error, info, trace}; +use tokio::runtime::Runtime; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -120,6 +121,7 @@ pub struct Server { connection_handler: ConnectionHandler, + runtime: Arc, thread_pool: ThreadPool, metrics: ServerMetrics, @@ -136,6 +138,7 @@ impl Server { settings: Settings, editable_settings: EditableSettings, data_dir: &std::path::Path, + runtime: Arc, ) -> Result { info!("Server is data dir is: {}", data_dir.display()); if settings.auth_server_address.is_none() { @@ -364,11 +367,10 @@ impl Server { let thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".to_string()) .build(); - let (network, f) = Network::new_with_registry(Pid::new(), &metrics.registry()); + let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry()); metrics .run(settings.metrics_address) .expect("Failed to initialize server metrics submodule."); - thread_pool.execute(f); block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; let connection_handler = ConnectionHandler::new(network); @@ -386,6 +388,7 @@ impl Server { connection_handler, + runtime, thread_pool, metrics, diff --git a/voxygen/Cargo.toml b/voxygen/Cargo.toml index 7480a7ed2f..aeebeb0a30 100644 --- a/voxygen/Cargo.toml +++ b/voxygen/Cargo.toml @@ -82,6 +82,8 @@ ron = {version = "0.6", default-features = false} serde = {version = "1.0", features = [ "rc", "derive" ]} treeculler = "0.1.0" uvth = "3.1.1" +tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] } +num_cpus = "1.0" # vec_map = { version = "0.8.2" } inline_tweak = "1.0.2" itertools = "0.10.0" diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index a1e01ab71a..010071cd56 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -71,6 +71,9 @@ impl ClientInit { let mut last_err = None; + let cores = num_cpus::get(); + let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap()); + const FOUR_MINUTES_RETRIES: u64 = 48; 'tries: for _ in 0..FOUR_MINUTES_RETRIES { if cancel2.load(Ordering::Relaxed) { @@ -79,7 +82,7 @@ impl ClientInit { for socket_addr in first_addrs.clone().into_iter().chain(second_addrs.clone()) { - match Client::new(socket_addr, view_distance) { + match Client::new(socket_addr, view_distance, Arc::clone(&runtime)) { Ok(mut client) => { if let Err(e) = client.register(username, password, |auth_server| { diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index 0e1a050988..32368a19fc 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -82,6 +82,8 @@ impl Singleplayer { let editable_settings = server::EditableSettings::singleplayer(&server_data_dir); let thread_pool = client.map(|c| c.thread_pool().clone()); + let cores = num_cpus::get(); + let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap()); let settings2 = settings.clone(); let paused = Arc::new(AtomicBool::new(false)); @@ -92,7 +94,7 @@ impl Singleplayer { let thread = thread::spawn(move || { let mut server = None; if let Err(e) = result_sender.send( - match Server::new(settings2, editable_settings, &server_data_dir) { + match Server::new(settings2, editable_settings, &server_data_dir, runtime) { Ok(s) => { server = Some(s); Ok(())