Put Client::tick_network function behind a feature flag, remove unnecessary cloning of deleted-entity vecs in entity sync, move the prepare_send hack (which avoids locking during message serialization) from send_fallible to send, and add a job.cpu_stats par-mode adjustment around the parallel section

This commit is contained in:
Imbris 2021-12-26 11:48:34 -05:00
parent d137ffba63
commit b255f0ee0f
8 changed files with 34 additions and 45 deletions

View File

@ -15,7 +15,7 @@ test-voxygen = "run --bin veloren-voxygen --no-default-features --features simd,
tracy-voxygen = "-Zunstable-options run --bin veloren-voxygen --no-default-features --features tracy,simd,egui-ui --profile no_overflow" tracy-voxygen = "-Zunstable-options run --bin veloren-voxygen --no-default-features --features tracy,simd,egui-ui --profile no_overflow"
server = "run --bin veloren-server-cli" server = "run --bin veloren-server-cli"
dbg-voxygen = "run --bin veloren-voxygen -Zunstable-options --profile debuginfo" dbg-voxygen = "run --bin veloren-voxygen -Zunstable-options --profile debuginfo"
swarm = "run --bin swarm --features client/bin_bot --" swarm = "run --bin swarm --features client/bin_bot,client/tick_network --"
[env] [env]

View File

@ -9,6 +9,7 @@ simd = ["vek/platform_intrinsics"]
plugins = ["common-state/plugins"] plugins = ["common-state/plugins"]
bin_bot = ["common-ecs", "serde", "ron", "clap", "structopt", "rustyline", "common-frontend", "async-channel"] bin_bot = ["common-ecs", "serde", "ron", "clap", "structopt", "rustyline", "common-frontend", "async-channel"]
tracy = ["common-base/tracy"] tracy = ["common-base/tracy"]
tick_network = []
default = ["simd"] default = ["simd"]
@ -58,4 +59,4 @@ required-features = ["bin_bot"]
[[bin]] [[bin]]
name = "swarm" name = "swarm"
required-features = ["bin_bot"] required-features = ["bin_bot", "tick_network"]

View File

@ -2484,6 +2484,7 @@ impl Client {
/// ///
/// The game state is purposefully not simulated to reduce the overhead of running the client. /// The game state is purposefully not simulated to reduce the overhead of running the client.
/// This method is for use in testing a server with many clients connected. /// This method is for use in testing a server with many clients connected.
#[cfg(feature = "tick_network")]
pub fn tick_network( pub fn tick_network(
&mut self, &mut self,
dt: Duration, dt: Duration,

View File

@ -349,14 +349,6 @@ impl RegionMap {
pub fn iter(&self) -> impl Iterator<Item = (Vec2<i32>, &Region)> { pub fn iter(&self) -> impl Iterator<Item = (Vec2<i32>, &Region)> {
self.regions.iter().map(|(key, r)| (*key, r)) self.regions.iter().map(|(key, r)| (*key, r))
} }
/// Returns a parallel iterator of (Position, Regions)
pub fn par_iter(
&self,
) -> impl rayon::iter::IndexedParallelIterator<Item = (Vec2<i32>, &Region)> {
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
self.regions.par_iter().map(|(key, r)| (*key, r))
}
} }
/// Note vd is in blocks in this case /// Note vd is in blocks in this case

View File

@ -83,7 +83,11 @@ impl Client {
} }
pub(crate) fn send<M: Into<ServerMsg>>(&self, msg: M) -> Result<(), StreamError> { pub(crate) fn send<M: Into<ServerMsg>>(&self, msg: M) -> Result<(), StreamError> {
match msg.into() { // TODO: hack to avoid locking stream mutex while serializing the message,
// remove this when the mutexes on the Streams are removed
let prepared = self.prepare(msg);
self.send_prepared(&prepared)
/*match msg.into() {
ServerMsg::Info(m) => self.register_stream.lock().unwrap().send(m), ServerMsg::Info(m) => self.register_stream.lock().unwrap().send(m),
ServerMsg::Init(m) => self.register_stream.lock().unwrap().send(m), ServerMsg::Init(m) => self.register_stream.lock().unwrap().send(m),
ServerMsg::RegisterAnswer(m) => self.register_stream.lock().unwrap().send(m), ServerMsg::RegisterAnswer(m) => self.register_stream.lock().unwrap().send(m),
@ -133,15 +137,10 @@ impl Client {
} }
}, },
ServerMsg::Ping(m) => self.ping_stream.lock().unwrap().send(m), ServerMsg::Ping(m) => self.ping_stream.lock().unwrap().send(m),
} }*/
} }
pub(crate) fn send_fallible<M: Into<ServerMsg>>(&self, msg: M) { pub(crate) fn send_fallible<M: Into<ServerMsg>>(&self, msg: M) { let _ = self.send(msg); }
// TODO: hack to avoid locking stream mutex while serializing the message,
// remove this when the mutexes on the Streams are removed
let prepared = self.prepare(msg);
let _ = self.send_prepared(&prepared);
}
pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> { pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> {
match msg.stream_id { match msg.stream_id {

View File

@ -59,7 +59,7 @@ impl<'a> System<'a> for Sys {
const PHASE: Phase = Phase::Create; const PHASE: Phase = Phase::Create;
fn run( fn run(
_job: &mut Job<Self>, job: &mut Job<Self>,
( (
entities, entities,
tick, tick,
@ -108,14 +108,23 @@ impl<'a> System<'a> for Sys {
// Sync physics and other components // Sync physics and other components
// via iterating through regions (in parallel) // via iterating through regions (in parallel)
use rayon::iter::ParallelIterator;
// Pre-collect regions paired with deleted entity list so we can iterate over
// them in parallel below
let regions_and_deleted_entities = region_map
.iter()
.map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key)))
.collect::<Vec<_>>();
use rayon::iter::{IntoParallelIterator, ParallelIterator};
job.cpu_stats.measure(common_ecs::ParMode::Rayon);
common_base::prof_span!(guard, "regions"); common_base::prof_span!(guard, "regions");
region_map.par_iter().for_each_init( regions_and_deleted_entities.into_par_iter().for_each_init(
|| { || {
common_base::prof_span!(guard, "entity sync rayon job"); common_base::prof_span!(guard, "entity sync rayon job");
guard guard
}, },
|_guard, (key, region)| { |_guard, (key, region, deleted_entities_in_region)| {
// Assemble subscriber list for this region by iterating through clients and // Assemble subscriber list for this region by iterating through clients and
// checking if they are subscribed to this region // checking if they are subscribed to this region
let mut subscribers = ( let mut subscribers = (
@ -192,10 +201,7 @@ impl<'a> System<'a> for Sys {
let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages( let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages(
&tracked_comps, &tracked_comps,
region.entities(), region.entities(),
deleted_entities deleted_entities_in_region,
.get_deleted_in_region(key)
.cloned() // TODO: quick hack to make this parallel, we can avoid this
.unwrap_or_default(),
); );
// We lazily initialize the the synchronization messages in case there are no // We lazily initialize the the synchronization messages in case there are no
// clients. // clients.
@ -300,14 +306,7 @@ impl<'a> System<'a> for Sys {
}, },
); );
drop(guard); drop(guard);
job.cpu_stats.measure(common_ecs::ParMode::Single);
// TODO: this is a quick hack to make the loop over regions above parallel,
// there might is probably a way to make it cleaner
//
// Remove delete entities for each region that we alread handled above
region_map
.iter()
.for_each(|(key, _)| drop(deleted_entities.take_deleted_in_region(key)));
// Update the last physics components for each entity // Update the last physics components for each entity
for (_, &pos, vel, ori, last_pos, last_vel, last_ori) in ( for (_, &pos, vel, ori, last_pos, last_vel, last_ori) in (

View File

@ -430,14 +430,15 @@ impl DeletedEntities {
.push(uid.into()); .push(uid.into());
} }
pub fn take_deleted_in_region(&mut self, key: Vec2<i32>) -> Option<Vec<u64>> { pub fn take_deleted_in_region(&mut self, key: Vec2<i32>) -> Vec<u64> {
self.map.remove(&key) self.map.remove(&key).unwrap_or_default()
} }
pub fn get_deleted_in_region(&self, key: Vec2<i32>) -> Option<&Vec<u64>> { self.map.get(&key) } pub fn get_deleted_in_region(&self, key: Vec2<i32>) -> &[u64] {
self.map.get(&key).map_or(&[], |v| v.as_slice())
}
pub fn take_remaining_deleted(&mut self) -> Vec<(Vec2<i32>, Vec<u64>)> { pub fn take_remaining_deleted(&mut self) -> impl Iterator<Item = (Vec2<i32>, Vec<u64>)> + '_ {
// TODO: don't allocate self.map.drain()
self.map.drain().collect()
} }
} }

View File

@ -161,11 +161,7 @@ impl<'a> System<'a> for Sys {
} }
// Send deleted entities since they won't be processed for this client in entity // Send deleted entities since they won't be processed for this client in entity
// sync // sync
for uid in deleted_entities for uid in deleted_entities.get_deleted_in_region(key).iter() {
.get_deleted_in_region(key)
.iter()
.flat_map(|v| v.iter())
{
client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
} }
} }