diff --git a/.cargo/config b/.cargo/config index f85f7ca37c..9aaa76436a 100644 --- a/.cargo/config +++ b/.cargo/config @@ -15,6 +15,7 @@ test-voxygen = "run --bin veloren-voxygen --no-default-features --features simd, tracy-voxygen = "-Zunstable-options run --bin veloren-voxygen --no-default-features --features tracy,simd,egui-ui --profile no_overflow" server = "run --bin veloren-server-cli" dbg-voxygen = "run --bin veloren-voxygen -Zunstable-options --profile debuginfo" +swarm = "run --bin swarm --features client/bin_bot,client/tick_network --" [env] diff --git a/CHANGELOG.md b/CHANGELOG.md index 06727e531e..04ee2a1f28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Arbitrary volume entities - New outfit for merchants - Nightly linux Aarch64 builds are now produced (distribution via airshipper will follow soon) +- Worldgen wildlife density modifier in features.ron ### Changed @@ -44,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Explosions can now have a nonzero minimum falloff - EXP on kill is now shared based on damage contribution - Dungeons have somewhat proper scaling. The higher the dungeon the harder it gets, Cultist staying unchanged while Mino is now at its level. +- Parallelized entity sync system on the server ### Removed diff --git a/Cargo.lock b/Cargo.lock index d6f09b79ca..ffd6f77b0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2620,6 +2620,7 @@ checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" dependencies = [ "autocfg", "hashbrown 0.11.2", + "rayon", "serde", ] @@ -6124,6 +6125,7 @@ dependencies = [ "rustyline", "serde", "specs", + "structopt", "termcolor", "tokio", "tracing", diff --git a/assets/world/features.ron b/assets/world/features.ron index 65c15f24dd..2c4ed7198b 100644 --- a/assets/world/features.ron +++ b/assets/world/features.ron @@ -10,4 +10,5 @@ paths: true, spots: true, site2: false, + wildlife_density: 1.0, ) diff --git a/client/Cargo.toml b/client/Cargo.toml index 8a39eb58d3..d48889a0b3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -7,8 +7,9 @@ edition = "2018" [features] simd = ["vek/platform_intrinsics"] plugins = ["common-state/plugins"] -bin_bot = ["common-ecs", "serde", "ron", "clap", "rustyline", "common-frontend", "async-channel"] +bin_bot = ["common-ecs", "serde", "ron", "clap", "structopt", "rustyline", "common-frontend", "async-channel"] tracy = ["common-base/tracy"] +tick_network = [] default = ["simd"] @@ -39,6 +40,7 @@ common-ecs = { package = "veloren-common-ecs", path = "../common/ecs", optional serde = { version = "1.0", features = [ "rc", "derive" ], optional = true } ron = { version = "0.7", default-features = false, optional = true } clap = { version = "2.33", optional = true } +structopt = { version = "0.3.13", optional = true } rustyline = { version = "9.0.0", optional = true } ## logging termcolor = { version = "1.1", optional = true } @@ -54,3 +56,7 @@ required-features = ["bin_bot"] name = "bot" #authors = ["Avi Weinstock "] required-features = ["bin_bot"] + +[[bin]] +name = "swarm" +required-features = ["bin_bot", "tick_network"] diff --git a/client/examples/chat-cli/main.rs b/client/examples/chat-cli/main.rs index 92aff3c5cb..ee79e3bc3f 100644 --- a/client/examples/chat-cli/main.rs +++ b/client/examples/chat-cli/main.rs @@ -56,7 +56,7 @@ fn main() { println!("Server info: {:?}", client.server_info()); - println!("Players online: {:?}", client.get_players()); + println!("Players online: {:?}", client.players().collect::>()); runtime .block_on(client.register(username, password, |provider| { diff --git a/client/src/bin/swarm/main.rs b/client/src/bin/swarm/main.rs new file mode 100644 index 0000000000..690ff39266 --- /dev/null +++ b/client/src/bin/swarm/main.rs @@ -0,0 +1,302 @@ +use common::comp; +use hashbrown::HashSet; +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + thread, + time::{Duration, SystemTime}, +}; +use structopt::StructOpt; +use tokio::runtime::Runtime; +use vek::*; +use veloren_client::{addr::ConnectionArgs, Client}; + +#[derive(Clone, Copy, StructOpt)] +struct Opt { + /// Number of clients to spin up + size: u32, + /// View distance of each client + vd: u32, + /// Distribution of the clients, if not clustered they are dispersed + #[structopt(short, long)] + clustered: bool, + /// Whether the clients should move + #[structopt(short, long)] + movement: bool, +} + +fn main() { + let opt = Opt::from_args(); + // Start logging + common_frontend::init_stdout(None); + // Run clients and stuff + // + // NOTE: "swarm0" is assumed to be an admin already + // + // Since this requires a no-auth server use this command to add swarm0 as an + // admin: + // + // --no-auth admin add swarm0 Admin + // + let admin_username = "swarm0".to_owned(); + let usernames = (1..opt.size) + .map(|i| format!("swarm{}", i)) + .collect::>(); + let to_adminify = usernames.clone(); + + let finished_init = Arc::new(AtomicU32::new(0)); + let runtime = Arc::new(Runtime::new().unwrap()); + + // TODO: calculate and log the required chunks per second to maintain the + // selected scenario with full vd loaded + + run_client_new_thread( + admin_username, + 0, + to_adminify, + &runtime, + opt, + &finished_init, + ); + + usernames.into_iter().enumerate().for_each(|(index, name)| { + run_client_new_thread( + name, + index as u32, + Vec::new(), + &runtime, + opt, + &finished_init, + ); + }); + + loop { + thread::sleep(Duration::from_secs_f32(1.0)); + } +} + +fn run_client_new_thread( + username: String, + index: u32, + to_adminify: Vec, + runtime: &Arc, + opt: Opt, + finished_init: &Arc, +) { + let runtime = Arc::clone(runtime); + let finished_init = Arc::clone(finished_init); + thread::spawn(move || { + if let Err(err) = run_client(username, index, to_adminify, runtime, opt, finished_init) { + tracing::error!("swarm member {} exited with an error: {:?}", index, err); + } + }); +} + +fn run_client( + username: String, + index: u32, + to_adminify: Vec, + runtime: Arc, + opt: Opt, + finished_init: Arc, +) -> Result<(), veloren_client::Error> { + // Connect to localhost + let addr = ConnectionArgs::Tcp { + prefer_ipv6: false, + hostname: "localhost".into(), + }; + let runtime_clone = Arc::clone(&runtime); + let mut client = runtime + .block_on(Client::new(addr, runtime_clone, &mut None)) + .expect("Failed to connect to the server"); + client.set_view_distance(opt.vd); + + // Login + // NOTE: use a no-auth server + runtime + .block_on(client.register(username.clone(), String::new(), |_| false)) + .expect("Failed to log in"); + + let mut clock = common::clock::Clock::new(Duration::from_secs_f32(1.0 / 30.0)); + + let mut tick = |client: &mut Client| -> Result<(), veloren_client::Error> { + clock.tick(); + client.tick_network(clock.dt())?; + Ok(()) + }; + + // Wait for character list to load + client.load_character_list(); + while client.character_list().loading { + tick(&mut client)?; + } + + // Create character if none exist + if client.character_list().characters.is_empty() { + client.create_character( + username.clone(), + Some("common.items.weapons.sword.starter".into()), + None, + body(), + ); + + client.load_character_list(); + + while client.character_list().loading || client.character_list().characters.is_empty() { + tick(&mut client)?; + } + } + + // Select the first character + client.request_character( + client + .character_list() + .characters + .first() + .expect("Just created new character if non were listed!!!") + .character + .id + .expect("Why is this an option?"), + ); + + // If this is the admin client then adminify the other swarm members + if !to_adminify.is_empty() { + // Wait for other clients to connect + loop { + tick(&mut client)?; + // NOTE: it's expected that each swarm member will have a unique alias + let players = client.players().collect::>(); + if to_adminify + .iter() + .all(|name| players.contains(&name.as_str())) + { + break; + } + } + // Assert that we are a moderator (assumes we are an admin if so) + assert!( + client.is_moderator(), + "The user needs to ensure \"{}\" is registered as an admin on the server", + username + ); + // Send commands to adminify others + to_adminify.iter().for_each(|name| { + client.send_command("adminify".into(), vec![name.into(), "admin".into()]) + }); + } + + // Wait for moderator + while !client.is_moderator() { + tick(&mut client)?; + } + + finished_init.fetch_add(1, Ordering::Relaxed); + // Wait for initialization of all other swarm clients to finish + while !finished_init.load(Ordering::Relaxed) == opt.size { + tick(&mut client)?; + } + + // Use this check so this is only printed once + if !to_adminify.is_empty() { + println!("Initialization of all clients finished!"); + } + + // Main loop + let chunk_size = 32.0; // TODO: replace with the actual constant + let world_center = client + .world_data() + .chunk_size() + .map(|e| e as f32 * chunk_size) + / 2.0; + loop { + // TODO: doesn't seem to produce an error when server is shutdown (process keeps + // running) + tick(&mut client)?; + let entity = client.entity(); + // Move or stay still depending on specified options + // TODO: make sure server cheat protections aren't triggering + let pos = common::comp::Pos(position(index, opt) + world_center); + let vel = common::comp::Vel(Default::default()); + client + .state_mut() + .write_component_ignore_entity_dead(entity, pos); + client + .state_mut() + .write_component_ignore_entity_dead(entity, vel); + } +} + +// Use client index, opts, and current system time to determine position +fn position(index: u32, opt: Opt) -> Vec3 { + // TODO: replace 32 with constant for chunk size + let chunk_size = 32.0; + + let width = (opt.size as f32).sqrt().round() as u32; + + let spacing = if opt.clustered { + 5.0 + } else { + use common::region::REGION_SIZE; + // Attempt to make regions subscribed to by each client not overlapping + opt.vd as f32 * 2.0 * chunk_size + 2.0 * REGION_SIZE as f32 + }; + + // Offset to center the grid of clients + let offset = Vec2::new( + width as f32 * spacing / 2.0, + (opt.size / width) as f32 / 2.0, + ); + // Position clients in a grid + let base_pos = Vec2::new( + (index % width) as f32 * spacing, + (index / width) as f32 * spacing, + ) - offset; + + let movement_offset: Vec2<_> = if opt.movement { + // blocks per second + const SPEED: f32 = 9.0; // typical super fast veloren walking speed + + // move in a square route + // in blocks + let route_side_length = chunk_size * opt.vd as f32 * 3.0; + let route_length = route_side_length * 4.0; + // in secs + let route_time = route_length / SPEED; + let route_progress = (SystemTime::UNIX_EPOCH.elapsed().unwrap().as_secs_f64() + % route_time as f64) as f32 + / route_time; + + // clockwise square + (match route_progress * 4.0 { + // going up left side + t if t < 1.0 => Vec2::new(0.0, 0.0 + t), + // going across top side + t if t < 2.0 => Vec2::new(0.0 + (t - 1.0), 1.0), + // going down right side + t if t < 3.0 => Vec2::new(1.0, 1.0 - (t - 2.0)), + // going across bottom + t => Vec2::new(1.0 - (t - 3.0), 0.0), + }) * route_side_length + } else { + Vec2::zero() + }; + + Vec3::from(base_pos + movement_offset) +} + +fn body() -> comp::Body { + comp::body::humanoid::Body { + species: comp::body::humanoid::Species::Human, + body_type: comp::body::humanoid::BodyType::Male, + hair_style: 0, + beard: 0, + eyes: 0, + accessory: 0, + hair_color: 0, + skin: 0, + eye_color: 0, + } + .into() +} diff --git a/client/src/lib.rs b/client/src/lib.rs index afd41d5245..2bbdf82010 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1534,6 +1534,54 @@ impl Client { .recv_all(); // 5) Terrain + self.tick_terrain()?; + + // Send a ping to the server once every second + if self.state.get_time() - self.last_server_ping > 1. { + self.send_msg_err(PingMsg::Ping)?; + self.last_server_ping = self.state.get_time(); + } + + // 6) Update the server about the player's physics attributes. + if self.presence.is_some() { + if let (Some(pos), Some(vel), Some(ori)) = ( + self.state.read_storage().get(self.entity()).cloned(), + self.state.read_storage().get(self.entity()).cloned(), + self.state.read_storage().get(self.entity()).cloned(), + ) { + self.in_game_stream + .send(ClientGeneral::PlayerPhysics { pos, vel, ori })?; + } + } + + /* + // Output debug metrics + if log_enabled!(Level::Info) && self.tick % 600 == 0 { + let metrics = self + .state + .terrain() + .iter() + .fold(ChonkMetrics::default(), |a, (_, c)| a + c.get_metrics()); + info!("{:?}", metrics); + } + */ + + // 7) Finish the tick, pass control back to the frontend. + self.tick += 1; + Ok(frontend_events) + } + + /// Clean up the client after a tick. + pub fn cleanup(&mut self) { + // Cleanup the local state + self.state.cleanup(); + } + + /// Handles terrain addition and removal. + /// + /// Removes old terrain chunks outside the view distance. + /// Sends requests for missing chunks within the view distance. + fn tick_terrain(&mut self) -> Result<(), Error> { let pos = self .state .read_storage::() @@ -1628,45 +1676,7 @@ impl Client { .retain(|_, created| now.duration_since(*created) < Duration::from_secs(3)); } - // Send a ping to the server once every second - if self.state.get_time() - self.last_server_ping > 1. { - self.send_msg_err(PingMsg::Ping)?; - self.last_server_ping = self.state.get_time(); - } - - // 6) Update the server about the player's physics attributes. - if self.presence.is_some() { - if let (Some(pos), Some(vel), Some(ori)) = ( - self.state.read_storage().get(self.entity()).cloned(), - self.state.read_storage().get(self.entity()).cloned(), - self.state.read_storage().get(self.entity()).cloned(), - ) { - self.in_game_stream - .send(ClientGeneral::PlayerPhysics { pos, vel, ori })?; - } - } - - /* - // Output debug metrics - if log_enabled!(Level::Info) && self.tick % 600 == 0 { - let metrics = self - .state - .terrain() - .iter() - .fold(ChonkMetrics::default(), |a, (_, c)| a + c.get_metrics()); - info!("{:?}", metrics); - } - */ - - // 7) Finish the tick, pass control back to the frontend. - self.tick += 1; - Ok(frontend_events) - } - - /// Clean up the client after a tick. - pub fn cleanup(&mut self) { - // Cleanup the local state - self.state.cleanup(); + Ok(()) } fn handle_server_msg( @@ -2211,15 +2221,12 @@ impl Client { /// Get a mutable reference to the client's game state. pub fn state_mut(&mut self) -> &mut State { &mut self.state } - /// Get a vector of all the players on the server - pub fn get_players(&mut self) -> Vec { - // TODO: Don't clone players. - self.state - .ecs() - .read_storage::() - .join() - .cloned() - .collect() + /// Returns an iterator over the aliases of all the online players on the + /// server + pub fn players(&self) -> impl Iterator { + self.player_list() + .values() + .filter_map(|player_info| player_info.is_online.then(|| &*player_info.player_alias)) } /// Return true if this client is a moderator on the server @@ -2469,6 +2476,70 @@ impl Client { comp::ChatType::Meta => message.to_string(), } } + + /// Execute a single client tick: + /// - handles messages from the server + /// - sends physics update + /// - requests chunks + /// + /// The game state is purposefully not simulated to reduce the overhead of + /// running the client. This method is for use in testing a server with + /// many clients connected. + #[cfg(feature = "tick_network")] + #[allow(clippy::needless_collect)] // False positive + pub fn tick_network(&mut self, dt: Duration) -> Result<(), Error> { + span!(_guard, "tick_network", "Client::tick_network"); + // Advance state time manually since we aren't calling `State::tick` + self.state + .ecs() + .write_resource::() + .0 += dt.as_secs_f64(); + + // Handle new messages from the server. + self.handle_new_messages()?; + + // 5) Terrain + self.tick_terrain()?; + let empty = Arc::new(TerrainChunk::new( + 0, + Block::empty(), + Block::empty(), + common::terrain::TerrainChunkMeta::void(), + )); + let mut terrain = self.state.terrain_mut(); + // Replace chunks with empty chunks to save memory + let to_clear = terrain + .iter() + .filter_map(|(key, chunk)| (chunk.sub_chunks_len() != 0).then(|| key)) + .collect::>(); + to_clear.into_iter().for_each(|key| { + terrain.insert(key, Arc::clone(&empty)); + }); + drop(terrain); + + // Send a ping to the server once every second + if self.state.get_time() - self.last_server_ping > 1. { + self.send_msg_err(PingMsg::Ping)?; + self.last_server_ping = self.state.get_time(); + } + + // 6) Update the server about the player's physics attributes. + if self.presence.is_some() { + if let (Some(pos), Some(vel), Some(ori)) = ( + self.state.read_storage().get(self.entity()).cloned(), + self.state.read_storage().get(self.entity()).cloned(), + self.state.read_storage().get(self.entity()).cloned(), + ) { + self.in_game_stream + .send(ClientGeneral::PlayerPhysics { pos, vel, ori })?; + } + } + + // 7) Finish the tick, pass control back to the frontend. + self.tick += 1; + + Ok(()) + } } impl Drop for Client { diff --git a/common/Cargo.toml b/common/Cargo.toml index f935d53c0e..8e5977d4bf 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -73,7 +73,7 @@ kiddo = { version = "0.1", optional = true } # Data structures hashbrown = { version = "0.11", features = ["rayon", "serde", "nightly"] } slotmap = { version = "1.0", features = ["serde"] } -indexmap = "1.3.0" +indexmap = { version = "1.3.0", features = ["rayon"] } slab = "0.4.2" # ECS diff --git a/common/src/region.rs b/common/src/region.rs index 6949f63a7d..9e89579912 100644 --- a/common/src/region.rs +++ b/common/src/region.rs @@ -342,16 +342,16 @@ impl RegionMap { } } - // Returns a region given a key + /// Returns a region given a key pub fn get(&self, key: Vec2) -> Option<&Region> { self.regions.get(&key) } - // Returns an iterator of (Position, Region) + /// Returns an iterator of (Position, Region) pub fn iter(&self) -> impl Iterator, &Region)> { self.regions.iter().map(|(key, r)| (*key, r)) } } -// Note vd is in blocks in this case +/// Note vd is in blocks in this case pub fn region_in_vd(key: Vec2, pos: Vec3, vd: f32) -> bool { let vd_extended = vd + TETHER_LENGTH as f32 * 2.0f32.sqrt(); diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index 7845e2a439..c49aebd963 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -85,6 +85,11 @@ fn main() -> io::Result<()> { let mut server_settings = server::Settings::load(&server_data_dir); let mut editable_settings = server::EditableSettings::load(&server_data_dir); + // Apply no_auth modifier to the settings + if no_auth { + server_settings.auth_server_address = None; + } + // Relative to data_dir const PERSISTENCE_DB_DIR: &str = "saves"; @@ -149,10 +154,6 @@ fn main() -> io::Result<()> { info!("Starting server..."); - if no_auth { - server_settings.auth_server_address = None; - } - let server_port = &server_settings.gameserver_address.port(); let metrics_port = &server_settings.metrics_address.port(); // Create server diff --git a/server/src/client.rs b/server/src/client.rs index ca69bb0421..bc45cd45a0 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -83,7 +83,11 @@ impl Client { } pub(crate) fn send>(&self, msg: M) -> Result<(), StreamError> { - match msg.into() { + // TODO: hack to avoid locking stream mutex while serializing the message, + // remove this when the mutexes on the Streams are removed + let prepared = self.prepare(msg); + self.send_prepared(&prepared) + /*match msg.into() { ServerMsg::Info(m) => self.register_stream.lock().unwrap().send(m), ServerMsg::Init(m) => self.register_stream.lock().unwrap().send(m), ServerMsg::RegisterAnswer(m) => self.register_stream.lock().unwrap().send(m), @@ -133,7 +137,7 @@ impl Client { } }, ServerMsg::Ping(m) => self.ping_stream.lock().unwrap().send(m), - } + }*/ } pub(crate) fn send_fallible>(&self, msg: M) { let _ = self.send(msg); } diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index ad6742dcce..9810b30954 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -23,6 +23,7 @@ use vek::*; /// This system will send physics updates to the client #[derive(Default)] pub struct Sys; + impl<'a> System<'a> for Sys { #[allow(clippy::type_complexity)] type SystemData = ( @@ -58,7 +59,7 @@ impl<'a> System<'a> for Sys { const PHASE: Phase = Phase::Create; fn run( - _job: &mut Job, + job: &mut Job, ( entities, tick, @@ -105,201 +106,223 @@ impl<'a> System<'a> for Sys { // 5. Inform clients of the component changes for that entity // - Throttle update rate base on distance to each client - // Sync physics - // via iterating through regions - for (key, region) in region_map.iter() { - // Assemble subscriber list for this region by iterating through clients and - // checking if they are subscribed to this region - let mut subscribers = ( - &clients, - &entities, - presences.maybe(), - &subscriptions, - &positions, - ) - .join() - .filter_map(|(client, entity, presence, subscription, pos)| { - if presence.is_some() && subscription.regions.contains(&key) { - Some((client, &subscription.regions, entity, *pos)) - } else { - None - } - }) - .collect::>(); + // Sync physics and other components + // via iterating through regions (in parallel) - for event in region.events() { - match event { - RegionEvent::Entered(id, maybe_key) => { - // Don't process newly created entities here (redundant network messages) - if trackers.uid.inserted().contains(*id) { - continue; + // Pre-collect regions paired with deleted entity list so we can iterate over + // them in parallel below + let regions_and_deleted_entities = region_map + .iter() + .map(|(key, region)| (key, region, deleted_entities.take_deleted_in_region(key))) + .collect::>(); + + use rayon::iter::{IntoParallelIterator, ParallelIterator}; + job.cpu_stats.measure(common_ecs::ParMode::Rayon); + common_base::prof_span!(guard, "regions"); + regions_and_deleted_entities.into_par_iter().for_each_init( + || { + common_base::prof_span!(guard, "entity sync rayon job"); + guard + }, + |_guard, (key, region, deleted_entities_in_region)| { + // Assemble subscriber list for this region by iterating through clients and + // checking if they are subscribed to this region + let mut subscribers = ( + &clients, + &entities, + presences.maybe(), + &subscriptions, + &positions, + ) + .join() + .filter_map(|(client, entity, presence, subscription, pos)| { + if presence.is_some() && subscription.regions.contains(&key) { + Some((client, &subscription.regions, entity, *pos)) + } else { + None } - let entity = entities.entity(*id); - if let Some(pkg) = positions - .get(entity) - .map(|pos| (pos, velocities.get(entity), orientations.get(entity))) - .and_then(|(pos, vel, ori)| { - tracked_comps.create_entity_package( - entity, - Some(*pos), - vel.copied(), - ori.copied(), - ) - }) - { - let create_msg = ServerGeneral::CreateEntity(pkg); - for (client, regions, client_entity, _) in &mut subscribers { - if maybe_key + }) + .collect::>(); + + for event in region.events() { + match event { + RegionEvent::Entered(id, maybe_key) => { + // Don't process newly created entities here (redundant network + // messages) + if trackers.uid.inserted().contains(*id) { + continue; + } + let entity = entities.entity(*id); + if let Some(pkg) = positions + .get(entity) + .map(|pos| (pos, velocities.get(entity), orientations.get(entity))) + .and_then(|(pos, vel, ori)| { + tracked_comps.create_entity_package( + entity, + Some(*pos), + vel.copied(), + ori.copied(), + ) + }) + { + let create_msg = ServerGeneral::CreateEntity(pkg); + for (client, regions, client_entity, _) in &mut subscribers { + if maybe_key .as_ref() .map(|key| !regions.contains(key)) .unwrap_or(true) // Client doesn't need to know about itself && *client_entity != entity - { - client.send_fallible(create_msg.clone()); + { + client.send_fallible(create_msg.clone()); + } } } - } - }, - RegionEvent::Left(id, maybe_key) => { - // Lookup UID for entity - if let Some(&uid) = uids.get(entities.entity(*id)) { - for (client, regions, _, _) in &mut subscribers { - if maybe_key - .as_ref() - .map(|key| !regions.contains(key)) - .unwrap_or(true) - { - client.send_fallible(ServerGeneral::DeleteEntity(uid)); + }, + RegionEvent::Left(id, maybe_key) => { + // Lookup UID for entity + if let Some(&uid) = uids.get(entities.entity(*id)) { + for (client, regions, _, _) in &mut subscribers { + if maybe_key + .as_ref() + .map(|key| !regions.contains(key)) + .unwrap_or(true) + { + client.send_fallible(ServerGeneral::DeleteEntity(uid)); + } } } - } - }, + }, + } } - } - // Sync tracked components - // Get deleted entities in this region from DeletedEntities - let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages( - &tracked_comps, - region.entities(), - deleted_entities - .take_deleted_in_region(key) - .unwrap_or_default(), - ); - // We lazily initializethe the synchronization messages in case there are no - // clients. - let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package)); - for (client, _, _, _) in &mut subscribers { - let msg = - entity_comp_sync.right_or_else(|(entity_sync_package, comp_sync_package)| { - ( - client.prepare(ServerGeneral::EntitySync(entity_sync_package)), - client.prepare(ServerGeneral::CompSync(comp_sync_package)), - ) - }); - // We don't care much about stream errors here since they could just represent - // network disconnection, which is handled elsewhere. - let _ = client.send_prepared(&msg.0); - let _ = client.send_prepared(&msg.1); - entity_comp_sync = Either::Right(msg); - } - - for (client, _, client_entity, client_pos) in &mut subscribers { - let mut comp_sync_package = CompSyncPackage::new(); - - for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in ( + // Sync tracked components + // Get deleted entities in this region from DeletedEntities + let (entity_sync_package, comp_sync_package) = trackers.create_sync_packages( + &tracked_comps, region.entities(), - &entities, - &uids, - (&positions, last_pos.mask().maybe()), - (&velocities, last_vel.mask().maybe()).maybe(), - (&orientations, last_vel.mask().maybe()).maybe(), - force_updates.mask().maybe(), - colliders.maybe(), - ) - .join() - { - // Decide how regularly to send physics updates. - let send_now = if client_entity == &entity { - let player_physics_setting = players - .get(entity) - .and_then(|p| player_physics_settings.settings.get(&p.uuid()).copied()) - .unwrap_or_default(); - // Don't send client physics updates about itself unless force update is set - // or the client is subject to server-authoritative physics - force_update.is_some() || player_physics_setting.server_authoritative() - } else if matches!(collider, Some(Collider::Voxel { .. })) { - // Things with a voxel collider (airships, etc.) need to have very stable - // physics so we always send updated for these where - // we can. - true - } else { - // Throttle update rates for all other entities based on distance to client - let distance_sq = client_pos.0.distance_squared(pos.0); - let id_staggered_tick = tick + entity.id() as u64; - - // More entities farther away so checks start there - if distance_sq > 500.0f32.powi(2) { - id_staggered_tick % 32 == 0 - } else if distance_sq > 300.0f32.powi(2) { - id_staggered_tick % 16 == 0 - } else if distance_sq > 200.0f32.powi(2) { - id_staggered_tick % 8 == 0 - } else if distance_sq > 120.0f32.powi(2) { - id_staggered_tick % 6 == 0 - } else if distance_sq > 64.0f32.powi(2) { - id_staggered_tick % 3 == 0 - } else if distance_sq > 24.0f32.powi(2) { - id_staggered_tick % 2 == 0 - } else { - true - } - }; - - if last_pos.is_none() { - comp_sync_package.comp_inserted(uid, pos); - } else if send_now { - comp_sync_package.comp_modified(uid, pos); - } - - if let Some((v, last_vel)) = vel { - if last_vel.is_none() { - comp_sync_package.comp_inserted(uid, *v); - } else if send_now { - comp_sync_package.comp_modified(uid, *v); - } - } - - if let Some((o, last_ori)) = ori { - if last_ori.is_none() { - comp_sync_package.comp_inserted(uid, *o); - } else if send_now { - comp_sync_package.comp_modified(uid, *o); - } - } + deleted_entities_in_region, + ); + // We lazily initialize the the synchronization messages in case there are no + // clients. + let mut entity_comp_sync = Either::Left((entity_sync_package, comp_sync_package)); + for (client, _, _, _) in &mut subscribers { + let msg = entity_comp_sync.right_or_else( + |(entity_sync_package, comp_sync_package)| { + ( + client.prepare(ServerGeneral::EntitySync(entity_sync_package)), + client.prepare(ServerGeneral::CompSync(comp_sync_package)), + ) + }, + ); + // We don't care much about stream errors here since they could just represent + // network disconnection, which is handled elsewhere. + let _ = client.send_prepared(&msg.0); + let _ = client.send_prepared(&msg.1); + entity_comp_sync = Either::Right(msg); } - client.send_fallible(ServerGeneral::CompSync(comp_sync_package)); - } + for (client, _, client_entity, client_pos) in &mut subscribers { + let mut comp_sync_package = CompSyncPackage::new(); - // Update the last physics components for each entity - for (_, _, &pos, vel, ori, last_pos, last_vel, last_ori) in ( - region.entities(), - &entities, - &positions, - velocities.maybe(), - orientations.maybe(), - last_pos.entries(), - last_vel.entries(), - last_ori.entries(), - ) - .join() - { - last_pos.replace(Last(pos)); - vel.and_then(|&v| last_vel.replace(Last(v))); - ori.and_then(|&o| last_ori.replace(Last(o))); - } + for (_, entity, &uid, (&pos, last_pos), vel, ori, force_update, collider) in ( + region.entities(), + &entities, + &uids, + (&positions, last_pos.mask().maybe()), + (&velocities, last_vel.mask().maybe()).maybe(), + (&orientations, last_vel.mask().maybe()).maybe(), + force_updates.mask().maybe(), + colliders.maybe(), + ) + .join() + { + // Decide how regularly to send physics updates. + let send_now = if client_entity == &entity { + let player_physics_setting = players + .get(entity) + .and_then(|p| { + player_physics_settings.settings.get(&p.uuid()).copied() + }) + .unwrap_or_default(); + // Don't send client physics updates about itself unless force update is + // set or the client is subject to + // server-authoritative physics + force_update.is_some() || player_physics_setting.server_authoritative() + } else if matches!(collider, Some(Collider::Voxel { .. })) { + // Things with a voxel collider (airships, etc.) need to have very + // stable physics so we always send updated + // for these where we can. + true + } else { + // Throttle update rates for all other entities based on distance to + // client + let distance_sq = client_pos.0.distance_squared(pos.0); + let id_staggered_tick = tick + entity.id() as u64; + + // More entities farther away so checks start there + if distance_sq > 500.0f32.powi(2) { + id_staggered_tick % 32 == 0 + } else if distance_sq > 300.0f32.powi(2) { + id_staggered_tick % 16 == 0 + } else if distance_sq > 200.0f32.powi(2) { + id_staggered_tick % 8 == 0 + } else if distance_sq > 120.0f32.powi(2) { + id_staggered_tick % 6 == 0 + } else if distance_sq > 64.0f32.powi(2) { + id_staggered_tick % 3 == 0 + } else if distance_sq > 24.0f32.powi(2) { + id_staggered_tick % 2 == 0 + } else { + true + } + }; + + if last_pos.is_none() { + comp_sync_package.comp_inserted(uid, pos); + } else if send_now { + comp_sync_package.comp_modified(uid, pos); + } + + if let Some((v, last_vel)) = vel { + if last_vel.is_none() { + comp_sync_package.comp_inserted(uid, *v); + } else if send_now { + comp_sync_package.comp_modified(uid, *v); + } + } + + if let Some((o, last_ori)) = ori { + if last_ori.is_none() { + comp_sync_package.comp_inserted(uid, *o); + } else if send_now { + comp_sync_package.comp_modified(uid, *o); + } + } + } + + client.send_fallible(ServerGeneral::CompSync(comp_sync_package)); + } + }, + ); + drop(guard); + job.cpu_stats.measure(common_ecs::ParMode::Single); + + // Update the last physics components for each entity + for (_, &pos, vel, ori, last_pos, last_vel, last_ori) in ( + &entities, + &positions, + velocities.maybe(), + orientations.maybe(), + last_pos.entries(), + last_vel.entries(), + last_ori.entries(), + ) + .join() + { + last_pos.replace(Last(pos)); + vel.and_then(|&v| last_vel.replace(Last(v))); + ori.and_then(|&o| last_ori.replace(Last(o))); } // Handle entity deletion in regions that don't exist in RegionMap diff --git a/server/src/sys/sentinel.rs b/server/src/sys/sentinel.rs index 7d4008ec5d..74f77376d0 100644 --- a/server/src/sys/sentinel.rs +++ b/server/src/sys/sentinel.rs @@ -430,16 +430,15 @@ impl DeletedEntities { .push(uid.into()); } - pub fn take_deleted_in_region(&mut self, key: Vec2) -> Option> { - self.map.remove(&key) + pub fn take_deleted_in_region(&mut self, key: Vec2) -> Vec { + self.map.remove(&key).unwrap_or_default() } - pub fn get_deleted_in_region(&mut self, key: Vec2) -> Option<&Vec> { - self.map.get(&key) + pub fn get_deleted_in_region(&self, key: Vec2) -> &[u64] { + self.map.get(&key).map_or(&[], |v| v.as_slice()) } - pub fn take_remaining_deleted(&mut self) -> Vec<(Vec2, Vec)> { - // TODO: don't allocate - self.map.drain().collect() + pub fn take_remaining_deleted(&mut self) -> impl Iterator, Vec)> + '_ { + self.map.drain() } } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 9b83dfede7..714c762d16 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -13,7 +13,7 @@ use common::{ use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::ServerGeneral; use specs::{ - Entities, Join, ReadExpect, ReadStorage, SystemData, World, WorldExt, Write, WriteStorage, + Entities, Join, Read, ReadExpect, ReadStorage, SystemData, World, WorldExt, WriteStorage, }; use tracing::{debug, error}; use vek::*; @@ -33,7 +33,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Presence>, ReadStorage<'a, Client>, WriteStorage<'a, RegionSubscription>, - Write<'a, DeletedEntities>, + Read<'a, DeletedEntities>, TrackedComps<'a>, ); @@ -54,7 +54,7 @@ impl<'a> System<'a> for Sys { presences, clients, mut subscriptions, - mut deleted_entities, + deleted_entities, tracked_comps, ): Self::SystemData, ) { @@ -161,11 +161,7 @@ impl<'a> System<'a> for Sys { } // Send deleted entities since they won't be processed for this client in entity // sync - for uid in deleted_entities - .get_deleted_in_region(key) - .iter() - .flat_map(|v| v.iter()) - { + for uid in deleted_entities.get_deleted_in_region(key).iter() { client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } diff --git a/world/src/config.rs b/world/src/config.rs index 6945b043f7..c7faf7c2ce 100644 --- a/world/src/config.rs +++ b/world/src/config.rs @@ -87,6 +87,8 @@ pub struct Features { pub paths: bool, pub spots: bool, pub site2: bool, + // 1.0 is the default wildlife density + pub wildlife_density: f32, } impl assets::Asset for Features { diff --git a/world/src/index.rs b/world/src/index.rs index 513a0c8d0a..75eaa6d7f2 100644 --- a/world/src/index.rs +++ b/world/src/index.rs @@ -119,6 +119,7 @@ impl IndexOwned { // Reload the fields from the asset handle, which is updated automatically self.colors = self.index.colors.cloned(); self.features = self.index.features.cloned(); + // Update wildlife spawns which is based on base_density in features reload(self) }) } diff --git a/world/src/layer/wildlife.rs b/world/src/layer/wildlife.rs index abd3c9001c..4793a4064f 100644 --- a/world/src/layer/wildlife.rs +++ b/world/src/layer/wildlife.rs @@ -21,8 +21,6 @@ fn close(x: f32, tgt: f32, falloff: f32) -> f32 { (1.0 - (x - tgt).abs() / falloff).max(0.0).powf(0.125) } -const BASE_DENSITY: f32 = 1.0e-5; // Base wildlife density - #[derive(Clone, Debug, Deserialize)] pub struct SpawnEntry { /// User-facing info for wiki, statistical tools, etc. @@ -131,6 +129,7 @@ impl Pack { pub type DensityFn = fn(&SimChunk, &ColumnSample) -> f32; pub fn spawn_manifest() -> Vec<(&'static str, DensityFn)> { + const BASE_DENSITY: f32 = 1.0e-5; // Base wildlife density // NOTE: Order matters. // Entries with more specific requirements // and overall scarcity should come first, where possible. @@ -317,6 +316,8 @@ pub fn apply_wildlife_supplement<'a, R: Rng>( time: Option<&(TimeOfDay, Calendar)>, ) { let scatter = &index.wildlife_spawns; + // Configurable density multiplier + let wildlife_density_modifier = index.features.wildlife_density; for y in 0..vol.size_xy().y as i32 { for x in 0..vol.size_xy().x as i32 { @@ -342,7 +343,7 @@ pub fn apply_wildlife_supplement<'a, R: Rng>( .iter() .enumerate() .find_map(|(_i, (entry, get_density))| { - let density = get_density(chunk, col_sample); + let density = get_density(chunk, col_sample) * wildlife_density_modifier; (density > 0.0) .then(|| { entry