diff --git a/server/src/client.rs b/server/src/client.rs index e706689d80..eec6385676 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,24 +1,39 @@ -use crate::error::Error; -use common::msg::{ClientInGame, ClientType, ServerGeneral, ServerMsg}; +use common::msg::{ClientInGame, ClientType}; use hashbrown::HashSet; use network::{Participant, Stream}; -use serde::{de::DeserializeOwned, Serialize}; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; -use tracing::debug; use vek::*; +// Streams +// we ignore errors on send, and do unified error handling in recv +pub struct GeneralStream(pub Stream); +pub struct PingStream(pub Stream); +pub struct RegisterStream(pub Stream); +pub struct CharacterScreenStream(pub Stream); +pub struct InGameStream(pub Stream); + +impl Component for GeneralStream { + type Storage = FlaggedStorage>; +} +impl Component for PingStream { + type Storage = FlaggedStorage>; +} +impl Component for RegisterStream { + type Storage = FlaggedStorage>; +} +impl Component for CharacterScreenStream { + type Storage = FlaggedStorage>; +} +impl Component for InGameStream { + type Storage = FlaggedStorage>; +} + pub struct Client { pub registered: bool, pub client_type: ClientType, pub in_game: Option, pub participant: Option, - pub general_stream: Stream, - pub ping_stream: Stream, - pub register_stream: Stream, - pub character_screen_stream: Stream, - pub in_game_stream: Stream, - pub network_error: bool, pub last_ping: f64, pub login_msg_sent: bool, } @@ -27,97 +42,6 @@ impl Component for Client { type Storage = FlaggedStorage>; } -impl Client { - fn internal_send(err: &mut bool, s: &mut Stream, msg: M) { - if !*err { - if let Err(e) = s.send(msg) { - debug!(?e, "got a network error with client"); - *err = true; - } - } - } - - /* - fn internal_send_raw(b: &AtomicBool, s: &mut Stream, msg: Arc) { - if !b.load(Ordering::Relaxed) { - if let Err(e) = s.send_raw(msg) { - debug!(?e, "got a network error with client"); - b.store(true, Ordering::Relaxed); - } - } - } - */ - - pub fn send_msg(&mut self, msg: S) - where - S: Into, - { - const ERR: &str = - "Don't do that. Sending these messages is only done ONCE at connect and not by this fn"; - match msg.into() { - ServerMsg::Info(_) => panic!(ERR), - ServerMsg::Init(_) => panic!(ERR), - ServerMsg::RegisterAnswer(msg) => { - Self::internal_send(&mut self.network_error, &mut self.register_stream, &msg) - }, - ServerMsg::General(msg) => { - let stream = match &msg { - //Character Screen related - ServerGeneral::CharacterDataLoadError(_) - | ServerGeneral::CharacterListUpdate(_) - | ServerGeneral::CharacterActionError(_) - | ServerGeneral::CharacterSuccess => &mut self.character_screen_stream, - //Ingame related - ServerGeneral::GroupUpdate(_) - | ServerGeneral::GroupInvite { .. } - | ServerGeneral::InvitePending(_) - | ServerGeneral::InviteComplete { .. } - | ServerGeneral::ExitInGameSuccess - | ServerGeneral::InventoryUpdate(_, _) - | ServerGeneral::TerrainChunkUpdate { .. } - | ServerGeneral::TerrainBlockUpdates(_) - | ServerGeneral::SetViewDistance(_) - | ServerGeneral::Outcomes(_) - | ServerGeneral::Knockback(_) => &mut self.in_game_stream, - // Always possible - ServerGeneral::PlayerListUpdate(_) - | ServerGeneral::ChatMsg(_) - | ServerGeneral::SetPlayerEntity(_) - | ServerGeneral::TimeOfDay(_) - | ServerGeneral::EntitySync(_) - | ServerGeneral::CompSync(_) - | ServerGeneral::CreateEntity(_) - | ServerGeneral::DeleteEntity(_) - | ServerGeneral::Disconnect(_) - | ServerGeneral::Notification(_) => &mut self.general_stream, - }; - Self::internal_send(&mut self.network_error, stream, &msg) - }, - ServerMsg::Ping(msg) => { - Self::internal_send(&mut self.network_error, &mut self.ping_stream, &msg) - }, - }; - } - - pub async fn internal_recv( - err: &mut bool, - s: &mut Stream, - ) -> Result { - if !*err { - match s.recv().await { - Ok(r) => Ok(r), - Err(e) => { - debug!(?e, "got a network error with client while recv"); - *err = true; - Err(Error::StreamErr(e)) - }, - } - } else { - Err(Error::StreamErr(network::StreamError::StreamClosed)) - } - } -} - // Distance from fuzzy_chunk before snapping to current chunk pub const CHUNK_FUZZ: u32 = 2; // Distance out of the range of a region before removing it from subscriptions diff --git a/server/src/cmd.rs b/server/src/cmd.rs index e747e725ad..462d6c3e05 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -3,7 +3,6 @@ //! `CHAT_COMMANDS` and provide a handler function. use crate::{ - client::Client, settings::{BanRecord, EditableSetting}, Server, StateExt, }; @@ -27,7 +26,7 @@ use std::convert::TryFrom; use vek::*; use world::util::Sampler; -use crate::login_provider::LoginProvider; +use crate::{client::InGameStream, login_provider::LoginProvider}; use scan_fmt::{scan_fmt, scan_fmt_some}; use tracing::error; @@ -650,7 +649,8 @@ fn handle_spawn( // Add to group system if a pet if matches!(alignment, comp::Alignment::Owned { .. }) { let state = server.state(); - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = + state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); @@ -662,15 +662,15 @@ fn handle_spawn( &state.ecs().read_storage(), &uids, &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| { - c.send_msg(ServerGeneral::GroupUpdate(g)) + .map(|(g, s)| { + let _ = s.0.send(ServerGeneral::GroupUpdate(g)); }); }, ); diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 2f79f1795d..82c4f191f9 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -1,4 +1,7 @@ -use crate::{Client, ClientType, ServerInfo}; +use crate::{ + CharacterScreenStream, Client, ClientType, GeneralStream, InGameStream, PingStream, + RegisterStream, ServerInfo, +}; use crossbeam::{bounded, unbounded, Receiver, Sender}; use futures_channel::oneshot; use futures_executor::block_on; @@ -13,10 +16,19 @@ pub(crate) struct ServerInfoPacket { pub time: f64, } +pub(crate) struct ClientPackage { + pub client: Client, + pub general: GeneralStream, + pub ping: PingStream, + pub register: RegisterStream, + pub character: CharacterScreenStream, + pub in_game: InGameStream, +} + pub(crate) struct ConnectionHandler { _network: Arc, thread_handle: Option>, - pub client_receiver: Receiver, + pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, stop_sender: Option>, } @@ -31,7 +43,7 @@ impl ConnectionHandler { let network_clone = Arc::clone(&network); let (stop_sender, stop_receiver) = oneshot::channel(); - let (client_sender, client_receiver) = unbounded::(); + let (client_sender, client_receiver) = unbounded::(); let (info_requester_sender, info_requester_receiver) = bounded::>(1); @@ -55,7 +67,7 @@ impl ConnectionHandler { async fn work( network: Arc, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, stop_receiver: oneshot::Receiver<()>, ) { @@ -92,7 +104,7 @@ impl ConnectionHandler { async fn init_participant( participant: Participant, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, ) -> Result<(), Box> { debug!("New Participant connected to the server"); @@ -129,17 +141,20 @@ impl ConnectionHandler { client_type, in_game: None, participant: Some(participant), - general_stream, - ping_stream, - register_stream, - in_game_stream, - character_screen_stream, - network_error: false, last_ping: server_data.time, login_msg_sent: false, }; - client_sender.send(client)?; + let package = ClientPackage { + client, + general: GeneralStream(general_stream), + ping: PingStream(ping_stream), + register: RegisterStream(register_stream), + character: CharacterScreenStream(character_screen_stream), + in_game: InGameStream(in_game_stream), + }; + + client_sender.send(package)?; Ok(()) } } diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index cb175c4c20..1e3dd4af62 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -1,5 +1,5 @@ use crate::{ - client::Client, + client::{Client, InGameStream}, comp::{biped_large, quadruped_medium, quadruped_small}, Server, SpawnPoint, StateExt, }; @@ -42,9 +42,9 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) if let Some(vel) = velocities.get_mut(entity) { vel.0 = impulse; } - let mut clients = state.ecs().write_storage::(); - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::Knockback(impulse)); + let mut in_game_streams = state.ecs().write_storage::(); + if let Some(in_game_stream) = in_game_streams.get_mut(entity) { + let _ = in_game_stream.0.send(ServerGeneral::Knockback(impulse)); } } diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index 78c0df462b..8d8e00d150 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -1,4 +1,7 @@ -use crate::{client::Client, Server}; +use crate::{ + client::{GeneralStream, InGameStream}, + Server, +}; use common::{ comp::{ self, @@ -25,20 +28,21 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani match manip { GroupManip::Invite(uid) => { - let mut clients = state.ecs().write_storage::(); - let invitee = match state.ecs().entity_from_uid(uid.into()) { - Some(t) => t, - None => { - // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg( - ChatType::Meta - .server_msg("Invite failed, target does not exist.".to_owned()), - ); - } - return; - }, - }; + let mut general_streams = state.ecs().write_storage::(); + let invitee = + match state.ecs().entity_from_uid(uid.into()) { + Some(t) => t, + None => { + // Inform of failure + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = + general_stream.0.send(ChatType::Meta.server_msg( + "Invite failed, target does not exist.".to_owned(), + )); + } + return; + }, + }; let uids = state.ecs().read_storage::(); @@ -62,8 +66,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }); if already_in_same_group { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); } @@ -92,8 +96,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani >= max_group_size as usize; if group_size_limit_reached { // Inform inviter that they have reached the group size limit - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ the group size limit" @@ -109,11 +113,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite - if let Some(client) = clients.get_mut(entity) { - client.send_msg( - ChatType::Meta - .server_msg("This player already has a pending invite.".to_owned()), - ); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = + general_stream + .0 + .send(ChatType::Meta.server_msg( + "This player already has a pending invite.".to_owned(), + )); } return; } @@ -150,33 +156,35 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } }; + let mut in_game_streams = state.ecs().write_storage::(); + // If client comp - if let (Some(client), Some(inviter)) = - (clients.get_mut(invitee), uids.get(entity).copied()) + if let (Some(in_game_stream), Some(inviter)) = + (in_game_streams.get_mut(invitee), uids.get(entity).copied()) { if send_invite() { - client.send_msg(ServerGeneral::GroupInvite { + let _ = in_game_stream.0.send(ServerGeneral::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); } } else if agents.contains(invitee) { send_invite(); - } else if let Some(client) = clients.get_mut(entity) { - client.send_msg( + } else if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } // Notify inviter that the invite is pending if invite_sent { - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::InvitePending(uid)); + if let Some(in_game_stream) = in_game_streams.get_mut(entity) { + let _ = in_game_stream.0.send(ServerGeneral::InvitePending(uid)); } } }, GroupManip::Accept => { - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -193,13 +201,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { - if let (Some(client), Some(target)) = - (clients.get_mut(inviter), uids.get(entity).copied()) + if let (Some(in_game_stream), Some(target)) = + (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Accepted, - }) + }); } let mut group_manager = state.ecs().write_resource::(); group_manager.add_group_member( @@ -210,20 +218,20 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().read_storage(), &uids, |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); } }, GroupManip::Decline => { - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -241,18 +249,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { // Inform inviter of rejection - if let (Some(client), Some(target)) = - (clients.get_mut(inviter), uids.get(entity).copied()) + if let (Some(in_game_stream), Some(target)) = + (in_game_streams.get_mut(inviter), uids.get(entity).copied()) { - client.send_msg(ServerGeneral::InviteComplete { + let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Declined, - }) + }); } } }, GroupManip::Leave => { - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); group_manager.leave_group( @@ -262,19 +270,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, &state.ecs().entities(), &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); }, GroupManip::Kick(uid) => { - let mut clients = state.ecs().write_storage::(); + let mut general_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let alignments = state.ecs().read_storage::(); @@ -282,8 +290,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -295,8 +303,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick pet if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); } @@ -304,8 +312,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -315,6 +323,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani let mut groups = state.ecs().write_storage::(); let mut group_manager = state.ecs().write_resource::(); + let mut in_game_streams = state.ecs().write_storage::(); // Make sure kicker is the group leader match groups .get(target) @@ -329,58 +338,59 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, &state.ecs().entities(), &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them the have been kicked - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(target) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); } // Tell kicker that they were succesful - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg("Player kicked.".to_owned())); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream + .0 + .send(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } }, None => { // Inform kicker that the target is not in a group - if let Some(client) = clients.get_mut(entity) { - client.send_msg( - ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = + general_stream.0.send(ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), - ), - ); + )); } }, } }, GroupManip::AssignLeader(uid) => { - let mut clients = state.ecs().write_storage::(); + let mut general_streams = state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let target = match state.ecs().entity_from_uid(uid.into()) { Some(t) => t, None => { // Inform of failure - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -389,6 +399,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }; let groups = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); + let mut in_game_streams = state.ecs().write_storage::(); // Make sure assigner is the group leader match groups .get(target) @@ -403,25 +414,25 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().read_storage(), &uids, |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them they are the leader - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(target) { + let _ = general_stream.0.send( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful - if let Some(client) = clients.get_mut(target) { - client.send_msg( + if let Some(general_stream) = general_streams.get_mut(target) { + let _ = general_stream.0.send( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -429,8 +440,9 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, Some(_) => { // Inform transferer that they are not the leader - if let Some(client) = clients.get_mut(entity) { - client.send_msg( + let mut general_streams = state.ecs().write_storage::(); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -440,8 +452,9 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, None => { // Inform transferer that the target is not in a group - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ChatType::Meta.server_msg( + let mut general_streams = state.ecs().write_storage::(); + if let Some(general_stream) = general_streams.get_mut(entity) { + let _ = general_stream.0.send(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index ff9f2e467a..21a27b01e2 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -1,5 +1,8 @@ use crate::{ - client::{Client, RegionSubscription}, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription, + RegisterStream, + }, Server, }; use common::{ @@ -120,80 +123,153 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { // You can't possess other players let mut clients = ecs.write_storage::(); + let mut general_streams = ecs.write_storage::(); + let mut ping_streams = ecs.write_storage::(); + let mut register_streams = ecs.write_storage::(); + let mut character_screen_streams = ecs.write_storage::(); + let mut in_game_streams = ecs.write_storage::(); if clients.get_mut(possesse).is_none() { - if let Some(mut client) = clients.remove(possessor) { - client.send_msg(ServerGeneral::SetPlayerEntity(possesse_uid)); - clients - .insert(possesse, client) - .err() - .map(|e| error!(?e, "Error inserting client component during possession")); - // Put possess item into loadout - let mut loadouts = ecs.write_storage::(); - let loadout = loadouts - .entry(possesse) - .expect("Could not read loadouts component while possessing") - .or_insert(comp::Loadout::default()); + let client = match clients.remove(possessor) { + Some(c) => c, + None => return, + }; + let mut general_stream = match general_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let ping_stream = match ping_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let register_stream = match register_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let character_screen_stream = match character_screen_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let in_game_stream = match in_game_streams.remove(possessor) { + Some(c) => c, + None => return, + }; + let _ = general_stream + .0 + .send(ServerGeneral::SetPlayerEntity(possesse_uid)); + clients + .insert(possesse, client) + .err() + .map(|e| error!(?e, "Error inserting client component during possession")); + general_streams + .insert(possesse, general_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting general_streams component during possession" + ) + }); + ping_streams.insert(possesse, ping_stream).err().map(|e| { + error!( + ?e, + "Error inserting ping_streams component during possession" + ) + }); + register_streams + .insert(possesse, register_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting register_streams component during possession" + ) + }); + character_screen_streams + .insert(possesse, character_screen_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting character_screen_streams component during possession" + ) + }); + in_game_streams + .insert(possesse, in_game_stream) + .err() + .map(|e| { + error!( + ?e, + "Error inserting in_game_streams component during possession" + ) + }); + // Put possess item into loadout + let mut loadouts = ecs.write_storage::(); + let loadout = loadouts + .entry(possesse) + .expect("Could not read loadouts component while possessing") + .or_insert(comp::Loadout::default()); - let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); - if let item::ItemKind::Tool(tool) = item.kind() { - let mut abilities = tool.get_abilities(); - let mut ability_drain = abilities.drain(..); - let debug_item = comp::ItemConfig { - item, - ability1: ability_drain.next(), - ability2: ability_drain.next(), - ability3: ability_drain.next(), - block_ability: None, - dodge_ability: None, - }; - std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); - loadout.active_item = Some(debug_item); - } + let item = comp::Item::new_from_asset_expect("common.items.debug.possess"); + if let item::ItemKind::Tool(tool) = item.kind() { + let mut abilities = tool.get_abilities(); + let mut ability_drain = abilities.drain(..); + let debug_item = comp::ItemConfig { + item, + ability1: ability_drain.next(), + ability2: ability_drain.next(), + ability3: ability_drain.next(), + block_ability: None, + dodge_ability: None, + }; + std::mem::swap(&mut loadout.active_item, &mut loadout.second_item); + loadout.active_item = Some(debug_item); + } - // Move player component - { - let mut players = ecs.write_storage::(); - if let Some(player) = players.remove(possessor) { - players.insert(possesse, player).err().map(|e| { - error!(?e, "Error inserting player component during possession") - }); - } + // Move player component + { + let mut players = ecs.write_storage::(); + if let Some(player) = players.remove(possessor) { + players + .insert(possesse, player) + .err() + .map(|e| error!(?e, "Error inserting player component during possession")); } - // Transfer region subscription - { - let mut subscriptions = ecs.write_storage::(); - if let Some(s) = subscriptions.remove(possessor) { - subscriptions.insert(possesse, s).err().map(|e| { - error!( - ?e, - "Error inserting subscription component during possession" - ) - }); - } + } + // Transfer region subscription + { + let mut subscriptions = ecs.write_storage::(); + if let Some(s) = subscriptions.remove(possessor) { + subscriptions.insert(possesse, s).err().map(|e| { + error!( + ?e, + "Error inserting subscription component during possession" + ) + }); } - // Remove will of the entity - ecs.write_storage::().remove(possesse); - // Reset controller of former shell - ecs.write_storage::() - .get_mut(possessor) - .map(|c| c.reset()); - // Transfer admin powers - { - let mut admins = ecs.write_storage::(); - if let Some(admin) = admins.remove(possessor) { - admins.insert(possesse, admin).err().map(|e| { - error!(?e, "Error inserting admin component during possession") - }); - } + } + // Remove will of the entity + ecs.write_storage::().remove(possesse); + // Reset controller of former shell + ecs.write_storage::() + .get_mut(possessor) + .map(|c| c.reset()); + // Transfer admin powers + { + let mut admins = ecs.write_storage::(); + if let Some(admin) = admins.remove(possessor) { + admins + .insert(possesse, admin) + .err() + .map(|e| error!(?e, "Error inserting admin component during possession")); } - // Transfer waypoint - { - let mut waypoints = ecs.write_storage::(); - if let Some(waypoint) = waypoints.remove(possessor) { - waypoints.insert(possesse, waypoint).err().map(|e| { - error!(?e, "Error inserting waypoint component during possession",) - }); - } + } + // Transfer waypoint + { + let mut waypoints = ecs.write_storage::(); + if let Some(waypoint) = waypoints.remove(possessor) { + waypoints.insert(possesse, waypoint).err().map(|e| { + error!(?e, "Error inserting waypoint component during possession",) + }); } } } diff --git a/server/src/events/inventory_manip.rs b/server/src/events/inventory_manip.rs index e82f292385..3bc648aea1 100644 --- a/server/src/events/inventory_manip.rs +++ b/server/src/events/inventory_manip.rs @@ -1,4 +1,4 @@ -use crate::{client::Client, Server, StateExt}; +use crate::{client::InGameStream, Server, StateExt}; use common::{ comp::{ self, item, @@ -279,7 +279,8 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv .insert(tameable_entity, comp::Alignment::Owned(uid)); // Add to group system - let mut clients = state.ecs().write_storage::(); + let mut in_game_streams = + state.ecs().write_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state .ecs() @@ -293,15 +294,15 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv &state.ecs().read_storage(), &uids, &mut |entity, group_change| { - clients + in_game_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| { - c.send_msg(ServerGeneral::GroupUpdate(g)) + .map(|(g, s)| { + s.0.send(ServerGeneral::GroupUpdate(g)) }); }, ); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index e466a5b116..383de143bf 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -1,6 +1,12 @@ use super::Event; use crate::{ - client::Client, login_provider::LoginProvider, persistence, state_ext::StateExt, Server, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream, + }, + login_provider::LoginProvider, + persistence, + state_ext::StateExt, + Server, }; use common::{ comp, @@ -17,26 +23,61 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { span!(_guard, "handle_exit_ingame"); let state = server.state_mut(); - // Create new entity with just `Client`, `Uid`, and `Player` components - // Easier than checking and removing all other known components + // Create new entity with just `Client`, `Uid`, `Player`, and `...Stream` + // components Easier than checking and removing all other known components // Note: If other `ServerEvent`s are referring to this entity they will be // disrupted let maybe_client = state.ecs().write_storage::().remove(entity); let maybe_uid = state.read_component_copied::(entity); let maybe_player = state.ecs().write_storage::().remove(entity); let maybe_admin = state.ecs().write_storage::().remove(entity); + let maybe_general_stream = state.ecs().write_storage::().remove(entity); + let maybe_ping_stream = state.ecs().write_storage::().remove(entity); + let maybe_register_stream = state.ecs().write_storage::().remove(entity); + let maybe_character_screen_stream = state + .ecs() + .write_storage::() + .remove(entity); + let maybe_in_game_stream = state.ecs().write_storage::().remove(entity); let maybe_group = state .ecs() .write_storage::() .get(entity) .cloned(); - if let (Some(mut client), Some(uid), Some(player)) = (maybe_client, maybe_uid, maybe_player) { + if let ( + Some(mut client), + Some(uid), + Some(player), + Some(mut general_stream), + Some(ping_stream), + Some(register_stream), + Some(character_screen_stream), + Some(in_game_stream), + ) = ( + maybe_client, + maybe_uid, + maybe_player, + maybe_general_stream, + maybe_ping_stream, + maybe_register_stream, + maybe_character_screen_stream, + maybe_in_game_stream, + ) { // Tell client its request was successful client.in_game = None; - client.send_msg(ServerGeneral::ExitInGameSuccess); + let _ = general_stream.0.send(ServerGeneral::ExitInGameSuccess); - let entity_builder = state.ecs_mut().create_entity().with(client).with(player); + let entity_builder = state + .ecs_mut() + .create_entity() + .with(client) + .with(player) + .with(general_stream) + .with(ping_stream) + .with(register_stream) + .with(character_screen_stream) + .with(in_game_stream); // Preserve group component if present let entity_builder = match maybe_group { diff --git a/server/src/lib.rs b/server/src/lib.rs index e3acfa2f02..976c9f4d9c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -34,7 +34,10 @@ pub use crate::{ use crate::{ alias_validator::AliasValidator, chunk_generator::ChunkGenerator, - client::{Client, RegionSubscription}, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription, + RegisterStream, + }, cmd::ChatCommandExt, connection_handler::ConnectionHandler, data_dir::DataDir, @@ -180,6 +183,11 @@ impl Server { // Server-only components state.ecs_mut().register::(); state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); + state.ecs_mut().register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; @@ -804,8 +812,8 @@ impl Server { }); } - while let Ok(data) = self.connection_handler.client_receiver.try_recv() { - let mut client = data; + while let Ok(mut package) = self.connection_handler.client_receiver.try_recv() { + let client = package.client; if self.settings().max_players <= self.state.ecs().read_storage::().join().count() @@ -814,7 +822,7 @@ impl Server { ?client.participant, "to many players, wont allow participant to connect" ); - client.register_stream.send(ServerInit::TooManyPlayers)?; + package.register.0.send(ServerInit::TooManyPlayers)?; continue; } @@ -823,6 +831,11 @@ impl Server { .ecs_mut() .create_entity_synced() .with(client) + .with(package.general) + .with(package.ping) + .with(package.register) + .with(package.character) + .with(package.in_game) .build(); self.state .ecs() @@ -834,10 +847,10 @@ impl Server { debug!("Starting initial sync with client."); self.state .ecs() - .write_storage::() + .write_storage::() .get_mut(entity) .unwrap() - .register_stream + .0 .send(ServerInit::GameSync { // Send client their entity entity_package: TrackedComps::fetch(&self.state.ecs()) @@ -859,8 +872,72 @@ impl Server { where S: Into, { - if let Some(client) = self.state.ecs().write_storage::().get_mut(entity) { - client.send_msg(msg.into()) + const ERR: &str = + "Don't do that. Sending these messages is only done ONCE at connect and not by this fn"; + match msg.into() { + ServerMsg::Info(_) => panic!(ERR), + ServerMsg::Init(_) => panic!(ERR), + ServerMsg::RegisterAnswer(msg) => { + self.state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)); + }, + ServerMsg::General(msg) => { + match &msg { + //Character Screen related + ServerGeneral::CharacterDataLoadError(_) + | ServerGeneral::CharacterListUpdate(_) + | ServerGeneral::CharacterActionError(_) + | ServerGeneral::CharacterSuccess => self + .state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)), + //Ingame related + ServerGeneral::GroupUpdate(_) + | ServerGeneral::GroupInvite { .. } + | ServerGeneral::InvitePending(_) + | ServerGeneral::InviteComplete { .. } + | ServerGeneral::ExitInGameSuccess + | ServerGeneral::InventoryUpdate(_, _) + | ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) + | ServerGeneral::SetViewDistance(_) + | ServerGeneral::Outcomes(_) + | ServerGeneral::Knockback(_) => self + .state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)), + // Always possible + ServerGeneral::PlayerListUpdate(_) + | ServerGeneral::ChatMsg(_) + | ServerGeneral::SetPlayerEntity(_) + | ServerGeneral::TimeOfDay(_) + | ServerGeneral::EntitySync(_) + | ServerGeneral::CompSync(_) + | ServerGeneral::CreateEntity(_) + | ServerGeneral::DeleteEntity(_) + | ServerGeneral::Disconnect(_) + | ServerGeneral::Notification(_) => self + .state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)), + }; + }, + ServerMsg::Ping(msg) => { + self.state + .ecs() + .write_storage::() + .get_mut(entity) + .map(|s| s.0.send(msg)); + }, } } diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 06d7c523e3..57d714838b 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,11 +1,14 @@ use crate::{ - client::Client, persistence::PersistedComponents, sys::sentinel::DeletedEntities, SpawnPoint, + client::{CharacterScreenStream, Client, GeneralStream}, + persistence::PersistedComponents, + sys::sentinel::DeletedEntities, + SpawnPoint, }; use common::{ character::CharacterId, comp, effect::Effect, - msg::{CharacterInfo, ClientInGame, PlayerListUpdate, ServerGeneral, ServerMsg}, + msg::{CharacterInfo, ClientInGame, PlayerListUpdate, ServerGeneral}, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, util::Dir, @@ -222,8 +225,16 @@ impl StateExt for State { // Tell the client its request was successful. if let Some(client) = self.ecs().write_storage::().get_mut(entity) { - client.in_game = Some(ClientInGame::Character); - client.send_msg(ServerGeneral::CharacterSuccess) + if let Some(character_screen_stream) = self + .ecs() + .write_storage::() + .get_mut(entity) + { + client.in_game = Some(ClientInGame::Character); + let _ = character_screen_stream + .0 + .send(ServerGeneral::CharacterSuccess); + } } } @@ -293,14 +304,16 @@ impl StateExt for State { } }, comp::ChatType::Tell(u, t) => { - for (client, uid) in ( - &mut ecs.write_storage::(), + for (general_stream, uid) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if uid == u || uid == t { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -310,9 +323,13 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (general_stream, pos) in + (&mut ecs.write_storage::(), &positions).join() + { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -322,9 +339,13 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (general_stream, pos) in + (&mut ecs.write_storage::(), &positions).join() + { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -334,35 +355,43 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (client, pos) in (&mut ecs.write_storage::(), &positions).join() { + for (general_stream, pos) in + (&mut ecs.write_storage::(), &positions).join() + { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } }, comp::ChatType::FactionMeta(s) | comp::ChatType::Faction(_, s) => { - for (client, faction) in ( - &mut ecs.write_storage::(), + for (general_stream, faction) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if s == &faction.0 { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, comp::ChatType::GroupMeta(g) | comp::ChatType::Group(_, g) => { - for (client, group) in ( - &mut ecs.write_storage::(), + for (general_stream, group) in ( + &mut ecs.write_storage::(), &ecs.read_storage::(), ) .join() { if g == group { - client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone())); + let _ = general_stream + .0 + .send(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -371,23 +400,27 @@ impl StateExt for State { /// Sends the message to all connected clients fn notify_registered_clients(&self, msg: ServerGeneral) { - let msg: ServerMsg = msg.into(); - for client in (&mut self.ecs().write_storage::()) + for (general_stream, _) in ( + &mut self.ecs().write_storage::(), + &self.ecs().read_storage::(), + ) .join() - .filter(|c| c.registered) + .filter(|(_, c)| c.registered) { - client.send_msg(msg.clone()); + let _ = general_stream.0.send(msg.clone()); } } /// Sends the message to all clients playing in game fn notify_in_game_clients(&self, msg: ServerGeneral) { - let msg: ServerMsg = msg.into(); - for client in (&mut self.ecs().write_storage::()) + for (general_stream, _) in ( + &mut self.ecs().write_storage::(), + &self.ecs().read_storage::(), + ) .join() - .filter(|c| c.in_game.is_some()) + .filter(|(_, c)| c.in_game.is_some()) { - client.send_msg(msg.clone()); + let _ = general_stream.0.send(msg.clone()); } } @@ -397,7 +430,7 @@ impl StateExt for State { ) -> Result<(), specs::error::WrongGeneration> { // Remove entity from a group if they are in one { - let mut clients = self.ecs().write_storage::(); + let mut general_streams = self.ecs().write_storage::(); let uids = self.ecs().read_storage::(); let mut group_manager = self.ecs().write_resource::(); group_manager.entity_deleted( @@ -407,14 +440,14 @@ impl StateExt for State { &uids, &self.ecs().entities(), &mut |entity, group_change| { - clients + general_streams .get_mut(entity) - .and_then(|c| { + .and_then(|s| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, c)) + .map(|g| (g, s)) }) - .map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g))); + .map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g))); }, ); } diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index 0442a04e8e..465a77d728 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -3,7 +3,7 @@ use super::{ SysTimer, }; use crate::{ - client::{Client, RegionSubscription}, + client::{Client, GeneralStream, InGameStream, RegionSubscription}, Tick, }; use common::{ @@ -43,6 +43,8 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Last>, WriteStorage<'a, Last>, WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, + WriteStorage<'a, GeneralStream>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, InventoryUpdate>, Write<'a, DeletedEntities>, @@ -70,6 +72,8 @@ impl<'a> System<'a> for Sys { mut last_vel, mut last_ori, mut clients, + mut in_game_streams, + mut general_streams, mut force_updates, mut inventory_updates, mut deleted_entities, @@ -104,15 +108,31 @@ impl<'a> System<'a> for Sys { for (key, region) in region_map.iter() { // Assemble subscriber list for this region by iterating through clients and // checking if they are subscribed to this region - let mut subscribers = (&mut clients, &entities, &subscriptions, &positions) + let mut subscribers = ( + &mut clients, + &entities, + &subscriptions, + &positions, + &mut in_game_streams, + &mut general_streams, + ) .join() - .filter_map(|(client, entity, subscription, pos)| { - if client.in_game.is_some() && subscription.regions.contains(&key) { - Some((client, &subscription.regions, entity, *pos)) - } else { - None - } - }) + .filter_map( + |(client, entity, subscription, pos, in_game_stream, general_stream)| { + if client.in_game.is_some() && subscription.regions.contains(&key) { + Some(( + client, + &subscription.regions, + entity, + *pos, + in_game_stream, + general_stream, + )) + } else { + None + } + }, + ) .collect::>(); for event in region.events() { @@ -135,7 +155,9 @@ impl<'a> System<'a> for Sys { vel.copied(), ori.copied(), )); - for (client, regions, client_entity, _) in &mut subscribers { + for (_, regions, client_entity, _, _, general_stream) in + &mut subscribers + { if maybe_key .as_ref() .map(|key| !regions.contains(key)) @@ -143,7 +165,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - client.send_msg(create_msg.clone()); + let _ = general_stream.0.send(create_msg.clone()); } } } @@ -151,13 +173,13 @@ impl<'a> System<'a> for Sys { RegionEvent::Left(id, maybe_key) => { // Lookup UID for entity if let Some(&uid) = uids.get(entities.entity(*id)) { - for (client, regions, _, _) in &mut subscribers { + for (_, regions, _, _, _, general_stream) in &mut subscribers { if maybe_key .as_ref() .map(|key| !regions.contains(key)) .unwrap_or(true) { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + let _ = general_stream.0.send(ServerGeneral::DeleteEntity(uid)); } } } @@ -176,17 +198,19 @@ impl<'a> System<'a> for Sys { ); let entity_sync_msg = ServerGeneral::EntitySync(entity_sync_package); let comp_sync_msg = ServerGeneral::CompSync(comp_sync_package); - subscribers.iter_mut().for_each(move |(client, _, _, _)| { - client.send_msg(entity_sync_msg.clone()); - client.send_msg(comp_sync_msg.clone()); - }); + subscribers + .iter_mut() + .for_each(move |(_, _, _, _, _, general_stream)| { + let _ = general_stream.0.send(entity_sync_msg.clone()); + let _ = general_stream.0.send(comp_sync_msg.clone()); + }); - let mut send_msg = |msg: ServerGeneral, - entity: EcsEntity, - pos: Pos, - force_update: Option<&ForceUpdate>, - throttle: bool| { - for (client, _, client_entity, client_pos) in &mut subscribers { + let mut send_general = |msg: ServerGeneral, + entity: EcsEntity, + pos: Pos, + force_update: Option<&ForceUpdate>, + throttle: bool| { + for (_, _, client_entity, client_pos, _, general_stream) in &mut subscribers { if if client_entity == &entity { // Don't send client physics updates about itself unless force update is set force_update.is_some() @@ -212,7 +236,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - client.send_msg(msg.clone()); + let _ = general_stream.0.send(msg.clone()); } } }; @@ -286,7 +310,7 @@ impl<'a> System<'a> for Sys { comp_sync_package.comp_removed::(uid); } - send_msg( + send_general( ServerGeneral::CompSync(comp_sync_package), entity, pos, @@ -299,19 +323,20 @@ impl<'a> System<'a> for Sys { // Handle entity deletion in regions that don't exist in RegionMap // (theoretically none) for (region_key, deleted) in deleted_entities.take_remaining_deleted() { - for client in - (&mut clients, &subscriptions) - .join() - .filter_map(|(client, subscription)| { - if client.in_game.is_some() && subscription.regions.contains(®ion_key) { - Some(client) - } else { - None - } - }) + for general_stream in (&mut clients, &subscriptions, &mut general_streams) + .join() + .filter_map(|(client, subscription, general_stream)| { + if client.in_game.is_some() && subscription.regions.contains(®ion_key) { + Some(general_stream) + } else { + None + } + }) { for uid in &deleted { - client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid))); + let _ = general_stream + .0 + .send(ServerGeneral::DeleteEntity(Uid(*uid))); } } } @@ -319,15 +344,19 @@ impl<'a> System<'a> for Sys { // TODO: Sync clients that don't have a position? // Sync inventories - for (client, inventory, update) in (&mut clients, &inventories, &inventory_updates).join() { - client.send_msg(ServerGeneral::InventoryUpdate( + for (inventory, update, in_game_stream) in + (&inventories, &inventory_updates, &mut in_game_streams).join() + { + let _ = in_game_stream.0.send(ServerGeneral::InventoryUpdate( inventory.clone(), update.event(), )); } // Sync outcomes - for (client, player, pos) in (&mut clients, &players, positions.maybe()).join() { + for (player, pos, in_game_stream) in + (&players, positions.maybe(), &mut in_game_streams).join() + { let is_near = |o_pos: Vec3| { pos.zip_with(player.view_distance, |pos, vd| { pos.0.xy().distance_squared(o_pos.xy()) @@ -341,7 +370,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - client.send_msg(ServerGeneral::Outcomes(outcomes)); + let _ = in_game_stream.0.send(ServerGeneral::Outcomes(outcomes)); } } outcomes.clear(); @@ -354,8 +383,8 @@ impl<'a> System<'a> for Sys { // TODO: doesn't really belong in this system (rename system or create another // system?) let tof_msg = ServerGeneral::TimeOfDay(*time_of_day); - for client in (&mut clients).join() { - client.send_msg(tof_msg.clone()); + for general_stream in (&mut general_streams).join() { + let _ = general_stream.0.send(tof_msg.clone()); } timer.end(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index 42f6603fe8..87fa70b25c 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::Client; +use crate::client::InGameStream; use common::{ comp::group::{Invite, PendingInvites}, msg::{InviteAnswer, ServerGeneral}, @@ -16,14 +16,14 @@ impl<'a> System<'a> for Sys { Entities<'a>, WriteStorage<'a, Invite>, WriteStorage<'a, PendingInvites>, - WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, ReadStorage<'a, Uid>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, mut invites, mut pending_invites, mut clients, uids, mut timer): Self::SystemData, + (entities, mut invites, mut pending_invites, mut in_game_streams, uids, mut timer): Self::SystemData, ) { span!(_guard, "run", "invite_timeout::Sys::run"); timer.start(); @@ -51,13 +51,14 @@ impl<'a> System<'a> for Sys { } // Inform inviter of timeout - if let (Some(client), Some(target)) = - (clients.get_mut(*inviter), uids.get(invitee).copied()) - { - client.send_msg(ServerGeneral::InviteComplete { + if let (Some(in_game_stream), Some(target)) = ( + in_game_streams.get_mut(*inviter), + uids.get(invitee).copied(), + ) { + let _ = in_game_stream.0.send(ServerGeneral::InviteComplete { target, answer: InviteAnswer::TimedOut, - }) + }); } Some(invitee) diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 9a9dfc92d3..78183af705 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -2,7 +2,9 @@ use super::SysTimer; use crate::{ alias_validator::AliasValidator, character_creator, - client::Client, + client::{ + CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream, + }, login_provider::LoginProvider, metrics::{NetworkRequestMetrics, PlayerMetrics}, persistence::character_loader::CharacterLoader, @@ -41,6 +43,7 @@ impl Sys { new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, entity: specs::Entity, client: &mut Client, + general_stream: &mut GeneralStream, player_metrics: &ReadExpect<'_, PlayerMetrics>, uids: &ReadStorage<'_, Uid>, chat_modes: &ReadStorage<'_, ChatMode>, @@ -68,7 +71,9 @@ impl Sys { } }, ClientGeneral::Disconnect => { - client.send_msg(ServerGeneral::Disconnect(DisconnectReason::Requested)); + general_stream + .0 + .send(ServerGeneral::Disconnect(DisconnectReason::Requested))?; }, ClientGeneral::Terminate => { debug!(?entity, "Client send message to termitate session"); @@ -88,6 +93,7 @@ impl Sys { server_emitter: &mut common::event::Emitter<'_, ServerEvent>, entity: specs::Entity, client: &mut Client, + in_game_stream: &mut InGameStream, terrain: &ReadExpect<'_, TerrainGrid>, network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, can_build: &ReadStorage<'_, CanBuild>, @@ -115,7 +121,7 @@ impl Sys { ClientGeneral::ExitInGame => { client.in_game = None; server_emitter.emit(ServerEvent::ExitIngame { entity }); - client.send_msg(ServerGeneral::ExitInGameSuccess); + in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?; }, ClientGeneral::SetViewDistance(view_distance) => { players.get_mut(entity).map(|player| { @@ -133,9 +139,9 @@ impl Sys { .map(|max| view_distance > max) .unwrap_or(false) { - client.send_msg(ServerGeneral::SetViewDistance( + in_game_stream.0.send(ServerGeneral::SetViewDistance( settings.max_view_distance.unwrap_or(0), - )); + ))?; } }, ClientGeneral::ControllerInputs(inputs) => { @@ -203,10 +209,10 @@ impl Sys { match terrain.get_key(key) { Some(chunk) => { network_metrics.chunks_served_from_memory.inc(); - client.send_msg(ServerGeneral::TerrainChunkUpdate { + in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), - }) + })? }, None => { network_metrics.chunks_generation_triggered.inc(); @@ -243,6 +249,7 @@ impl Sys { new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, entity: specs::Entity, client: &mut Client, + character_screen_stream: &mut CharacterScreenStream, character_loader: &ReadExpect<'_, CharacterLoader>, uids: &ReadStorage<'_, Uid>, players: &mut WriteStorage<'_, Player>, @@ -278,10 +285,11 @@ impl Sys { // Give the player a welcome message if !editable_settings.server_description.is_empty() { - client.send_msg( - ChatType::CommandInfo - .server_msg(String::from(&*editable_settings.server_description)), - ); + character_screen_stream + .0 + .send(ChatType::CommandInfo.server_msg(String::from( + &*editable_settings.server_description, + )))?; } if !client.login_msg_sent { @@ -295,9 +303,11 @@ impl Sys { } } } else { - client.send_msg(ServerGeneral::CharacterDataLoadError(String::from( - "Failed to fetch player entity", - ))) + character_screen_stream + .0 + .send(ServerGeneral::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + )))? } } ClientGeneral::Character(_) => { @@ -313,7 +323,9 @@ impl Sys { ClientGeneral::CreateCharacter { alias, tool, body } => { if let Err(error) = alias_validator.validate(&alias) { debug!(?error, ?alias, "denied alias as it contained a banned word"); - client.send_msg(ServerGeneral::CharacterActionError(error.to_string())); + character_screen_stream + .0 + .send(ServerGeneral::CharacterActionError(error.to_string()))?; } else if let Some(player) = players.get(entity) { character_creator::create_character( entity, @@ -340,9 +352,13 @@ impl Sys { } #[allow(clippy::too_many_arguments)] - fn handle_ping_msg(client: &mut Client, msg: PingMsg) -> Result<(), crate::error::Error> { + fn handle_ping_msg( + client: &mut Client, + ping_stream: &mut PingStream, + msg: PingMsg, + ) -> Result<(), crate::error::Error> { match msg { - PingMsg::Ping => client.send_msg(PingMsg::Pong), + PingMsg::Ping => ping_stream.0.send(PingMsg::Pong)?, PingMsg::Pong => {}, } Ok(()) @@ -354,6 +370,7 @@ impl Sys { new_players: &mut Vec, entity: specs::Entity, client: &mut Client, + register_stream: &mut RegisterStream, player_metrics: &ReadExpect<'_, PlayerMetrics>, login_provider: &mut WriteExpect<'_, LoginProvider>, admins: &mut WriteStorage<'_, Admin>, @@ -368,9 +385,7 @@ impl Sys { &*editable_settings.banlist, ) { Err(err) => { - client - .register_stream - .send(ServerRegisterAnswer::Err(err))?; + register_stream.0.send(ServerRegisterAnswer::Err(err))?; return Ok(()); }, Ok((username, uuid)) => (username, uuid), @@ -382,8 +397,8 @@ impl Sys { if !player.is_valid() { // Invalid player - client - .register_stream + register_stream + .0 .send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; return Ok(()); } @@ -401,12 +416,14 @@ impl Sys { // Tell the client its request was successful. client.registered = true; - client.register_stream.send(ServerRegisterAnswer::Ok(()))?; + register_stream.0.send(ServerRegisterAnswer::Ok(()))?; // Send initial player list - client.send_msg(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - ))); + register_stream + .0 + .send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + )))?; // Add to list to notify all clients of the new player new_players.push(entity); @@ -446,19 +463,8 @@ impl Sys { editable_settings: &ReadExpect<'_, EditableSettings>, alias_validator: &ReadExpect<'_, AliasValidator>, ) -> Result<(), crate::error::Error> { - let (mut b1, mut b2, mut b3, mut b4, mut b5) = ( - client.network_error, - client.network_error, - client.network_error, - client.network_error, - client.network_error, - ); loop { /* - waiting for 1 of the 5 streams to return a massage asynchronous. - If so, handle that msg type. This code will be refactored soon - */ - let q1 = Client::internal_recv(&mut b1, &mut client.general_stream); let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream); let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream); @@ -540,7 +546,7 @@ impl Sys { editable_settings, msg?, )?; - } + }*/ } } } @@ -571,6 +577,11 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Ori>, WriteStorage<'a, Player>, WriteStorage<'a, Client>, + WriteStorage<'a, GeneralStream>, + //WriteStorage<'a, PingStream>, + //WriteStorage<'a, RegisterStream>, + //WriteStorage<'a, CharacterScreenStream>, + //WriteStorage<'a, InGameStream>, WriteStorage<'a, Controller>, Read<'a, Settings>, ReadExpect<'a, EditableSettings>, @@ -604,6 +615,11 @@ impl<'a> System<'a> for Sys { mut orientations, mut players, mut clients, + mut general_streams, + //mut ping_streams, + //mut register_streams, + //mut character_screen_streams, + //mut in_game_streams, mut controllers, settings, editable_settings, @@ -697,7 +713,8 @@ impl<'a> System<'a> for Sys { server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { // Try pinging the client if the timeout is nearing. - client.send_msg(PingMsg::Ping); + //FIXME + //client.send_msg(PingMsg::Ping); } } @@ -713,7 +730,8 @@ impl<'a> System<'a> for Sys { character: None, // new players will be on character select. })); for client in (&mut clients).join().filter(|c| c.registered) { - client.send_msg(msg.clone()) + //FIXME + //client.send_msg(msg.clone()) } } } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 2eac60f2a6..5fd1611ae0 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -2,7 +2,7 @@ use super::{ sentinel::{DeletedEntities, TrackedComps}, SysTimer, }; -use crate::client::{self, Client, RegionSubscription}; +use crate::client::{self, Client, InGameStream, RegionSubscription}; use common::{ comp::{Ori, Player, Pos, Vel}, msg::ServerGeneral, @@ -32,7 +32,8 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Vel>, ReadStorage<'a, Ori>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, + WriteStorage<'a, InGameStream>, WriteStorage<'a, RegionSubscription>, Write<'a, DeletedEntities>, TrackedComps<'a>, @@ -50,7 +51,8 @@ impl<'a> System<'a> for Sys { velocities, orientations, players, - mut clients, + clients, + mut in_game_streams, mut subscriptions, mut deleted_entities, tracked_comps, @@ -71,17 +73,18 @@ impl<'a> System<'a> for Sys { // 7. Determine list of regions that are in range and iterate through it // - check if in hashset (hash calc) if not add it let mut regions_to_remove = Vec::new(); - for (client, subscription, pos, vd, client_entity) in ( - &mut clients, + for (_, subscription, pos, vd, client_entity, in_game_stream) in ( + &clients, &mut subscriptions, &positions, &players, &entities, + &mut in_game_streams, ) .join() - .filter_map(|(client, s, pos, player, e)| { + .filter_map(|(client, s, pos, player, e, stream)| { if client.in_game.is_some() { - player.view_distance.map(|v| (client, s, pos, v, e)) + player.view_distance.map(|v| (client, s, pos, v, e, stream)) } else { None } @@ -153,7 +156,9 @@ impl<'a> System<'a> for Sys { .map(|key| subscription.regions.contains(key)) .unwrap_or(false) { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + let _ = in_game_stream + .0 + .send(ServerGeneral::DeleteEntity(uid)); } } }, @@ -161,7 +166,7 @@ impl<'a> System<'a> for Sys { } // Tell client to delete entities in the region for (&uid, _) in (&uids, region.entities()).join() { - client.send_msg(ServerGeneral::DeleteEntity(uid)); + let _ = in_game_stream.0.send(ServerGeneral::DeleteEntity(uid)); } } // Send deleted entities since they won't be processed for this client in entity @@ -171,7 +176,9 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid))); + let _ = in_game_stream + .0 + .send(ServerGeneral::DeleteEntity(Uid(*uid))); } } @@ -196,7 +203,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - client.send_msg(ServerGeneral::CreateEntity( + let _ = in_game_stream.0.send(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -217,14 +224,14 @@ impl<'a> System<'a> for Sys { /// Initialize region subscription pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { - if let (Some(client_pos), Some(client_vd), Some(client)) = ( + if let (Some(client_pos), Some(client_vd), Some(in_game_stream)) = ( world.read_storage::().get(entity), world .read_storage::() .get(entity) .map(|pl| pl.view_distance) .and_then(|v| v), - world.write_storage::().get_mut(entity), + world.write_storage::().get_mut(entity), ) { let fuzzy_chunk = (Vec2::::from(client_pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); @@ -249,7 +256,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - client.send_msg(ServerGeneral::CreateEntity( + let _ = in_game_stream.0.send(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index eafa24b0ff..8a359e2423 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::{chunk_generator::ChunkGenerator, client::Client, Tick}; +use crate::{chunk_generator::ChunkGenerator, client::InGameStream, Tick}; use common::{ comp::{self, bird_medium, Alignment, Player, Pos}, event::{EventBus, ServerEvent}, @@ -34,7 +34,7 @@ impl<'a> System<'a> for Sys { Write<'a, TerrainChanges>, ReadStorage<'a, Pos>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, ); fn run( @@ -48,7 +48,7 @@ impl<'a> System<'a> for Sys { mut terrain_changes, positions, players, - mut clients, + mut in_game_streams, ): Self::SystemData, ) { span!(_guard, "run", "terrain::Sys::run"); @@ -62,8 +62,8 @@ impl<'a> System<'a> for Sys { let (chunk, supplement) = match res { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { - if let Some(client) = clients.get_mut(entity) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + if let Some(in_game_stream) = in_game_streams.get_mut(entity) { + let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -75,10 +75,10 @@ impl<'a> System<'a> for Sys { }, }; // Send the chunk to all nearby players. - for (view_distance, pos, client) in (&players, &positions, &mut clients) + for (view_distance, pos, in_game_stream) in (&players, &positions, &mut in_game_streams) .join() - .filter_map(|(player, pos, client)| { - player.view_distance.map(|vd| (vd, pos, client)) + .filter_map(|(player, pos, in_game_stream)| { + player.view_distance.map(|vd| (vd, pos, in_game_stream)) }) { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); @@ -90,7 +90,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= view_distance.pow(2) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index a2e21f42a9..e547f4a8e3 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::Client; +use crate::client::InGameStream; use common::{ comp::{Player, Pos}, msg::ServerGeneral, @@ -20,25 +20,26 @@ impl<'a> System<'a> for Sys { Write<'a, SysTimer>, ReadStorage<'a, Pos>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, + WriteStorage<'a, InGameStream>, ); fn run( &mut self, - (terrain, terrain_changes, mut timer, positions, players, mut clients): Self::SystemData, + (terrain, terrain_changes, mut timer, positions, players, mut in_game_streams): Self::SystemData, ) { span!(_guard, "run", "terrain_sync::Sys::run"); timer.start(); // Sync changed chunks 'chunk: for chunk_key in &terrain_changes.modified_chunks { - for (player, pos, client) in (&players, &positions, &mut clients).join() { + for (player, pos, in_game_stream) in (&players, &positions, &mut in_game_streams).join() + { if player .view_distance .map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd)) .unwrap_or(false) { - client.send_msg(ServerGeneral::TerrainChunkUpdate { + let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate { key: *chunk_key, chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { Some(chunk) => chunk.clone(), @@ -52,9 +53,9 @@ impl<'a> System<'a> for Sys { // TODO: Don't send all changed blocks to all clients // Sync changed blocks let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone()); - for (player, client) in (&players, &mut clients).join() { + for (player, in_game_stream) in (&players, &mut in_game_streams).join() { if player.view_distance.is_some() { - client.send_msg(msg.clone()); + let _ = in_game_stream.0.send(msg.clone()); } } diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index bf8034baa0..3af23149b4 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::client::Client; +use crate::client::GeneralStream; use common::{ comp::{Player, Pos, Waypoint, WaypointArea}, msg::{Notification, ServerGeneral}, @@ -22,28 +22,38 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Player>, ReadStorage<'a, WaypointArea>, WriteStorage<'a, Waypoint>, - WriteStorage<'a, Client>, + WriteStorage<'a, GeneralStream>, Read<'a, Time>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, positions, players, waypoint_areas, mut waypoints, mut clients, time, mut timer): Self::SystemData, + ( + entities, + positions, + players, + waypoint_areas, + mut waypoints, + mut general_streams, + time, + mut timer, + ): Self::SystemData, ) { span!(_guard, "run", "waypoint::Sys::run"); timer.start(); - for (entity, player_pos, _, client) in - (&entities, &positions, &players, &mut clients).join() + for (entity, player_pos, _, general_stream) in + (&entities, &positions, &players, &mut general_streams).join() { for (waypoint_pos, waypoint_area) in (&positions, &waypoint_areas).join() { if player_pos.0.distance_squared(waypoint_pos.0) < waypoint_area.radius().powi(2) { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - client - .send_msg(ServerGeneral::Notification(Notification::WaypointSaved)); + let _ = general_stream + .0 + .send(ServerGeneral::Notification(Notification::WaypointSaved)); } } }