Limit the number of cpu heavy threads spawned via tokio

This commit is contained in:
Imbris 2021-03-13 16:29:00 -05:00
parent fb850a4ec8
commit 506f8fa226
20 changed files with 112 additions and 18 deletions

11
Cargo.lock generated
View File

@ -4818,6 +4818,12 @@ dependencies = [
"byteorder",
]
[[package]]
name = "std-semaphore"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e"
[[package]]
name = "stdweb"
version = "0.1.3"
@ -5539,6 +5545,7 @@ dependencies = [
"rustyline",
"serde",
"specs",
"std-semaphore",
"termcolor",
"tokio",
"tracing",
@ -5750,6 +5757,7 @@ dependencies = [
"slab",
"specs",
"specs-idvs",
"std-semaphore",
"tokio",
"tracing",
"vek 0.14.1",
@ -5771,9 +5779,11 @@ dependencies = [
"clap",
"crossterm 0.19.0",
"lazy_static",
"num_cpus",
"ron",
"serde",
"signal-hook 0.3.6",
"std-semaphore",
"termcolor",
"tokio",
"tracing",
@ -5833,6 +5843,7 @@ dependencies = [
"serde",
"specs",
"specs-idvs",
"std-semaphore",
"termcolor",
"tokio",
"tracing",

View File

@ -27,6 +27,7 @@ num = "0.4"
tracing = { version = "0.1", default-features = false }
rayon = "1.5"
specs = { git = "https://github.com/amethyst/specs.git", rev = "5a9b71035007be0e3574f35184acac1cd4530496" }
std-semaphore = "0.1.0"
vek = { version = "=0.14.1", features = ["serde"] }
hashbrown = { version = "0.9", features = ["rayon", "serde", "nightly"] }
authc = { git = "https://gitlab.com/veloren/auth.git", rev = "fb3dcbc4962b367253f8f2f92760ef44d2679c9a" }

View File

@ -44,7 +44,9 @@ fn main() {
let password = read_input();
let runtime = Arc::new(Runtime::new().unwrap());
let background_threads = Arc::new(std_semaphore::Semaphore::new(3));
let runtime2 = Arc::clone(&runtime);
let background_threads2 = Arc::clone(&background_threads);
// Create a client.
let mut client = runtime
@ -52,7 +54,7 @@ fn main() {
let addr = ConnectionArgs::resolve(&server_addr, false)
.await
.expect("dns resolve failed");
Client::new(addr, None, runtime2).await
Client::new(addr, None, runtime2, background_threads2).await
})
.expect("Failed to create client instance");

View File

@ -138,6 +138,7 @@ pub struct Client {
registered: bool,
presence: Option<PresenceKind>,
runtime: Arc<Runtime>,
background_threads: Arc<std_semaphore::Semaphore>,
server_info: ServerInfo,
world_data: WorldData,
player_list: HashMap<Uid, PlayerInfo>,
@ -198,6 +199,7 @@ impl Client {
addr: ConnectionArgs,
view_distance: Option<u32>,
runtime: Arc<Runtime>,
background_threads: Arc<std_semaphore::Semaphore>,
) -> Result<Self, Error> {
let network = Network::new(Pid::new(), &runtime);
@ -446,6 +448,7 @@ impl Client {
registered: false,
presence: None,
runtime,
background_threads,
server_info,
world_data: WorldData {
lod_base,
@ -1842,6 +1845,10 @@ impl Client {
/// exempt).
pub fn runtime(&self) -> &Arc<Runtime> { &self.runtime }
/// Get a reference to the semaphore used to guard the number of cpu heavy
/// background threads running at once
pub fn background_threads(&self) -> &Arc<std_semaphore::Semaphore> { &self.background_threads }
/// Get a reference to the client's game state.
pub fn state(&self) -> &State { &self.state }
@ -2176,11 +2183,13 @@ mod tests {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000);
let view_distance: Option<u32> = None;
let runtime = Arc::new(Runtime::new().unwrap());
let background_threads = Arc::new(std_semaphore::Semaphore::new(3));
let runtime2 = Arc::clone(&runtime);
let veloren_client: Result<Client, Error> = runtime.block_on(Client::new(
ConnectionArgs::IpAndPort(vec![socket]),
view_distance,
runtime2,
background_threads,
));
let _ = veloren_client.map(|mut client| {

View File

@ -21,12 +21,14 @@ ansi-parser = "0.7"
clap = "2.33"
crossterm = "0.19"
lazy_static = "1"
num_cpus = "1.0"
signal-hook = "0.3.6"
termcolor = "1.1"
tracing = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }
ron = {version = "0.6", default-features = false}
serde = {version = "1.0", features = [ "rc", "derive" ]}
std-semaphore = "0.1.0"
# Tracy
tracing-tracy = { version = "0.6.0", optional = true }

View File

@ -16,6 +16,7 @@ use crate::{cmd::Message, shutdown_coordinator::ShutdownCoordinator, tui_runner:
use clap::{App, Arg, SubCommand};
use common::clock::Clock;
use common_base::span;
use core::sync::atomic::{AtomicUsize, Ordering};
use server::{Event, Input, Server};
use std::{
io,
@ -92,12 +93,23 @@ fn main() -> io::Result<()> {
path
};
let cores = num_cpus::get();
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores < 2 { 1 } else { cores / 2 + cores / 4 })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio-server-{}", id)
})
.build()
.unwrap(),
);
let background_cpu_threads =
Arc::new(std_semaphore::Semaphore::new(
if cores < 2 { 1 } else { cores / 2 + cores / 4 } as isize,
));
// Load server settings
let mut server_settings = server::Settings::load(&server_data_dir);
@ -143,6 +155,7 @@ fn main() -> io::Result<()> {
editable_settings,
&server_data_dir,
runtime,
background_cpu_threads,
)
.expect("Failed to create server instance!");

View File

@ -49,6 +49,7 @@ diesel = { version = "1.4.3", features = ["sqlite"] }
diesel_migrations = "1.4.0"
dotenv = "0.15.0"
slab = "0.4"
std-semaphore = "0.1.0"
# Plugins
plugin-api = { package = "veloren-plugin-api", path = "../plugin/api"}

View File

@ -22,16 +22,21 @@ pub struct ChunkGenerator {
chunk_tx: crossbeam_channel::Sender<ChunkGenResult>,
chunk_rx: crossbeam_channel::Receiver<ChunkGenResult>,
pending_chunks: HashMap<Vec2<i32>, Arc<AtomicBool>>,
background_threads: Arc<std_semaphore::Semaphore>,
metrics: Arc<ChunkGenMetrics>,
}
impl ChunkGenerator {
#[allow(clippy::new_without_default)] // TODO: Pending review in #587
pub fn new(metrics: ChunkGenMetrics) -> Self {
pub fn new(
metrics: ChunkGenMetrics,
background_threads: Arc<std_semaphore::Semaphore>,
) -> Self {
let (chunk_tx, chunk_rx) = crossbeam_channel::unbounded();
Self {
chunk_tx,
chunk_rx,
pending_chunks: HashMap::new(),
background_threads,
metrics: Arc::new(metrics),
}
}
@ -52,8 +57,10 @@ impl ChunkGenerator {
let cancel = Arc::new(AtomicBool::new(false));
v.insert(Arc::clone(&cancel));
let chunk_tx = self.chunk_tx.clone();
let background_threads = Arc::clone(&self.background_threads);
self.metrics.chunks_requested.inc();
runtime.spawn_blocking(move || {
let _cpu_guard = background_threads.access();
common_base::prof_span!(_guard, "generate_chunk");
let index = index.as_index_ref();
let payload = world

View File

@ -151,6 +151,7 @@ impl Server {
editable_settings: EditableSettings,
data_dir: &std::path::Path,
runtime: Arc<Runtime>,
background_threads: Arc<std_semaphore::Semaphore>,
) -> Result<Self, Error> {
info!("Server is data dir is: {}", data_dir.display());
if settings.auth_server_address.is_none() {
@ -199,7 +200,7 @@ impl Server {
state.ecs_mut().insert(physics_metrics);
state
.ecs_mut()
.insert(ChunkGenerator::new(chunk_gen_metrics));
.insert(ChunkGenerator::new(chunk_gen_metrics, background_threads));
state
.ecs_mut()
.insert(CharacterUpdater::new(&persistence_db_dir)?);

View File

@ -82,6 +82,7 @@ rand = "0.8"
rodio = {version = "0.13", default-features = false, features = ["wav", "vorbis"]}
ron = {version = "0.6", default-features = false}
serde = {version = "1.0", features = [ "rc", "derive" ]}
std-semaphore = "0.1.0"
treeculler = "0.1.0"
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }
num_cpus = "1.0"

View File

@ -149,6 +149,7 @@ impl PlayState for CharSelectionState {
delta_time: client.state().ecs().read_resource::<DeltaTime>().0,
tick: client.get_tick(),
runtime: client.runtime(),
background_threads: client.background_threads(),
body: humanoid_body,
gamma: global_state.settings.graphics.gamma,
exposure: global_state.settings.graphics.exposure,

View File

@ -50,19 +50,19 @@ impl ClientInit {
username: String,
view_distance: Option<u32>,
password: String,
runtime: Option<Arc<runtime::Runtime>>,
runtime_and_threads: Option<(Arc<runtime::Runtime>, Arc<std_semaphore::Semaphore>)>,
) -> Self {
let (tx, rx) = unbounded();
let (trust_tx, trust_rx) = unbounded();
let cancel = Arc::new(AtomicBool::new(false));
let cancel2 = Arc::clone(&cancel);
let runtime = runtime.unwrap_or_else(|| {
let (runtime, background_threads) = runtime_and_threads.unwrap_or_else(|| {
let cores = num_cpus::get();
Arc::new(
let runtime = Arc::new(
runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.worker_threads(if cores < 2 { 1 } else { cores / 2 + cores / 4 })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
@ -70,9 +70,20 @@ impl ClientInit {
})
.build()
.unwrap(),
)
);
let background_threads = Arc::new(std_semaphore::Semaphore::new(if cores < 2 {
1
} else {
cores / 2 + cores / 4
}
as isize));
(runtime, background_threads)
});
let runtime2 = Arc::clone(&runtime);
let background_threads2 = Arc::clone(&background_threads);
runtime.spawn(async move {
let trust_fn = |auth_server: &str| {
@ -106,6 +117,7 @@ impl ClientInit {
connection_args.clone(),
view_distance,
Arc::clone(&runtime2),
Arc::clone(&background_threads2),
)
.await
{

View File

@ -69,7 +69,7 @@ impl PlayState for MainMenuState {
{
if let Some(singleplayer) = &global_state.singleplayer {
match singleplayer.receiver.try_recv() {
Ok(Ok(runtime)) => {
Ok(Ok((runtime, background_threads))) => {
// Attempt login after the server is finished initializing
attempt_login(
&mut global_state.settings,
@ -78,7 +78,7 @@ impl PlayState for MainMenuState {
"".to_owned(),
ClientConnArgs::Resolved(ConnectionArgs::Mpsc(14004)),
&mut self.client_init,
Some(runtime),
Some((runtime, background_threads)),
);
},
Ok(Err(e)) => {
@ -342,7 +342,7 @@ fn attempt_login(
password: String,
connection_args: ClientConnArgs,
client_init: &mut Option<ClientInit>,
runtime: Option<Arc<runtime::Runtime>>,
runtime_and_threads: Option<(Arc<runtime::Runtime>, Arc<std_semaphore::Semaphore>)>,
) {
if comp::Player::alias_is_valid(&username) {
// Don't try to connect if there is already a connection in progress.
@ -352,7 +352,7 @@ fn attempt_login(
username,
Some(settings.graphics.view_distance),
password,
runtime,
runtime_and_threads,
));
}
} else {

View File

@ -339,6 +339,7 @@ where
camera_mode: CameraMode,
character_state: Option<&CharacterState>,
runtime: &Runtime,
background_threads: &Arc<std_semaphore::Semaphore>,
) -> (FigureModelEntryLod<'c>, &'c Skel::Attr)
where
for<'a> &'a Skel::Body: Into<Skel::Attr>,
@ -403,8 +404,10 @@ where
let slot = Arc::new(atomic::AtomicCell::new(None));
let manifests = self.manifests;
let slot_ = Arc::clone(&slot);
let background_threads_ = Arc::clone(background_threads);
runtime.spawn_blocking(move || {
let _cpu_guard = background_threads_.access();
// First, load all the base vertex data.
let manifests = &*manifests.read();
let meshes = <Skel::Body as BodySpec>::bone_meshes(&key, manifests);

View File

@ -757,6 +757,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -1553,6 +1554,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -1755,6 +1757,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -2082,6 +2085,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -2441,6 +2445,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -2551,6 +2556,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -2640,6 +2646,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -2986,6 +2993,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state =
@ -3080,6 +3088,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -3267,6 +3276,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -3358,6 +3368,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -3447,6 +3458,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self
@ -3883,6 +3895,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state =
@ -4067,6 +4080,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state =
@ -4195,6 +4209,7 @@ impl FigureMgr {
player_camera_mode,
player_character_state,
scene_data.runtime,
scene_data.background_threads,
);
let state = self

View File

@ -116,6 +116,7 @@ pub struct SceneData<'a> {
pub view_distance: u32,
pub tick: u64,
pub runtime: &'a Runtime,
pub background_threads: &'a std::sync::Arc<std_semaphore::Semaphore>,
pub gamma: f32,
pub exposure: f32,
pub ambiance: f32,

View File

@ -98,6 +98,7 @@ pub struct SceneData<'a> {
pub delta_time: f32,
pub tick: u64,
pub runtime: &'a Runtime,
pub background_threads: &'a std::sync::Arc<std_semaphore::Semaphore>,
pub body: Option<humanoid::Body>,
pub gamma: f32,
pub exposure: f32,
@ -358,6 +359,7 @@ impl Scene {
CameraMode::default(),
None,
scene_data.runtime,
scene_data.background_threads,
)
.0;
let mut buf = [Default::default(); anim::MAX_BONE_COUNT];

View File

@ -694,10 +694,12 @@ impl<V: RectRasterableVol> Terrain<V> {
// Limit ourselves to u16::MAX even if larger textures are supported.
let max_texture_size = renderer.max_texture_size();
// TODO: we need a blocking and non-blocking semaphore
// so we can use it in a non-blocking fashion on the main thread here
// and use it to block in other places that need that (chunk gen)
let meshing_cores = match num_cpus::get() as u64 {
n if n < 4 => 1,
n if n < 8 => n - 3,
n => n - 4,
n if n < 2 => 1,
n => n / 2 + n / 4,
};
span!(guard, "Queue meshing from todo list");
@ -768,7 +770,9 @@ impl<V: RectRasterableVol> Terrain<V> {
let sprite_config = Arc::clone(&self.sprite_config);
let cnt = Arc::clone(&self.mesh_todos_active);
cnt.fetch_add(1, Ordering::Relaxed);
let background_threads = Arc::clone(scene_data.background_threads);
scene_data.runtime.spawn_blocking(move || {
let _cpu_guard = background_threads.access();
let sprite_data = sprite_data;
let _ = send.send(mesh_worker(
pos,

View File

@ -1443,6 +1443,7 @@ impl PlayState for SessionState {
view_distance: client.view_distance().unwrap_or(1),
tick: client.get_tick(),
runtime: &client.runtime(),
background_threads: client.background_threads(),
gamma: global_state.settings.graphics.gamma,
exposure: global_state.settings.graphics.exposure,
ambiance: global_state.settings.graphics.ambiance,
@ -1511,6 +1512,7 @@ impl PlayState for SessionState {
view_distance: client.view_distance().unwrap_or(1),
tick: client.get_tick(),
runtime: &client.runtime(),
background_threads: client.background_threads(),
gamma: settings.graphics.gamma,
exposure: settings.graphics.exposure,
ambiance: settings.graphics.ambiance,

View File

@ -19,7 +19,8 @@ const TPS: u64 = 30;
pub struct Singleplayer {
_server_thread: JoinHandle<()>,
stop_server_s: Sender<()>,
pub receiver: Receiver<Result<Arc<Runtime>, ServerError>>,
#[allow(clippy::type_complexity)] // TODO: create new type for the sent things
pub receiver: Receiver<Result<(Arc<Runtime>, Arc<std_semaphore::Semaphore>), ServerError>>,
// Wether the server is stopped or not
paused: Arc<AtomicBool>,
// Settings that the server was started with
@ -83,7 +84,7 @@ impl Singleplayer {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(if cores > 4 { cores - 1 } else { cores })
.worker_threads(if cores < 2 { 1 } else { cores / 2 + cores / 4 })
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
@ -92,6 +93,10 @@ impl Singleplayer {
.build()
.unwrap(),
);
let background_threads =
Arc::new(std_semaphore::Semaphore::new(
if cores < 2 { 1 } else { cores / 2 + cores / 4 } as isize,
));
let settings2 = settings.clone();
@ -111,10 +116,11 @@ impl Singleplayer {
editable_settings,
&server_data_dir,
Arc::clone(&runtime),
Arc::clone(&background_threads),
) {
Ok(s) => {
server = Some(s);
Ok(runtime)
Ok((runtime, background_threads))
},
Err(e) => Err(e),
},