From 378b7699993b1b33aee4dd68e457e8d9fbe406e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sat, 17 Oct 2020 01:42:19 +0200 Subject: [PATCH] working on implementing seperated msg system. Does compile, but only reach connection till character screen. Full play not yet possible --- server/src/sys/message.rs | 151 +++++++++-------- server/src/sys/mod.rs | 1 + server/src/sys/msg/in_game.rs | 304 ++++++++++++++++++++++++++++++++++ server/src/sys/msg/mod.rs | 1 + 4 files changed, 386 insertions(+), 71 deletions(-) create mode 100644 server/src/sys/msg/in_game.rs create mode 100644 server/src/sys/msg/mod.rs diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 78183af705..39ef2537c2 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -88,6 +88,7 @@ impl Sys { Ok(()) } + /* #[allow(clippy::too_many_arguments)] fn handle_client_in_game_msg( server_emitter: &mut common::event::Emitter<'_, ServerEvent>, @@ -242,6 +243,7 @@ impl Sys { } Ok(()) } + */ #[allow(clippy::too_many_arguments)] fn handle_client_character_screen_msg( @@ -454,44 +456,81 @@ impl Sys { login_provider: &mut WriteExpect<'_, LoginProvider>, block_changes: &mut Write<'_, BlockChange>, admins: &mut WriteStorage<'_, Admin>, - positions: &mut WriteStorage<'_, Pos>, - velocities: &mut WriteStorage<'_, Vel>, - orientations: &mut WriteStorage<'_, Ori>, + //positions: &mut WriteStorage<'_, Pos>, + //velocities: &mut WriteStorage<'_, Vel>, + //orientations: &mut WriteStorage<'_, Ori>, players: &mut WriteStorage<'_, Player>, + general_stream: &mut GeneralStream, + ping_stream: &mut PingStream, + register_stream: &mut RegisterStream, + character_screen_stream: &mut CharacterScreenStream, + //in_game_stream: &mut InGameStream, controllers: &mut WriteStorage<'_, Controller>, settings: &Read<'_, Settings>, editable_settings: &ReadExpect<'_, EditableSettings>, alias_validator: &ReadExpect<'_, AliasValidator>, ) -> Result<(), crate::error::Error> { loop { - /* - let q1 = Client::internal_recv(&mut b1, &mut client.general_stream); - let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream); - let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream); - let q4 = Client::internal_recv(&mut b4, &mut client.ping_stream); - let q5 = Client::internal_recv(&mut b5, &mut client.register_stream); - - let (m1, m2, m3, m4, m5) = select!( - msg = q1.fuse() => (Some(msg), None, None, None, None), - msg = q2.fuse() => (None, Some(msg), None, None, None), - msg = q3.fuse() => (None, None, Some(msg), None, None), - msg = q4.fuse() => (None, None, None, Some(msg), None), - msg = q5.fuse() => (None, None, None, None,Some(msg)), - ); - *cnt += 1; - if let Some(msg) = m1 { - client.network_error |= b1; + if let Some(msg) = general_stream.0.try_recv()? { Self::handle_client_msg( server_emitter, new_chat_msgs, entity, client, + general_stream, player_metrics, uids, chat_modes, - msg?, + msg, )?; + *cnt += 1; + continue; } + + if let Some(msg) = ping_stream.0.try_recv()? { + Self::handle_ping_msg(client, ping_stream, msg)?; + *cnt += 1; + continue; + } + + if let Some(msg) = register_stream.0.try_recv()? { + Self::handle_register_msg( + player_list, + new_players, + entity, + client, + register_stream, + player_metrics, + login_provider, + admins, + players, + editable_settings, + msg, + )?; + *cnt += 1; + continue; + } + + if let Some(msg) = character_screen_stream.0.try_recv()? { + Self::handle_client_character_screen_msg( + server_emitter, + new_chat_msgs, + entity, + client, + character_screen_stream, + character_loader, + uids, + players, + editable_settings, + alias_validator, + msg, + )?; + *cnt += 1; + continue; + } + + break Ok(()) + /* if let Some(msg) = m2 { client.network_error |= b2; Self::handle_client_in_game_msg( @@ -512,40 +551,6 @@ impl Sys { settings, msg?, )?; - } - if let Some(msg) = m3 { - client.network_error |= b3; - Self::handle_client_character_screen_msg( - server_emitter, - new_chat_msgs, - entity, - client, - character_loader, - uids, - players, - editable_settings, - alias_validator, - msg?, - )?; - } - if let Some(msg) = m4 { - client.network_error |= b4; - Self::handle_ping_msg(client, msg?)?; - } - if let Some(msg) = m5 { - client.network_error |= b5; - Self::handle_register_msg( - player_list, - new_players, - entity, - client, - player_metrics, - login_provider, - admins, - players, - editable_settings, - msg?, - )?; }*/ } } @@ -572,15 +577,15 @@ impl<'a> System<'a> for Sys { WriteExpect<'a, LoginProvider>, Write<'a, BlockChange>, WriteStorage<'a, Admin>, - WriteStorage<'a, Pos>, - WriteStorage<'a, Vel>, - WriteStorage<'a, Ori>, + //WriteStorage<'a, Pos>, + //WriteStorage<'a, Vel>, + //WriteStorage<'a, Ori>, WriteStorage<'a, Player>, WriteStorage<'a, Client>, WriteStorage<'a, GeneralStream>, - //WriteStorage<'a, PingStream>, - //WriteStorage<'a, RegisterStream>, - //WriteStorage<'a, CharacterScreenStream>, + WriteStorage<'a, PingStream>, + WriteStorage<'a, RegisterStream>, + WriteStorage<'a, CharacterScreenStream>, //WriteStorage<'a, InGameStream>, WriteStorage<'a, Controller>, Read<'a, Settings>, @@ -610,15 +615,15 @@ impl<'a> System<'a> for Sys { mut accounts, mut block_changes, mut admins, - mut positions, - mut velocities, - mut orientations, + //mut positions, + //mut velocities, + //mut orientations, mut players, mut clients, mut general_streams, - //mut ping_streams, - //mut register_streams, - //mut character_screen_streams, + mut ping_streams, + mut register_streams, + mut character_screen_streams, //mut in_game_streams, mut controllers, settings, @@ -651,7 +656,7 @@ impl<'a> System<'a> for Sys { // List of new players to update player lists of all clients. let mut new_players = Vec::new(); - for (entity, client) in (&entities, &mut clients).join() { + for (entity, client, general_stream, ping_stream, register_stream, character_screen_stream) in (&entities, &mut clients, &mut general_streams, &mut ping_streams, &mut register_streams, &mut character_screen_streams).join() { let mut cnt = 0; let network_err: Result<(), crate::error::Error> = block_on(async { @@ -676,10 +681,14 @@ impl<'a> System<'a> for Sys { &mut accounts, &mut block_changes, &mut admins, - &mut positions, - &mut velocities, - &mut orientations, + //&mut positions, + //&mut velocities, + //&mut orientations, &mut players, + general_stream, + ping_stream, + register_stream, + character_screen_stream, &mut controllers, &settings, &editable_settings, diff --git a/server/src/sys/mod.rs b/server/src/sys/mod.rs index 6b98fc9edb..6dba8a0f3c 100644 --- a/server/src/sys/mod.rs +++ b/server/src/sys/mod.rs @@ -1,5 +1,6 @@ pub mod entity_sync; pub mod invite_timeout; +pub mod msg; pub mod message; pub mod object; pub mod persistence; diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs new file mode 100644 index 0000000000..0090f7603b --- /dev/null +++ b/server/src/sys/msg/in_game.rs @@ -0,0 +1,304 @@ +use super::super::SysTimer; +use crate::{ + alias_validator::AliasValidator, + client::{ + Client, InGameStream, + }, + login_provider::LoginProvider, + metrics::{NetworkRequestMetrics, PlayerMetrics}, + persistence::character_loader::CharacterLoader, + EditableSettings, Settings, +}; +use common::{ + comp::{ + Admin, CanBuild, ChatMode, ControlEvent, Controller, ForceUpdate, Ori, Player, + Pos, Stats, Vel, + }, + event::{EventBus, ServerEvent}, + msg::{ + ClientGeneral, ClientInGame, + ServerGeneral, + }, + span, + state::{BlockChange, Time}, + sync::Uid, + terrain::{TerrainChunkSize, TerrainGrid}, + vol::{ReadVol, RectVolSize}, +}; +use specs::{ + Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, +}; +use tracing::{debug, trace}; + +impl Sys { + #[allow(clippy::too_many_arguments)] + fn handle_client_in_game_msg( + server_emitter: &mut common::event::Emitter<'_, ServerEvent>, + entity: specs::Entity, + client: &mut Client, + in_game_stream: &mut InGameStream, + terrain: &ReadExpect<'_, TerrainGrid>, + network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, + can_build: &ReadStorage<'_, CanBuild>, + force_updates: &ReadStorage<'_, ForceUpdate>, + stats: &mut WriteStorage<'_, Stats>, + block_changes: &mut Write<'_, BlockChange>, + positions: &mut WriteStorage<'_, Pos>, + velocities: &mut WriteStorage<'_, Vel>, + orientations: &mut WriteStorage<'_, Ori>, + players: &mut WriteStorage<'_, Player>, + controllers: &mut WriteStorage<'_, Controller>, + settings: &Read<'_, Settings>, + msg: ClientGeneral, + ) -> Result<(), crate::error::Error> { + if client.in_game.is_none() { + debug!(?entity, "client is not in_game, ignoring msg"); + trace!(?msg, "ignored msg content"); + if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) { + network_metrics.chunks_request_dropped.inc(); + } + return Ok(()); + } + match msg { + // Go back to registered state (char selection screen) + ClientGeneral::ExitInGame => { + client.in_game = None; + server_emitter.emit(ServerEvent::ExitIngame { entity }); + in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?; + }, + ClientGeneral::SetViewDistance(view_distance) => { + players.get_mut(entity).map(|player| { + player.view_distance = Some( + settings + .max_view_distance + .map(|max| view_distance.min(max)) + .unwrap_or(view_distance), + ) + }); + + //correct client if its VD is to high + if settings + .max_view_distance + .map(|max| view_distance > max) + .unwrap_or(false) + { + in_game_stream.0.send(ServerGeneral::SetViewDistance( + settings.max_view_distance.unwrap_or(0), + ))?; + } + }, + ClientGeneral::ControllerInputs(inputs) => { + if let Some(ClientInGame::Character) = client.in_game { + if let Some(controller) = controllers.get_mut(entity) { + controller.inputs.update_with_new(inputs); + } + } + }, + ClientGeneral::ControlEvent(event) => { + if let Some(ClientInGame::Character) = client.in_game { + // Skip respawn if client entity is alive + if let ControlEvent::Respawn = event { + if stats.get(entity).map_or(true, |s| !s.is_dead) { + //Todo: comment why return! + return Ok(()); + } + } + if let Some(controller) = controllers.get_mut(entity) { + controller.events.push(event); + } + } + }, + ClientGeneral::ControlAction(event) => { + if let Some(ClientInGame::Character) = client.in_game { + if let Some(controller) = controllers.get_mut(entity) { + controller.actions.push(event); + } + } + }, + ClientGeneral::PlayerPhysics { pos, vel, ori } => { + if let Some(ClientInGame::Character) = client.in_game { + if force_updates.get(entity).is_none() + && stats.get(entity).map_or(true, |s| !s.is_dead) + { + let _ = positions.insert(entity, pos); + let _ = velocities.insert(entity, vel); + let _ = orientations.insert(entity, ori); + } + } + }, + ClientGeneral::BreakBlock(pos) => { + if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) { + block_changes.set(pos, block.into_vacant()); + } + }, + ClientGeneral::PlaceBlock(pos, block) => { + if can_build.get(entity).is_some() { + block_changes.try_set(pos, block); + } + }, + ClientGeneral::TerrainChunkRequest { key } => { + let in_vd = if let (Some(view_distance), Some(pos)) = ( + players.get(entity).and_then(|p| p.view_distance), + positions.get(entity), + ) { + pos.0.xy().map(|e| e as f64).distance( + key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64), + ) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt()) + * TerrainChunkSize::RECT_SIZE.x as f64 + } else { + true + }; + if in_vd { + match terrain.get_key(key) { + Some(chunk) => { + network_metrics.chunks_served_from_memory.inc(); + in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { + key, + chunk: Ok(Box::new(chunk.clone())), + })? + }, + None => { + network_metrics.chunks_generation_triggered.inc(); + server_emitter.emit(ServerEvent::ChunkRequest(entity, key)) + }, + } + } else { + network_metrics.chunks_request_dropped.inc(); + } + }, + ClientGeneral::UnlockSkill(skill) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.unlock_skill(skill)); + }, + ClientGeneral::RefundSkill(skill) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.refund_skill(skill)); + }, + ClientGeneral::UnlockSkillGroup(skill_group_type) => { + stats + .get_mut(entity) + .map(|s| s.skill_set.unlock_skill_group(skill_group_type)); + }, + _ => unreachable!("not a client_in_game msg"), + } + Ok(()) + } +} + +/// This system will handle new messages from clients +pub struct Sys; +impl<'a> System<'a> for Sys { + #[allow(clippy::type_complexity)] // TODO: Pending review in #587 + type SystemData = ( + Entities<'a>, + Read<'a, EventBus>, + Read<'a, Time>, + ReadExpect<'a, CharacterLoader>, + ReadExpect<'a, TerrainGrid>, + ReadExpect<'a, NetworkRequestMetrics>, + ReadExpect<'a, PlayerMetrics>, + Write<'a, SysTimer>, + ReadStorage<'a, Uid>, + ReadStorage<'a, CanBuild>, + ReadStorage<'a, ForceUpdate>, + WriteStorage<'a, Stats>, + ReadStorage<'a, ChatMode>, + WriteExpect<'a, LoginProvider>, + Write<'a, BlockChange>, + WriteStorage<'a, Admin>, + WriteStorage<'a, Pos>, + WriteStorage<'a, Vel>, + WriteStorage<'a, Ori>, + WriteStorage<'a, Player>, + WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, + WriteStorage<'a, Controller>, + Read<'a, Settings>, + ReadExpect<'a, EditableSettings>, + ReadExpect<'a, AliasValidator>, + ); + + #[allow(clippy::match_ref_pats)] // TODO: Pending review in #587 + #[allow(clippy::single_char_pattern)] // TODO: Pending review in #587 + #[allow(clippy::single_match)] // TODO: Pending review in #587 + fn run( + &mut self, + ( + entities, + server_event_bus, + time, + character_loader, + terrain, + network_metrics, + player_metrics, + mut timer, + uids, + can_build, + force_updates, + mut stats, + chat_modes, + mut accounts, + mut block_changes, + mut admins, + mut positions, + mut velocities, + mut orientations, + mut players, + mut clients, + mut in_game_streams, + mut controllers, + settings, + editable_settings, + alias_validator, + ): Self::SystemData, + ) { + span!(_guard, "run", "msg::in_game::Sys::run"); + timer.start(); + + let mut server_emitter = server_event_bus.emitter(); + + for (entity, client, mut in_game_stream) in (&entities, &mut clients, &mut in_game_streams).join() { + let mut cnt = 0; + + let network_err: Result<(), crate::error::Error> = { + loop { + let msg = match in_game_stream.0.try_recv() { + Ok(Some(msg)) => msg, + Ok(None) => break Ok(()), + Err(e) => break Err(e.into()), + }; + if let Err(e) = Self::handle_client_in_game_msg( + &mut server_emitter, + entity, + client, + &mut in_game_stream, + &terrain, + &network_metrics, + &can_build, + &force_updates, + &mut stats, + &mut block_changes, + &mut positions, + &mut velocities, + &mut orientations, + &mut players, + &mut controllers, + &settings, + msg, + ) { + break Err(e); + } + cnt += 1; + } + }; + + if cnt > 0 { // Update client ping. + client.last_ping = time.0 + } + } + + timer.end() + } +} diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs new file mode 100644 index 0000000000..ee8449ae96 --- /dev/null +++ b/server/src/sys/msg/mod.rs @@ -0,0 +1 @@ +mod in_game; \ No newline at end of file