From db4e86a38bcff75778248ff26a73837e4cc94795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Thu, 11 Mar 2021 13:07:03 +0100 Subject: [PATCH] git push -fTransport ChunkRequests and Chunkupdates in a own stream. ChunkUpdates are very big and having them in a own stream, helps slow clients to keep up with entity syncs and lagging a bit behind on terrain. Which is fine. Block Places and Block Pickup are not handled in this stream, as they go through the standart route of event handling. --- client/src/lib.rs | 50 +++++++++++------ common/net/src/msg/client.rs | 7 ++- common/net/src/msg/server.rs | 7 ++- server/src/client.rs | 23 ++++++-- server/src/connection_handler.rs | 8 ++- server/src/lib.rs | 1 + server/src/sys/msg/in_game.rs | 42 ++------------ server/src/sys/msg/mod.rs | 1 + server/src/sys/msg/terrain.rs | 95 ++++++++++++++++++++++++++++++++ 9 files changed, 166 insertions(+), 68 deletions(-) create mode 100644 server/src/sys/msg/terrain.rs diff --git a/client/src/lib.rs b/client/src/lib.rs index 7b7232a5d1..8eddf0153b 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -156,6 +156,7 @@ pub struct Client { register_stream: Stream, character_screen_stream: Stream, in_game_stream: Stream, + terrain_stream: Stream, client_timeout: Duration, last_server_ping: f64, @@ -215,6 +216,7 @@ impl Client { let mut register_stream = participant.opened().await?; let character_screen_stream = participant.opened().await?; let in_game_stream = participant.opened().await?; + let terrain_stream = participant.opened().await?; register_stream.send(ClientType::Game)?; let server_info: ServerInfo = register_stream.recv().await?; @@ -456,6 +458,7 @@ impl Client { register_stream, character_screen_stream, in_game_stream, + terrain_stream, client_timeout, @@ -550,10 +553,11 @@ impl Client { | ClientGeneral::PlaceBlock(_, _) | ClientGeneral::ExitInGame | ClientGeneral::PlayerPhysics { .. } - | ClientGeneral::TerrainChunkRequest { .. } | ClientGeneral::UnlockSkill(_) | ClientGeneral::RefundSkill(_) | ClientGeneral::UnlockSkillGroup(_) => &mut self.in_game_stream, + //Only in game, terrain + ClientGeneral::TerrainChunkRequest { .. } => &mut self.terrain_stream, //Always possible ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => { &mut self.general_stream @@ -1555,17 +1559,6 @@ impl Client { frontend_events.push(Event::InventoryUpdated(event)); }, - ServerGeneral::TerrainChunkUpdate { key, chunk } => { - if let Ok(chunk) = chunk { - self.state.insert_chunk(key, *chunk); - } - self.pending_chunks.remove(&key); - }, - ServerGeneral::TerrainBlockUpdates(mut blocks) => { - blocks.drain().for_each(|(pos, block)| { - self.state.set_block(pos, block); - }); - }, ServerGeneral::SetViewDistance(vd) => { self.view_distance = Some(vd); frontend_events.push(Event::SetViewDistance(vd)); @@ -1596,6 +1589,25 @@ impl Client { Ok(()) } + #[allow(clippy::unnecessary_wraps)] + fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> { + match msg { + ServerGeneral::TerrainChunkUpdate { key, chunk } => { + if let Ok(chunk) = chunk { + self.state.insert_chunk(key, *chunk); + } + self.pending_chunks.remove(&key); + }, + ServerGeneral::TerrainBlockUpdates(mut blocks) => { + blocks.drain().for_each(|(pos, block)| { + self.state.set_block(pos, block); + }); + }, + _ => unreachable!("Not a terrain message"), + } + Ok(()) + } + #[allow(clippy::unnecessary_wraps)] fn handle_server_character_screen_msg( &mut self, @@ -1658,11 +1670,12 @@ impl Client { cnt: &mut u64, ) -> Result<(), Error> { loop { - let (m1, m2, m3, m4) = select!( - msg = self.general_stream.recv().fuse() => (Some(msg), None, None, None), - msg = self.ping_stream.recv().fuse() => (None, Some(msg), None, None), - msg = self.character_screen_stream.recv().fuse() => (None, None, Some(msg), None), - msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg)), + let (m1, m2, m3, m4, m5) = select!( + msg = self.general_stream.recv().fuse() => (Some(msg), None, None, None, None), + msg = self.ping_stream.recv().fuse() => (None, Some(msg), None, None, None), + msg = self.character_screen_stream.recv().fuse() => (None, None, Some(msg), None, None), + msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg), None), + msg = self.terrain_stream.recv().fuse() => (None, None, None, None, Some(msg)), ); *cnt += 1; if let Some(msg) = m1 { @@ -1677,6 +1690,9 @@ impl Client { if let Some(msg) = m4 { self.handle_server_in_game_msg(frontend_events, msg?)?; } + if let Some(msg) = m5 { + self.handle_server_terrain_msg(msg?)?; + } } } diff --git a/common/net/src/msg/client.rs b/common/net/src/msg/client.rs index 8b040a6b7b..e44514798b 100644 --- a/common/net/src/msg/client.rs +++ b/common/net/src/msg/client.rs @@ -70,12 +70,13 @@ pub enum ClientGeneral { vel: comp::Vel, ori: comp::Ori, }, - TerrainChunkRequest { - key: Vec2, - }, UnlockSkill(Skill), RefundSkill(Skill), UnlockSkillGroup(SkillGroupKind), + //Only in Game, via terrain stream + TerrainChunkRequest { + key: Vec2, + }, //Always possible ChatMsg(String), Terminate, diff --git a/common/net/src/msg/server.rs b/common/net/src/msg/server.rs index 9ea2297a97..23fbe5372c 100644 --- a/common/net/src/msg/server.rs +++ b/common/net/src/msg/server.rs @@ -101,14 +101,15 @@ pub enum ServerGeneral { /// from an ingame state ExitInGameSuccess, InventoryUpdate(comp::Inventory, comp::InventoryUpdateEvent), + SetViewDistance(u32), + Outcomes(Vec), + Knockback(Vec3), + // Ingame related AND terrain stream TerrainChunkUpdate { key: Vec2, chunk: Result, ()>, }, TerrainBlockUpdates(HashMap, Block>), - SetViewDistance(u32), - Outcomes(Vec), - Knockback(Vec3), // Always possible PlayerListUpdate(PlayerListUpdate), /// A message to go into the client chat box. The client is responsible for diff --git a/server/src/client.rs b/server/src/client.rs index c4b1e68057..65408463cb 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -25,6 +25,7 @@ pub struct Client { register_stream: Mutex, character_screen_stream: Mutex, in_game_stream: Mutex, + terrain_stream: Mutex, } pub struct PreparedMsg { @@ -47,6 +48,7 @@ impl Client { register_stream: Stream, character_screen_stream: Stream, in_game_stream: Stream, + terrain_stream: Stream, ) -> Self { Client { client_type, @@ -59,6 +61,7 @@ impl Client { register_stream: Mutex::new(register_stream), character_screen_stream: Mutex::new(character_screen_stream), in_game_stream: Mutex::new(in_game_stream), + terrain_stream: Mutex::new(terrain_stream), } } @@ -84,8 +87,6 @@ impl Client { | ServerGeneral::InviteComplete { .. } | ServerGeneral::ExitInGameSuccess | ServerGeneral::InventoryUpdate(_, _) - | ServerGeneral::TerrainChunkUpdate { .. } - | ServerGeneral::TerrainBlockUpdates(_) | ServerGeneral::SetViewDistance(_) | ServerGeneral::Outcomes(_) | ServerGeneral::Knockback(_) @@ -93,6 +94,11 @@ impl Client { | ServerGeneral::FinishedTrade(_) => { self.in_game_stream.try_lock().unwrap().send(g) }, + //Ingame related, terrain + ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) => { + self.terrain_stream.try_lock().unwrap().send(g) + }, // Always possible ServerGeneral::PlayerListUpdate(_) | ServerGeneral::ChatMsg(_) @@ -138,6 +144,11 @@ impl Client { .unwrap() .send_raw(&msg.message), 4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message), + 5 => self + .terrain_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), _ => unreachable!("invalid stream id"), } } @@ -164,8 +175,6 @@ impl Client { | ServerGeneral::InviteComplete { .. } | ServerGeneral::ExitInGameSuccess | ServerGeneral::InventoryUpdate(_, _) - | ServerGeneral::TerrainChunkUpdate { .. } - | ServerGeneral::TerrainBlockUpdates(_) | ServerGeneral::SetViewDistance(_) | ServerGeneral::Outcomes(_) | ServerGeneral::Knockback(_) @@ -173,6 +182,11 @@ impl Client { | ServerGeneral::FinishedTrade(_) => { PreparedMsg::new(2, &g, &self.in_game_stream) }, + //Ingame related, terrain + ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) => { + PreparedMsg::new(5, &g, &self.terrain_stream) + }, // Always possible ServerGeneral::PlayerListUpdate(_) | ServerGeneral::ChatMsg(_) @@ -203,6 +217,7 @@ impl Client { 2 => self.in_game_stream.try_lock().unwrap().try_recv(), 3 => self.general_stream.try_lock().unwrap().try_recv(), 4 => self.ping_stream.try_lock().unwrap().try_recv(), + 5 => self.terrain_stream.try_lock().unwrap().try_recv(), _ => unreachable!("invalid stream id"), } } diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index e5db83c021..44b975ec04 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -102,9 +102,10 @@ impl ConnectionHandler { let general_stream = participant.open(3, reliablec, 500).await?; let ping_stream = participant.open(2, reliable, 500).await?; - let mut register_stream = participant.open(3, reliablec, 0).await?; - let character_screen_stream = participant.open(3, reliablec, 0).await?; - let in_game_stream = participant.open(3, reliablec, 400_000).await?; + let mut register_stream = participant.open(3, reliablec, 500).await?; + let character_screen_stream = participant.open(3, reliablec, 500).await?; + let in_game_stream = participant.open(3, reliablec, 100_000).await?; + let terrain_stream = participant.open(4, reliablec, 20_000).await?; let server_data = receiver.recv()?; @@ -131,6 +132,7 @@ impl ConnectionHandler { register_stream, character_screen_stream, in_game_stream, + terrain_stream, ); client_sender.send(client)?; diff --git a/server/src/lib.rs b/server/src/lib.rs index 8bc0ea8aab..d90ed25531 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -499,6 +499,7 @@ impl Server { run_now::(&self.state.ecs()); run_now::(&self.state.ecs()); run_now::(&self.state.ecs()); + run_now::(&self.state.ecs()); run_now::(&self.state.ecs()); run_now::(&self.state.ecs()); diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs index e79436de1f..edcb7077c6 100644 --- a/server/src/sys/msg/in_game.rs +++ b/server/src/sys/msg/in_game.rs @@ -1,9 +1,9 @@ -use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings}; +use crate::{client::Client, presence::Presence, Settings}; use common::{ comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Health, Ori, Pos, Stats, Vel}, event::{EventBus, ServerEvent}, - terrain::{TerrainChunkSize, TerrainGrid}, - vol::{ReadVol, RectVolSize}, + terrain::TerrainGrid, + vol::ReadVol, }; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{ClientGeneral, PresenceKind, ServerGeneral}; @@ -19,7 +19,6 @@ impl Sys { client: &Client, maybe_presence: &mut Option<&mut Presence>, terrain: &ReadExpect<'_, TerrainGrid>, - network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, can_build: &ReadStorage<'_, CanBuild>, force_updates: &ReadStorage<'_, ForceUpdate>, stats: &mut WriteStorage<'_, Stats>, @@ -37,9 +36,6 @@ impl Sys { None => { debug!(?entity, "client is not in_game, ignoring msg"); trace!(?msg, "ignored msg content"); - if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { - network_metrics.chunks_request_dropped.inc(); - } return Ok(()); }, }; @@ -115,33 +111,6 @@ impl Sys { block_changes.try_set(pos, block); } }, - ClientGeneral::TerrainChunkRequest { key } => { - let in_vd = if let Some(pos) = positions.get(entity) { - pos.0.xy().map(|e| e as f64).distance( - key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), - ) < (presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) - * TerrainChunkSize::RECT_SIZE.x as f64 - } else { - true - }; - if in_vd { - match terrain.get_key(key) { - Some(chunk) => { - network_metrics.chunks_served_from_memory.inc(); - client.send(ServerGeneral::TerrainChunkUpdate { - key, - chunk: Ok(Box::new(chunk.clone())), - })? - }, - None => { - network_metrics.chunks_generation_triggered.inc(); - server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) - }, - } - } else { - network_metrics.chunks_request_dropped.inc(); - } - }, ClientGeneral::UnlockSkill(skill) => { stats .get_mut(entity) @@ -157,7 +126,7 @@ impl Sys { .get_mut(entity) .map(|mut s| s.skill_set.unlock_skill_group(skill_group_kind)); }, - _ => unreachable!("not a client_in_game msg"), + _ => tracing::error!("not a client_in_game msg"), } Ok(()) } @@ -172,7 +141,6 @@ impl<'a> System<'a> for Sys { Entities<'a>, Read<'a, EventBus>, ReadExpect<'a, TerrainGrid>, - ReadExpect<'a, NetworkRequestMetrics>, ReadStorage<'a, CanBuild>, ReadStorage<'a, ForceUpdate>, WriteStorage<'a, Stats>, @@ -197,7 +165,6 @@ impl<'a> System<'a> for Sys { entities, server_event_bus, terrain, - network_metrics, can_build, force_updates, mut stats, @@ -224,7 +191,6 @@ impl<'a> System<'a> for Sys { client, &mut maybe_presence.as_deref_mut(), &terrain, - &network_metrics, &can_build, &force_updates, &mut stats, diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs index 9fceb19428..7967acf8a0 100644 --- a/server/src/sys/msg/mod.rs +++ b/server/src/sys/msg/mod.rs @@ -3,6 +3,7 @@ pub mod general; pub mod in_game; pub mod ping; pub mod register; +pub mod terrain; use crate::client::Client; use serde::de::DeserializeOwned; diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs new file mode 100644 index 0000000000..3f5ddffe13 --- /dev/null +++ b/server/src/sys/msg/terrain.rs @@ -0,0 +1,95 @@ +use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence}; +use common::{ + comp::Pos, + event::{EventBus, ServerEvent}, + terrain::{TerrainChunkSize, TerrainGrid}, + vol::RectVolSize, +}; +use common_ecs::{Job, Origin, Phase, System}; +use common_net::msg::{ClientGeneral, ServerGeneral}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage}; +use tracing::{debug, trace}; + +/// This system will handle new messages from clients +#[derive(Default)] +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + ReadExpect<'a, TerrainGrid>, + ReadExpect<'a, NetworkRequestMetrics>, + ReadStorage<'a, Pos>, + ReadStorage<'a, Presence>, + ReadStorage<'a, Client>, + ); + + const NAME: &'static str = "msg::in_game"; + const ORIGIN: Origin = Origin::Server; + const PHASE: Phase = Phase::Create; + + fn run( + _job: &mut Job, + ( + entities, + server_event_bus, + terrain, + network_metrics, + positions, + presences, + clients, + ): Self::SystemData, + ) { + let mut server_emitter = server_event_bus.emitter(); + + for (entity, client, maybe_presence) in (&entities, &clients, (&presences).maybe()).join() { + let _ = super::try_recv_all(client, 5, |client, msg| { + let presence = match maybe_presence { + Some(g) => g, + None => { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) { + network_metrics.chunks_request_dropped.inc(); + } + return Ok(()); + }, + }; + match msg { + ClientGeneral::TerrainChunkRequest { key } => { + let in_vd = if let Some(pos) = positions.get(entity) { + pos.0.xy().map(|e| e as f64).distance_squared( + key.map(|e| e as f64 + 0.5) + * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64) + .powi(2) + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => { + network_metrics.chunks_served_from_memory.inc(); + client.send(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + })? + }, + None => { + network_metrics.chunks_generation_triggered.inc(); + server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) + }, + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + _ => tracing::error!("not a client_terrain msg"), + } + Ok(()) + }); + } + } +}