Threads pinned to cores, initial groundwork for async slowjobs.

This commit is contained in:
Joshua Yanovski 2022-08-24 21:43:14 -07:00
parent 0dd52f9f11
commit 0f01b78e4b
21 changed files with 304 additions and 84 deletions

76
Cargo.lock generated
View File

@ -158,7 +158,7 @@ dependencies = [
[[package]]
name = "arbalest"
version = "0.2.1"
source = "git+https://gitlab.com/veloren/arbalest.git?rev=9cb8f67a4f6d8f3cc908dac4eb5eb8aec9fab07b#9cb8f67a4f6d8f3cc908dac4eb5eb8aec9fab07b"
source = "git+https://gitlab.com/veloren/arbalest.git?rev=743a8a262e4a762d6d7656c1193782ea35dd2a1d#743a8a262e4a762d6d7656c1193782ea35dd2a1d"
[[package]]
name = "arr_macro"
@ -252,6 +252,12 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-task"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
[[package]]
name = "async-trait"
version = "0.1.53"
@ -1013,6 +1019,18 @@ dependencies = [
"objc",
]
[[package]]
name = "core_affinity"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f"
dependencies = [
"kernel32-sys",
"libc",
"num_cpus",
"winapi 0.2.8",
]
[[package]]
name = "coreaudio-rs"
version = "0.10.0"
@ -1183,7 +1201,7 @@ dependencies = [
"crossbeam-channel",
"crossbeam-deque 0.8.1",
"crossbeam-epoch 0.9.8",
"crossbeam-queue",
"crossbeam-queue 0.3.5",
"crossbeam-utils 0.8.8",
]
@ -1248,6 +1266,15 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
dependencies = [
"crossbeam-utils 0.6.6",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.5"
@ -1258,6 +1285,16 @@ dependencies = [
"crossbeam-utils 0.8.8",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
dependencies = [
"cfg-if 0.1.10",
"lazy_static",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
@ -1791,6 +1828,23 @@ version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "executors"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a94f9d4d0b228598ba8db61d2fe9321df9afcaa10aabdc26aa183cc76b10f7d"
dependencies = [
"arr_macro",
"async-task",
"core_affinity",
"crossbeam-channel",
"crossbeam-deque 0.8.1",
"crossbeam-utils 0.8.8",
"log",
"rand 0.8.5",
"synchronoise",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
@ -3388,7 +3442,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b47412f3a52115b936ff2a229b803498c7b4d332adeb87c2f1498c9da54c398c"
dependencies = [
"crossbeam",
"crossbeam-queue",
"crossbeam-queue 0.3.5",
"log",
"mio 0.7.14",
]
@ -5667,7 +5721,7 @@ version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea85dac2880f84d4025ff5ace80cda6d8bc43bc88b6a389b9277fcf894b51e9"
dependencies = [
"crossbeam-queue",
"crossbeam-queue 0.3.5",
"hashbrown 0.12.0",
"hibitset",
"log",
@ -5926,6 +5980,15 @@ dependencies = [
"unicode-xid 0.2.2",
]
[[package]]
name = "synchronoise"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb"
dependencies = [
"crossbeam-queue 0.1.2",
]
[[package]]
name = "synstructure"
version = "0.12.6"
@ -6529,6 +6592,8 @@ dependencies = [
"csv",
"dot_vox",
"enum-iterator",
"executors",
"futures",
"fxhash",
"hashbrown 0.12.0",
"indexmap",
@ -6538,6 +6603,7 @@ dependencies = [
"num-traits",
"ordered-float 2.10.0",
"petgraph 0.6.0",
"pin-project-lite",
"rand 0.8.5",
"rayon",
"ron 0.7.0",
@ -6634,6 +6700,7 @@ name = "veloren-common-state"
version = "0.10.0"
dependencies = [
"bincode",
"core_affinity",
"hashbrown 0.12.0",
"num_cpus",
"rayon",
@ -6820,6 +6887,7 @@ dependencies = [
"veloren-common-base",
"veloren-common-frontend",
"veloren-common-net",
"veloren-common-state",
"veloren-server",
]

View File

@ -3,7 +3,7 @@
(
caverns: false, // TODO: Disabled by default until cave overhaul
caves: false,
caves: true,
rocks: true,
shrubs: true,
trees: true,

View File

@ -289,6 +289,7 @@ impl Client {
runtime: Arc<Runtime>,
// TODO: refactor to avoid needing to use this out parameter
mismatched_server_info: &mut Option<ServerInfo>,
pools: common_state::Pools,
) -> Result<Self, Error> {
let network = Network::new(Pid::new(), &runtime);
@ -395,7 +396,7 @@ impl Client {
ability_map,
} => {
// Initialize `State`
let mut state = State::client();
let mut state = State::client(pools);
// Client-only components
state.ecs_mut().register::<comp::Last<CharacterState>>();
state.ecs_mut().write_resource::<SlowJobPool>()
@ -1842,7 +1843,7 @@ impl Client {
for key in chunks_to_remove {
let chunk = self.state.remove_chunk(key);
// Drop chunk in a background thread.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(chunk); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(chunk); });
}
drop(slowjob);

View File

@ -30,6 +30,7 @@ enum-iterator = "0.7"
vek = { version = "0.15.8", features = ["serde"] }
chrono = "0.4"
chrono-tz = "0.6"
pin-project-lite = "0.2"
sha2 = "0.10"
serde_json = "1.0.50"
serde_with = { version = "1.14.0" }
@ -48,6 +49,13 @@ clap = "2.33"
crossbeam-utils = "0.8.1"
bitflags = "1.2"
crossbeam-channel = "0.5"
executors = { version = "0.9", default-features = false, features = [
"workstealing-exec",
"ws-timed-fairness",
"thread-pinning",
"numa-aware"
] }
futures = "0.3"
lazy_static = "1.4.0"
num-derive = "0.3"
num-traits = "0.2"
@ -79,7 +87,8 @@ petgraph = { version = "0.6", optional = true }
kiddo = { version = "0.1", optional = true }
# Data structures
arbalest = { git = "https://gitlab.com/veloren/arbalest.git", rev = "9cb8f67a4f6d8f3cc908dac4eb5eb8aec9fab07b", features = ["nightly"] }
arbalest = { git = "https://gitlab.com/veloren/arbalest.git", rev = "743a8a262e4a762d6d7656c1193782ea35dd2a1d", features = ["nightly"] }
# arbalest = { path = "../../arbalest", features = ["nightly"] }
hashbrown = { version = "0.12", features = ["rayon", "serde", "nightly"] }
slotmap = { version = "1.0", features = ["serde"] }
indexmap = { version = "1.3.0", features = ["rayon"] }

View File

@ -10,6 +10,7 @@
bool_to_option,
coerce_unsized,
dispatch_from_dyn,
exclusive_wrapper,
fundamental,
generic_const_exprs,
generic_arg_infer,

View File

@ -1,18 +1,30 @@
use arbalest::sync::{Strong, Frail};
use core::{
fmt,
future::Future,
marker::Unsize,
ops::{CoerceUnsized, DispatchFromDyn},
sync::atomic::{AtomicBool, Ordering}
pin::Pin,
sync::{atomic::{AtomicBool, Ordering}, Exclusive},
task,
};
use executors::{
crossbeam_workstealing_pool::ThreadPool as ThreadPool_,
parker::{LargeThreadData, StaticParker},
Executor,
};
pub use executors::parker::large;
use hashbrown::{hash_map::Entry, HashMap};
use rayon::ThreadPool;
use pin_project_lite::pin_project;
// use rayon::ThreadPool;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
time::Instant,
};
use tracing::{error, warn};
use tracing::{error, warn/* , Instrument */};
pub type ThreadPool = ThreadPool_<StaticParker<LargeThreadData>>;
/// Provides a Wrapper around rayon threadpool to execute slow-jobs.
/// slow means, the job doesn't need to not complete within the same tick.
@ -51,24 +63,30 @@ use tracing::{error, warn};
/// pool.configure("CHUNK_GENERATOR", |n| n / 2);
/// pool.spawn("CHUNK_GENERATOR", move || println!("this is a job"));
/// ```
#[derive(Clone)]
// #[derive(Clone)]
pub struct SlowJobPool {
internal: Arc<Mutex<InternalSlowJobPool>>,
threadpool: Arc<ThreadPool>,
threadpool: ThreadPool,
}
impl Drop for SlowJobPool {
fn drop(&mut self) {
self.threadpool.shutdown_borrowed();
}
}
type Name = /*String*/&'static str;
#[derive(Debug)]
pub struct SlowJob {
task: Frail<Queue>,
task: Pin<Frail<Queue>>,
}
// impl<T: ?Sized + Unsize<U> + CoerceUnsized<U>, U: ?Sized> CoerceUnsized<Task<U>> for Task<T> {}
struct InternalSlowJobPool {
cur_slot: usize,
queue: HashMap<Name, VecDeque<Strong<Queue>>>,
queue: HashMap<Name, VecDeque<Pin<Strong<Queue>>>>,
configs: HashMap<Name, Config>,
last_spawned_configs: Vec<Name>,
global_spawned_and_running: u64,
@ -83,20 +101,29 @@ struct Config {
local_spawned_and_running: u64,
}
#[derive(Debug)]
struct Task<F: ?Sized> {
queue_created: Instant,
/// Has this task been canceled?
is_canceled: AtomicBool,
/// The actual task function. Technically, the Option is unnecessary, since we'll only ever
/// run it once, but the performance improvement doesn't justify unsafe in this case.
task: F,
pin_project! {
struct Task<F: ?Sized> {
queue_created: Instant,
// Has this task been canceled?
is_canceled: AtomicBool,
#[pin]
// The actual task future.
task: Exclusive<F>,
}
}
impl<F: Future + ?Sized> Future for Task<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().task.poll(cx)
}
}
/// NOTE: Should be FnOnce, but can't because there's no easy way to run an FnOnce function on an
/// Arc even if [try_unwrap] would work. We could write unsafe code to do this, but it probably
/// Arc even if [try_unwrap] would work. We could write non-safe code to do this, but it probably
/// isn't worth it.
type Queue = Task<dyn FnMut() + Send + Sync + 'static>;
type Queue = Task<dyn /*FnMut()*/Future<Output=()> + Send + 'static>;
pub struct JobMetrics {
pub queue_created: Instant,
@ -105,20 +132,20 @@ pub struct JobMetrics {
}
impl<F> Task<F> {
fn new(f: F) -> Task<impl FnMut() + Send + Sync + 'static>
where F: FnOnce() + Send + Sync + 'static
fn new(f: F) -> Task<impl /*FnMut()*/Future<Output=F::Output> + Send + 'static>
where F: /*FnOnce()*/Future + Send + 'static
{
let queue_created = Instant::now();
let mut f = Some(f);
/* let mut f = Some(f); */
Task {
queue_created,
is_canceled: AtomicBool::new(false),
task: move || {
task: Exclusive::new(/* move || {
// Working around not being able to call FnOnce in an Arc.
if let Some(f) = f.take() {
f();
}
},
}*/f),
}
}
}
@ -246,14 +273,15 @@ impl InternalSlowJobPool {
fn spawn<F>(&mut self, slowjob: &SlowJobPool, push_back: bool, name: &Name, f: F) -> SlowJob
where
F: FnOnce() + Send + Sync + 'static,
F: /*FnOnce()*/Future<Output=()> + Send + 'static,
{
let queue: Strong<Queue> = Strong::new(Task::new(f));
// let f = f.instrument(tracing::info_span!("{}", name));
let queue: Pin<Strong<Queue>> = Strong::pin(Task::new(f));
let mut deque = self.queue
.entry(name.to_owned())
.or_default();
let job = SlowJob {
task: Strong::downgrade(&queue)
task: Strong::pin_downgrade(&queue)
};
if push_back {
deque.push_back(queue);
@ -287,7 +315,7 @@ impl InternalSlowJobPool {
/// and global task counters, so make sure to actually finish the returned jobs if you consume
/// the iterator, or the position in the queue may be off!
#[must_use = "Remember to actually use the returned jobs if you consume the iterator."]
fn next_jobs<'a>(&'a mut self) -> impl Iterator<Item = (Name, Strong<Queue>)> + 'a {
fn next_jobs<'a>(&'a mut self) -> impl Iterator<Item = (Name, Pin<Strong<Queue>>)> + 'a {
let queued = &mut self.queue;
let configs = &mut self.configs;
let global_spawned_and_running = &mut self.global_spawned_and_running;
@ -426,7 +454,7 @@ impl SlowJob {
// run the task, and we only drop the strong side after the task is complete, this is
// a conservative signal that there's no point in cancelling the task, so this has no
// false positives.
let task = self.task.try_upgrade().or(Err(self))?;
let task = Frail::try_pin_upgrade(&self.task).or(Err(self))?;
// Now that the task is upgraded, any attempt by the strong side to mutably access the
// task will fail, so it will assume it's been canceled. This is fine, because we're
// about to cancel it anyway.
@ -448,7 +476,7 @@ impl SlowJob {
}
impl SlowJobPool {
pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: Arc<ThreadPool>) -> Self {
pub fn new(global_limit: u64, jobs_metrics_cnt: usize, threadpool: /*Arc<*/ThreadPool/*>*/) -> Self {
Self {
internal: Arc::new(Mutex::new(InternalSlowJobPool::new(global_limit, jobs_metrics_cnt))),
threadpool,
@ -482,12 +510,12 @@ impl SlowJobPool {
/// This runs the task, and then checks at the end to see if there are any more tasks to run
/// before returning for good. In cases with lots of tasks, this may help avoid unnecessary
/// context switches or extra threads being spawned unintentionally.
fn spawn_in_threadpool(&self, mut name_task: (Name, Strong<Queue>)) {
fn spawn_in_threadpool(&self, mut name_task: (Name, Pin<Strong<Queue>>)) {
let internal = Arc::clone(&self.internal);
// NOTE: It's important not to use internal until we're in the spawned thread, since the
// lock is probably currently taken!
self.threadpool.spawn(move || {
self.threadpool.execute(move || {
// Repeatedly run until exit; we do things this way to avoid recursion, which might blow
// our call stack.
loop {
@ -499,14 +527,17 @@ impl SlowJobPool {
// difference is minor and it makes it easier to assign metrics to canceled tasks
// (though maybe we don't want to do that?).
let execution_start = Instant::now();
if let Some(mut task) = Strong::try_borrow_mut(&mut task)
if let Some(mut task) = Strong::try_pin_borrow_mut(&mut task)
.ok()
.filter(|task| !task.is_canceled.load(Ordering::Relaxed)) {
// The task was not canceled.
//
// Run the task in its own scope so perf works correctly.
common_base::prof_span_alloc!(_guard, &name);
(task.task)();
futures::executor::block_on(task.as_mut()/* .instrument({
common_base::prof_span!(span, &name);
span
}) */);
}
let execution_end = Instant::now();
let metrics = JobMetrics {
@ -542,7 +573,7 @@ impl SlowJobPool {
#[allow(clippy::result_unit_err)]
pub fn try_run<F>(&self, name: &Name, f: F) -> Result<SlowJob, ()>
where
F: FnOnce() + Send + Sync + 'static,
F: /*FnOnce()*/Future<Output=()> + Send + 'static,
{
let mut lock = self.internal.lock().expect("lock poisoned while try_run");
let lock = &mut *lock;
@ -557,7 +588,7 @@ impl SlowJobPool {
pub fn spawn<F>(&self, name: &Name, f: F) -> SlowJob
where
F: FnOnce() + Send + Sync + 'static,
F: /*FnOnce()*/Future<Output=()> + Send + 'static,
{
self.internal
.lock()
@ -568,7 +599,7 @@ impl SlowJobPool {
/// Spawn at the front of the queue, which is preferrable in some cases.
pub fn spawn_front<F>(&self, name: &Name, f: F) -> SlowJob
where
F: FnOnce() + Send + Sync + 'static,
F: /*FnOnce()*/Future<Output=()> + Send + 'static,
{
self.internal
.lock()

View File

@ -16,6 +16,7 @@ common-net = { package = "veloren-common-net", path = "../net" }
common-ecs = { package = "veloren-common-ecs", path = "../ecs" }
common-base = { package = "veloren-common-base", path = "../base" }
core_affinity = "0.5"
rayon = "1.5"
num_cpus = "1.0"
tracing = { version = "0.1", default-features = false }

View File

@ -6,4 +6,4 @@ mod build_areas;
mod state;
// TODO: breakup state module and remove glob
pub use build_areas::{BuildAreaError, BuildAreas};
pub use state::{BlockChange, State, TerrainChanges};
pub use state::{BlockChange, Pools, State, TerrainChanges};

View File

@ -16,7 +16,7 @@ use common::{
DeltaTime, EntitiesDiedLastTick, GameMode, PlayerEntity, PlayerPhysicsSettings, Time,
TimeOfDay,
},
slowjob::SlowJobPool,
slowjob::{self, SlowJobPool},
terrain::{Block, TerrainChunk, TerrainGrid},
time::DayPeriod,
trade::Trades,
@ -94,38 +94,125 @@ pub struct State {
thread_pool: Arc<ThreadPool>,
}
pub type Pools = (usize, GameMode/*u64*/, Arc<ThreadPool>/*, slowjob::SlowJobPool*/);
impl State {
/// Create a new `State` in client mode.
pub fn client() -> Self { Self::new(GameMode::Client) }
pub fn pools(game_mode: GameMode) -> Pools {
let num_cpu = num_cpus::get()/* - 1*/;
/// Create a new `State` in server mode.
pub fn server() -> Self { Self::new(GameMode::Server) }
pub fn new(game_mode: GameMode) -> Self {
let thread_name_infix = match game_mode {
GameMode::Server => "s",
GameMode::Client => "c",
GameMode::Singleplayer => "sp",
};
let num_cpu = num_cpus::get();
let thread_pool = Arc::new(
let rayon_threads = match game_mode {
GameMode::Server | GameMode::Client => num_cpu / 2,
GameMode::Singleplayer => num_cpu / 4,
}/*num_cpu*/.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS);
let rayon_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(num_cpu.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS))
.num_threads(rayon_threads)
// .thread_name(move |i| format!("rayon-{}", i))
.thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
.build()
.unwrap(),
);
// let num_cpu = num_cpu as u64;
/* let slow_limit = /* match game_mode {
GameMode::Server | GameMode::Client => num_cpu / 2,
GameMode::Singleplayer => num_cpu / 4
}.max(1); */(2 * num_cpu).max(1); */
/* let slow_limit = 2 * (num_cpu - 1).max(1);
let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().take(slow_limit).collect::<Vec<_>>();
let floating = slow_limit.saturating_sub(cores.len());
tracing::trace!(?slow_limit, "Slow Thread limit");
let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large());
// let slow_pool = slowjob::large_pool(slow_limit.min(64)/* as usize */);
let slowjob = SlowJobPool::new(
slow_limit as u64,
10_000,
/*Arc::clone(*/slow_pool/*)*/,
); */
(num_cpu - 1/*slow_limit*//* as u64*/, game_mode, rayon_pool/*, slowjob*/)
}
/// Create a new `State` in client mode.
pub fn client(pools: Pools) -> Self { Self::new(GameMode::Client, pools) }
/// Create a new `State` in server mode.
pub fn server(pools: Pools) -> Self { Self::new(GameMode::Server, pools) }
pub fn new(ecs_role: GameMode, pools: Pools) -> Self {
/* let thread_name_infix = match game_mode {
GameMode::Server => "s",
GameMode::Client => "c",
GameMode::Singleplayer => "sp",
}; */
let num_cpu = /*num_cpus::get()*/pools.0/* / 2 + pools.0 / 4*/;
let game_mode = pools.1;
/* let rayon_threads = match game_mode {
GameMode::Server | GameMode::Client => num_cpu/* / 2*/,
GameMode::Singleplayer => num_cpu/* / 4*// 2,
}/*num_cpu*/;
let rayon_threads = rayon_threads.max(common::consts::MIN_RECOMMENDED_RAYON_THREADS);
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(rayon_threads)
.thread_name(move |i| format!("rayon-{}-{}", thread_name_infix, i))
.build()
.unwrap(),
); */
// let num_cpu = num_cpu as u64;
let (start, step, total) = match (game_mode, ecs_role) {
(_, GameMode::Singleplayer) => todo!("Singleplayer is not a valid ECS role (yet)"),
(GameMode::Server | GameMode::Client, _) => (0, 1, num_cpu),
(GameMode::Singleplayer, GameMode::Server) => (0, 2, num_cpu / 2/* + num_cpu / 4 */),
(GameMode::Singleplayer, GameMode::Client) => (1, 2, num_cpu - num_cpu / 2/* + num_cpu / 4 */),
};
let total = total.max(1);
let cores = core_affinity::get_core_ids().unwrap_or(vec![]).into_iter().skip(start).step_by(step).take(total).collect::<Vec<_>>();
let floating = total.saturating_sub(cores.len());
// TODO: NUMA utils
let slow_pool = slowjob::ThreadPool::with_affinity(&cores, floating, slowjob::large());
/* let slow_pool = if cores.is_empty() {
// We need *some* workers, so just start on all cores.
slowjob::large_pool(1)/*slow_limit.min(64)/* as usize */)*/
} else {
let slow_pool = with_affinity(
cores: &[CoreId],
floating: usize,
parker: P
) -> ThreadPool<P>
} */
/* let slow_limit = match game_mode {
GameMode::Server | GameMode::Client => num_cpu / 2,
GameMode::Singleplayer => num_cpu / 4
}.max(1);/*(/*2 * */num_cpu / 2 + num_cpu / 4).max(1);*/ */
tracing::trace!(?game_mode, ?ecs_role, ?num_cpu, ?start, ?step, ?total, "Slow Thread limit");
// dbg!(game_mode, ecs_role, num_cpu, start, step, total, cores, floating, "Slow Thread limit");
let slowjob = SlowJobPool::new(
/* slow_limit as u64, */
total as u64,
10_000,
/*Arc::clone(*/slow_pool/*)*/,
);
Self {
ecs: Self::setup_ecs_world(game_mode, num_cpu as u64, &thread_pool),
thread_pool,
ecs: Self::setup_ecs_world(ecs_role, /*num_cpu as u64*//*, &thread_pool, *//*pools.1*/slowjob/*pools.3*/),
thread_pool: pools.2,
}
}
/// Creates ecs world and registers all the common components and resources
// TODO: Split up registering into server and client (e.g. move
// EventBus<ServerEvent> to the server)
fn setup_ecs_world(game_mode: GameMode, num_cpu: u64, thread_pool: &Arc<ThreadPool>) -> specs::World {
fn setup_ecs_world(ecs_role: GameMode, /*num_cpu: u64*//*, thread_pool: &Arc<ThreadPool>, */slowjob: SlowJobPool) -> specs::World {
let mut ecs = specs::World::new();
// Uids for sync
ecs.register_sync_marker();
@ -219,18 +306,24 @@ impl State {
ecs.insert(crate::build_areas::BuildAreas::default());
ecs.insert(TerrainChanges::default());
ecs.insert(EventBus::<LocalEvent>::default());
ecs.insert(game_mode);
ecs.insert(ecs_role);
ecs.insert(EventBus::<Outcome>::default());
ecs.insert(common::CachedSpatialGrid::default());
ecs.insert(EntitiesDiedLastTick::default());
let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1);
tracing::trace!(?slow_limit, "Slow Thread limit");
/* let slow_limit = match game_mode {
GameMode::Server | GameMode::Client => num_cpu / 2,
GameMode::Singleplayer => num_cpu / 4
}.max(1); */
// let slow_limit = (num_cpu / 2 + num_cpu / 4).max(1);
/* tracing::trace!(?slow_limit, "Slow Thread limit");
let thread_pool = slowjob::large_pool(slow_limit.min(64) as usize);
ecs.insert(SlowJobPool::new(
slow_limit,
10_000,
Arc::clone(thread_pool),
));
/*Arc::clone(*/thread_pool/*)*/,
)); */
ecs.insert(slowjob);
// TODO: only register on the server
ecs.insert(EventBus::<ServerEvent>::default());
@ -254,7 +347,7 @@ impl State {
};
if let Err(e) = plugin_mgr
.execute_event(&ecs_world, &plugin_api::event::PluginLoadEvent {
game_mode,
game_mode: ecs_role,
})
{
tracing::debug!(?e, "Failed to run plugin init");

View File

@ -28,6 +28,7 @@ hot-reloading = ["server/hot-reloading"]
server = { package = "veloren-server", path = "../server", default-features = false, features = ["simd"] }
common = { package = "veloren-common", path = "../common" }
common-base = { package = "veloren-common-base", path = "../common/base" }
common-state = { package = "veloren-common-state", path = "../common/state" }
common-net = { package = "veloren-common-net", path = "../common/net" }
common-frontend = { package = "veloren-common-frontend", path = "../common/frontend" }

View File

@ -170,6 +170,7 @@ fn main() -> io::Result<()> {
database_settings,
&server_data_dir,
runtime,
common_state::State::pools(common::resources::GameMode::Server),
)
.expect("Failed to create server instance!");

View File

@ -55,7 +55,7 @@ impl ChunkGenerator {
v.insert(Arc::clone(&cancel));
let chunk_tx = self.chunk_tx.clone();
self.metrics.chunks_requested.inc();
slowjob_pool.spawn(&"CHUNK_GENERATOR", move || {
slowjob_pool.spawn(&"CHUNK_GENERATOR", async move {
let index = index.as_index_ref();
let payload = world
.generate_chunk(index, key, || cancel.load(Ordering::Relaxed), Some(time))

View File

@ -2,7 +2,6 @@
#![allow(clippy::option_map_unit_fn)]
#![deny(clippy::clone_on_ref_ptr)]
#![feature(
bool_to_option,
box_patterns,
drain_filter,
label_break_value,
@ -218,6 +217,7 @@ impl Server {
database_settings: DatabaseSettings,
data_dir: &std::path::Path,
runtime: Arc<Runtime>,
pools: common_state::Pools,
) -> Result<Self, Error> {
info!("Server data dir is: {}", data_dir.display());
if settings.auth_server_address.is_none() {
@ -242,7 +242,7 @@ impl Server {
let battlemode_buffer = BattleModeBuffer::default();
let mut state = State::server();
let mut state = State::server(pools);
state.ecs_mut().insert(battlemode_buffer);
state.ecs_mut().insert(settings.clone());
state.ecs_mut().insert(editable_settings);

View File

@ -117,7 +117,7 @@ impl<'a> System<'a> for Sys {
while chunks_iter.peek().is_some() {
let chunks: Vec<_> = chunks_iter.by_ref().take(CHUNK_SIZE).collect();
let chunk_sender = chunk_sender.clone();
slow_jobs.spawn(&"CHUNK_SERIALIZER", move || {
slow_jobs.spawn(&"CHUNK_SERIALIZER", async move {
for (chunk, chunk_key, mut meta) in chunks {
let msg = Client::prepare_chunk_update_msg(
ServerGeneral::TerrainChunkUpdate {

View File

@ -48,6 +48,7 @@ impl ClientInit {
username: String,
password: String,
runtime: Arc<runtime::Runtime>,
pools: common_state::Pools,
) -> Self {
let (tx, rx) = unbounded();
let (trust_tx, trust_rx) = unbounded();
@ -77,6 +78,7 @@ impl ClientInit {
connection_args.clone(),
Arc::clone(&runtime2),
&mut mismatched_server_info,
pools.clone(),
)
.await
{
@ -89,6 +91,7 @@ impl ClientInit {
break 'tries;
}
let _ = tx.send(Msg::Done(Ok(client)));
drop(pools);
tokio::task::block_in_place(move || drop(runtime2));
return;
},
@ -108,12 +111,12 @@ impl ClientInit {
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
// Parsing/host name resolution successful but no connection succeeded
// If last_err is None this typically means there was no server up at the input
// address and all the attempts timed out.
let _ = tx.send(Msg::Done(Err(last_err.unwrap_or(Error::ServerNotFound))));
drop(pools);
// Safe drop runtime
tokio::task::block_in_place(move || drop(runtime2));
});

View File

@ -96,7 +96,7 @@ impl PlayState for MainMenuState {
{
if let Some(singleplayer) = &global_state.singleplayer {
match singleplayer.receiver.try_recv() {
Ok(Ok(())) => {
Ok(Ok(pools)) => {
// Attempt login after the server is finished initializing
attempt_login(
&mut global_state.info_message,
@ -106,6 +106,7 @@ impl PlayState for MainMenuState {
&mut self.init,
&global_state.tokio_runtime,
&global_state.i18n,
Some(pools),
);
},
Ok(Err(e)) => {
@ -301,6 +302,7 @@ impl PlayState for MainMenuState {
&mut self.init,
&global_state.tokio_runtime,
&global_state.i18n,
None,
);
},
MainMenuEvent::CancelLoginAttempt => {
@ -500,6 +502,7 @@ fn attempt_login(
init: &mut InitState,
runtime: &Arc<runtime::Runtime>,
localized_strings: &LocalizationHandle,
pools: Option<common_state::Pools>,
) {
let localization = localized_strings.read();
if let Err(err) = comp::Player::alias_validate(&username) {
@ -530,6 +533,7 @@ fn attempt_login(
username,
password,
Arc::clone(runtime),
pools.unwrap_or_else(|| common_state::State::pools(common::resources::GameMode::Client)),
));
}
}

View File

@ -1013,6 +1013,7 @@ impl Renderer {
);
if self.is_minimized {
self.queue.submit(pre_commands.into_iter());
return Ok(None);
}
@ -1198,17 +1199,21 @@ impl Renderer {
// If lost recreate the swap chain
Err(err @ wgpu::SwapChainError::Lost) => {
warn!("{}. Recreating swap chain. A frame will be missed", err);
self.queue.submit(pre_commands.into_iter());
self.on_resize(self.resolution);
return Ok(None);
},
Err(wgpu::SwapChainError::Timeout) => {
println!("Timeout.");
// This will probably be resolved on the next frame
// NOTE: we don't log this because it happens very frequently with
// PresentMode::Fifo and unlimited FPS on certain machines
self.queue.submit(pre_commands.into_iter());
return Ok(None);
},
Err(err @ wgpu::SwapChainError::Outdated) => {
warn!("{}. Recreating the swapchain", err);
self.queue.submit(pre_commands.into_iter());
self.swap_chain = self.device.create_swap_chain(&self.surface, &self.sc_desc);
return Ok(None);
},

View File

@ -336,7 +336,7 @@ where
let manifests = self.manifests.clone();
let slot_ = Arc::clone(&slot);
slow_jobs.spawn(&"FIGURE_MESHING", move || {
slow_jobs.spawn(&"FIGURE_MESHING", async move {
// First, load all the base vertex data.
let meshes =
<Skel::Body as BodySpec>::bone_meshes(&key, &manifests, extra);

View File

@ -772,7 +772,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
(0..=count).for_each(|_| {
let new_atlas_tx = new_atlas_tx.clone();
let texture_fn = renderer.create_texture_raw();
slowjob.spawn(&"IMAGE_PROCESSING", move || {
slowjob.spawn(&"IMAGE_PROCESSING", async move {
// Construct the next atlas on a separate thread. If it doesn't get sent, it means
// the original channel was dropped, which implies the terrain scene data no longer
// exists, so we can just drop the result in that case.
@ -850,7 +850,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
if let Some(old) = chunks.insert(pos, chunk) {
Self::remove_chunk_meta(atlas, pos, &old);
// Drop the chunk on another thread.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(old); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(old); });
}
/* let (zmin, zmax) = chunk.z_bounds;
self.z_index_up.insert(Vec3::from(zmin, pos.x, pos.y));
@ -1316,7 +1316,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
/* let create_locals = renderer.create_terrain_bound_locals(); */
let create_texture = renderer./*create_texture_raw*/create_model_lazy_base(wgpu::BufferUsage::COPY_SRC/* | wgpu::BufferUsage::MAP_WRITE */);
/* cnt.fetch_add(1, Ordering::Relaxed); */
let job = move || {
let job = async move {
// Since this loads when the task actually *runs*, rather than when it's
// queued, it provides us with a good opportunity to check whether the chunk
// should be canceled. We might miss updates, but that's okay, since canceling
@ -1395,7 +1395,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
// Chunk must have been removed, or it was spawned on an old tick. Drop
// the mesh in the background since it's either out of date or no longer
// needed.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(response); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); });
continue;
}
@ -1500,7 +1500,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
},
);
// Drop image on background thread.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(tex); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(tex); });
// Update the memory mapped locals.
let locals_buffer_ =
@ -1546,7 +1546,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
} else {
// Not sure what happened here, but we should drop the result in the
// background.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(response); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); });
}
if response_started_tick == started_tick {
@ -1557,7 +1557,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
},
// Old task, drop the response in the background.
None => {
slowjob.spawn(&"TERRAIN_DROP", move || { drop(response); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); });
},
}
}
@ -1565,7 +1565,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
drop(locals_buffer);
renderer.unmap_consts(&mut locals);
// Drop buffer on background thread.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(locals); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(locals); });
/* // TODO: Delay submission, don't just submit immediately out of convenience!
renderer.queue.submit(std::iter::once(encoder.finish())); */
self.command_buffers.push(encoder.finish());
@ -1728,7 +1728,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
.drain_filter(|(pos, chunk)| chunks.contains_key(pos) || !can_shadow_sun(*pos, chunk))
.for_each(|(pos, chunk)| {
// Drop the chunk on another thread.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(chunk); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(chunk); });
});
(visible_light_volume, visible_bounds)
@ -1737,7 +1737,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
// shadow chunks around.
let chunks = core::mem::replace(&mut self.shadow_chunks, Vec::new());
// Drop the chunks on another thread.
slowjob.spawn(&"TERRAIN_DROP", move || { drop(chunks); });
slowjob.spawn(&"TERRAIN_DROP", async move { drop(chunks); });
(Vec::new(), math::Aabr {
min: math::Vec2::zero(),
max: math::Vec2::zero(),

View File

@ -22,7 +22,7 @@ const TPS: u64 = 30;
pub struct Singleplayer {
_server_thread: JoinHandle<()>,
stop_server_s: Sender<()>,
pub receiver: Receiver<Result<(), ServerError>>,
pub receiver: Receiver<Result<common_state::Pools, ServerError>>,
// Wether the server is stopped or not
paused: Arc<AtomicBool>,
// Settings that the server was started with
@ -105,14 +105,16 @@ impl Singleplayer {
.spawn(move || {
trace!("starting singleplayer server thread");
let pools = common_state::State::pools(common::resources::GameMode::Singleplayer);
let (server, init_result) = match Server::new(
settings2,
editable_settings,
database_settings,
&server_data_dir,
runtime,
pools.clone(),
) {
Ok(server) => (Some(server), Ok(())),
Ok(server) => (Some(server), Ok(pools)),
Err(err) => (None, Err(err)),
};

View File

@ -89,7 +89,7 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Send + Sync + 'static> Key
// approximating that
let tx = self.tx.clone();
let f = f();
let job = pool.spawn(&self.name, move || {
let job = pool.spawn(&self.name, async move {
let v = f(&k);
let _ = tx.send((k, v));
});