From 3cbde527351814125dbaf9950a12d6bf86158595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Fri, 16 Oct 2020 21:37:28 +0200 Subject: [PATCH] remote all streams from `Client` and put it directly in the ecs system. This commit does not run compile as the message sys now would requiere 30 imput parameters which is over the max of 26 :/ --- server/src/client.rs | 126 +++---------- server/src/cmd.rs | 16 +- server/src/connection_handler.rs | 39 +++-- server/src/events/entity_manipulation.rs | 8 +- server/src/events/group_manip.rs | 187 +++++++++++--------- server/src/events/interaction.rs | 214 +++++++++++++++-------- server/src/events/inventory_manip.rs | 15 +- server/src/events/player.rs | 53 +++++- server/src/lib.rs | 93 +++++++++- server/src/state_ext.rs | 97 ++++++---- server/src/sys/entity_sync.rs | 113 +++++++----- server/src/sys/invite_timeout.rs | 17 +- server/src/sys/message.rs | 98 ++++++----- server/src/sys/subscription.rs | 35 ++-- server/src/sys/terrain.rs | 18 +- server/src/sys/terrain_sync.rs | 15 +- server/src/sys/waypoint.rs | 24 ++- 17 files changed, 707 insertions(+), 461 deletions(-) diff --git a/server/src/client.rs b/server/src/client.rs index e706689d80..eec6385676 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,24 +1,39 @@ -use crate::error::Error; -use common::msg::{ClientInGame, ClientType, ServerGeneral, ServerMsg}; +use common::msg::{ClientInGame, ClientType}; use hashbrown::HashSet; use network::{Participant, Stream}; -use serde::{de::DeserializeOwned, Serialize}; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; -use tracing::debug; use vek::*; +// Streams +// we ignore errors on send, and do unified error handling in recv +pub struct GeneralStream(pub Stream); +pub struct PingStream(pub Stream); +pub struct RegisterStream(pub Stream); +pub struct CharacterScreenStream(pub Stream); +pub struct InGameStream(pub Stream); + +impl Component for GeneralStream { + type Storage = FlaggedStorage>; +} +impl Component for PingStream { + type Storage = FlaggedStorage>; +} +impl Component for RegisterStream { + type Storage = FlaggedStorage>; +} +impl Component for CharacterScreenStream { + type Storage = FlaggedStorage>; +} +impl Component for InGameStream { + type Storage = FlaggedStorage>; +} + pub struct Client { pub registered: bool, pub client_type: ClientType, pub in_game: Option, pub participant: Option, - pub general_stream: Stream, - pub ping_stream: Stream, - pub register_stream: Stream, - pub character_screen_stream: Stream, - pub in_game_stream: Stream, - pub network_error: bool, pub last_ping: f64, pub login_msg_sent: bool, } @@ -27,97 +42,6 @@ impl Component for Client { type Storage = FlaggedStorage>; } -impl Client { - fn internal_send(err: &mut bool, s: &mut Stream, msg: M) { - if !*err { - if let Err(e) = s.send(msg) { - debug!(?e, "got a network error with client"); - *err = true; - } - } - } - - /* - fn internal_send_raw(b: &AtomicBool, s: &mut Stream, msg: Arc) { - if !b.load(Ordering::Relaxed) { - if let Err(e) = s.send_raw(msg) { - debug!(?e, "got a network error with client"); - b.store(true, Ordering::Relaxed); - } - } - } - */ - - pub fn send_msg(&mut self, msg: S) - where - S: Into, - { - const ERR: &str = - "Don't do that. Sending these messages is only done ONCE at connect and not by this fn"; - match msg.into() { - ServerMsg::Info(_) => panic!(ERR), - ServerMsg::Init(_) => panic!(ERR), - ServerMsg::RegisterAnswer(msg) => { - Self::internal_send(&mut self.network_error, &mut self.register_stream, &msg) - }, - ServerMsg::General(msg) => { - let stream = match &msg { - //Character Screen related - ServerGeneral::CharacterDataLoadError(_) - | ServerGeneral::CharacterListUpdate(_) - | ServerGeneral::CharacterActionError(_) - | ServerGeneral::CharacterSuccess => &mut self.character_screen_stream, - //Ingame related - ServerGeneral::GroupUpdate(_) - | ServerGeneral::GroupInvite { .. } - | ServerGeneral::InvitePending(_) - | ServerGeneral::InviteComplete { .. } - | ServerGeneral::ExitInGameSuccess - | ServerGeneral::InventoryUpdate(_, _) - | ServerGeneral::TerrainChunkUpdate { .. } - | ServerGeneral::TerrainBlockUpdates(_) - | ServerGeneral::SetViewDistance(_) - | ServerGeneral::Outcomes(_) - | ServerGeneral::Knockback(_) => &mut self.in_game_stream, - // Always possible - ServerGeneral::PlayerListUpdate(_) - | ServerGeneral::ChatMsg(_) - | ServerGeneral::SetPlayerEntity(_) - | ServerGeneral::TimeOfDay(_) - | ServerGeneral::EntitySync(_) - | ServerGeneral::CompSync(_) - | ServerGeneral::CreateEntity(_) - | ServerGeneral::DeleteEntity(_) - | ServerGeneral::Disconnect(_) - | ServerGeneral::Notification(_) => &mut self.general_stream, - }; - Self::internal_send(&mut self.network_error, stream, &msg) - }, - ServerMsg::Ping(msg) => { - Self::internal_send(&mut self.network_error, &mut self.ping_stream, &msg) - }, - }; - } - - pub async fn internal_recv( - err: &mut bool, - s: &mut Stream, - ) -> Result { - if !*err { - match s.recv().await { - Ok(r) => Ok(r), - Err(e) => { - debug!(?e, "got a network error with client while recv"); - *err = true; - Err(Error::StreamErr(e)) - }, - } - } else { - Err(Error::StreamErr(network::StreamError::StreamClosed)) - } - } -} - // Distance from fuzzy_chunk before snapping to current chunk pub const CHUNK_FUZZ: u32 = 2; // Distance out of the range of a region before removing it from subscriptions diff --git a/server/src/cmd.rs b/server/src/cmd.rs index e747e725ad..462d6c3e05 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -3,7 +3,6 @@ //! `CHAT_COMMANDS` and provide a handler function. use crate::{ - client::Client, settings::{BanRecord, EditableSetting}, Server, StateExt, }; @@ -27,7 +26,7 @@ use std::convert::TryFrom; use vek::*; use world::util::Sampler; -use crate::login_provider::LoginProvider; +use crate::{client::InGameStream, login_provider::LoginProvider}; use scan_fmt::{scan_fmt, scan_fmt_some}; use tracing::error; @@ -650,7 +649,8 @@ fn handle_spawn( // Add to group system if a pet if matches!(alignment, comp::Alignment::Owned { .. }) { let state = server.state(); - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = + state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); @@ -662,15 +662,15 @@ fn handle_spawn( &state.ecs().read_storage(), &uids, &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| { - c.send_msg(ServerGeneral::GroupUpdate(g)) + .map(|(g, s)| { + let _ = s.0.send(ServerGeneral::GroupUpdate(g)); }); }, ); diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 2f79f1795d..82c4f191f9 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -1,4 +1,7 @@ -use crate::{Client, ClientType, ServerInfo}; +use crate::{ + CharacterScreenStream, Client, ClientType, GeneralStream, InGameStream, PingStream, + RegisterStream, ServerInfo, +}; use crossbeam::{bounded, unbounded, Receiver, Sender}; use futures_channel::oneshot; use futures_executor::block_on; @@ -13,10 +16,19 @@ pub(crate) struct ServerInfoPacket { pub time: f64, } +pub(crate) struct ClientPackage { + pub client: Client, + pub general: GeneralStream, + pub ping: PingStream, + pub register: RegisterStream, + pub character: CharacterScreenStream, + pub in_game: InGameStream, +} + pub(crate) struct ConnectionHandler { _network: Arc, thread_handle: Option>, - pub client_receiver: Receiver, + pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, stop_sender: Option>, } @@ -31,7 +43,7 @@ impl ConnectionHandler { let network_clone = Arc::clone(&network); let (stop_sender, stop_receiver) = oneshot::channel(); - let (client_sender, client_receiver) = unbounded::(); + let (client_sender, client_receiver) = unbounded::(); let (info_requester_sender, info_requester_receiver) = bounded::>(1); @@ -55,7 +67,7 @@ impl ConnectionHandler { async fn work( network: Arc, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, stop_receiver: oneshot::Receiver<()>, ) { @@ -92,7 +104,7 @@ impl ConnectionHandler { async fn init_participant( participant: Participant, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, ) -> Result<(), Box> { debug!("New Participant connected to the server"); @@ -129,17 +141,20 @@ impl ConnectionHandler { client_type, in_game: None, participant: Some(participant), - general_stream, - ping_stream, - register_stream, - in_game_stream, - character_screen_stream, - network_error: false, last_ping: server_data.time, login_msg_sent: false, }; - client_sender.send(client)?; + let package = ClientPackage { + client, + general: GeneralStream(general_stream), + ping: PingStream(ping_stream), + register: RegisterStream(register_stream), + character: CharacterScreenStream(character_screen_stream), + in_game: InGameStream(in_game_stream), + }; + + client_sender.send(package)?; Ok(()) } } diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index cb175c4c20..1e3dd4af62 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -1,5 +1,5 @@ use crate::{ - client::Client, + client::{Client, InGameStream}, comp::{biped_large, quadruped_medium, quadruped_small}, Server, SpawnPoint, StateExt, }; @@ -42,9 +42,9 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) if let Some(vel) = velocities.get_mut(entity) { vel.0 = impulse; } - let mut clients = state.ecs().write_storage::(); - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::Knockback(impulse)); + let mut in_game_streams = state.ecs().write_storage::(); + if let Some(in_game_stream) = in_game_streams.get_mut(entity) { + let _ = in_game_stream.0.send(ServerGeneral::Knockback(impulse)); } } diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index 78c0df462b..8d8e00d150 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -1,4 +1,7 @@ -use crate::{client::Client, Server}; +use crate::{ + client::{GeneralStream, InGameStream}, + Server, +}; use common::{ comp::{ self, @@ -25,20 +28,21 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani match manip { GroupManip::Invite(uid) => { - let mut clients = state.ecs().write_storage::(); - let invitee = match state.ecs().entity_from_uid(uid.into()) { - Some(t) => t, - None => { - // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg( - ChatType::Meta - .server_msg("Invite failed, target does not exist.".to_owned()), - ); - } - return; - }, - }; + let mut general_streams = state.ecs().write_storage::(); + let invitee = + match state.ecs().entity_from_uid(uid.into()) { + Some(t) => t, + None => { + // Inform of failure + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = + general_stream.0.send(ChatType::Meta.server_msg( + "Invite failed, target does not exist.".to_owned(), + )); + } + return; + }, + }; let uids = state.ecs().read_storage::(); @@ -62,8 +66,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }); if already_in_same_group { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); } @@ -92,8 +96,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani >= max_group_size as usize; if group_size_limit_reached { // Inform inviter that they have reached the group size limit - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ the group size limit" @@ -109,11 +113,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite - if let Some(client) = clients.get_mut(entity) { - client.send_msg( - ChatType::Meta - .server_msg("This player already has a pending invite.".to_owned()), - ); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = + general_stream + .0 + .send(ChatType::Meta.server_msg( + "This player already has a pending invite.".to_owned(), + )); } return; } @@ -150,33 +156,35 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } }; + let mut in_game_streams = state.ecs().write_storage::(); + // If client comp - if let (Some(client), Some(inviter)) = - (clients.get_mut(invitee), uids.get(entity).copied()) + if let (Some(in_game_stream), Some(inviter)) = + (in_game_streams.get_mut(invitee), uids.get(entity).copied()) { if send_invite() { - client.send_msg(ServerGeneral::GroupInvite { + let _ = in_game_stream.0.send(ServerGeneral::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); } } else if agents.contains(invitee) { send_invite(); - } else if let Some(client) = clients.get_mut(entity) { - client.send_msg( + } else if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } // Notify inviter that the invite is pending if invite_sent { - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::InvitePending(uid)); + if let Some(in_game_stream) = in_game_streams.get_mut(entity) { + let _ = in_game_stream.0.send(ServerGeneral::InvitePending(uid)); } } }, GroupManip::Accept => { - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -193,13 +201,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { - if let (Some(client), Some(target)) = - (clients.get_mut(inviter), uids.get(entity).copied()) + if let (Some(in_game_stream), Some(target)) = + (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Accepted, - }) + }); } let mut group_manager = state.ecs().write_resource::(); group_manager.add_group_member( @@ -210,20 +218,20 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().read_storage(), &uids, |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); } }, GroupManip::Decline => { - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -241,18 +249,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { // Inform inviter of rejection - if let (Some(client), Some(target)) = - (clients.get_mut(inviter), uids.get(entity).copied()) + if let (Some(in_game_stream), Some(target)) = + (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Declined, - }) + }); } } }, GroupManip::Leave => { - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); group_manager.leave_group( @@ -262,19 +270,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, &state.ecs().entities(), &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); }, GroupManip::Kick(uid) => { - let mut clients = state.ecs().write_storage::(); + let mut general_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let alignments = state.ecs().read_storage::(); @@ -282,8 +290,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -295,8 +303,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick pet if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); } @@ -304,8 +312,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -315,6 +323,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani let mut groups = state.ecs().write_storage::(); let mut group_manager = state.ecs().write_resource::(); + let mut in_game_streams = state.ecs().write_storage::(); // Make sure kicker is the group leader match groups .get(target) @@ -329,58 +338,59 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, &state.ecs().entities(), &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them the have been kicked - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(target) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); } // Tell kicker that they were succesful - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg("Player kicked.".to_owned())); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream + .0 + .send(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } }, None => { // Inform kicker that the target is not in a group - if let Some(client) = clients.get_mut(entity) { - client.send_msg( - ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = + general_stream.0.send(ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), - ), - ); + )); } }, } }, GroupManip::AssignLeader(uid) => { - let mut clients = state.ecs().write_storage::(); + let mut general_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let target = match state.ecs().entity_from_uid(uid.into()) { Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -389,6 +399,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }; let groups = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); + let mut in_game_streams = state.ecs().write_storage::(); // Make sure assigner is the group leader match groups .get(target) @@ -403,25 +414,25 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().read_storage(), &uids, |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them they are the leader - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(target) { + let _ = general_stream.0.send( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(target) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -429,8 +440,9 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, Some(_) => { // Inform transferer that they are not the leader - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + let mut general_streams = state.ecs().write_storage::(); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -440,8 +452,9 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, None => { // Inform transferer that the target is not in a group - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + let mut general_streams = state.ecs().write_storage::(); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index ff9f2e467a..21a27b01e2 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -1,5 +1,8 @@ use crate::{ - client::{Client, RegionSubscription}, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription, + RegisterStream, + }, Server, }; use common::{ @@ -120,80 +123,153 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { // You can't possess other players let mut clients = ecs.write_storage::(); + let mut general_streams = ecs.write_storage::(); + let mut ping_streams = ecs.write_storage::(); + let mut register_streams = ecs.write_storage::(); + let mut character_screen_streams = ecs.write_storage::(); + let mut in_game_streams = ecs.write_storage::(); if clients.get_mut(possesse).is_none() { - if let Some(mut client) = clients.remove(possessor) { - client.send_msg(ServerGeneral::SetPlayerEntity(possesse_uid)); - clients - .insert(possesse, client) - .err() - .map(|e| error!(?e, "Error inserting client component during possession")); - // Put possess item into loadout - let mut loadouts = ecs.write_storage::(); - let loadout = loadouts - .entry(possesse) - .expect("Could not read loadouts component while possessing") - .or_insert(comp::Loadout::default()); + let client = match clients.remove(possessor) { + Some(c) => c, + None => return, + }; + let mut general_stream = match general_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let ping_stream = match ping_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let register_stream = match register_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let character_screen_stream = match character_screen_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let in_game_stream = match in_game_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let _ = general_stream + .0 + .send(ServerGeneral::SetPlayerEntity(possesse_uid)); + clients + .insert(possesse, client) + .err() + .map(|e| error!(?e, "Error inserting client component during possession")); + general_streams + .insert(possesse, general_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting general_streams component during possession" + ) + }); + ping_streams.insert(possesse, ping_stream).err().map(|e| { + error!( + ?e, + "Error inserting ping_streams component during possession" + ) + }); + register_streams + .insert(possesse, register_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting register_streams component during possession" + ) + }); + character_screen_streams + .insert(possesse, character_screen_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting character_screen_streams component during possession" + ) + }); + in_game_streams + .insert(possesse, in_game_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting in_game_streams component during possession" + ) + }); + // Put possess item into loadout + let mut loadouts = ecs.write_storage::(); + let loadout = loadouts + .entry(possesse) + .expect("Could not read loadouts component while possessing") + .or_insert(comp::Loadout::default()); - let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); - if let item::ItemKind::Tool(tool) = item.kind() { - let mut abilities = tool.get_abilities(); - let mut ability_drain = abilities.drain(..); - let debug_item = comp::ItemConfig { - item, - ability1: ability_drain.next(), - ability2: ability_drain.next(), - ability3: ability_drain.next(), - block_ability: None, - dodge_ability: None, - }; - std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); - loadout.active_item = Some(debug_item); - } + let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); + if let item::ItemKind::Tool(tool) = item.kind() { + let mut abilities = tool.get_abilities(); + let mut ability_drain = abilities.drain(..); + let debug_item = comp::ItemConfig { + item, + ability1: ability_drain.next(), + ability2: ability_drain.next(), + ability3: ability_drain.next(), + block_ability: None, + dodge_ability: None, + }; + std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); + loadout.active_item = Some(debug_item); + } - // Move player component - { - let mut players = ecs.write_storage::(); - if let Some(player) = players.remove(possessor) { - players.insert(possesse, player).err().map(|e| { - error!(?e, "Error inserting player component during possession") - }); - } + // Move player component + { + let mut players = ecs.write_storage::(); + if let Some(player) = players.remove(possessor) { + players + .insert(possesse, player) + .err() + .map(|e| error!(?e, "Error inserting player component during possession")); } - // Transfer region subscription - { - let mut subscriptions = ecs.write_storage::(); - if let Some(s) = subscriptions.remove(possessor) { - subscriptions.insert(possesse, s).err().map(|e| { - error!( - ?e, - "Error inserting subscription component during possession" - ) - }); - } + } + // Transfer region subscription + { + let mut subscriptions = ecs.write_storage::(); + if let Some(s) = subscriptions.remove(possessor) { + subscriptions.insert(possesse, s).err().map(|e| { + error!( + ?e, + "Error inserting subscription component during possession" + ) + }); } - // Remove will of the entity - ecs.write_storage::().remove(possesse); - // Reset controller of former shell - ecs.write_storage::() - .get_mut(possessor) - .map(|c| c.reset()); - // Transfer admin powers - { - let mut admins = ecs.write_storage::(); - if let Some(admin) = admins.remove(possessor) { - admins.insert(possesse, admin).err().map(|e| { - error!(?e, "Error inserting admin component during possession") - }); - } + } + // Remove will of the entity + ecs.write_storage::().remove(possesse); + // Reset controller of former shell + ecs.write_storage::() + .get_mut(possessor) + .map(|c| c.reset()); + // Transfer admin powers + { + let mut admins = ecs.write_storage::(); + if let Some(admin) = admins.remove(possessor) { + admins + .insert(possesse, admin) + .err() + .map(|e| error!(?e, "Error inserting admin component during possession")); } - // Transfer waypoint - { - let mut waypoints = ecs.write_storage::(); - if let Some(waypoint) = waypoints.remove(possessor) { - waypoints.insert(possesse, waypoint).err().map(|e| { - error!(?e, "Error inserting waypoint component during possession",) - }); - } + } + // Transfer waypoint + { + let mut waypoints = ecs.write_storage::(); + if let Some(waypoint) = waypoints.remove(possessor) { + waypoints.insert(possesse, waypoint).err().map(|e| { + error!(?e, "Error inserting waypoint component during possession",) + }); } } } diff --git a/server/src/events/inventory_manip.rs b/server/src/events/inventory_manip.rs index e82f292385..3bc648aea1 100644 --- a/server/src/events/inventory_manip.rs +++ b/server/src/events/inventory_manip.rs @@ -1,4 +1,4 @@ -use crate::{client::Client, Server, StateExt}; +use crate::{client::InGameStream, Server, StateExt}; use common::{ comp::{ self, item, @@ -279,7 +279,8 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv .insert(tameable_entity, comp::Alignment::Owned(uid)); // Add to group system - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = + state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state .ecs() @@ -293,15 +294,15 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv &state.ecs().read_storage(), &uids, &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| { - c.send_msg(ServerGeneral::GroupUpdate(g)) + .map(|(g, s)| { + s.0.send(ServerGeneral::GroupUpdate(g)) }); }, ); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index e466a5b116..383de143bf 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -1,6 +1,12 @@ use super::Event; use crate::{ - client::Client, login_provider::LoginProvider, persistence, state_ext::StateExt, Server, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream, + }, + login_provider::LoginProvider, + persistence, + state_ext::StateExt, + Server, }; use common::{ comp, @@ -17,26 +23,61 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { span!(_guard, "handle_exit_ingame"); let state = server.state_mut(); - // Create new entity with just `Client`, `Uid`, and `Player` components - // Easier than checking and removing all other known components + // Create new entity with just `Client`, `Uid`, `Player`, and `...Stream` + // components Easier than checking and removing all other known components // Note: If other `ServerEvent`s are referring to this entity they will be // disrupted let maybe_client = state.ecs().write_storage::().remove(entity); let maybe_uid = state.read_component_copied::(entity); let maybe_player = state.ecs().write_storage::().remove(entity); let maybe_admin = state.ecs().write_storage::().remove(entity); + let maybe_general_stream = state.ecs().write_storage::().remove(entity); + let maybe_ping_stream = state.ecs().write_storage::().remove(entity); + let maybe_register_stream = state.ecs().write_storage::().remove(entity); + let maybe_character_screen_stream = state + .ecs() + .write_storage::() + .remove(entity); + let maybe_in_game_stream = state.ecs().write_storage::().remove(entity); let maybe_group = state .ecs() .write_storage::() .get(entity) .cloned(); - if let (Some(mut client), Some(uid), Some(player)) = (maybe_client, maybe_uid, maybe_player) { + if let ( + Some(mut client), + Some(uid), + Some(player), + Some(mut general_stream), + Some(ping_stream), + Some(register_stream), + Some(character_screen_stream), + Some(in_game_stream), + ) = ( + maybe_client, + maybe_uid, + maybe_player, + maybe_general_stream, + maybe_ping_stream, + maybe_register_stream, + maybe_character_screen_stream, + maybe_in_game_stream, + ) { // Tell client its request was successful client.in_game = None; - client.send_msg(ServerGeneral::ExitInGameSuccess); + let _ = general_stream.0.send(ServerGeneral::ExitInGameSuccess); - let entity_builder = state.ecs_mut().create_entity().with(client).with(player); + let entity_builder = state + .ecs_mut() + .create_entity() + .with(client) + .with(player) + .with(general_stream) + .with(ping_stream) + .with(register_stream) + .with(character_screen_stream) + .with(in_game_stream); // Preserve group component if present let entity_builder = match maybe_group { diff --git a/server/src/lib.rs b/server/src/lib.rs index e3acfa2f02..976c9f4d9c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -34,7 +34,10 @@ pub use crate::{ use crate::{ alias_validator::AliasValidator, chunk_generator::ChunkGenerator, - client::{Client, RegionSubscription}, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription, + RegisterStream, + }, cmd::ChatCommandExt, connection_handler::ConnectionHandler, data_dir::DataDir, @@ -180,6 +183,11 @@ impl Server { // Server-only components state.ecs_mut().register::(); state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; @@ -804,8 +812,8 @@ impl Server { }); } - while let Ok(data) = self.connection_handler.client_receiver.try_recv() { - let mut client = data; + while let Ok(mut package) = self.connection_handler.client_receiver.try_recv() { + let client = package.client; if self.settings().max_players <= self.state.ecs().read_storage::().join().count() @@ -814,7 +822,7 @@ impl Server { ?client.participant, "to many players, wont allow participant to connect" ); - client.register_stream.send(ServerInit::TooManyPlayers)?; + package.register.0.send(ServerInit::TooManyPlayers)?; continue; } @@ -823,6 +831,11 @@ impl Server { .ecs_mut() .create_entity_synced() .with(client) + .with(package.general) + .with(package.ping) + .with(package.register) + .with(package.character) + .with(package.in_game) .build(); self.state .ecs() @@ -834,10 +847,10 @@ impl Server { debug!("Starting initial sync with client."); self.state .ecs() - .write_storage::() + .write_storage::() .get_mut(entity) .unwrap() - .register_stream + .0 .send(ServerInit::GameSync { // Send client their entity entity_package: TrackedComps::fetch(&self.state.ecs()) @@ -859,8 +872,72 @@ impl Server { where S: Into, { - if let Some(client) = self.state.ecs().write_storage::().get_mut(entity) { - client.send_msg(msg.into()) + const ERR: &str = + "Don't do that. Sending these messages is only done ONCE at connect and not by this fn"; + match msg.into() { + ServerMsg::Info(_) => panic!(ERR), + ServerMsg::Init(_) => panic!(ERR), + ServerMsg::RegisterAnswer(msg) => { + self.state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)); + }, + ServerMsg::General(msg) => { + match &msg { + //Character Screen related + ServerGeneral::CharacterDataLoadError(_) + | ServerGeneral::CharacterListUpdate(_) + | ServerGeneral::CharacterActionError(_) + | ServerGeneral::CharacterSuccess => self + .state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)), + //Ingame related + ServerGeneral::GroupUpdate(_) + | ServerGeneral::GroupInvite { .. } + | ServerGeneral::InvitePending(_) + | ServerGeneral::InviteComplete { .. } + | ServerGeneral::ExitInGameSuccess + | ServerGeneral::InventoryUpdate(_, _) + | ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) + | ServerGeneral::SetViewDistance(_) + | ServerGeneral::Outcomes(_) + | ServerGeneral::Knockback(_) => self + .state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)), + // Always possible + ServerGeneral::PlayerListUpdate(_) + | ServerGeneral::ChatMsg(_) + | ServerGeneral::SetPlayerEntity(_) + | ServerGeneral::TimeOfDay(_) + | ServerGeneral::EntitySync(_) + | ServerGeneral::CompSync(_) + | ServerGeneral::CreateEntity(_) + | ServerGeneral::DeleteEntity(_) + | ServerGeneral::Disconnect(_) + | ServerGeneral::Notification(_) => self + .state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)), + }; + }, + ServerMsg::Ping(msg) => { + self.state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)); + }, } } diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 06d7c523e3..57d714838b 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,11 +1,14 @@ use crate::{ - client::Client, persistence::PersistedComponents, sys::sentinel::DeletedEntities, SpawnPoint, + client::{CharacterScreenStream, Client, GeneralStream}, + persistence::PersistedComponents, + sys::sentinel::DeletedEntities, + SpawnPoint, }; use common::{ character::CharacterId, comp, effect::Effect, - msg::{CharacterInfo, ClientInGame, PlayerListUpdate, ServerGeneral, ServerMsg}, + msg::{CharacterInfo, ClientInGame, PlayerListUpdate, ServerGeneral}, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, util::Dir, @@ -222,8 +225,16 @@ impl StateExt for State { // Tell the client its request was successful. if let Some(client) = self.ecs().write_storage::().get_mut(entity) { - client.in_game = Some(ClientInGame::Character); - client.send_msg(ServerGeneral::CharacterSuccess) + if let Some(character_screen_stream) = self + .ecs() + .write_storage::() + .get_mut(entity) + { + client.in_game = Some(ClientInGame::Character); + let _ = character_screen_stream + .0 + .send(ServerGeneral::CharacterSuccess); + } } } @@ -293,14 +304,16 @@ impl StateExt for State { } }, comp::ChatType::Tell(u, t) => { - for (client, uid) in ( - &mut ecs.write_storage::(), + for (general_stream, uid) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if uid == u || uid == t { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -310,9 +323,13 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (general_stream, pos) in + (&mut ecs.write_storage::(), &positions).join() + { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -322,9 +339,13 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (general_stream, pos) in + (&mut ecs.write_storage::(), &positions).join() + { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -334,35 +355,43 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (general_stream, pos) in + (&mut ecs.write_storage::(), &positions).join() + { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } }, comp::ChatType::FactionMeta(s) | comp::ChatType::Faction(_, s) => { - for (client, faction) in ( - &mut ecs.write_storage::(), + for (general_stream, faction) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if s == &faction.0 { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, comp::ChatType::GroupMeta(g) | comp::ChatType::Group(_, g) => { - for (client, group) in ( - &mut ecs.write_storage::(), + for (general_stream, group) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if g == group { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -371,23 +400,27 @@ impl StateExt for State { /// Sends the message to all connected clients fn notify_registered_clients(&self, msg: ServerGeneral) { - let msg: ServerMsg = msg.into(); - for client in (&mut self.ecs().write_storage::()) + for (general_stream, _) in ( + &mut self.ecs().write_storage::(), + &self.ecs().read_storage::(), + ) .join() - .filter(|c| c.registered) + .filter(|(_, c)| c.registered) { - client.send_msg(msg.clone()); + let _ = general_stream.0.send(msg.clone()); } } /// Sends the message to all clients playing in game fn notify_in_game_clients(&self, msg: ServerGeneral) { - let msg: ServerMsg = msg.into(); - for client in (&mut self.ecs().write_storage::()) + for (general_stream, _) in ( + &mut self.ecs().write_storage::(), + &self.ecs().read_storage::(), + ) .join() - .filter(|c| c.in_game.is_some()) + .filter(|(_, c)| c.in_game.is_some()) { - client.send_msg(msg.clone()); + let _ = general_stream.0.send(msg.clone()); } } @@ -397,7 +430,7 @@ impl StateExt for State { ) -> Result<(), specs::error::WrongGeneration> { // Remove entity from a group if they are in one { - let mut clients = self.ecs().write_storage::(); + let mut general_streams = self.ecs().write_storage::(); let uids = self.ecs().read_storage::(); let mut group_manager = self.ecs().write_resource::(); group_manager.entity_deleted( @@ -407,14 +440,14 @@ impl StateExt for State { &uids, &self.ecs().entities(), &mut |entity, group_change| { - clients + general_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); } diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index 0442a04e8e..465a77d728 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -3,7 +3,7 @@ use super::{ SysTimer, }; use crate::{ - client::{Client, RegionSubscription}, + client::{Client, GeneralStream, InGameStream, RegionSubscription}, Tick, }; use common::{ @@ -43,6 +43,8 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Last>, WriteStorage<'a, Last>, WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, + WriteStorage<'a, GeneralStream>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, InventoryUpdate>, Write<'a, DeletedEntities>, @@ -70,6 +72,8 @@ impl<'a> System<'a> for Sys { mut last_vel, mut last_ori, mut clients, + mut in_game_streams, + mut general_streams, mut force_updates, mut inventory_updates, mut deleted_entities, @@ -104,15 +108,31 @@ impl<'a> System<'a> for Sys { for (key, region) in region_map.iter() { // Assemble subscriber list for this region by iterating through clients and // checking if they are subscribed to this region - let mut subscribers = (&mut clients, &entities, &subscriptions, &positions) + let mut subscribers = ( + &mut clients, + &entities, + &subscriptions, + &positions, + &mut in_game_streams, + &mut general_streams, + ) .join() - .filter_map(|(client, entity, subscription, pos)| { - if client.in_game.is_some() && subscription.regions.contains(&key) { - Some((client, &subscription.regions, entity, *pos)) - } else { - None - } - }) + .filter_map( + |(client, entity, subscription, pos, in_game_stream, general_stream)| { + if client.in_game.is_some() && subscription.regions.contains(&key) { + Some(( + client, + &subscription.regions, + entity, + *pos, + in_game_stream, + general_stream, + )) + } else { + None + } + }, + ) .collect::>(); for event in region.events() { @@ -135,7 +155,9 @@ impl<'a> System<'a> for Sys { vel.copied(), ori.copied(), )); - for (client, regions, client_entity, _) in &mut subscribers { + for (_, regions, client_entity, _, _, general_stream) in + &mut subscribers + { if maybe_key .as_ref() .map(|key| !regions.contains(key)) @@ -143,7 +165,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - client.send_msg(create_msg.clone()); + let _ = general_stream.0.send(create_msg.clone()); } } } @@ -151,13 +173,13 @@ impl<'a> System<'a> for Sys { RegionEvent::Left(id, maybe_key) => { // Lookup UID for entity if let Some(&uid) = uids.get(entities.entity(*id)) { - for (client, regions, _, _) in &mut subscribers { + for (_, regions, _, _, _, general_stream) in &mut subscribers { if maybe_key .as_ref() .map(|key| !regions.contains(key)) .unwrap_or(true) { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + let _ = general_stream.0.send(ServerGeneral::DeleteEntity(uid)); } } } @@ -176,17 +198,19 @@ impl<'a> System<'a> for Sys { ); let entity_sync_msg = ServerGeneral::EntitySync(entity_sync_package); let comp_sync_msg = ServerGeneral::CompSync(comp_sync_package); - subscribers.iter_mut().for_each(move |(client, _, _, _)| { - client.send_msg(entity_sync_msg.clone()); - client.send_msg(comp_sync_msg.clone()); - }); + subscribers + .iter_mut() + .for_each(move |(_, _, _, _, _, general_stream)| { + let _ = general_stream.0.send(entity_sync_msg.clone()); + let _ = general_stream.0.send(comp_sync_msg.clone()); + }); - let mut send_msg = |msg: ServerGeneral, - entity: EcsEntity, - pos: Pos, - force_update: Option<&ForceUpdate>, - throttle: bool| { - for (client, _, client_entity, client_pos) in &mut subscribers { + let mut send_general = |msg: ServerGeneral, + entity: EcsEntity, + pos: Pos, + force_update: Option<&ForceUpdate>, + throttle: bool| { + for (_, _, client_entity, client_pos, _, general_stream) in &mut subscribers { if if client_entity == &entity { // Don't send client physics updates about itself unless force update is set force_update.is_some() @@ -212,7 +236,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - client.send_msg(msg.clone()); + let _ = general_stream.0.send(msg.clone()); } } }; @@ -286,7 +310,7 @@ impl<'a> System<'a> for Sys { comp_sync_package.comp_removed::(uid); } - send_msg( + send_general( ServerGeneral::CompSync(comp_sync_package), entity, pos, @@ -299,19 +323,20 @@ impl<'a> System<'a> for Sys { // Handle entity deletion in regions that don't exist in RegionMap // (theoretically none) for (region_key, deleted) in deleted_entities.take_remaining_deleted() { - for client in - (&mut clients, &subscriptions) - .join() - .filter_map(|(client, subscription)| { - if client.in_game.is_some() && subscription.regions.contains(®ion_key) { - Some(client) - } else { - None - } - }) + for general_stream in (&mut clients, &subscriptions, &mut general_streams) + .join() + .filter_map(|(client, subscription, general_stream)| { + if client.in_game.is_some() && subscription.regions.contains(®ion_key) { + Some(general_stream) + } else { + None + } + }) { for uid in &deleted { - client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid))); + let _ = general_stream + .0 + .send(ServerGeneral::DeleteEntity(Uid(*uid))); } } } @@ -319,15 +344,19 @@ impl<'a> System<'a> for Sys { // TODO: Sync clients that don't have a position? // Sync inventories - for (client, inventory, update) in (&mut clients, &inventories, &inventory_updates).join() { - client.send_msg(ServerGeneral::InventoryUpdate( + for (inventory, update, in_game_stream) in + (&inventories, &inventory_updates, &mut in_game_streams).join() + { + let _ = in_game_stream.0.send(ServerGeneral::InventoryUpdate( inventory.clone(), update.event(), )); } // Sync outcomes - for (client, player, pos) in (&mut clients, &players, positions.maybe()).join() { + for (player, pos, in_game_stream) in + (&players, positions.maybe(), &mut in_game_streams).join() + { let is_near = |o_pos: Vec3| { pos.zip_with(player.view_distance, |pos, vd| { pos.0.xy().distance_squared(o_pos.xy()) @@ -341,7 +370,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - client.send_msg(ServerGeneral::Outcomes(outcomes)); + let _ = in_game_stream.0.send(ServerGeneral::Outcomes(outcomes)); } } outcomes.clear(); @@ -354,8 +383,8 @@ impl<'a> System<'a> for Sys { // TODO: doesn't really belong in this system (rename system or create another // system?) let tof_msg = ServerGeneral::TimeOfDay(*time_of_day); - for client in (&mut clients).join() { - client.send_msg(tof_msg.clone()); + for general_stream in (&mut general_streams).join() { + let _ = general_stream.0.send(tof_msg.clone()); } timer.end(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index 42f6603fe8..87fa70b25c 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::Client; +use crate::client::InGameStream; use common::{ comp::group::{Invite, PendingInvites}, msg::{InviteAnswer, ServerGeneral}, @@ -16,14 +16,14 @@ impl<'a> System<'a> for Sys { Entities<'a>, WriteStorage<'a, Invite>, WriteStorage<'a, PendingInvites>, - WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, ReadStorage<'a, Uid>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, mut invites, mut pending_invites, mut clients, uids, mut timer): Self::SystemData, + (entities, mut invites, mut pending_invites, mut in_game_streams, uids, mut timer): Self::SystemData, ) { span!(_guard, "run", "invite_timeout::Sys::run"); timer.start(); @@ -51,13 +51,14 @@ impl<'a> System<'a> for Sys { } // Inform inviter of timeout - if let (Some(client), Some(target)) = - (clients.get_mut(*inviter), uids.get(invitee).copied()) - { - client.send_msg(ServerGeneral::InviteComplete { + if let (Some(in_game_stream), Some(target)) = ( + in_game_streams.get_mut(*inviter), + uids.get(invitee).copied(), + ) { + let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { target, answer: InviteAnswer::TimedOut, - }) + }); } Some(invitee) diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 9a9dfc92d3..78183af705 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -2,7 +2,9 @@ use super::SysTimer; use crate::{ alias_validator::AliasValidator, character_creator, - client::Client, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream, + }, login_provider::LoginProvider, metrics::{NetworkRequestMetrics, PlayerMetrics}, persistence::character_loader::CharacterLoader, @@ -41,6 +43,7 @@ impl Sys { new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, entity: specs::Entity, client: &mut Client, + general_stream: &mut GeneralStream, player_metrics: &ReadExpect<'_, PlayerMetrics>, uids: &ReadStorage<'_, Uid>, chat_modes: &ReadStorage<'_, ChatMode>, @@ -68,7 +71,9 @@ impl Sys { } }, ClientGeneral::Disconnect => { - client.send_msg(ServerGeneral::Disconnect(DisconnectReason::Requested)); + general_stream + .0 + .send(ServerGeneral::Disconnect(DisconnectReason::Requested))?; }, ClientGeneral::Terminate => { debug!(?entity, "Client send message to termitate session"); @@ -88,6 +93,7 @@ impl Sys { server_emitter: &mut common::event::Emitter<'_, ServerEvent>, entity: specs::Entity, client: &mut Client, + in_game_stream: &mut InGameStream, terrain: &ReadExpect<'_, TerrainGrid>, network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, can_build: &ReadStorage<'_, CanBuild>, @@ -115,7 +121,7 @@ impl Sys { ClientGeneral::ExitInGame => { client.in_game = None; server_emitter.emit(ServerEvent::ExitIngame { entity }); - client.send_msg(ServerGeneral::ExitInGameSuccess); + in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?; }, ClientGeneral::SetViewDistance(view_distance) => { players.get_mut(entity).map(|player| { @@ -133,9 +139,9 @@ impl Sys { .map(|max| view_distance > max) .unwrap_or(false) { - client.send_msg(ServerGeneral::SetViewDistance( + in_game_stream.0.send(ServerGeneral::SetViewDistance( settings.max_view_distance.unwrap_or(0), - )); + ))?; } }, ClientGeneral::ControllerInputs(inputs) => { @@ -203,10 +209,10 @@ impl Sys { match terrain.get_key(key) { Some(chunk) => { network_metrics.chunks_served_from_memory.inc(); - client.send_msg(ServerGeneral::TerrainChunkUpdate { + in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), - }) + })? }, None => { network_metrics.chunks_generation_triggered.inc(); @@ -243,6 +249,7 @@ impl Sys { new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, entity: specs::Entity, client: &mut Client, + character_screen_stream: &mut CharacterScreenStream, character_loader: &ReadExpect<'_, CharacterLoader>, uids: &ReadStorage<'_, Uid>, players: &mut WriteStorage<'_, Player>, @@ -278,10 +285,11 @@ impl Sys { // Give the player a welcome message if !editable_settings.server_description.is_empty() { - client.send_msg( - ChatType::CommandInfo - .server_msg(String::from(&*editable_settings.server_description)), - ); + character_screen_stream + .0 + .send(ChatType::CommandInfo.server_msg(String::from( + &*editable_settings.server_description, + )))?; } if !client.login_msg_sent { @@ -295,9 +303,11 @@ impl Sys { } } } else { - client.send_msg(ServerGeneral::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - ))) + character_screen_stream + .0 + .send(ServerGeneral::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + )))? } } ClientGeneral::Character(_) => { @@ -313,7 +323,9 @@ impl Sys { ClientGeneral::CreateCharacter { alias, tool, body } => { if let Err(error) = alias_validator.validate(&alias) { debug!(?error, ?alias, "denied alias as it contained a banned word"); - client.send_msg(ServerGeneral::CharacterActionError(error.to_string())); + character_screen_stream + .0 + .send(ServerGeneral::CharacterActionError(error.to_string()))?; } else if let Some(player) = players.get(entity) { character_creator::create_character( entity, @@ -340,9 +352,13 @@ impl Sys { } #[allow(clippy::too_many_arguments)] - fn handle_ping_msg(client: &mut Client, msg: PingMsg) -> Result<(), crate::error::Error> { + fn handle_ping_msg( + client: &mut Client, + ping_stream: &mut PingStream, + msg: PingMsg, + ) -> Result<(), crate::error::Error> { match msg { - PingMsg::Ping => client.send_msg(PingMsg::Pong), + PingMsg::Ping => ping_stream.0.send(PingMsg::Pong)?, PingMsg::Pong => {}, } Ok(()) @@ -354,6 +370,7 @@ impl Sys { new_players: &mut Vec, entity: specs::Entity, client: &mut Client, + register_stream: &mut RegisterStream, player_metrics: &ReadExpect<'_, PlayerMetrics>, login_provider: &mut WriteExpect<'_, LoginProvider>, admins: &mut WriteStorage<'_, Admin>, @@ -368,9 +385,7 @@ impl Sys { &*editable_settings.banlist, ) { Err(err) => { - client - .register_stream - .send(ServerRegisterAnswer::Err(err))?; + register_stream.0.send(ServerRegisterAnswer::Err(err))?; return Ok(()); }, Ok((username, uuid)) => (username, uuid), @@ -382,8 +397,8 @@ impl Sys { if !player.is_valid() { // Invalid player - client - .register_stream + register_stream + .0 .send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; return Ok(()); } @@ -401,12 +416,14 @@ impl Sys { // Tell the client its request was successful. client.registered = true; - client.register_stream.send(ServerRegisterAnswer::Ok(()))?; + register_stream.0.send(ServerRegisterAnswer::Ok(()))?; // Send initial player list - client.send_msg(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - ))); + register_stream + .0 + .send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + )))?; // Add to list to notify all clients of the new player new_players.push(entity); @@ -446,19 +463,8 @@ impl Sys { editable_settings: &ReadExpect<'_, EditableSettings>, alias_validator: &ReadExpect<'_, AliasValidator>, ) -> Result<(), crate::error::Error> { - let (mut b1, mut b2, mut b3, mut b4, mut b5) = ( - client.network_error, - client.network_error, - client.network_error, - client.network_error, - client.network_error, - ); loop { /* - waiting for 1 of the 5 streams to return a massage asynchronous. - If so, handle that msg type. This code will be refactored soon - */ - let q1 = Client::internal_recv(&mut b1, &mut client.general_stream); let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream); let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream); @@ -540,7 +546,7 @@ impl Sys { editable_settings, msg?, )?; - } + }*/ } } } @@ -571,6 +577,11 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Ori>, WriteStorage<'a, Player>, WriteStorage<'a, Client>, + WriteStorage<'a, GeneralStream>, + //WriteStorage<'a, PingStream>, + //WriteStorage<'a, RegisterStream>, + //WriteStorage<'a, CharacterScreenStream>, + //WriteStorage<'a, InGameStream>, WriteStorage<'a, Controller>, Read<'a, Settings>, ReadExpect<'a, EditableSettings>, @@ -604,6 +615,11 @@ impl<'a> System<'a> for Sys { mut orientations, mut players, mut clients, + mut general_streams, + //mut ping_streams, + //mut register_streams, + //mut character_screen_streams, + //mut in_game_streams, mut controllers, settings, editable_settings, @@ -697,7 +713,8 @@ impl<'a> System<'a> for Sys { server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { // Try pinging the client if the timeout is nearing. - client.send_msg(PingMsg::Ping); + //FIXME + //client.send_msg(PingMsg::Ping); } } @@ -713,7 +730,8 @@ impl<'a> System<'a> for Sys { character: None, // new players will be on character select. })); for client in (&mut clients).join().filter(|c| c.registered) { - client.send_msg(msg.clone()) + //FIXME + //client.send_msg(msg.clone()) } } } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 2eac60f2a6..5fd1611ae0 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -2,7 +2,7 @@ use super::{ sentinel::{DeletedEntities, TrackedComps}, SysTimer, }; -use crate::client::{self, Client, RegionSubscription}; +use crate::client::{self, Client, InGameStream, RegionSubscription}; use common::{ comp::{Ori, Player, Pos, Vel}, msg::ServerGeneral, @@ -32,7 +32,8 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Vel>, ReadStorage<'a, Ori>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, + WriteStorage<'a, InGameStream>, WriteStorage<'a, RegionSubscription>, Write<'a, DeletedEntities>, TrackedComps<'a>, @@ -50,7 +51,8 @@ impl<'a> System<'a> for Sys { velocities, orientations, players, - mut clients, + clients, + mut in_game_streams, mut subscriptions, mut deleted_entities, tracked_comps, @@ -71,17 +73,18 @@ impl<'a> System<'a> for Sys { // 7. Determine list of regions that are in range and iterate through it // - check if in hashset (hash calc) if not add it let mut regions_to_remove = Vec::new(); - for (client, subscription, pos, vd, client_entity) in ( - &mut clients, + for (_, subscription, pos, vd, client_entity, in_game_stream) in ( + &clients, &mut subscriptions, &positions, &players, &entities, + &mut in_game_streams, ) .join() - .filter_map(|(client, s, pos, player, e)| { + .filter_map(|(client, s, pos, player, e, stream)| { if client.in_game.is_some() { - player.view_distance.map(|v| (client, s, pos, v, e)) + player.view_distance.map(|v| (client, s, pos, v, e, stream)) } else { None } @@ -153,7 +156,9 @@ impl<'a> System<'a> for Sys { .map(|key| subscription.regions.contains(key)) .unwrap_or(false) { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + let _ = in_game_stream + .0 + .send(ServerGeneral::DeleteEntity(uid)); } } }, @@ -161,7 +166,7 @@ impl<'a> System<'a> for Sys { } // Tell client to delete entities in the region for (&uid, _) in (&uids, region.entities()).join() { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + let _ = in_game_stream.0.send(ServerGeneral::DeleteEntity(uid)); } } // Send deleted entities since they won't be processed for this client in entity @@ -171,7 +176,9 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid))); + let _ = in_game_stream + .0 + .send(ServerGeneral::DeleteEntity(Uid(*uid))); } } @@ -196,7 +203,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - client.send_msg(ServerGeneral::CreateEntity( + let _ = in_game_stream.0.send(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -217,14 +224,14 @@ impl<'a> System<'a> for Sys { /// Initialize region subscription pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { - if let (Some(client_pos), Some(client_vd), Some(client)) = ( + if let (Some(client_pos), Some(client_vd), Some(in_game_stream)) = ( world.read_storage::().get(entity), world .read_storage::() .get(entity) .map(|pl| pl.view_distance) .and_then(|v| v), - world.write_storage::().get_mut(entity), + world.write_storage::().get_mut(entity), ) { let fuzzy_chunk = (Vec2::::from(client_pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); @@ -249,7 +256,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - client.send_msg(ServerGeneral::CreateEntity( + let _ = in_game_stream.0.send(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index eafa24b0ff..8a359e2423 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::{chunk_generator::ChunkGenerator, client::Client, Tick}; +use crate::{chunk_generator::ChunkGenerator, client::InGameStream, Tick}; use common::{ comp::{self, bird_medium, Alignment, Player, Pos}, event::{EventBus, ServerEvent}, @@ -34,7 +34,7 @@ impl<'a> System<'a> for Sys { Write<'a, TerrainChanges>, ReadStorage<'a, Pos>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, ); fn run( @@ -48,7 +48,7 @@ impl<'a> System<'a> for Sys { mut terrain_changes, positions, players, - mut clients, + mut in_game_streams, ): Self::SystemData, ) { span!(_guard, "run", "terrain::Sys::run"); @@ -62,8 +62,8 @@ impl<'a> System<'a> for Sys { let (chunk, supplement) = match res { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + if let Some(in_game_stream) = in_game_streams.get_mut(entity) { + let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -75,10 +75,10 @@ impl<'a> System<'a> for Sys { }, }; // Send the chunk to all nearby players. - for (view_distance, pos, client) in (&players, &positions, &mut clients) + for (view_distance, pos, in_game_stream) in (&players, &positions, &mut in_game_streams) .join() - .filter_map(|(player, pos, client)| { - player.view_distance.map(|vd| (vd, pos, client)) + .filter_map(|(player, pos, in_game_stream)| { + player.view_distance.map(|vd| (vd, pos, in_game_stream)) }) { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); @@ -90,7 +90,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= view_distance.pow(2) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index a2e21f42a9..e547f4a8e3 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::Client; +use crate::client::InGameStream; use common::{ comp::{Player, Pos}, msg::ServerGeneral, @@ -20,25 +20,26 @@ impl<'a> System<'a> for Sys { Write<'a, SysTimer>, ReadStorage<'a, Pos>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, ); fn run( &mut self, - (terrain, terrain_changes, mut timer, positions, players, mut clients): Self::SystemData, + (terrain, terrain_changes, mut timer, positions, players, mut in_game_streams): Self::SystemData, ) { span!(_guard, "run", "terrain_sync::Sys::run"); timer.start(); // Sync changed chunks 'chunk: for chunk_key in &terrain_changes.modified_chunks { - for (player, pos, client) in (&players, &positions, &mut clients).join() { + for (player, pos, in_game_stream) in (&players, &positions, &mut in_game_streams).join() + { if player .view_distance .map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd)) .unwrap_or(false) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key: *chunk_key, chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { Some(chunk) => chunk.clone(), @@ -52,9 +53,9 @@ impl<'a> System<'a> for Sys { // TODO: Don't send all changed blocks to all clients // Sync changed blocks let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone()); - for (player, client) in (&players, &mut clients).join() { + for (player, in_game_stream) in (&players, &mut in_game_streams).join() { if player.view_distance.is_some() { - client.send_msg(msg.clone()); + let _ = in_game_stream.0.send(msg.clone()); } } diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index bf8034baa0..3af23149b4 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::Client; +use crate::client::GeneralStream; use common::{ comp::{Player, Pos, Waypoint, WaypointArea}, msg::{Notification, ServerGeneral}, @@ -22,28 +22,38 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Player>, ReadStorage<'a, WaypointArea>, WriteStorage<'a, Waypoint>, - WriteStorage<'a, Client>, + WriteStorage<'a, GeneralStream>, Read<'a, Time>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, positions, players, waypoint_areas, mut waypoints, mut clients, time, mut timer): Self::SystemData, + ( + entities, + positions, + players, + waypoint_areas, + mut waypoints, + mut general_streams, + time, + mut timer, + ): Self::SystemData, ) { span!(_guard, "run", "waypoint::Sys::run"); timer.start(); - for (entity, player_pos, _, client) in - (&entities, &positions, &players, &mut clients).join() + for (entity, player_pos, _, general_stream) in + (&entities, &positions, &players, &mut general_streams).join() { for (waypoint_pos, waypoint_area) in (&positions, &waypoint_areas).join() { if player_pos.0.distance_squared(waypoint_pos.0) < waypoint_area.radius().powi(2) { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - client - .send_msg(ServerGeneral::Notification(Notification::WaypointSaved)); + let _ = general_stream + .0 + .send(ServerGeneral::Notification(Notification::WaypointSaved)); } } }