Merge branch 'xMAC94x/prot_improvements' into 'master'

xMAC94x/prot_improvements

See merge request veloren/veloren!1796
This commit is contained in:
Marcel
2021-02-22 22:47:43 +00:00
55 changed files with 767 additions and 617 deletions

65
Cargo.lock generated
View File

@ -233,7 +233,7 @@ checksum = "dac94eeee6ebd1165959e440836a452109f9f839d6cfde12974d75a5b4222406"
dependencies = [ dependencies = [
"ahash 0.6.3", "ahash 0.6.3",
"bincode", "bincode",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"log", "log",
"notify 4.0.15", "notify 4.0.15",
"parking_lot 0.11.1", "parking_lot 0.11.1",
@ -1093,22 +1093,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01a6eb3daaafa260f6fc94c3a6c36390abc2080e38e3e34ced87393fb77d80" checksum = "fd01a6eb3daaafa260f6fc94c3a6c36390abc2080e38e3e34ced87393fb77d80"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"crossbeam-deque 0.8.0", "crossbeam-deque 0.8.0",
"crossbeam-epoch 0.9.1", "crossbeam-epoch 0.9.1",
"crossbeam-queue", "crossbeam-queue",
"crossbeam-utils 0.8.1", "crossbeam-utils 0.8.1",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8ec7fcd21571dc78f96cc96243cab8d8f035247c3efd16c687be154c3fa9efa"
dependencies = [
"crossbeam-utils 0.6.6",
]
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.0" version = "0.5.0"
@ -1180,16 +1171,6 @@ dependencies = [
"crossbeam-utils 0.8.1", "crossbeam-utils 0.8.1",
] ]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
dependencies = [
"cfg-if 0.1.10",
"lazy_static",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.7.2" version = "0.7.2"
@ -1829,12 +1810,6 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.12" version = "0.3.12"
@ -2162,7 +2137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac2c82074cafb68b9e459c50c655f7eedcb92d6ee7166813802934bc6fc29fa3" checksum = "ac2c82074cafb68b9e459c50c655f7eedcb92d6ee7166813802934bc6fc29fa3"
dependencies = [ dependencies = [
"ab_glyph", "ab_glyph",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"crossbeam-deque 0.8.0", "crossbeam-deque 0.8.0",
"linked-hash-map", "linked-hash-map",
"rayon", "rayon",
@ -3302,7 +3277,7 @@ checksum = "58e54552360d7b89a698eca6de3927205a8e03e8080dc13d779de5c7876e098b"
dependencies = [ dependencies = [
"anymap", "anymap",
"bitflags", "bitflags",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"filetime", "filetime",
"fsevent 2.0.2", "fsevent 2.0.2",
"fsevent-sys 3.0.2", "fsevent-sys 3.0.2",
@ -4142,7 +4117,7 @@ version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a"
dependencies = [ dependencies = [
"crossbeam-channel 0.5.0", "crossbeam-channel",
"crossbeam-deque 0.8.0", "crossbeam-deque 0.8.0",
"crossbeam-utils 0.8.1", "crossbeam-utils 0.8.1",
"lazy_static", "lazy_static",
@ -5241,7 +5216,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a" checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a"
dependencies = [ dependencies = [
"chrono", "chrono",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -5524,17 +5499,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "uvth"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e59a167890d173eb0fcd7a1b99b84dc05c521ae8d76599130b8e19bef287abbf"
dependencies = [
"crossbeam-channel 0.3.9",
"log",
"num_cpus",
]
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.11" version = "0.2.11"
@ -5593,19 +5557,15 @@ version = "0.8.0"
dependencies = [ dependencies = [
"authc", "authc",
"byteorder", "byteorder",
"futures-executor",
"futures-timer",
"futures-util", "futures-util",
"hashbrown 0.9.1", "hashbrown 0.9.1",
"image", "image",
"num 0.3.1", "num 0.3.1",
"num_cpus",
"rayon", "rayon",
"specs", "specs",
"tokio 1.2.0", "tokio 1.2.0",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uvth",
"vek 0.12.0", "vek 0.12.0",
"veloren-common", "veloren-common",
"veloren-common-net", "veloren-common-net",
@ -5621,7 +5581,7 @@ dependencies = [
"arraygen", "arraygen",
"assets_manager", "assets_manager",
"criterion", "criterion",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"crossbeam-utils 0.8.1", "crossbeam-utils 0.8.1",
"csv", "csv",
"directories-next", "directories-next",
@ -5702,7 +5662,7 @@ dependencies = [
"bytes 1.0.1", "bytes 1.0.1",
"clap", "clap",
"criterion", "criterion",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"futures-core", "futures-core",
"futures-util", "futures-util",
"lazy_static", "lazy_static",
@ -5721,7 +5681,7 @@ dependencies = [
[[package]] [[package]]
name = "veloren-network-protocol" name = "veloren-network-protocol"
version = "0.5.0" version = "0.6.0"
dependencies = [ dependencies = [
"async-channel", "async-channel",
"async-trait", "async-trait",
@ -5767,13 +5727,10 @@ version = "0.8.0"
dependencies = [ dependencies = [
"authc", "authc",
"chrono", "chrono",
"crossbeam-channel 0.5.0", "crossbeam-channel",
"diesel", "diesel",
"diesel_migrations", "diesel_migrations",
"dotenv", "dotenv",
"futures-channel",
"futures-executor",
"futures-timer",
"futures-util", "futures-util",
"hashbrown 0.9.1", "hashbrown 0.9.1",
"itertools 0.9.0", "itertools 0.9.0",
@ -5793,7 +5750,6 @@ dependencies = [
"specs-idvs", "specs-idvs",
"tokio 1.2.0", "tokio 1.2.0",
"tracing", "tracing",
"uvth",
"vek 0.12.0", "vek 0.12.0",
"veloren-common", "veloren-common",
"veloren-common-net", "veloren-common-net",
@ -5880,7 +5836,6 @@ dependencies = [
"tracing-subscriber", "tracing-subscriber",
"tracing-tracy", "tracing-tracy",
"treeculler", "treeculler",
"uvth",
"vek 0.12.0", "vek 0.12.0",
"veloren-client", "veloren-client",
"veloren-common", "veloren-common",

View File

@ -48,6 +48,7 @@ https://veloren.net/account/."#,
"main.login.server_shut_down": "Server shut down", "main.login.server_shut_down": "Server shut down",
"main.login.already_logged_in": "You are already logged into the server.", "main.login.already_logged_in": "You are already logged into the server.",
"main.login.network_error": "Network error", "main.login.network_error": "Network error",
"main.login.network_wrong_version": "The server is running a different version than you are. Check your version and update your game.",
"main.login.failed_sending_request": "Request to Auth server failed", "main.login.failed_sending_request": "Request to Auth server failed",
"main.login.invalid_character": "The selected character is invalid", "main.login.invalid_character": "The selected character is invalid",
"main.login.client_crashed": "Client crashed", "main.login.client_crashed": "Client crashed",

View File

@ -17,14 +17,10 @@ common-net = { package = "veloren-common-net", path = "../common/net" }
network = { package = "veloren-network", path = "../network", features = ["compression"], default-features = false } network = { package = "veloren-network", path = "../network", features = ["compression"], default-features = false }
byteorder = "1.3.2" byteorder = "1.3.2"
uvth = "3.1.1"
futures-util = "0.3.7" futures-util = "0.3.7"
futures-executor = "0.3"
futures-timer = "3.0"
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }
image = { version = "0.23.12", default-features = false, features = ["png"] } image = { version = "0.23.12", default-features = false, features = ["png"] }
num = "0.3.1" num = "0.3.1"
num_cpus = "1.10.1"
tracing = { version = "0.1", default-features = false } tracing = { version = "0.1", default-features = false }
rayon = "1.5" rayon = "1.5"
specs = { git = "https://github.com/amethyst/specs.git", rev = "d4435bdf496cf322c74886ca09dd8795984919b4" } specs = { git = "https://github.com/amethyst/specs.git", rev = "d4435bdf496cf322c74886ca09dd8795984919b4" }

View File

@ -5,14 +5,13 @@
use common::{clock::Clock, comp}; use common::{clock::Clock, comp};
use std::{ use std::{
io, io,
net::ToSocketAddrs,
sync::{mpsc, Arc}, sync::{mpsc, Arc},
thread, thread,
time::Duration, time::Duration,
}; };
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tracing::{error, info}; use tracing::{error, info};
use veloren_client::{Client, Event}; use veloren_client::{addr::ConnectionArgs, Client, Event};
const TPS: u64 = 10; // Low value is okay, just reading messages. const TPS: u64 = 10; // Low value is okay, just reading messages.
@ -45,27 +44,26 @@ fn main() {
let password = read_input(); let password = read_input();
let runtime = Arc::new(Runtime::new().unwrap()); let runtime = Arc::new(Runtime::new().unwrap());
let runtime2 = Arc::clone(&runtime);
// Create a client. // Create a client.
let mut client = Client::new( let mut client = runtime
server_addr .block_on(async {
.to_socket_addrs() let addr = ConnectionArgs::resolve(&server_addr, false)
.expect("Invalid server address") .await
.next() .expect("dns resolve failed");
.unwrap(), Client::new(addr, None, runtime2).await
None, })
runtime,
)
.expect("Failed to create client instance"); .expect("Failed to create client instance");
println!("Server info: {:?}", client.server_info()); println!("Server info: {:?}", client.server_info());
println!("Players online: {:?}", client.get_players()); println!("Players online: {:?}", client.get_players());
client runtime
.register(username, password, |provider| { .block_on(client.register(username, password, |provider| {
provider == "https://auth.veloren.net" provider == "https://auth.veloren.net"
}) }))
.unwrap(); .unwrap();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();

138
client/src/addr.rs Normal file
View File

@ -0,0 +1,138 @@
use std::net::SocketAddr;
use tokio::net::lookup_host;
use tracing::trace;
#[derive(Clone, Debug)]
pub enum ConnectionArgs {
IpAndPort(Vec<SocketAddr>),
Mpsc(u64),
}
impl ConnectionArgs {
const DEFAULT_PORT: u16 = 14004;
/// Parse ip address or resolves hostname.
/// Note: If you use an ipv6 address, the number after the last
/// colon will be used as the port unless you use [] around the address.
pub async fn resolve(
/* <hostname/ip>:[<port>] */ server_address: &str,
prefer_ipv6: bool,
) -> Result<Self, std::io::Error> {
// `lookup_host` will internally try to parse it as a SocketAddr
// 1. Assume it's a hostname + port
match lookup_host(server_address).await {
Ok(s) => {
trace!("Host lookup succeeded");
Ok(Self::sort_ipv6(s, prefer_ipv6))
},
Err(e) => {
// 2. Assume its a hostname without port
match lookup_host((server_address, Self::DEFAULT_PORT)).await {
Ok(s) => {
trace!("Host lookup without ports succeeded");
Ok(Self::sort_ipv6(s, prefer_ipv6))
},
Err(_) => Err(e), // Todo: evaluate returning both errors
}
},
}
}
fn sort_ipv6(s: impl Iterator<Item = SocketAddr>, prefer_ipv6: bool) -> Self {
let (mut first_addrs, mut second_addrs) =
s.partition::<Vec<_>, _>(|a| a.is_ipv6() == prefer_ipv6);
let addr = std::iter::Iterator::chain(first_addrs.drain(..), second_addrs.drain(..))
.collect::<Vec<_>>();
ConnectionArgs::IpAndPort(addr)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[tokio::test]
async fn resolve_localhost() {
let args = ConnectionArgs::resolve("localhost", false)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
assert_eq!(args[0].port(), 14004);
} else {
panic!("wrong resolution");
}
let args = ConnectionArgs::resolve("localhost:666", false)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].port(), 666);
} else {
panic!("wrong resolution");
}
}
#[tokio::test]
async fn resolve_ipv6() {
let args = ConnectionArgs::resolve("localhost", true)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].ip(), Ipv6Addr::LOCALHOST);
assert_eq!(args[0].port(), 14004);
} else {
panic!("wrong resolution");
}
}
#[tokio::test]
async fn resolve() {
let args = ConnectionArgs::resolve("google.com", false)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert!(args.len() == 1 || args.len() == 2);
assert_eq!(args[0].port(), 14004);
} else {
panic!("wrong resolution");
}
let args = ConnectionArgs::resolve("127.0.0.1", false)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert_eq!(args.len(), 1);
assert_eq!(args[0].port(), 14004);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
} else {
panic!("wrong resolution");
}
let args = ConnectionArgs::resolve("55.66.77.88", false)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert_eq!(args.len(), 1);
assert_eq!(args[0].port(), 14004);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::new(55, 66, 77, 88)));
} else {
panic!("wrong resolution");
}
let args = ConnectionArgs::resolve("127.0.0.1:776", false)
.await
.expect("resolve failed");
if let ConnectionArgs::IpAndPort(args) = args {
assert_eq!(args.len(), 1);
assert_eq!(args[0].port(), 776);
assert_eq!(args[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
} else {
panic!("wrong resolution");
}
}
}

View File

@ -1,5 +1,5 @@
use authc::AuthClientError; use authc::AuthClientError;
pub use network::NetworkError; pub use network::{InitProtocolError, NetworkConnectError, NetworkError};
use network::{ParticipantError, StreamError}; use network::{ParticipantError, StreamError};
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,7 +1,8 @@
#![deny(unsafe_code)] #![deny(unsafe_code)]
#![deny(clippy::clone_on_ref_ptr)] #![deny(clippy::clone_on_ref_ptr)]
#![feature(label_break_value, option_zip)] #![feature(label_break_value, option_zip, str_split_once)]
pub mod addr;
pub mod cmd; pub mod cmd;
pub mod error; pub mod error;
@ -14,6 +15,7 @@ pub use specs::{
Builder, DispatcherBuilder, Entity as EcsEntity, ReadStorage, WorldExt, Builder, DispatcherBuilder, Entity as EcsEntity, ReadStorage, WorldExt,
}; };
use crate::addr::ConnectionArgs;
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use common::{ use common::{
character::{CharacterId, CharacterItem}, character::{CharacterId, CharacterItem},
@ -48,9 +50,7 @@ use common_net::{
}; };
use common_sys::state::State; use common_sys::state::State;
use comp::BuffKind; use comp::BuffKind;
use futures_executor::block_on; use futures_util::FutureExt;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use image::DynamicImage; use image::DynamicImage;
use network::{Network, Participant, Pid, ProtocolAddr, Stream}; use network::{Network, Participant, Pid, ProtocolAddr, Stream};
@ -59,13 +59,11 @@ use rayon::prelude::*;
use specs::Component; use specs::Component;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
net::SocketAddr,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::runtime::Runtime; use tokio::{runtime::Runtime, select};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*; use vek::*;
const PING_ROLLING_AVERAGE_SECS: usize = 10; const PING_ROLLING_AVERAGE_SECS: usize = 10;
@ -131,7 +129,6 @@ pub struct Client {
registered: bool, registered: bool,
presence: Option<PresenceKind>, presence: Option<PresenceKind>,
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
thread_pool: ThreadPool,
server_info: ServerInfo, server_info: ServerInfo,
world_data: WorldData, world_data: WorldData,
player_list: HashMap<Uid, PlayerInfo>, player_list: HashMap<Uid, PlayerInfo>,
@ -187,28 +184,40 @@ pub struct CharacterList {
impl Client { impl Client {
/// Create a new `Client`. /// Create a new `Client`.
pub fn new<A: Into<SocketAddr>>( pub async fn new(
addr: A, addr: ConnectionArgs,
view_distance: Option<u32>, view_distance: Option<u32>,
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build();
// We reduce the thread count by 1 to keep rendering smooth
thread_pool.set_num_threads((num_cpus::get() - 1).max(1));
let network = Network::new(Pid::new(), Arc::clone(&runtime)); let network = Network::new(Pid::new(), Arc::clone(&runtime));
let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?; let participant = match addr {
let stream = block_on(participant.opened())?; ConnectionArgs::IpAndPort(addrs) => {
let mut ping_stream = block_on(participant.opened())?; // Try to connect to all IP's and return the first that works
let mut register_stream = block_on(participant.opened())?; let mut participant = None;
let character_screen_stream = block_on(participant.opened())?; for addr in addrs {
let in_game_stream = block_on(participant.opened())?; match network.connect(ProtocolAddr::Tcp(addr)).await {
Ok(p) => {
participant = Some(Ok(p));
break;
},
Err(e) => participant = Some(Err(Error::NetworkErr(e))),
}
}
participant
.unwrap_or_else(|| Err(Error::Other("No Ip Addr provided".to_string())))?
},
ConnectionArgs::Mpsc(id) => network.connect(ProtocolAddr::Mpsc(id)).await?,
};
let stream = participant.opened().await?;
let mut ping_stream = participant.opened().await?;
let mut register_stream = participant.opened().await?;
let character_screen_stream = participant.opened().await?;
let in_game_stream = participant.opened().await?;
register_stream.send(ClientType::Game)?; register_stream.send(ClientType::Game)?;
let server_info: ServerInfo = block_on(register_stream.recv())?; let server_info: ServerInfo = register_stream.recv().await?;
// TODO: Display that versions don't match in Voxygen // TODO: Display that versions don't match in Voxygen
if server_info.git_hash != *common::util::GIT_HASH { if server_info.git_hash != *common::util::GIT_HASH {
@ -236,7 +245,7 @@ impl Client {
recipe_book, recipe_book,
max_group_size, max_group_size,
client_timeout, client_timeout,
) = match block_on(register_stream.recv())? { ) = match register_stream.recv().await? {
ServerInit::GameSync { ServerInit::GameSync {
entity_package, entity_package,
time_of_day, time_of_day,
@ -411,19 +420,12 @@ impl Client {
}?; }?;
ping_stream.send(PingMsg::Ping)?; ping_stream.send(PingMsg::Ping)?;
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build();
// We reduce the thread count by 1 to keep rendering smooth
thread_pool.set_num_threads((num_cpus::get() - 1).max(1));
debug!("Initial sync done"); debug!("Initial sync done");
Ok(Self { Ok(Self {
registered: false, registered: false,
presence: None, presence: None,
runtime, runtime,
thread_pool,
server_info, server_info,
world_data: WorldData { world_data: WorldData {
lod_base, lod_base,
@ -470,13 +472,8 @@ impl Client {
}) })
} }
pub fn with_thread_pool(mut self, thread_pool: ThreadPool) -> Self {
self.thread_pool = thread_pool;
self
}
/// Request a state transition to `ClientState::Registered`. /// Request a state transition to `ClientState::Registered`.
pub fn register( pub async fn register(
&mut self, &mut self,
username: String, username: String,
password: String, password: String,
@ -496,7 +493,7 @@ impl Client {
self.send_msg_err(ClientRegister { token_or_username })?; self.send_msg_err(ClientRegister { token_or_username })?;
match block_on(self.register_stream.recv::<ServerRegisterAnswer>())? { match self.register_stream.recv::<ServerRegisterAnswer>().await? {
Err(RegisterError::AlreadyLoggedIn) => Err(Error::AlreadyLoggedIn), Err(RegisterError::AlreadyLoggedIn) => Err(Error::AlreadyLoggedIn),
Err(RegisterError::AuthError(err)) => Err(Error::AuthErr(err)), Err(RegisterError::AuthError(err)) => Err(Error::AuthErr(err)),
Err(RegisterError::InvalidCharacter) => Err(Error::InvalidCharacter), Err(RegisterError::InvalidCharacter) => Err(Error::InvalidCharacter),
@ -1688,10 +1685,11 @@ impl Client {
let mut handles_msg = 0; let mut handles_msg = 0;
block_on(async { let runtime = Arc::clone(&self.runtime);
runtime.block_on(async {
//TIMEOUT 0.01 ms for msg handling //TIMEOUT 0.01 ms for msg handling
select!( select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), _ = tokio::time::sleep(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = self.handle_messages(&mut frontend_events, &mut handles_msg).fuse() => err, err = self.handle_messages(&mut frontend_events, &mut handles_msg).fuse() => err,
) )
})?; })?;
@ -1733,12 +1731,10 @@ impl Client {
* 1000.0 * 1000.0
} }
/// Get a reference to the client's worker thread pool. This pool should be /// Get a reference to the client's runtime thread pool. This pool should be
/// used for any computationally expensive operations that run outside /// used for any computationally expensive operations that run outside
/// of the main thread (i.e., threads that block on I/O operations are /// of the main thread (i.e., threads that block on I/O operations are
/// exempt). /// exempt).
pub fn thread_pool(&self) -> &ThreadPool { &self.thread_pool }
pub fn runtime(&self) -> &Arc<Runtime> { &self.runtime } pub fn runtime(&self) -> &Arc<Runtime> { &self.runtime }
/// Get a reference to the client's game state. /// Get a reference to the client's game state.
@ -2042,15 +2038,22 @@ impl Drop for Client {
} else { } else {
trace!("no disconnect msg necessary as client wasn't registered") trace!("no disconnect msg necessary as client wasn't registered")
} }
if let Err(e) = block_on(self.participant.take().unwrap().disconnect()) {
tokio::task::block_in_place(|| {
if let Err(e) = self
.runtime
.block_on(self.participant.take().unwrap().disconnect())
{
warn!(?e, "error when disconnecting, couldn't send all data"); warn!(?e, "error when disconnecting, couldn't send all data");
} }
});
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::net::SocketAddr;
#[test] #[test]
/// THIS TEST VERIFIES THE CONSTANT API. /// THIS TEST VERIFIES THE CONSTANT API.
@ -2067,7 +2070,12 @@ mod tests {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000); let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000);
let view_distance: Option<u32> = None; let view_distance: Option<u32> = None;
let runtime = Arc::new(Runtime::new().unwrap()); let runtime = Arc::new(Runtime::new().unwrap());
let veloren_client: Result<Client, Error> = Client::new(socket, view_distance, runtime); let runtime2 = Arc::clone(&runtime);
let veloren_client: Result<Client, Error> = runtime.block_on(Client::new(
ConnectionArgs::IpAndPort(vec![socket]),
view_distance,
runtime2,
));
let _ = veloren_client.map(|mut client| { let _ = veloren_client.map(|mut client| {
//register //register
@ -2075,9 +2083,9 @@ mod tests {
let password: String = "Bar".to_string(); let password: String = "Bar".to_string();
let auth_server: String = "auth.veloren.net".to_string(); let auth_server: String = "auth.veloren.net".to_string();
let _result: Result<(), Error> = let _result: Result<(), Error> =
client.register(username, password, |suggestion: &str| { runtime.block_on(client.register(username, password, |suggestion: &str| {
suggestion == auth_server suggestion == auth_server
}); }));
//clock //clock
let mut clock = Clock::new(Duration::from_secs_f64(SPT)); let mut clock = Clock::new(Duration::from_secs_f64(SPT));

View File

@ -5,8 +5,7 @@ use common::{
event::{EventBus, LocalEvent, ServerEvent}, event::{EventBus, LocalEvent, ServerEvent},
metrics::{PhysicsMetrics, SysMetrics}, metrics::{PhysicsMetrics, SysMetrics},
region::RegionMap, region::RegionMap,
resources, resources::{DeltaTime, GameMode, Time, TimeOfDay},
resources::{DeltaTime, Time, TimeOfDay},
span, span,
terrain::{Block, TerrainChunk, TerrainGrid}, terrain::{Block, TerrainChunk, TerrainGrid},
time::DayPeriod, time::DayPeriod,
@ -90,22 +89,34 @@ pub struct State {
impl State { impl State {
/// Create a new `State` in client mode. /// Create a new `State` in client mode.
pub fn client() -> Self { Self::new(resources::GameMode::Client) } pub fn client() -> Self { Self::new(GameMode::Client) }
/// Create a new `State` in server mode. /// Create a new `State` in server mode.
pub fn server() -> Self { Self::new(resources::GameMode::Server) } pub fn server() -> Self { Self::new(GameMode::Server) }
pub fn new(game_mode: resources::GameMode) -> Self { pub fn new(game_mode: GameMode) -> Self {
let thread_name_infix = match game_mode {
GameMode::Server => "s",
GameMode::Client => "c",
GameMode::Singleplayer => "sp",
};
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
.build()
.unwrap(),
);
Self { Self {
ecs: Self::setup_ecs_world(game_mode), ecs: Self::setup_ecs_world(game_mode),
thread_pool: Arc::new(ThreadPoolBuilder::new().build().unwrap()), thread_pool,
} }
} }
/// Creates ecs world and registers all the common components and resources /// Creates ecs world and registers all the common components and resources
// TODO: Split up registering into server and client (e.g. move // TODO: Split up registering into server and client (e.g. move
// EventBus<ServerEvent> to the server) // EventBus<ServerEvent> to the server)
fn setup_ecs_world(game_mode: resources::GameMode) -> specs::World { fn setup_ecs_world(game_mode: GameMode) -> specs::World {
let mut ecs = specs::World::new(); let mut ecs = specs::World::new();
// Uids for sync // Uids for sync
ecs.register_sync_marker(); ecs.register_sync_marker();

View File

@ -28,7 +28,7 @@ tracing = { version = "0.1", default-features = false, features = ["attributes"]
prometheus = { version = "0.11", default-features = false, optional = true } prometheus = { version = "0.11", default-features = false, optional = true }
#async #async
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3.7", default-features = false, features = ["std"] }
async-channel = "1.5.1" #use for .close() channels async-channel = "1.5.1" #use for .close() channels
#mpsc channel registry #mpsc channel registry
lazy_static = { version = "1.4", default-features = false } lazy_static = { version = "1.4", default-features = false }
@ -43,7 +43,7 @@ bytes = "^1"
[dev-dependencies] [dev-dependencies]
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }
tokio = { version = "1.2", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] } tokio = { version = "1.2", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink", "std"] }
clap = { version = "2.33", default-features = false } clap = { version = "2.33", default-features = false }
shellexpand = "2.0.0" shellexpand = "2.0.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -31,7 +31,7 @@ fn criterion_util(c: &mut Criterion) {
let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) =
network_participant_stream(ProtocolAddr::Mpsc(5000)); network_participant_stream(ProtocolAddr::Mpsc(5000));
let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED)).unwrap(); let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED, 0)).unwrap();
c.throughput(Throughput::Bytes(1000)) c.throughput(Throughput::Bytes(1000))
.bench_function("message_serialize", |b| { .bench_function("message_serialize", |b| {
@ -134,7 +134,7 @@ pub fn network_participant_stream(
let p1_b = n_b.connect(addr).await.unwrap(); let p1_b = n_b.connect(addr).await.unwrap();
let p1_a = n_a.connected().await.unwrap(); let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(4, Promises::empty()).await.unwrap(); let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();
let s1_b = p1_b.opened().await.unwrap(); let s1_b = p1_b.opened().await.unwrap();
(n_a, p1_a, s1_a, n_b, p1_b, s1_b) (n_a, p1_a, s1_a, n_b, p1_b, s1_b)

View File

@ -130,7 +130,10 @@ async fn client_connection(
Ok(msg) => { Ok(msg) => {
println!("[{}]: {}", username, msg); println!("[{}]: {}", username, msg);
for p in participants.read().await.iter() { for p in participants.read().await.iter() {
match p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await { match p
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await
{
Err(_) => info!("error talking to client, //TODO drop it"), Err(_) => info!("error talking to client, //TODO drop it"),
Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(), Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(),
}; };
@ -148,7 +151,7 @@ fn client(address: ProtocolAddr) {
r.block_on(async { r.block_on(async {
let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1 let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1
let mut s1 = p1 let mut s1 = p1
.open(4, Promises::ORDERED | Promises::CONSISTENCY) .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await .await
.unwrap(); //remote representation of s1 .unwrap(); //remote representation of s1
let mut input_lines = io::BufReader::new(io::stdin()); let mut input_lines = io::BufReader::new(io::stdin());

View File

@ -121,8 +121,9 @@ impl Server {
#[allow(clippy::eval_order_dependence)] #[allow(clippy::eval_order_dependence)]
async fn loop_participant(&self, p: Participant) { async fn loop_participant(&self, p: Participant) {
if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = ( if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = (
p.open(3, Promises::ORDERED | Promises::CONSISTENCY).await, p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0)
p.open(6, Promises::CONSISTENCY).await, .await,
p.open(6, Promises::CONSISTENCY, 0).await,
p.opened().await, p.opened().await,
p.opened().await, p.opened().await,
) { ) {

View File

@ -164,7 +164,7 @@ fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1 let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1
let mut s1 = runtime let mut s1 = runtime
.block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY)) .block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0))
.unwrap(); //remote representation of s1 .unwrap(); //remote representation of s1
let mut last = Instant::now(); let mut last = Instant::now();
let mut id = 0u64; let mut id = 0u64;

View File

@ -1,7 +1,7 @@
[package] [package]
name = "veloren-network-protocol" name = "veloren-network-protocol"
description = "pure Protocol without any I/O itself" description = "pure Protocol without any I/O itself"
version = "0.5.0" version = "0.6.0"
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"] authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
edition = "2018" edition = "2018"
@ -27,7 +27,7 @@ bytes = "^1"
[dev-dependencies] [dev-dependencies]
async-channel = "1.5.1" async-channel = "1.5.1"
tokio = { version = "1.2", default-features = false, features = ["rt", "macros"] } tokio = { version = "^1", default-features = false, features = ["rt", "macros"] }
criterion = { version = "0.3.4", features = ["default", "async_tokio"] } criterion = { version = "0.3.4", features = ["default", "async_tokio"] }
[[bench]] [[bench]]

View File

@ -47,7 +47,6 @@ async fn send_msg<T: SendProtocol>(mut s: T, data: Bytes, cnt: usize) {
for i in 0..cnt { for i in 0..cnt {
s.send(ProtocolEvent::Message { s.send(ProtocolEvent::Message {
sid: Sid::new(12), sid: Sid::new(12),
mid: i as u64,
data: data.clone(), data: data.clone(),
}) })
.await .await
@ -93,7 +92,6 @@ fn criterion_util(c: &mut Criterion) {
let mut buffer = BytesMut::with_capacity(1500); let mut buffer = BytesMut::with_capacity(1500);
let frame = OTFrame::Data { let frame = OTFrame::Data {
mid: 65, mid: 65,
start: 89u64,
data: Bytes::from(&b"hello_world"[..]), data: Bytes::from(&b"hello_world"[..]),
}; };
b.iter_with_setup( b.iter_with_setup(

View File

@ -0,0 +1,54 @@
/// All possible Errors that can happen during Handshake [`InitProtocol`]
///
/// [`InitProtocol`]: crate::InitProtocol
#[derive(Debug, PartialEq)]
pub enum InitProtocolError {
Closed,
WrongMagicNumber([u8; 7]),
WrongVersion([u32; 3]),
}
/// When you return closed you must stay closed!
#[derive(Debug, PartialEq)]
pub enum ProtocolError {
Closed,
}
impl From<ProtocolError> for InitProtocolError {
fn from(err: ProtocolError) -> Self {
match err {
ProtocolError::Closed => InitProtocolError::Closed,
}
}
}
impl core::fmt::Display for InitProtocolError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
InitProtocolError::Closed => write!(f, "Channel closed"),
InitProtocolError::WrongMagicNumber(r) => write!(
f,
"Magic Number doesn't match, remote side send '{:?}' instead of '{:?}'",
&r,
&crate::types::VELOREN_MAGIC_NUMBER
),
InitProtocolError::WrongVersion(r) => write!(
f,
"Network doesn't match, remote side send '{:?}' we are on '{:?}'",
&r,
&crate::types::VELOREN_NETWORK_VERSION
),
}
}
}
impl core::fmt::Display for ProtocolError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
ProtocolError::Closed => write!(f, "Channel closed"),
}
}
}
impl std::error::Error for InitProtocolError {}
impl std::error::Error for ProtocolError {}

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
frame::OTFrame, frame::OTFrame,
types::{Bandwidth, Mid, Prio, Promises, Sid}, types::{Bandwidth, Prio, Promises, Sid},
}; };
use bytes::Bytes; use bytes::Bytes;
@ -23,7 +23,6 @@ pub enum ProtocolEvent {
}, },
Message { Message {
data: Bytes, data: Bytes,
mid: Mid,
sid: Sid, sid: Sid,
}, },
} }
@ -36,11 +35,12 @@ impl ProtocolEvent {
sid, sid,
prio, prio,
promises, promises,
guaranteed_bandwidth: _, guaranteed_bandwidth,
} => OTFrame::OpenStream { } => OTFrame::OpenStream {
sid: *sid, sid: *sid,
prio: *prio, prio: *prio,
promises: *promises, promises: *promises,
guaranteed_bandwidth: *guaranteed_bandwidth,
}, },
ProtocolEvent::CloseStream { sid } => OTFrame::CloseStream { sid: *sid }, ProtocolEvent::CloseStream { sid } => OTFrame::CloseStream { sid: *sid },
ProtocolEvent::Message { .. } => { ProtocolEvent::Message { .. } => {
@ -68,7 +68,6 @@ mod tests {
fn test_msg_buffer_panic() { fn test_msg_buffer_panic() {
let _ = ProtocolEvent::Message { let _ = ProtocolEvent::Message {
data: Bytes::new(), data: Bytes::new(),
mid: 0,
sid: Sid::new(23), sid: Sid::new(23),
} }
.to_frame(); .to_frame();

View File

@ -1,4 +1,4 @@
use crate::types::{Mid, Pid, Prio, Promises, Sid}; use crate::types::{Bandwidth, Mid, Pid, Prio, Promises, Sid};
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
// const FRAME_RESERVED_1: u8 = 0; // const FRAME_RESERVED_1: u8 = 0;
@ -38,6 +38,7 @@ pub enum OTFrame {
sid: Sid, sid: Sid,
prio: Prio, prio: Prio,
promises: Promises, promises: Promises,
guaranteed_bandwidth: Bandwidth,
}, },
CloseStream { CloseStream {
sid: Sid, sid: Sid,
@ -49,7 +50,6 @@ pub enum OTFrame {
}, },
Data { Data {
mid: Mid, mid: Mid,
start: u64, /* remove */
data: Bytes, data: Bytes,
}, },
} }
@ -63,6 +63,7 @@ pub enum ITFrame {
sid: Sid, sid: Sid,
prio: Prio, prio: Prio,
promises: Promises, promises: Promises,
guaranteed_bandwidth: Bandwidth,
}, },
CloseStream { CloseStream {
sid: Sid, sid: Sid,
@ -74,7 +75,6 @@ pub enum ITFrame {
}, },
Data { Data {
mid: Mid, mid: Mid,
start: u64, /* remove */
data: BytesMut, data: BytesMut,
}, },
} }
@ -161,9 +161,9 @@ impl InitFrame {
pub(crate) const TCP_CLOSE_STREAM_CNS: usize = 8; pub(crate) const TCP_CLOSE_STREAM_CNS: usize = 8;
/// const part of the DATA frame, actual size is variable /// const part of the DATA frame, actual size is variable
pub(crate) const TCP_DATA_CNS: usize = 18; pub(crate) const TCP_DATA_CNS: usize = 10;
pub(crate) const TCP_DATA_HEADER_CNS: usize = 24; pub(crate) const TCP_DATA_HEADER_CNS: usize = 24;
pub(crate) const TCP_OPEN_STREAM_CNS: usize = 10; pub(crate) const TCP_OPEN_STREAM_CNS: usize = 18;
// Size WITHOUT the 1rst indicating byte // Size WITHOUT the 1rst indicating byte
pub(crate) const TCP_SHUTDOWN_CNS: usize = 0; pub(crate) const TCP_SHUTDOWN_CNS: usize = 0;
@ -177,11 +177,13 @@ impl OTFrame {
sid, sid,
prio, prio,
promises, promises,
guaranteed_bandwidth,
} => { } => {
bytes.put_u8(FRAME_OPEN_STREAM); bytes.put_u8(FRAME_OPEN_STREAM);
sid.to_bytes(bytes); sid.to_bytes(bytes);
bytes.put_u8(prio); bytes.put_u8(prio);
bytes.put_u8(promises.to_le_bytes()[0]); bytes.put_u8(promises.to_le_bytes()[0]);
bytes.put_u64_le(guaranteed_bandwidth);
}, },
Self::CloseStream { sid } => { Self::CloseStream { sid } => {
bytes.put_u8(FRAME_CLOSE_STREAM); bytes.put_u8(FRAME_CLOSE_STREAM);
@ -193,10 +195,9 @@ impl OTFrame {
sid.to_bytes(bytes); sid.to_bytes(bytes);
bytes.put_u64_le(length); bytes.put_u64_le(length);
}, },
Self::Data { mid, start, data } => { Self::Data { mid, data } => {
bytes.put_u8(FRAME_DATA); bytes.put_u8(FRAME_DATA);
bytes.put_u64_le(mid); bytes.put_u64_le(mid);
bytes.put_u64_le(start);
bytes.put_u16_le(data.len() as u16); bytes.put_u16_le(data.len() as u16);
bytes.put_slice(&data); bytes.put_slice(&data);
}, },
@ -216,10 +217,10 @@ impl ITFrame {
FRAME_CLOSE_STREAM => TCP_CLOSE_STREAM_CNS, FRAME_CLOSE_STREAM => TCP_CLOSE_STREAM_CNS,
FRAME_DATA_HEADER => TCP_DATA_HEADER_CNS, FRAME_DATA_HEADER => TCP_DATA_HEADER_CNS,
FRAME_DATA => { FRAME_DATA => {
if bytes.len() < 17 + 1 + 1 { if bytes.len() < 9 + 1 + 1 {
return None; return None;
} }
u16::from_le_bytes([bytes[16 + 1], bytes[17 + 1]]) as usize + TCP_DATA_CNS u16::from_le_bytes([bytes[8 + 1], bytes[9 + 1]]) as usize + TCP_DATA_CNS
}, },
_ => return None, _ => return None,
}; };
@ -240,6 +241,7 @@ impl ITFrame {
sid: Sid::from_bytes(&mut bytes), sid: Sid::from_bytes(&mut bytes),
prio: bytes.get_u8(), prio: bytes.get_u8(),
promises: Promises::from_bits_truncate(bytes.get_u8()), promises: Promises::from_bits_truncate(bytes.get_u8()),
guaranteed_bandwidth: bytes.get_u64_le(),
} }
}, },
FRAME_CLOSE_STREAM => { FRAME_CLOSE_STREAM => {
@ -261,11 +263,10 @@ impl ITFrame {
FRAME_DATA => { FRAME_DATA => {
bytes.advance(1); bytes.advance(1);
let mid = bytes.get_u64_le(); let mid = bytes.get_u64_le();
let start = bytes.get_u64_le();
let length = bytes.get_u16_le(); let length = bytes.get_u16_le();
debug_assert_eq!(length as usize, size - TCP_DATA_CNS); debug_assert_eq!(length as usize, size - TCP_DATA_CNS);
let data = bytes.split_to(length as usize); let data = bytes.split_to(length as usize);
Self::Data { mid, start, data } Self::Data { mid, data }
}, },
_ => unreachable!("Frame::to_frame should be handled before!"), _ => unreachable!("Frame::to_frame should be handled before!"),
}; };
@ -282,16 +283,18 @@ impl PartialEq<ITFrame> for OTFrame {
sid, sid,
prio, prio,
promises, promises,
guaranteed_bandwidth,
} => matches!(other, ITFrame::OpenStream { } => matches!(other, ITFrame::OpenStream {
sid, sid,
prio, prio,
promises promises,
guaranteed_bandwidth,
}), }),
Self::CloseStream { sid } => matches!(other, ITFrame::CloseStream { sid }), Self::CloseStream { sid } => matches!(other, ITFrame::CloseStream { sid }),
Self::DataHeader { mid, sid, length } => { Self::DataHeader { mid, sid, length } => {
matches!(other, ITFrame::DataHeader { mid, sid, length }) matches!(other, ITFrame::DataHeader { mid, sid, length })
}, },
Self::Data { mid, start, data } => matches!(other, ITFrame::Data { mid, start, data }), Self::Data { mid, data } => matches!(other, ITFrame::Data { mid, data }),
} }
} }
} }
@ -321,6 +324,7 @@ mod tests {
sid: Sid::new(1337), sid: Sid::new(1337),
prio: 14, prio: 14,
promises: Promises::GUARANTEED_DELIVERY, promises: Promises::GUARANTEED_DELIVERY,
guaranteed_bandwidth: 1_000_000,
}, },
OTFrame::DataHeader { OTFrame::DataHeader {
sid: Sid::new(1337), sid: Sid::new(1337),
@ -329,12 +333,10 @@ mod tests {
}, },
OTFrame::Data { OTFrame::Data {
mid: 0, mid: 0,
start: 0,
data: Bytes::from(&[77u8; 20][..]), data: Bytes::from(&[77u8; 20][..]),
}, },
OTFrame::Data { OTFrame::Data {
mid: 0, mid: 0,
start: 20,
data: Bytes::from(&[42u8; 16][..]), data: Bytes::from(&[42u8; 16][..]),
}, },
OTFrame::CloseStream { OTFrame::CloseStream {
@ -496,6 +498,7 @@ mod tests {
sid: Sid::new(88), sid: Sid::new(88),
promises: Promises::ENCRYPTED, promises: Promises::ENCRYPTED,
prio: 88, prio: 88,
guaranteed_bandwidth: 1_000_000,
}; };
OTFrame::write_bytes(frame1, &mut buffer); OTFrame::write_bytes(frame1, &mut buffer);
} }
@ -508,6 +511,7 @@ mod tests {
sid: Sid::new(88), sid: Sid::new(88),
promises: Promises::ENCRYPTED, promises: Promises::ENCRYPTED,
prio: 88, prio: 88,
guaranteed_bandwidth: 1_000_000,
}; };
OTFrame::write_bytes(frame1, &mut buffer); OTFrame::write_bytes(frame1, &mut buffer);
buffer.truncate(6); // simulate partial retrieve buffer.truncate(6); // simulate partial retrieve
@ -527,12 +531,11 @@ mod tests {
let frame1 = OTFrame::Data { let frame1 = OTFrame::Data {
mid: 7u64, mid: 7u64,
start: 1u64,
data: Bytes::from(&b"foobar"[..]), data: Bytes::from(&b"foobar"[..]),
}; };
OTFrame::write_bytes(frame1, &mut buffer); OTFrame::write_bytes(frame1, &mut buffer);
buffer[17] = 255; buffer[9] = 255;
let framed = ITFrame::read_frame(&mut buffer); let framed = ITFrame::read_frame(&mut buffer);
assert_eq!(framed, None); assert_eq!(framed, None);
} }
@ -543,18 +546,16 @@ mod tests {
let frame1 = OTFrame::Data { let frame1 = OTFrame::Data {
mid: 7u64, mid: 7u64,
start: 1u64,
data: Bytes::from(&b"foobar"[..]), data: Bytes::from(&b"foobar"[..]),
}; };
OTFrame::write_bytes(frame1, &mut buffer); OTFrame::write_bytes(frame1, &mut buffer);
buffer[17] = 3; buffer[9] = 3;
let framed = ITFrame::read_frame(&mut buffer); let framed = ITFrame::read_frame(&mut buffer);
assert_eq!( assert_eq!(
framed, framed,
Some(ITFrame::Data { Some(ITFrame::Data {
mid: 7u64, mid: 7u64,
start: 1u64,
data: BytesMut::from(&b"foo"[..]), data: BytesMut::from(&b"foo"[..]),
}) })
); );

View File

@ -1,10 +1,11 @@
use crate::{ use crate::{
error::{InitProtocolError, ProtocolError},
frame::InitFrame, frame::InitFrame,
types::{ types::{
Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
VELOREN_NETWORK_VERSION, VELOREN_NETWORK_VERSION,
}, },
InitProtocol, InitProtocolError, ProtocolError, InitProtocol,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use tracing::{debug, error, info, trace}; use tracing::{debug, error, info, trace};
@ -80,7 +81,9 @@ where
.send(InitFrame::Raw(WRONG_NUMBER.as_bytes().to_vec())) .send(InitFrame::Raw(WRONG_NUMBER.as_bytes().to_vec()))
.await?; .await?;
Err(InitProtocolError::WrongMagicNumber(magic_number)) Err(InitProtocolError::WrongMagicNumber(magic_number))
} else if version != VELOREN_NETWORK_VERSION { } else if version[0] != VELOREN_NETWORK_VERSION[0]
|| version[1] != VELOREN_NETWORK_VERSION[1]
{
error!(?version, "Connection with wrong network version"); error!(?version, "Connection with wrong network version");
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
drain drain

View File

@ -49,6 +49,7 @@
//! [`RecvProtocol`]: crate::RecvProtocol //! [`RecvProtocol`]: crate::RecvProtocol
//! [`InitProtocol`]: crate::InitProtocol //! [`InitProtocol`]: crate::InitProtocol
mod error;
mod event; mod event;
mod frame; mod frame;
mod handshake; mod handshake;
@ -59,15 +60,14 @@ mod prio;
mod tcp; mod tcp;
mod types; mod types;
pub use error::{InitProtocolError, ProtocolError};
pub use event::ProtocolEvent; pub use event::ProtocolEvent;
pub use metrics::ProtocolMetricCache; pub use metrics::ProtocolMetricCache;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
pub use metrics::ProtocolMetrics; pub use metrics::ProtocolMetrics;
pub use mpsc::{MpscMsg, MpscRecvProtocol, MpscSendProtocol}; pub use mpsc::{MpscMsg, MpscRecvProtocol, MpscSendProtocol};
pub use tcp::{TcpRecvProtocol, TcpSendProtocol}; pub use tcp::{TcpRecvProtocol, TcpSendProtocol};
pub use types::{ pub use types::{Bandwidth, Cid, Pid, Prio, Promises, Sid, HIGHEST_PRIO, VELOREN_NETWORK_VERSION};
Bandwidth, Cid, Mid, Pid, Prio, Promises, Sid, HIGHEST_PRIO, VELOREN_NETWORK_VERSION,
};
///use at own risk, might change any time, for internal benchmarks ///use at own risk, might change any time, for internal benchmarks
pub mod _internal { pub mod _internal {
@ -152,27 +152,3 @@ pub trait UnreliableSink: Send {
type DataFormat; type DataFormat;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError>; async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError>;
} }
/// All possible Errors that can happen during Handshake [`InitProtocol`]
///
/// [`InitProtocol`]: crate::InitProtocol
#[derive(Debug, PartialEq)]
pub enum InitProtocolError {
Closed,
WrongMagicNumber([u8; 7]),
WrongVersion([u32; 3]),
}
/// When you return closed you must stay closed!
#[derive(Debug, PartialEq)]
pub enum ProtocolError {
Closed,
}
impl From<ProtocolError> for InitProtocolError {
fn from(err: ProtocolError) -> Self {
match err {
ProtocolError::Closed => InitProtocolError::Closed,
}
}
}

View File

@ -57,12 +57,10 @@ impl OTMessage {
fn get_next_data(&mut self) -> OTFrame { fn get_next_data(&mut self) -> OTFrame {
let to_send = std::cmp::min(self.data.len(), Self::FRAME_DATA_SIZE as usize); let to_send = std::cmp::min(self.data.len(), Self::FRAME_DATA_SIZE as usize);
let data = self.data.split_to(to_send); let data = self.data.split_to(to_send);
let start = self.start;
self.start += Self::FRAME_DATA_SIZE; self.start += Self::FRAME_DATA_SIZE;
OTFrame::Data { OTFrame::Data {
mid: self.mid, mid: self.mid,
start,
data, data,
} }
} }

View File

@ -1,12 +1,13 @@
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use crate::metrics::RemoveReason; use crate::metrics::RemoveReason;
use crate::{ use crate::{
error::ProtocolError,
event::ProtocolEvent, event::ProtocolEvent,
frame::InitFrame, frame::InitFrame,
handshake::{ReliableDrain, ReliableSink}, handshake::{ReliableDrain, ReliableSink},
metrics::ProtocolMetricCache, metrics::ProtocolMetricCache,
types::Bandwidth, types::Bandwidth,
ProtocolError, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -78,7 +79,6 @@ where
match &event { match &event {
ProtocolEvent::Message { ProtocolEvent::Message {
data: _data, data: _data,
mid: _,
sid: _sid, sid: _sid,
} => { } => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -118,7 +118,7 @@ where
MpscMsg::Event(e) => { MpscMsg::Event(e) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{ {
if let ProtocolEvent::Message { data, mid: _, sid } = &e { if let ProtocolEvent::Message { data, sid } = &e {
let sid = *sid; let sid = *sid;
let bytes = data.len() as u64; let bytes = data.len() as u64;
let line = self.metrics.init_sid(sid); let line = self.metrics.init_sid(sid);

View File

@ -1,4 +1,5 @@
use crate::{ use crate::{
error::ProtocolError,
event::ProtocolEvent, event::ProtocolEvent,
frame::{ITFrame, InitFrame, OTFrame}, frame::{ITFrame, InitFrame, OTFrame},
handshake::{ReliableDrain, ReliableSink}, handshake::{ReliableDrain, ReliableSink},
@ -6,7 +7,7 @@ use crate::{
metrics::{ProtocolMetricCache, RemoveReason}, metrics::{ProtocolMetricCache, RemoveReason},
prio::PrioManager, prio::PrioManager,
types::{Bandwidth, Mid, Sid}, types::{Bandwidth, Mid, Sid},
ProtocolError, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use bytes::BytesMut; use bytes::BytesMut;
@ -28,6 +29,7 @@ where
{ {
buffer: BytesMut, buffer: BytesMut,
store: PrioManager, store: PrioManager,
next_mid: Mid,
closing_streams: Vec<Sid>, closing_streams: Vec<Sid>,
notify_closing_streams: Vec<Sid>, notify_closing_streams: Vec<Sid>,
pending_shutdown: bool, pending_shutdown: bool,
@ -59,6 +61,7 @@ where
Self { Self {
buffer: BytesMut::new(), buffer: BytesMut::new(),
store: PrioManager::new(metrics.clone()), store: PrioManager::new(metrics.clone()),
next_mid: 0u64,
closing_streams: vec![], closing_streams: vec![],
notify_closing_streams: vec![], notify_closing_streams: vec![],
pending_shutdown: false, pending_shutdown: false,
@ -146,9 +149,10 @@ where
self.pending_shutdown = true; self.pending_shutdown = true;
} }
}, },
ProtocolEvent::Message { data, mid, sid } => { ProtocolEvent::Message { data, sid } => {
self.metrics.smsg_ib(sid, data.len() as u64); self.metrics.smsg_ib(sid, data.len() as u64);
self.store.add(data, mid, sid); self.store.add(data, self.next_mid, sid);
self.next_mid += 1;
}, },
} }
Ok(()) Ok(())
@ -160,12 +164,7 @@ where
let mut data_frames = 0; let mut data_frames = 0;
let mut data_bandwidth = 0; let mut data_bandwidth = 0;
for frame in frames { for frame in frames {
if let OTFrame::Data { if let OTFrame::Data { mid: _, data } = &frame {
mid: _,
start: _,
data,
} = &frame
{
data_bandwidth += data.len(); data_bandwidth += data.len();
data_frames += 1; data_frames += 1;
} }
@ -228,12 +227,13 @@ where
sid, sid,
prio, prio,
promises, promises,
guaranteed_bandwidth,
} => { } => {
break 'outer Ok(ProtocolEvent::OpenStream { break 'outer Ok(ProtocolEvent::OpenStream {
sid, sid,
prio: prio.min(crate::types::HIGHEST_PRIO), prio: prio.min(crate::types::HIGHEST_PRIO),
promises, promises,
guaranteed_bandwidth: 1_000_000, guaranteed_bandwidth,
}); });
}, },
ITFrame::CloseStream { sid } => { ITFrame::CloseStream { sid } => {
@ -244,11 +244,7 @@ where
self.metrics.rmsg_ib(sid, length); self.metrics.rmsg_ib(sid, length);
self.incoming.insert(mid, m); self.incoming.insert(mid, m);
}, },
ITFrame::Data { ITFrame::Data { mid, data } => {
mid,
start: _,
data,
} => {
self.metrics.rdata_frames_b(data.len() as u64); self.metrics.rdata_frames_b(data.len() as u64);
let m = match self.incoming.get_mut(&mid) { let m = match self.incoming.get_mut(&mid) {
Some(m) => m, Some(m) => m,
@ -271,7 +267,6 @@ where
); );
break 'outer Ok(ProtocolEvent::Message { break 'outer Ok(ProtocolEvent::Message {
sid: m.sid, sid: m.sid,
mid,
data: m.data.freeze(), data: m.data.freeze(),
}); });
} }
@ -383,11 +378,12 @@ mod test_utils {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{ use crate::{
error::ProtocolError,
frame::OTFrame, frame::OTFrame,
metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason}, metrics::{ProtocolMetricCache, ProtocolMetrics, RemoveReason},
tcp::test_utils::*, tcp::test_utils::*,
types::{Pid, Promises, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2}, types::{Pid, Promises, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2},
InitProtocol, ProtocolError, ProtocolEvent, RecvProtocol, SendProtocol, InitProtocol, ProtocolEvent, RecvProtocol, SendProtocol,
}; };
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
@ -431,7 +427,6 @@ mod tests {
let _ = r.recv().await.unwrap(); let _ = r.recv().await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid: Sid::new(10), sid: Sid::new(10),
mid: 0,
data: Bytes::from(&[188u8; 600][..]), data: Bytes::from(&[188u8; 600][..]),
}; };
s.send(event.clone()).await.unwrap(); s.send(event.clone()).await.unwrap();
@ -441,7 +436,6 @@ mod tests {
// 2nd short message // 2nd short message
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid: Sid::new(10), sid: Sid::new(10),
mid: 1,
data: Bytes::from(&[7u8; 30][..]), data: Bytes::from(&[7u8; 30][..]),
}; };
s.send(event.clone()).await.unwrap(); s.send(event.clone()).await.unwrap();
@ -467,7 +461,6 @@ mod tests {
let _ = r.recv().await.unwrap(); let _ = r.recv().await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid, sid,
mid: 77,
data: Bytes::from(&[99u8; 500_000][..]), data: Bytes::from(&[99u8; 500_000][..]),
}; };
s.send(event.clone()).await.unwrap(); s.send(event.clone()).await.unwrap();
@ -495,7 +488,6 @@ mod tests {
let _ = r.recv().await.unwrap(); let _ = r.recv().await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid, sid,
mid: 77,
data: Bytes::from(&[99u8; 500_000][..]), data: Bytes::from(&[99u8; 500_000][..]),
}; };
s.send(event).await.unwrap(); s.send(event).await.unwrap();
@ -524,7 +516,6 @@ mod tests {
let _ = r.recv().await.unwrap(); let _ = r.recv().await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid, sid,
mid: 77,
data: Bytes::from(&[99u8; 500_000][..]), data: Bytes::from(&[99u8; 500_000][..]),
}; };
s.send(event).await.unwrap(); s.send(event).await.unwrap();
@ -556,14 +547,12 @@ mod tests {
s.send(event).await.unwrap(); s.send(event).await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid, sid,
mid: 77,
data: Bytes::from(&[99u8; 500_000][..]), data: Bytes::from(&[99u8; 500_000][..]),
}; };
s.send(event).await.unwrap(); s.send(event).await.unwrap();
s.flush(1_000_000, Duration::from_secs(1)).await.unwrap(); s.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid, sid,
mid: 78,
data: Bytes::from(&[100u8; 500_000][..]), data: Bytes::from(&[100u8; 500_000][..]),
}; };
s.send(event).await.unwrap(); s.send(event).await.unwrap();
@ -593,6 +582,7 @@ mod tests {
sid, sid,
prio: 5u8, prio: 5u8,
promises: Promises::COMPRESSED, promises: Promises::COMPRESSED,
guaranteed_bandwidth: 1_000_000,
} }
.write_bytes(&mut bytes); .write_bytes(&mut bytes);
OTFrame::DataHeader { OTFrame::DataHeader {
@ -605,13 +595,11 @@ mod tests {
OTFrame::Data { OTFrame::Data {
mid: 99, mid: 99,
start: 0,
data: Bytes::from(&DATA1[..]), data: Bytes::from(&DATA1[..]),
} }
.write_bytes(&mut bytes); .write_bytes(&mut bytes);
OTFrame::Data { OTFrame::Data {
mid: 99, mid: 99,
start: DATA1.len() as u64,
data: Bytes::from(&DATA2[..]), data: Bytes::from(&DATA2[..]),
} }
.write_bytes(&mut bytes); .write_bytes(&mut bytes);
@ -641,6 +629,7 @@ mod tests {
sid, sid,
prio: 5u8, prio: 5u8,
promises: Promises::COMPRESSED, promises: Promises::COMPRESSED,
guaranteed_bandwidth: 1_000_000,
} }
.write_bytes(&mut bytes); .write_bytes(&mut bytes);
s.send(bytes.split()).await.unwrap(); s.send(bytes.split()).await.unwrap();
@ -670,7 +659,6 @@ mod tests {
let _ = p2.1.recv().await.unwrap(); let _ = p2.1.recv().await.unwrap();
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid: Sid::new(10), sid: Sid::new(10),
mid: 0,
data: Bytes::from(&[188u8; 600][..]), data: Bytes::from(&[188u8; 600][..]),
}; };
p2.0.send(event.clone()).await.unwrap(); p2.0.send(event.clone()).await.unwrap();
@ -695,7 +683,6 @@ mod tests {
p2.0.notify_from_recv(e); p2.0.notify_from_recv(e);
let event = ProtocolEvent::Message { let event = ProtocolEvent::Message {
sid: Sid::new(10), sid: Sid::new(10),
mid: 0,
data: Bytes::from(&[188u8; 600][..]), data: Bytes::from(&[188u8; 600][..]),
}; };
p2.0.send(event.clone()).await.unwrap(); p2.0.send(event.clone()).await.unwrap();

View File

@ -49,7 +49,7 @@ impl Promises {
pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = *b"VELOREN"; pub(crate) const VELOREN_MAGIC_NUMBER: [u8; 7] = *b"VELOREN";
/// When this semver differs, 2 Networks can't communicate. /// When this semver differs, 2 Networks can't communicate.
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 5, 0]; pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 6, 0];
pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0); pub(crate) const STREAM_ID_OFFSET1: Sid = Sid::new(0);
pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2); pub(crate) const STREAM_ID_OFFSET2: Sid = Sid::new(u64::MAX / 2);
/// Maximal possible Prio to choose (for performance reasons) /// Maximal possible Prio to choose (for performance reasons)

View File

@ -1,12 +1,12 @@
use crate::{ use crate::{
message::{partial_eq_bincode, Message}, message::{partial_eq_bincode, Message},
participant::{A2bStreamOpen, S2bShutdownBparticipant}, participant::{A2bStreamOpen, S2bShutdownBparticipant},
scheduler::Scheduler, scheduler::{A2sConnect, Scheduler},
}; };
use bytes::Bytes; use bytes::Bytes;
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
use lz_fear::raw::DecodeError; use lz_fear::raw::DecodeError;
use network_protocol::{Bandwidth, Pid, Prio, Promises, Sid}; use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use prometheus::Registry; use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
@ -83,7 +83,16 @@ pub struct Stream {
pub enum NetworkError { pub enum NetworkError {
NetworkClosed, NetworkClosed,
ListenFailed(std::io::Error), ListenFailed(std::io::Error),
ConnectFailed(std::io::Error), ConnectFailed(NetworkConnectError),
}
/// Error type thrown by [`Networks`](Network) connect
#[derive(Debug)]
pub enum NetworkConnectError {
/// Either a Pid UUID clash or you are trying to hijack a connection
InvalidSecret,
Handshake(InitProtocolError),
Io(std::io::Error),
} }
/// Error type thrown by [`Participants`](Participant) methods /// Error type thrown by [`Participants`](Participant) methods
@ -149,10 +158,8 @@ pub struct Network {
local_pid: Pid, local_pid: Pid,
runtime: Arc<Runtime>, runtime: Arc<Runtime>,
participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>, participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>,
listen_sender: listen_sender: Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>>,
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<tokio::io::Result<()>>)>>, connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
connect_sender:
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>, connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
shutdown_sender: Option<oneshot::Sender<()>>, shutdown_sender: Option<oneshot::Sender<()>>,
} }
@ -348,7 +355,8 @@ impl Network {
/// [`ProtocolAddres`]: crate::api::ProtocolAddr /// [`ProtocolAddres`]: crate::api::ProtocolAddr
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> { pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>(); let (pid_sender, pid_receiver) =
oneshot::channel::<Result<Participant, NetworkConnectError>>();
debug!(?address, "Connect to address"); debug!(?address, "Connect to address");
self.connect_sender self.connect_sender
.lock() .lock()
@ -438,6 +446,9 @@ impl Participant {
/// link for further documentation. You can combine them, e.g. /// link for further documentation. You can combine them, e.g.
/// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then /// `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
/// guarantee that those promises are met. /// guarantee that those promises are met.
/// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
/// stream. When excess bandwidth is available it will be used. See
/// [`Bandwidth`] for details.
/// ///
/// A [`ParticipantError`] might be thrown if the `Participant` is already /// A [`ParticipantError`] might be thrown if the `Participant` is already
/// closed. [`Streams`] can be created without a answer from the remote /// closed. [`Streams`] can be created without a answer from the remote
@ -460,7 +471,7 @@ impl Participant {
/// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())) /// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
/// .await?; /// .await?;
/// let _s1 = p1 /// let _s1 = p1
/// .open(4, Promises::ORDERED | Promises::CONSISTENCY) /// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
/// .await?; /// .await?;
/// # Ok(()) /// # Ok(())
/// }) /// })
@ -468,16 +479,22 @@ impl Participant {
/// ``` /// ```
/// ///
/// [`Prio`]: network_protocol::Prio /// [`Prio`]: network_protocol::Prio
/// [`Bandwidth`]: network_protocol::Bandwidth
/// [`Promises`]: network_protocol::Promises /// [`Promises`]: network_protocol::Promises
/// [`Streams`]: crate::api::Stream /// [`Streams`]: crate::api::Stream
#[instrument(name="network", skip(self, prio, promises), fields(p = %self.local_pid))] #[instrument(name="network", skip(self, prio, promises), fields(p = %self.local_pid))]
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> { pub async fn open(
&self,
prio: u8,
promises: Promises,
bandwidth: Bandwidth,
) -> Result<Stream, ParticipantError> {
debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio"); debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>(); let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
if let Err(e) = self.a2b_open_stream_s.lock().await.send(( if let Err(e) = self.a2b_open_stream_s.lock().await.send((
prio, prio,
promises, promises,
1_000_000, bandwidth,
p2a_return_stream_s, p2a_return_stream_s,
)) { )) {
debug!(?e, "bParticipant is already closed, notifying"); debug!(?e, "bParticipant is already closed, notifying");
@ -519,7 +536,7 @@ impl Participant {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// # let p2 = remote.connected().await?; /// # let p2 = remote.connected().await?;
/// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// let _s1 = p1.opened().await?; /// let _s1 = p1.opened().await?;
/// # Ok(()) /// # Ok(())
/// }) /// })
@ -704,7 +721,7 @@ impl Stream {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # // keep it alive /// # // keep it alive
/// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// let participant_a = network.connected().await?; /// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?; /// let mut stream_a = participant_a.opened().await?;
/// //Send Message /// //Send Message
@ -746,8 +763,8 @@ impl Stream {
/// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid()); /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
/// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// let participant_a = network.connected().await?; /// let participant_a = network.connected().await?;
/// let participant_b = network.connected().await?; /// let participant_b = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?; /// let mut stream_a = participant_a.opened().await?;
@ -801,7 +818,7 @@ impl Stream {
/// runtime.block_on(async { /// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// # stream_p.send("Hello World"); /// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?; /// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?; /// let mut stream_a = participant_a.opened().await?;
@ -834,7 +851,7 @@ impl Stream {
/// runtime.block_on(async { /// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// # stream_p.send("Hello World"); /// # stream_p.send("Hello World");
/// let participant_a = network.connected().await?; /// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?; /// let mut stream_a = participant_a.opened().await?;
@ -889,7 +906,7 @@ impl Stream {
/// runtime.block_on(async { /// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// # stream_p.send("Hello World"); /// # stream_p.send("Hello World");
/// # std::thread::sleep(std::time::Duration::from_secs(1)); /// # std::thread::sleep(std::time::Duration::from_secs(1));
/// let participant_a = network.connected().await?; /// let participant_a = network.connected().await?;
@ -1138,6 +1155,18 @@ impl core::fmt::Display for NetworkError {
} }
} }
impl core::fmt::Display for NetworkConnectError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
NetworkConnectError::Io(e) => write!(f, "Io error: {}", e),
NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e),
NetworkConnectError::InvalidSecret => {
write!(f, "You specified the wrong secret on your second channel")
},
}
}
}
/// implementing PartialEq as it's super convenient in tests /// implementing PartialEq as it's super convenient in tests
impl core::cmp::PartialEq for StreamError { impl core::cmp::PartialEq for StreamError {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
@ -1168,3 +1197,4 @@ impl core::cmp::PartialEq for StreamError {
impl std::error::Error for StreamError {} impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {} impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {} impl std::error::Error for NetworkError {}
impl std::error::Error for NetworkConnectError {}

View File

@ -222,7 +222,6 @@ mod tests {
s.send(event.clone()).await.unwrap(); s.send(event.clone()).await.unwrap();
s.send(ProtocolEvent::Message { s.send(ProtocolEvent::Message {
sid: Sid::new(1), sid: Sid::new(1),
mid: 0,
data: Bytes::from(&[8u8; 8][..]), data: Bytes::from(&[8u8; 8][..]),
}) })
.await .await

View File

@ -51,7 +51,7 @@
//! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) //! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
//! .await?; //! .await?;
//! let mut stream = server //! let mut stream = server
//! .open(4, Promises::ORDERED | Promises::CONSISTENCY) //! .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
//! .await?; //! .await?;
//! stream.send("Hello World")?; //! stream.send("Hello World")?;
//! Ok(()) //! Ok(())
@ -107,7 +107,8 @@ mod participant;
mod scheduler; mod scheduler;
pub use api::{ pub use api::{
Network, NetworkError, Participant, ParticipantError, ProtocolAddr, Stream, StreamError, Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr,
Stream, StreamError,
}; };
pub use message::Message; pub use message::Message;
pub use network_protocol::{Pid, Promises}; pub use network_protocol::{InitProtocolError, Pid, Promises};

View File

@ -83,7 +83,7 @@ impl Message {
/// # runtime.block_on(async { /// # runtime.block_on(async {
/// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; /// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
/// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// # stream_p.send("Hello World"); /// # stream_p.send("Hello World");
/// # let participant_a = network.connected().await?; /// # let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?; /// let mut stream_a = participant_a.opened().await?;

View File

@ -51,12 +51,6 @@ struct ControlChannels {
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */ s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
} }
#[derive(Debug)]
struct ShutdownInfo {
b2b_close_stream_opened_sender_s: Option<oneshot::Sender<()>>,
error: Option<ParticipantError>,
}
#[derive(Debug)] #[derive(Debug)]
struct OpenStreamInfo { struct OpenStreamInfo {
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>, a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
@ -74,7 +68,6 @@ pub struct BParticipant {
run_channels: Option<ControlChannels>, run_channels: Option<ControlChannels>,
shutdown_barrier: AtomicI32, shutdown_barrier: AtomicI32,
metrics: Arc<NetworkMetrics>, metrics: Arc<NetworkMetrics>,
no_channel_error_info: RwLock<(Instant, u64)>,
open_stream_channels: Arc<Mutex<Option<OpenStreamInfo>>>, open_stream_channels: Arc<Mutex<Option<OpenStreamInfo>>>,
} }
@ -84,7 +77,7 @@ impl BParticipant {
const BARR_RECV: i32 = 4; const BARR_RECV: i32 = 4;
const BARR_SEND: i32 = 2; const BARR_SEND: i32 = 2;
const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS); const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS);
const TICK_TIME_MS: u64 = 10; const TICK_TIME_MS: u64 = 5;
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub(crate) fn new( pub(crate) fn new(
@ -124,7 +117,6 @@ impl BParticipant {
), ),
run_channels, run_channels,
metrics, metrics,
no_channel_error_info: RwLock::new((Instant::now(), 0)),
open_stream_channels: Arc::new(Mutex::new(None)), open_stream_channels: Arc::new(Mutex::new(None)),
}, },
a2b_open_stream_s, a2b_open_stream_s,
@ -203,7 +195,6 @@ impl BParticipant {
let mut interval = tokio::time::interval(Self::TICK_TIME); let mut interval = tokio::time::interval(Self::TICK_TIME);
let mut last_instant = Instant::now(); let mut last_instant = Instant::now();
let mut stream_ids = self.offset_sid; let mut stream_ids = self.offset_sid;
let mut fake_mid = 0; //TODO: move MID to protocol, should be inc per stream ? or ?
trace!("workaround, actively wait for first protocol"); trace!("workaround, actively wait for first protocol");
b2b_add_protocol_r b2b_add_protocol_r
.recv() .recv()
@ -267,13 +258,8 @@ impl BParticipant {
// get all messages and assign it to a channel // get all messages and assign it to a channel
for (sid, buffer) in a2b_msg_r.try_iter() { for (sid, buffer) in a2b_msg_r.try_iter() {
fake_mid += 1;
active active
.send(ProtocolEvent::Message { .send(ProtocolEvent::Message { data: buffer, sid })
data: buffer,
mid: fake_mid,
sid,
})
.await? .await?
} }
@ -416,7 +402,7 @@ impl BParticipant {
self.delete_stream(sid).await; self.delete_stream(sid).await;
retrigger(cid, p, &mut recv_protocols); retrigger(cid, p, &mut recv_protocols);
}, },
Ok(ProtocolEvent::Message { data, mid: _, sid }) => { Ok(ProtocolEvent::Message { data, sid }) => {
let lock = self.streams.read().await; let lock = self.streams.read().await;
match lock.get(&sid) { match lock.get(&sid) {
Some(stream) => { Some(stream) => {

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
api::{Participant, ProtocolAddr}, api::{NetworkConnectError, Participant, ProtocolAddr},
channel::Protocols, channel::Protocols,
metrics::NetworkMetrics, metrics::NetworkMetrics,
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant}, participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant},
@ -47,7 +47,10 @@ struct ParticipantInfo {
} }
type A2sListen = (ProtocolAddr, oneshot::Sender<io::Result<()>>); type A2sListen = (ProtocolAddr, oneshot::Sender<io::Result<()>>);
type A2sConnect = (ProtocolAddr, oneshot::Sender<io::Result<Participant>>); pub(crate) type A2sConnect = (
ProtocolAddr,
oneshot::Sender<Result<Participant, NetworkConnectError>>,
);
type A2sDisconnect = (Pid, S2bShutdownBparticipant); type A2sDisconnect = (Pid, S2bShutdownBparticipant);
type S2sMpscConnect = ( type S2sMpscConnect = (
mpsc::Sender<MpscMsg>, mpsc::Sender<MpscMsg>,
@ -196,13 +199,7 @@ impl Scheduler {
trace!("Stop listen_mgr"); trace!("Stop listen_mgr");
} }
async fn connect_mgr( async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver<A2sConnect>) {
&self,
mut a2s_connect_r: mpsc::UnboundedReceiver<(
ProtocolAddr,
oneshot::Sender<io::Result<Participant>>,
)>,
) {
trace!("Start connect_mgr"); trace!("Start connect_mgr");
while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { while let Some((addr, pid_sender)) = a2s_connect_r.recv().await {
let (protocol, cid, handshake) = match addr { let (protocol, cid, handshake) = match addr {
@ -215,7 +212,7 @@ impl Scheduler {
let stream = match net::TcpStream::connect(addr).await { let stream = match net::TcpStream::connect(addr).await {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
pid_sender.send(Err(e)).unwrap(); pid_sender.send(Err(NetworkConnectError::Io(e))).unwrap();
continue; continue;
}, },
}; };
@ -232,10 +229,10 @@ impl Scheduler {
Some(s) => s.clone(), Some(s) => s.clone(),
None => { None => {
pid_sender pid_sender
.send(Err(std::io::Error::new( .send(Err(NetworkConnectError::Io(std::io::Error::new(
std::io::ErrorKind::NotConnected, std::io::ErrorKind::NotConnected,
"no mpsc listen on this addr", "no mpsc listen on this addr",
))) ))))
.unwrap(); .unwrap();
continue; continue;
}, },
@ -543,7 +540,7 @@ impl Scheduler {
&self, &self,
mut protocol: Protocols, mut protocol: Protocols,
cid: Cid, cid: Cid,
s2a_return_pid_s: Option<oneshot::Sender<io::Result<Participant>>>, s2a_return_pid_s: Option<oneshot::Sender<Result<Participant, NetworkConnectError>>>,
send_handshake: bool, send_handshake: bool,
) { ) {
//channels are unknown till PID is known! //channels are unknown till PID is known!
@ -647,10 +644,7 @@ impl Scheduler {
if let Some(pid_oneshot) = s2a_return_pid_s { if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with `connect`, so give them their Error // someone is waiting with `connect`, so give them their Error
pid_oneshot pid_oneshot
.send(Err(std::io::Error::new( .send(Err(NetworkConnectError::InvalidSecret))
std::io::ErrorKind::PermissionDenied,
"invalid secret, denying connection",
)))
.unwrap(); .unwrap();
} }
return; return;
@ -670,10 +664,7 @@ impl Scheduler {
// someone is waiting with `connect`, so give them their Error // someone is waiting with `connect`, so give them their Error
trace!(?cid, "returning the Err to api who requested the connect"); trace!(?cid, "returning the Err to api who requested the connect");
pid_oneshot pid_oneshot
.send(Err(std::io::Error::new( .send(Err(NetworkConnectError::Handshake(e)))
std::io::ErrorKind::PermissionDenied,
"Handshake failed, denying connection",
)))
.unwrap(); .unwrap();
} }
}, },

View File

@ -230,7 +230,7 @@ fn close_network_then_disconnect_part() {
fn opened_stream_before_remote_part_is_closed() { fn opened_stream_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(4, Promises::empty())).unwrap(); let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap();
s2_a.send("HelloWorld").unwrap(); s2_a.send("HelloWorld").unwrap();
let mut s2_b = r.block_on(p_b.opened()).unwrap(); let mut s2_b = r.block_on(p_b.opened()).unwrap();
drop(p_a); drop(p_a);
@ -243,7 +243,7 @@ fn opened_stream_before_remote_part_is_closed() {
fn opened_stream_after_remote_part_is_closed() { fn opened_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(3, Promises::empty())).unwrap(); let mut s2_a = r.block_on(p_a.open(3, Promises::empty(), 0)).unwrap();
s2_a.send("HelloWorld").unwrap(); s2_a.send("HelloWorld").unwrap();
drop(p_a); drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000)); std::thread::sleep(std::time::Duration::from_millis(1000));
@ -260,14 +260,14 @@ fn opened_stream_after_remote_part_is_closed() {
fn open_stream_after_remote_part_is_closed() { fn open_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(4, Promises::empty())).unwrap(); let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap();
s2_a.send("HelloWorld").unwrap(); s2_a.send("HelloWorld").unwrap();
drop(p_a); drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000)); std::thread::sleep(std::time::Duration::from_millis(1000));
let mut s2_b = r.block_on(p_b.opened()).unwrap(); let mut s2_b = r.block_on(p_b.opened()).unwrap();
assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string())); assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
assert_eq!( assert_eq!(
r.block_on(p_b.open(5, Promises::empty())).unwrap_err(), r.block_on(p_b.open(5, Promises::empty(), 0)).unwrap_err(),
ParticipantError::ParticipantDisconnected ParticipantError::ParticipantDisconnected
); );
drop((_n_a, _n_b, p_b)); //clean teardown drop((_n_a, _n_b, p_b)); //clean teardown
@ -294,7 +294,7 @@ fn open_participant_before_remote_part_is_closed() {
let addr = tcp(); let addr = tcp();
r.block_on(n_a.listen(addr.clone())).unwrap(); r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap();
let mut s1_b = r.block_on(p_b.open(4, Promises::empty())).unwrap(); let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
s1_b.send("HelloWorld").unwrap(); s1_b.send("HelloWorld").unwrap();
let p_a = r.block_on(n_a.connected()).unwrap(); let p_a = r.block_on(n_a.connected()).unwrap();
drop(s1_b); drop(s1_b);
@ -314,7 +314,7 @@ fn open_participant_after_remote_part_is_closed() {
let addr = tcp(); let addr = tcp();
r.block_on(n_a.listen(addr.clone())).unwrap(); r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap();
let mut s1_b = r.block_on(p_b.open(4, Promises::empty())).unwrap(); let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
s1_b.send("HelloWorld").unwrap(); s1_b.send("HelloWorld").unwrap();
drop(s1_b); drop(s1_b);
drop(p_b); drop(p_b);
@ -334,7 +334,7 @@ fn close_network_scheduler_completely() {
let addr = tcp(); let addr = tcp();
r.block_on(n_a.listen(addr.clone())).unwrap(); r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap();
let mut s1_b = r.block_on(p_b.open(4, Promises::empty())).unwrap(); let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
s1_b.send("HelloWorld").unwrap(); s1_b.send("HelloWorld").unwrap();
let p_a = r.block_on(n_a.connected()).unwrap(); let p_a = r.block_on(n_a.connected()).unwrap();

View File

@ -22,7 +22,6 @@ pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
let _subscriber = if tracing { let _subscriber = if tracing {
let filter = EnvFilter::from_default_env() let filter = EnvFilter::from_default_env()
.add_directive("trace".parse().unwrap()) .add_directive("trace".parse().unwrap())
.add_directive("async_std::task::block_on=warn".parse().unwrap())
.add_directive("veloren_network::tests=trace".parse().unwrap()) .add_directive("veloren_network::tests=trace".parse().unwrap())
.add_directive("veloren_network::controller=trace".parse().unwrap()) .add_directive("veloren_network::controller=trace".parse().unwrap())
.add_directive("veloren_network::channel=trace".parse().unwrap()) .add_directive("veloren_network::channel=trace".parse().unwrap())
@ -67,7 +66,7 @@ pub fn network_participant_stream(
let p1_b = n_b.connect(addr).await.unwrap(); let p1_b = n_b.connect(addr).await.unwrap();
let p1_a = n_a.connected().await.unwrap(); let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(4, Promises::empty()).await.unwrap(); let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();
let s1_b = p1_b.opened().await.unwrap(); let s1_b = p1_b.opened().await.unwrap();
(n_a, p1_a, s1_a, n_b, p1_b, s1_b) (n_a, p1_a, s1_a, n_b, p1_b, s1_b)

View File

@ -177,7 +177,7 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
.await?; .await?;
// keep it alive // keep it alive
let _stream_p = remote_p let _stream_p = remote_p
.open(4, Promises::ORDERED | Promises::CONSISTENCY) .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await?; .await?;
let participant_a = network.connected().await?; let participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?; let mut stream_a = participant_a.opened().await?;
@ -205,7 +205,7 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
.connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
.await?; .await?;
let mut stream_p = remote_p let mut stream_p = remote_p
.open(4, Promises::ORDERED | Promises::CONSISTENCY) .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await?; .await?;
stream_p.send("Hello World")?; stream_p.send("Hello World")?;
let participant_a = network.connected().await?; let participant_a = network.connected().await?;

View File

@ -16,7 +16,6 @@ pub fn init(basic: bool) {
let base_exceptions = |env: EnvFilter| { let base_exceptions = |env: EnvFilter| {
env.add_directive("veloren_world::sim=info".parse().unwrap()) env.add_directive("veloren_world::sim=info".parse().unwrap())
.add_directive("veloren_world::civ=info".parse().unwrap()) .add_directive("veloren_world::civ=info".parse().unwrap())
.add_directive("uvth=warn".parse().unwrap())
.add_directive("hyper=info".parse().unwrap()) .add_directive("hyper=info".parse().unwrap())
.add_directive("prometheus_hyper=info".parse().unwrap()) .add_directive("prometheus_hyper=info".parse().unwrap())
.add_directive("mio::pool=info".parse().unwrap()) .add_directive("mio::pool=info".parse().unwrap())

View File

@ -23,11 +23,7 @@ specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git", rev = "9fab7b3
tracing = "0.1" tracing = "0.1"
vek = { version = "0.12.0", features = ["serde"] } vek = { version = "0.12.0", features = ["serde"] }
uvth = "3.1.1"
futures-util = "0.3.7" futures-util = "0.3.7"
futures-executor = "0.3"
futures-timer = "3.0"
futures-channel = "0.3"
tokio = { version = "1", default-features = false, features = ["rt"] } tokio = { version = "1", default-features = false, features = ["rt"] }
prometheus-hyper = "0.1.1" prometheus-hyper = "0.1.1"
itertools = "0.9" itertools = "0.9"

View File

@ -8,6 +8,7 @@ use std::sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}; };
use tokio::runtime::Runtime;
use vek::*; use vek::*;
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
use world::{IndexOwned, World}; use world::{IndexOwned, World};
@ -39,7 +40,7 @@ impl ChunkGenerator {
&mut self, &mut self,
entity: Option<EcsEntity>, entity: Option<EcsEntity>,
key: Vec2<i32>, key: Vec2<i32>,
thread_pool: &mut uvth::ThreadPool, runtime: &Runtime,
world: Arc<World>, world: Arc<World>,
index: IndexOwned, index: IndexOwned,
) { ) {
@ -52,7 +53,7 @@ impl ChunkGenerator {
v.insert(Arc::clone(&cancel)); v.insert(Arc::clone(&cancel));
let chunk_tx = self.chunk_tx.clone(); let chunk_tx = self.chunk_tx.clone();
self.metrics.chunks_requested.inc(); self.metrics.chunks_requested.inc();
thread_pool.execute(move || { runtime.spawn_blocking(move || {
let index = index.as_index_ref(); let index = index.as_index_ref();
let payload = world let payload = world
.generate_chunk(index, key, || cancel.load(Ordering::Relaxed)) .generate_chunk(index, key, || cancel.load(Ordering::Relaxed))

View File

@ -1,11 +1,9 @@
use crate::{Client, ClientType, ServerInfo}; use crate::{Client, ClientType, ServerInfo};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use futures_channel::oneshot; use futures_util::future::FutureExt;
use futures_executor::block_on;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use network::{Network, Participant, Promises}; use network::{Network, Participant, Promises};
use std::{sync::Arc, thread, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::{runtime::Runtime, select, sync::oneshot};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
pub(crate) struct ServerInfoPacket { pub(crate) struct ServerInfoPacket {
@ -17,7 +15,7 @@ pub(crate) type IncomingClient = Client;
pub(crate) struct ConnectionHandler { pub(crate) struct ConnectionHandler {
_network: Arc<Network>, _network: Arc<Network>,
thread_handle: Option<thread::JoinHandle<()>>, thread_handle: Option<tokio::task::JoinHandle<()>>,
pub client_receiver: Receiver<IncomingClient>, pub client_receiver: Receiver<IncomingClient>,
pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>, pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
stop_sender: Option<oneshot::Sender<()>>, stop_sender: Option<oneshot::Sender<()>>,
@ -28,7 +26,7 @@ pub(crate) struct ConnectionHandler {
/// to the Server main thread sometimes though to get the current server_info /// to the Server main thread sometimes though to get the current server_info
/// and time /// and time
impl ConnectionHandler { impl ConnectionHandler {
pub fn new(network: Network) -> Self { pub fn new(network: Network, runtime: &Runtime) -> Self {
let network = Arc::new(network); let network = Arc::new(network);
let network_clone = Arc::clone(&network); let network_clone = Arc::clone(&network);
let (stop_sender, stop_receiver) = oneshot::channel(); let (stop_sender, stop_receiver) = oneshot::channel();
@ -37,14 +35,12 @@ impl ConnectionHandler {
let (info_requester_sender, info_requester_receiver) = let (info_requester_sender, info_requester_receiver) =
bounded::<Sender<ServerInfoPacket>>(1); bounded::<Sender<ServerInfoPacket>>(1);
let thread_handle = Some(thread::spawn(|| { let thread_handle = Some(runtime.spawn(Self::work(
block_on(Self::work(
network_clone, network_clone,
client_sender, client_sender,
info_requester_sender, info_requester_sender,
stop_receiver, stop_receiver,
)); )));
}));
Self { Self {
_network: network, _network: network,
@ -64,7 +60,7 @@ impl ConnectionHandler {
let mut stop_receiver = stop_receiver.fuse(); let mut stop_receiver = stop_receiver.fuse();
loop { loop {
let participant = match select!( let participant = match select!(
_ = stop_receiver => None, _ = &mut stop_receiver => None,
p = network.connected().fuse() => Some(p), p = network.connected().fuse() => Some(p),
) { ) {
None => break, None => break,
@ -82,7 +78,7 @@ impl ConnectionHandler {
let info_requester_sender = info_requester_sender.clone(); let info_requester_sender = info_requester_sender.clone();
match select!( match select!(
_ = stop_receiver => None, _ = &mut stop_receiver => None,
e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e), e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e),
) { ) {
None => break, None => break,
@ -104,11 +100,11 @@ impl ConnectionHandler {
let reliable = Promises::ORDERED | Promises::CONSISTENCY; let reliable = Promises::ORDERED | Promises::CONSISTENCY;
let reliablec = reliable | Promises::COMPRESSED; let reliablec = reliable | Promises::COMPRESSED;
let general_stream = participant.open(3, reliablec).await?; let general_stream = participant.open(3, reliablec, 500).await?;
let ping_stream = participant.open(2, reliable).await?; let ping_stream = participant.open(2, reliable, 500).await?;
let mut register_stream = participant.open(3, reliablec).await?; let mut register_stream = participant.open(3, reliablec, 0).await?;
let character_screen_stream = participant.open(3, reliablec).await?; let character_screen_stream = participant.open(3, reliablec, 0).await?;
let in_game_stream = participant.open(3, reliablec).await?; let in_game_stream = participant.open(3, reliablec, 400_000).await?;
let server_data = receiver.recv()?; let server_data = receiver.recv()?;
@ -116,7 +112,7 @@ impl ConnectionHandler {
const TIMEOUT: Duration = Duration::from_secs(5); const TIMEOUT: Duration = Duration::from_secs(5);
let client_type = match select!( let client_type = match select!(
_ = Delay::new(TIMEOUT).fuse() => None, _ = tokio::time::sleep(TIMEOUT).fuse() => None,
t = register_stream.recv::<ClientType>().fuse() => Some(t), t = register_stream.recv::<ClientType>().fuse() => Some(t),
) { ) {
None => { None => {
@ -145,12 +141,8 @@ impl ConnectionHandler {
impl Drop for ConnectionHandler { impl Drop for ConnectionHandler {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.stop_sender.take().unwrap().send(()); let _ = self.stop_sender.take().unwrap().send(());
trace!("blocking till ConnectionHandler is closed"); trace!("aborting ConnectionHandler");
self.thread_handle self.thread_handle.take().unwrap().abort();
.take() trace!("aborted ConnectionHandler!");
.unwrap()
.join()
.expect("There was an error in ConnectionHandler, clean shutdown impossible");
trace!("gracefully closed ConnectionHandler!");
} }
} }

View File

@ -11,9 +11,8 @@ use common::{
}; };
use common_net::msg::{PlayerListUpdate, PresenceKind, ServerGeneral}; use common_net::msg::{PlayerListUpdate, PresenceKind, ServerGeneral};
use common_sys::state::State; use common_sys::state::State;
use futures_executor::block_on;
use specs::{saveload::MarkerAllocator, Builder, Entity as EcsEntity, WorldExt}; use specs::{saveload::MarkerAllocator, Builder, Entity as EcsEntity, WorldExt};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn, Instrument};
pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
span!(_guard, "handle_exit_ingame"); span!(_guard, "handle_exit_ingame");
@ -107,26 +106,26 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
{ {
let participant = client.participant.take().unwrap(); let participant = client.participant.take().unwrap();
let pid = participant.remote_pid(); let pid = participant.remote_pid();
std::thread::spawn(move || { server.runtime.spawn(
let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity); async {
let _enter = span.enter();
let now = std::time::Instant::now(); let now = std::time::Instant::now();
debug!(?pid, ?entity, "Start handle disconnect of client"); debug!("Start handle disconnect of client");
if let Err(e) = block_on(participant.disconnect()) { if let Err(e) = participant.disconnect().await {
debug!( debug!(
?e, ?e,
?pid,
"Error when disconnecting client, maybe the pipe already broke" "Error when disconnecting client, maybe the pipe already broke"
); );
}; };
trace!(?pid, "finished disconnect"); trace!("finished disconnect");
let elapsed = now.elapsed(); let elapsed = now.elapsed();
if elapsed.as_millis() > 100 { if elapsed.as_millis() > 100 {
warn!(?elapsed, ?pid, "disconnecting took quite long"); warn!(?elapsed, "disconnecting took quite long");
} else { } else {
debug!(?elapsed, ?pid, "disconnecting took"); debug!(?elapsed, "disconnecting took");
} }
}); }
.instrument(tracing::debug_span!("client_disconnect", ?pid, ?entity)),
);
} }
let state = server.state_mut(); let state = server.state_mut();

View File

@ -75,7 +75,6 @@ use common_net::{
#[cfg(feature = "plugins")] #[cfg(feature = "plugins")]
use common_sys::plugin::PluginMgr; use common_sys::plugin::PluginMgr;
use common_sys::state::State; use common_sys::state::State;
use futures_executor::block_on;
use metrics::{PhysicsMetrics, StateTickMetrics, TickMetrics}; use metrics::{PhysicsMetrics, StateTickMetrics, TickMetrics};
use network::{Network, Pid, ProtocolAddr}; use network::{Network, Pid, ProtocolAddr};
use persistence::{ use persistence::{
@ -95,7 +94,6 @@ use std::{
use test_world::{IndexOwned, World}; use test_world::{IndexOwned, World};
use tokio::{runtime::Runtime, sync::Notify}; use tokio::{runtime::Runtime, sync::Notify};
use tracing::{debug, error, info, trace}; use tracing::{debug, error, info, trace};
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*; use vek::*;
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
@ -123,8 +121,7 @@ pub struct Server {
connection_handler: ConnectionHandler, connection_handler: ConnectionHandler,
_runtime: Arc<Runtime>, runtime: Arc<Runtime>,
thread_pool: ThreadPool,
metrics_shutdown: Arc<Notify>, metrics_shutdown: Arc<Notify>,
tick_metrics: TickMetrics, tick_metrics: TickMetrics,
@ -366,9 +363,6 @@ impl Server {
registry_state(&registry).expect("failed to register state metrics"); registry_state(&registry).expect("failed to register state metrics");
registry_physics(&registry).expect("failed to register state metrics"); registry_physics(&registry).expect("failed to register state metrics");
let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string())
.build();
let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry); let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
let metrics_shutdown = Arc::new(Notify::new()); let metrics_shutdown = Arc::new(Notify::new());
let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); let metrics_shutdown_clone = Arc::clone(&metrics_shutdown);
@ -381,8 +375,8 @@ impl Server {
) )
.await .await
}); });
block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; runtime.block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
let connection_handler = ConnectionHandler::new(network); let connection_handler = ConnectionHandler::new(network, &runtime);
// Initiate real-time world simulation // Initiate real-time world simulation
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
@ -397,9 +391,7 @@ impl Server {
map, map,
connection_handler, connection_handler,
runtime,
_runtime: runtime,
thread_pool,
metrics_shutdown, metrics_shutdown,
tick_metrics, tick_metrics,
@ -431,11 +423,6 @@ impl Server {
} }
} }
pub fn with_thread_pool(mut self, thread_pool: ThreadPool) -> Self {
self.thread_pool = thread_pool;
self
}
/// Get a reference to the server's settings /// Get a reference to the server's settings
pub fn settings(&self) -> impl Deref<Target = Settings> + '_ { pub fn settings(&self) -> impl Deref<Target = Settings> + '_ {
self.state.ecs().fetch::<Settings>() self.state.ecs().fetch::<Settings>()
@ -676,7 +663,7 @@ impl Server {
// only work we do here on the fast path is perform a relaxed read on an atomic. // only work we do here on the fast path is perform a relaxed read on an atomic.
// boolean. // boolean.
let index = &mut self.index; let index = &mut self.index;
let thread_pool = &mut self.thread_pool; let runtime = &mut self.runtime;
let world = &mut self.world; let world = &mut self.world;
let ecs = self.state.ecs_mut(); let ecs = self.state.ecs_mut();
@ -697,7 +684,7 @@ impl Server {
chunk_generator.generate_chunk( chunk_generator.generate_chunk(
None, None,
pos, pos,
thread_pool, runtime,
Arc::clone(&world), Arc::clone(&world),
index.clone(), index.clone(),
); );
@ -1021,7 +1008,7 @@ impl Server {
.generate_chunk( .generate_chunk(
Some(entity), Some(entity),
key, key,
&mut self.thread_pool, &self.runtime,
Arc::clone(&self.world), Arc::clone(&self.world),
self.index.clone(), self.index.clone(),
); );

View File

@ -73,12 +73,13 @@ impl CharacterLoader {
let mut conn = establish_connection(db_dir)?; let mut conn = establish_connection(db_dir)?;
std::thread::spawn(move || { let builder = std::thread::Builder::new().name("persistence_loader".into());
builder
.spawn(move || {
for request in internal_rx { for request in internal_rx {
let (entity, kind) = request; let (entity, kind) = request;
if let Err(e) = if let Err(e) = internal_tx.send(CharacterLoaderResponse {
internal_tx.send(CharacterLoaderResponse {
entity, entity,
result: match kind { result: match kind {
CharacterLoaderRequestKind::CreateCharacter { CharacterLoaderRequestKind::CreateCharacter {
@ -123,12 +124,12 @@ impl CharacterLoader {
CharacterLoaderResponseKind::CharacterData(Box::new(result)) CharacterLoaderResponseKind::CharacterData(Box::new(result))
}, },
}, },
}) }) {
{
error!(?e, "Could not send send persistence request"); error!(?e, "Could not send send persistence request");
} }
} }
}); })
.unwrap();
Ok(Self { Ok(Self {
update_tx, update_tx,

View File

@ -24,13 +24,16 @@ impl CharacterUpdater {
let mut conn = establish_connection(db_dir)?; let mut conn = establish_connection(db_dir)?;
let handle = std::thread::spawn(move || { let builder = std::thread::Builder::new().name("persistence_updater".into());
let handle = builder
.spawn(move || {
while let Ok(updates) = update_rx.recv() { while let Ok(updates) = update_rx.recv() {
trace!("Persistence batch update starting"); trace!("Persistence batch update starting");
execute_batch_update(updates, &mut conn); execute_batch_update(updates, &mut conn);
trace!("Persistence batch update finished"); trace!("Persistence batch update finished");
} }
}); })
.unwrap();
Ok(Self { Ok(Self {
update_tx: Some(update_tx), update_tx: Some(update_tx),

View File

@ -81,7 +81,6 @@ rodio = {version = "0.13", default-features = false, features = ["wav", "vorbis"
ron = {version = "0.6", default-features = false} ron = {version = "0.6", default-features = false}
serde = {version = "1.0", features = [ "rc", "derive" ]} serde = {version = "1.0", features = [ "rc", "derive" ]}
treeculler = "0.1.0" treeculler = "0.1.0"
uvth = "3.1.1"
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }
num_cpus = "1.0" num_cpus = "1.0"
# vec_map = { version = "0.8.2" } # vec_map = { version = "0.8.2" }

View File

@ -22,14 +22,14 @@ const RUST_LOG_ENV: &str = "RUST_LOG";
/// `RUST_LOG="veloren_voxygen=trace"` /// `RUST_LOG="veloren_voxygen=trace"`
/// ///
/// more complex tracing can be done by concatenating with a `,` as seperator: /// more complex tracing can be done by concatenating with a `,` as seperator:
/// - warn for `uvth`, `tiny_http`, `dot_vox`, `gfx_device_gl::factory, /// - warn for `prometheus_hyper`, `dot_vox`, `gfx_device_gl::factory,
/// `gfx_device_gl::shade` trace for `veloren_voxygen`, info for everything /// `gfx_device_gl::shade` trace for `veloren_voxygen`, info for everything
/// else /// else
/// `RUST_LOG="uvth=warn,tiny_http=warn,dot_vox::parser=warn,gfx_device_gl:: /// `RUST_LOG="prometheus_hyper=warn,dot_vox::parser=warn,gfx_device_gl::
/// factory=warn,gfx_device_gl::shade=warn,veloren_voxygen=trace,info"` /// factory=warn,gfx_device_gl::shade=warn,veloren_voxygen=trace,info"`
/// ///
/// By default a few directives are set to `warn` by default, until explicitly /// By default a few directives are set to `warn` by default, until explicitly
/// overwritten! e.g. `RUST_LOG="uvth=debug"` /// overwritten! e.g. `RUST_LOG="gfx_device_gl=debug"`
pub fn init(settings: &Settings) -> Vec<impl Drop> { pub fn init(settings: &Settings) -> Vec<impl Drop> {
// To hold the guards that we create, they will cause the logs to be // To hold the guards that we create, they will cause the logs to be
// flushed when they're dropped. // flushed when they're dropped.
@ -42,8 +42,7 @@ pub fn init(settings: &Settings) -> Vec<impl Drop> {
let base_exceptions = |env: EnvFilter| { let base_exceptions = |env: EnvFilter| {
env.add_directive("dot_vox::parser=warn".parse().unwrap()) env.add_directive("dot_vox::parser=warn".parse().unwrap())
.add_directive("gfx_device_gl=warn".parse().unwrap()) .add_directive("gfx_device_gl=warn".parse().unwrap())
.add_directive("uvth=warn".parse().unwrap()) .add_directive("prometheus_hyper=warn".parse().unwrap())
.add_directive("tiny_http=warn".parse().unwrap())
.add_directive("mio::sys::windows=debug".parse().unwrap()) .add_directive("mio::sys::windows=debug".parse().unwrap())
.add_directive("veloren_network_protocol=info".parse().unwrap()) .add_directive("veloren_network_protocol=info".parse().unwrap())
.add_directive( .add_directive(

View File

@ -124,11 +124,16 @@ fn main() {
// On windows we need to spawn a thread as the msg doesn't work otherwise // On windows we need to spawn a thread as the msg doesn't work otherwise
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
std::thread::spawn(move || { {
let builder = std::thread::Builder::new().name("shutdown".into());
builder
.spawn(move || {
mbox(); mbox();
}) })
.unwrap()
.join() .join()
.unwrap(); .unwrap();
}
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
mbox(); mbox();

View File

@ -147,7 +147,7 @@ impl PlayState for CharSelectionState {
time: client.state().get_time(), time: client.state().get_time(),
delta_time: client.state().ecs().read_resource::<DeltaTime>().0, delta_time: client.state().ecs().read_resource::<DeltaTime>().0,
tick: client.get_tick(), tick: client.get_tick(),
thread_pool: client.thread_pool(), runtime: client.runtime(),
body: humanoid_body, body: humanoid_body,
gamma: global_state.settings.graphics.gamma, gamma: global_state.settings.graphics.gamma,
exposure: global_state.settings.graphics.exposure, exposure: global_state.settings.graphics.exposure,

View File

@ -1,27 +1,23 @@
use client::{ use client::{
error::{Error as ClientError, NetworkError}, addr::ConnectionArgs,
error::{Error as ClientError, NetworkConnectError, NetworkError},
Client, Client,
}; };
use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError};
use std::{ use std::{
net::ToSocketAddrs,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
thread,
time::Duration, time::Duration,
}; };
use tokio::runtime;
use tracing::{trace, warn}; use tracing::{trace, warn};
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
// Error parsing input string or error resolving host name.
BadAddress(std::io::Error),
// Parsing/host name resolution successful but there was an error within the client.
ClientError(ClientError),
// Parsing yielded an empty iterator (specifically to_socket_addrs()).
NoAddress, NoAddress,
ClientError(ClientError),
ClientCrashed, ClientCrashed,
} }
@ -31,6 +27,11 @@ pub enum Msg {
Done(Result<Client, Error>), Done(Result<Client, Error>),
} }
pub enum ClientConnArgs {
Host(String),
Resolved(ConnectionArgs),
}
pub struct AuthTrust(String, bool); pub struct AuthTrust(String, bool);
// Used to asynchronously parse the server address, resolve host names, // Used to asynchronously parse the server address, resolve host names,
@ -45,79 +46,81 @@ impl ClientInit {
#[allow(clippy::op_ref)] // TODO: Pending review in #587 #[allow(clippy::op_ref)] // TODO: Pending review in #587
#[allow(clippy::or_fun_call)] // TODO: Pending review in #587 #[allow(clippy::or_fun_call)] // TODO: Pending review in #587
pub fn new( pub fn new(
connection_args: (String, u16, bool), connection_args: ClientConnArgs,
username: String, username: String,
view_distance: Option<u32>, view_distance: Option<u32>,
password: String, password: String,
runtime: Option<Arc<runtime::Runtime>>,
) -> Self { ) -> Self {
let (server_address, default_port, prefer_ipv6) = connection_args;
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let (trust_tx, trust_rx) = unbounded(); let (trust_tx, trust_rx) = unbounded();
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
let cancel2 = Arc::clone(&cancel); let cancel2 = Arc::clone(&cancel);
thread::spawn(move || { let runtime = runtime.unwrap_or_else(|| {
// Parse ip address or resolves hostname.
// Note: if you use an ipv6 address, the number after the last colon will be
// used as the port unless you use [] around the address.
match server_address
.to_socket_addrs()
.or((server_address.as_ref(), default_port).to_socket_addrs())
{
Ok(socket_address) => {
let (first_addrs, second_addrs) =
socket_address.partition::<Vec<_>, _>(|a| a.is_ipv6() == prefer_ipv6);
let mut last_err = None;
let cores = num_cpus::get(); let cores = num_cpus::get();
let runtime = Arc::new( Arc::new(
tokio::runtime::Builder::new_multi_thread() runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores }) .worker_threads(if cores > 4 { cores - 1 } else { cores })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-voxygen-{}", id)
})
.build() .build()
.unwrap(), .unwrap(),
); )
});
let runtime2 = Arc::clone(&runtime);
runtime.spawn(async move {
let trust_fn = |auth_server: &str| {
let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string()));
trust_rx
.recv()
.map(|AuthTrust(server, trust)| trust && &server == auth_server)
.unwrap_or(false)
};
let connection_args = match connection_args {
ClientConnArgs::Host(host) => match ConnectionArgs::resolve(&host, false).await {
Ok(r) => r,
Err(_) => {
let _ = tx.send(Msg::Done(Err(Error::NoAddress)));
tokio::task::block_in_place(move || drop(runtime2));
return;
},
},
ClientConnArgs::Resolved(r) => r,
};
let mut last_err = None;
const FOUR_MINUTES_RETRIES: u64 = 48; const FOUR_MINUTES_RETRIES: u64 = 48;
'tries: for _ in 0..FOUR_MINUTES_RETRIES { 'tries: for _ in 0..FOUR_MINUTES_RETRIES {
if cancel2.load(Ordering::Relaxed) { if cancel2.load(Ordering::Relaxed) {
break; break;
} }
for socket_addr in match Client::new(
first_addrs.clone().into_iter().chain(second_addrs.clone()) connection_args.clone(),
view_distance,
Arc::clone(&runtime2),
)
.await
{ {
match Client::new(socket_addr, view_distance, Arc::clone(&runtime)) {
Ok(mut client) => { Ok(mut client) => {
if let Err(e) = if let Err(e) = client.register(username, password, trust_fn).await {
client.register(username, password, |auth_server| {
let _ = tx
.send(Msg::IsAuthTrusted(auth_server.to_string()));
trust_rx
.recv()
.map(|AuthTrust(server, trust)| {
trust && &server == auth_server
})
.unwrap_or(false)
})
{
last_err = Some(Error::ClientError(e)); last_err = Some(Error::ClientError(e));
break 'tries; break 'tries;
} }
let _ = tx.send(Msg::Done(Ok(client))); let _ = tx.send(Msg::Done(Ok(client)));
return; return;
}, },
Err(ClientError::NetworkErr(NetworkError::ConnectFailed(e))) => { Err(ClientError::NetworkErr(NetworkError::ConnectFailed(
if e.kind() == std::io::ErrorKind::PermissionDenied { NetworkConnectError::Io(e),
warn!(?e, "Cannot connect to server: Incompatible version"); ))) => {
last_err = Some(Error::ClientError(
ClientError::NetworkErr(NetworkError::ConnectFailed(e)),
));
break 'tries;
} else {
warn!(?e, "Failed to connect to the server. Retrying..."); warn!(?e, "Failed to connect to the server. Retrying...");
}
}, },
Err(e) => { Err(e) => {
trace!(?e, "Aborting server connection attempt"); trace!(?e, "Aborting server connection attempt");
@ -125,17 +128,14 @@ impl ClientInit {
break 'tries; break 'tries;
}, },
} }
tokio::time::sleep(Duration::from_secs(5)).await;
} }
thread::sleep(Duration::from_secs(5));
}
// Parsing/host name resolution successful but no connection succeeded. // Parsing/host name resolution successful but no connection succeeded.
let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::NoAddress)))); let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::NoAddress))));
},
Err(err) => { // Safe drop runtime
// Error parsing input string or error resolving host name. tokio::task::block_in_place(move || drop(runtime2));
let _ = tx.send(Msg::Done(Err(Error::BadAddress(err))));
},
}
}); });
ClientInit { ClientInit {

View File

@ -11,8 +11,14 @@ use crate::{
window::Event, window::Event,
Direction, GlobalState, PlayState, PlayStateResult, Direction, GlobalState, PlayState, PlayStateResult,
}; };
use client_init::{ClientInit, Error as InitError, Msg as InitMsg}; use client::{
addr::ConnectionArgs,
error::{InitProtocolError, NetworkConnectError, NetworkError},
};
use client_init::{ClientConnArgs, ClientInit, Error as InitError, Msg as InitMsg};
use common::{assets::AssetExt, comp, span}; use common::{assets::AssetExt, comp, span};
use std::sync::Arc;
use tokio::runtime;
use tracing::error; use tracing::error;
use ui::{Event as MainMenuEvent, MainMenuUi}; use ui::{Event as MainMenuEvent, MainMenuUi};
@ -32,8 +38,6 @@ impl MainMenuState {
} }
} }
const DEFAULT_PORT: u16 = 14004;
impl PlayState for MainMenuState { impl PlayState for MainMenuState {
fn enter(&mut self, global_state: &mut GlobalState, _: Direction) { fn enter(&mut self, global_state: &mut GlobalState, _: Direction) {
// Kick off title music // Kick off title music
@ -63,14 +67,8 @@ impl PlayState for MainMenuState {
#[cfg(feature = "singleplayer")] #[cfg(feature = "singleplayer")]
{ {
if let Some(singleplayer) = &global_state.singleplayer { if let Some(singleplayer) = &global_state.singleplayer {
if let Ok(result) = singleplayer.receiver.try_recv() { match singleplayer.receiver.try_recv() {
if let Err(error) = result { Ok(Ok(runtime)) => {
tracing::error!(?error, "Could not start server");
global_state.singleplayer = None;
self.client_init = None;
self.main_menu_ui.cancel_connection();
self.main_menu_ui.show_info(format!("Error: {:?}", error));
} else {
let server_settings = singleplayer.settings(); let server_settings = singleplayer.settings();
// Attempt login after the server is finished initializing // Attempt login after the server is finished initializing
attempt_login( attempt_login(
@ -78,11 +76,21 @@ impl PlayState for MainMenuState {
&mut global_state.info_message, &mut global_state.info_message,
"singleplayer".to_owned(), "singleplayer".to_owned(),
"".to_owned(), "".to_owned(),
server_settings.gameserver_address.ip().to_string(), ClientConnArgs::Resolved(ConnectionArgs::IpAndPort(vec![
server_settings.gameserver_address.port(), server_settings.gameserver_address,
])),
&mut self.client_init, &mut self.client_init,
Some(runtime),
); );
} },
Ok(Err(e)) => {
error!(?e, "Could not start server");
global_state.singleplayer = None;
self.client_init = None;
self.main_menu_ui.cancel_connection();
self.main_menu_ui.show_info(format!("Error: {:?}", e));
},
Err(_) => (),
} }
} }
} }
@ -117,7 +125,7 @@ impl PlayState for MainMenuState {
self.client_init = None; self.client_init = None;
global_state.info_message = Some({ global_state.info_message = Some({
let err = match err { let err = match err {
InitError::BadAddress(_) | InitError::NoAddress => { InitError::NoAddress => {
localized_strings.get("main.login.server_not_found").into() localized_strings.get("main.login.server_not_found").into()
}, },
InitError::ClientError(err) => match err { InitError::ClientError(err) => match err {
@ -155,6 +163,11 @@ impl PlayState for MainMenuState {
client::Error::InvalidCharacter => { client::Error::InvalidCharacter => {
localized_strings.get("main.login.invalid_character").into() localized_strings.get("main.login.invalid_character").into()
}, },
client::Error::NetworkErr(NetworkError::ConnectFailed(
NetworkConnectError::Handshake(InitProtocolError::WrongVersion(_)),
)) => localized_strings
.get("main.login.network_wrong_version")
.into(),
client::Error::NetworkErr(e) => format!( client::Error::NetworkErr(e) => format!(
"{}: {:?}", "{}: {:?}",
localized_strings.get("main.login.network_error"), localized_strings.get("main.login.network_error"),
@ -242,9 +255,9 @@ impl PlayState for MainMenuState {
&mut global_state.info_message, &mut global_state.info_message,
username, username,
password, password,
server_address, ClientConnArgs::Host(server_address),
DEFAULT_PORT,
&mut self.client_init, &mut self.client_init,
None,
); );
}, },
MainMenuEvent::CancelLoginAttempt => { MainMenuEvent::CancelLoginAttempt => {
@ -270,7 +283,7 @@ impl PlayState for MainMenuState {
}, },
#[cfg(feature = "singleplayer")] #[cfg(feature = "singleplayer")]
MainMenuEvent::StartSingleplayer => { MainMenuEvent::StartSingleplayer => {
let singleplayer = Singleplayer::new(None); // TODO: Make client and server use the same thread pool let singleplayer = Singleplayer::new();
global_state.singleplayer = Some(singleplayer); global_state.singleplayer = Some(singleplayer);
}, },
@ -315,18 +328,19 @@ fn attempt_login(
info_message: &mut Option<String>, info_message: &mut Option<String>,
username: String, username: String,
password: String, password: String,
server_address: String, connection_args: ClientConnArgs,
server_port: u16,
client_init: &mut Option<ClientInit>, client_init: &mut Option<ClientInit>,
runtime: Option<Arc<runtime::Runtime>>,
) { ) {
if comp::Player::alias_is_valid(&username) { if comp::Player::alias_is_valid(&username) {
// Don't try to connect if there is already a connection in progress. // Don't try to connect if there is already a connection in progress.
if client_init.is_none() { if client_init.is_none() {
*client_init = Some(ClientInit::new( *client_init = Some(ClientInit::new(
(server_address, server_port, false), connection_args,
username, username,
Some(settings.graphics.view_distance), Some(settings.graphics.view_distance),
password, password,
runtime,
)); ));
} }
} else { } else {

View File

@ -27,6 +27,7 @@ use core::{hash::Hash, ops::Range};
use crossbeam::atomic; use crossbeam::atomic;
use hashbrown::{hash_map::Entry, HashMap}; use hashbrown::{hash_map::Entry, HashMap};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Runtime;
use vek::*; use vek::*;
/// A type produced by mesh worker threads corresponding to the information /// A type produced by mesh worker threads corresponding to the information
@ -311,7 +312,7 @@ where
tick: u64, tick: u64,
camera_mode: CameraMode, camera_mode: CameraMode,
character_state: Option<&CharacterState>, character_state: Option<&CharacterState>,
thread_pool: &uvth::ThreadPool, runtime: &Runtime,
) -> (FigureModelEntryLod<'c>, &'c Skel::Attr) ) -> (FigureModelEntryLod<'c>, &'c Skel::Attr)
where where
for<'a> &'a Skel::Body: Into<Skel::Attr>, for<'a> &'a Skel::Body: Into<Skel::Attr>,
@ -377,7 +378,7 @@ where
let manifests = self.manifests; let manifests = self.manifests;
let slot_ = Arc::clone(&slot); let slot_ = Arc::clone(&slot);
thread_pool.execute(move || { runtime.spawn_blocking(move || {
// First, load all the base vertex data. // First, load all the base vertex data.
let manifests = &*manifests.read(); let manifests = &*manifests.read();
let meshes = <Skel::Body as BodySpec>::bone_meshes(&key, manifests); let meshes = <Skel::Body as BodySpec>::bone_meshes(&key, manifests);

View File

@ -725,7 +725,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -1453,7 +1453,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -1661,7 +1661,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -1984,7 +1984,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -2337,7 +2337,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -2443,7 +2443,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -2528,7 +2528,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = let state =
@ -2618,7 +2618,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -2770,7 +2770,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -2857,7 +2857,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -2942,7 +2942,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = self let state = self
@ -3352,7 +3352,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = let state =
@ -3488,7 +3488,7 @@ impl FigureMgr {
tick, tick,
player_camera_mode, player_camera_mode,
player_character_state, player_character_state,
scene_data.thread_pool, scene_data.runtime,
); );
let state = let state =

View File

@ -36,6 +36,7 @@ use common_sys::state::State;
use comp::item::Reagent; use comp::item::Reagent;
use num::traits::{Float, FloatConst}; use num::traits::{Float, FloatConst};
use specs::{Entity as EcsEntity, Join, WorldExt}; use specs::{Entity as EcsEntity, Join, WorldExt};
use tokio::runtime::Runtime;
use vek::*; use vek::*;
// TODO: Don't hard-code this. // TODO: Don't hard-code this.
@ -114,7 +115,7 @@ pub struct SceneData<'a> {
pub loaded_distance: f32, pub loaded_distance: f32,
pub view_distance: u32, pub view_distance: u32,
pub tick: u64, pub tick: u64,
pub thread_pool: &'a uvth::ThreadPool, pub runtime: &'a Runtime,
pub gamma: f32, pub gamma: f32,
pub exposure: f32, pub exposure: f32,
pub ambiance: f32, pub ambiance: f32,

View File

@ -29,6 +29,7 @@ use common::{
terrain::BlockKind, terrain::BlockKind,
vol::{BaseVol, ReadVol}, vol::{BaseVol, ReadVol},
}; };
use tokio::runtime::Runtime;
use tracing::error; use tracing::error;
use vek::*; use vek::*;
use winit::event::MouseButton; use winit::event::MouseButton;
@ -96,7 +97,7 @@ pub struct SceneData<'a> {
pub time: f64, pub time: f64,
pub delta_time: f32, pub delta_time: f32,
pub tick: u64, pub tick: u64,
pub thread_pool: &'a uvth::ThreadPool, pub runtime: &'a Runtime,
pub body: Option<humanoid::Body>, pub body: Option<humanoid::Body>,
pub gamma: f32, pub gamma: f32,
pub exposure: f32, pub exposure: f32,
@ -350,7 +351,7 @@ impl Scene {
scene_data.tick, scene_data.tick,
CameraMode::default(), CameraMode::default(),
None, None,
scene_data.thread_pool, scene_data.runtime,
) )
.0; .0;
let mut buf = [Default::default(); anim::MAX_BONE_COUNT]; let mut buf = [Default::default(); anim::MAX_BONE_COUNT];

View File

@ -27,7 +27,10 @@ use enum_iterator::IntoEnumIterator;
use guillotiere::AtlasAllocator; use guillotiere::AtlasAllocator;
use hashbrown::HashMap; use hashbrown::HashMap;
use serde::Deserialize; use serde::Deserialize;
use std::sync::Arc; use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tracing::warn; use tracing::warn;
use treeculler::{BVol, Frustum, AABB}; use treeculler::{BVol, Frustum, AABB};
use vek::*; use vek::*;
@ -259,6 +262,7 @@ pub struct Terrain<V: RectRasterableVol = TerrainChunk> {
mesh_send_tmp: channel::Sender<MeshWorkerResponse>, mesh_send_tmp: channel::Sender<MeshWorkerResponse>,
mesh_recv: channel::Receiver<MeshWorkerResponse>, mesh_recv: channel::Receiver<MeshWorkerResponse>,
mesh_todo: HashMap<Vec2<i32>, ChunkMeshState>, mesh_todo: HashMap<Vec2<i32>, ChunkMeshState>,
mesh_todos_active: Arc<AtomicU64>,
// GPU data // GPU data
sprite_data: Arc<HashMap<(SpriteKind, usize), Vec<SpriteData>>>, sprite_data: Arc<HashMap<(SpriteKind, usize), Vec<SpriteData>>>,
@ -406,6 +410,7 @@ impl<V: RectRasterableVol> Terrain<V> {
mesh_send_tmp: send, mesh_send_tmp: send,
mesh_recv: recv, mesh_recv: recv,
mesh_todo: HashMap::default(), mesh_todo: HashMap::default(),
mesh_todos_active: Arc::new(AtomicU64::new(0)),
sprite_data: Arc::new(sprite_data), sprite_data: Arc::new(sprite_data),
sprite_col_lights, sprite_col_lights,
waves: renderer waves: renderer
@ -633,6 +638,11 @@ impl<V: RectRasterableVol> Terrain<V> {
// Limit ourselves to u16::MAX even if larger textures are supported. // Limit ourselves to u16::MAX even if larger textures are supported.
let max_texture_size = renderer.max_texture_size(); let max_texture_size = renderer.max_texture_size();
let meshing_cores = match num_cpus::get() as u64 {
n if n < 4 => 1,
n if n < 8 => n - 3,
n => n - 4,
};
span!(guard, "Queue meshing from todo list"); span!(guard, "Queue meshing from todo list");
for (todo, chunk) in self for (todo, chunk) in self
@ -649,8 +659,7 @@ impl<V: RectRasterableVol> Terrain<V> {
.cloned()?)) .cloned()?))
}) })
{ {
// TODO: find a alternative! if self.mesh_todos_active.load(Ordering::Relaxed) > meshing_cores {
if scene_data.thread_pool.queued_jobs() > 0 {
break; break;
} }
@ -701,7 +710,9 @@ impl<V: RectRasterableVol> Terrain<V> {
let started_tick = todo.started_tick; let started_tick = todo.started_tick;
let sprite_data = Arc::clone(&self.sprite_data); let sprite_data = Arc::clone(&self.sprite_data);
let sprite_config = Arc::clone(&self.sprite_config); let sprite_config = Arc::clone(&self.sprite_config);
scene_data.thread_pool.execute(move || { let cnt = Arc::clone(&self.mesh_todos_active);
cnt.fetch_add(1, Ordering::Relaxed);
scene_data.runtime.spawn_blocking(move || {
let sprite_data = sprite_data; let sprite_data = sprite_data;
let _ = send.send(mesh_worker( let _ = send.send(mesh_worker(
pos, pos,
@ -714,6 +725,7 @@ impl<V: RectRasterableVol> Terrain<V> {
&sprite_data, &sprite_data,
&sprite_config, &sprite_config,
)); ));
cnt.fetch_sub(1, Ordering::Relaxed);
}); });
todo.is_worker_active = true; todo.is_worker_active = true;
} }

View File

@ -1307,7 +1307,7 @@ impl PlayState for SessionState {
loaded_distance: client.loaded_distance(), loaded_distance: client.loaded_distance(),
view_distance: client.view_distance().unwrap_or(1), view_distance: client.view_distance().unwrap_or(1),
tick: client.get_tick(), tick: client.get_tick(),
thread_pool: client.thread_pool(), runtime: &client.runtime(),
gamma: global_state.settings.graphics.gamma, gamma: global_state.settings.graphics.gamma,
exposure: global_state.settings.graphics.exposure, exposure: global_state.settings.graphics.exposure,
ambiance: global_state.settings.graphics.ambiance, ambiance: global_state.settings.graphics.ambiance,
@ -1375,7 +1375,7 @@ impl PlayState for SessionState {
loaded_distance: client.loaded_distance(), loaded_distance: client.loaded_distance(),
view_distance: client.view_distance().unwrap_or(1), view_distance: client.view_distance().unwrap_or(1),
tick: client.get_tick(), tick: client.get_tick(),
thread_pool: client.thread_pool(), runtime: &client.runtime(),
gamma: settings.graphics.gamma, gamma: settings.graphics.gamma,
exposure: settings.graphics.exposure, exposure: settings.graphics.exposure,
ambiance: settings.graphics.ambiance, ambiance: settings.graphics.ambiance,

View File

@ -1,29 +1,25 @@
use client::Client;
use common::clock::Clock; use common::clock::Clock;
use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError}; use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};
use server::{Error as ServerError, Event, Input, Server}; use server::{Error as ServerError, Event, Input, Server};
use std::{ use std::{
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
thread::{self, JoinHandle}, thread::{self, JoinHandle},
time::Duration, time::Duration,
}; };
use tracing::{error, info, warn}; use tokio::runtime::Runtime;
use tracing::{debug, error, info, trace, warn};
const TPS: u64 = 30; const TPS: u64 = 30;
enum Msg {
Stop,
}
/// Used to start and stop the background thread running the server /// Used to start and stop the background thread running the server
/// when in singleplayer mode. /// when in singleplayer mode.
pub struct Singleplayer { pub struct Singleplayer {
_server_thread: JoinHandle<()>, _server_thread: JoinHandle<()>,
sender: Sender<Msg>, stop_server_s: Sender<()>,
pub receiver: Receiver<Result<(), ServerError>>, pub receiver: Receiver<Result<Arc<Runtime>, ServerError>>,
// Wether the server is stopped or not // Wether the server is stopped or not
paused: Arc<AtomicBool>, paused: Arc<AtomicBool>,
// Settings that the server was started with // Settings that the server was started with
@ -31,8 +27,9 @@ pub struct Singleplayer {
} }
impl Singleplayer { impl Singleplayer {
pub fn new(client: Option<&Client>) -> Self { #[allow(clippy::new_without_default)]
let (sender, receiver) = unbounded(); pub fn new() -> Self {
let (stop_server_s, stop_server_r) = unbounded();
// Determine folder to save server data in // Determine folder to save server data in
let server_data_dir = { let server_data_dir = {
@ -81,15 +78,21 @@ impl Singleplayer {
let settings = server::Settings::singleplayer(&server_data_dir); let settings = server::Settings::singleplayer(&server_data_dir);
let editable_settings = server::EditableSettings::singleplayer(&server_data_dir); let editable_settings = server::EditableSettings::singleplayer(&server_data_dir);
let thread_pool = client.map(|c| c.thread_pool().clone());
let cores = num_cpus::get(); let cores = num_cpus::get();
debug!("Creating a new runtime for server");
let runtime = Arc::new( let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores }) .worker_threads(if cores > 4 { cores - 1 } else { cores })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-sp-{}", id)
})
.build() .build()
.unwrap(), .unwrap(),
); );
let settings2 = settings.clone(); let settings2 = settings.clone();
let paused = Arc::new(AtomicBool::new(false)); let paused = Arc::new(AtomicBool::new(false));
@ -97,21 +100,29 @@ impl Singleplayer {
let (result_sender, result_receiver) = bounded(1); let (result_sender, result_receiver) = bounded(1);
let thread = thread::spawn(move || { let builder = thread::Builder::new().name("singleplayer-server-thread".into());
let thread = builder
.spawn(move || {
trace!("starting singleplayer server thread");
let mut server = None; let mut server = None;
if let Err(e) = result_sender.send( if let Err(e) = result_sender.send(
match Server::new(settings2, editable_settings, &server_data_dir, runtime) { match Server::new(
settings2,
editable_settings,
&server_data_dir,
Arc::clone(&runtime),
) {
Ok(s) => { Ok(s) => {
server = Some(s); server = Some(s);
Ok(()) Ok(runtime)
}, },
Err(e) => Err(e), Err(e) => Err(e),
}, },
) { ) {
warn!( warn!(
?e, ?e,
"Failed to send singleplayer server initialization result. Most likely the \ "Failed to send singleplayer server initialization result. Most likely \
channel was closed by cancelling server creation. Stopping Server" the channel was closed by cancelling server creation. Stopping Server"
); );
return; return;
}; };
@ -121,17 +132,14 @@ impl Singleplayer {
None => return, None => return,
}; };
let server = match thread_pool { run_server(server, stop_server_r, paused1);
Some(pool) => server.with_thread_pool(pool), trace!("ending singleplayer server thread");
None => server, })
}; .unwrap();
run_server(server, receiver, paused1);
});
Singleplayer { Singleplayer {
_server_thread: thread, _server_thread: thread,
sender, stop_server_s,
receiver: result_receiver, receiver: result_receiver,
paused, paused,
settings, settings,
@ -152,11 +160,11 @@ impl Singleplayer {
impl Drop for Singleplayer { impl Drop for Singleplayer {
fn drop(&mut self) { fn drop(&mut self) {
// Ignore the result // Ignore the result
let _ = self.sender.send(Msg::Stop); let _ = self.stop_server_s.send(());
} }
} }
fn run_server(mut server: Server, rec: Receiver<Msg>, paused: Arc<AtomicBool>) { fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc<AtomicBool>) {
info!("Starting server-cli..."); info!("Starting server-cli...");
// Set up an fps clock // Set up an fps clock
@ -164,14 +172,10 @@ fn run_server(mut server: Server, rec: Receiver<Msg>, paused: Arc<AtomicBool>) {
loop { loop {
// Check any event such as stopping and pausing // Check any event such as stopping and pausing
match rec.try_recv() { match stop_server_r.try_recv() {
Ok(msg) => match msg { Ok(()) => break,
Msg::Stop => break, Err(TryRecvError::Disconnected) => break,
}, Err(TryRecvError::Empty) => (),
Err(err) => match err {
TryRecvError::Empty => (),
TryRecvError::Disconnected => break,
},
} }
// Wait for the next tick. // Wait for the next tick.

View File

@ -1319,14 +1319,16 @@ impl Window {
let mut path = settings.screenshots_path.clone(); let mut path = settings.screenshots_path.clone();
let sender = self.message_sender.clone(); let sender = self.message_sender.clone();
std::thread::spawn(move || { let builder = std::thread::Builder::new().name("screenshot".into());
builder
.spawn(move || {
use std::time::SystemTime; use std::time::SystemTime;
// Check if folder exists and create it if it does not // Check if folder exists and create it if it does not
if !path.exists() { if !path.exists() {
if let Err(e) = std::fs::create_dir_all(&path) { if let Err(e) = std::fs::create_dir_all(&path) {
warn!(?e, "Couldn't create folder for screenshot"); warn!(?e, "Couldn't create folder for screenshot");
let _result = let _result = sender
sender.send(String::from("Couldn't create folder for screenshot")); .send(String::from("Couldn't create folder for screenshot"));
} }
} }
path.push(format!( path.push(format!(
@ -1340,10 +1342,11 @@ impl Window {
warn!(?e, "Couldn't save screenshot"); warn!(?e, "Couldn't save screenshot");
let _result = sender.send(String::from("Couldn't save screenshot")); let _result = sender.send(String::from("Couldn't save screenshot"));
} else { } else {
let _result = let _result = sender
sender.send(format!("Screenshot saved to {}", path.to_string_lossy())); .send(format!("Screenshot saved to {}", path.to_string_lossy()));
} }
}); })
.unwrap();
}, },
Err(e) => error!(?e, "Couldn't create screenshot due to renderer error"), Err(e) => error!(?e, "Couldn't create screenshot due to renderer error"),
} }