make weather not block main thread, and lessen the amount of data shared with client.

This commit is contained in:
Isse 2024-01-22 19:47:57 +01:00
parent 92a8bc2806
commit 268215b301
9 changed files with 269 additions and 109 deletions

View File

@ -49,7 +49,7 @@ use common::{
trade::{PendingTrade, SitePrices, TradeAction, TradeId, TradeResult}, trade::{PendingTrade, SitePrices, TradeAction, TradeId, TradeResult},
uid::{IdMaps, Uid}, uid::{IdMaps, Uid},
vol::RectVolSize, vol::RectVolSize,
weather::{Weather, WeatherGrid}, weather::{SharedWeatherGrid, Weather, WeatherGrid},
}; };
#[cfg(feature = "tracy")] use common_base::plot; #[cfg(feature = "tracy")] use common_base::plot;
use common_base::{prof_span, span}; use common_base::{prof_span, span};
@ -178,12 +178,13 @@ pub struct SiteInfoRich {
} }
struct WeatherLerp { struct WeatherLerp {
old: (WeatherGrid, Instant), old: (SharedWeatherGrid, Instant),
new: (WeatherGrid, Instant), new: (SharedWeatherGrid, Instant),
local_weather: Weather,
} }
impl WeatherLerp { impl WeatherLerp {
fn weather_update(&mut self, weather: WeatherGrid) { fn weather_update(&mut self, weather: SharedWeatherGrid) {
self.old = mem::replace(&mut self.new, (weather, Instant::now())); self.old = mem::replace(&mut self.new, (weather, Instant::now()));
} }
@ -197,7 +198,7 @@ impl WeatherLerp {
return; return;
} }
if to_update.size() != new.size() { if to_update.size() != new.size() {
*to_update = new.clone(); *to_update = WeatherGrid::from(new);
} }
if old.size() == new.size() { if old.size() == new.size() {
// Assumes updates are regular // Assumes updates are regular
@ -209,7 +210,7 @@ impl WeatherLerp {
.iter_mut() .iter_mut()
.zip(old.iter().zip(new.iter())) .zip(old.iter().zip(new.iter()))
.for_each(|((_, current), ((_, old), (_, new)))| { .for_each(|((_, current), ((_, old), (_, new)))| {
*current = Weather::lerp_unclamped(old, new, t); *current = Weather::lerp_shared(old, new, t);
}); });
} }
} }
@ -218,8 +219,9 @@ impl WeatherLerp {
impl Default for WeatherLerp { impl Default for WeatherLerp {
fn default() -> Self { fn default() -> Self {
Self { Self {
old: (WeatherGrid::new(Vec2::zero()), Instant::now()), old: (SharedWeatherGrid::new(Vec2::zero()), Instant::now()),
new: (WeatherGrid::new(Vec2::zero()), Instant::now()), new: (SharedWeatherGrid::new(Vec2::zero()), Instant::now()),
local_weather: Weather::default(),
} }
} }
} }
@ -1713,12 +1715,7 @@ impl Client {
.map(|v| v.0) .map(|v| v.0)
} }
/// Returns Weather::default if no player position exists. pub fn weather_at_player(&self) -> Weather { self.weather.local_weather }
pub fn weather_at_player(&self) -> Weather {
self.position()
.map(|wpos| self.state.weather_at(wpos.xy()))
.unwrap_or_default()
}
pub fn current_chunk(&self) -> Option<Arc<TerrainChunk>> { pub fn current_chunk(&self) -> Option<Arc<TerrainChunk>> {
let chunk_pos = Vec2::from(self.position()?) let chunk_pos = Vec2::from(self.position()?)
@ -2539,6 +2536,9 @@ impl Client {
ServerGeneral::WeatherUpdate(weather) => { ServerGeneral::WeatherUpdate(weather) => {
self.weather.weather_update(weather); self.weather.weather_update(weather);
}, },
ServerGeneral::LocalWeatherUpdate(weather) => {
self.weather.local_weather = weather;
},
ServerGeneral::SpectatePosition(pos) => { ServerGeneral::SpectatePosition(pos) => {
frontend_events.push(Event::SpectatePosition(pos)); frontend_events.push(Event::SpectatePosition(pos));
}, },

View File

@ -17,7 +17,7 @@ use common::{
trade::{PendingTrade, SitePrices, TradeId, TradeResult}, trade::{PendingTrade, SitePrices, TradeId, TradeResult},
uid::Uid, uid::Uid,
uuid::Uuid, uuid::Uuid,
weather::WeatherGrid, weather::{SharedWeatherGrid, Weather},
}; };
use hashbrown::HashMap; use hashbrown::HashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -214,7 +214,8 @@ pub enum ServerGeneral {
/// Economic information about sites /// Economic information about sites
SiteEconomy(EconomyInfo), SiteEconomy(EconomyInfo),
MapMarker(comp::MapMarkerUpdate), MapMarker(comp::MapMarkerUpdate),
WeatherUpdate(WeatherGrid), WeatherUpdate(SharedWeatherGrid),
LocalWeatherUpdate(Weather),
/// Suggest the client to spectate a position. Called after client has /// Suggest the client to spectate a position. Called after client has
/// requested teleport etc. /// requested teleport etc.
SpectatePosition(Vec3<f32>), SpectatePosition(Vec3<f32>),
@ -339,6 +340,7 @@ impl ServerMsg {
| ServerGeneral::SiteEconomy(_) | ServerGeneral::SiteEconomy(_)
| ServerGeneral::MapMarker(_) | ServerGeneral::MapMarker(_)
| ServerGeneral::WeatherUpdate(_) | ServerGeneral::WeatherUpdate(_)
| ServerGeneral::LocalWeatherUpdate(_)
| ServerGeneral::SpectatePosition(_) => { | ServerGeneral::SpectatePosition(_) => {
c_type == ClientType::Game && presence.is_some() c_type == ClientType::Game && presence.is_some()
}, },

View File

@ -40,6 +40,14 @@ impl Weather {
} }
} }
pub fn lerp_shared(from: &SharedWeather, to: &SharedWeather, t: f32) -> Self {
Self {
cloud: f32::lerp_unclamped(from.cloud as f32, to.cloud as f32, t) / 255.0,
rain: f32::lerp_unclamped(from.rain as f32, to.rain as f32, t) / 255.0,
wind: Vec2::zero(),
}
}
// Get the rain velocity for this weather // Get the rain velocity for this weather
pub fn rain_vel(&self) -> Vec3<f32> { pub fn rain_vel(&self) -> Vec3<f32> {
const FALL_RATE: f32 = 30.0; const FALL_RATE: f32 = 30.0;
@ -75,11 +83,92 @@ pub const CHUNKS_PER_CELL: u32 = 16;
pub const CELL_SIZE: u32 = CHUNKS_PER_CELL * TerrainChunkSize::RECT_SIZE.x; pub const CELL_SIZE: u32 = CHUNKS_PER_CELL * TerrainChunkSize::RECT_SIZE.x;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone)]
pub struct WeatherGrid { pub struct WeatherGrid {
weather: Grid<Weather>, weather: Grid<Weather>,
} }
#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize)]
pub struct SharedWeather {
cloud: u8,
rain: u8,
}
impl From<Weather> for SharedWeather {
fn from(weather: Weather) -> Self {
Self {
cloud: (weather.cloud * 255.0) as u8,
rain: (weather.rain * 255.0) as u8,
}
}
}
impl From<SharedWeather> for Weather {
fn from(weather: SharedWeather) -> Self {
Self {
cloud: weather.cloud as f32 / 255.0,
rain: weather.rain as f32 / 255.0,
wind: Vec2::zero(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedWeatherGrid {
weather: Grid<SharedWeather>,
}
impl From<&WeatherGrid> for SharedWeatherGrid {
fn from(value: &WeatherGrid) -> Self {
Self {
weather: Grid::from_raw(
value.weather.size(),
value
.weather
.raw()
.iter()
.copied()
.map(SharedWeather::from)
.collect::<Vec<_>>(),
),
}
}
}
impl From<&SharedWeatherGrid> for WeatherGrid {
fn from(value: &SharedWeatherGrid) -> Self {
Self {
weather: Grid::from_raw(
value.weather.size(),
value
.weather
.raw()
.iter()
.copied()
.map(Weather::from)
.collect::<Vec<_>>(),
),
}
}
}
impl SharedWeatherGrid {
pub fn new(size: Vec2<u32>) -> Self {
size.map(|e| debug_assert!(i32::try_from(e).is_ok()));
Self {
weather: Grid::new(size.as_(), SharedWeather::default()),
}
}
pub fn iter(&self) -> impl Iterator<Item = (Vec2<i32>, &SharedWeather)> { self.weather.iter() }
pub fn iter_mut(&mut self) -> impl Iterator<Item = (Vec2<i32>, &mut SharedWeather)> {
self.weather.iter_mut()
}
pub fn size(&self) -> Vec2<u32> { self.weather.size().as_() }
}
/// Transforms a world position to cell coordinates. Where (0.0, 0.0) in cell /// Transforms a world position to cell coordinates. Where (0.0, 0.0) in cell
/// coordinates is the center of the weather cell located at (0, 0) in the grid. /// coordinates is the center of the weather cell located at (0, 0) in the grid.
fn to_cell_pos(wpos: Vec2<f32>) -> Vec2<f32> { wpos / CELL_SIZE as f32 - 0.5 } fn to_cell_pos(wpos: Vec2<f32>) -> Vec2<f32> { wpos / CELL_SIZE as f32 - 0.5 }
@ -113,6 +202,13 @@ impl WeatherGrid {
pub fn size(&self) -> Vec2<u32> { self.weather.size().as_() } pub fn size(&self) -> Vec2<u32> { self.weather.size().as_() }
pub fn get(&self, cell_pos: Vec2<u32>) -> Weather {
self.weather
.get(cell_pos.as_())
.copied()
.unwrap_or_default()
}
/// Get the weather at a given world position by doing bilinear /// Get the weather at a given world position by doing bilinear
/// interpolation between four cells. /// interpolation between four cells.
pub fn get_interpolated(&self, wpos: Vec2<f32>) -> Weather { pub fn get_interpolated(&self, wpos: Vec2<f32>) -> Weather {

View File

@ -192,6 +192,7 @@ impl Client {
| ServerGeneral::FinishedTrade(_) | ServerGeneral::FinishedTrade(_)
| ServerGeneral::MapMarker(_) | ServerGeneral::MapMarker(_)
| ServerGeneral::WeatherUpdate(_) | ServerGeneral::WeatherUpdate(_)
| ServerGeneral::LocalWeatherUpdate(_)
| ServerGeneral::SpectatePosition(_) => { | ServerGeneral::SpectatePosition(_) => {
PreparedMsg::new(2, &g, &self.in_game_stream_params) PreparedMsg::new(2, &g, &self.in_game_stream_params)
}, },

View File

@ -371,6 +371,7 @@ impl Server {
pool.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4); pool.configure("CHUNK_GENERATOR", |n| n / 2 + n / 4);
pool.configure("CHUNK_SERIALIZER", |n| n / 2); pool.configure("CHUNK_SERIALIZER", |n| n / 2);
pool.configure("RTSIM_SAVE", |_| 1); pool.configure("RTSIM_SAVE", |_| 1);
pool.configure("WEATHER", |_| 1);
} }
state state
.ecs_mut() .ecs_mut()
@ -588,7 +589,7 @@ impl Server {
return Err(Error::RtsimError(err)); return Err(Error::RtsimError(err));
}, },
} }
weather::init(&mut state, &world); weather::init(&mut state);
} }
let server_constants = ServerConstants { let server_constants = ServerConstants {

View File

@ -1,13 +1,8 @@
use common::weather::CHUNKS_PER_CELL; use common_ecs::dispatch;
use common_ecs::{dispatch, System};
use common_state::State; use common_state::State;
use specs::DispatcherBuilder; use specs::DispatcherBuilder;
use std::time::Duration;
use crate::sys::SysScheduler;
mod sim; mod sim;
mod sync;
mod tick; mod tick;
pub use sim::WeatherSim; pub use sim::WeatherSim;
@ -17,25 +12,14 @@ const WEATHER_DT: f32 = 5.0;
pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
dispatch::<tick::Sys>(dispatch_builder, &[]); dispatch::<tick::Sys>(dispatch_builder, &[]);
dispatch::<sync::Sys>(dispatch_builder, &[&tick::Sys::sys_name()]);
} }
#[cfg(feature = "worldgen")] #[cfg(feature = "worldgen")]
pub fn init(state: &mut State, world: &world::World) { pub fn init(state: &mut State) {
let weather_size = world.sim().get_size() / CHUNKS_PER_CELL; use crate::weather::sim::LightningCells;
let sim = WeatherSim::new(weather_size, world);
state.ecs_mut().insert(sim);
// NOTE: If weather computations get too heavy, this should not block the main use self::tick::WeatherJob;
// thread.
state state.ecs_mut().insert(None::<WeatherJob>);
.ecs_mut() state.ecs_mut().insert(LightningCells::default());
.insert(SysScheduler::<tick::Sys>::every(Duration::from_secs_f32(
WEATHER_DT,
)));
state
.ecs_mut()
.insert(SysScheduler::<sync::Sys>::every(Duration::from_secs_f32(
WEATHER_DT,
)));
} }

View File

@ -1,12 +1,9 @@
use common::{ use common::{
event::EventBus,
grid::Grid, grid::Grid,
outcome::Outcome,
resources::TimeOfDay, resources::TimeOfDay,
weather::{Weather, WeatherGrid, CELL_SIZE, CHUNKS_PER_CELL}, weather::{Weather, WeatherGrid, CELL_SIZE, CHUNKS_PER_CELL},
}; };
use noise::{NoiseFn, SuperSimplex, Turbulence}; use noise::{NoiseFn, SuperSimplex, Turbulence};
use rand::prelude::*;
use vek::*; use vek::*;
use world::World; use world::World;
@ -31,6 +28,12 @@ pub struct WeatherSim {
zones: Grid<Option<WeatherZone>>, zones: Grid<Option<WeatherZone>>,
} }
/// A list of weather cells where lightning has a chance to strike.
#[derive(Default)]
pub struct LightningCells {
pub cells: Vec<Vec2<i32>>,
}
impl WeatherSim { impl WeatherSim {
pub fn new(size: Vec2<u32>, world: &World) -> Self { pub fn new(size: Vec2<u32>, world: &World) -> Self {
Self { Self {
@ -85,14 +88,8 @@ impl WeatherSim {
} }
} }
// Time step is cell size / maximum wind speed // Time step is cell size / maximum wind speed.
pub fn tick( pub fn tick(&mut self, time_of_day: TimeOfDay, out: &mut WeatherGrid) -> LightningCells {
&mut self,
time_of_day: &TimeOfDay,
outcomes: &EventBus<Outcome>,
out: &mut WeatherGrid,
world: &World,
) {
let time = time_of_day.0; let time = time_of_day.0;
let base_nz = Turbulence::new( let base_nz = Turbulence::new(
@ -105,6 +102,7 @@ impl WeatherSim {
let rain_nz = SuperSimplex::new(); let rain_nz = SuperSimplex::new();
let mut lightning_cells = Vec::new();
for (point, cell) in out.iter_mut() { for (point, cell) in out.iter_mut() {
if let Some(zone) = &mut self.zones[point] { if let Some(zone) = &mut self.zones[point] {
*cell = zone.weather; *cell = zone.weather;
@ -147,16 +145,14 @@ impl WeatherSim {
rain_nz.get((spos + 1.0).into_array()).powi(3) as f32, rain_nz.get((spos + 1.0).into_array()).powi(3) as f32,
) * 200.0 ) * 200.0
* (1.0 - pressure); * (1.0 - pressure);
if cell.rain > 0.2 && cell.cloud > 0.15 && thread_rng().gen_bool(0.01) {
let wpos = wpos.map(|e| {
e as f32 + thread_rng().gen_range(-1.0..1.0) * CELL_SIZE as f32 * 0.5
});
outcomes.emit_now(Outcome::Lightning {
pos: wpos.with_z(world.sim().get_alt_approx(wpos.as_()).unwrap_or(0.0)),
});
}
} }
if cell.rain > 0.2 && cell.cloud > 0.15 {
lightning_cells.push(point);
}
}
LightningCells {
cells: lightning_cells,
} }
} }

View File

@ -1,37 +0,0 @@
use common::weather::WeatherGrid;
use common_ecs::{Origin, Phase, System};
use common_net::msg::ServerGeneral;
use specs::{Join, ReadExpect, ReadStorage, Write};
use crate::{client::Client, sys::SysScheduler};
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
type SystemData = (
ReadExpect<'a, WeatherGrid>,
Write<'a, SysScheduler<Self>>,
ReadStorage<'a, Client>,
);
const NAME: &'static str = "weather::sync";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(
_job: &mut common_ecs::Job<Self>,
(weather_grid, mut scheduler, clients): Self::SystemData,
) {
if scheduler.should_run() {
let mut lazy_msg = None;
for client in clients.join() {
if lazy_msg.is_none() {
lazy_msg =
Some(client.prepare(ServerGeneral::WeatherUpdate(weather_grid.clone())));
}
lazy_msg.as_ref().map(|msg| client.send_prepared(msg));
}
}
}
}

View File

@ -1,12 +1,37 @@
use common::{event::EventBus, outcome::Outcome, resources::TimeOfDay, weather::WeatherGrid}; use common::{
comp,
event::EventBus,
outcome::Outcome,
resources::{DeltaTime, ProgramTime, TimeOfDay},
slowjob::{SlowJob, SlowJobPool},
weather::{SharedWeatherGrid, WeatherGrid},
};
use common_ecs::{Origin, Phase, System}; use common_ecs::{Origin, Phase, System};
use specs::{Read, ReadExpect, Write, WriteExpect}; use common_net::msg::ServerGeneral;
use std::sync::Arc; use rand::{seq::SliceRandom, thread_rng, Rng};
use specs::{Join, Read, ReadExpect, ReadStorage, Write, WriteExpect};
use std::{mem, sync::Arc};
use world::World; use world::World;
use crate::sys::SysScheduler; use crate::client::Client;
use super::sim::WeatherSim; use super::{
sim::{LightningCells, WeatherSim},
WEATHER_DT,
};
enum WeatherJobState {
Working(SlowJob),
Idle(WeatherSim),
None,
}
pub struct WeatherJob {
last_update: ProgramTime,
weather_tx: crossbeam_channel::Sender<(WeatherGrid, LightningCells, WeatherSim)>,
weather_rx: crossbeam_channel::Receiver<(WeatherGrid, LightningCells, WeatherSim)>,
state: WeatherJobState,
}
#[derive(Default)] #[derive(Default)]
pub struct Sys; pub struct Sys;
@ -14,11 +39,16 @@ pub struct Sys;
impl<'a> System<'a> for Sys { impl<'a> System<'a> for Sys {
type SystemData = ( type SystemData = (
Read<'a, TimeOfDay>, Read<'a, TimeOfDay>,
WriteExpect<'a, WeatherSim>, Read<'a, ProgramTime>,
Read<'a, DeltaTime>,
Write<'a, LightningCells>,
Write<'a, Option<WeatherJob>>,
WriteExpect<'a, WeatherGrid>, WriteExpect<'a, WeatherGrid>,
Write<'a, SysScheduler<Self>>, WriteExpect<'a, SlowJobPool>,
ReadExpect<'a, EventBus<Outcome>>, ReadExpect<'a, EventBus<Outcome>>,
ReadExpect<'a, Arc<World>>, ReadExpect<'a, Arc<World>>,
ReadStorage<'a, Client>,
ReadStorage<'a, comp::Pos>,
); );
const NAME: &'static str = "weather::tick"; const NAME: &'static str = "weather::tick";
@ -27,13 +57,100 @@ impl<'a> System<'a> for Sys {
fn run( fn run(
_job: &mut common_ecs::Job<Self>, _job: &mut common_ecs::Job<Self>,
(game_time, mut sim, mut grid, mut scheduler, outcomes, world): Self::SystemData, (
game_time,
program_time,
delta_time,
mut lightning_cells,
mut weather_job,
mut grid,
slow_job_pool,
outcomes,
world,
clients,
positions,
): Self::SystemData,
) { ) {
if scheduler.should_run() { if let Some(weather_job) = match &mut *weather_job {
if grid.size() != sim.size() { Some(weather_job) => (program_time.0 - weather_job.last_update.0 >= WEATHER_DT as f64)
.then_some(weather_job),
None => {
let (weather_tx, weather_rx) = crossbeam_channel::bounded(1);
let weather_size = world.sim().get_size() / common::weather::CHUNKS_PER_CELL;
let mut sim = WeatherSim::new(weather_size, &world);
*grid = WeatherGrid::new(sim.size()); *grid = WeatherGrid::new(sim.size());
*lightning_cells = sim.tick(*game_time, &mut grid);
*weather_job = Some(WeatherJob {
last_update: *program_time,
weather_tx,
weather_rx,
state: WeatherJobState::Idle(sim),
});
None
},
} {
if matches!(weather_job.state, WeatherJobState::Working(_))
&& let Ok((new_grid, new_lightning_cells, sim)) = weather_job.weather_rx.try_recv() {
*grid = new_grid;
*lightning_cells = new_lightning_cells;
let mut lazy_msg = None;
for client in clients.join() {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::WeatherUpdate(
SharedWeatherGrid::from(&*grid),
)));
}
lazy_msg.as_ref().map(|msg| client.send_prepared(msg));
}
weather_job.state = WeatherJobState::Idle(sim);
} }
sim.tick(&game_time, &outcomes, &mut grid, &world);
if matches!(weather_job.state, WeatherJobState::Idle(_)) {
let old_state = mem::replace(&mut weather_job.state, WeatherJobState::None);
let WeatherJobState::Idle(mut sim) = old_state else {
unreachable!()
};
let weather_tx = weather_job.weather_tx.clone();
let game_time = *game_time;
let job = slow_job_pool.spawn("WEATHER", move || {
let mut grid = WeatherGrid::new(sim.size());
let lightning_cells = sim.tick(game_time, &mut grid);
weather_tx
.send((grid, lightning_cells, sim))
.expect("We should never send more than 1 of these.")
});
weather_job.state = WeatherJobState::Working(job);
}
}
let mut outcome_emitter = outcomes.emitter();
let mut rng = thread_rng();
let num_cells = lightning_cells.cells.len() as f64 * 0.002 * delta_time.0 as f64;
let num_cells = num_cells.floor() as u32 + rng.gen_bool(num_cells.fract()) as u32;
for _ in 0..num_cells {
let cell_pos = lightning_cells.cells.choose(&mut rng).expect(
"This is non-empty, since we multiply with its len for the chance to do a \
lightning strike.",
);
let wpos = cell_pos.map(|e| {
(e as f32 + thread_rng().gen_range(0.0..1.0)) * common::weather::CELL_SIZE as f32
});
outcome_emitter.emit(Outcome::Lightning {
pos: wpos.with_z(world.sim().get_alt_approx(wpos.as_()).unwrap_or(0.0)),
});
}
for (client, pos) in (&clients, &positions).join() {
let weather = grid.get_interpolated(pos.0.xy());
client.send_fallible(ServerGeneral::LocalWeatherUpdate(weather));
} }
} }
} }