From 514d5db038d856a89c061b98af3854aaa52ee9a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 18 Feb 2021 01:01:57 +0100 Subject: [PATCH] Update Network Protocol - now last digit version is compatible 0.6.0 will connect to 0.6.1 - the TCP DATA Frames no longer contain START field, as it's not needed - the TCP OPENSTREAM Frames will now contain the BANDWIDTH field - MID is not Protocol internal Update network - update API with Bandwidth Update veloren - introduce better runtime and `async` things that are IO bound. - Remove `uvth` and instead use `tokio::runtime::Runtime::spawn_blocking` - remove futures_execute from client and server use tokio::runtime::Runtime instead - give threads a Name --- Cargo.lock | 63 +------ client/Cargo.toml | 4 - client/examples/chat-cli/main.rs | 28 +-- client/src/lib.rs | 70 +++----- network/Cargo.toml | 4 +- network/benches/speed.rs | 4 +- network/examples/chat.rs | 7 +- network/examples/fileshare/server.rs | 5 +- network/examples/network-speed/main.rs | 2 +- network/protocol/Cargo.toml | 2 +- network/protocol/benches/protocols.rs | 2 - network/protocol/src/event.rs | 7 +- network/protocol/src/frame.rs | 41 ++--- network/protocol/src/handshake.rs | 4 +- network/protocol/src/lib.rs | 4 +- network/protocol/src/message.rs | 2 - network/protocol/src/mpsc.rs | 3 +- network/protocol/src/tcp.rs | 37 ++-- network/protocol/src/types.rs | 2 +- network/src/api.rs | 29 ++-- network/src/channel.rs | 1 - network/src/lib.rs | 2 +- network/src/message.rs | 2 +- network/src/participant.rs | 20 +-- network/tests/closing.rs | 14 +- network/tests/helper.rs | 2 +- network/tests/integration.rs | 4 +- server-cli/src/logging.rs | 1 - server/Cargo.toml | 4 - server/src/chunk_generator.rs | 5 +- server/src/connection_handler.rs | 52 +++--- server/src/events/player.rs | 41 +++-- server/src/lib.rs | 27 +-- server/src/persistence/character_loader.rs | 21 +-- server/src/persistence/character_updater.rs | 17 +- voxygen/Cargo.toml | 1 - voxygen/src/logging.rs | 9 +- voxygen/src/main.rs | 15 +- voxygen/src/menu/char_selection/mod.rs | 2 +- voxygen/src/menu/main/client_init.rs | 179 ++++++++++++-------- voxygen/src/scene/figure/cache.rs | 5 +- voxygen/src/scene/figure/mod.rs | 26 +-- voxygen/src/scene/mod.rs | 4 +- voxygen/src/scene/simple.rs | 6 +- voxygen/src/scene/terrain.rs | 16 +- voxygen/src/session.rs | 4 +- voxygen/src/singleplayer.rs | 80 ++++----- voxygen/src/window.rs | 51 +++--- 48 files changed, 437 insertions(+), 494 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2d049811d..c556d660ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,7 +233,7 @@ checksum = "dac94eeee6ebd1165959e440836a452109f9f839d6cfde12974d75a5b4222406" dependencies = [ "ahash 0.6.3", "bincode", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "log", "notify 4.0.15", "parking_lot 0.11.1", @@ -1093,22 +1093,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd01a6eb3daaafa260f6fc94c3a6c36390abc2080e38e3e34ced87393fb77d80" dependencies = [ "cfg-if 1.0.0", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "crossbeam-deque 0.8.0", "crossbeam-epoch 0.9.1", "crossbeam-queue", "crossbeam-utils 0.8.1", ] -[[package]] -name = "crossbeam-channel" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa" -dependencies = [ - "crossbeam-utils 0.6.6", -] - [[package]] name = "crossbeam-channel" version = "0.5.0" @@ -1180,16 +1171,6 @@ dependencies = [ "crossbeam-utils 0.8.1", ] -[[package]] -name = "crossbeam-utils" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" -dependencies = [ - "cfg-if 0.1.10", - "lazy_static", -] - [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -1829,12 +1810,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "futures-timer" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" - [[package]] name = "futures-util" version = "0.3.12" @@ -2162,7 +2137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac2c82074cafb68b9e459c50c655f7eedcb92d6ee7166813802934bc6fc29fa3" dependencies = [ "ab_glyph", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "crossbeam-deque 0.8.0", "linked-hash-map", "rayon", @@ -3302,7 +3277,7 @@ checksum = "58e54552360d7b89a698eca6de3927205a8e03e8080dc13d779de5c7876e098b" dependencies = [ "anymap", "bitflags", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "filetime", "fsevent 2.0.2", "fsevent-sys 3.0.2", @@ -4142,7 +4117,7 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" dependencies = [ - "crossbeam-channel 0.5.0", + "crossbeam-channel", "crossbeam-deque 0.8.0", "crossbeam-utils 0.8.1", "lazy_static", @@ -5241,7 +5216,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a" dependencies = [ "chrono", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "tracing-subscriber", ] @@ -5524,17 +5499,6 @@ dependencies = [ "serde", ] -[[package]] -name = "uvth" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e59a167890d173eb0fcd7a1b99b84dc05c521ae8d76599130b8e19bef287abbf" -dependencies = [ - "crossbeam-channel 0.3.9", - "log", - "num_cpus", -] - [[package]] name = "vcpkg" version = "0.2.11" @@ -5593,19 +5557,15 @@ version = "0.8.0" dependencies = [ "authc", "byteorder", - "futures-executor", - "futures-timer", "futures-util", "hashbrown 0.9.1", "image", "num 0.3.1", - "num_cpus", "rayon", "specs", "tokio 1.2.0", "tracing", "tracing-subscriber", - "uvth", "vek 0.12.0", "veloren-common", "veloren-common-net", @@ -5621,7 +5581,7 @@ dependencies = [ "arraygen", "assets_manager", "criterion", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "crossbeam-utils 0.8.1", "csv", "directories-next", @@ -5702,7 +5662,7 @@ dependencies = [ "bytes 1.0.1", "clap", "criterion", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "futures-core", "futures-util", "lazy_static", @@ -5767,13 +5727,10 @@ version = "0.8.0" dependencies = [ "authc", "chrono", - "crossbeam-channel 0.5.0", + "crossbeam-channel", "diesel", "diesel_migrations", "dotenv", - "futures-channel", - "futures-executor", - "futures-timer", "futures-util", "hashbrown 0.9.1", "itertools 0.9.0", @@ -5793,7 +5750,6 @@ dependencies = [ "specs-idvs", "tokio 1.2.0", "tracing", - "uvth", "vek 0.12.0", "veloren-common", "veloren-common-net", @@ -5880,7 +5836,6 @@ dependencies = [ "tracing-subscriber", "tracing-tracy", "treeculler", - "uvth", "vek 0.12.0", "veloren-client", "veloren-common", diff --git a/client/Cargo.toml b/client/Cargo.toml index 1578350d29..19bfe35608 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -17,14 +17,10 @@ common-net = { package = "veloren-common-net", path = "../common/net" } network = { package = "veloren-network", path = "../network", features = ["compression"], default-features = false } byteorder = "1.3.2" -uvth = "3.1.1" futures-util = "0.3.7" -futures-executor = "0.3" -futures-timer = "3.0" tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } image = { version = "0.23.12", default-features = false, features = ["png"] } num = "0.3.1" -num_cpus = "1.10.1" tracing = { version = "0.1", default-features = false } rayon = "1.5" specs = { git = "https://github.com/amethyst/specs.git", rev = "d4435bdf496cf322c74886ca09dd8795984919b4" } diff --git a/client/examples/chat-cli/main.rs b/client/examples/chat-cli/main.rs index 115d9ae50c..a89c0dab5f 100644 --- a/client/examples/chat-cli/main.rs +++ b/client/examples/chat-cli/main.rs @@ -45,27 +45,29 @@ fn main() { let password = read_input(); let runtime = Arc::new(Runtime::new().unwrap()); + let runtime2 = Arc::clone(&runtime); // Create a client. - let mut client = Client::new( - server_addr - .to_socket_addrs() - .expect("Invalid server address") - .next() - .unwrap(), - None, - runtime, - ) - .expect("Failed to create client instance"); + let mut client = runtime + .block_on(Client::new( + server_addr + .to_socket_addrs() + .expect("Invalid server address") + .next() + .unwrap(), + None, + runtime2, + )) + .expect("Failed to create client instance"); println!("Server info: {:?}", client.server_info()); println!("Players online: {:?}", client.get_players()); - client - .register(username, password, |provider| { + runtime + .block_on(client.register(username, password, |provider| { provider == "https://auth.veloren.net" - }) + })) .unwrap(); let (tx, rx) = mpsc::channel(); diff --git a/client/src/lib.rs b/client/src/lib.rs index 9b1a37d46f..74d51b0d81 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -48,9 +48,7 @@ use common_net::{ }; use common_sys::state::State; use comp::BuffKind; -use futures_executor::block_on; -use futures_timer::Delay; -use futures_util::{select, FutureExt}; +use futures_util::FutureExt; use hashbrown::{HashMap, HashSet}; use image::DynamicImage; use network::{Network, Participant, Pid, ProtocolAddr, Stream}; @@ -63,9 +61,8 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, select}; use tracing::{debug, error, trace, warn}; -use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; const PING_ROLLING_AVERAGE_SECS: usize = 10; @@ -131,7 +128,6 @@ pub struct Client { registered: bool, presence: Option, runtime: Arc, - thread_pool: ThreadPool, server_info: ServerInfo, world_data: WorldData, player_list: HashMap, @@ -187,28 +183,22 @@ pub struct CharacterList { impl Client { /// Create a new `Client`. - pub fn new>( + pub async fn new>( addr: A, view_distance: Option, runtime: Arc, ) -> Result { - let mut thread_pool = ThreadPoolBuilder::new() - .name("veloren-worker".into()) - .build(); - // We reduce the thread count by 1 to keep rendering smooth - thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); - let network = Network::new(Pid::new(), Arc::clone(&runtime)); - let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?; - let stream = block_on(participant.opened())?; - let mut ping_stream = block_on(participant.opened())?; - let mut register_stream = block_on(participant.opened())?; - let character_screen_stream = block_on(participant.opened())?; - let in_game_stream = block_on(participant.opened())?; + let participant = network.connect(ProtocolAddr::Tcp(addr.into())).await?; + let stream = participant.opened().await?; + let mut ping_stream = participant.opened().await?; + let mut register_stream = participant.opened().await?; + let character_screen_stream = participant.opened().await?; + let in_game_stream = participant.opened().await?; register_stream.send(ClientType::Game)?; - let server_info: ServerInfo = block_on(register_stream.recv())?; + let server_info: ServerInfo = register_stream.recv().await?; // TODO: Display that versions don't match in Voxygen if server_info.git_hash != *common::util::GIT_HASH { @@ -236,7 +226,7 @@ impl Client { recipe_book, max_group_size, client_timeout, - ) = match block_on(register_stream.recv())? { + ) = match register_stream.recv().await? { ServerInit::GameSync { entity_package, time_of_day, @@ -411,19 +401,12 @@ impl Client { }?; ping_stream.send(PingMsg::Ping)?; - let mut thread_pool = ThreadPoolBuilder::new() - .name("veloren-worker".into()) - .build(); - // We reduce the thread count by 1 to keep rendering smooth - thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); - debug!("Initial sync done"); Ok(Self { registered: false, presence: None, runtime, - thread_pool, server_info, world_data: WorldData { lod_base, @@ -470,13 +453,8 @@ impl Client { }) } - pub fn with_thread_pool(mut self, thread_pool: ThreadPool) -> Self { - self.thread_pool = thread_pool; - self - } - /// Request a state transition to `ClientState::Registered`. - pub fn register( + pub async fn register( &mut self, username: String, password: String, @@ -496,7 +474,7 @@ impl Client { self.send_msg_err(ClientRegister { token_or_username })?; - match block_on(self.register_stream.recv::())? { + match self.register_stream.recv::().await? { Err(RegisterError::AlreadyLoggedIn) => Err(Error::AlreadyLoggedIn), Err(RegisterError::AuthError(err)) => Err(Error::AuthErr(err)), Err(RegisterError::InvalidCharacter) => Err(Error::InvalidCharacter), @@ -1688,10 +1666,11 @@ impl Client { let mut handles_msg = 0; - block_on(async { + let runtime = Arc::clone(&self.runtime); + runtime.block_on(async { //TIMEOUT 0.01 ms for msg handling select!( - _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + _ = tokio::time::sleep(std::time::Duration::from_micros(10)).fuse() => Ok(()), err = self.handle_messages(&mut frontend_events, &mut handles_msg).fuse() => err, ) })?; @@ -1733,12 +1712,10 @@ impl Client { * 1000.0 } - /// Get a reference to the client's worker thread pool. This pool should be + /// Get a reference to the client's runtime thread pool. This pool should be /// used for any computationally expensive operations that run outside /// of the main thread (i.e., threads that block on I/O operations are /// exempt). - pub fn thread_pool(&self) -> &ThreadPool { &self.thread_pool } - pub fn runtime(&self) -> &Arc { &self.runtime } /// Get a reference to the client's game state. @@ -2042,7 +2019,10 @@ impl Drop for Client { } else { trace!("no disconnect msg necessary as client wasn't registered") } - if let Err(e) = block_on(self.participant.take().unwrap().disconnect()) { + if let Err(e) = self + .runtime + .block_on(self.participant.take().unwrap().disconnect()) + { warn!(?e, "error when disconnecting, couldn't send all data"); } } @@ -2067,7 +2047,9 @@ mod tests { let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000); let view_distance: Option = None; let runtime = Arc::new(Runtime::new().unwrap()); - let veloren_client: Result = Client::new(socket, view_distance, runtime); + let runtime2 = Arc::clone(&runtime); + let veloren_client: Result = + runtime.block_on(Client::new(socket, view_distance, runtime2)); let _ = veloren_client.map(|mut client| { //register @@ -2075,9 +2057,9 @@ mod tests { let password: String = "Bar".to_string(); let auth_server: String = "auth.veloren.net".to_string(); let _result: Result<(), Error> = - client.register(username, password, |suggestion: &str| { + runtime.block_on(client.register(username, password, |suggestion: &str| { suggestion == auth_server - }); + })); //clock let mut clock = Clock::new(Duration::from_secs_f64(SPT)); diff --git a/network/Cargo.toml b/network/Cargo.toml index b5aeb4be3b..179c927da4 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -28,7 +28,7 @@ tracing = { version = "0.1", default-features = false, features = ["attributes"] prometheus = { version = "0.11", default-features = false, optional = true } #async futures-core = { version = "0.3", default-features = false } -futures-util = { version = "0.3", default-features = false, features = ["std"] } +futures-util = { version = "0.3.7", default-features = false, features = ["std"] } async-channel = "1.5.1" #use for .close() channels #mpsc channel registry lazy_static = { version = "1.4", default-features = false } @@ -43,7 +43,7 @@ bytes = "^1" [dev-dependencies] tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } tokio = { version = "1.2", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] } -futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } +futures-util = { version = "0.3.7", default-features = false, features = ["sink", "std"] } clap = { version = "2.33", default-features = false } shellexpand = "2.0.0" serde = { version = "1.0", features = ["derive"] } diff --git a/network/benches/speed.rs b/network/benches/speed.rs index 8de5c78335..76828ee523 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -31,7 +31,7 @@ fn criterion_util(c: &mut Criterion) { let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(ProtocolAddr::Mpsc(5000)); - let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED)).unwrap(); + let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED, 0)).unwrap(); c.throughput(Throughput::Bytes(1000)) .bench_function("message_serialize", |b| { @@ -134,7 +134,7 @@ pub fn network_participant_stream( let p1_b = n_b.connect(addr).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); - let s1_a = p1_a.open(4, Promises::empty()).await.unwrap(); + let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap(); let s1_b = p1_b.opened().await.unwrap(); (n_a, p1_a, s1_a, n_b, p1_b, s1_b) diff --git a/network/examples/chat.rs b/network/examples/chat.rs index e5c7737531..9fb0c60eee 100644 --- a/network/examples/chat.rs +++ b/network/examples/chat.rs @@ -130,7 +130,10 @@ async fn client_connection( Ok(msg) => { println!("[{}]: {}", username, msg); for p in participants.read().await.iter() { - match p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await { + match p + .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) + .await + { Err(_) => info!("error talking to client, //TODO drop it"), Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(), }; @@ -148,7 +151,7 @@ fn client(address: ProtocolAddr) { r.block_on(async { let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1 let mut s1 = p1 - .open(4, Promises::ORDERED | Promises::CONSISTENCY) + .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .await .unwrap(); //remote representation of s1 let mut input_lines = io::BufReader::new(io::stdin()); diff --git a/network/examples/fileshare/server.rs b/network/examples/fileshare/server.rs index b6cf6c38dd..ba3e666c5e 100644 --- a/network/examples/fileshare/server.rs +++ b/network/examples/fileshare/server.rs @@ -121,8 +121,9 @@ impl Server { #[allow(clippy::eval_order_dependence)] async fn loop_participant(&self, p: Participant) { if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = ( - p.open(3, Promises::ORDERED | Promises::CONSISTENCY).await, - p.open(6, Promises::CONSISTENCY).await, + p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0) + .await, + p.open(6, Promises::CONSISTENCY, 0).await, p.opened().await, p.opened().await, ) { diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index bb6684658a..2bab12af5e 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -164,7 +164,7 @@ fn client(address: ProtocolAddr, runtime: Arc) { let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1 let mut s1 = runtime - .block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY)) + .block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)) .unwrap(); //remote representation of s1 let mut last = Instant::now(); let mut id = 0u64; diff --git a/network/protocol/Cargo.toml b/network/protocol/Cargo.toml index a9bd701940..954c217fe0 100644 --- a/network/protocol/Cargo.toml +++ b/network/protocol/Cargo.toml @@ -27,7 +27,7 @@ bytes = "^1" [dev-dependencies] async-channel = "1.5.1" -tokio = { version = "1.2", default-features = false, features = ["rt", "macros"] } +tokio = { version = "^1", default-features = false, features = ["rt", "macros"] } criterion = { version = "0.3.4", features = ["default", "async_tokio"] } [[bench]] diff --git a/network/protocol/benches/protocols.rs b/network/protocol/benches/protocols.rs index dfe6a57084..d8859943ed 100644 --- a/network/protocol/benches/protocols.rs +++ b/network/protocol/benches/protocols.rs @@ -47,7 +47,6 @@ async fn send_msg(mut s: T, data: Bytes, cnt: usize) { for i in 0..cnt { s.send(ProtocolEvent::Message { sid: Sid::new(12), - mid: i as u64, data: data.clone(), }) .await @@ -93,7 +92,6 @@ fn criterion_util(c: &mut Criterion) { let mut buffer = BytesMut::with_capacity(1500); let frame = OTFrame::Data { mid: 65, - start: 89u64, data: Bytes::from(&b"hello_world"[..]), }; b.iter_with_setup( diff --git a/network/protocol/src/event.rs b/network/protocol/src/event.rs index cc332e5d3c..f0f333ed8a 100644 --- a/network/protocol/src/event.rs +++ b/network/protocol/src/event.rs @@ -1,6 +1,6 @@ use crate::{ frame::OTFrame, - types::{Bandwidth, Mid, Prio, Promises, Sid}, + types::{Bandwidth, Prio, Promises, Sid}, }; use bytes::Bytes; @@ -23,7 +23,6 @@ pub enum ProtocolEvent { }, Message { data: Bytes, - mid: Mid, sid: Sid, }, } @@ -36,11 +35,12 @@ impl ProtocolEvent { sid, prio, promises, - guaranteed_bandwidth: _, + guaranteed_bandwidth, } => OTFrame::OpenStream { sid: *sid, prio: *prio, promises: *promises, + guaranteed_bandwidth: *guaranteed_bandwidth, }, ProtocolEvent::CloseStream { sid } => OTFrame::CloseStream { sid: *sid }, ProtocolEvent::Message { .. } => { @@ -68,7 +68,6 @@ mod tests { fn test_msg_buffer_panic() { let _ = ProtocolEvent::Message { data: Bytes::new(), - mid: 0, sid: Sid::new(23), } .to_frame(); diff --git a/network/protocol/src/frame.rs b/network/protocol/src/frame.rs index a490d67b4d..20a0439953 100644 --- a/network/protocol/src/frame.rs +++ b/network/protocol/src/frame.rs @@ -1,4 +1,4 @@ -use crate::types::{Mid, Pid, Prio, Promises, Sid}; +use crate::types::{Bandwidth, Mid, Pid, Prio, Promises, Sid}; use bytes::{Buf, BufMut, Bytes, BytesMut}; // const FRAME_RESERVED_1: u8 = 0; @@ -38,6 +38,7 @@ pub enum OTFrame { sid: Sid, prio: Prio, promises: Promises, + guaranteed_bandwidth: Bandwidth, }, CloseStream { sid: Sid, @@ -49,7 +50,6 @@ pub enum OTFrame { }, Data { mid: Mid, - start: u64, /* remove */ data: Bytes, }, } @@ -63,6 +63,7 @@ pub enum ITFrame { sid: Sid, prio: Prio, promises: Promises, + guaranteed_bandwidth: Bandwidth, }, CloseStream { sid: Sid, @@ -74,7 +75,6 @@ pub enum ITFrame { }, Data { mid: Mid, - start: u64, /* remove */ data: BytesMut, }, } @@ -161,9 +161,9 @@ impl InitFrame { pub(crate) const TCP_CLOSE_STREAM_CNS: usize = 8; /// const part of the DATA frame, actual size is variable -pub(crate) const TCP_DATA_CNS: usize = 18; +pub(crate) const TCP_DATA_CNS: usize = 10; pub(crate) const TCP_DATA_HEADER_CNS: usize = 24; -pub(crate) const TCP_OPEN_STREAM_CNS: usize = 10; +pub(crate) const TCP_OPEN_STREAM_CNS: usize = 18; // Size WITHOUT the 1rst indicating byte pub(crate) const TCP_SHUTDOWN_CNS: usize = 0; @@ -177,11 +177,13 @@ impl OTFrame { sid, prio, promises, + guaranteed_bandwidth, } => { bytes.put_u8(FRAME_OPEN_STREAM); sid.to_bytes(bytes); bytes.put_u8(prio); bytes.put_u8(promises.to_le_bytes()[0]); + bytes.put_u64_le(guaranteed_bandwidth); }, Self::CloseStream { sid } => { bytes.put_u8(FRAME_CLOSE_STREAM); @@ -193,10 +195,9 @@ impl OTFrame { sid.to_bytes(bytes); bytes.put_u64_le(length); }, - Self::Data { mid, start, data } => { + Self::Data { mid, data } => { bytes.put_u8(FRAME_DATA); bytes.put_u64_le(mid); - bytes.put_u64_le(start); bytes.put_u16_le(data.len() as u16); bytes.put_slice(&data); }, @@ -216,10 +217,10 @@ impl ITFrame { FRAME_CLOSE_STREAM => TCP_CLOSE_STREAM_CNS, FRAME_DATA_HEADER => TCP_DATA_HEADER_CNS, FRAME_DATA => { - if bytes.len() < 17 + 1 + 1 { + if bytes.len() < 9 + 1 + 1 { return None; } - u16::from_le_bytes([bytes[16 + 1], bytes[17 + 1]]) as usize + TCP_DATA_CNS + u16::from_le_bytes([bytes[8 + 1], bytes[9 + 1]]) as usize + TCP_DATA_CNS }, _ => return None, }; @@ -240,6 +241,7 @@ impl ITFrame { sid: Sid::from_bytes(&mut bytes), prio: bytes.get_u8(), promises: Promises::from_bits_truncate(bytes.get_u8()), + guaranteed_bandwidth: bytes.get_u64_le(), } }, FRAME_CLOSE_STREAM => { @@ -261,11 +263,10 @@ impl ITFrame { FRAME_DATA => { bytes.advance(1); let mid = bytes.get_u64_le(); - let start = bytes.get_u64_le(); let length = bytes.get_u16_le(); debug_assert_eq!(length as usize, size - TCP_DATA_CNS); let data = bytes.split_to(length as usize); - Self::Data { mid, start, data } + Self::Data { mid, data } }, _ => unreachable!("Frame::to_frame should be handled before!"), }; @@ -282,16 +283,18 @@ impl PartialEq for OTFrame { sid, prio, promises, + guaranteed_bandwidth, } => matches!(other, ITFrame::OpenStream { sid, prio, - promises + promises, + guaranteed_bandwidth, }), Self::CloseStream { sid } => matches!(other, ITFrame::CloseStream { sid }), Self::DataHeader { mid, sid, length } => { matches!(other, ITFrame::DataHeader { mid, sid, length }) }, - Self::Data { mid, start, data } => matches!(other, ITFrame::Data { mid, start, data }), + Self::Data { mid, data } => matches!(other, ITFrame::Data { mid, data }), } } } @@ -321,6 +324,7 @@ mod tests { sid: Sid::new(1337), prio: 14, promises: Promises::GUARANTEED_DELIVERY, + guaranteed_bandwidth: 1_000_000, }, OTFrame::DataHeader { sid: Sid::new(1337), @@ -329,12 +333,10 @@ mod tests { }, OTFrame::Data { mid: 0, - start: 0, data: Bytes::from(&[77u8; 20][..]), }, OTFrame::Data { mid: 0, - start: 20, data: Bytes::from(&[42u8; 16][..]), }, OTFrame::CloseStream { @@ -496,6 +498,7 @@ mod tests { sid: Sid::new(88), promises: Promises::ENCRYPTED, prio: 88, + guaranteed_bandwidth: 1_000_000, }; OTFrame::write_bytes(frame1, &mut buffer); } @@ -508,6 +511,7 @@ mod tests { sid: Sid::new(88), promises: Promises::ENCRYPTED, prio: 88, + guaranteed_bandwidth: 1_000_000, }; OTFrame::write_bytes(frame1, &mut buffer); buffer.truncate(6); // simulate partial retrieve @@ -527,12 +531,11 @@ mod tests { let frame1 = OTFrame::Data { mid: 7u64, - start: 1u64, data: Bytes::from(&b"foobar"[..]), }; OTFrame::write_bytes(frame1, &mut buffer); - buffer[17] = 255; + buffer[9] = 255; let framed = ITFrame::read_frame(&mut buffer); assert_eq!(framed, None); } @@ -543,18 +546,16 @@ mod tests { let frame1 = OTFrame::Data { mid: 7u64, - start: 1u64, data: Bytes::from(&b"foobar"[..]), }; OTFrame::write_bytes(frame1, &mut buffer); - buffer[17] = 3; + buffer[9] = 3; let framed = ITFrame::read_frame(&mut buffer); assert_eq!( framed, Some(ITFrame::Data { mid: 7u64, - start: 1u64, data: BytesMut::from(&b"foo"[..]), }) ); diff --git a/network/protocol/src/handshake.rs b/network/protocol/src/handshake.rs index fda3893d72..bc672420d3 100644 --- a/network/protocol/src/handshake.rs +++ b/network/protocol/src/handshake.rs @@ -80,7 +80,9 @@ where .send(InitFrame::Raw(WRONG_NUMBER.as_bytes().to_vec())) .await?; Err(InitProtocolError::WrongMagicNumber(magic_number)) - } else if version != VELOREN_NETWORK_VERSION { + } else if version[0] != VELOREN_NETWORK_VERSION[0] + || version[1] != VELOREN_NETWORK_VERSION[1] + { error!(?version, "Connection with wrong network version"); #[cfg(debug_assertions)] drain diff --git a/network/protocol/src/lib.rs b/network/protocol/src/lib.rs index fc22b9b711..8896d7c447 100644 --- a/network/protocol/src/lib.rs +++ b/network/protocol/src/lib.rs @@ -65,9 +65,7 @@ pub use metrics::ProtocolMetricCache; pub use metrics::ProtocolMetrics; pub use mpsc::{MpscMsg, MpscRecvProtocol, MpscSendProtocol}; pub use tcp::{TcpRecvProtocol, TcpSendProtocol}; -pub use types::{ - Bandwidth, Cid, Mid, Pid, Prio, Promises, Sid, HIGHEST_PRIO, VELOREN_NETWORK_VERSION, -}; +pub use types::{Bandwidth, Cid, Pid, Prio, Promises, Sid, HIGHEST_PRIO, VELOREN_NETWORK_VERSION}; ///use at own risk, might change any time, for internal benchmarks pub mod _internal { diff --git a/network/protocol/src/message.rs b/network/protocol/src/message.rs index c71c0b5515..6e39b96eae 100644 --- a/network/protocol/src/message.rs +++ b/network/protocol/src/message.rs @@ -57,12 +57,10 @@ impl OTMessage { fn get_next_data(&mut self) -> OTFrame { let to_send = std::cmp::min(self.data.len(), Self::FRAME_DATA_SIZE as usize); let data = self.data.split_to(to_send); - let start = self.start; self.start += Self::FRAME_DATA_SIZE; OTFrame::Data { mid: self.mid, - start, data, } } diff --git a/network/protocol/src/mpsc.rs b/network/protocol/src/mpsc.rs index 1eb987c75c..dca355fec2 100644 --- a/network/protocol/src/mpsc.rs +++ b/network/protocol/src/mpsc.rs @@ -78,7 +78,6 @@ where match &event { ProtocolEvent::Message { data: _data, - mid: _, sid: _sid, } => { #[cfg(feature = "metrics")] @@ -118,7 +117,7 @@ where MpscMsg::Event(e) => { #[cfg(feature = "metrics")] { - if let ProtocolEvent::Message { data, mid: _, sid } = &e { + if let ProtocolEvent::Message { data, sid } = &e { let sid = *sid; let bytes = data.len() as u64; let line = self.metrics.init_sid(sid); diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index 07a44ab0de..95c7a31215 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -28,6 +28,7 @@ where { buffer: BytesMut, store: PrioManager, + next_mid: Mid, closing_streams: Vec, notify_closing_streams: Vec, pending_shutdown: bool, @@ -59,6 +60,7 @@ where Self { buffer: BytesMut::new(), store: PrioManager::new(metrics.clone()), + next_mid: 0u64, closing_streams: vec![], notify_closing_streams: vec![], pending_shutdown: false, @@ -146,9 +148,10 @@ where self.pending_shutdown = true; } }, - ProtocolEvent::Message { data, mid, sid } => { + ProtocolEvent::Message { data, sid } => { self.metrics.smsg_ib(sid, data.len() as u64); - self.store.add(data, mid, sid); + self.store.add(data, self.next_mid, sid); + self.next_mid += 1; }, } Ok(()) @@ -160,12 +163,7 @@ where let mut data_frames = 0; let mut data_bandwidth = 0; for frame in frames { - if let OTFrame::Data { - mid: _, - start: _, - data, - } = &frame - { + if let OTFrame::Data { mid: _, data } = &frame { data_bandwidth += data.len(); data_frames += 1; } @@ -228,12 +226,13 @@ where sid, prio, promises, + guaranteed_bandwidth, } => { break 'outer Ok(ProtocolEvent::OpenStream { sid, prio: prio.min(crate::types::HIGHEST_PRIO), promises, - guaranteed_bandwidth: 1_000_000, + guaranteed_bandwidth, }); }, ITFrame::CloseStream { sid } => { @@ -244,11 +243,7 @@ where self.metrics.rmsg_ib(sid, length); self.incoming.insert(mid, m); }, - ITFrame::Data { - mid, - start: _, - data, - } => { + ITFrame::Data { mid, data } => { self.metrics.rdata_frames_b(data.len() as u64); let m = match self.incoming.get_mut(&mid) { Some(m) => m, @@ -271,7 +266,6 @@ where ); break 'outer Ok(ProtocolEvent::Message { sid: m.sid, - mid, data: m.data.freeze(), }); } @@ -431,7 +425,6 @@ mod tests { let _ = r.recv().await.unwrap(); let event = ProtocolEvent::Message { sid: Sid::new(10), - mid: 0, data: Bytes::from(&[188u8; 600][..]), }; s.send(event.clone()).await.unwrap(); @@ -441,7 +434,6 @@ mod tests { // 2nd short message let event = ProtocolEvent::Message { sid: Sid::new(10), - mid: 1, data: Bytes::from(&[7u8; 30][..]), }; s.send(event.clone()).await.unwrap(); @@ -467,7 +459,6 @@ mod tests { let _ = r.recv().await.unwrap(); let event = ProtocolEvent::Message { sid, - mid: 77, data: Bytes::from(&[99u8; 500_000][..]), }; s.send(event.clone()).await.unwrap(); @@ -495,7 +486,6 @@ mod tests { let _ = r.recv().await.unwrap(); let event = ProtocolEvent::Message { sid, - mid: 77, data: Bytes::from(&[99u8; 500_000][..]), }; s.send(event).await.unwrap(); @@ -524,7 +514,6 @@ mod tests { let _ = r.recv().await.unwrap(); let event = ProtocolEvent::Message { sid, - mid: 77, data: Bytes::from(&[99u8; 500_000][..]), }; s.send(event).await.unwrap(); @@ -556,14 +545,12 @@ mod tests { s.send(event).await.unwrap(); let event = ProtocolEvent::Message { sid, - mid: 77, data: Bytes::from(&[99u8; 500_000][..]), }; s.send(event).await.unwrap(); s.flush(1_000_000, Duration::from_secs(1)).await.unwrap(); let event = ProtocolEvent::Message { sid, - mid: 78, data: Bytes::from(&[100u8; 500_000][..]), }; s.send(event).await.unwrap(); @@ -593,6 +580,7 @@ mod tests { sid, prio: 5u8, promises: Promises::COMPRESSED, + guaranteed_bandwidth: 1_000_000, } .write_bytes(&mut bytes); OTFrame::DataHeader { @@ -605,13 +593,11 @@ mod tests { OTFrame::Data { mid: 99, - start: 0, data: Bytes::from(&DATA1[..]), } .write_bytes(&mut bytes); OTFrame::Data { mid: 99, - start: DATA1.len() as u64, data: Bytes::from(&DATA2[..]), } .write_bytes(&mut bytes); @@ -641,6 +627,7 @@ mod tests { sid, prio: 5u8, promises: Promises::COMPRESSED, + guaranteed_bandwidth: 1_000_000, } .write_bytes(&mut bytes); s.send(bytes.split()).await.unwrap(); @@ -670,7 +657,6 @@ mod tests { let _ = p2.1.recv().await.unwrap(); let event = ProtocolEvent::Message { sid: Sid::new(10), - mid: 0, data: Bytes::from(&[188u8; 600][..]), }; p2.0.send(event.clone()).await.unwrap(); @@ -695,7 +681,6 @@ mod tests { p2.0.notify_from_recv(e); let event = ProtocolEvent::Message { sid: Sid::new(10), - mid: 0, data: Bytes::from(&[188u8; 600][..]), }; p2.0.send(event.clone()).await.unwrap(); diff --git a/network/protocol/src/types.rs b/network/protocol/src/types.rs index afa5f0d866..0037795314 100644 --- a/network/protocol/src/types.rs +++ b/network/protocol/src/types.rs @@ -49,7 +49,7 @@ impl Promises { pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = *b"VELOREN"; /// When this semver differs, 2 Networks can't communicate. -pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 5, 0]; +pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 6, 0]; pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0); pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2); /// Maximal possible Prio to choose (for performance reasons) diff --git a/network/src/api.rs b/network/src/api.rs index f60b4c4743..c1110298b0 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -438,6 +438,9 @@ impl Participant { /// link for further documentation. You can combine them, e.g. /// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then /// guarantee that those promises are met. + /// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this + /// stream. When excess bandwidth is available it will be used. See + /// [`Bandwidth`] for details. /// /// A [`ParticipantError`] might be thrown if the `Participant` is already /// closed. [`Streams`] can be created without a answer from the remote @@ -460,7 +463,7 @@ impl Participant { /// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())) /// .await?; /// let _s1 = p1 - /// .open(4, Promises::ORDERED | Promises::CONSISTENCY) + /// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000) /// .await?; /// # Ok(()) /// }) @@ -468,16 +471,22 @@ impl Participant { /// ``` /// /// [`Prio`]: network_protocol::Prio + /// [`Bandwidth`]: network_protocol::Bandwidth /// [`Promises`]: network_protocol::Promises /// [`Streams`]: crate::api::Stream #[instrument(name="network", skip(self, prio, promises), fields(p = %self.local_pid))] - pub async fn open(&self, prio: u8, promises: Promises) -> Result { + pub async fn open( + &self, + prio: u8, + promises: Promises, + bandwidth: Bandwidth, + ) -> Result { debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio"); let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::(); if let Err(e) = self.a2b_open_stream_s.lock().await.send(( prio, promises, - 1_000_000, + bandwidth, p2a_return_stream_s, )) { debug!(?e, "bParticipant is already closed, notifying"); @@ -519,7 +528,7 @@ impl Participant { /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # let p2 = remote.connected().await?; - /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let _s1 = p1.opened().await?; /// # Ok(()) /// }) @@ -704,7 +713,7 @@ impl Stream { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # // keep it alive - /// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; /// //Send Message @@ -746,8 +755,8 @@ impl Stream { /// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid()); - /// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; - /// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; + /// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let participant_a = network.connected().await?; /// let participant_b = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; @@ -801,7 +810,7 @@ impl Stream { /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; - /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; @@ -834,7 +843,7 @@ impl Stream { /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; - /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; @@ -889,7 +898,7 @@ impl Stream { /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; - /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// let participant_a = network.connected().await?; diff --git a/network/src/channel.rs b/network/src/channel.rs index 22265a7f8e..affaa1ee34 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -222,7 +222,6 @@ mod tests { s.send(event.clone()).await.unwrap(); s.send(ProtocolEvent::Message { sid: Sid::new(1), - mid: 0, data: Bytes::from(&[8u8; 8][..]), }) .await diff --git a/network/src/lib.rs b/network/src/lib.rs index 9981a9c987..4dd6b5eef7 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -51,7 +51,7 @@ //! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) //! .await?; //! let mut stream = server -//! .open(4, Promises::ORDERED | Promises::CONSISTENCY) +//! .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) //! .await?; //! stream.send("Hello World")?; //! Ok(()) diff --git a/network/src/message.rs b/network/src/message.rs index 27c50abf8d..8da46a4fb2 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -83,7 +83,7 @@ impl Message { /// # runtime.block_on(async { /// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; - /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; + /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// # let participant_a = network.connected().await?; /// let mut stream_a = participant_a.opened().await?; diff --git a/network/src/participant.rs b/network/src/participant.rs index bcbc219c5c..170c80c2ce 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -51,12 +51,6 @@ struct ControlChannels { s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } -#[derive(Debug)] -struct ShutdownInfo { - b2b_close_stream_opened_sender_s: Option>, - error: Option, -} - #[derive(Debug)] struct OpenStreamInfo { a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, @@ -74,7 +68,6 @@ pub struct BParticipant { run_channels: Option, shutdown_barrier: AtomicI32, metrics: Arc, - no_channel_error_info: RwLock<(Instant, u64)>, open_stream_channels: Arc>>, } @@ -84,7 +77,7 @@ impl BParticipant { const BARR_RECV: i32 = 4; const BARR_SEND: i32 = 2; const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS); - const TICK_TIME_MS: u64 = 10; + const TICK_TIME_MS: u64 = 5; #[allow(clippy::type_complexity)] pub(crate) fn new( @@ -124,7 +117,6 @@ impl BParticipant { ), run_channels, metrics, - no_channel_error_info: RwLock::new((Instant::now(), 0)), open_stream_channels: Arc::new(Mutex::new(None)), }, a2b_open_stream_s, @@ -203,7 +195,6 @@ impl BParticipant { let mut interval = tokio::time::interval(Self::TICK_TIME); let mut last_instant = Instant::now(); let mut stream_ids = self.offset_sid; - let mut fake_mid = 0; //TODO: move MID to protocol, should be inc per stream ? or ? trace!("workaround, actively wait for first protocol"); b2b_add_protocol_r .recv() @@ -267,13 +258,8 @@ impl BParticipant { // get all messages and assign it to a channel for (sid, buffer) in a2b_msg_r.try_iter() { - fake_mid += 1; active - .send(ProtocolEvent::Message { - data: buffer, - mid: fake_mid, - sid, - }) + .send(ProtocolEvent::Message { data: buffer, sid }) .await? } @@ -416,7 +402,7 @@ impl BParticipant { self.delete_stream(sid).await; retrigger(cid, p, &mut recv_protocols); }, - Ok(ProtocolEvent::Message { data, mid: _, sid }) => { + Ok(ProtocolEvent::Message { data, sid }) => { let lock = self.streams.read().await; match lock.get(&sid) { Some(stream) => { diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 8606879174..76c684a5e4 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -230,7 +230,7 @@ fn close_network_then_disconnect_part() { fn opened_stream_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); - let mut s2_a = r.block_on(p_a.open(4, Promises::empty())).unwrap(); + let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); s2_a.send("HelloWorld").unwrap(); let mut s2_b = r.block_on(p_b.opened()).unwrap(); drop(p_a); @@ -243,7 +243,7 @@ fn opened_stream_before_remote_part_is_closed() { fn opened_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); - let mut s2_a = r.block_on(p_a.open(3, Promises::empty())).unwrap(); + let mut s2_a = r.block_on(p_a.open(3, Promises::empty(), 0)).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); @@ -260,14 +260,14 @@ fn opened_stream_after_remote_part_is_closed() { fn open_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); - let mut s2_a = r.block_on(p_a.open(4, Promises::empty())).unwrap(); + let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(std::time::Duration::from_millis(1000)); let mut s2_b = r.block_on(p_b.opened()).unwrap(); assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string())); assert_eq!( - r.block_on(p_b.open(5, Promises::empty())).unwrap_err(), + r.block_on(p_b.open(5, Promises::empty(), 0)).unwrap_err(), ParticipantError::ParticipantDisconnected ); drop((_n_a, _n_b, p_b)); //clean teardown @@ -294,7 +294,7 @@ fn open_participant_before_remote_part_is_closed() { let addr = tcp(); r.block_on(n_a.listen(addr.clone())).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = r.block_on(p_b.open(4, Promises::empty())).unwrap(); + let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); let p_a = r.block_on(n_a.connected()).unwrap(); drop(s1_b); @@ -314,7 +314,7 @@ fn open_participant_after_remote_part_is_closed() { let addr = tcp(); r.block_on(n_a.listen(addr.clone())).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = r.block_on(p_b.open(4, Promises::empty())).unwrap(); + let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); drop(s1_b); drop(p_b); @@ -334,7 +334,7 @@ fn close_network_scheduler_completely() { let addr = tcp(); r.block_on(n_a.listen(addr.clone())).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap(); - let mut s1_b = r.block_on(p_b.open(4, Promises::empty())).unwrap(); + let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); let p_a = r.block_on(n_a.connected()).unwrap(); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index eb5806190b..bc3b1c58a6 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -67,7 +67,7 @@ pub fn network_participant_stream( let p1_b = n_b.connect(addr).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); - let s1_a = p1_a.open(4, Promises::empty()).await.unwrap(); + let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap(); let s1_b = p1_b.opened().await.unwrap(); (n_a, p1_a, s1_a, n_b, p1_b, s1_b) diff --git a/network/tests/integration.rs b/network/tests/integration.rs index af30b1c89f..56d90cb4cc 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -177,7 +177,7 @@ fn api_stream_send_main() -> std::result::Result<(), Box> .await?; // keep it alive let _stream_p = remote_p - .open(4, Promises::ORDERED | Promises::CONSISTENCY) + .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .await?; let participant_a = network.connected().await?; let mut stream_a = participant_a.opened().await?; @@ -205,7 +205,7 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> .connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .await?; let mut stream_p = remote_p - .open(4, Promises::ORDERED | Promises::CONSISTENCY) + .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .await?; stream_p.send("Hello World")?; let participant_a = network.connected().await?; diff --git a/server-cli/src/logging.rs b/server-cli/src/logging.rs index 74eb2dc304..1814eac83a 100644 --- a/server-cli/src/logging.rs +++ b/server-cli/src/logging.rs @@ -16,7 +16,6 @@ pub fn init(basic: bool) { let base_exceptions = |env: EnvFilter| { env.add_directive("veloren_world::sim=info".parse().unwrap()) .add_directive("veloren_world::civ=info".parse().unwrap()) - .add_directive("uvth=warn".parse().unwrap()) .add_directive("hyper=info".parse().unwrap()) .add_directive("prometheus_hyper=info".parse().unwrap()) .add_directive("mio::pool=info".parse().unwrap()) diff --git a/server/Cargo.toml b/server/Cargo.toml index f73ed1f02f..e70dfafcbd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -23,11 +23,7 @@ specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git", rev = "9fab7b3 tracing = "0.1" vek = { version = "0.12.0", features = ["serde"] } -uvth = "3.1.1" futures-util = "0.3.7" -futures-executor = "0.3" -futures-timer = "3.0" -futures-channel = "0.3" tokio = { version = "1", default-features = false, features = ["rt"] } prometheus-hyper = "0.1.1" itertools = "0.9" diff --git a/server/src/chunk_generator.rs b/server/src/chunk_generator.rs index 624c67c899..791b81ff81 100644 --- a/server/src/chunk_generator.rs +++ b/server/src/chunk_generator.rs @@ -8,6 +8,7 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; +use tokio::runtime::Runtime; use vek::*; #[cfg(feature = "worldgen")] use world::{IndexOwned, World}; @@ -39,7 +40,7 @@ impl ChunkGenerator { &mut self, entity: Option, key: Vec2, - thread_pool: &mut uvth::ThreadPool, + runtime: &mut Arc, world: Arc, index: IndexOwned, ) { @@ -52,7 +53,7 @@ impl ChunkGenerator { v.insert(Arc::clone(&cancel)); let chunk_tx = self.chunk_tx.clone(); self.metrics.chunks_requested.inc(); - thread_pool.execute(move || { + runtime.spawn_blocking(move || { let index = index.as_index_ref(); let payload = world .generate_chunk(index, key, || cancel.load(Ordering::Relaxed)) diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 4128d3115c..e8697991a3 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -1,11 +1,9 @@ use crate::{Client, ClientType, ServerInfo}; use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; -use futures_channel::oneshot; -use futures_executor::block_on; -use futures_timer::Delay; -use futures_util::{select, FutureExt}; +use futures_util::future::FutureExt; use network::{Network, Participant, Promises}; -use std::{sync::Arc, thread, time::Duration}; +use std::{sync::Arc, time::Duration}; +use tokio::{runtime::Runtime, select, sync::oneshot}; use tracing::{debug, error, trace, warn}; pub(crate) struct ServerInfoPacket { @@ -17,7 +15,7 @@ pub(crate) type IncomingClient = Client; pub(crate) struct ConnectionHandler { _network: Arc, - thread_handle: Option>, + thread_handle: Option>, pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, stop_sender: Option>, @@ -28,7 +26,7 @@ pub(crate) struct ConnectionHandler { /// to the Server main thread sometimes though to get the current server_info /// and time impl ConnectionHandler { - pub fn new(network: Network) -> Self { + pub fn new(network: Network, runtime: Arc) -> Self { let network = Arc::new(network); let network_clone = Arc::clone(&network); let (stop_sender, stop_receiver) = oneshot::channel(); @@ -37,14 +35,12 @@ impl ConnectionHandler { let (info_requester_sender, info_requester_receiver) = bounded::>(1); - let thread_handle = Some(thread::spawn(|| { - block_on(Self::work( - network_clone, - client_sender, - info_requester_sender, - stop_receiver, - )); - })); + let thread_handle = Some(runtime.spawn(Self::work( + network_clone, + client_sender, + info_requester_sender, + stop_receiver, + ))); Self { _network: network, @@ -64,7 +60,7 @@ impl ConnectionHandler { let mut stop_receiver = stop_receiver.fuse(); loop { let participant = match select!( - _ = stop_receiver => None, + _ = &mut stop_receiver => None, p = network.connected().fuse() => Some(p), ) { None => break, @@ -82,7 +78,7 @@ impl ConnectionHandler { let info_requester_sender = info_requester_sender.clone(); match select!( - _ = stop_receiver => None, + _ = &mut stop_receiver => None, e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e), ) { None => break, @@ -104,11 +100,11 @@ impl ConnectionHandler { let reliable = Promises::ORDERED | Promises::CONSISTENCY; let reliablec = reliable | Promises::COMPRESSED; - let general_stream = participant.open(3, reliablec).await?; - let ping_stream = participant.open(2, reliable).await?; - let mut register_stream = participant.open(3, reliablec).await?; - let character_screen_stream = participant.open(3, reliablec).await?; - let in_game_stream = participant.open(3, reliablec).await?; + let general_stream = participant.open(3, reliablec, 500).await?; + let ping_stream = participant.open(2, reliable, 500).await?; + let mut register_stream = participant.open(3, reliablec, 0).await?; + let character_screen_stream = participant.open(3, reliablec, 0).await?; + let in_game_stream = participant.open(3, reliablec, 400_000).await?; let server_data = receiver.recv()?; @@ -116,7 +112,7 @@ impl ConnectionHandler { const TIMEOUT: Duration = Duration::from_secs(5); let client_type = match select!( - _ = Delay::new(TIMEOUT).fuse() => None, + _ = tokio::time::sleep(TIMEOUT).fuse() => None, t = register_stream.recv::().fuse() => Some(t), ) { None => { @@ -145,12 +141,8 @@ impl ConnectionHandler { impl Drop for ConnectionHandler { fn drop(&mut self) { let _ = self.stop_sender.take().unwrap().send(()); - trace!("blocking till ConnectionHandler is closed"); - self.thread_handle - .take() - .unwrap() - .join() - .expect("There was an error in ConnectionHandler, clean shutdown impossible"); - trace!("gracefully closed ConnectionHandler!"); + trace!("aborting ConnectionHandler"); + self.thread_handle.take().unwrap().abort(); + trace!("aborted ConnectionHandler!"); } } diff --git a/server/src/events/player.rs b/server/src/events/player.rs index 03ac7dde10..9900a9fc3e 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -11,9 +11,8 @@ use common::{ }; use common_net::msg::{PlayerListUpdate, PresenceKind, ServerGeneral}; use common_sys::state::State; -use futures_executor::block_on; use specs::{saveload::MarkerAllocator, Builder, Entity as EcsEntity, WorldExt}; -use tracing::{debug, error, trace, warn}; +use tracing::*; pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { span!(_guard, "handle_exit_ingame"); @@ -107,26 +106,26 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event { let participant = client.participant.take().unwrap(); let pid = participant.remote_pid(); - std::thread::spawn(move || { - let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity); - let _enter = span.enter(); - let now = std::time::Instant::now(); - debug!(?pid, ?entity, "Start handle disconnect of client"); - if let Err(e) = block_on(participant.disconnect()) { - debug!( - ?e, - ?pid, - "Error when disconnecting client, maybe the pipe already broke" - ); - }; - trace!(?pid, "finished disconnect"); - let elapsed = now.elapsed(); - if elapsed.as_millis() > 100 { - warn!(?elapsed, ?pid, "disconnecting took quite long"); - } else { - debug!(?elapsed, ?pid, "disconnecting took"); + server.runtime.spawn( + async { + let now = std::time::Instant::now(); + debug!("Start handle disconnect of client"); + if let Err(e) = participant.disconnect().await { + debug!( + ?e, + "Error when disconnecting client, maybe the pipe already broke" + ); + }; + trace!("finished disconnect"); + let elapsed = now.elapsed(); + if elapsed.as_millis() > 100 { + warn!(?elapsed, "disconnecting took quite long"); + } else { + debug!(?elapsed, "disconnecting took"); + } } - }); + .instrument(tracing::debug_span!("client_disconnect", ?pid, ?entity)), + ); } let state = server.state_mut(); diff --git a/server/src/lib.rs b/server/src/lib.rs index 40aa2b0804..bffdf79430 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -75,7 +75,6 @@ use common_net::{ #[cfg(feature = "plugins")] use common_sys::plugin::PluginMgr; use common_sys::state::State; -use futures_executor::block_on; use metrics::{PhysicsMetrics, StateTickMetrics, TickMetrics}; use network::{Network, Pid, ProtocolAddr}; use persistence::{ @@ -95,7 +94,6 @@ use std::{ use test_world::{IndexOwned, World}; use tokio::{runtime::Runtime, sync::Notify}; use tracing::{debug, error, info, trace}; -use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; #[cfg(feature = "worldgen")] @@ -123,8 +121,7 @@ pub struct Server { connection_handler: ConnectionHandler, - _runtime: Arc, - thread_pool: ThreadPool, + runtime: Arc, metrics_shutdown: Arc, tick_metrics: TickMetrics, @@ -366,9 +363,6 @@ impl Server { registry_state(®istry).expect("failed to register state metrics"); registry_physics(®istry).expect("failed to register state metrics"); - let thread_pool = ThreadPoolBuilder::new() - .name("veloren-worker".to_string()) - .build(); let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), ®istry); let metrics_shutdown = Arc::new(Notify::new()); let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); @@ -381,8 +375,8 @@ impl Server { ) .await }); - block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; - let connection_handler = ConnectionHandler::new(network); + runtime.block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; + let connection_handler = ConnectionHandler::new(network, Arc::clone(&runtime)); // Initiate real-time world simulation #[cfg(feature = "worldgen")] @@ -397,9 +391,7 @@ impl Server { map, connection_handler, - - _runtime: runtime, - thread_pool, + runtime, metrics_shutdown, tick_metrics, @@ -431,11 +423,6 @@ impl Server { } } - pub fn with_thread_pool(mut self, thread_pool: ThreadPool) -> Self { - self.thread_pool = thread_pool; - self - } - /// Get a reference to the server's settings pub fn settings(&self) -> impl Deref + '_ { self.state.ecs().fetch::() @@ -676,7 +663,7 @@ impl Server { // only work we do here on the fast path is perform a relaxed read on an atomic. // boolean. let index = &mut self.index; - let thread_pool = &mut self.thread_pool; + let runtime = &mut self.runtime; let world = &mut self.world; let ecs = self.state.ecs_mut(); @@ -697,7 +684,7 @@ impl Server { chunk_generator.generate_chunk( None, pos, - thread_pool, + runtime, Arc::clone(&world), index.clone(), ); @@ -1021,7 +1008,7 @@ impl Server { .generate_chunk( Some(entity), key, - &mut self.thread_pool, + &mut self.runtime, Arc::clone(&self.world), self.index.clone(), ); diff --git a/server/src/persistence/character_loader.rs b/server/src/persistence/character_loader.rs index c1b4d8ffcf..4d2aeff569 100644 --- a/server/src/persistence/character_loader.rs +++ b/server/src/persistence/character_loader.rs @@ -73,12 +73,13 @@ impl CharacterLoader { let mut conn = establish_connection(db_dir)?; - std::thread::spawn(move || { - for request in internal_rx { - let (entity, kind) = request; + let builder = std::thread::Builder::new().name("persistence_loader".into()); + builder + .spawn(move || { + for request in internal_rx { + let (entity, kind) = request; - if let Err(e) = - internal_tx.send(CharacterLoaderResponse { + if let Err(e) = internal_tx.send(CharacterLoaderResponse { entity, result: match kind { CharacterLoaderRequestKind::CreateCharacter { @@ -123,12 +124,12 @@ impl CharacterLoader { CharacterLoaderResponseKind::CharacterData(Box::new(result)) }, }, - }) - { - error!(?e, "Could not send send persistence request"); + }) { + error!(?e, "Could not send send persistence request"); + } } - } - }); + }) + .unwrap(); Ok(Self { update_tx, diff --git a/server/src/persistence/character_updater.rs b/server/src/persistence/character_updater.rs index 902b207d42..69b459194c 100644 --- a/server/src/persistence/character_updater.rs +++ b/server/src/persistence/character_updater.rs @@ -24,13 +24,16 @@ impl CharacterUpdater { let mut conn = establish_connection(db_dir)?; - let handle = std::thread::spawn(move || { - while let Ok(updates) = update_rx.recv() { - trace!("Persistence batch update starting"); - execute_batch_update(updates, &mut conn); - trace!("Persistence batch update finished"); - } - }); + let builder = std::thread::Builder::new().name("persistence_updater".into()); + let handle = builder + .spawn(move || { + while let Ok(updates) = update_rx.recv() { + trace!("Persistence batch update starting"); + execute_batch_update(updates, &mut conn); + trace!("Persistence batch update finished"); + } + }) + .unwrap(); Ok(Self { update_tx: Some(update_tx), diff --git a/voxygen/Cargo.toml b/voxygen/Cargo.toml index 67f735a608..8095d4f4d4 100644 --- a/voxygen/Cargo.toml +++ b/voxygen/Cargo.toml @@ -81,7 +81,6 @@ rodio = {version = "0.13", default-features = false, features = ["wav", "vorbis" ron = {version = "0.6", default-features = false} serde = {version = "1.0", features = [ "rc", "derive" ]} treeculler = "0.1.0" -uvth = "3.1.1" tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } num_cpus = "1.0" # vec_map = { version = "0.8.2" } diff --git a/voxygen/src/logging.rs b/voxygen/src/logging.rs index d0c3bd98d5..2266b9c0cb 100644 --- a/voxygen/src/logging.rs +++ b/voxygen/src/logging.rs @@ -22,14 +22,14 @@ const RUST_LOG_ENV: &str = "RUST_LOG"; /// `RUST_LOG="veloren_voxygen=trace"` /// /// more complex tracing can be done by concatenating with a `,` as seperator: -/// - warn for `uvth`, `tiny_http`, `dot_vox`, `gfx_device_gl::factory, +/// - warn for `prometheus_hyper`, `dot_vox`, `gfx_device_gl::factory, /// `gfx_device_gl::shade` trace for `veloren_voxygen`, info for everything /// else -/// `RUST_LOG="uvth=warn,tiny_http=warn,dot_vox::parser=warn,gfx_device_gl:: +/// `RUST_LOG="prometheus_hyper=warn,dot_vox::parser=warn,gfx_device_gl:: /// factory=warn,gfx_device_gl::shade=warn,veloren_voxygen=trace,info"` /// /// By default a few directives are set to `warn` by default, until explicitly -/// overwritten! e.g. `RUST_LOG="uvth=debug"` +/// overwritten! e.g. `RUST_LOG="gfx_device_gl=debug"` pub fn init(settings: &Settings) -> Vec { // To hold the guards that we create, they will cause the logs to be // flushed when they're dropped. @@ -42,8 +42,7 @@ pub fn init(settings: &Settings) -> Vec { let base_exceptions = |env: EnvFilter| { env.add_directive("dot_vox::parser=warn".parse().unwrap()) .add_directive("gfx_device_gl=warn".parse().unwrap()) - .add_directive("uvth=warn".parse().unwrap()) - .add_directive("tiny_http=warn".parse().unwrap()) + .add_directive("prometheus_hyper=warn".parse().unwrap()) .add_directive("mio::sys::windows=debug".parse().unwrap()) .add_directive("veloren_network_protocol=info".parse().unwrap()) .add_directive( diff --git a/voxygen/src/main.rs b/voxygen/src/main.rs index 882377d92d..c44e6ecd0a 100644 --- a/voxygen/src/main.rs +++ b/voxygen/src/main.rs @@ -124,11 +124,16 @@ fn main() { // On windows we need to spawn a thread as the msg doesn't work otherwise #[cfg(target_os = "windows")] - std::thread::spawn(move || { - mbox(); - }) - .join() - .unwrap(); + { + let builder = std::thread::Builder::new().name("shutdown".into()); + builder + .spawn(move || { + mbox(); + }) + .unwrap() + .join() + .unwrap(); + } #[cfg(not(target_os = "windows"))] mbox(); diff --git a/voxygen/src/menu/char_selection/mod.rs b/voxygen/src/menu/char_selection/mod.rs index 7219fc1d87..69b65def02 100644 --- a/voxygen/src/menu/char_selection/mod.rs +++ b/voxygen/src/menu/char_selection/mod.rs @@ -147,7 +147,7 @@ impl PlayState for CharSelectionState { time: client.state().get_time(), delta_time: client.state().ecs().read_resource::().0, tick: client.get_tick(), - thread_pool: client.thread_pool(), + runtime: client.runtime(), body: humanoid_body, gamma: global_state.settings.graphics.gamma, exposure: global_state.settings.graphics.exposure, diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 2297bf9232..07f1b41fd1 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -4,14 +4,14 @@ use client::{ }; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use std::{ - net::ToSocketAddrs, + net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, - thread, time::Duration, }; +use tokio::{net::lookup_host, runtime}; use tracing::{trace, warn}; #[derive(Debug)] @@ -40,6 +40,7 @@ pub struct ClientInit { rx: Receiver, trust_tx: Sender, cancel: Arc, + _runtime: Arc, } impl ClientInit { #[allow(clippy::op_ref)] // TODO: Pending review in #587 @@ -50,101 +51,129 @@ impl ClientInit { view_distance: Option, password: String, ) -> Self { - let (server_address, default_port, prefer_ipv6) = connection_args; + let (server_address, port, prefer_ipv6) = connection_args; let (tx, rx) = unbounded(); let (trust_tx, trust_rx) = unbounded(); let cancel = Arc::new(AtomicBool::new(false)); let cancel2 = Arc::clone(&cancel); - thread::spawn(move || { - // Parse ip address or resolves hostname. - // Note: if you use an ipv6 address, the number after the last colon will be - // used as the port unless you use [] around the address. - match server_address - .to_socket_addrs() - .or((server_address.as_ref(), default_port).to_socket_addrs()) - { - Ok(socket_address) => { - let (first_addrs, second_addrs) = - socket_address.partition::, _>(|a| a.is_ipv6() == prefer_ipv6); + let cores = num_cpus::get(); - let mut last_err = None; + let runtime = Arc::new( + runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(if cores > 4 { cores - 1 } else { cores }) + .build() + .unwrap(), + ); + let runtime2 = Arc::clone(&runtime); - let cores = num_cpus::get(); - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(if cores > 4 { cores - 1 } else { cores }) - .build() - .unwrap(), - ); + runtime.spawn(async move { + let addresses = match Self::resolve(server_address, port, prefer_ipv6).await { + Ok(a) => a, + Err(e) => { + let _ = tx.send(Msg::Done(Err(Error::BadAddress(e)))); + return; + }, + }; + let mut last_err = None; - const FOUR_MINUTES_RETRIES: u64 = 48; - 'tries: for _ in 0..FOUR_MINUTES_RETRIES { - if cancel2.load(Ordering::Relaxed) { - break; - } - for socket_addr in - first_addrs.clone().into_iter().chain(second_addrs.clone()) - { - match Client::new(socket_addr, view_distance, Arc::clone(&runtime)) { - Ok(mut client) => { - if let Err(e) = - client.register(username, password, |auth_server| { - let _ = tx - .send(Msg::IsAuthTrusted(auth_server.to_string())); - trust_rx - .recv() - .map(|AuthTrust(server, trust)| { - trust && &server == auth_server - }) - .unwrap_or(false) + const FOUR_MINUTES_RETRIES: u64 = 48; + 'tries: for _ in 0..FOUR_MINUTES_RETRIES { + if cancel2.load(Ordering::Relaxed) { + break; + } + for socket_addr in &addresses { + match Client::new(socket_addr.clone(), view_distance, Arc::clone(&runtime2)) + .await + { + Ok(mut client) => { + if let Err(e) = client + .register(username, password, |auth_server| { + let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); + trust_rx + .recv() + .map(|AuthTrust(server, trust)| { + trust && &server == auth_server }) - { - last_err = Some(Error::ClientError(e)); - break 'tries; - } - let _ = tx.send(Msg::Done(Ok(client))); - return; - }, - Err(ClientError::NetworkErr(NetworkError::ConnectFailed(e))) => { - if e.kind() == std::io::ErrorKind::PermissionDenied { - warn!(?e, "Cannot connect to server: Incompatible version"); - last_err = Some(Error::ClientError( - ClientError::NetworkErr(NetworkError::ConnectFailed(e)), - )); - break 'tries; - } else { - warn!(?e, "Failed to connect to the server. Retrying..."); - } - }, - Err(e) => { - trace!(?e, "Aborting server connection attempt"); - last_err = Some(Error::ClientError(e)); - break 'tries; - }, + .unwrap_or(false) + }) + .await + { + last_err = Some(Error::ClientError(e)); + break 'tries; } - } - thread::sleep(Duration::from_secs(5)); + let _ = tx.send(Msg::Done(Ok(client))); + return; + }, + Err(ClientError::NetworkErr(NetworkError::ConnectFailed(e))) => { + if e.kind() == std::io::ErrorKind::PermissionDenied { + warn!(?e, "Cannot connect to server: Incompatible version"); + last_err = Some(Error::ClientError(ClientError::NetworkErr( + NetworkError::ConnectFailed(e), + ))); + break 'tries; + } else { + warn!(?e, "Failed to connect to the server. Retrying..."); + } + }, + Err(e) => { + trace!(?e, "Aborting server connection attempt"); + last_err = Some(Error::ClientError(e)); + break 'tries; + }, } - // Parsing/host name resolution successful but no connection succeeded. - let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::NoAddress)))); - }, - Err(err) => { - // Error parsing input string or error resolving host name. - let _ = tx.send(Msg::Done(Err(Error::BadAddress(err)))); - }, + } + tokio::time::sleep(Duration::from_secs(5)).await; } + + // Parsing/host name resolution successful but no connection succeeded. + let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::NoAddress)))); }); ClientInit { rx, trust_tx, cancel, + _runtime: runtime, } } + async fn resolve( + server_address: String, + port: u16, + prefer_ipv6: bool, + ) -> Result, std::io::Error> { + //1. try if server_address already contains a port + if let Ok(addr) = server_address.parse::() { + warn!("please don't add port directly to server_address"); + return Ok(vec![addr]); + } + + //2, try server_address and port + if let Ok(addr) = format!("{}:{}", server_address, port).parse::() { + return Ok(vec![addr]); + } + + //3. do DNS call + let (mut first_addrs, mut second_addrs) = match lookup_host(server_address).await { + Ok(s) => s.partition::, _>(|a| a.is_ipv6() == prefer_ipv6), + Err(e) => { + return Err(e); + }, + }; + + Ok( + std::iter::Iterator::chain(first_addrs.drain(..), second_addrs.drain(..)) + .map(|mut addr| { + addr.set_port(port); + addr + }) + .collect(), + ) + } + /// Poll if the thread is complete. /// Returns None if the thread is still running, otherwise returns the /// Result of client creation. diff --git a/voxygen/src/scene/figure/cache.rs b/voxygen/src/scene/figure/cache.rs index 38c90a4eaf..aa5d047e80 100644 --- a/voxygen/src/scene/figure/cache.rs +++ b/voxygen/src/scene/figure/cache.rs @@ -27,6 +27,7 @@ use core::{hash::Hash, ops::Range}; use crossbeam::atomic; use hashbrown::{hash_map::Entry, HashMap}; use std::sync::Arc; +use tokio::runtime::Runtime; use vek::*; /// A type produced by mesh worker threads corresponding to the information @@ -311,7 +312,7 @@ where tick: u64, camera_mode: CameraMode, character_state: Option<&CharacterState>, - thread_pool: &uvth::ThreadPool, + runtime: &Arc, ) -> (FigureModelEntryLod<'c>, &'c Skel::Attr) where for<'a> &'a Skel::Body: Into, @@ -377,7 +378,7 @@ where let manifests = self.manifests; let slot_ = Arc::clone(&slot); - thread_pool.execute(move || { + runtime.spawn_blocking(move || { // First, load all the base vertex data. let manifests = &*manifests.read(); let meshes = ::bone_meshes(&key, manifests); diff --git a/voxygen/src/scene/figure/mod.rs b/voxygen/src/scene/figure/mod.rs index f1717bd619..18264b5bb2 100644 --- a/voxygen/src/scene/figure/mod.rs +++ b/voxygen/src/scene/figure/mod.rs @@ -719,7 +719,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -1445,7 +1445,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -1653,7 +1653,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -1976,7 +1976,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -2329,7 +2329,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -2435,7 +2435,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -2520,7 +2520,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = @@ -2610,7 +2610,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -2762,7 +2762,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -2849,7 +2849,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -2934,7 +2934,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = self @@ -3344,7 +3344,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = @@ -3480,7 +3480,7 @@ impl FigureMgr { tick, player_camera_mode, player_character_state, - scene_data.thread_pool, + scene_data.runtime, ); let state = diff --git a/voxygen/src/scene/mod.rs b/voxygen/src/scene/mod.rs index ec17af6f3b..06a9b57af8 100644 --- a/voxygen/src/scene/mod.rs +++ b/voxygen/src/scene/mod.rs @@ -36,6 +36,8 @@ use common_sys::state::State; use comp::item::Reagent; use num::traits::{Float, FloatConst}; use specs::{Entity as EcsEntity, Join, WorldExt}; +use std::sync::Arc; +use tokio::runtime::Runtime; use vek::*; // TODO: Don't hard-code this. @@ -114,7 +116,7 @@ pub struct SceneData<'a> { pub loaded_distance: f32, pub view_distance: u32, pub tick: u64, - pub thread_pool: &'a uvth::ThreadPool, + pub runtime: &'a Arc, pub gamma: f32, pub exposure: f32, pub ambiance: f32, diff --git a/voxygen/src/scene/simple.rs b/voxygen/src/scene/simple.rs index c3cd629d07..9e4838ad25 100644 --- a/voxygen/src/scene/simple.rs +++ b/voxygen/src/scene/simple.rs @@ -29,6 +29,8 @@ use common::{ terrain::BlockKind, vol::{BaseVol, ReadVol}, }; +use std::sync::Arc; +use tokio::runtime::Runtime; use tracing::error; use vek::*; use winit::event::MouseButton; @@ -96,7 +98,7 @@ pub struct SceneData<'a> { pub time: f64, pub delta_time: f32, pub tick: u64, - pub thread_pool: &'a uvth::ThreadPool, + pub runtime: &'a Arc, pub body: Option, pub gamma: f32, pub exposure: f32, @@ -350,7 +352,7 @@ impl Scene { scene_data.tick, CameraMode::default(), None, - scene_data.thread_pool, + scene_data.runtime, ) .0; let mut buf = [Default::default(); anim::MAX_BONE_COUNT]; diff --git a/voxygen/src/scene/terrain.rs b/voxygen/src/scene/terrain.rs index 4dd28b146e..3b143283a2 100644 --- a/voxygen/src/scene/terrain.rs +++ b/voxygen/src/scene/terrain.rs @@ -27,7 +27,10 @@ use enum_iterator::IntoEnumIterator; use guillotiere::AtlasAllocator; use hashbrown::HashMap; use serde::Deserialize; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use tracing::warn; use treeculler::{BVol, Frustum, AABB}; use vek::*; @@ -259,6 +262,7 @@ pub struct Terrain { mesh_send_tmp: channel::Sender, mesh_recv: channel::Receiver, mesh_todo: HashMap, ChunkMeshState>, + mesh_todos_active: Arc, // GPU data sprite_data: Arc>>, @@ -406,6 +410,7 @@ impl Terrain { mesh_send_tmp: send, mesh_recv: recv, mesh_todo: HashMap::default(), + mesh_todos_active: Arc::new(AtomicU64::new(0)), sprite_data: Arc::new(sprite_data), sprite_col_lights, waves: renderer @@ -633,6 +638,7 @@ impl Terrain { // Limit ourselves to u16::MAX even if larger textures are supported. let max_texture_size = renderer.max_texture_size(); + let cores = num_cpus::get(); span!(guard, "Queue meshing from todo list"); for (todo, chunk) in self @@ -649,8 +655,7 @@ impl Terrain { .cloned()?)) }) { - // TODO: find a alternative! - if scene_data.thread_pool.queued_jobs() > 0 { + if self.mesh_todos_active.load(Ordering::Relaxed) > (cores as u64 - 1).max(1) { break; } @@ -701,7 +706,9 @@ impl Terrain { let started_tick = todo.started_tick; let sprite_data = Arc::clone(&self.sprite_data); let sprite_config = Arc::clone(&self.sprite_config); - scene_data.thread_pool.execute(move || { + let cnt = Arc::clone(&self.mesh_todos_active); + cnt.fetch_add(1, Ordering::Relaxed); + scene_data.runtime.spawn_blocking(move || { let sprite_data = sprite_data; let _ = send.send(mesh_worker( pos, @@ -714,6 +721,7 @@ impl Terrain { &sprite_data, &sprite_config, )); + cnt.fetch_sub(1, Ordering::Relaxed); }); todo.is_worker_active = true; } diff --git a/voxygen/src/session.rs b/voxygen/src/session.rs index f8ff6ddc7d..2c9b052c1e 100644 --- a/voxygen/src/session.rs +++ b/voxygen/src/session.rs @@ -1307,7 +1307,7 @@ impl PlayState for SessionState { loaded_distance: client.loaded_distance(), view_distance: client.view_distance().unwrap_or(1), tick: client.get_tick(), - thread_pool: client.thread_pool(), + runtime: &client.runtime(), gamma: global_state.settings.graphics.gamma, exposure: global_state.settings.graphics.exposure, ambiance: global_state.settings.graphics.ambiance, @@ -1375,7 +1375,7 @@ impl PlayState for SessionState { loaded_distance: client.loaded_distance(), view_distance: client.view_distance().unwrap_or(1), tick: client.get_tick(), - thread_pool: client.thread_pool(), + runtime: &client.runtime(), gamma: settings.graphics.gamma, exposure: settings.graphics.exposure, ambiance: settings.graphics.ambiance, diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index bda1c1fd2b..8c2b76baff 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -10,7 +10,7 @@ use std::{ thread::{self, JoinHandle}, time::Duration, }; -use tracing::{error, info, warn}; +use tracing::{error, info, warn, trace, debug}; const TPS: u64 = 30; @@ -81,15 +81,19 @@ impl Singleplayer { let settings = server::Settings::singleplayer(&server_data_dir); let editable_settings = server::EditableSettings::singleplayer(&server_data_dir); - let thread_pool = client.map(|c| c.thread_pool().clone()); - let cores = num_cpus::get(); - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(if cores > 4 { cores - 1 } else { cores }) - .build() - .unwrap(), - ); + let runtime = if let Some(c) = client { + Arc::clone(&c.runtime()) + } else { + let cores = num_cpus::get(); + debug!("creating a new runtime for server"); + Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(if cores > 4 { cores - 1 } else { cores }) + .build() + .unwrap(), + ) + }; let settings2 = settings.clone(); let paused = Arc::new(AtomicBool::new(false)); @@ -97,37 +101,37 @@ impl Singleplayer { let (result_sender, result_receiver) = bounded(1); - let thread = thread::spawn(move || { - let mut server = None; - if let Err(e) = result_sender.send( - match Server::new(settings2, editable_settings, &server_data_dir, runtime) { - Ok(s) => { - server = Some(s); - Ok(()) + let builder = thread::Builder::new().name("singleplayer-server-thread".into()); + let thread = builder + .spawn(move || { + trace!("starting singleplayer server thread"); + let mut server = None; + if let Err(e) = result_sender.send( + match Server::new(settings2, editable_settings, &server_data_dir, runtime) { + Ok(s) => { + server = Some(s); + Ok(()) + }, + Err(e) => Err(e), }, - Err(e) => Err(e), - }, - ) { - warn!( - ?e, - "Failed to send singleplayer server initialization result. Most likely the \ - channel was closed by cancelling server creation. Stopping Server" - ); - return; - }; + ) { + warn!( + ?e, + "Failed to send singleplayer server initialization result. Most likely \ + the channel was closed by cancelling server creation. Stopping Server" + ); + return; + }; - let server = match server { - Some(s) => s, - None => return, - }; + let server = match server { + Some(s) => s, + None => return, + }; - let server = match thread_pool { - Some(pool) => server.with_thread_pool(pool), - None => server, - }; - - run_server(server, receiver, paused1); - }); + run_server(server, receiver, paused1); + trace!("ending singleplayer server thread"); + }) + .unwrap(); Singleplayer { _server_thread: thread, diff --git a/voxygen/src/window.rs b/voxygen/src/window.rs index ffd0b387b7..8bd5175731 100644 --- a/voxygen/src/window.rs +++ b/voxygen/src/window.rs @@ -1319,31 +1319,34 @@ impl Window { let mut path = settings.screenshots_path.clone(); let sender = self.message_sender.clone(); - std::thread::spawn(move || { - use std::time::SystemTime; - // Check if folder exists and create it if it does not - if !path.exists() { - if let Err(e) = std::fs::create_dir_all(&path) { - warn!(?e, "Couldn't create folder for screenshot"); - let _result = - sender.send(String::from("Couldn't create folder for screenshot")); + let builder = std::thread::Builder::new().name("screenshot".into()); + builder + .spawn(move || { + use std::time::SystemTime; + // Check if folder exists and create it if it does not + if !path.exists() { + if let Err(e) = std::fs::create_dir_all(&path) { + warn!(?e, "Couldn't create folder for screenshot"); + let _result = sender + .send(String::from("Couldn't create folder for screenshot")); + } } - } - path.push(format!( - "screenshot_{}.png", - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|d| d.as_millis()) - .unwrap_or(0) - )); - if let Err(e) = img.save(&path) { - warn!(?e, "Couldn't save screenshot"); - let _result = sender.send(String::from("Couldn't save screenshot")); - } else { - let _result = - sender.send(format!("Screenshot saved to {}", path.to_string_lossy())); - } - }); + path.push(format!( + "screenshot_{}.png", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or(0) + )); + if let Err(e) = img.save(&path) { + warn!(?e, "Couldn't save screenshot"); + let _result = sender.send(String::from("Couldn't save screenshot")); + } else { + let _result = sender + .send(format!("Screenshot saved to {}", path.to_string_lossy())); + } + }) + .unwrap(); }, Err(e) => error!(?e, "Couldn't create screenshot due to renderer error"), }