[WIP] improving thread pool usage and bot client.

This commit is contained in:
Joshua Yanovski 2022-09-12 20:26:22 -07:00
parent 3b424e9049
commit 60060a9913
18 changed files with 591 additions and 521 deletions

9
Cargo.lock generated
View File

@ -1639,6 +1639,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"
[[package]]
name = "drop_guard"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c4a817d8b683f6e649aed359aab0c47a875377516bb5791d0f7e46d9066d209"
[[package]] [[package]]
name = "egui" name = "egui"
version = "0.12.0" version = "0.12.0"
@ -6561,6 +6567,7 @@ dependencies = [
"hashbrown 0.12.0", "hashbrown 0.12.0",
"image", "image",
"num 0.4.0", "num 0.4.0",
"num_cpus",
"quinn", "quinn",
"rayon", "rayon",
"ron 0.7.0", "ron 0.7.0",
@ -6718,6 +6725,7 @@ dependencies = [
"specs", "specs",
"tar", "tar",
"thread-priority", "thread-priority",
"tokio",
"toml", "toml",
"tracing", "tracing",
"vek 0.15.8", "vek 0.15.8",
@ -6836,6 +6844,7 @@ dependencies = [
"chrono", "chrono",
"chrono-tz", "chrono-tz",
"crossbeam-channel", "crossbeam-channel",
"drop_guard",
"enumset", "enumset",
"futures-util", "futures-util",
"hashbrown 0.12.0", "hashbrown 0.12.0",

View File

@ -24,6 +24,7 @@ network = { package = "veloren-network", path = "../network", features = ["compr
bincode = "1.3.2" bincode = "1.3.2"
byteorder = "1.3.2" byteorder = "1.3.2"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
num_cpus = "1.0"
tokio = { version = "1.14", default-features = false, features = ["rt-multi-thread"] } tokio = { version = "1.14", default-features = false, features = ["rt-multi-thread"] }
quinn = "0.8" quinn = "0.8"
image = { version = "0.24", default-features = false, features = ["png"] } image = { version = "0.24", default-features = false, features = ["png"] }

View File

@ -50,11 +50,15 @@ fn main() {
}; };
// Create a client. // Create a client.
let mut client = runtime let pools = common_state::State::pools(
common::resources::GameMode::Client,
tokio::runtime::Builder::new_multi_thread(),
);
let mut client = pools.runtime
.block_on(Client::new( .block_on(Client::new(
addr, addr,
runtime2,
&mut None, &mut None,
pools.clone(),
&username, &username,
&password, &password,
|provider| provider == "https://auth.veloren.net", |provider| provider == "https://auth.veloren.net",

View File

@ -46,29 +46,28 @@ pub fn main() {
pub struct BotClient { pub struct BotClient {
settings: Settings, settings: Settings,
runtime: Arc<Runtime>, pools: common_state::Pools,
server_info: ServerInfo, server_info: ServerInfo,
bot_clients: HashMap<String, Client>, bot_clients: HashMap<String, Client>,
clock: Clock, clock: Clock,
} }
pub fn make_client( pub fn make_client(
runtime: &Arc<Runtime>, pools: &common_state::Pools,
server: &str, server: &str,
server_info: &mut Option<ServerInfo>, server_info: &mut Option<ServerInfo>,
username: &str, username: &str,
password: &str, password: &str,
) -> Option<Client> { ) -> Option<Client> {
let runtime_clone = Arc::clone(runtime);
let addr = ConnectionArgs::Tcp { let addr = ConnectionArgs::Tcp {
prefer_ipv6: false, prefer_ipv6: false,
hostname: server.to_owned(), hostname: server.to_owned(),
}; };
runtime pools.runtime
.block_on(Client::new( .block_on(Client::new(
addr, addr,
runtime_clone,
server_info, server_info,
pools.clone(),
username, username,
password, password,
|_| true, |_| true,
@ -78,15 +77,18 @@ pub fn make_client(
impl BotClient { impl BotClient {
pub fn new(settings: Settings) -> BotClient { pub fn new(settings: Settings) -> BotClient {
let runtime = Arc::new(Runtime::new().unwrap()); let pools = common_state::State::pools(
common::resources::GameMode::Client,
tokio::runtime::Builder::new_multi_thread(),
);
let mut server_info = None; let mut server_info = None;
// Don't care if we connect, just trying to grab the server info. // Don't care if we connect, just trying to grab the server info.
let _ = make_client(&runtime, &settings.server, &mut server_info, "", ""); let _ = make_client(&pools, &settings.server, &mut server_info, "", "");
let server_info = server_info.expect("Failed to connect to server."); let server_info = server_info.expect("Failed to connect to server.");
let clock = Clock::new(Duration::from_secs_f64(1.0 / 60.0)); let clock = Clock::new(Duration::from_secs_f64(1.0 / 60.0));
BotClient { BotClient {
settings, settings,
runtime, pools,
server_info, server_info,
bot_clients: HashMap::new(), bot_clients: HashMap::new(),
clock, clock,
@ -142,7 +144,7 @@ impl BotClient {
{ {
continue; continue;
} }
match self.runtime.block_on(authc.register(username, password)) { match self.pools.runtime.block_on(authc.register(username, password)) {
Ok(()) => { Ok(()) => {
self.settings.bot_logins.push(BotCreds { self.settings.bot_logins.push(BotCreds {
username: username.to_string(), username: username.to_string(),
@ -171,15 +173,13 @@ impl BotClient {
.cloned() .cloned()
.collect(); .collect();
for cred in creds.iter() { for cred in creds.iter() {
let runtime = Arc::clone(&self.runtime);
let server = &self.settings.server; let server = &self.settings.server;
// TODO: log the clients in in parallel instead of in series // TODO: log the clients in in parallel instead of in series
let client = self let client = self
.bot_clients .bot_clients
.entry(cred.username.clone()) .entry(cred.username.clone())
.or_insert_with(|| { .or_insert_with(|| {
make_client(&runtime, server, &mut None, &cred.username, &cred.password) make_client(&self.pools, server, &mut None, &cred.username, &cred.password)
.expect("Failed to connect to server") .expect("Failed to connect to server")
}); });

View File

@ -9,7 +9,7 @@ use std::{
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::runtime::Runtime; use tokio::runtime::{self, Runtime};
use vek::*; use vek::*;
use common_state::State; use common_state::State;
use veloren_client::{addr::ConnectionArgs, Client}; use veloren_client::{addr::ConnectionArgs, Client};
@ -48,9 +48,12 @@ fn main() {
let to_adminify = usernames.clone(); let to_adminify = usernames.clone();
let finished_init = Arc::new(AtomicU32::new(0)); let finished_init = Arc::new(AtomicU32::new(0));
let runtime = Arc::new(Runtime::new().unwrap()); let mut builder = runtime::Builder::new_multi_thread();
let mut pools = common_state::State::pools(common::resources::GameMode::Client); builder
pools.0 = 0; .max_blocking_threads(opt.size as usize + 1)
.thread_name("swarm");
let mut pools = common_state::State::pools(common::resources::GameMode::Client, builder);
pools.slowjob_threads = 0;
// TODO: calculate and log the required chunks per second to maintain the // TODO: calculate and log the required chunks per second to maintain the
// selected scenario with full vd loaded // selected scenario with full vd loaded
@ -59,7 +62,6 @@ fn main() {
admin_username, admin_username,
0, 0,
to_adminify, to_adminify,
&runtime,
&pools, &pools,
opt, opt,
&finished_init, &finished_init,
@ -70,7 +72,6 @@ fn main() {
name, name,
index as u32, index as u32,
Vec::new(), Vec::new(),
&runtime,
&pools, &pools,
opt, opt,
&finished_init, &finished_init,
@ -84,16 +85,15 @@ fn run_client_new_thread(
username: String, username: String,
index: u32, index: u32,
to_adminify: Vec<String>, to_adminify: Vec<String>,
runtime: &Arc<Runtime>,
pools: &common_state::Pools, pools: &common_state::Pools,
opt: Opt, opt: Opt,
finished_init: &Arc<AtomicU32>, finished_init: &Arc<AtomicU32>,
) { ) {
let runtime = Arc::clone(runtime); let runtime = Arc::clone(&pools.runtime);
let pools = pools.clone(); let pools = pools.clone();
let finished_init = Arc::clone(finished_init); let finished_init = Arc::clone(finished_init);
thread::spawn(move || { thread::spawn(move || {
if let Err(err) = run_client(username, index, to_adminify, pools, runtime, opt, finished_init) { if let Err(err) = run_client(username, index, to_adminify, pools, opt, finished_init) {
tracing::error!("swarm member {} exited with an error: {:?}", index, err); tracing::error!("swarm member {} exited with an error: {:?}", index, err);
} }
}); });
@ -104,47 +104,41 @@ fn run_client(
index: u32, index: u32,
to_adminify: Vec<String>, to_adminify: Vec<String>,
pools: common_state::Pools, pools: common_state::Pools,
runtime: Arc<Runtime>,
opt: Opt, opt: Opt,
finished_init: Arc<AtomicU32>, finished_init: Arc<AtomicU32>,
) -> Result<(), veloren_client::Error> { ) -> Result<(), veloren_client::Error> {
let mut client = loop {
// Connect to localhost
let addr = ConnectionArgs::Tcp {
prefer_ipv6: false,
hostname: "localhost".into(),
};
let runtime_clone = Arc::clone(&runtime);
// NOTE: use a no-auth server
match runtime
.block_on(Client::new(
addr,
runtime_clone,
&mut None,
pools.clone(),
&username,
"",
|_| false,
)) {
Err(e) => tracing::warn!(?e, "Client {} disconnected", index),
Ok(client) => break client,
}
};
drop(pools);
let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0)); let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0));
let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> { let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> {
clock.tick(); clock.tick();
client.tick_network(clock.dt())?; client.tick_network(clock.dt())?;
client.cleanup();
Ok(()) Ok(())
}; };
let mut run = || -> Result<_, veloren_client::Error> {
// Connect to localhost
let addr = ConnectionArgs::Tcp {
prefer_ipv6: false,
hostname: "localhost".into(),
};
// NOTE: use a no-auth server
let mut client = pools.runtime.block_on(Client::new(
addr,
&mut None,
pools.clone(),
&username,
"",
|_| false,
))?;
tracing::info!("Client {} connected", index);
// Wait for character list to load // Wait for character list to load
client.load_character_list(); client.load_character_list();
while client.character_list().loading { while client.character_list().loading {
tick(&mut client)?; tick(&mut client)?;
} }
tracing::info!("Client {} loaded character list", index);
// Create character if none exist // Create character if none exist
if client.character_list().characters.is_empty() { if client.character_list().characters.is_empty() {
@ -161,6 +155,9 @@ fn run_client(
tick(&mut client)?; tick(&mut client)?;
} }
} }
tracing::info!("Client {} found or created character", index);
client.set_view_distance(opt.vd);
// Select the first character // Select the first character
client.request_character( client.request_character(
@ -174,8 +171,6 @@ fn run_client(
.expect("Why is this an option?"), .expect("Why is this an option?"),
); );
client.set_view_distance(opt.vd);
// If this is the admin client then adminify the other swarm members // If this is the admin client then adminify the other swarm members
if !to_adminify.is_empty() { if !to_adminify.is_empty() {
// Wait for other clients to connect // Wait for other clients to connect
@ -206,10 +201,26 @@ fn run_client(
while !client.is_moderator() { while !client.is_moderator() {
tick(&mut client)?; tick(&mut client)?;
} }
client.clear_terrain();
client.request_player_physics(false);
Ok(client)
};
let mut client = loop {
match run() {
Err(e) => tracing::warn!(?e, "Client {} disconnected", index),
Ok(client) => {
thread::sleep(Duration::from_secs(1));
break client
},
}
};
drop(pools);
finished_init.fetch_add(1, Ordering::Relaxed); finished_init.fetch_add(1, Ordering::Relaxed);
// Wait for initialization of all other swarm clients to finish // Wait for initialization of all other swarm clients to finish
while !finished_init.load(Ordering::Relaxed) == opt.size { while finished_init.load(Ordering::Relaxed) != opt.size {
tick(&mut client)?; tick(&mut client)?;
} }

View File

@ -281,12 +281,11 @@ pub struct CharacterList {
} }
/// Higher than what's needed at VD = 65. /// Higher than what's needed at VD = 65.
const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*/13800; const TOTAL_PENDING_CHUNKS_LIMIT: usize = /*1024*//*13800*/12;
impl Client { impl Client {
pub async fn new( pub async fn new(
addr: ConnectionArgs, addr: ConnectionArgs,
runtime: Arc<Runtime>,
// TODO: refactor to avoid needing to use this out parameter // TODO: refactor to avoid needing to use this out parameter
mismatched_server_info: &mut Option<ServerInfo>, mismatched_server_info: &mut Option<ServerInfo>,
pools: common_state::Pools, pools: common_state::Pools,
@ -294,7 +293,8 @@ impl Client {
password: &str, password: &str,
auth_trusted: impl FnMut(&str) -> bool, auth_trusted: impl FnMut(&str) -> bool,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let network = Network::new(Pid::new(), &runtime); let network = Network::new(Pid::new(), &pools.runtime);
let runtime = Arc::clone(&pools.runtime);
let mut participant = match addr { let mut participant = match addr {
ConnectionArgs::Tcp { ConnectionArgs::Tcp {
@ -374,27 +374,12 @@ impl Client {
let msg = bincode::deserialize(&msg)?; let msg = bincode::deserialize(&msg)?;
Self::handle_server_terrain_msg(msg) Self::handle_server_terrain_msg(msg)
}; };
terrain_tx_.send(handle_msg()); if terrain_tx_.send(handle_msg()).is_err() {
break;
}
} }
}); });
let (
state,
lod_base,
lod_alt,
lod_horizon,
world_map,
sites,
pois,
recipe_book,
component_recipe_book,
max_group_size,
client_timeout,
) = loop {
tokio::select! {
// Spawn in a blocking thread (leaving the network thread free). This is mostly
// useful for bots.
res = register_stream.recv() => {
let ServerInit::GameSync { let ServerInit::GameSync {
entity_package, entity_package,
time_of_day, time_of_day,
@ -405,9 +390,18 @@ impl Client {
component_recipe_book, component_recipe_book,
material_stats, material_stats,
ability_map, ability_map,
} = res?; } = loop {
tokio::select! {
// Spawn in a blocking thread (leaving the network thread free). This is mostly
// useful for bots.
res = register_stream.recv() => break res?,
_ = ping_interval.tick() => ping_stream.send(PingMsg::Ping)?,
}
};
break tokio::task::spawn_blocking(move || { // Spawn in a blocking thread (leaving the network thread free). This is mostly
// useful for bots.
let mut task = tokio::task::spawn_blocking(move || {
let map_size_lg = common::terrain::MapSizeLg::new(world_map.dimensions_lg) let map_size_lg = common::terrain::MapSizeLg::new(world_map.dimensions_lg)
.map_err(|_| { .map_err(|_| {
Error::Other(format!( Error::Other(format!(
@ -696,8 +690,23 @@ impl Client {
max_group_size, max_group_size,
client_timeout, client_timeout,
)) ))
}).await.expect("Client thread should not panic")?; });
},
let (
state,
lod_base,
lod_alt,
lod_horizon,
world_map,
sites,
pois,
recipe_book,
component_recipe_book,
max_group_size,
client_timeout,
) = loop {
tokio::select! {
res = &mut task => break res.expect("Client thread should not panic")?,
_ = ping_interval.tick() => ping_stream.send(PingMsg::Ping)?, _ = ping_interval.tick() => ping_stream.send(PingMsg::Ping)?,
} }
}; };
@ -1900,7 +1909,7 @@ impl Client {
+ TerrainChunkSize::RECT_SIZE.map(|x| x as f32) / 2.0) + TerrainChunkSize::RECT_SIZE.map(|x| x as f32) / 2.0)
.distance_squared(pos.0.into()); .distance_squared(pos.0.into());
let mut terrain = self.state.terrain(); let terrain = self.state.terrain();
if let Some(chunk) = terrain.get_key_arc(*key) { if let Some(chunk) = terrain.get_key_arc(*key) {
if !skip_mode && !terrain.contains_key_real(*key) { if !skip_mode && !terrain.contains_key_real(*key) {
let chunk = Arc::clone(chunk); let chunk = Arc::clone(chunk);
@ -1910,10 +1919,10 @@ impl Client {
} else { } else {
drop(terrain); drop(terrain);
if !skip_mode && !self.pending_chunks.contains_key(key) { if !skip_mode && !self.pending_chunks.contains_key(key) {
const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = 8 * 4; const CURRENT_TICK_PENDING_CHUNKS_LIMIT: usize = /*8 * 4*/2;
if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT if self.pending_chunks.len() < TOTAL_PENDING_CHUNKS_LIMIT
&& /* current_tick_send_chunk_requests && current_tick_send_chunk_requests
< CURRENT_TICK_PENDING_CHUNKS_LIMIT */true < CURRENT_TICK_PENDING_CHUNKS_LIMIT
{ {
self.send_msg_err(ClientGeneral::TerrainChunkRequest { self.send_msg_err(ClientGeneral::TerrainChunkRequest {
key: *key, key: *key,
@ -2477,7 +2486,22 @@ impl Client {
{ {
terrain_cnt += msg.size(); terrain_cnt += msg.size();
} }
self.terrain_tx.send(msg); match self.terrain_tx.try_send(msg) {
Ok(()) => {},
Err(mpsc::TrySendError::Full(msg)) => {
// The message thread is fully backed up, which means we can blocking
// process at least the size of the channel in chunk receives. For now, to
// avoid blocking for too long, we just ask for a single chunk. Note that
// since the thread is processed serially, try_recv would actually be
// guaranteed to succeed here, but we avoid relying on that assumption.
let msg_ = self.terrain_rx.recv()
.expect("Deserialization thread shouold not panic")?;
self.handle_terrain_msg(msg_);
self.terrain_tx.send(msg).expect("Deserialization thread should not panic");
},
Err(mpsc::TrySendError::Disconnected(_)) =>
unreachable!("Deserialization thread should not panic."),
}
} }
/* if !terrain_messages.is_empty() { /* if !terrain_messages.is_empty() {
cnt += terrain_messages.len() as u64; cnt += terrain_messages.len() as u64;
@ -2537,8 +2561,9 @@ impl Client {
return Err(Error::ServerTimeout); return Err(Error::ServerTimeout);
} }
// ignore network events
while let Some(Ok(Some(event))) = self.participant.as_mut().map(|p| p.try_fetch_event()) { while let Some(res) = self.participant.as_mut().and_then(|p| p.try_fetch_event().transpose()) {
let event = res?;
trace!(?event, "received network event"); trace!(?event, "received network event");
} }
@ -2868,6 +2893,12 @@ impl Client {
// Handle new messages from the server. // Handle new messages from the server.
self.handle_new_messages()?; self.handle_new_messages()?;
// TODO: avoid emitting these in the first place
self.state
.ecs()
.fetch::<EventBus<common::event::ServerEvent>>()
.recv_all();
// 5) Terrain // 5) Terrain
self.tick_terrain()?; self.tick_terrain()?;
let empty = Arc::new(TerrainChunk::new( let empty = Arc::new(TerrainChunk::new(

View File

@ -20,6 +20,7 @@ core_affinity = "0.5"
rayon = "1.5" rayon = "1.5"
num_cpus = "1.0" num_cpus = "1.0"
thread-priority = { version = "0.9.2" } thread-priority = { version = "0.9.2" }
tokio = { version = "1.14", default-features = false, features = ["rt"] }
tracing = { version = "0.1", default-features = false } tracing = { version = "0.1", default-features = false }
vek = { version = "0.15.8", features = ["serde"] } vek = { version = "0.15.8", features = ["serde"] }

View File

@ -36,7 +36,7 @@ use specs::{
Component, DispatcherBuilder, Entity as EcsEntity, WorldExt, Component, DispatcherBuilder, Entity as EcsEntity, WorldExt,
}; };
use thread_priority::{ThreadBuilder, ThreadPriority}; use thread_priority::{ThreadBuilder, ThreadPriority};
use std::sync::Arc; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use vek::*; use vek::*;
/// How much faster should an in-game day be compared to a real day? /// How much faster should an in-game day be compared to a real day?
@ -95,36 +95,50 @@ pub struct State {
thread_pool: Arc<ThreadPool>, thread_pool: Arc<ThreadPool>,
} }
pub type Pools = (usize, GameMode/*u64*/, Arc<ThreadPool>/*, slowjob::SlowJobPool*/); #[derive(Clone,Debug)]
pub struct Pools {
pub slowjob_threads: usize,
game_mode: GameMode,
pub rayon_pool: Arc<ThreadPool>,
pub runtime: Arc<tokio::runtime::Runtime>,
/* slowjob: slowjob::SlowJobPool,*/
}
impl State { impl State {
pub fn pools(game_mode: GameMode) -> Pools { pub fn pools(game_mode: GameMode, mut tokio_builder: tokio::runtime::Builder) -> Pools {
let num_cpu = num_cpus::get()/* - 1*/; let num_cpu = num_cpus::get()/* - 1*/;
let (thread_name_infix, rayon_offset) = match game_mode { let (thread_name_infix, rayon_offset, tokio_count) = match game_mode {
// The server does work on both the main thread and the rayon pool, but not at the same // The server does work on both the main thread and the rayon pool, but not at the same
// time, so there's no reason to hold back a core from rayon. It does effectively no // time, so there's no reason to hold back a core from rayon. It does effectively no
// other non-slowjob, non-rayon work that blocks the main thread. // other non-slowjob, non-rayon work that blocks the main thread, but we do want to
GameMode::Server => ("s", 0), // leave num_cpu / 4 cores available for tokio, since otherwise TCP buffers can build
// up to unreasonable degrees.
GameMode::Server => ("s", 0, num_cpu / 4),
// The client does work on both the main thread and the rayon pool, but not at the same // The client does work on both the main thread and the rayon pool, but not at the same
// time. It does run some other non-slowjob and non-rayon threads, but none of them // time. It does run some other non-slowjob and non-rayon threads, but none of them
// block work on the main thread. // block work on the main thread. It does not do enough networking for it to be worth
GameMode::Client => ("c", 0), // dedicating anything to the tokio pool.
GameMode::Client => ("c", 0, 0),
// Singleplayer does work on both the main thread and the rayon pool; unlike the server // Singleplayer does work on both the main thread and the rayon pool; unlike the server
// and client cases, the rayon work may interfere with work on one of the main threads, // and client cases, the rayon work may interfere with work on one of the main threads,
// since the server and client don't coordinate their rayon work. Therefore, we // since the server and client don't coordinate their rayon work. Therefore, we
// reserve a core for the main thread(s) from both slowjob and rayon tasks. Since many // reserve a core for the main thread(s) from both slowjob and rayon tasks. Since many
// CPUs can't afford to lose an entire core during the common periods when the server // CPUs can't afford to lose an entire core during the common periods when the server
// and client are not interfering with each other, we still spawn an extra rayon thread // and client are not interfering with each other, we still spawn an extra rayon thread
// in this case, but leave it floating so the OS can schedule it. // in this case, but leave it floating so the OS can schedule it. Of course,
GameMode::Singleplayer => ("sp", /*2*/1), // singleplayer need not devote any resources to the tokio pool, as it's not expected
// to be in use.
GameMode::Singleplayer => ("sp", /*2*/1, 0),
}; };
let rayon_threads = /*match game_mode { let rayon_threads = /*match game_mode {
GameMode::Server | GameMode::Client => num_cpu / 2, GameMode::Server | GameMode::Client => num_cpu / 2,
GameMode::Singleplayer => num_cpu / 4, GameMode::Singleplayer => num_cpu / 4,
}*/num_cpu.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS); }*/num_cpu/*.saturating_sub(tokio_count)*/.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS);
let core_ids = /*(rayon_threads >= 16).then(|| */core_affinity::get_core_ids().unwrap_or(vec![])/*).unwrap_or(vec![])*/; let core_ids = /*(rayon_threads >= 16).then(|| */core_affinity::get_core_ids().unwrap_or(vec![])/*).unwrap_or(vec![])*/;
// Don't pin rayon threads to the cores expected to be used by tokio threads.
/* core_ids.truncate(core_ids.len() - tokio_count); */
let mut core_ids_ = core_ids.clone();
let core_count = core_ids.len(); let core_count = core_ids.len();
let rayon_pool = Arc::new( let rayon_pool = Arc::new(
ThreadPoolBuilder::new() ThreadPoolBuilder::new()
@ -141,14 +155,14 @@ impl State {
} }
// pinned rayon threads run with high priority // pinned rayon threads run with high priority
let index = thread.index(); let index = thread.index();
if index.checked_sub(rayon_offset).map_or(false, |i| i < core_count) { if index.checked_sub(rayon_offset + tokio_count).map_or(false, |i| i < core_count) {
b = b.priority(ThreadPriority::Max); b = b.priority(ThreadPriority::Max);
} }
b.spawn_careless(|| thread.run())?; b.spawn_careless(|| thread.run())?;
Ok(()) Ok(())
}) })
.start_handler(move |i| { .start_handler(move |i| {
if let Some(&core_id) = i.checked_sub(rayon_offset).and_then(|i| core_ids.get(i)) { if let Some(&core_id) = i.checked_sub(rayon_offset + tokio_count).and_then(|i| core_ids_.get(i)) {
core_affinity::set_for_current(core_id); core_affinity::set_for_current(core_id);
} }
}) })
@ -173,9 +187,49 @@ impl State {
/*Arc::clone(*/slow_pool/*)*/, /*Arc::clone(*/slow_pool/*)*/,
); */ ); */
// Setup tokio runtime
use tokio::runtime::Builder;
// TODO: evaluate std::thread::available_concurrency as a num_cpus replacement
let cores = num_cpus::get();
// We don't need that many threads in the async pool, at least 2 but generally
// 25% of all available will do
let mut core_ids_ = core_ids.clone();
let runtime = Arc::new(
tokio_builder
.enable_all()
.worker_threads(tokio_count.max(common::consts::MIN_RECOMMENDED_TOKIO_THREADS))
.thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-{}-{}", thread_name_infix, id)
})
.on_thread_start(move || {
if tokio_count > 0 {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let index = ATOMIC_ID.fetch_add(1, Ordering::SeqCst) % tokio_count;
if let Some(&core_id) = index.checked_add(num_cpu - tokio_count - 1).and_then(|i| core_ids_.get(i)) {
core_affinity::set_for_current(core_id);
}
}
})
.build()
.unwrap(),
);
// We always reserve at least one non-slowjob core, if possible, to make sure systems get a // We always reserve at least one non-slowjob core, if possible, to make sure systems get a
// chance to run unobstructed. // chance to run unobstructed. We also leave any dedicated tokio threads their own
(num_cpu - 1/*slow_limit*//* as u64*/, game_mode, rayon_pool/*, slowjob*/) // reserved cores, since networking is quite latency critical on a server.
//
// Note that for all n > 1, n - n / 4 - 1 > 0 (using integer arithmetic), which is fine
// since we require at least 2 hardware threads.
Pools {
slowjob_threads: num_cpu - tokio_count - 1/*slow_limit*//* as u64*/,
game_mode,
rayon_pool,
runtime,
/*slowjob,*/
}
} }
/// Create a new `State` in client mode. /// Create a new `State` in client mode.
@ -195,8 +249,8 @@ impl State {
GameMode::Singleplayer => "sp", GameMode::Singleplayer => "sp",
}; */ }; */
let num_cpu = /*num_cpus::get()*/pools.0/* / 2 + pools.0 / 4*/; let num_cpu = /*num_cpus::get()*/pools.slowjob_threads/* / 2 + pools.0 / 4*/;
let game_mode = pools.1; let game_mode = pools.game_mode;
/* let rayon_threads = match game_mode { /* let rayon_threads = match game_mode {
GameMode::Server | GameMode::Client => num_cpu/* / 2*/, GameMode::Server | GameMode::Client => num_cpu/* / 2*/,
GameMode::Singleplayer => num_cpu/* / 4*// 2, GameMode::Singleplayer => num_cpu/* / 4*// 2,
@ -248,7 +302,7 @@ impl State {
Self { Self {
ecs: Self::setup_ecs_world(ecs_role, /*num_cpu as u64*//*, &thread_pool, *//*pools.1*/slowjob/*pools.3*/, map_size_lg, default_chunk), ecs: Self::setup_ecs_world(ecs_role, /*num_cpu as u64*//*, &thread_pool, *//*pools.1*/slowjob/*pools.3*/, map_size_lg, default_chunk),
thread_pool: pools.2, thread_pool: pools.rayon_pool,
} }
} }

View File

@ -18,9 +18,8 @@ use crate::{
tui_runner::Tui, tui_runner::Tui,
tuilog::TuiLog, tuilog::TuiLog,
}; };
use common::{clock::Clock, consts::MIN_RECOMMENDED_TOKIO_THREADS}; use common::clock::Clock;
use common_base::span; use common_base::span;
use core::sync::atomic::{AtomicUsize, Ordering};
use server::{persistence::DatabaseSettings, settings::Protocol, Event, Input, Server}; use server::{persistence::DatabaseSettings, settings::Protocol, Event, Input, Server};
use std::{ use std::{
io, io,
@ -71,20 +70,9 @@ fn main() -> io::Result<()> {
path path
}; };
// We don't need that many threads in the async pool, at least 2 but generally let pools = common_state::State::pools(
// 25% of all available will do common::resources::GameMode::Server,
// TODO: evaluate std::thread::available_concurrency as a num_cpus replacement tokio::runtime::Builder::new_multi_thread(),
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads((num_cpus::get() / 4).max(MIN_RECOMMENDED_TOKIO_THREADS))
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-server-{}", id)
})
.build()
.unwrap(),
); );
// Load server settings // Load server settings
@ -109,7 +97,7 @@ fn main() -> io::Result<()> {
ArgvCommand::Shared(SharedCommand::Admin { command }) => { ArgvCommand::Shared(SharedCommand::Admin { command }) => {
let login_provider = server::login_provider::LoginProvider::new( let login_provider = server::login_provider::LoginProvider::new(
server_settings.auth_server_address, server_settings.auth_server_address,
runtime, pools.runtime,
); );
match command { match command {
@ -168,8 +156,7 @@ fn main() -> io::Result<()> {
editable_settings, editable_settings,
database_settings, database_settings,
&server_data_dir, &server_data_dir,
runtime, pools,
common_state::State::pools(common::resources::GameMode::Server),
) )
.expect("Failed to create server instance!"); .expect("Failed to create server instance!");

View File

@ -212,7 +212,6 @@ impl Server {
editable_settings: EditableSettings, editable_settings: EditableSettings,
database_settings: DatabaseSettings, database_settings: DatabaseSettings,
data_dir: &std::path::Path, data_dir: &std::path::Path,
runtime: Arc<Runtime>,
pools: common_state::Pools, pools: common_state::Pools,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
info!("Server data dir is: {}", data_dir.display()); info!("Server data dir is: {}", data_dir.display());
@ -251,13 +250,13 @@ impl Server {
}, },
calendar: Some(settings.calendar_mode.calendar_now()), calendar: Some(settings.calendar_mode.calendar_now()),
}, },
&pools.2, &pools.rayon_pool,
); );
#[cfg(not(feature = "worldgen"))] #[cfg(not(feature = "worldgen"))]
let (world, index) = World::generate(settings.world_seed); let (world, index) = World::generate(settings.world_seed);
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
let map = world.get_map_data(index.as_index_ref(), &pools.2); let map = world.get_map_data(index.as_index_ref(), &pools.rayon_pool);
#[cfg(not(feature = "worldgen"))] #[cfg(not(feature = "worldgen"))]
let map = WorldMapMsg { let map = WorldMapMsg {
dimensions_lg: Vec2::zero(), dimensions_lg: Vec2::zero(),
@ -270,6 +269,7 @@ impl Server {
default_chunk: Arc::new(world.generate_oob_chunk()), default_chunk: Arc::new(world.generate_oob_chunk()),
}; };
let runtime = Arc::clone(&pools.runtime);
let mut state = State::server( let mut state = State::server(
pools, pools,
world.sim().map_size_lg(), world.sim().map_size_lg(),

View File

@ -415,11 +415,13 @@ impl<'a> System<'a> for Sys {
chunk chunk
}) })
}).collect::<Vec<_>>(); }).collect::<Vec<_>>();
if !chunks_to_remove.is_empty() {
// Drop chunks in a background thread. // Drop chunks in a background thread.
slow_jobs.spawn(&"CHUNK_DROP", async move { slow_jobs.spawn(&"CHUNK_DROP", async move {
drop(chunks_to_remove); drop(chunks_to_remove);
}); });
} }
}
} }
/// Convinient structure to use when you need to create new npc /// Convinient structure to use when you need to create new npc

View File

@ -59,7 +59,6 @@ use i18n::LocalizationHandle;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Runtime;
/// A type used to store state that is shared between all play states. /// A type used to store state that is shared between all play states.
pub struct GlobalState { pub struct GlobalState {
@ -68,7 +67,6 @@ pub struct GlobalState {
pub settings: Settings, pub settings: Settings,
pub profile: Profile, pub profile: Profile,
pub window: Window, pub window: Window,
pub tokio_runtime: Arc<Runtime>,
#[cfg(feature = "egui-ui")] #[cfg(feature = "egui-ui")]
pub egui_state: EguiState, pub egui_state: EguiState,
pub lazy_init: scene::terrain::SpriteRenderContextLazy, pub lazy_init: scene::terrain::SpriteRenderContextLazy,

View File

@ -181,29 +181,6 @@ fn main() {
default_hook(panic_info); default_hook(panic_info);
})); }));
// Setup tokio runtime
use common::consts::MIN_RECOMMENDED_TOKIO_THREADS;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::runtime::Builder;
// TODO: evaluate std::thread::available_concurrency as a num_cpus replacement
let cores = num_cpus::get();
let tokio_runtime = Arc::new(
Builder::new_multi_thread()
.enable_all()
.worker_threads((cores / 4).max(MIN_RECOMMENDED_TOKIO_THREADS))
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-voxygen-{}", id)
})
.build()
.unwrap(),
);
#[cfg(feature = "hot-reloading")] #[cfg(feature = "hot-reloading")]
assets::start_hot_reloading(); assets::start_hot_reloading();
@ -253,7 +230,7 @@ fn main() {
// Create window // Create window
use veloren_voxygen::{error::Error, render::RenderError}; use veloren_voxygen::{error::Error, render::RenderError};
let (mut window, event_loop) = match Window::new(&settings, &tokio_runtime) { let (mut window, event_loop) = match Window::new(&settings) {
Ok(ok) => ok, Ok(ok) => ok,
// Custom panic message when a graphics backend could not be found // Custom panic message when a graphics backend could not be found
Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => { Err(Error::RenderError(RenderError::CouldNotFindAdapter)) => {
@ -290,7 +267,6 @@ fn main() {
audio, audio,
profile, profile,
window, window,
tokio_runtime,
#[cfg(feature = "egui-ui")] #[cfg(feature = "egui-ui")]
egui_state, egui_state,
lazy_init, lazy_init,

View File

@ -11,7 +11,6 @@ use std::{
}, },
time::Duration, time::Duration,
}; };
use tokio::runtime;
use tracing::{trace, warn}; use tracing::{trace, warn};
#[derive(Debug)] #[derive(Debug)]
@ -47,7 +46,6 @@ impl ClientInit {
connection_args: ConnectionArgs, connection_args: ConnectionArgs,
username: String, username: String,
password: String, password: String,
runtime: Arc<runtime::Runtime>,
pools: common_state::Pools, pools: common_state::Pools,
) -> Self { ) -> Self {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
@ -55,9 +53,7 @@ impl ClientInit {
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
let cancel2 = Arc::clone(&cancel); let cancel2 = Arc::clone(&cancel);
let runtime2 = Arc::clone(&runtime); pools.runtime.spawn(async move {
runtime.spawn(async move {
let trust_fn = |auth_server: &str| { let trust_fn = |auth_server: &str| {
let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string())); let _ = tx.send(Msg::IsAuthTrusted(auth_server.to_string()));
trust_rx trust_rx
@ -76,7 +72,6 @@ impl ClientInit {
let mut mismatched_server_info = None; let mut mismatched_server_info = None;
match Client::new( match Client::new(
connection_args.clone(), connection_args.clone(),
Arc::clone(&runtime2),
&mut mismatched_server_info, &mut mismatched_server_info,
pools.clone(), pools.clone(),
&username, &username,
@ -87,8 +82,7 @@ impl ClientInit {
{ {
Ok(client) => { Ok(client) => {
let _ = tx.send(Msg::Done(Ok(client))); let _ = tx.send(Msg::Done(Ok(client)));
drop(pools); tokio::task::block_in_place(move || drop(pools));
tokio::task::block_in_place(move || drop(runtime2));
return; return;
}, },
Err(ClientError::NetworkErr(NetworkError::ConnectFailed( Err(ClientError::NetworkErr(NetworkError::ConnectFailed(
@ -112,9 +106,8 @@ impl ClientInit {
// address and all the attempts timed out. // address and all the attempts timed out.
let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::ServerNotFound)))); let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::ServerNotFound))));
drop(pools);
// Safe drop runtime // Safe drop runtime
tokio::task::block_in_place(move || drop(runtime2)); tokio::task::block_in_place(move || drop(pools));
}); });
ClientInit { ClientInit {

View File

@ -22,7 +22,7 @@ use common_base::span;
use i18n::LocalizationHandle; use i18n::LocalizationHandle;
use scene::Scene; use scene::Scene;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime; use tokio::Builder;
use tracing::error; use tracing::error;
use ui::{Event as MainMenuEvent, MainMenuUi}; use ui::{Event as MainMenuEvent, MainMenuUi};
@ -104,7 +104,7 @@ impl PlayState for MainMenuState {
"".to_owned(), "".to_owned(),
ConnectionArgs::Mpsc(14004), ConnectionArgs::Mpsc(14004),
&mut self.init, &mut self.init,
&global_state.tokio_runtime, &pools.runtime,
&global_state.i18n, &global_state.i18n,
Some(pools), Some(pools),
); );
@ -300,7 +300,6 @@ impl PlayState for MainMenuState {
password, password,
connection_args, connection_args,
&mut self.init, &mut self.init,
&global_state.tokio_runtime,
&global_state.i18n, &global_state.i18n,
None, None,
); );
@ -331,7 +330,8 @@ impl PlayState for MainMenuState {
}, },
#[cfg(feature = "singleplayer")] #[cfg(feature = "singleplayer")]
MainMenuEvent::StartSingleplayer => { MainMenuEvent::StartSingleplayer => {
let singleplayer = Singleplayer::new(&global_state.tokio_runtime);
let singleplayer = Singleplayer::new();
global_state.singleplayer = Some(singleplayer); global_state.singleplayer = Some(singleplayer);
}, },
@ -500,7 +500,6 @@ fn attempt_login(
password: String, password: String,
connection_args: ConnectionArgs, connection_args: ConnectionArgs,
init: &mut InitState, init: &mut InitState,
runtime: &Arc<runtime::Runtime>,
localized_strings: &LocalizationHandle, localized_strings: &LocalizationHandle,
pools: Option<common_state::Pools>, pools: Option<common_state::Pools>,
) { ) {
@ -533,7 +532,12 @@ fn attempt_login(
username, username,
password, password,
Arc::clone(runtime), Arc::clone(runtime),
pools.unwrap_or_else(|| common_state::State::pools(common::resources::GameMode::Client)), pools.unwrap_or_else(|| {
common_state::State::pools(
common::resources::GameMode::Client,
tokio::runtime::Builder::new_multi_thread(),
)
}),
)); ));
} }
} }

View File

@ -198,7 +198,6 @@ impl Renderer {
pub fn new( pub fn new(
window: &winit::window::Window, window: &winit::window::Window,
mode: RenderMode, mode: RenderMode,
runtime: &tokio::runtime::Runtime,
) -> Result<Self, RenderError> { ) -> Result<Self, RenderError> {
let (pipeline_modes, mut other_modes) = mode.split(); let (pipeline_modes, mut other_modes) = mode.split();
// Enable seamless cubemaps globally, where available--they are essentially a // Enable seamless cubemaps globally, where available--they are essentially a
@ -312,6 +311,9 @@ impl Renderer {
path path
}); });
let runtime = runtime::Builder::new_current_thread().build()
.expect("Failed to create single-threaded tokio runtime (for renderer initialization).");
let (device, queue) = runtime.block_on(adapter.request_device( let (device, queue) = runtime.block_on(adapter.request_device(
&wgpu::DeviceDescriptor { &wgpu::DeviceDescriptor {
// TODO // TODO

View File

@ -12,7 +12,6 @@ use std::{
thread::{self, JoinHandle}, thread::{self, JoinHandle},
time::Duration, time::Duration,
}; };
use tokio::runtime::Runtime;
use tracing::{error, info, trace, warn}; use tracing::{error, info, trace, warn};
const TPS: u64 = 30; const TPS: u64 = 30;
@ -30,7 +29,7 @@ pub struct Singleplayer {
} }
impl Singleplayer { impl Singleplayer {
pub fn new(runtime: &Arc<Runtime>) -> Self { pub fn new() -> Self {
let (stop_server_s, stop_server_r) = unbounded(); let (stop_server_s, stop_server_r) = unbounded();
// Determine folder to save server data in // Determine folder to save server data in
@ -100,18 +99,19 @@ impl Singleplayer {
let (result_sender, result_receiver) = bounded(1); let (result_sender, result_receiver) = bounded(1);
let builder = thread::Builder::new().name("singleplayer-server-thread".into()); let builder = thread::Builder::new().name("singleplayer-server-thread".into());
let runtime = Arc::clone(runtime);
let thread = builder let thread = builder
.spawn(move || { .spawn(move || {
trace!("starting singleplayer server thread"); trace!("starting singleplayer server thread");
let pools = common_state::State::pools(common::resources::GameMode::Singleplayer); let pools = common_state::State::pools(
common::resources::GameMode::Singleplayer,
tokio::runtime::Builder::new_multi_thread(),
);
let (server, init_result) = match Server::new( let (server, init_result) = match Server::new(
settings2, settings2,
editable_settings, editable_settings,
database_settings, database_settings,
&server_data_dir, &server_data_dir,
runtime,
pools.clone(), pools.clone(),
) { ) {
Ok(server) => (Some(server), Ok(pools)), Ok(server) => (Some(server), Ok(pools)),

View File

@ -411,10 +411,7 @@ pub struct Window {
} }
impl Window { impl Window {
pub fn new( pub fn new(settings: &Settings) -> Result<(Window, EventLoop), Error> {
settings: &Settings,
runtime: &tokio::runtime::Runtime,
) -> Result<(Window, EventLoop), Error> {
let event_loop = EventLoop::new(); let event_loop = EventLoop::new();
let size = settings.graphics.window_size; let size = settings.graphics.window_size;
@ -434,7 +431,7 @@ impl Window {
let window = win_builder.build(&event_loop).unwrap(); let window = win_builder.build(&event_loop).unwrap();
let renderer = Renderer::new(&window, settings.graphics.render_mode.clone(), runtime)?; let renderer = Renderer::new(&window, settings.graphics.render_mode.clone())?;
let keypress_map = HashMap::new(); let keypress_map = HashMap::new();