Merge branch 'xMAC94x/terrain_stream' into 'master'

Transport ChunkRequests and Chunkupdates in a own stream.

See merge request veloren/veloren!1876
This commit is contained in:
Marcel 2021-03-11 15:46:51 +00:00
commit a3d54e908e
9 changed files with 166 additions and 68 deletions

View File

@ -156,6 +156,7 @@ pub struct Client {
register_stream: Stream,
character_screen_stream: Stream,
in_game_stream: Stream,
terrain_stream: Stream,
client_timeout: Duration,
last_server_ping: f64,
@ -215,6 +216,7 @@ impl Client {
let mut register_stream = participant.opened().await?;
let character_screen_stream = participant.opened().await?;
let in_game_stream = participant.opened().await?;
let terrain_stream = participant.opened().await?;
register_stream.send(ClientType::Game)?;
let server_info: ServerInfo = register_stream.recv().await?;
@ -456,6 +458,7 @@ impl Client {
register_stream,
character_screen_stream,
in_game_stream,
terrain_stream,
client_timeout,
@ -550,10 +553,11 @@ impl Client {
| ClientGeneral::PlaceBlock(_, _)
| ClientGeneral::ExitInGame
| ClientGeneral::PlayerPhysics { .. }
| ClientGeneral::TerrainChunkRequest { .. }
| ClientGeneral::UnlockSkill(_)
| ClientGeneral::RefundSkill(_)
| ClientGeneral::UnlockSkillGroup(_) => &mut self.in_game_stream,
//Only in game, terrain
ClientGeneral::TerrainChunkRequest { .. } => &mut self.terrain_stream,
//Always possible
ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => {
&mut self.general_stream
@ -1555,17 +1559,6 @@ impl Client {
frontend_events.push(Event::InventoryUpdated(event));
},
ServerGeneral::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerGeneral::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
ServerGeneral::SetViewDistance(vd) => {
self.view_distance = Some(vd);
frontend_events.push(Event::SetViewDistance(vd));
@ -1596,6 +1589,25 @@ impl Client {
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_server_terrain_msg(&mut self, msg: ServerGeneral) -> Result<(), Error> {
match msg {
ServerGeneral::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerGeneral::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
_ => unreachable!("Not a terrain message"),
}
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_server_character_screen_msg(
&mut self,
@ -1658,11 +1670,12 @@ impl Client {
cnt: &mut u64,
) -> Result<(), Error> {
loop {
let (m1, m2, m3, m4) = select!(
msg = self.general_stream.recv().fuse() => (Some(msg), None, None, None),
msg = self.ping_stream.recv().fuse() => (None, Some(msg), None, None),
msg = self.character_screen_stream.recv().fuse() => (None, None, Some(msg), None),
msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg)),
let (m1, m2, m3, m4, m5) = select!(
msg = self.general_stream.recv().fuse() => (Some(msg), None, None, None, None),
msg = self.ping_stream.recv().fuse() => (None, Some(msg), None, None, None),
msg = self.character_screen_stream.recv().fuse() => (None, None, Some(msg), None, None),
msg = self.in_game_stream.recv().fuse() => (None, None, None, Some(msg), None),
msg = self.terrain_stream.recv().fuse() => (None, None, None, None, Some(msg)),
);
*cnt += 1;
if let Some(msg) = m1 {
@ -1677,6 +1690,9 @@ impl Client {
if let Some(msg) = m4 {
self.handle_server_in_game_msg(frontend_events, msg?)?;
}
if let Some(msg) = m5 {
self.handle_server_terrain_msg(msg?)?;
}
}
}

View File

@ -70,12 +70,13 @@ pub enum ClientGeneral {
vel: comp::Vel,
ori: comp::Ori,
},
TerrainChunkRequest {
key: Vec2<i32>,
},
UnlockSkill(Skill),
RefundSkill(Skill),
UnlockSkillGroup(SkillGroupKind),
//Only in Game, via terrain stream
TerrainChunkRequest {
key: Vec2<i32>,
},
//Always possible
ChatMsg(String),
Terminate,

View File

@ -101,14 +101,15 @@ pub enum ServerGeneral {
/// from an ingame state
ExitInGameSuccess,
InventoryUpdate(comp::Inventory, comp::InventoryUpdateEvent),
SetViewDistance(u32),
Outcomes(Vec<Outcome>),
Knockback(Vec3<f32>),
// Ingame related AND terrain stream
TerrainChunkUpdate {
key: Vec2<i32>,
chunk: Result<Box<TerrainChunk>, ()>,
},
TerrainBlockUpdates(HashMap<Vec3<i32>, Block>),
SetViewDistance(u32),
Outcomes(Vec<Outcome>),
Knockback(Vec3<f32>),
// Always possible
PlayerListUpdate(PlayerListUpdate),
/// A message to go into the client chat box. The client is responsible for

View File

@ -25,6 +25,7 @@ pub struct Client {
register_stream: Mutex<Stream>,
character_screen_stream: Mutex<Stream>,
in_game_stream: Mutex<Stream>,
terrain_stream: Mutex<Stream>,
}
pub struct PreparedMsg {
@ -47,6 +48,7 @@ impl Client {
register_stream: Stream,
character_screen_stream: Stream,
in_game_stream: Stream,
terrain_stream: Stream,
) -> Self {
Client {
client_type,
@ -59,6 +61,7 @@ impl Client {
register_stream: Mutex::new(register_stream),
character_screen_stream: Mutex::new(character_screen_stream),
in_game_stream: Mutex::new(in_game_stream),
terrain_stream: Mutex::new(terrain_stream),
}
}
@ -84,8 +87,6 @@ impl Client {
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_)
@ -93,6 +94,11 @@ impl Client {
| ServerGeneral::FinishedTrade(_) => {
self.in_game_stream.try_lock().unwrap().send(g)
},
//Ingame related, terrain
ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_) => {
self.terrain_stream.try_lock().unwrap().send(g)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
@ -138,6 +144,11 @@ impl Client {
.unwrap()
.send_raw(&msg.message),
4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message),
5 => self
.terrain_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
_ => unreachable!("invalid stream id"),
}
}
@ -164,8 +175,6 @@ impl Client {
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_)
@ -173,6 +182,11 @@ impl Client {
| ServerGeneral::FinishedTrade(_) => {
PreparedMsg::new(2, &g, &self.in_game_stream)
},
//Ingame related, terrain
ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_) => {
PreparedMsg::new(5, &g, &self.terrain_stream)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
@ -203,6 +217,7 @@ impl Client {
2 => self.in_game_stream.try_lock().unwrap().try_recv(),
3 => self.general_stream.try_lock().unwrap().try_recv(),
4 => self.ping_stream.try_lock().unwrap().try_recv(),
5 => self.terrain_stream.try_lock().unwrap().try_recv(),
_ => unreachable!("invalid stream id"),
}
}

View File

@ -102,9 +102,10 @@ impl ConnectionHandler {
let general_stream = participant.open(3, reliablec, 500).await?;
let ping_stream = participant.open(2, reliable, 500).await?;
let mut register_stream = participant.open(3, reliablec, 0).await?;
let character_screen_stream = participant.open(3, reliablec, 0).await?;
let in_game_stream = participant.open(3, reliablec, 400_000).await?;
let mut register_stream = participant.open(3, reliablec, 500).await?;
let character_screen_stream = participant.open(3, reliablec, 500).await?;
let in_game_stream = participant.open(3, reliablec, 100_000).await?;
let terrain_stream = participant.open(4, reliablec, 20_000).await?;
let server_data = receiver.recv()?;
@ -131,6 +132,7 @@ impl ConnectionHandler {
register_stream,
character_screen_stream,
in_game_stream,
terrain_stream,
);
client_sender.send(client)?;

View File

@ -499,6 +499,7 @@ impl Server {
run_now::<sys::msg::register::Sys>(&self.state.ecs());
run_now::<sys::msg::character_screen::Sys>(&self.state.ecs());
run_now::<sys::msg::in_game::Sys>(&self.state.ecs());
run_now::<sys::msg::terrain::Sys>(&self.state.ecs());
run_now::<sys::msg::ping::Sys>(&self.state.ecs());
run_now::<sys::agent::Sys>(&self.state.ecs());

View File

@ -1,9 +1,9 @@
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings};
use crate::{client::Client, presence::Presence, Settings};
use common::{
comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Health, Ori, Pos, Stats, Vel},
event::{EventBus, ServerEvent},
terrain::{TerrainChunkSize, TerrainGrid},
vol::{ReadVol, RectVolSize},
terrain::TerrainGrid,
vol::ReadVol,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, PresenceKind, ServerGeneral};
@ -19,7 +19,6 @@ impl Sys {
client: &Client,
maybe_presence: &mut Option<&mut Presence>,
terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &mut WriteStorage<'_, Stats>,
@ -37,9 +36,6 @@ impl Sys {
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
@ -115,33 +111,6 @@ impl Sys {
block_changes.try_set(pos, block);
}
},
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
ClientGeneral::UnlockSkill(skill) => {
stats
.get_mut(entity)
@ -157,7 +126,7 @@ impl Sys {
.get_mut(entity)
.map(|mut s| s.skill_set.unlock_skill_group(skill_group_kind));
},
_ => unreachable!("not a client_in_game msg"),
_ => tracing::error!("not a client_in_game msg"),
}
Ok(())
}
@ -172,7 +141,6 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadStorage<'a, CanBuild>,
ReadStorage<'a, ForceUpdate>,
WriteStorage<'a, Stats>,
@ -197,7 +165,6 @@ impl<'a> System<'a> for Sys {
entities,
server_event_bus,
terrain,
network_metrics,
can_build,
force_updates,
mut stats,
@ -224,7 +191,6 @@ impl<'a> System<'a> for Sys {
client,
&mut maybe_presence.as_deref_mut(),
&terrain,
&network_metrics,
&can_build,
&force_updates,
&mut stats,

View File

@ -3,6 +3,7 @@ pub mod general;
pub mod in_game;
pub mod ping;
pub mod register;
pub mod terrain;
use crate::client::Client;
use serde::de::DeserializeOwned;

View File

@ -0,0 +1,95 @@
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence};
use common::{
comp::Pos,
event::{EventBus, ServerEvent},
terrain::{TerrainChunkSize, TerrainGrid},
vol::RectVolSize,
};
use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage};
use tracing::{debug, trace};
/// This system will handle new messages from clients
#[derive(Default)]
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
);
const NAME: &'static str = "msg::in_game";
const ORIGIN: Origin = Origin::Server;
const PHASE: Phase = Phase::Create;
fn run(
_job: &mut Job<Self>,
(
entities,
server_event_bus,
terrain,
network_metrics,
positions,
presences,
clients,
): Self::SystemData,
) {
let mut server_emitter = server_event_bus.emitter();
for (entity, client, maybe_presence) in (&entities, &clients, (&presences).maybe()).join() {
let _ = super::try_recv_all(client, 5, |client, msg| {
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest { .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance_squared(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < ((presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64)
.powi(2)
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
_ => tracing::error!("not a client_terrain msg"),
}
Ok(())
});
}
}
}