Asynchronously wait if jobs are using too many buffers at once, and drop
tasks from a frame all in one job to maximize the likelihood that the job
actually runs.
This commit is contained in:
Joshua Yanovski 2022-08-30 12:19:29 -07:00
parent 7b1c62acf7
commit 83ca01e42d
8 changed files with 102 additions and 47 deletions

3
Cargo.lock generated
View File

@ -1831,8 +1831,7 @@ checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "executors"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a94f9d4d0b228598ba8db61d2fe9321df9afcaa10aabdc26aa183cc76b10f7d"
source = "git+https://github.com/pythonesque/rust-executors.git?rev=1d08f96cf9c52ed85eda8e0f68b5d43072d39b14#1d08f96cf9c52ed85eda8e0f68b5d43072d39b14"
dependencies = [
"arr_macro",
"async-task",

View File

@ -128,6 +128,9 @@ wgpu = { git = "https://github.com/pythonesque/wgpu.git", rev = "50a14ff9d164182
# wgpu-core = { path = "../wgpu/wgpu-core" }
# wgpu-types = { path = "../wgpu/wgpu-types" }
executors = { git = "https://github.com/pythonesque/rust-executors.git", rev = "1d08f96cf9c52ed85eda8e0f68b5d43072d39b14" }
# executors = { path = "../rust-executors/executors" }
# # use the latest fixes in naga (remove when updates trickle down to wgpu-rs)
# naga = { git = "https://github.com/gfx-rs/naga.git", rev = "3a0f0144112ff621dd7f731bf455adf6cab19164" }
# # use the latest fixes in gfx (remove when updates trickle down to wgpu-rs)

View File

@ -9,7 +9,7 @@ use core::{
task,
};
use executors::{
crossbeam_workstealing_pool::ThreadPool as ThreadPool_,
crossbeam_workstealing_pool::{self as executor, ThreadPool as ThreadPool_},
parker::{LargeThreadData, StaticParker},
Executor,
};
@ -519,7 +519,7 @@ impl SlowJobPool {
// Repeatedly run until exit; we do things this way to avoid recursion, which might blow
// our call stack.
loop {
let (name, mut task) = name_task;
let (name, /*mut */task) = name_task;
let queue_created = task.queue_created;
// See the [SlowJob::cancel] method for justification for this step's correctness.
//
@ -527,17 +527,38 @@ impl SlowJobPool {
// difference is minor and it makes it easier to assign metrics to canceled tasks
// (though maybe we don't want to do that?).
let execution_start = Instant::now();
if let Some(mut task) = Strong::try_pin_borrow_mut(&mut task)
.ok()
.filter(|task| !task.is_canceled.load(Ordering::Relaxed)) {
// The task was not canceled.
//
{
// Run the task in its own scope so perf works correctly.
common_base::prof_span_alloc!(_guard, &name);
futures::executor::block_on(task.as_mut()/* .instrument({
common_base::prof_span!(span, &name);
span
}) */);
/// Future wrapper around a pinned, strongly-held queue entry.
///
/// Polling forwards to the wrapped task, unless the task cannot be mutably
/// borrowed or has been flagged as canceled, in which case the job completes
/// immediately without polling the task.
struct Job(Pin<Strong<Queue>>);

impl Future for Job {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let queue = &mut self.get_mut().0;
        // Keep the borrow only if it succeeded and the cancellation flag is not set.
        let runnable = Strong::try_pin_borrow_mut(queue)
            .ok()
            .filter(|borrowed| !borrowed.is_canceled.load(Ordering::Relaxed));
        match runnable {
            // The task was not canceled, so drive it forward.
            // NOTE: wrapping the task in a profiling span via `.instrument(..)` is
            // currently disabled here.
            Some(mut borrowed) => borrowed.as_mut().poll(cx),
            // Borrow failed or the task was canceled: nothing left to do.
            None => task::Poll::Ready(()),
        }
    }
}
executor::run_locally(/*async move {
if let Some(mut task) = Strong::try_pin_borrow_mut(&mut task)
.ok()
.filter(|task| !task.is_canceled.load(Ordering::Relaxed)) {
// The task was not canceled.
task.as_mut()/* .instrument({
common_base::prof_span!(span, &name);
span
}) */.await;
}
}*/Job(task)).detach();
}
let execution_end = Instant::now();
let metrics = JobMetrics {

View File

@ -127,7 +127,7 @@ serde = {version = "1.0", features = [ "rc", "derive" ]}
slab = "0.4.2"
strum = { version = "0.24", features = ["derive"] }
treeculler = "0.2"
tokio = { version = "1.14", default-features = false, features = ["rt-multi-thread"] }
tokio = { version = "1.14", default-features = false, features = ["rt-multi-thread", "sync"] }
num_cpus = "1.0"
# vec_map = { version = "0.8.2" }
inline_tweak = "1.0.2"

View File

@ -81,7 +81,7 @@ pub struct GreedyConfig<D, FV, FA, FL, FG, FO, FS, FP, FT> {
/// coloring part as a continuation. When called with a final tile size and
/// vector, the continuation will consume the color data and write it to the
/// vector.
pub type SuspendedMesh<'a> = dyn for<'r> FnOnce(/*&'r mut ColLightInfo*/(&'r mut [[u8; 4]], Vec2<u16>)) + 'a;
pub type SuspendedMesh<'a> = dyn for<'r> FnOnce(/*&'r mut ColLightInfo*/(&'r mut [[u8; 4]], Vec2<u16>)) + Send + 'a;
/// Abstraction over different atlas allocators. Useful to swap out the
/// allocator implementation for specific cases (e.g. sprites).
@ -389,18 +389,18 @@ impl<'a, Allocator: AtlasAllocator> GreedyMesh<'a, Allocator> {
/// Returns an estimate of the bounds of the current meshed model.
///
/// For more information on the config parameter, see [GreedyConfig].
pub fn push<M: PartialEq, D: 'a, V: 'a, FV, FA, FL, FG, FO, FS, FP, FT>(
pub fn push<M: PartialEq, D: Send + 'a, V: Send + 'a, FV, FA, FL, FG, FO, FS, FP, FT>(
&mut self,
config: GreedyConfig<D, FV, FA, FL, FG, FO, FS, FP, FT>,
) where
FV: for<'r> FnMut(&'r mut D, Vec3<i32>) -> V + 'a,
FA: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + 'a,
FL: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + 'a,
FG: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + 'a,
FO: for<'r> FnMut(&'r mut D, Vec3<i32>) -> bool + 'a,
FA: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + Send + 'a,
FL: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + Send + 'a,
FG: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + Send + 'a,
FO: for<'r> FnMut(&'r mut D, Vec3<i32>) -> bool + Send + 'a,
FS: for<'r> FnMut(&'r mut D, Vec3<i32>, V, V, /*Vec3<i32>, */Vec2<Vec3<i32>>) -> Option<(bool, M)>,
FP: FnMut(Vec2<u16>, Vec2<Vec2<u16>>, Vec3<f32>, Vec2<Vec3<f32>>, Vec3<f32>, &M),
FT: for<'r> FnMut(&'r mut D, Vec3<i32>, u8, u8, bool) -> [u8; 4] + 'a,
FT: for<'r> FnMut(&'r mut D, Vec3<i32>, u8, u8, bool) -> [u8; 4] + Send + 'a,
{
span!(_guard, "push", "GreedyMesh::push");
let cont = greedy_mesh(
@ -458,7 +458,7 @@ impl<'a, Allocator: AtlasAllocator> GreedyMesh<'a, Allocator> {
pub fn max_size(&self) -> Vec2<u16> { self.max_size }
}
fn greedy_mesh<'a, M: PartialEq, D: 'a, V: 'a, FV, FA, FL, FG, FO, FS, FP, FT, Allocator: AtlasAllocator>(
fn greedy_mesh<'a, M: PartialEq, D: Send + 'a, V: Send + 'a, FV, FA, FL, FG, FO, FS, FP, FT, Allocator: AtlasAllocator>(
atlas: &mut Allocator,
col_lights_size: &mut Vec2<u16>,
max_size: Vec2<u16>,
@ -479,13 +479,13 @@ fn greedy_mesh<'a, M: PartialEq, D: 'a, V: 'a, FV, FA, FL, FG, FO, FS, FP, FT, A
) -> Box<SuspendedMesh<'a>>
where
FV: for<'r> FnMut(&'r mut D, Vec3<i32>) -> V + 'a,
FA: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + 'a,
FL: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + 'a,
FG: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + 'a,
FO: for<'r> FnMut(&'r mut D, Vec3<i32>) -> bool + 'a,
FA: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + Send + 'a,
FL: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + Send + 'a,
FG: for<'r> FnMut(&'r mut D, Vec3<i32>) -> f32 + Send + 'a,
FO: for<'r> FnMut(&'r mut D, Vec3<i32>) -> bool + Send + 'a,
FS: for<'r> FnMut(&'r mut D, Vec3<i32>, V, V, /*Vec3<i32>, */Vec2<Vec3<i32>>) -> Option<(bool, M)>,
FP: FnMut(Vec2<u16>, Vec2<Vec2<u16>>, Vec3<f32>, Vec2<Vec3<f32>>, Vec3<f32>, &M),
FT: for<'r> FnMut(&'r mut D, Vec3<i32>, u8, u8, bool) -> [u8; 4] + 'a,
FT: for<'r> FnMut(&'r mut D, Vec3<i32>, u8, u8, bool) -> [u8; 4] + Send + 'a,
{
span!(_guard, "greedy_mesh");
// TODO: Collect information to see if we can choose a good value here.

View File

@ -27,7 +27,7 @@ pub fn generate_mesh_base_vol_terrain<'a: 'b, 'b, V: 'a>(
),
) -> MeshGen<TerrainVertex, TerrainVertex, TerrainVertex, math::Aabb<f32>>
where
V: BaseVol<Vox = Cell> + ReadVol + SizedVol,
V: BaseVol<Vox = Cell> + ReadVol + Send + SizedVol,
{
assert!(bone_idx <= 15, "Bone index for figures must be in [0, 15]");
let max_size = greedy.max_size();
@ -172,7 +172,7 @@ pub fn generate_mesh_base_vol_sprite<'a: 'b, 'b, V: 'a>(
),
) -> MeshGen<SpriteVertex, SpriteVertex, TerrainVertex, ()>
where
V: BaseVol<Vox = Cell> + ReadVol + SizedVol,
V: BaseVol<Vox = Cell> + ReadVol + Send + SizedVol,
{
let max_size = greedy.max_size();
// NOTE: Required because we steal two bits from the normal in the shadow uint
@ -311,7 +311,7 @@ pub fn generate_mesh_base_vol_particle<'a: 'b, 'b, V: 'a>(
greedy: &'b mut GreedyMesh<'a>,
) -> MeshGen<ParticleVertex, ParticleVertex, TerrainVertex, ()>
where
V: BaseVol<Vox = Cell> + ReadVol + SizedVol,
V: BaseVol<Vox = Cell> + ReadVol + Send + SizedVol,
{
let max_size = greedy.max_size();
// NOTE: Required because we steal two bits from the normal in the shadow uint

View File

@ -1,5 +1,6 @@
#![allow(clippy::clone_on_copy)] // TODO: fix after wgpu branch
use core::future::Future;
use crate::{
mesh::{
greedy::{self, GreedyConfig, GreedyMesh},
@ -330,9 +331,9 @@ type V = TerrainChunk;
#[allow(clippy::type_complexity)]
#[inline(always)]
pub fn generate_mesh<'a/*, V: RectRasterableVol<Vox = Block> + ReadVol + Debug + 'static*/>(
pub async fn generate_mesh<'a/*, V: RectRasterableVol<Vox = Block> + ReadVol + Debug + 'static*/, F: Future<Output=Option<Model<[u8; 4]>>> + 'a>(
vol: &'a VolGrid2d<V>,
create_texture: impl Fn(usize) -> Option<Model<[u8; 4]>>,
create_texture: impl FnOnce(usize) -> /*Option<Model<[u8; 4]>>*/F + Send,
(range, max_texture_size, boi): (Aabb<i32>, Vec2<u16>, &'a BlocksOfInterest),
) -> MeshGen<
TerrainVertex,
@ -1002,7 +1003,7 @@ pub fn generate_mesh<'a/*, V: RectRasterableVol<Vox = Block> + ReadVol + Debug +
Vec2::new((wgpu::COPY_BYTES_PER_ROW_ALIGNMENT / 4) as u16, 1),
);
// Allocate the fresh mesh.
let mut col_lights = create_texture(col_lights_alloc_size);
let mut col_lights = create_texture(col_lights_alloc_size).await;
let col_lights_size = col_lights.as_mut().map(|col_lights| {
let slice = col_lights.get_mapped_mut(0, col_lights.len());
let mut buf = slice.get_mapped_range_mut();

View File

@ -41,6 +41,7 @@ use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::warn;
use treeculler::{BVol, Frustum, AABB};
use vek::*;
@ -134,6 +135,8 @@ pub struct MeshWorkerResponseMesh {
/// NOTE: These are memory mapped, and must be unmapped!
/* locals: pipelines::terrain::BoundLocals, */
col_lights_info: /*ColLightInfo*/(Option<Model<[u8; 4]>>, Vec2<u16>),
/// Permit indicating that a certain amount of temporary buffer space is in use.
permit: Option<OwnedSemaphorePermit>,
light_map: LightMapFn,
glow_map: LightMapFn,
}
@ -245,7 +248,7 @@ type V = TerrainChunk;
/// skip_remesh is either None (do the full remesh, including recomputing the
/// light map), or Some((light_map, glow_map)).
fn mesh_worker/*<V: BaseVol<Vox = Block> + RectRasterableVol + ReadVol + Debug + 'static>*/(
async fn mesh_worker/*<V: BaseVol<Vox = Block> + RectRasterableVol + ReadVol + Debug + 'static>*/<'b>(
pos: Vec2<i32>,
z_bounds: (f32, f32),
skip_remesh: Option<(LightMapFn, LightMapFn)>,
@ -254,13 +257,14 @@ fn mesh_worker/*<V: BaseVol<Vox = Block> + RectRasterableVol + ReadVol + Debug +
max_texture_size: u16,
chunk: Arc<TerrainChunk>,
range: Aabb<i32>,
sprite_data: &HashMap<(SpriteKind, usize), [SpriteData; SPRITE_LOD_LEVELS]>,
sprite_config: &SpriteSpec,
sprite_data: &'b HashMap<(SpriteKind, usize), [SpriteData; SPRITE_LOD_LEVELS]>,
sprite_config: &'b SpriteSpec,
mem_semaphore: Arc<Semaphore>,
create_opaque: impl for<'a> Fn(&'a Mesh<TerrainVertex>) -> Option<Model<TerrainVertex>>,
create_fluid: impl for<'a> Fn(&'a Mesh<FluidVertex>) -> Option<Model<FluidVertex>>,
create_instances: impl for<'a> Fn(/* &'a [SpriteInstance] */usize) -> Instances<SpriteInstance>,
/* create_locals: impl Fn() -> pipelines::terrain::BoundLocals, */
create_texture: impl for<'a> Fn(/* wgpu::TextureDescriptor<'a>, wgpu::TextureViewDescriptor<'a>, wgpu::SamplerDescriptor<'a>*//*&'a Mesh<[u8; 4]>*/usize) -> /*Texture + Send + Sync*/Option<Model<[u8; 4]>>,
create_texture: impl for<'a> Fn(/* wgpu::TextureDescriptor<'a>, wgpu::TextureViewDescriptor<'a>, wgpu::SamplerDescriptor<'a>*//*&'a Mesh<[u8; 4]>*/usize) -> /*Texture + Send + Sync*/Option<Model<[u8; 4]>> + Send,
) -> MeshWorkerResponse {
span!(_guard, "mesh_worker");
let (blocks_of_interest, sprite_kinds) = BlocksOfInterest::from_chunk(&chunk)/*default()*/;
@ -272,21 +276,33 @@ fn mesh_worker/*<V: BaseVol<Vox = Block> + RectRasterableVol + ReadVol + Debug +
range.max.z = chunk.get_max_z() + 2;
let z_bounds = (range.min.z, range.max.z);
let mut permit = None;
let mesh;
let (light_map, glow_map) = if let Some((light_map, glow_map)) = &skip_remesh {
mesh = None;
(&**light_map, &**glow_map)
} else {
let permit_ = &mut permit;
let (opaque_mesh, fluid_mesh, _shadow_mesh, (bounds, col_lights_info, light_map, glow_map)) =
generate_mesh(
&volume,
create_texture,
move |size| async move {
if size > 0 {
// Allocate a number of texture blocks in MiB, to avoid allocating so many
// temporary buffers that we run out of GPU memory.
let size_mb = (size >> 20) as u32 + 1;
// If for some reason the semaphore is closed, we just won't allocate
// texture space.
*permit_ = Some(Semaphore::acquire_many_owned(mem_semaphore, size_mb).await.ok()?);
}
create_texture(size)
},
(
range,
Vec2::new(max_texture_size, max_texture_size),
&blocks_of_interest,
),
);
).await;
/* let mut tex_ = Mesh::new();
*tex_.vertices_mut_vec() = col_lights_info.0;
let tex = create_texture(&tex_); */
@ -302,6 +318,7 @@ fn mesh_worker/*<V: BaseVol<Vox = Block> + RectRasterableVol + ReadVol + Debug +
fluid_model: create_fluid(&fluid_mesh),
/* locals: create_locals(), */
col_lights_info/*: (tex, col_lights_info.1)*/,
permit,
light_map,
glow_map,
});
@ -469,6 +486,8 @@ pub struct Terrain<V: RectRasterableVol = TerrainChunk> {
mesh_recv: channel::Receiver<MeshWorkerResponse>,
new_atlas_tx: channel::Sender<Texture>,
new_atlas_rx: channel::Receiver<Texture>,
/// Number of (max) 1 MiB buffers available.
mem_semaphore: Arc<Semaphore>,
mesh_todo: HashMap<Vec2<i32>, ChunkMeshState>,
mesh_todos_active: Arc<AtomicU64>,
mesh_recv_overflow: f32,
@ -711,6 +730,9 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
// available).
const EXTRA_ATLAS_COUNT: usize = 1;
// Number of 1 MiB blocks available for temporary texture maps.
const TEXTURE_STAGING_SIZE_MB: usize = /*0x800*/0x100;
// Create a second mpsc pair for offloading atlas allocation to a second thread. This way,
// a second thread is usually ready to produce a new atlas the moment we ask for it, so we
// avoid waiting longer than necessary. The channel holds just BACKGROUND_ATLASE_COUNT
@ -735,6 +757,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
mesh_recv: recv,
new_atlas_tx,
new_atlas_rx,
mem_semaphore: Arc::new(Semaphore::new(TEXTURE_STAGING_SIZE_MB)),
mesh_todo: HashMap::default(),
mesh_todos_active: Arc::new(AtomicU64::new(0)),
mesh_recv_overflow: 0.0,
@ -1310,6 +1333,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
let sprite_data = Arc::clone(&self.sprite_data);
let sprite_config = Arc::clone(&self.sprite_config);
let cnt = Arc::clone(&self.mesh_todos_active);
let mem_semaphore = Arc::clone(&self.mem_semaphore);
let create_opaque = renderer.create_model_lazy(wgpu::BufferUsage::VERTEX);
let create_fluid = renderer.create_model_lazy(wgpu::BufferUsage::VERTEX);
let create_instances = renderer.create_instances_lazy();
@ -1336,12 +1360,13 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
aabb,
&sprite_data,
&sprite_config,
mem_semaphore,
create_opaque,
create_fluid,
create_instances,
/* create_locals, */
create_texture,
));
).await);
cnt.fetch_add(1, Ordering::Relaxed);
}
/* cnt.fetch_sub(1, Ordering::Relaxed); */
@ -1369,12 +1394,14 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
let mesh_recv = &self.mesh_recv;
let max_recv_count = self.mesh_todos_active.load(Ordering::Relaxed)/*.min(recv_count.floor() as u64)*/;
let incoming_chunks =
std::iter::from_fn(|| mesh_recv.try_recv().ok())
std::iter::from_fn(|| mesh_recv.recv().ok())
.take(/* recv_count.floor() as usize */max_recv_count as usize);
self.mesh_todos_active.fetch_sub(max_recv_count, Ordering::Relaxed);
if max_recv_count > 0 {
// Construct a buffer for all the chunks we're going to process in this frame. There might
// be some unused slots, which is fine.
let mut drop_textures = Vec::new();
let mut drop_responses = Vec::new();
let mut locals = /*Arc::new(*/renderer.create_consts_mapped(wgpu::BufferUsage::empty(), max_recv_count as usize)/*)*/;
let mut locals_bound = renderer.create_terrain_bound_locals(&locals/*, locals_offset */);
let mut locals_buffer = locals.get_mapped_mut(0, locals.len());
@ -1396,7 +1423,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
// Chunk must have been removed, or it was spawned on an old tick. Drop
// the mesh in the background since it's either out of date or no longer
// needed.
slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); });
drop_responses.push(response);
continue;
}
@ -1501,7 +1528,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
},
);
// Drop image on background thread.
slowjob.spawn(&"TERRAIN_DROP", async move { drop(tex); });
drop_textures.push((tex, mesh.permit));
// Update the memory mapped locals.
let locals_buffer_ =
@ -1548,7 +1575,7 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
} else {
// Not sure what happened here, but we should drop the result in the
// background.
slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); });
drop_responses.push(response);
}
if response_started_tick == started_tick {
@ -1559,15 +1586,19 @@ impl/*<V: RectRasterableVol>*/ Terrain<V> {
},
// Old task, drop the response in the background.
None => {
slowjob.spawn(&"TERRAIN_DROP", async move { drop(response); });
drop_responses.push(response);
},
}
}
// Drop the memory mapping and unmap the locals.
drop(locals_buffer);
renderer.unmap_consts(&mut locals);
// Drop buffer on background thread.
slowjob.spawn(&"TERRAIN_DROP", async move { drop(locals); });
// Drop buffers on background thread.
slowjob.spawn(&"TERRAIN_DROP", async move {
drop(drop_responses);
drop(drop_textures);
drop(locals);
});
/* // TODO: Delay submission, don't just submit immediately out of convenience!
renderer.queue.submit(std::iter::once(encoder.finish())); */
self.command_buffers.push(encoder.finish());