Initial switch to tokio for network, minimum working example.

This commit is contained in:
Marcel Märtens 2021-01-13 14:16:22 +01:00
parent 68990d1a17
commit 1b77b6dc41
16 changed files with 149 additions and 180 deletions

143
Cargo.lock generated
View File

@ -248,41 +248,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "async-std"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "538ecb01eb64eecd772087e5b6f7540cbc917f047727339a472dafed2185b267"
dependencies = [
"async-task",
"crossbeam-channel 0.4.4",
"crossbeam-deque 0.7.3",
"crossbeam-utils 0.7.2",
"futures-core",
"futures-io",
"futures-timer 2.0.2",
"kv-log-macro",
"log",
"memchr",
"mio 0.6.23",
"mio-uds",
"num_cpus",
"once_cell",
"pin-project-lite 0.1.11",
"pin-utils",
"slab",
]
[[package]]
name = "async-task"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ac2c016b079e771204030951c366db398864f5026f84a44dafb0ff20f02085d"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "atom"
version = "0.3.6"
@ -1114,16 +1079,6 @@ dependencies = [
"crossbeam-utils 0.6.6",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.0"
@ -1290,16 +1245,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8f45d9ad417bcef4817d614a501ab55cdd96a6fdb24f49aab89a54acfd66b19"
dependencies = [
"quote 1.0.9",
"syn 1.0.60",
]
[[package]]
name = "daggy"
version = "0.5.0"
@ -1861,12 +1806,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "futures-timer"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6"
[[package]]
name = "futures-timer"
version = "3.0.2"
@ -2242,7 +2181,7 @@ dependencies = [
"http",
"indexmap",
"slab",
"tokio",
"tokio 0.2.25",
"tokio-util",
"tracing",
"tracing-futures",
@ -2393,7 +2332,7 @@ dependencies = [
"itoa",
"pin-project 1.0.5",
"socket2",
"tokio",
"tokio 0.2.25",
"tower-service",
"tracing",
"want",
@ -2410,7 +2349,7 @@ dependencies = [
"hyper",
"log",
"rustls 0.18.1",
"tokio",
"tokio 0.2.25",
"tokio-rustls",
"webpki",
]
@ -2687,15 +2626,6 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc"
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]]
name = "lazy-bytes-cast"
version = "5.0.1"
@ -2863,7 +2793,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if 1.0.0",
"value-bag",
]
[[package]]
@ -3088,17 +3017,6 @@ dependencies = [
"slab",
]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio 0.6.23",
]
[[package]]
name = "miow"
version = "0.2.2"
@ -4282,7 +4200,7 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio 0.2.25",
"tokio-rustls",
"url",
"wasm-bindgen",
@ -5162,6 +5080,33 @@ dependencies = [
"slab",
]
[[package]]
name = "tokio"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8190d04c665ea9e6b6a0dc45523ade572c088d2e6566244c1122671dbf4ae3a"
dependencies = [
"autocfg",
"bytes 1.0.1",
"libc",
"memchr",
"mio 0.7.7",
"num_cpus",
"pin-project-lite 0.2.4",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.60",
]
[[package]]
name = "tokio-rustls"
version = "0.14.1"
@ -5170,7 +5115,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a"
dependencies = [
"futures-core",
"rustls 0.18.1",
"tokio",
"tokio 0.2.25",
"webpki",
]
@ -5185,7 +5130,7 @@ dependencies = [
"futures-sink",
"log",
"pin-project-lite 0.1.11",
"tokio",
"tokio 0.2.25",
]
[[package]]
@ -5528,15 +5473,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "value-bag"
version = "1.0.0-alpha.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b676010e055c99033117c2343b33a40a30b91fecd6c49055ac9cd2d6c305ab1"
dependencies = [
"ctor",
]
[[package]]
name = "vcpkg"
version = "0.2.11"
@ -5596,7 +5532,7 @@ dependencies = [
"authc",
"byteorder",
"futures-executor",
"futures-timer 3.0.2",
"futures-timer",
"futures-util",
"hashbrown 0.9.1",
"image",
@ -5604,6 +5540,7 @@ dependencies = [
"num_cpus",
"rayon",
"specs",
"tokio 1.2.0",
"tracing",
"tracing-subscriber",
"uvth 3.1.1",
@ -5731,7 +5668,7 @@ dependencies = [
"dotenv",
"futures-channel",
"futures-executor",
"futures-timer 3.0.2",
"futures-timer",
"futures-util",
"hashbrown 0.9.1",
"itertools 0.9.0",
@ -5749,6 +5686,7 @@ dependencies = [
"specs",
"specs-idvs",
"tiny_http",
"tokio 1.2.0",
"tracing",
"uvth 3.1.1",
"vek 0.12.0",
@ -5772,6 +5710,7 @@ dependencies = [
"serde",
"signal-hook 0.2.3",
"termcolor",
"tokio 1.2.0",
"tracing",
"tracing-subscriber",
"tracing-tracy",
@ -5818,6 +5757,7 @@ dependencies = [
"lazy_static",
"native-dialog",
"num 0.3.1",
"num_cpus",
"old_school_gfx_glutin_ext",
"ordered-float 2.1.1",
"rand 0.8.3",
@ -5827,6 +5767,7 @@ dependencies = [
"specs",
"specs-idvs",
"termcolor",
"tokio 1.2.0",
"tracing",
"tracing-appender",
"tracing-log",
@ -5893,9 +5834,8 @@ dependencies = [
[[package]]
name = "veloren_network"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"async-std",
"bincode",
"bitflags",
"clap",
@ -5908,6 +5848,7 @@ dependencies = [
"serde",
"shellexpand",
"tiny_http",
"tokio 1.2.0",
"tracing",
"tracing-futures",
"tracing-subscriber",

View File

@ -21,6 +21,7 @@ uvth = "3.1.1"
futures-util = "0.3.7"
futures-executor = "0.3"
futures-timer = "3.0"
tokio = { version = "1.0.1", default-features = false, features = ["rt"] }
image = { version = "0.23.12", default-features = false, features = ["png"] }
num = "0.3.1"
num_cpus = "1.10.1"

View File

@ -64,6 +64,7 @@ use std::{
time::{Duration, Instant},
};
use tracing::{debug, error, trace, warn};
use tokio::runtime::Runtime;
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*;
@ -129,6 +130,7 @@ impl WorldData {
pub struct Client {
registered: bool,
presence: Option<PresenceKind>,
runtime: Arc<Runtime>,
thread_pool: ThreadPool,
server_info: ServerInfo,
world_data: WorldData,
@ -185,15 +187,14 @@ pub struct CharacterList {
impl Client {
/// Create a new `Client`.
pub fn new<A: Into<SocketAddr>>(addr: A, view_distance: Option<u32>) -> Result<Self, Error> {
pub fn new<A: Into<SocketAddr>>(addr: A, view_distance: Option<u32>, runtime: Arc<Runtime>) -> Result<Self, Error> {
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build();
// We reduce the thread count by 1 to keep rendering smooth
thread_pool.set_num_threads((num_cpus::get() - 1).max(1));
let (network, scheduler) = Network::new(Pid::new());
thread_pool.execute(scheduler);
let network = Network::new(Pid::new(), Arc::clone(&runtime));
let participant = block_on(network.connect(ProtocolAddr::Tcp(addr.into())))?;
let stream = block_on(participant.opened())?;
@ -417,6 +418,7 @@ impl Client {
Ok(Self {
registered: false,
presence: None,
runtime,
thread_pool,
server_info,
world_data: WorldData {
@ -1733,6 +1735,8 @@ impl Client {
/// exempt).
pub fn thread_pool(&self) -> &ThreadPool { &self.thread_pool }
pub fn runtime(&self) -> &Arc<Runtime> { &self.runtime }
/// Get a reference to the client's game state.
pub fn state(&self) -> &State { &self.state }

View File

@ -1,6 +1,6 @@
[package]
name = "veloren_network"
version = "0.2.0"
version = "0.3.0"
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
edition = "2018"
@ -19,8 +19,7 @@ bincode = "1.3.1"
serde = { version = "1.0" }
#sending
crossbeam-channel = "0.5"
# NOTE: Upgrading async-std can trigger spontanious crashes for `network`ing. Consider elaborate tests before upgrading
async-std = { version = "~1.5", default-features = false, features = ["std", "async-task", "default"] }
tokio = { version = "1.2", default-features = false, features = ["io-util", "macros", "rt", "net", "time"] }
#tracing and metrics
tracing = { version = "0.1", default-features = false }
tracing-futures = "0.2"

View File

@ -8,7 +8,8 @@ use crate::{
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Sid},
};
use async_std::{io, sync::Mutex, task};
use tokio::{io, sync::Mutex};
use tokio::runtime::Runtime;
use futures::{
channel::{mpsc, oneshot},
sink::SinkExt,
@ -50,6 +51,7 @@ pub enum ProtocolAddr {
pub struct Participant {
local_pid: Pid,
remote_pid: Pid,
runtime: Arc<Runtime>,
a2b_stream_open_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
a2s_disconnect_s: A2sDisconnect,
@ -76,6 +78,7 @@ pub struct Stream {
prio: Prio,
promises: Promises,
send_closed: Arc<AtomicBool>,
runtime: Arc<Runtime>,
a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: Option<mpsc::UnboundedReceiver<IncomingMessage>>,
a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
@ -150,9 +153,10 @@ pub enum StreamError {
/// [`connected`]: Network::connected
pub struct Network {
local_pid: Pid,
runtime: Arc<Runtime>,
participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>,
listen_sender:
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<async_std::io::Result<()>>)>>,
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<tokio::io::Result<()>>)>>,
connect_sender:
Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
@ -165,17 +169,12 @@ impl Network {
/// # Arguments
/// * `participant_id` - provide it by calling [`Pid::new()`], usually you
/// don't want to reuse a Pid for 2 `Networks`
/// * `runtime` - provide a tokio::Runtime, it's used to internally spawn tasks
///
/// # Result
/// * `Self` - returns a `Network` which can be `Send` to multiple areas of
/// your code, including multiple threads. This is the base strct of this
/// crate.
/// * `FnOnce` - you need to run the returning FnOnce exactly once, probably
/// in it's own thread. this is NOT done internally, so that you are free
/// to choose the threadpool implementation of your choice. We recommend
/// using [`ThreadPool`] from [`uvth`] crate. This fn will run the
/// Scheduler to handle all `Network` internals. Additional threads will
/// be allocated on an internal async-aware threadpool
///
/// # Examples
/// ```rust
@ -204,9 +203,10 @@ impl Network {
/// [`Pid::new()`]: crate::types::Pid::new
/// [`ThreadPool`]: https://docs.rs/uvth/newest/uvth/struct.ThreadPool.html
/// [`uvth`]: https://docs.rs/uvth
pub fn new(participant_id: Pid) -> (Self, impl std::ops::FnOnce()) {
pub fn new(participant_id: Pid, runtime: Arc<Runtime>) -> Self {
Self::internal_new(
participant_id,
runtime,
#[cfg(feature = "metrics")]
None,
)
@ -232,42 +232,46 @@ impl Network {
#[cfg(feature = "metrics")]
pub fn new_with_registry(
participant_id: Pid,
runtime: Arc<Runtime>,
registry: &Registry,
) -> (Self, impl std::ops::FnOnce()) {
Self::internal_new(participant_id, Some(registry))
) -> Self {
Self::internal_new(participant_id, runtime, Some(registry))
}
fn internal_new(
participant_id: Pid,
runtime: Arc<Runtime>,
#[cfg(feature = "metrics")] registry: Option<&Registry>,
) -> (Self, impl std::ops::FnOnce()) {
) -> Self {
let p = participant_id;
debug!(?p, "Starting Network");
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
Scheduler::new(
participant_id,
Arc::clone(&runtime),
#[cfg(feature = "metrics")]
registry,
);
(
Self {
local_pid: participant_id,
participant_disconnect_sender: Mutex::new(HashMap::new()),
listen_sender: Mutex::new(listen_sender),
connect_sender: Mutex::new(connect_sender),
connected_receiver: Mutex::new(connected_receiver),
shutdown_sender: Some(shutdown_sender),
},
move || {
runtime.spawn(
async move {
trace!(?p, "Starting scheduler in own thread");
let _handle = task::block_on(
let _handle = tokio::spawn(
scheduler
.run()
.instrument(tracing::info_span!("scheduler", ?p)),
);
trace!(?p, "Stopping scheduler and his own thread");
},
)
}
);
Self {
local_pid: participant_id,
runtime: runtime,
participant_disconnect_sender: Mutex::new(HashMap::new()),
listen_sender: Mutex::new(listen_sender),
connect_sender: Mutex::new(connect_sender),
connected_receiver: Mutex::new(connected_receiver),
shutdown_sender: Some(shutdown_sender),
}
}
/// starts listening on an [`ProtocolAddr`].
@ -300,7 +304,7 @@ impl Network {
///
/// [`connected`]: Network::connected
pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> {
let (s2a_result_s, s2a_result_r) = oneshot::channel::<async_std::io::Result<()>>();
let (s2a_result_s, s2a_result_r) = oneshot::channel::<tokio::io::Result<()>>();
debug!(?address, "listening on address");
self.listen_sender
.lock()
@ -426,6 +430,7 @@ impl Participant {
pub(crate) fn new(
local_pid: Pid,
remote_pid: Pid,
runtime: Arc<Runtime>,
a2b_stream_open_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
@ -433,6 +438,7 @@ impl Participant {
Self {
local_pid,
remote_pid,
runtime,
a2b_stream_open_s: Mutex::new(a2b_stream_open_s),
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
@ -655,6 +661,7 @@ impl Stream {
prio: Prio,
promises: Promises,
send_closed: Arc<AtomicBool>,
runtime: Arc<Runtime>,
a2b_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<IncomingMessage>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
@ -666,6 +673,7 @@ impl Stream {
prio,
promises,
send_closed,
runtime,
a2b_msg_s,
b2a_msg_recv_r: Some(b2a_msg_recv_r),
a2b_close_stream_s: Some(a2b_close_stream_s),
@ -960,7 +968,7 @@ impl Drop for Network {
"Shutting down Participants of Network, while we still have metrics"
);
let mut finished_receiver_list = vec![];
task::block_on(async {
self.runtime.block_on(async {
// we MUST avoid nested block_on, good that Network::Drop no longer triggers
// Participant::Drop directly but just the BParticipant
for (remote_pid, a2s_disconnect_s) in
@ -1013,14 +1021,14 @@ impl Drop for Participant {
let pid = self.remote_pid;
debug!(?pid, "Shutting down Participant");
match task::block_on(self.a2s_disconnect_s.lock()).take() {
match self.runtime.block_on(self.a2s_disconnect_s.lock()).take() {
None => trace!(
?pid,
"Participant has been shutdown cleanly, no further waiting is required!"
),
Some(mut a2s_disconnect_s) => {
debug!(?pid, "Disconnect from Scheduler");
task::block_on(async {
self.runtime.block_on(async {
let (finished_sender, finished_receiver) = oneshot::channel();
a2s_disconnect_s
.send((self.remote_pid, finished_sender))
@ -1051,7 +1059,7 @@ impl Drop for Stream {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "Shutting down Stream");
task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid))
self.runtime.block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid))
.expect("bparticipant part of a gracefully shutdown must have crashed");
} else {
let sid = self.sid;

View File

@ -39,7 +39,7 @@
//!
//! # Examples
//! ```rust
//! use async_std::task::sleep;
//! use tokio::task::sleep;
//! use futures::{executor::block_on, join};
//! use veloren_network::{Network, Pid, Promises, ProtocolAddr};
//!

View File

@ -8,7 +8,8 @@ use crate::{
protocols::Protocols,
types::{Cid, Frame, Pid, Prio, Promises, Sid},
};
use async_std::sync::{Mutex, RwLock};
use tokio::sync::{Mutex, RwLock};
use tokio::runtime::Runtime;
use futures::{
channel::{mpsc, oneshot},
future::FutureExt,
@ -71,6 +72,7 @@ pub struct BParticipant {
remote_pid: Pid,
remote_pid_string: String, //optimisation
offset_sid: Sid,
runtime: Arc<Runtime>,
channels: Arc<RwLock<HashMap<Cid, Mutex<ChannelInfo>>>>,
streams: RwLock<HashMap<Sid, StreamInfo>>,
running_mgr: AtomicUsize,
@ -86,6 +88,7 @@ impl BParticipant {
pub(crate) fn new(
remote_pid: Pid,
offset_sid: Sid,
runtime: Arc<Runtime>,
#[cfg(feature = "metrics")] metrics: Arc<NetworkMetrics>,
) -> (
Self,
@ -120,6 +123,7 @@ impl BParticipant {
remote_pid,
remote_pid_string: remote_pid.to_string(),
offset_sid,
runtime,
channels: Arc::new(RwLock::new(HashMap::new())),
streams: RwLock::new(HashMap::new()),
running_mgr: AtomicUsize::new(0),
@ -213,7 +217,7 @@ impl BParticipant {
.send((self.remote_pid, len as u64, /* */ 0))
.await
.unwrap();
async_std::task::sleep(TICK_TIME).await;
tokio::time::sleep(TICK_TIME).await;
i += 1;
if i.rem_euclid(1000) == 0 {
trace!("Did 1000 ticks");
@ -659,7 +663,7 @@ impl BParticipant {
//Wait for other bparticipants mgr to close via AtomicUsize
const SLEEP_TIME: Duration = Duration::from_millis(5);
const ALLOWED_MANAGER: usize = 1;
async_std::task::sleep(SLEEP_TIME).await;
tokio::time::sleep(SLEEP_TIME).await;
let mut i: u32 = 1;
while self.running_mgr.load(Ordering::Relaxed) > ALLOWED_MANAGER {
i += 1;
@ -670,7 +674,7 @@ impl BParticipant {
self.running_mgr.load(Ordering::Relaxed) - ALLOWED_MANAGER
);
}
async_std::task::sleep(SLEEP_TIME * i).await;
tokio::time::sleep(SLEEP_TIME * i).await;
}
trace!("All BParticipant mgr (except me) are shut down now");
@ -843,6 +847,7 @@ impl BParticipant {
prio,
promises,
send_closed,
Arc::clone(&self.runtime),
a2p_msg_s,
b2a_msg_recv_r,
a2b_close_stream_s.clone(),

View File

@ -4,8 +4,8 @@ use crate::{
participant::C2pFrame,
types::{Cid, Frame},
};
use async_std::{
io::prelude::*,
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpStream, UdpSocket},
};
@ -43,7 +43,8 @@ pub(crate) enum Protocols {
#[derive(Debug)]
pub(crate) struct TcpProtocol {
stream: TcpStream,
read_stream: tokio::sync::Mutex<tokio::net::tcp::OwnedReadHalf>,
write_stream: tokio::sync::Mutex<tokio::net::tcp::OwnedWriteHalf>,
#[cfg(feature = "metrics")]
metrics: Arc<NetworkMetrics>,
}
@ -63,14 +64,16 @@ impl TcpProtocol {
stream: TcpStream,
#[cfg(feature = "metrics")] metrics: Arc<NetworkMetrics>,
) -> Self {
let (read_stream, write_stream) = stream.into_split();
Self {
stream,
read_stream: tokio::sync::Mutex::new(read_stream),
write_stream: tokio::sync::Mutex::new(write_stream),
#[cfg(feature = "metrics")]
metrics,
}
}
async fn read_frame<R: ReadExt + std::marker::Unpin>(
async fn read_frame<R: AsyncReadExt + std::marker::Unpin>(
r: &mut R,
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
) -> Result<Frame, Option<std::io::Error>> {
@ -167,11 +170,11 @@ impl TcpProtocol {
.metrics
.wire_in_throughput
.with_label_values(&[&cid.to_string()]);
let mut stream = self.stream.clone();
let mut read_stream = self.read_stream.lock().await;
let mut end_r = end_r.fuse();
loop {
match Self::read_frame(&mut stream, &mut end_r).await {
match Self::read_frame(&mut *read_stream, &mut end_r).await {
Ok(frame) => {
#[cfg(feature = "metrics")]
{
@ -209,7 +212,7 @@ impl TcpProtocol {
trace!("Shutting down tcp read()");
}
pub async fn write_frame<W: WriteExt + std::marker::Unpin>(
pub async fn write_frame<W: AsyncWriteExt + std::marker::Unpin>(
w: &mut W,
frame: Frame,
) -> Result<(), std::io::Error> {
@ -270,7 +273,7 @@ impl TcpProtocol {
pub async fn write_to_wire(&self, cid: Cid, mut c2w_frame_r: mpsc::UnboundedReceiver<Frame>) {
trace!("Starting up tcp write()");
let mut stream = self.stream.clone();
let mut write_stream = self.write_stream.lock().await;
#[cfg(feature = "metrics")]
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_out_total.clone(), cid);
#[cfg(feature = "metrics")]
@ -294,7 +297,7 @@ impl TcpProtocol {
throughput_cache.inc_by(data.len() as u64);
}
}
if let Err(e) = Self::write_frame(&mut stream, frame).await {
if let Err(e) = Self::write_frame(&mut *write_stream, frame).await {
info!(
?e,
"Got an error writing to tcp, going to close this channel"
@ -498,7 +501,7 @@ impl UdpProtocol {
mod tests {
use super::*;
use crate::{metrics::NetworkMetrics, types::Pid};
use async_std::net;
use tokio::net;
use futures::{executor::block_on, stream::StreamExt};
use std::sync::Arc;
@ -534,7 +537,7 @@ mod tests {
})
});
// Assert than we get some value back! Its a Handshake!
//async_std::task::sleep(std::time::Duration::from_millis(1000));
//tokio::task::sleep(std::time::Duration::from_millis(1000));
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r);
if let Ok(Frame::Handshake {

View File

@ -7,10 +7,10 @@ use crate::{
protocols::{Protocols, TcpProtocol, UdpProtocol},
types::Pid,
};
use async_std::{io, net, sync::Mutex};
use tokio::{io, net, sync::Mutex};
use tokio::runtime::Runtime;
use futures::{
channel::{mpsc, oneshot},
executor::ThreadPool,
future::FutureExt,
select,
sink::SinkExt,
@ -68,9 +68,9 @@ struct ParticipantChannels {
#[derive(Debug)]
pub struct Scheduler {
local_pid: Pid,
runtime: Arc<Runtime>,
local_secret: u128,
closed: AtomicBool,
pool: Arc<ThreadPool>,
run_channels: Option<ControlChannels>,
participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
participants: Arc<Mutex<HashMap<Pid, ParticipantInfo>>>,
@ -83,6 +83,7 @@ pub struct Scheduler {
impl Scheduler {
pub fn new(
local_pid: Pid,
runtime: Arc<Runtime>,
#[cfg(feature = "metrics")] registry: Option<&Registry>,
) -> (
Self,
@ -128,9 +129,9 @@ impl Scheduler {
(
Self {
local_pid,
runtime,
local_secret,
closed: AtomicBool::new(false),
pool: Arc::new(ThreadPool::new().unwrap()),
run_channels,
participant_channels: Arc::new(Mutex::new(Some(participant_channels))),
participants: Arc::new(Mutex::new(HashMap::new())),
@ -247,7 +248,7 @@ impl Scheduler {
Arc::clone(&self.metrics),
udp_data_receiver,
);
self.pool.spawn_ok(
self.runtime.spawn(
Self::udp_single_channel_connect(Arc::clone(&socket), udp_data_sender)
.instrument(tracing::info_span!("udp", ?addr)),
);
@ -377,27 +378,19 @@ impl Scheduler {
},
};
trace!(?addr, "Listener bound");
let mut incoming = listener.incoming();
let mut end_receiver = s2s_stop_listening_r.fuse();
while let Some(stream) = select! {
next = incoming.next().fuse() => next,
while let Some(data) = select! {
next = listener.accept().fuse() => Some(next),
_ = end_receiver => None,
} {
let stream = match stream {
Ok(s) => s,
let (stream, remote_addr) = match data {
Ok((s, p)) => (s, p),
Err(e) => {
warn!(?e, "TcpStream Error, ignoring connection attempt");
continue;
},
};
let peer_addr = match stream.peer_addr() {
Ok(s) => s,
Err(e) => {
warn!(?e, "TcpStream Error, ignoring connection attempt");
continue;
},
};
info!("Accepting Tcp from: {}", peer_addr);
info!("Accepting Tcp from: {}", remote_addr);
let protocol = TcpProtocol::new(
stream,
#[cfg(feature = "metrics")]
@ -505,13 +498,13 @@ impl Scheduler {
// the UDP listening is done in another place.
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
let participants = Arc::clone(&self.participants);
let runtime = Arc::clone(&self.runtime);
#[cfg(feature = "metrics")]
let metrics = Arc::clone(&self.metrics);
let pool = Arc::clone(&self.pool);
let local_pid = self.local_pid;
let local_secret = self.local_secret;
// this is necessary for UDP to work at all and to remove code duplication
self.pool.spawn_ok(
self.runtime.spawn(
async move {
trace!(?cid, "Open channel and be ready for Handshake");
let handshake = Handshake::new(
@ -545,6 +538,7 @@ impl Scheduler {
) = BParticipant::new(
pid,
sid,
Arc::clone(&runtime),
#[cfg(feature = "metrics")]
Arc::clone(&metrics),
);
@ -552,6 +546,7 @@ impl Scheduler {
let participant = Participant::new(
local_pid,
pid,
Arc::clone(&runtime),
a2b_stream_open_s,
b2a_stream_opened_r,
participant_channels.a2s_disconnect_s,
@ -566,7 +561,7 @@ impl Scheduler {
});
drop(participants);
trace!("dropped participants lock");
pool.spawn_ok(
runtime.spawn(
bparticipant
.run(participant_channels.b2s_prio_statistic_s)
.instrument(tracing::info_span!("participant", ?pid)),

View File

@ -15,6 +15,7 @@ server = { package = "veloren-server", path = "../server", default-features = fa
common = { package = "veloren-common", path = "../common" }
common-net = { package = "veloren-common-net", path = "../common/net" }
tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] }
ansi-parser = "0.7"
clap = "2.33"
crossterm = "0.18"

View File

@ -129,7 +129,8 @@ fn main() -> io::Result<()> {
let server_port = &server_settings.gameserver_address.port();
let metrics_port = &server_settings.metrics_address.port();
// Create server
let mut server = Server::new(server_settings, editable_settings, &server_data_dir)
let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
let mut server = Server::new(server_settings, editable_settings, &server_data_dir, runtime)
.expect("Failed to create server instance!");
info!(

View File

@ -28,6 +28,7 @@ futures-util = "0.3.7"
futures-executor = "0.3"
futures-timer = "3.0"
futures-channel = "0.3"
tokio = { version = "1.0.1", default-features = false, features = ["rt"] }
itertools = "0.9"
lazy_static = "1.4.0"
scan_fmt = { git = "https://github.com/Imberflur/scan_fmt" }

View File

@ -92,6 +92,7 @@ use std::{
#[cfg(not(feature = "worldgen"))]
use test_world::{IndexOwned, World};
use tracing::{debug, error, info, trace};
use tokio::runtime::Runtime;
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*;
@ -120,6 +121,7 @@ pub struct Server {
connection_handler: ConnectionHandler,
runtime: Arc<Runtime>,
thread_pool: ThreadPool,
metrics: ServerMetrics,
@ -136,6 +138,7 @@ impl Server {
settings: Settings,
editable_settings: EditableSettings,
data_dir: &std::path::Path,
runtime: Arc<Runtime>,
) -> Result<Self, Error> {
info!("Server is data dir is: {}", data_dir.display());
if settings.auth_server_address.is_none() {
@ -364,11 +367,10 @@ impl Server {
let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string())
.build();
let (network, f) = Network::new_with_registry(Pid::new(), &metrics.registry());
let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry());
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
thread_pool.execute(f);
block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
let connection_handler = ConnectionHandler::new(network);
@ -386,6 +388,7 @@ impl Server {
connection_handler,
runtime,
thread_pool,
metrics,

View File

@ -82,6 +82,8 @@ ron = {version = "0.6", default-features = false}
serde = {version = "1.0", features = [ "rc", "derive" ]}
treeculler = "0.1.0"
uvth = "3.1.1"
tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] }
num_cpus = "1.0"
# vec_map = { version = "0.8.2" }
inline_tweak = "1.0.2"
itertools = "0.10.0"

View File

@ -71,6 +71,9 @@ impl ClientInit {
let mut last_err = None;
let cores = num_cpus::get();
let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap());
const FOUR_MINUTES_RETRIES: u64 = 48;
'tries: for _ in 0..FOUR_MINUTES_RETRIES {
if cancel2.load(Ordering::Relaxed) {
@ -79,7 +82,7 @@ impl ClientInit {
for socket_addr in
first_addrs.clone().into_iter().chain(second_addrs.clone())
{
match Client::new(socket_addr, view_distance) {
match Client::new(socket_addr, view_distance, Arc::clone(&runtime)) {
Ok(mut client) => {
if let Err(e) =
client.register(username, password, |auth_server| {

View File

@ -82,6 +82,8 @@ impl Singleplayer {
let editable_settings = server::EditableSettings::singleplayer(&server_data_dir);
let thread_pool = client.map(|c| c.thread_pool().clone());
let cores = num_cpus::get();
let runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().worker_threads(if cores > 4 {cores-1} else {cores}).build().unwrap());
let settings2 = settings.clone();
let paused = Arc::new(AtomicBool::new(false));
@ -92,7 +94,7 @@ impl Singleplayer {
let thread = thread::spawn(move || {
let mut server = None;
if let Err(e) = result_sender.send(
match Server::new(settings2, editable_settings, &server_data_dir) {
match Server::new(settings2, editable_settings, &server_data_dir, runtime) {
Ok(s) => {
server = Some(s);
Ok(())