diff --git a/server/src/client.rs b/server/src/client.rs index a968194fad..8c4b2655de 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,7 +1,9 @@ -use common::msg::ClientType; -use network::Participant; +use common::msg::{ClientType, ServerGeneral, ServerMsg}; +use network::{Message, Participant, Stream, StreamError}; +use serde::{de::DeserializeOwned, Serialize}; use specs::Component; use specs_idvs::IdvStorage; +use std::sync::{atomic::AtomicBool, Mutex}; /// Client handles ALL network related information of everything that connects /// to the server Client DOES NOT handle game states @@ -12,11 +14,195 @@ use specs_idvs::IdvStorage; pub struct Client { pub client_type: ClientType, pub participant: Option, - pub last_ping: f64, - pub login_msg_sent: bool, - pub terminate_msg_recv: bool, + pub last_ping: Mutex, + pub login_msg_sent: AtomicBool, + pub terminate_msg_recv: AtomicBool, + + //TODO: improve network crate so that `send` is no longer `&mut self` and we can get rid of + // this Mutex. This Mutex is just to please the compiler as we do not get into contention + general_stream: Mutex, + ping_stream: Mutex, + register_stream: Mutex, + character_screen_stream: Mutex, + in_game_stream: Mutex, +} + +pub struct PreparedMsg { + stream_id: u8, + message: Message, } impl Component for Client { type Storage = IdvStorage; } + +impl Client { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + client_type: ClientType, + participant: Participant, + last_ping: f64, + general_stream: Stream, + ping_stream: Stream, + register_stream: Stream, + character_screen_stream: Stream, + in_game_stream: Stream, + ) -> Self { + Client { + client_type, + participant: Some(participant), + last_ping: Mutex::new(last_ping), + login_msg_sent: AtomicBool::new(false), + terminate_msg_recv: AtomicBool::new(false), + general_stream: Mutex::new(general_stream), + ping_stream: Mutex::new(ping_stream), + register_stream: Mutex::new(register_stream), + character_screen_stream: Mutex::new(character_screen_stream), + in_game_stream: Mutex::new(in_game_stream), + } + } + + pub(crate) fn send>(&self, msg: M) -> Result<(), StreamError> { + match msg.into() { + ServerMsg::Info(m) => self.register_stream.try_lock().unwrap().send(m), + ServerMsg::Init(m) => self.register_stream.try_lock().unwrap().send(m), + ServerMsg::RegisterAnswer(m) => self.register_stream.try_lock().unwrap().send(m), + ServerMsg::General(g) => { + match g { + //Character Screen related + ServerGeneral::CharacterDataLoadError(_) + | ServerGeneral::CharacterListUpdate(_) + | ServerGeneral::CharacterActionError(_) + | ServerGeneral::CharacterSuccess => { + self.character_screen_stream.try_lock().unwrap().send(g) + }, + //Ingame related + ServerGeneral::GroupUpdate(_) + | ServerGeneral::GroupInvite { .. } + | ServerGeneral::InvitePending(_) + | ServerGeneral::InviteComplete { .. } + | ServerGeneral::ExitInGameSuccess + | ServerGeneral::InventoryUpdate(_, _) + | ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) + | ServerGeneral::SetViewDistance(_) + | ServerGeneral::Outcomes(_) + | ServerGeneral::Knockback(_) => { + self.in_game_stream.try_lock().unwrap().send(g) + }, + // Always possible + ServerGeneral::PlayerListUpdate(_) + | ServerGeneral::ChatMsg(_) + | ServerGeneral::SetPlayerEntity(_) + | ServerGeneral::TimeOfDay(_) + | ServerGeneral::EntitySync(_) + | ServerGeneral::CompSync(_) + | ServerGeneral::CreateEntity(_) + | ServerGeneral::DeleteEntity(_) + | ServerGeneral::Disconnect(_) + | ServerGeneral::Notification(_) => { + self.general_stream.try_lock().unwrap().send(g) + }, + } + }, + ServerMsg::Ping(m) => self.ping_stream.try_lock().unwrap().send(m), + } + } + + pub(crate) fn send_fallible>(&self, msg: M) { let _ = self.send(msg); } + + pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> { + match msg.stream_id { + 0 => self + .register_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 1 => self + .character_screen_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 2 => self + .in_game_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 3 => self + .general_stream + .try_lock() + .unwrap() + .send_raw(&msg.message), + 4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message), + _ => unreachable!("invalid stream id"), + } + } + + pub(crate) fn prepare>(&self, msg: M) -> PreparedMsg { + match msg.into() { + ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream), + ServerMsg::General(g) => { + match g { + //Character Screen related + ServerGeneral::CharacterDataLoadError(_) + | ServerGeneral::CharacterListUpdate(_) + | ServerGeneral::CharacterActionError(_) + | ServerGeneral::CharacterSuccess => { + PreparedMsg::new(1, &g, &self.character_screen_stream) + }, + //Ingame related + ServerGeneral::GroupUpdate(_) + | ServerGeneral::GroupInvite { .. } + | ServerGeneral::InvitePending(_) + | ServerGeneral::InviteComplete { .. } + | ServerGeneral::ExitInGameSuccess + | ServerGeneral::InventoryUpdate(_, _) + | ServerGeneral::TerrainChunkUpdate { .. } + | ServerGeneral::TerrainBlockUpdates(_) + | ServerGeneral::SetViewDistance(_) + | ServerGeneral::Outcomes(_) + | ServerGeneral::Knockback(_) => PreparedMsg::new(2, &g, &self.in_game_stream), + // Always possible + ServerGeneral::PlayerListUpdate(_) + | ServerGeneral::ChatMsg(_) + | ServerGeneral::SetPlayerEntity(_) + | ServerGeneral::TimeOfDay(_) + | ServerGeneral::EntitySync(_) + | ServerGeneral::CompSync(_) + | ServerGeneral::CreateEntity(_) + | ServerGeneral::DeleteEntity(_) + | ServerGeneral::Disconnect(_) + | ServerGeneral::Notification(_) => { + PreparedMsg::new(3, &g, &self.general_stream) + }, + } + }, + ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream), + } + } + + pub(crate) fn recv( + &self, + stream_id: u8, + ) -> Result, StreamError> { + match stream_id { + 0 => self.register_stream.try_lock().unwrap().try_recv(), + 1 => self.character_screen_stream.try_lock().unwrap().try_recv(), + 2 => self.in_game_stream.try_lock().unwrap().try_recv(), + 3 => self.general_stream.try_lock().unwrap().try_recv(), + 4 => self.ping_stream.try_lock().unwrap().try_recv(), + _ => unreachable!("invalid stream id"), + } + } +} + +impl PreparedMsg { + fn new(id: u8, msg: &M, stream: &Mutex) -> PreparedMsg { + Self { + stream_id: id, + message: Message::serialize(&msg, &stream.try_lock().unwrap()), + } + } +} diff --git a/server/src/cmd.rs b/server/src/cmd.rs index edb1cc2fc0..a573c94878 100644 --- a/server/src/cmd.rs +++ b/server/src/cmd.rs @@ -26,10 +26,7 @@ use std::convert::TryFrom; use vek::*; use world::util::Sampler; -use crate::{ - login_provider::LoginProvider, - streams::{GetStream, InGameStream}, -}; +use crate::{client::Client, login_provider::LoginProvider}; use scan_fmt::{scan_fmt, scan_fmt_some}; use tracing::error; @@ -652,8 +649,7 @@ fn handle_spawn( // Add to group system if a pet if matches!(alignment, comp::Alignment::Owned { .. }) { let state = server.state(); - let mut in_game_streams = - state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); @@ -665,15 +661,15 @@ fn handle_spawn( &state.ecs().read_storage(), &uids, &mut |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| { - s.send_fallible(ServerGeneral::GroupUpdate(g)); + .map(|(g, c)| { + c.send_fallible(ServerGeneral::GroupUpdate(g)); }); }, ); diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index dfe64a971d..bbc04605cd 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -1,7 +1,4 @@ -use crate::{ - streams::{CharacterScreenStream, GeneralStream, InGameStream, PingStream, RegisterStream}, - Client, ClientType, ServerInfo, -}; +use crate::{Client, ClientType, ServerInfo}; use crossbeam::{bounded, unbounded, Receiver, Sender}; use futures_channel::oneshot; use futures_executor::block_on; @@ -16,14 +13,7 @@ pub(crate) struct ServerInfoPacket { pub time: f64, } -pub(crate) struct IncomingClient { - pub client: Client, - pub general: GeneralStream, - pub ping: PingStream, - pub register: RegisterStream, - pub character: CharacterScreenStream, - pub in_game: InGameStream, -} +pub(crate) type IncomingClient = Client; pub(crate) struct ConnectionHandler { _network: Arc, @@ -136,24 +126,18 @@ impl ConnectionHandler { Some(client_type) => client_type?, }; - let client = Client { + let client = Client::new( client_type, - participant: Some(participant), - last_ping: server_data.time, - login_msg_sent: false, - terminate_msg_recv: false, - }; + participant, + server_data.time, + general_stream, + ping_stream, + register_stream, + character_screen_stream, + in_game_stream, + ); - let package = IncomingClient { - client, - general: GeneralStream(general_stream), - ping: PingStream(ping_stream), - register: RegisterStream(register_stream), - character: CharacterScreenStream(character_screen_stream), - in_game: InGameStream(in_game_stream), - }; - - client_sender.send(package)?; + client_sender.send(client)?; Ok(()) } } diff --git a/server/src/events/entity_manipulation.rs b/server/src/events/entity_manipulation.rs index dbbd361f8a..48ae6b7563 100644 --- a/server/src/events/entity_manipulation.rs +++ b/server/src/events/entity_manipulation.rs @@ -1,7 +1,6 @@ use crate::{ client::Client, comp::{biped_large, quadruped_medium, quadruped_small}, - streams::{GetStream, InGameStream}, Server, SpawnPoint, StateExt, }; use common::{ @@ -43,9 +42,9 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3) if let Some(vel) = velocities.get_mut(entity) { vel.0 = impulse; } - let mut in_game_streams = state.ecs().write_storage::(); - if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - in_game_stream.send_fallible(ServerGeneral::Knockback(impulse)); + let clients = state.ecs().read_storage::(); + if let Some(client) = clients.get(entity) { + client.send_fallible(ServerGeneral::Knockback(impulse)); } } diff --git a/server/src/events/group_manip.rs b/server/src/events/group_manip.rs index 2c750f86bb..65598019c9 100644 --- a/server/src/events/group_manip.rs +++ b/server/src/events/group_manip.rs @@ -1,7 +1,4 @@ -use crate::{ - streams::{GeneralStream, GetStream, InGameStream}, - Server, -}; +use crate::{client::Client, Server}; use common::{ comp::{ self, @@ -28,13 +25,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani match manip { GroupManip::Invite(uid) => { - let mut general_streams = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let invitee = match state.ecs().entity_from_uid(uid.into()) { Some(t) => t, None => { // Inform of failure - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("Invite failed, target does not exist.".to_owned()), ); @@ -65,7 +62,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }); if already_in_same_group { // Inform of failure - if let Some(general_stream) = general_streams.get_mut(entity) { + if let Some(general_stream) = clients.get(entity) { general_stream.send_fallible(ChatType::Meta.server_msg( "Invite failed, can't invite someone already in your group".to_owned(), )); @@ -95,7 +92,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani >= max_group_size as usize; if group_size_limit_reached { // Inform inviter that they have reached the group size limit - if let Some(general_stream) = general_streams.get_mut(entity) { + if let Some(general_stream) = clients.get(entity) { general_stream.send_fallible( ChatType::Meta.server_msg( "Invite failed, pending invites plus current group size have reached \ @@ -112,8 +109,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani if invites.contains(invitee) { // Inform inviter that there is already an invite - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("This player already has a pending invite.".to_owned()), ); @@ -153,35 +150,32 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } }; - let mut in_game_streams = state.ecs().write_storage::(); - // If client comp - if let (Some(in_game_stream), Some(inviter)) = - (in_game_streams.get_mut(invitee), uids.get(entity).copied()) + if let (Some(client), Some(inviter)) = (clients.get(invitee), uids.get(entity).copied()) { if send_invite() { - in_game_stream.send_fallible(ServerGeneral::GroupInvite { + client.send_fallible(ServerGeneral::GroupInvite { inviter, timeout: PRESENTED_INVITE_TIMEOUT_DUR, }); } } else if agents.contains(invitee) { send_invite(); - } else if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + } else if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()), ); } // Notify inviter that the invite is pending if invite_sent { - if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - in_game_stream.send_fallible(ServerGeneral::InvitePending(uid)); + if let Some(client) = clients.get(entity) { + client.send_fallible(ServerGeneral::InvitePending(uid)); } } }, GroupManip::Accept => { - let mut in_game_streams = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -198,10 +192,10 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { - if let (Some(in_game_stream), Some(target)) = - (in_game_streams.get_mut(inviter), uids.get(entity).copied()) + if let (Some(client), Some(target)) = + (clients.get(inviter), uids.get(entity).copied()) { - in_game_stream.send_fallible(ServerGeneral::InviteComplete { + client.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Accepted, }); @@ -215,20 +209,20 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().read_storage(), &uids, |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); } }, GroupManip::Decline => { - let mut in_game_streams = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut invites = state.ecs().write_storage::(); if let Some(inviter) = invites.remove(entity).and_then(|invite| { @@ -246,10 +240,10 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(inviter) }) { // Inform inviter of rejection - if let (Some(in_game_stream), Some(target)) = - (in_game_streams.get_mut(inviter), uids.get(entity).copied()) + if let (Some(client), Some(target)) = + (clients.get(inviter), uids.get(entity).copied()) { - in_game_stream.send_fallible(ServerGeneral::InviteComplete { + client.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::Declined, }); @@ -257,7 +251,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } }, GroupManip::Leave => { - let mut in_game_streams = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); group_manager.leave_group( @@ -267,19 +261,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, &state.ecs().entities(), &mut |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); }, GroupManip::Kick(uid) => { - let mut general_streams = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let alignments = state.ecs().read_storage::(); @@ -287,8 +281,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani Some(t) => t, None => { // Inform of failure - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("Kick failed, target does not exist.".to_owned()), ); @@ -300,7 +294,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani // Can't kick pet if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner)) { - if let Some(general_stream) = general_streams.get_mut(entity) { + if let Some(general_stream) = clients.get(entity) { general_stream.send_fallible( ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()), ); @@ -309,8 +303,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } // Can't kick yourself if uids.get(entity).map_or(false, |u| *u == uid) { - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta .server_msg("Kick failed, you can't kick yourself.".to_owned()), ); @@ -320,7 +314,6 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani let mut groups = state.ecs().write_storage::(); let mut group_manager = state.ecs().write_resource::(); - let mut in_game_streams = state.ecs().write_storage::(); // Make sure kicker is the group leader match groups .get(target) @@ -335,42 +328,42 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &uids, &state.ecs().entities(), &mut |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them the have been kicked - if let Some(general_stream) = general_streams.get_mut(target) { - general_stream.send_fallible( + if let Some(client) = clients.get(target) { + client.send_fallible( ChatType::Meta .server_msg("You were removed from the group.".to_owned()), ); } // Tell kicker that they were succesful - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream + if let Some(client) = clients.get(entity) { + client .send_fallible(ChatType::Meta.server_msg("Player kicked.".to_owned())); } }, Some(_) => { // Inform kicker that they are not the leader - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible(ChatType::Meta.server_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible(ChatType::Meta.server_msg( "Kick failed: You are not the leader of the target's group.".to_owned(), )); } }, None => { // Inform kicker that the target is not in a group - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta.server_msg( "Kick failed: Your target is not in a group.".to_owned(), ), @@ -380,14 +373,14 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani } }, GroupManip::AssignLeader(uid) => { - let mut general_streams = state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let target = match state.ecs().entity_from_uid(uid.into()) { Some(t) => t, None => { // Inform of failure - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible(ChatType::Meta.server_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible(ChatType::Meta.server_msg( "Leadership transfer failed, target does not exist".to_owned(), )); } @@ -396,7 +389,6 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }; let groups = state.ecs().read_storage::(); let mut group_manager = state.ecs().write_resource::(); - let mut in_game_streams = state.ecs().write_storage::(); // Make sure assigner is the group leader match groups .get(target) @@ -411,25 +403,25 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani &state.ecs().read_storage(), &uids, |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); // Tell them they are the leader - if let Some(general_stream) = general_streams.get_mut(target) { - general_stream.send_fallible( + if let Some(client) = clients.get(target) { + client.send_fallible( ChatType::Meta.server_msg("You are the group leader now.".to_owned()), ); } // Tell the old leader that the transfer was succesful - if let Some(general_stream) = general_streams.get_mut(target) { - general_stream.send_fallible( + if let Some(client) = clients.get(target) { + client.send_fallible( ChatType::Meta .server_msg("You are no longer the group leader.".to_owned()), ); @@ -437,9 +429,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, Some(_) => { // Inform transferer that they are not the leader - let mut general_streams = state.ecs().write_storage::(); - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible( + if let Some(client) = clients.get(entity) { + client.send_fallible( ChatType::Meta.server_msg( "Transfer failed: You are not the leader of the target's group." .to_owned(), @@ -449,9 +440,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani }, None => { // Inform transferer that the target is not in a group - let mut general_streams = state.ecs().write_storage::(); - if let Some(general_stream) = general_streams.get_mut(entity) { - general_stream.send_fallible(ChatType::Meta.server_msg( + if let Some(client) = clients.get(entity) { + client.send_fallible(ChatType::Meta.server_msg( "Transfer failed: Your target is not in a group.".to_owned(), )); } diff --git a/server/src/events/interaction.rs b/server/src/events/interaction.rs index 9d868692d2..9a0cfcb32f 100644 --- a/server/src/events/interaction.rs +++ b/server/src/events/interaction.rs @@ -1,11 +1,4 @@ -use crate::{ - client::Client, - presence::RegionSubscription, - streams::{ - CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream, - }, - Server, -}; +use crate::{client::Client, presence::RegionSubscription, Server}; use common::{ comp::{self, item, Pos}, consts::MAX_MOUNT_RANGE, @@ -123,7 +116,6 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { } let mut clients = ecs.write_storage::(); - let mut general_streams = ecs.write_storage::(); if clients.get_mut(possesse).is_some() { error!("can't possess other players"); @@ -131,23 +123,8 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { } match (|| -> Option> { - let mut ping_streams = ecs.write_storage::(); - let mut register_streams = ecs.write_storage::(); - let mut character_screen_streams = ecs.write_storage::(); - let mut in_game_streams = ecs.write_storage::(); - let c = clients.remove(possessor)?; clients.insert(possesse, c).ok()?; - let s = general_streams.remove(possessor)?; - general_streams.insert(possesse, s).ok()?; - let s = ping_streams.remove(possessor)?; - ping_streams.insert(possesse, s).ok()?; - let s = register_streams.remove(possessor)?; - register_streams.insert(possesse, s).ok()?; - let s = character_screen_streams.remove(possessor)?; - character_screen_streams.insert(possesse, s).ok()?; - let s = in_game_streams.remove(possessor)?; - in_game_streams.insert(possesse, s).ok()?; //optional entities let mut players = ecs.write_storage::(); let mut subscriptions = ecs.write_storage::(); @@ -179,9 +156,9 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) { }, } - general_streams + clients .get_mut(possesse) - .map(|s| s.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid))); + .map(|c| c.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid))); // Put possess item into loadout let mut loadouts = ecs.write_storage::(); diff --git a/server/src/events/inventory_manip.rs b/server/src/events/inventory_manip.rs index 76dd82f638..a9b936da70 100644 --- a/server/src/events/inventory_manip.rs +++ b/server/src/events/inventory_manip.rs @@ -1,7 +1,4 @@ -use crate::{ - streams::{GetStream, InGameStream}, - Server, StateExt, -}; +use crate::{client::Client, Server, StateExt}; use common::{ comp::{ self, item, @@ -282,8 +279,7 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv .insert(tameable_entity, comp::Alignment::Owned(uid)); // Add to group system - let mut in_game_streams = - state.ecs().write_storage::(); + let clients = state.ecs().read_storage::(); let uids = state.ecs().read_storage::(); let mut group_manager = state .ecs() @@ -297,15 +293,15 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv &state.ecs().read_storage(), &uids, &mut |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| { - s.send(ServerGeneral::GroupUpdate(g)) + .map(|(g, c)| { + c.send(ServerGeneral::GroupUpdate(g)) }); }, ); diff --git a/server/src/events/player.rs b/server/src/events/player.rs index 65297e3f60..e522fe28f0 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -1,14 +1,7 @@ use super::Event; use crate::{ - client::Client, - login_provider::LoginProvider, - persistence, - presence::Presence, - state_ext::StateExt, - streams::{ - CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream, - }, - Server, + client::Client, login_provider::LoginProvider, persistence, presence::Presence, + state_ext::StateExt, Server, }; use common::{ comp, @@ -37,42 +30,18 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) { .get(entity) .cloned(); - if let Some(( - client, - uid, - player, - general_stream, - ping_stream, - register_stream, - character_screen_stream, - mut in_game_stream, - )) = (|| { + if let Some((client, uid, player)) = (|| { let ecs = state.ecs(); Some(( ecs.write_storage::().remove(entity)?, ecs.write_storage::().remove(entity)?, ecs.write_storage::().remove(entity)?, - ecs.write_storage::().remove(entity)?, - ecs.write_storage::().remove(entity)?, - ecs.write_storage::().remove(entity)?, - ecs.write_storage::() - .remove(entity)?, - ecs.write_storage::().remove(entity)?, )) })() { // Tell client its request was successful - in_game_stream.send_fallible(ServerGeneral::ExitInGameSuccess); + client.send_fallible(ServerGeneral::ExitInGameSuccess); - let entity_builder = state - .ecs_mut() - .create_entity() - .with(client) - .with(player) - .with(general_stream) - .with(ping_stream) - .with(register_stream) - .with(character_screen_stream) - .with(in_game_stream); + let entity_builder = state.ecs_mut().create_entity().with(client).with(player); // Preserve group component if present let entity_builder = match maybe_group { diff --git a/server/src/lib.rs b/server/src/lib.rs index 256f12c0f0..8db6c0532f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -20,7 +20,6 @@ pub mod persistence; pub mod presence; pub mod settings; pub mod state_ext; -pub mod streams; pub mod sys; #[cfg(not(feature = "worldgen"))] mod test_world; @@ -43,9 +42,6 @@ use crate::{ login_provider::LoginProvider, presence::{Presence, RegionSubscription}, state_ext::StateExt, - streams::{ - CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream, - }, sys::sentinel::{DeletedEntities, TrackedComps}, }; use common::{ @@ -193,11 +189,6 @@ impl Server { state.ecs_mut().register::(); state.ecs_mut().register::(); state.ecs_mut().register::(); - state.ecs_mut().register::(); - state.ecs_mut().register::(); - state.ecs_mut().register::(); - state.ecs_mut().register::(); - state.ecs_mut().register::(); //Alias validator let banned_words_paths = &settings.banned_words_files; @@ -825,16 +816,14 @@ impl Server { fn initialize_client( &mut self, - mut incoming: crate::connection_handler::IncomingClient, + client: crate::connection_handler::IncomingClient, ) -> Result, Error> { - let client = incoming.client; - if self.settings().max_players <= self.state.ecs().read_storage::().join().count() { trace!( ?client.participant, "to many players, wont allow participant to connect" ); - incoming.register.0.send(ServerInit::TooManyPlayers)?; + client.send(ServerInit::TooManyPlayers)?; return Ok(None); } @@ -843,11 +832,6 @@ impl Server { .ecs_mut() .create_entity_synced() .with(client) - .with(incoming.general) - .with(incoming.ping) - .with(incoming.register) - .with(incoming.character) - .with(incoming.in_game) .build(); self.state .ecs() @@ -859,10 +843,9 @@ impl Server { debug!("Starting initial sync with client."); self.state .ecs() - .write_storage::() - .get_mut(entity) + .read_storage::() + .get(entity) .unwrap() - .0 .send(ServerInit::GameSync { // Send client their entity entity_package: TrackedComps::fetch(&self.state.ecs()) @@ -905,73 +888,11 @@ impl Server { where S: Into, { - const ERR: &str = - "Don't do that. Sending these messages is only done ONCE at connect and not by this fn"; - match msg.into() { - ServerMsg::Info(_) => panic!(ERR), - ServerMsg::Init(_) => panic!(ERR), - ServerMsg::RegisterAnswer(msg) => { - self.state - .ecs() - .write_storage::() - .get_mut(entity) - .map(|s| s.send(msg)); - }, - ServerMsg::General(msg) => { - match &msg { - //Character Screen related - ServerGeneral::CharacterDataLoadError(_) - | ServerGeneral::CharacterListUpdate(_) - | ServerGeneral::CharacterActionError(_) - | ServerGeneral::CharacterSuccess => self - .state - .ecs() - .write_storage::() - .get_mut(entity) - .map(|s| s.send(msg)), - //Ingame related - ServerGeneral::GroupUpdate(_) - | ServerGeneral::GroupInvite { .. } - | ServerGeneral::InvitePending(_) - | ServerGeneral::InviteComplete { .. } - | ServerGeneral::ExitInGameSuccess - | ServerGeneral::InventoryUpdate(_, _) - | ServerGeneral::TerrainChunkUpdate { .. } - | ServerGeneral::TerrainBlockUpdates(_) - | ServerGeneral::SetViewDistance(_) - | ServerGeneral::Outcomes(_) - | ServerGeneral::Knockback(_) => self - .state - .ecs() - .write_storage::() - .get_mut(entity) - .map(|s| s.send(msg)), - // Always possible - ServerGeneral::PlayerListUpdate(_) - | ServerGeneral::ChatMsg(_) - | ServerGeneral::SetPlayerEntity(_) - | ServerGeneral::TimeOfDay(_) - | ServerGeneral::EntitySync(_) - | ServerGeneral::CompSync(_) - | ServerGeneral::CreateEntity(_) - | ServerGeneral::DeleteEntity(_) - | ServerGeneral::Disconnect(_) - | ServerGeneral::Notification(_) => self - .state - .ecs() - .write_storage::() - .get_mut(entity) - .map(|s| s.send(msg)), - }; - }, - ServerMsg::Ping(msg) => { - self.state - .ecs() - .write_storage::() - .get_mut(entity) - .map(|s| s.send(msg)); - }, - } + self.state + .ecs() + .read_storage::() + .get(entity) + .map(|c| c.send(msg)); } pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); } diff --git a/server/src/state_ext.rs b/server/src/state_ext.rs index 71548b2dd8..46fe38fb12 100644 --- a/server/src/state_ext.rs +++ b/server/src/state_ext.rs @@ -1,9 +1,6 @@ use crate::{ - persistence::PersistedComponents, - presence::Presence, - streams::{CharacterScreenStream, GeneralStream, GetStream, InGameStream}, - sys::sentinel::DeletedEntities, - SpawnPoint, + client::Client, persistence::PersistedComponents, presence::Presence, + sys::sentinel::DeletedEntities, SpawnPoint, }; use common::{ character::CharacterId, @@ -219,12 +216,8 @@ impl StateExt for State { ); // Tell the client its request was successful. - if let Some(character_screen_stream) = self - .ecs() - .write_storage::() - .get_mut(entity) - { - character_screen_stream.send_fallible(ServerGeneral::CharacterSuccess); + if let Some(client) = self.ecs().read_storage::().get(entity) { + client.send_fallible(ServerGeneral::CharacterSuccess); } } @@ -280,26 +273,20 @@ impl StateExt for State { | comp::ChatType::Meta | comp::ChatType::World(_) => self.notify_players(ServerGeneral::ChatMsg(resolved_msg)), comp::ChatType::Online(u) => { - for (general_stream, uid) in ( - &mut ecs.write_storage::(), - &ecs.read_storage::(), - ) - .join() + for (client, uid) in + (&ecs.read_storage::(), &ecs.read_storage::()).join() { if uid != u { - general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, comp::ChatType::Tell(u, t) => { - for (general_stream, uid) in ( - &mut ecs.write_storage::(), - &ecs.read_storage::(), - ) - .join() + for (client, uid) in + (&ecs.read_storage::(), &ecs.read_storage::()).join() { if uid == u || uid == t { - general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -309,12 +296,9 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (general_stream, pos) in - (&mut ecs.write_storage::(), &positions).join() - { + for (client, pos) in (&ecs.read_storage::(), &positions).join() { if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) { - general_stream - .send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -324,12 +308,9 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (general_stream, pos) in - (&mut ecs.write_storage::(), &positions).join() - { + for (client, pos) in (&ecs.read_storage::(), &positions).join() { if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) { - general_stream - .send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } @@ -339,38 +320,35 @@ impl StateExt for State { (*ecs.read_resource::()).retrieve_entity_internal(uid.0); let positions = ecs.read_storage::(); if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) { - for (general_stream, pos) in - (&mut ecs.write_storage::(), &positions).join() - { + for (client, pos) in (&ecs.read_storage::(), &positions).join() { if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) { - general_stream - .send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } } }, comp::ChatType::FactionMeta(s) | comp::ChatType::Faction(_, s) => { - for (general_stream, faction) in ( - &mut ecs.write_storage::(), + for (client, faction) in ( + &ecs.read_storage::(), &ecs.read_storage::(), ) .join() { if s == &faction.0 { - general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, comp::ChatType::GroupMeta(g) | comp::ChatType::Group(_, g) => { - for (general_stream, group) in ( - &mut ecs.write_storage::(), + for (client, group) in ( + &ecs.read_storage::(), &ecs.read_storage::(), ) .join() { if g == group { - general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); + client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone())); } } }, @@ -379,37 +357,35 @@ impl StateExt for State { /// Sends the message to all connected clients fn notify_players(&self, msg: ServerGeneral) { + let mut msg = Some(msg); let mut lazy_msg = None; - for (general_stream, _) in ( - &mut self.ecs().write_storage::(), + for (client, _) in ( + &self.ecs().read_storage::(), &self.ecs().read_storage::(), ) .join() { if lazy_msg.is_none() { - lazy_msg = Some(general_stream.prepare(&msg)); + lazy_msg = Some(client.prepare(msg.take().unwrap())); } - lazy_msg - .as_ref() - .map(|ref msg| general_stream.0.send_raw(&msg)); + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } /// Sends the message to all clients playing in game fn notify_in_game_clients(&self, msg: ServerGeneral) { + let mut msg = Some(msg); let mut lazy_msg = None; - for (general_stream, _) in ( - &mut self.ecs().write_storage::(), + for (client, _) in ( + &mut self.ecs().write_storage::(), &self.ecs().read_storage::(), ) .join() { if lazy_msg.is_none() { - lazy_msg = Some(general_stream.prepare(&msg)); + lazy_msg = Some(client.prepare(msg.take().unwrap())); } - lazy_msg - .as_ref() - .map(|ref msg| general_stream.0.send_raw(&msg)); + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } @@ -419,7 +395,7 @@ impl StateExt for State { ) -> Result<(), specs::error::WrongGeneration> { // Remove entity from a group if they are in one { - let mut in_game_streams = self.ecs().write_storage::(); + let clients = self.ecs().read_storage::(); let uids = self.ecs().read_storage::(); let mut group_manager = self.ecs().write_resource::(); group_manager.entity_deleted( @@ -429,14 +405,14 @@ impl StateExt for State { &uids, &self.ecs().entities(), &mut |entity, group_change| { - in_game_streams - .get_mut(entity) - .and_then(|s| { + clients + .get(entity) + .and_then(|c| { group_change .try_map(|e| uids.get(e).copied()) - .map(|g| (g, s)) + .map(|g| (g, c)) }) - .map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g))); + .map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g))); }, ); } diff --git a/server/src/streams.rs b/server/src/streams.rs deleted file mode 100644 index d1dfebfd86..0000000000 --- a/server/src/streams.rs +++ /dev/null @@ -1,126 +0,0 @@ -use common::msg::{ClientGeneral, ClientRegister, PingMsg, ServerGeneral, ServerRegisterAnswer}; - -use network::{Message, Stream, StreamError}; -use serde::{de::DeserializeOwned, Serialize}; - -use specs::Component; -use specs_idvs::IdvStorage; - -/// helped to reduce code duplication -pub(crate) trait GetStream { - type RecvMsg: DeserializeOwned; - type SendMsg: Serialize + core::fmt::Debug; - fn get_mut(&mut self) -> &mut Stream; - fn verify(msg: &Self::SendMsg) -> bool; - - fn send(&mut self, msg: Self::SendMsg) -> Result<(), StreamError> { - if Self::verify(&msg) { - self.get_mut().send(msg) - } else { - unreachable!("sending this msg isn't allowed! got: {:?}", msg) - } - } - - fn send_fallible(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); } - - fn prepare(&mut self, msg: &Self::SendMsg) -> Message { - if Self::verify(&msg) { - Message::serialize(&msg, &self.get_mut()) - } else { - unreachable!("sending this msg isn't allowed! got: {:?}", msg) - } - } -} - -// Streams -// we ignore errors on send, and do unified error handling in recv -pub struct GeneralStream(pub(crate) Stream); -pub struct PingStream(pub(crate) Stream); -pub struct RegisterStream(pub(crate) Stream); -pub struct CharacterScreenStream(pub(crate) Stream); -pub struct InGameStream(pub(crate) Stream); - -impl Component for GeneralStream { - type Storage = IdvStorage; -} -impl Component for PingStream { - type Storage = IdvStorage; -} -impl Component for RegisterStream { - type Storage = IdvStorage; -} -impl Component for CharacterScreenStream { - type Storage = IdvStorage; -} -impl Component for InGameStream { - type Storage = IdvStorage; -} - -impl GetStream for GeneralStream { - type RecvMsg = ClientGeneral; - type SendMsg = ServerGeneral; - - fn get_mut(&mut self) -> &mut Stream { &mut self.0 } - - fn verify(msg: &Self::SendMsg) -> bool { - matches!(&msg, ServerGeneral::PlayerListUpdate(_) - | ServerGeneral::ChatMsg(_) - | ServerGeneral::SetPlayerEntity(_) - | ServerGeneral::TimeOfDay(_) - | ServerGeneral::EntitySync(_) - | ServerGeneral::CompSync(_) - | ServerGeneral::CreateEntity(_) - | ServerGeneral::DeleteEntity(_) - | ServerGeneral::Disconnect(_) - | ServerGeneral::Notification(_)) - } -} -impl GetStream for PingStream { - type RecvMsg = PingMsg; - type SendMsg = PingMsg; - - fn get_mut(&mut self) -> &mut Stream { &mut self.0 } - - fn verify(_: &Self::SendMsg) -> bool { true } -} -impl GetStream for RegisterStream { - type RecvMsg = ClientRegister; - type SendMsg = ServerRegisterAnswer; - - fn get_mut(&mut self) -> &mut Stream { &mut self.0 } - - fn verify(_: &Self::SendMsg) -> bool { true } -} -impl GetStream for CharacterScreenStream { - type RecvMsg = ClientGeneral; - type SendMsg = ServerGeneral; - - fn get_mut(&mut self) -> &mut Stream { &mut self.0 } - - fn verify(msg: &Self::SendMsg) -> bool { - matches!(&msg, ServerGeneral::CharacterDataLoadError(_) - | ServerGeneral::CharacterListUpdate(_) - | ServerGeneral::CharacterActionError(_) - | ServerGeneral::CharacterSuccess) - } -} -impl GetStream for InGameStream { - type RecvMsg = ClientGeneral; - type SendMsg = ServerGeneral; - - fn get_mut(&mut self) -> &mut Stream { &mut self.0 } - - fn verify(msg: &Self::SendMsg) -> bool { - matches!(&msg, ServerGeneral::GroupUpdate(_) - | ServerGeneral::GroupInvite { .. } - | ServerGeneral::InvitePending(_) - | ServerGeneral::InviteComplete { .. } - | ServerGeneral::ExitInGameSuccess - | ServerGeneral::InventoryUpdate(_, _) - | ServerGeneral::TerrainChunkUpdate { .. } - | ServerGeneral::TerrainBlockUpdates(_) - | ServerGeneral::SetViewDistance(_) - | ServerGeneral::Outcomes(_) - | ServerGeneral::Knockback(_)) - } -} diff --git a/server/src/sys/entity_sync.rs b/server/src/sys/entity_sync.rs index 811af3b723..fffa97871e 100644 --- a/server/src/sys/entity_sync.rs +++ b/server/src/sys/entity_sync.rs @@ -5,7 +5,6 @@ use super::{ use crate::{ client::Client, presence::{Presence, RegionSubscription}, - streams::{GeneralStream, GetStream, InGameStream}, Tick, }; use common::{ @@ -44,9 +43,7 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Last>, WriteStorage<'a, Last>, WriteStorage<'a, Last>, - WriteStorage<'a, Client>, - WriteStorage<'a, InGameStream>, - WriteStorage<'a, GeneralStream>, + ReadStorage<'a, Client>, WriteStorage<'a, ForceUpdate>, WriteStorage<'a, InventoryUpdate>, Write<'a, DeletedEntities>, @@ -73,9 +70,7 @@ impl<'a> System<'a> for Sys { mut last_pos, mut last_vel, mut last_ori, - mut clients, - mut in_game_streams, - mut general_streams, + clients, mut force_updates, mut inventory_updates, mut deleted_entities, @@ -111,39 +106,20 @@ impl<'a> System<'a> for Sys { // Assemble subscriber list for this region by iterating through clients and // checking if they are subscribed to this region let mut subscribers = ( - &mut clients, + &clients, &entities, presences.maybe(), &subscriptions, &positions, - &mut in_game_streams, - &mut general_streams, ) .join() - .filter_map( - |( - client, - entity, - presence, - subscription, - pos, - in_game_stream, - general_stream, - )| { - if presence.is_some() && subscription.regions.contains(&key) { - Some(( - client, - &subscription.regions, - entity, - *pos, - in_game_stream, - general_stream, - )) - } else { - None - } - }, - ) + .filter_map(|(client, entity, presence, subscription, pos)| { + if presence.is_some() && subscription.regions.contains(&key) { + Some((client, &subscription.regions, entity, *pos)) + } else { + None + } + }) .collect::>(); for event in region.events() { @@ -166,9 +142,7 @@ impl<'a> System<'a> for Sys { vel.copied(), ori.copied(), )); - for (_, regions, client_entity, _, _, general_stream) in - &mut subscribers - { + for (client, regions, client_entity, _) in &mut subscribers { if maybe_key .as_ref() .map(|key| !regions.contains(key)) @@ -176,7 +150,7 @@ impl<'a> System<'a> for Sys { // Client doesn't need to know about itself && *client_entity != entity { - general_stream.send_fallible(create_msg.clone()); + client.send_fallible(create_msg.clone()); } } } @@ -184,13 +158,13 @@ impl<'a> System<'a> for Sys { RegionEvent::Left(id, maybe_key) => { // Lookup UID for entity if let Some(&uid) = uids.get(entities.entity(*id)) { - for (_, regions, _, _, _, general_stream) in &mut subscribers { + for (client, regions, _, _) in &mut subscribers { if maybe_key .as_ref() .map(|key| !regions.contains(key)) .unwrap_or(true) { - general_stream.send_fallible(ServerGeneral::DeleteEntity(uid)); + client.send_fallible(ServerGeneral::DeleteEntity(uid)); } } } @@ -211,32 +185,29 @@ impl<'a> System<'a> for Sys { let mut comp_sync_package = Some(comp_sync_package); let mut entity_sync_lazymsg = None; let mut comp_sync_lazymsg = None; - subscribers - .iter_mut() - .for_each(move |(_, _, _, _, _, general_stream)| { - if entity_sync_lazymsg.is_none() { - entity_sync_lazymsg = Some(general_stream.prepare( - &ServerGeneral::EntitySync(entity_sync_package.take().unwrap()), - )); - comp_sync_lazymsg = - Some(general_stream.prepare(&ServerGeneral::CompSync( - comp_sync_package.take().unwrap(), - ))); - } - entity_sync_lazymsg - .as_ref() - .map(|msg| general_stream.0.send_raw(&msg)); - comp_sync_lazymsg - .as_ref() - .map(|msg| general_stream.0.send_raw(&msg)); - }); + subscribers.iter_mut().for_each(move |(client, _, _, _)| { + if entity_sync_lazymsg.is_none() { + entity_sync_lazymsg = Some(client.prepare(ServerGeneral::EntitySync( + entity_sync_package.take().unwrap(), + ))); + comp_sync_lazymsg = Some( + client.prepare(ServerGeneral::CompSync(comp_sync_package.take().unwrap())), + ); + } + entity_sync_lazymsg + .as_ref() + .map(|msg| client.send_prepared(&msg)); + comp_sync_lazymsg + .as_ref() + .map(|msg| client.send_prepared(&msg)); + }); let mut send_general = |msg: ServerGeneral, entity: EcsEntity, pos: Pos, force_update: Option<&ForceUpdate>, throttle: bool| { - for (_, _, client_entity, client_pos, _, general_stream) in &mut subscribers { + for (client, _, client_entity, client_pos) in &mut subscribers { if if client_entity == &entity { // Don't send client physics updates about itself unless force update is set force_update.is_some() @@ -262,7 +233,7 @@ impl<'a> System<'a> for Sys { true // Closer than 100 blocks } } { - general_stream.send_fallible(msg.clone()); + client.send_fallible(msg.clone()); } } }; @@ -349,18 +320,18 @@ impl<'a> System<'a> for Sys { // Handle entity deletion in regions that don't exist in RegionMap // (theoretically none) for (region_key, deleted) in deleted_entities.take_remaining_deleted() { - for general_stream in (presences.maybe(), &subscriptions, &mut general_streams) + for client in (presences.maybe(), &subscriptions, &clients) .join() - .filter_map(|(presence, subscription, general_stream)| { + .filter_map(|(presence, subscription, client)| { if presence.is_some() && subscription.regions.contains(®ion_key) { - Some(general_stream) + Some(client) } else { None } }) { for uid in &deleted { - general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); + client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } } @@ -368,19 +339,15 @@ impl<'a> System<'a> for Sys { // TODO: Sync clients that don't have a position? // Sync inventories - for (inventory, update, in_game_stream) in - (&inventories, &inventory_updates, &mut in_game_streams).join() - { - in_game_stream.send_fallible(ServerGeneral::InventoryUpdate( + for (inventory, update, client) in (&inventories, &inventory_updates, &clients).join() { + client.send_fallible(ServerGeneral::InventoryUpdate( inventory.clone(), update.event(), )); } // Sync outcomes - for (presence, pos, in_game_stream) in - (presences.maybe(), positions.maybe(), &mut in_game_streams).join() - { + for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() { let is_near = |o_pos: Vec3| { pos.zip_with(presence, |pos, presence| { pos.0.xy().distance_squared(o_pos.xy()) @@ -395,7 +362,7 @@ impl<'a> System<'a> for Sys { .cloned() .collect::>(); if !outcomes.is_empty() { - in_game_stream.send_fallible(ServerGeneral::Outcomes(outcomes)); + client.send_fallible(ServerGeneral::Outcomes(outcomes)); } } outcomes.clear(); @@ -408,13 +375,11 @@ impl<'a> System<'a> for Sys { // TODO: doesn't really belong in this system (rename system or create another // system?) let mut tof_lazymsg = None; - for general_stream in (&mut general_streams).join() { + for client in (&clients).join() { if tof_lazymsg.is_none() { - tof_lazymsg = Some(general_stream.prepare(&ServerGeneral::TimeOfDay(*time_of_day))); + tof_lazymsg = Some(client.prepare(ServerGeneral::TimeOfDay(*time_of_day))); } - tof_lazymsg - .as_ref() - .map(|msg| general_stream.0.send_raw(&msg)); + tof_lazymsg.as_ref().map(|msg| client.send_prepared(&msg)); } timer.end(); diff --git a/server/src/sys/invite_timeout.rs b/server/src/sys/invite_timeout.rs index ea913a1a1c..27819c317f 100644 --- a/server/src/sys/invite_timeout.rs +++ b/server/src/sys/invite_timeout.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::streams::{GetStream, InGameStream}; +use crate::client::Client; use common::{ comp::group::{Invite, PendingInvites}, msg::{InviteAnswer, ServerGeneral}, @@ -16,14 +16,14 @@ impl<'a> System<'a> for Sys { Entities<'a>, WriteStorage<'a, Invite>, WriteStorage<'a, PendingInvites>, - WriteStorage<'a, InGameStream>, + ReadStorage<'a, Client>, ReadStorage<'a, Uid>, Write<'a, SysTimer>, ); fn run( &mut self, - (entities, mut invites, mut pending_invites, mut in_game_streams, uids, mut timer): Self::SystemData, + (entities, mut invites, mut pending_invites, clients, uids, mut timer): Self::SystemData, ) { span!(_guard, "run", "invite_timeout::Sys::run"); timer.start(); @@ -51,11 +51,10 @@ impl<'a> System<'a> for Sys { } // Inform inviter of timeout - if let (Some(in_game_stream), Some(target)) = ( - in_game_streams.get_mut(*inviter), - uids.get(invitee).copied(), - ) { - in_game_stream.send_fallible(ServerGeneral::InviteComplete { + if let (Some(client), Some(target)) = + (clients.get(*inviter), uids.get(invitee).copied()) + { + client.send_fallible(ServerGeneral::InviteComplete { target, answer: InviteAnswer::TimedOut, }); diff --git a/server/src/sys/msg/character_screen.rs b/server/src/sys/msg/character_screen.rs index 2fb8b8a49c..6dae2482fb 100644 --- a/server/src/sys/msg/character_screen.rs +++ b/server/src/sys/msg/character_screen.rs @@ -1,22 +1,17 @@ use super::super::SysTimer; use crate::{ - alias_validator::AliasValidator, - character_creator, - client::Client, - persistence::character_loader::CharacterLoader, - presence::Presence, - streams::{CharacterScreenStream, GeneralStream, GetStream}, - EditableSettings, + alias_validator::AliasValidator, character_creator, client::Client, + persistence::character_loader::CharacterLoader, presence::Presence, EditableSettings, }; use common::{ comp::{ChatType, Player, UnresolvedChatMsg}, event::{EventBus, ServerEvent}, msg::{ClientGeneral, ServerGeneral}, span, - state::Time, sync::Uid, }; -use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write}; +use std::sync::atomic::Ordering; use tracing::{debug, warn}; impl Sys { @@ -25,9 +20,7 @@ impl Sys { server_emitter: &mut common::event::Emitter<'_, ServerEvent>, new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, entity: specs::Entity, - client: &mut Client, - character_screen_stream: &mut CharacterScreenStream, - general_stream: &mut GeneralStream, + client: &Client, character_loader: &ReadExpect<'_, CharacterLoader>, uids: &ReadStorage<'_, Uid>, players: &ReadStorage<'_, Player>, @@ -68,27 +61,27 @@ impl Sys { // Give the player a welcome message if !editable_settings.server_description.is_empty() { - general_stream.send(ChatType::CommandInfo.server_msg(String::from( + client.send(ChatType::CommandInfo.server_msg(String::from( &*editable_settings.server_description, )))?; } - if !client.login_msg_sent { + if !client.login_msg_sent.load(Ordering::Relaxed) { if let Some(player_uid) = uids.get(entity) { new_chat_msgs.push((None, UnresolvedChatMsg { chat_type: ChatType::Online(*player_uid), message: "".to_string(), })); - client.login_msg_sent = true; + client.login_msg_sent.store(true, Ordering::Relaxed); } } } } else { debug!("Client is not yet registered"); - character_screen_stream.send(ServerGeneral::CharacterDataLoadError( - String::from("Failed to fetch player entity"), - ))? + client.send(ServerGeneral::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + )))? } }, ClientGeneral::RequestCharacterList => { @@ -99,8 +92,7 @@ impl Sys { ClientGeneral::CreateCharacter { alias, tool, body } => { if let Err(error) = alias_validator.validate(&alias) { debug!(?error, ?alias, "denied alias as it contained a banned word"); - character_screen_stream - .send(ServerGeneral::CharacterActionError(error.to_string()))?; + client.send(ServerGeneral::CharacterActionError(error.to_string()))?; } else if let Some(player) = players.get(entity) { character_creator::create_character( entity, @@ -134,15 +126,12 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, - Read<'a, Time>, ReadExpect<'a, CharacterLoader>, Write<'a, SysTimer>, ReadStorage<'a, Uid>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, ReadStorage<'a, Player>, ReadStorage<'a, Presence>, - WriteStorage<'a, CharacterScreenStream>, - WriteStorage<'a, GeneralStream>, ReadExpect<'a, EditableSettings>, ReadExpect<'a, AliasValidator>, ); @@ -152,15 +141,12 @@ impl<'a> System<'a> for Sys { ( entities, server_event_bus, - time, character_loader, mut timer, uids, - mut clients, + clients, players, presences, - mut character_screen_streams, - mut general_streams, editable_settings, alias_validator, ): Self::SystemData, @@ -171,37 +157,22 @@ impl<'a> System<'a> for Sys { let mut server_emitter = server_event_bus.emitter(); let mut new_chat_msgs = Vec::new(); - for (entity, client, character_screen_stream, general_stream) in ( - &entities, - &mut clients, - &mut character_screen_streams, - &mut general_streams, - ) - .join() - { - let res = - super::try_recv_all(character_screen_stream, |character_screen_stream, msg| { - Self::handle_client_character_screen_msg( - &mut server_emitter, - &mut new_chat_msgs, - entity, - client, - character_screen_stream, - general_stream, - &character_loader, - &uids, - &players, - &presences, - &editable_settings, - &alias_validator, - msg, - ) - }); - - if let Ok(1_u64..=u64::MAX) = res { - // Update client ping. - client.last_ping = time.0 - } + for (entity, client) in (&entities, &clients).join() { + let _ = super::try_recv_all(client, 1, |client, msg| { + Self::handle_client_character_screen_msg( + &mut server_emitter, + &mut new_chat_msgs, + entity, + client, + &character_loader, + &uids, + &players, + &presences, + &editable_settings, + &alias_validator, + msg, + ) + }); } // Handle new chat messages. diff --git a/server/src/sys/msg/general.rs b/server/src/sys/msg/general.rs index dc58478cdb..8051da65b5 100644 --- a/server/src/sys/msg/general.rs +++ b/server/src/sys/msg/general.rs @@ -1,5 +1,5 @@ use super::super::SysTimer; -use crate::{client::Client, metrics::PlayerMetrics, streams::GeneralStream}; +use crate::{client::Client, metrics::PlayerMetrics}; use common::{ comp::{ChatMode, Player, UnresolvedChatMsg}, event::{EventBus, ServerEvent}, @@ -8,7 +8,8 @@ use common::{ state::Time, sync::Uid, }; -use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write}; +use std::sync::atomic::Ordering; use tracing::{debug, error, warn}; impl Sys { @@ -17,7 +18,7 @@ impl Sys { server_emitter: &mut common::event::Emitter<'_, ServerEvent>, new_chat_msgs: &mut Vec<(Option, UnresolvedChatMsg)>, entity: specs::Entity, - client: &mut Client, + client: &Client, player: Option<&Player>, player_metrics: &ReadExpect<'_, PlayerMetrics>, uids: &ReadStorage<'_, Uid>, @@ -51,7 +52,7 @@ impl Sys { .clients_disconnected .with_label_values(&["gracefully"]) .inc(); - client.terminate_msg_recv = true; + client.terminate_msg_recv.store(true, Ordering::Relaxed); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); }, _ => unreachable!("not a client_general msg"), @@ -73,8 +74,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Uid>, ReadStorage<'a, ChatMode>, ReadStorage<'a, Player>, - WriteStorage<'a, Client>, - WriteStorage<'a, GeneralStream>, + ReadStorage<'a, Client>, ); fn run( @@ -88,8 +88,7 @@ impl<'a> System<'a> for Sys { uids, chat_modes, players, - mut clients, - mut general_streams, + clients, ): Self::SystemData, ) { span!(_guard, "run", "msg::general::Sys::run"); @@ -98,15 +97,8 @@ impl<'a> System<'a> for Sys { let mut server_emitter = server_event_bus.emitter(); let mut new_chat_msgs = Vec::new(); - for (entity, client, player, general_stream) in ( - &entities, - &mut clients, - (&players).maybe(), - &mut general_streams, - ) - .join() - { - let res = super::try_recv_all(general_stream, |_, msg| { + for (entity, client, player) in (&entities, &clients, (&players).maybe()).join() { + let res = super::try_recv_all(client, 3, |client, msg| { Self::handle_general_msg( &mut server_emitter, &mut new_chat_msgs, @@ -122,7 +114,7 @@ impl<'a> System<'a> for Sys { if let Ok(1_u64..=u64::MAX) = res { // Update client ping. - client.last_ping = time.0 + *client.last_ping.lock().unwrap() = time.0 } } diff --git a/server/src/sys/msg/in_game.rs b/server/src/sys/msg/in_game.rs index 8d3ddd0248..ae49d6d8e2 100644 --- a/server/src/sys/msg/in_game.rs +++ b/server/src/sys/msg/in_game.rs @@ -1,17 +1,11 @@ use super::super::SysTimer; -use crate::{ - client::Client, - metrics::NetworkRequestMetrics, - presence::Presence, - streams::{GetStream, InGameStream}, - Settings, -}; +use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings}; use common::{ comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Ori, Pos, Stats, Vel}, event::{EventBus, ServerEvent}, msg::{ClientGeneral, PresenceKind, ServerGeneral}, span, - state::{BlockChange, Time}, + state::BlockChange, terrain::{TerrainChunkSize, TerrainGrid}, vol::{ReadVol, RectVolSize}, }; @@ -23,9 +17,8 @@ impl Sys { fn handle_client_in_game_msg( server_emitter: &mut common::event::Emitter<'_, ServerEvent>, entity: specs::Entity, - _client: &Client, + client: &Client, maybe_presence: &mut Option<&mut Presence>, - in_game_stream: &mut InGameStream, terrain: &ReadExpect<'_, TerrainGrid>, network_metrics: &ReadExpect<'_, NetworkRequestMetrics>, can_build: &ReadStorage<'_, CanBuild>, @@ -54,7 +47,7 @@ impl Sys { // Go back to registered state (char selection screen) ClientGeneral::ExitInGame => { server_emitter.emit(ServerEvent::ExitIngame { entity }); - in_game_stream.send(ServerGeneral::ExitInGameSuccess)?; + client.send(ServerGeneral::ExitInGameSuccess)?; *maybe_presence = None; }, ClientGeneral::SetViewDistance(view_distance) => { @@ -69,7 +62,7 @@ impl Sys { .map(|max| view_distance > max) .unwrap_or(false) { - in_game_stream.send(ServerGeneral::SetViewDistance( + client.send(ServerGeneral::SetViewDistance( settings.max_view_distance.unwrap_or(0), ))?; } @@ -135,7 +128,7 @@ impl Sys { match terrain.get_key(key) { Some(chunk) => { network_metrics.chunks_served_from_memory.inc(); - in_game_stream.send(ServerGeneral::TerrainChunkUpdate { + client.send(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), })? @@ -177,7 +170,6 @@ impl<'a> System<'a> for Sys { type SystemData = ( Entities<'a>, Read<'a, EventBus>, - Read<'a, Time>, ReadExpect<'a, TerrainGrid>, ReadExpect<'a, NetworkRequestMetrics>, Write<'a, SysTimer>, @@ -190,7 +182,6 @@ impl<'a> System<'a> for Sys { WriteStorage<'a, Ori>, WriteStorage<'a, Presence>, WriteStorage<'a, Client>, - WriteStorage<'a, InGameStream>, WriteStorage<'a, Controller>, Read<'a, Settings>, ); @@ -200,7 +191,6 @@ impl<'a> System<'a> for Sys { ( entities, server_event_bus, - time, terrain, network_metrics, mut timer, @@ -213,7 +203,6 @@ impl<'a> System<'a> for Sys { mut orientations, mut presences, mut clients, - mut in_game_streams, mut controllers, settings, ): Self::SystemData, @@ -223,21 +212,15 @@ impl<'a> System<'a> for Sys { let mut server_emitter = server_event_bus.emitter(); - for (entity, client, mut presence, in_game_stream) in ( - &entities, - &mut clients, - (&mut presences).maybe(), - &mut in_game_streams, - ) - .join() + for (entity, client, mut maybe_presence) in + (&entities, &mut clients, (&mut presences).maybe()).join() { - let res = super::try_recv_all(in_game_stream, |in_game_stream, msg| { + let _ = super::try_recv_all(client, 2, |client, msg| { Self::handle_client_in_game_msg( &mut server_emitter, entity, client, - &mut presence, - in_game_stream, + &mut maybe_presence, &terrain, &network_metrics, &can_build, @@ -252,11 +235,6 @@ impl<'a> System<'a> for Sys { msg, ) }); - - if let Ok(1_u64..=u64::MAX) = res { - // Update client ping. - client.last_ping = time.0 - } } timer.end() diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs index 54aa60a9d4..0242d83794 100644 --- a/server/src/sys/msg/mod.rs +++ b/server/src/sys/msg/mod.rs @@ -4,26 +4,28 @@ pub mod in_game; pub mod ping; pub mod register; -use crate::streams::GetStream; +use crate::client::Client; +use serde::de::DeserializeOwned; /// handles all send msg and calls a handle fn /// Aborts when a error occurred returns cnt of successful msg otherwise -pub(in crate::sys::msg) fn try_recv_all( - stream: &mut T, +pub(in crate::sys::msg) fn try_recv_all( + client: &Client, + stream_id: u8, mut f: F, ) -> Result where - T: GetStream, - F: FnMut(&mut T, T::RecvMsg) -> Result<(), crate::error::Error>, + M: DeserializeOwned, + F: FnMut(&Client, M) -> Result<(), crate::error::Error>, { let mut cnt = 0u64; loop { - let msg = match stream.get_mut().try_recv() { + let msg = match client.recv(stream_id) { Ok(Some(msg)) => msg, Ok(None) => break Ok(cnt), Err(e) => break Err(e.into()), }; - if let Err(e) = f(stream, msg) { + if let Err(e) = f(client, msg) { break Err(e); } cnt += 1; diff --git a/server/src/sys/msg/ping.rs b/server/src/sys/msg/ping.rs index c2fc8ea915..aa476ac2ae 100644 --- a/server/src/sys/msg/ping.rs +++ b/server/src/sys/msg/ping.rs @@ -1,26 +1,19 @@ use super::super::SysTimer; -use crate::{ - client::Client, - metrics::PlayerMetrics, - streams::{GetStream, PingStream}, - Settings, -}; +use crate::{client::Client, metrics::PlayerMetrics, Settings}; use common::{ event::{EventBus, ServerEvent}, msg::PingMsg, span, state::Time, }; -use specs::{Entities, Join, Read, ReadExpect, System, Write, WriteStorage}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write}; +use std::sync::atomic::Ordering; use tracing::{debug, info}; impl Sys { - fn handle_ping_msg( - ping_stream: &mut PingStream, - msg: PingMsg, - ) -> Result<(), crate::error::Error> { + fn handle_ping_msg(client: &Client, msg: PingMsg) -> Result<(), crate::error::Error> { match msg { - PingMsg::Ping => ping_stream.send(PingMsg::Pong)?, + PingMsg::Ping => client.send(PingMsg::Pong)?, PingMsg::Pong => {}, } Ok(()) @@ -37,8 +30,7 @@ impl<'a> System<'a> for Sys { Read<'a, Time>, ReadExpect<'a, PlayerMetrics>, Write<'a, SysTimer>, - WriteStorage<'a, Client>, - WriteStorage<'a, PingStream>, + ReadStorage<'a, Client>, Read<'a, Settings>, ); @@ -50,8 +42,7 @@ impl<'a> System<'a> for Sys { time, player_metrics, mut timer, - mut clients, - mut ping_streams, + clients, settings, ): Self::SystemData, ) { @@ -60,14 +51,12 @@ impl<'a> System<'a> for Sys { let mut server_emitter = server_event_bus.emitter(); - for (entity, client, ping_stream) in (&entities, &mut clients, &mut ping_streams).join() { - let res = super::try_recv_all(ping_stream, |ping_stream, msg| { - Self::handle_ping_msg(ping_stream, msg) - }); + for (entity, client) in (&entities, &clients).join() { + let res = super::try_recv_all(client, 4, Self::handle_ping_msg); match res { Err(e) => { - if !client.terminate_msg_recv { + if !client.terminate_msg_recv.load(Ordering::Relaxed) { debug!(?entity, ?e, "network error with client, disconnecting"); player_metrics .clients_disconnected @@ -78,13 +67,14 @@ impl<'a> System<'a> for Sys { }, Ok(1_u64..=u64::MAX) => { // Update client ping. - client.last_ping = time.0 + *client.last_ping.lock().unwrap() = time.0 }, Ok(0) => { - if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 + let last_ping: f64 = *client.last_ping.lock().unwrap(); + if time.0 - last_ping > settings.client_timeout.as_secs() as f64 // Timeout { - if !client.terminate_msg_recv { + if !client.terminate_msg_recv.load(Ordering::Relaxed) { info!(?entity, "timeout error with client, disconnecting"); player_metrics .clients_disconnected @@ -92,11 +82,9 @@ impl<'a> System<'a> for Sys { .inc(); server_emitter.emit(ServerEvent::ClientDisconnect(entity)); } - } else if time.0 - client.last_ping - > settings.client_timeout.as_secs() as f64 * 0.5 - { + } else if time.0 - last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { // Try pinging the client if the timeout is nearing. - ping_stream.send_fallible(PingMsg::Ping); + client.send_fallible(PingMsg::Ping); } }, } diff --git a/server/src/sys/msg/register.rs b/server/src/sys/msg/register.rs index 219b3b81e9..d072ada6c8 100644 --- a/server/src/sys/msg/register.rs +++ b/server/src/sys/msg/register.rs @@ -1,10 +1,6 @@ use super::super::SysTimer; use crate::{ - client::Client, - login_provider::LoginProvider, - metrics::PlayerMetrics, - streams::{GeneralStream, GetStream, RegisterStream}, - EditableSettings, + client::Client, login_provider::LoginProvider, metrics::PlayerMetrics, EditableSettings, }; use common::{ comp::{Admin, Player, Stats}, @@ -13,13 +9,10 @@ use common::{ ServerRegisterAnswer, }, span, - state::Time, sync::Uid, }; use hashbrown::HashMap; -use specs::{ - Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage, -}; +use specs::{Entities, Join, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage}; impl Sys { #[allow(clippy::too_many_arguments)] @@ -27,9 +20,7 @@ impl Sys { player_list: &HashMap, new_players: &mut Vec, entity: specs::Entity, - _client: &mut Client, - register_stream: &mut RegisterStream, - general_stream: &mut GeneralStream, + client: &Client, player_metrics: &ReadExpect<'_, PlayerMetrics>, login_provider: &mut WriteExpect<'_, LoginProvider>, admins: &mut WriteStorage<'_, Admin>, @@ -44,7 +35,7 @@ impl Sys { &*editable_settings.banlist, ) { Err(err) => { - register_stream.send(ServerRegisterAnswer::Err(err))?; + client.send(ServerRegisterAnswer::Err(err))?; return Ok(()); }, Ok((username, uuid)) => (username, uuid), @@ -55,7 +46,7 @@ impl Sys { if !player.is_valid() { // Invalid player - register_stream.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; + client.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?; return Ok(()); } @@ -71,10 +62,10 @@ impl Sys { } // Tell the client its request was successful. - register_stream.send(ServerRegisterAnswer::Ok(()))?; + client.send(ServerRegisterAnswer::Ok(()))?; // Send initial player list - general_stream.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( + client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( player_list.clone(), )))?; @@ -92,17 +83,14 @@ impl<'a> System<'a> for Sys { #[allow(clippy::type_complexity)] type SystemData = ( Entities<'a>, - Read<'a, Time>, ReadExpect<'a, PlayerMetrics>, Write<'a, SysTimer>, ReadStorage<'a, Uid>, - WriteStorage<'a, Client>, + ReadStorage<'a, Client>, WriteStorage<'a, Player>, ReadStorage<'a, Stats>, WriteExpect<'a, LoginProvider>, WriteStorage<'a, Admin>, - WriteStorage<'a, RegisterStream>, - WriteStorage<'a, GeneralStream>, ReadExpect<'a, EditableSettings>, ); @@ -110,17 +98,14 @@ impl<'a> System<'a> for Sys { &mut self, ( entities, - time, player_metrics, mut timer, uids, - mut clients, + clients, mut players, stats, mut login_provider, mut admins, - mut register_streams, - mut general_streams, editable_settings, ): Self::SystemData, ) { @@ -145,22 +130,13 @@ impl<'a> System<'a> for Sys { // List of new players to update player lists of all clients. let mut new_players = Vec::new(); - for (entity, client, register_stream, general_stream) in ( - &entities, - &mut clients, - &mut register_streams, - &mut general_streams, - ) - .join() - { - let res = super::try_recv_all(register_stream, |register_stream, msg| { + for (entity, client) in (&entities, &clients).join() { + let _ = super::try_recv_all(client, 0, |client, msg| { Self::handle_register_msg( &player_list, &mut new_players, entity, client, - register_stream, - general_stream, &player_metrics, &mut login_provider, &mut admins, @@ -169,11 +145,6 @@ impl<'a> System<'a> for Sys { msg, ) }); - - if let Ok(1_u64..=u64::MAX) = res { - // Update client ping. - client.last_ping = time.0 - } } // Handle new players. @@ -181,9 +152,9 @@ impl<'a> System<'a> for Sys { for entity in new_players { if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) { let mut lazy_msg = None; - for (_, general_stream) in (&players, &mut general_streams).join() { + for (_, client) in (&players, &clients).join() { if lazy_msg.is_none() { - lazy_msg = Some(general_stream.prepare(&ServerGeneral::PlayerListUpdate( + lazy_msg = Some(client.prepare(ServerGeneral::PlayerListUpdate( PlayerListUpdate::Add(*uid, PlayerInfo { player_alias: player.alias.clone(), is_online: true, @@ -192,9 +163,7 @@ impl<'a> System<'a> for Sys { }), ))); } - lazy_msg - .as_ref() - .map(|ref msg| general_stream.0.send_raw(&msg)); + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } } diff --git a/server/src/sys/subscription.rs b/server/src/sys/subscription.rs index 66d9992e03..b1fa4e4d71 100644 --- a/server/src/sys/subscription.rs +++ b/server/src/sys/subscription.rs @@ -5,7 +5,6 @@ use super::{ use crate::{ client::Client, presence::{self, Presence, RegionSubscription}, - streams::{GeneralStream, GetStream}, }; use common::{ comp::{Ori, Pos, Vel}, @@ -37,7 +36,6 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Ori>, ReadStorage<'a, Presence>, ReadStorage<'a, Client>, - WriteStorage<'a, GeneralStream>, WriteStorage<'a, RegionSubscription>, Write<'a, DeletedEntities>, TrackedComps<'a>, @@ -55,8 +53,7 @@ impl<'a> System<'a> for Sys { velocities, orientations, presences, - _clients, - mut general_streams, + clients, mut subscriptions, mut deleted_entities, tracked_comps, @@ -77,12 +74,12 @@ impl<'a> System<'a> for Sys { // 7. Determine list of regions that are in range and iterate through it // - check if in hashset (hash calc) if not add it let mut regions_to_remove = Vec::new(); - for (subscription, pos, presence, client_entity, general_stream) in ( + for (subscription, pos, presence, client_entity, client) in ( &mut subscriptions, &positions, &presences, &entities, - &mut general_streams, + &clients, ) .join() { @@ -155,8 +152,7 @@ impl<'a> System<'a> for Sys { .map(|key| subscription.regions.contains(key)) .unwrap_or(false) { - general_stream - .send_fallible(ServerGeneral::DeleteEntity(uid)); + client.send_fallible(ServerGeneral::DeleteEntity(uid)); } } }, @@ -164,7 +160,7 @@ impl<'a> System<'a> for Sys { } // Tell client to delete entities in the region for (&uid, _) in (&uids, region.entities()).join() { - let _ = general_stream.send(ServerGeneral::DeleteEntity(uid)); + client.send_fallible(ServerGeneral::DeleteEntity(uid)); } } // Send deleted entities since they won't be processed for this client in entity @@ -174,7 +170,7 @@ impl<'a> System<'a> for Sys { .iter() .flat_map(|v| v.iter()) { - general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); + client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid))); } } @@ -199,7 +195,7 @@ impl<'a> System<'a> for Sys { { // Send message to create entity and tracked components and physics // components - general_stream.send_fallible(ServerGeneral::CreateEntity( + client.send_fallible(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), @@ -220,10 +216,10 @@ impl<'a> System<'a> for Sys { /// Initialize region subscription pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { - if let (Some(client_pos), Some(presence), Some(general_stream)) = ( + if let (Some(client_pos), Some(presence), Some(client)) = ( world.read_storage::().get(entity), world.read_storage::().get(entity), - world.write_storage::().get_mut(entity), + world.write_storage::().get(entity), ) { let fuzzy_chunk = (Vec2::::from(client_pos.0)) .map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32); @@ -248,7 +244,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) { .join() { // Send message to create entity and tracked components and physics components - general_stream.send_fallible(ServerGeneral::CreateEntity( + client.send_fallible(ServerGeneral::CreateEntity( tracked_comps.create_entity_package( entity, Some(*pos), diff --git a/server/src/sys/terrain.rs b/server/src/sys/terrain.rs index 5b169ab18a..b2da237a3c 100644 --- a/server/src/sys/terrain.rs +++ b/server/src/sys/terrain.rs @@ -1,10 +1,5 @@ use super::SysTimer; -use crate::{ - chunk_generator::ChunkGenerator, - presence::Presence, - streams::{GetStream, InGameStream}, - Tick, -}; +use crate::{chunk_generator::ChunkGenerator, client::Client, presence::Presence, Tick}; use common::{ comp::{self, bird_medium, Alignment, Pos}, event::{EventBus, ServerEvent}, @@ -17,7 +12,7 @@ use common::{ LoadoutBuilder, }; use rand::Rng; -use specs::{Join, Read, ReadStorage, System, Write, WriteExpect, WriteStorage}; +use specs::{Join, Read, ReadStorage, System, Write, WriteExpect}; use std::sync::Arc; use vek::*; @@ -39,7 +34,7 @@ impl<'a> System<'a> for Sys { Write<'a, TerrainChanges>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, - WriteStorage<'a, InGameStream>, + ReadStorage<'a, Client>, ); fn run( @@ -53,7 +48,7 @@ impl<'a> System<'a> for Sys { mut terrain_changes, positions, presences, - mut in_game_streams, + clients, ): Self::SystemData, ) { span!(_guard, "run", "terrain::Sys::run"); @@ -67,8 +62,8 @@ impl<'a> System<'a> for Sys { let (chunk, supplement) = match res { Ok((chunk, supplement)) => (chunk, supplement), Err(Some(entity)) => { - if let Some(in_game_stream) = in_game_streams.get_mut(entity) { - in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate { + if let Some(client) = clients.get(entity) { + client.send_fallible(ServerGeneral::TerrainChunkUpdate { key, chunk: Err(()), }); @@ -80,9 +75,7 @@ impl<'a> System<'a> for Sys { }, }; // Send the chunk to all nearby players. - for (presence, pos, in_game_stream) in - (&presences, &positions, &mut in_game_streams).join() - { + for (presence, pos, client) in (&presences, &positions, &clients).join() { let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32)); // Subtract 2 from the offset before computing squared magnitude // 1 since chunks need neighbors to be meshed @@ -92,7 +85,7 @@ impl<'a> System<'a> for Sys { .magnitude_squared(); if adjusted_dist_sqr <= presence.view_distance.pow(2) { - in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate { + client.send_fallible(ServerGeneral::TerrainChunkUpdate { key, chunk: Ok(Box::new(chunk.clone())), }); diff --git a/server/src/sys/terrain_sync.rs b/server/src/sys/terrain_sync.rs index a04c55fd25..900248b6f1 100644 --- a/server/src/sys/terrain_sync.rs +++ b/server/src/sys/terrain_sync.rs @@ -1,10 +1,7 @@ use super::SysTimer; -use crate::{ - presence::Presence, - streams::{GetStream, InGameStream}, -}; +use crate::{client::Client, presence::Presence}; use common::{comp::Pos, msg::ServerGeneral, span, state::TerrainChanges, terrain::TerrainGrid}; -use specs::{Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage}; +use specs::{Join, Read, ReadExpect, ReadStorage, System, Write}; /// This systems sends new chunks to clients as well as changes to existing /// chunks @@ -17,12 +14,12 @@ impl<'a> System<'a> for Sys { Write<'a, SysTimer>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, - WriteStorage<'a, InGameStream>, + ReadStorage<'a, Client>, ); fn run( &mut self, - (terrain, terrain_changes, mut timer, positions, presences, mut in_game_streams): Self::SystemData, + (terrain, terrain_changes, mut timer, positions, presences, clients): Self::SystemData, ) { span!(_guard, "run", "terrain_sync::Sys::run"); timer.start(); @@ -31,24 +28,19 @@ impl<'a> System<'a> for Sys { 'chunk: for chunk_key in &terrain_changes.modified_chunks { let mut lazy_msg = None; - for (presence, pos, in_game_stream) in - (&presences, &positions, &mut in_game_streams).join() - { + for (presence, pos, client) in (&presences, &positions, &clients).join() { if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance) { if lazy_msg.is_none() { - lazy_msg = - Some(in_game_stream.prepare(&ServerGeneral::TerrainChunkUpdate { - key: *chunk_key, - chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { - Some(chunk) => chunk.clone(), - None => break 'chunk, - })), - })); + lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate { + key: *chunk_key, + chunk: Ok(Box::new(match terrain.get_key(*chunk_key) { + Some(chunk) => chunk.clone(), + None => break 'chunk, + })), + })); } - lazy_msg - .as_ref() - .map(|ref msg| in_game_stream.0.send_raw(&msg)); + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } } } @@ -56,15 +48,13 @@ impl<'a> System<'a> for Sys { // TODO: Don't send all changed blocks to all clients // Sync changed blocks let mut lazy_msg = None; - for (_, in_game_stream) in (&presences, &mut in_game_streams).join() { + for (_, client) in (&presences, &clients).join() { if lazy_msg.is_none() { - lazy_msg = Some(in_game_stream.prepare(&ServerGeneral::TerrainBlockUpdates( + lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates( terrain_changes.modified_blocks.clone(), ))); } - lazy_msg - .as_ref() - .map(|ref msg| in_game_stream.0.send_raw(&msg)); + lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg)); } timer.end(); diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index 237391b964..89e16be7c5 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -1,5 +1,5 @@ use super::SysTimer; -use crate::streams::{GeneralStream, GetStream}; +use crate::client::Client; use common::{ comp::{Player, Pos, Waypoint, WaypointArea}, msg::{Notification, ServerGeneral}, @@ -22,7 +22,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Player>, ReadStorage<'a, WaypointArea>, WriteStorage<'a, Waypoint>, - WriteStorage<'a, GeneralStream>, + ReadStorage<'a, Client>, Read<'a, Time>, Write<'a, SysTimer>, ); @@ -35,7 +35,7 @@ impl<'a> System<'a> for Sys { players, waypoint_areas, mut waypoints, - mut general_streams, + clients, time, mut timer, ): Self::SystemData, @@ -43,15 +43,13 @@ impl<'a> System<'a> for Sys { span!(_guard, "run", "waypoint::Sys::run"); timer.start(); - for (entity, player_pos, _, general_stream) in - (&entities, &positions, &players, &mut general_streams).join() - { + for (entity, player_pos, _, client) in (&entities, &positions, &players, &clients).join() { for (waypoint_pos, waypoint_area) in (&positions, &waypoint_areas).join() { if player_pos.0.distance_squared(waypoint_pos.0) < waypoint_area.radius().powi(2) { if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time)) { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { - general_stream.send_fallible(ServerGeneral::Notification( + client.send_fallible(ServerGeneral::Notification( Notification::WaypointSaved, )); }