git push -fTransport ChunkRequests and Chunkupdates in a own stream. ChunkUpdates are very big and having them in a own stream, helps slow clients to keep up with entity syncs and

lagging a bit behind on terrain. Which is fine. Block Places and Block Pickup are not handled in this stream, as they go through the standart route of event handling.
This commit is contained in:
Marcel Märtens 2021-03-11 13:07:03 +01:00
parent 3a7ff55dc0
commit f8b7d96066
9 changed files with 166 additions and 68 deletions

View File

@ -156,6 +156,7 @@ pub struct Client {
register_stream: Stream,
character_screen_stream: Stream,
in_game_stream: Stream,
terrain_stream: Stream,
client_timeout: Duration,
last_server_ping: f64,
@ -215,6 +216,7 @@ impl Client {
let mut register_stream = participant.opened().await?;
let character_screen_stream = participant.opened().await?;
let in_game_stream = participant.opened().await?;
let terrain_stream = participant.opened().await?;
register_stream.send(ClientType::Game)?;
let server_info: ServerInfo = register_stream.recv().await?;
@ -456,6 +458,7 @@ impl Client {
register_stream,
character_screen_stream,
in_game_stream,
terrain_stream,
client_timeout,
@ -550,10 +553,11 @@ impl Client {
| ClientGeneral::PlaceBlock(_, _)
| ClientGeneral::ExitInGame
| ClientGeneral::PlayerPhysics { .. }
| ClientGeneral::TerrainChunkRequest { .. }
| ClientGeneral::UnlockSkill(_)
| ClientGeneral::RefundSkill(_)
| ClientGeneral::UnlockSkillGroup(_) => &mut self.in_game_stream,
//Only in game, terrain
ClientGeneral::TerrainChunkRequest { .. } => &mut self.terrain_stream,
//Always possible
ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => {
&mut self.general_stream
@ -1555,17 +1559,6 @@ impl Client {
frontend_events.push(Event::InventoryUpdated(event));
},
ServerGeneral::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerGeneral::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
ServerGeneral::SetViewDistance(vd) => {
self.view_distance = Some(vd);
frontend_events.push(Event::SetViewDistance(vd));
@ -1596,6 +1589,25 @@ impl Client {
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> {
match msg {
ServerGeneral::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerGeneral::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
_ => unreachable!("Not a terrain message"),
}
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_server_character_screen_msg(
&mut self,
@ -1658,11 +1670,12 @@ impl Client {
cnt: &mut u64,
) -> Result<(), Error> {
loop {
let (m1, m2, m3, m4) = select!(
msg = self.general_stream.recv().fuse() => (Some(msg), None, None, None),
msg = self.ping_stream.recv().fuse() => (None, Some(msg), None, None),
msg = self.character_screen_stream.recv().fuse() => (None, None, Some(msg), None),
msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg)),
let (m1, m2, m3, m4, m5) = select!(
msg = self.general_stream.recv().fuse() => (Some(msg), None, None, None, None),
msg = self.ping_stream.recv().fuse() => (None, Some(msg), None, None, None),
msg = self.character_screen_stream.recv().fuse() => (None, None, Some(msg), None, None),
msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg), None),
msg = self.terrain_stream.recv().fuse() => (None, None, None, None, Some(msg)),
);
*cnt += 1;
if let Some(msg) = m1 {
@ -1677,6 +1690,9 @@ impl Client {
if let Some(msg) = m4 {
self.handle_server_in_game_msg(frontend_events, msg?)?;
}
if let Some(msg) = m5 {
self.handle_server_terrain_msg(msg?)?;
}
}
}

View File

@ -70,12 +70,13 @@ pub enum ClientGeneral {
vel: comp::Vel,
ori: comp::Ori,
},
TerrainChunkRequest {
key: Vec2<i32>,
},
UnlockSkill(Skill),
RefundSkill(Skill),
UnlockSkillGroup(SkillGroupKind),
//Only in Game, via terrain stream
TerrainChunkRequest {
key: Vec2<i32>,
},
//Always possible
ChatMsg(String),
Terminate,

View File

@ -101,14 +101,15 @@ pub enum ServerGeneral {
/// from an ingame state
ExitInGameSuccess,
InventoryUpdate(comp::Inventory, comp::InventoryUpdateEvent),
SetViewDistance(u32),
Outcomes(Vec<Outcome>),
Knockback(Vec3<f32>),
// Ingame related AND terrain stream
TerrainChunkUpdate {
key: Vec2<i32>,
chunk: Result<Box<TerrainChunk>, ()>,
},
TerrainBlockUpdates(HashMap<Vec3<i32>, Block>),
SetViewDistance(u32),
Outcomes(Vec<Outcome>),
Knockback(Vec3<f32>),
// Always possible
PlayerListUpdate(PlayerListUpdate),
/// A message to go into the client chat box. The client is responsible for

View File

@ -25,6 +25,7 @@ pub struct Client {
register_stream: Mutex<Stream>,
character_screen_stream: Mutex<Stream>,
in_game_stream: Mutex<Stream>,
terrain_stream: Mutex<Stream>,
}
pub struct PreparedMsg {
@ -47,6 +48,7 @@ impl Client {
register_stream: Stream,
character_screen_stream: Stream,
in_game_stream: Stream,
terrain_stream: Stream,
) -> Self {
Client {
client_type,
@ -59,6 +61,7 @@ impl Client {
register_stream: Mutex::new(register_stream),
character_screen_stream: Mutex::new(character_screen_stream),
in_game_stream: Mutex::new(in_game_stream),
terrain_stream: Mutex::new(terrain_stream),
}
}
@ -84,8 +87,6 @@ impl Client {
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_)
@ -93,6 +94,11 @@ impl Client {
| ServerGeneral::FinishedTrade(_) => {
self.in_game_stream.try_lock().unwrap().send(g)
},
//Ingame related, terrain
ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_) => {
self.terrain_stream.try_lock().unwrap().send(g)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
@ -138,6 +144,11 @@ impl Client {
.unwrap()
.send_raw(&msg.message),
4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message),
5 => self
.terrain_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
_ => unreachable!("invalid stream id"),
}
}
@ -164,8 +175,6 @@ impl Client {
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_)
@ -173,6 +182,11 @@ impl Client {
| ServerGeneral::FinishedTrade(_) => {
PreparedMsg::new(2, &g, &self.in_game_stream)
},
//Ingame related, terrain
ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_) => {
PreparedMsg::new(5, &g, &self.terrain_stream)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
@ -203,6 +217,7 @@ impl Client {
2 => self.in_game_stream.try_lock().unwrap().try_recv(),
3 => self.general_stream.try_lock().unwrap().try_recv(),
4 => self.ping_stream.try_lock().unwrap().try_recv(),
5 => self.terrain_stream.try_lock().unwrap().try_recv(),
_ => unreachable!("invalid stream id"),
}
}

View File

@ -102,9 +102,10 @@ impl ConnectionHandler {
let general_stream = participant.open(3, reliablec, 500).await?;
let ping_stream = participant.open(2, reliable, 500).await?;
let mut register_stream = participant.open(3, reliablec, 0).await?;
let character_screen_stream = participant.open(3, reliablec, 0).await?;
let in_game_stream = participant.open(3, reliablec, 400_000).await?;
let mut register_stream = participant.open(3, reliablec, 500).await?;
let character_screen_stream = participant.open(3, reliablec, 500).await?;
let in_game_stream = participant.open(3, reliablec, 100_000).await?;
let terrain_stream = participant.open(4, reliablec, 20_000).await?;
let server_data = receiver.recv()?;
@ -131,6 +132,7 @@ impl ConnectionHandler {
register_stream,
character_screen_stream,
in_game_stream,
terrain_stream,
);
client_sender.send(client)?;

View File

@ -499,6 +499,7 @@ impl Server {
run_now::<sys::msg::register::Sys>(&self.state.ecs());
run_now::<sys::msg::character_screen::Sys>(&self.state.ecs());
run_now::<sys::msg::in_game::Sys>(&self.state.ecs());
run_now::<sys::msg::terrain::Sys>(&self.state.ecs());
run_now::<sys::msg::ping::Sys>(&self.state.ecs());
run_now::<sys::agent::Sys>(&self.state.ecs());

View File

@ -1,9 +1,9 @@
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings};
use crate::{client::Client, presence::Presence, Settings};
use common::{
comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Health, Ori, Pos, Stats, Vel},
event::{EventBus, ServerEvent},
terrain::{TerrainChunkSize, TerrainGrid},
vol::{ReadVol, RectVolSize},
terrain::TerrainGrid,
vol::ReadVol,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, PresenceKind, ServerGeneral};
@ -19,7 +19,6 @@ impl Sys {
client: &Client,
maybe_presence: &mut Option<&mut Presence>,
terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &mut WriteStorage<'_, Stats>,
@ -37,9 +36,6 @@ impl Sys {
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
@ -115,33 +111,6 @@ impl Sys {
block_changes.try_set(pos, block);
}
},
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
ClientGeneral::UnlockSkill(skill) => {
stats
.get_mut(entity)
@ -157,7 +126,7 @@ impl Sys {
.get_mut(entity)
.map(|mut s| s.skill_set.unlock_skill_group(skill_group_kind));
},
_ => unreachable!("not a client_in_game msg"),
_ => tracing::error!("not a client_in_game msg"),
}
Ok(())
}
@ -172,7 +141,6 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadStorage<'a, CanBuild>,
ReadStorage<'a, ForceUpdate>,
WriteStorage<'a, Stats>,
@ -197,7 +165,6 @@ impl<'a> System<'a> for Sys {
entities,
server_event_bus,
terrain,
network_metrics,
can_build,
force_updates,
mut stats,
@ -224,7 +191,6 @@ impl<'a> System<'a> for Sys {
client,
&mut maybe_presence.as_deref_mut(),
&terrain,
&network_metrics,
&can_build,
&force_updates,
&mut stats,

View File

@ -3,6 +3,7 @@ pub mod general;
pub mod in_game;
pub mod ping;
pub mod register;
pub mod terrain;
use crate::client::Client;
use serde::de::DeserializeOwned;

View File

@ -0,0 +1,95 @@
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence};
use common::{
comp::Pos,
event::{EventBus, ServerEvent},
terrain::{TerrainChunkSize, TerrainGrid},
vol::RectVolSize,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage};
use tracing::{debug, trace};
/// This system will handle new messages from clients
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
);
const NAME: &'static str = "msg::in_game";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(
_job: &mut Job<Self>,
(
entities,
server_event_bus,
terrain,
network_metrics,
positions,
presences,
clients,
): Self::SystemData,
) {
let mut server_emitter = server_event_bus.emitter();
for (entity, client, maybe_presence) in (&entities, &clients, (&presences).maybe()).join() {
let _ = super::try_recv_all(client, 5, |client, msg| {
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
_ => tracing::error!("not a client_terrain msg"),
}
Ok(())
});
}
}
}