share tokio Runtime between Client and Server, name rayon Threadpool

This commit is contained in:
Marcel Märtens 2021-02-19 10:55:43 +01:00
parent 514d5db038
commit 1a7c179bbb
11 changed files with 109 additions and 81 deletions

View File

@ -5,8 +5,7 @@ use common::{
event::{EventBus, LocalEvent, ServerEvent},
metrics::{PhysicsMetrics, SysMetrics},
region::RegionMap,
resources,
resources::{DeltaTime, Time, TimeOfDay},
resources::{DeltaTime, GameMode, Time, TimeOfDay},
span,
terrain::{Block, TerrainChunk, TerrainGrid},
time::DayPeriod,
@ -90,22 +89,34 @@ pub struct State {
impl State {
/// Create a new `State` in client mode.
pub fn client() -> Self { Self::new(resources::GameMode::Client) }
pub fn client() -> Self { Self::new(GameMode::Client) }
/// Create a new `State` in server mode.
pub fn server() -> Self { Self::new(resources::GameMode::Server) }
pub fn server() -> Self { Self::new(GameMode::Server) }
pub fn new(game_mode: resources::GameMode) -> Self {
pub fn new(game_mode: GameMode) -> Self {
let thread_name_infix = match game_mode {
GameMode::Server => "s",
GameMode::Client => "c",
GameMode::Singleplayer => "sp",
};
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
.build()
.unwrap(),
);
Self {
ecs: Self::setup_ecs_world(game_mode),
thread_pool: Arc::new(ThreadPoolBuilder::new().build().unwrap()),
thread_pool,
}
}
/// Creates ecs world and registers all the common components and resources
// TODO: Split up registering into server and client (e.g. move
// EventBus<ServerEvent> to the server)
fn setup_ecs_world(game_mode: resources::GameMode) -> specs::World {
fn setup_ecs_world(game_mode: GameMode) -> specs::World {
let mut ecs = specs::World::new();
// Uids for sync
ecs.register_sync_marker();

View File

@ -22,7 +22,6 @@ pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) {
let _subscriber = if tracing {
let filter = EnvFilter::from_default_env()
.add_directive("trace".parse().unwrap())
.add_directive("async_std::task::block_on=warn".parse().unwrap())
.add_directive("veloren_network::tests=trace".parse().unwrap())
.add_directive("veloren_network::controller=trace".parse().unwrap())
.add_directive("veloren_network::channel=trace".parse().unwrap())

View File

@ -26,7 +26,7 @@ pub(crate) struct ConnectionHandler {
/// to the Server main thread sometimes though to get the current server_info
/// and time
impl ConnectionHandler {
pub fn new(network: Network, runtime: Arc<Runtime>) -> Self {
pub fn new(network: Network, runtime: &Runtime) -> Self {
let network = Arc::new(network);
let network_clone = Arc::clone(&network);
let (stop_sender, stop_receiver) = oneshot::channel();

View File

@ -376,7 +376,7 @@ impl Server {
.await
});
runtime.block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
let connection_handler = ConnectionHandler::new(network, Arc::clone(&runtime));
let connection_handler = ConnectionHandler::new(network, &runtime);
// Initiate real-time world simulation
#[cfg(feature = "worldgen")]

View File

@ -6,7 +6,7 @@ use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Duration,
@ -50,6 +50,7 @@ impl ClientInit {
username: String,
view_distance: Option<u32>,
password: String,
runtime: Option<Arc<runtime::Runtime>>,
) -> Self {
let (server_address, port, prefer_ipv6) = connection_args;
@ -58,15 +59,21 @@ impl ClientInit {
let cancel = Arc::new(AtomicBool::new(false));
let cancel2 = Arc::clone(&cancel);
let cores = num_cpus::get();
let runtime = Arc::new(
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.build()
.unwrap(),
);
let runtime = runtime.unwrap_or_else(|| {
let cores = num_cpus::get();
Arc::new(
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-voxygen-{}", id)
})
.build()
.unwrap(),
)
});
let runtime2 = Arc::clone(&runtime);
runtime.spawn(async move {
@ -85,9 +92,7 @@ impl ClientInit {
break;
}
for socket_addr in &addresses {
match Client::new(socket_addr.clone(), view_distance, Arc::clone(&runtime2))
.await
{
match Client::new(*socket_addr, view_distance, Arc::clone(&runtime2)).await {
Ok(mut client) => {
if let Err(e) = client
.register(username, password, |auth_server| {
@ -140,23 +145,26 @@ impl ClientInit {
}
}
/// Parse ip address or resolves hostname.
/// Note: if you use an ipv6 address, the number after the last colon will
/// be used as the port unless you use [] around the address.
async fn resolve(
server_address: String,
port: u16,
prefer_ipv6: bool,
) -> Result<Vec<SocketAddr>, std::io::Error> {
//1. try if server_address already contains a port
// 1. try if server_address already contains a port
if let Ok(addr) = server_address.parse::<SocketAddr>() {
warn!("please don't add port directly to server_address");
return Ok(vec![addr]);
}
//2, try server_address and port
// 2, try server_address and port
if let Ok(addr) = format!("{}:{}", server_address, port).parse::<SocketAddr>() {
return Ok(vec![addr]);
}
//3. do DNS call
// 3. do DNS call
let (mut first_addrs, mut second_addrs) = match lookup_host(server_address).await {
Ok(s) => s.partition::<Vec<_>, _>(|a| a.is_ipv6() == prefer_ipv6),
Err(e) => {

View File

@ -13,6 +13,8 @@ use crate::{
};
use client_init::{ClientInit, Error as InitError, Msg as InitMsg};
use common::{assets::AssetExt, comp, span};
use std::sync::Arc;
use tokio::runtime;
use tracing::error;
use ui::{Event as MainMenuEvent, MainMenuUi};
@ -63,14 +65,8 @@ impl PlayState for MainMenuState {
#[cfg(feature = "singleplayer")]
{
if let Some(singleplayer) = &global_state.singleplayer {
if let Ok(result) = singleplayer.receiver.try_recv() {
if let Err(error) = result {
tracing::error!(?error, "Could not start server");
global_state.singleplayer = None;
self.client_init = None;
self.main_menu_ui.cancel_connection();
self.main_menu_ui.show_info(format!("Error: {:?}", error));
} else {
match singleplayer.receiver.try_recv() {
Ok(Ok(runtime)) => {
let server_settings = singleplayer.settings();
// Attempt login after the server is finished initializing
attempt_login(
@ -81,8 +77,17 @@ impl PlayState for MainMenuState {
server_settings.gameserver_address.ip().to_string(),
server_settings.gameserver_address.port(),
&mut self.client_init,
Some(runtime),
);
}
},
Ok(Err(e)) => {
error!(?e, "Could not start server");
global_state.singleplayer = None;
self.client_init = None;
self.main_menu_ui.cancel_connection();
self.main_menu_ui.show_info(format!("Error: {:?}", e));
},
Err(_) => (),
}
}
}
@ -245,6 +250,7 @@ impl PlayState for MainMenuState {
server_address,
DEFAULT_PORT,
&mut self.client_init,
None,
);
},
MainMenuEvent::CancelLoginAttempt => {
@ -270,7 +276,7 @@ impl PlayState for MainMenuState {
},
#[cfg(feature = "singleplayer")]
MainMenuEvent::StartSingleplayer => {
let singleplayer = Singleplayer::new(None); // TODO: Make client and server use the same thread pool
let singleplayer = Singleplayer::new();
global_state.singleplayer = Some(singleplayer);
},
@ -318,6 +324,7 @@ fn attempt_login(
server_address: String,
server_port: u16,
client_init: &mut Option<ClientInit>,
runtime: Option<Arc<runtime::Runtime>>,
) {
if comp::Player::alias_is_valid(&username) {
// Don't try to connect if there is already a connection in progress.
@ -327,6 +334,7 @@ fn attempt_login(
username,
Some(settings.graphics.view_distance),
password,
runtime,
));
}
} else {

View File

@ -312,7 +312,7 @@ where
tick: u64,
camera_mode: CameraMode,
character_state: Option<&CharacterState>,
runtime: &Arc<Runtime>,
runtime: &Runtime,
) -> (FigureModelEntryLod<'c>, &'c Skel::Attr)
where
for<'a> &'a Skel::Body: Into<Skel::Attr>,

View File

@ -36,7 +36,6 @@ use common_sys::state::State;
use comp::item::Reagent;
use num::traits::{Float, FloatConst};
use specs::{Entity as EcsEntity, Join, WorldExt};
use std::sync::Arc;
use tokio::runtime::Runtime;
use vek::*;
@ -116,7 +115,7 @@ pub struct SceneData<'a> {
pub loaded_distance: f32,
pub view_distance: u32,
pub tick: u64,
pub runtime: &'a Arc<Runtime>,
pub runtime: &'a Runtime,
pub gamma: f32,
pub exposure: f32,
pub ambiance: f32,

View File

@ -29,7 +29,6 @@ use common::{
terrain::BlockKind,
vol::{BaseVol, ReadVol},
};
use std::sync::Arc;
use tokio::runtime::Runtime;
use tracing::error;
use vek::*;
@ -98,7 +97,7 @@ pub struct SceneData<'a> {
pub time: f64,
pub delta_time: f32,
pub tick: u64,
pub runtime: &'a Arc<Runtime>,
pub runtime: &'a Runtime,
pub body: Option<humanoid::Body>,
pub gamma: f32,
pub exposure: f32,

View File

@ -638,7 +638,11 @@ impl<V: RectRasterableVol> Terrain<V> {
// Limit ourselves to u16::MAX even if larger textures are supported.
let max_texture_size = renderer.max_texture_size();
let cores = num_cpus::get();
let meshing_cores = match num_cpus::get() as u64 {
n if n < 4 => 1,
n if n < 8 => n - 3,
n => n - 4,
};
span!(guard, "Queue meshing from todo list");
for (todo, chunk) in self
@ -655,7 +659,7 @@ impl<V: RectRasterableVol> Terrain<V> {
.cloned()?))
})
{
if self.mesh_todos_active.load(Ordering::Relaxed) > (cores as u64 - 1).max(1) {
if self.mesh_todos_active.load(Ordering::Relaxed) > meshing_cores {
break;
}

View File

@ -1,29 +1,25 @@
use client::Client;
use common::clock::Clock;
use crossbeam::channel::{bounded, unbounded, Receiver, Sender, TryRecvError};
use server::{Error as ServerError, Event, Input, Server};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
thread::{self, JoinHandle},
time::Duration,
};
use tracing::{error, info, warn, trace, debug};
use tokio::runtime::Runtime;
use tracing::{debug, error, info, trace, warn};
const TPS: u64 = 30;
enum Msg {
Stop,
}
/// Used to start and stop the background thread running the server
/// when in singleplayer mode.
pub struct Singleplayer {
_server_thread: JoinHandle<()>,
sender: Sender<Msg>,
pub receiver: Receiver<Result<(), ServerError>>,
stop_server_s: Sender<()>,
pub receiver: Receiver<Result<Arc<Runtime>, ServerError>>,
// Wether the server is stopped or not
paused: Arc<AtomicBool>,
// Settings that the server was started with
@ -31,8 +27,9 @@ pub struct Singleplayer {
}
impl Singleplayer {
pub fn new(client: Option<&Client>) -> Self {
let (sender, receiver) = unbounded();
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let (stop_server_s, stop_server_r) = unbounded();
// Determine folder to save server data in
let server_data_dir = {
@ -81,19 +78,21 @@ impl Singleplayer {
let settings = server::Settings::singleplayer(&server_data_dir);
let editable_settings = server::EditableSettings::singleplayer(&server_data_dir);
let runtime = if let Some(c) = client {
Arc::clone(&c.runtime())
} else {
let cores = num_cpus::get();
debug!("creating a new runtime for server");
Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.build()
.unwrap(),
)
};
let cores = num_cpus::get();
debug!("Creating a new runtime for server");
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-sp-{}", id)
})
.build()
.unwrap(),
);
let settings2 = settings.clone();
let paused = Arc::new(AtomicBool::new(false));
@ -107,10 +106,15 @@ impl Singleplayer {
trace!("starting singleplayer server thread");
let mut server = None;
if let Err(e) = result_sender.send(
match Server::new(settings2, editable_settings, &server_data_dir, runtime) {
match Server::new(
settings2,
editable_settings,
&server_data_dir,
Arc::clone(&runtime),
) {
Ok(s) => {
server = Some(s);
Ok(())
Ok(runtime)
},
Err(e) => Err(e),
},
@ -128,14 +132,14 @@ impl Singleplayer {
None => return,
};
run_server(server, receiver, paused1);
run_server(server, stop_server_r, paused1);
trace!("ending singleplayer server thread");
})
.unwrap();
Singleplayer {
_server_thread: thread,
sender,
stop_server_s,
receiver: result_receiver,
paused,
settings,
@ -156,11 +160,11 @@ impl Singleplayer {
impl Drop for Singleplayer {
fn drop(&mut self) {
// Ignore the result
let _ = self.sender.send(Msg::Stop);
let _ = self.stop_server_s.send(());
}
}
fn run_server(mut server: Server, rec: Receiver<Msg>, paused: Arc<AtomicBool>) {
fn run_server(mut server: Server, stop_server_r: Receiver<()>, paused: Arc<AtomicBool>) {
info!("Starting server-cli...");
// Set up an fps clock
@ -168,14 +172,10 @@ fn run_server(mut server: Server, rec: Receiver<Msg>, paused: Arc<AtomicBool>) {
loop {
// Check any event such as stopping and pausing
match rec.try_recv() {
Ok(msg) => match msg {
Msg::Stop => break,
},
Err(err) => match err {
TryRecvError::Empty => (),
TryRecvError::Disconnected => break,
},
match stop_server_r.try_recv() {
Ok(()) => break,
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => (),
}
// Wait for the next tick.