Undo one Componenet per Stream and instead use Client.

In order to keep the performance we made it Internal Mutability and use a `Mutex` per Stream, till `Stream.send` is no longer `&mut self`.
The old solution didn't rely on this, but needed multiple Components instead which zest didn't liked
This commit is contained in:
Marcel Märtens 2020-11-02 19:30:56 +01:00
parent 00456c8373
commit 3d9c3e481e
23 changed files with 523 additions and 814 deletions

View File

@ -1,7 +1,9 @@
use common::msg::ClientType;
use network::Participant;
use common::msg::{ClientType, ServerGeneral, ServerMsg};
use network::{Message, Participant, Stream, StreamError};
use serde::{de::DeserializeOwned, Serialize};
use specs::Component;
use specs_idvs::IdvStorage;
use std::sync::{atomic::AtomicBool, Mutex};
/// Client handles ALL network related information of everything that connects
/// to the server Client DOES NOT handle game states
@ -12,11 +14,195 @@ use specs_idvs::IdvStorage;
pub struct Client {
pub client_type: ClientType,
pub participant: Option<Participant>,
pub last_ping: f64,
pub login_msg_sent: bool,
pub terminate_msg_recv: bool,
pub last_ping: Mutex<f64>,
pub login_msg_sent: AtomicBool,
pub terminate_msg_recv: AtomicBool,
//TODO: improve network crate so that `send` is no longer `&mut self` and we can get rid of
// this Mutex. This Mutex is just to please the compiler as we do not get into contention
general_stream: Mutex<Stream>,
ping_stream: Mutex<Stream>,
register_stream: Mutex<Stream>,
character_screen_stream: Mutex<Stream>,
in_game_stream: Mutex<Stream>,
}
pub struct PreparedMsg {
stream_id: u8,
message: Message,
}
impl Component for Client {
type Storage = IdvStorage<Self>;
}
impl Client {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client_type: ClientType,
participant: Participant,
last_ping: f64,
general_stream: Stream,
ping_stream: Stream,
register_stream: Stream,
character_screen_stream: Stream,
in_game_stream: Stream,
) -> Self {
Client {
client_type,
participant: Some(participant),
last_ping: Mutex::new(last_ping),
login_msg_sent: AtomicBool::new(false),
terminate_msg_recv: AtomicBool::new(false),
general_stream: Mutex::new(general_stream),
ping_stream: Mutex::new(ping_stream),
register_stream: Mutex::new(register_stream),
character_screen_stream: Mutex::new(character_screen_stream),
in_game_stream: Mutex::new(in_game_stream),
}
}
pub(crate) fn send<M: Into<ServerMsg>>(&self, msg: M) -> Result<(), StreamError> {
match msg.into() {
ServerMsg::Info(m) => self.register_stream.try_lock().unwrap().send(m),
ServerMsg::Init(m) => self.register_stream.try_lock().unwrap().send(m),
ServerMsg::RegisterAnswer(m) => self.register_stream.try_lock().unwrap().send(m),
ServerMsg::General(g) => {
match g {
//Character Screen related
ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterSuccess => {
self.character_screen_stream.try_lock().unwrap().send(g)
},
//Ingame related
ServerGeneral::GroupUpdate(_)
| ServerGeneral::GroupInvite { .. }
| ServerGeneral::InvitePending(_)
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_) => {
self.in_game_stream.try_lock().unwrap().send(g)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
| ServerGeneral::SetPlayerEntity(_)
| ServerGeneral::TimeOfDay(_)
| ServerGeneral::EntitySync(_)
| ServerGeneral::CompSync(_)
| ServerGeneral::CreateEntity(_)
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => {
self.general_stream.try_lock().unwrap().send(g)
},
}
},
ServerMsg::Ping(m) => self.ping_stream.try_lock().unwrap().send(m),
}
}
pub(crate) fn send_fallible<M: Into<ServerMsg>>(&self, msg: M) { let _ = self.send(msg); }
pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> {
match msg.stream_id {
0 => self
.register_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
1 => self
.character_screen_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
2 => self
.in_game_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
3 => self
.general_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message),
_ => unreachable!("invalid stream id"),
}
}
pub(crate) fn prepare<M: Into<ServerMsg>>(&self, msg: M) -> PreparedMsg {
match msg.into() {
ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::General(g) => {
match g {
//Character Screen related
ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterSuccess => {
PreparedMsg::new(1, &g, &self.character_screen_stream)
},
//Ingame related
ServerGeneral::GroupUpdate(_)
| ServerGeneral::GroupInvite { .. }
| ServerGeneral::InvitePending(_)
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_) => PreparedMsg::new(2, &g, &self.in_game_stream),
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
| ServerGeneral::SetPlayerEntity(_)
| ServerGeneral::TimeOfDay(_)
| ServerGeneral::EntitySync(_)
| ServerGeneral::CompSync(_)
| ServerGeneral::CreateEntity(_)
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => {
PreparedMsg::new(3, &g, &self.general_stream)
},
}
},
ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream),
}
}
pub(crate) fn recv<M: DeserializeOwned>(
&self,
stream_id: u8,
) -> Result<Option<M>, StreamError> {
match stream_id {
0 => self.register_stream.try_lock().unwrap().try_recv(),
1 => self.character_screen_stream.try_lock().unwrap().try_recv(),
2 => self.in_game_stream.try_lock().unwrap().try_recv(),
3 => self.general_stream.try_lock().unwrap().try_recv(),
4 => self.ping_stream.try_lock().unwrap().try_recv(),
_ => unreachable!("invalid stream id"),
}
}
}
impl PreparedMsg {
fn new<M: Serialize + ?Sized>(id: u8, msg: &M, stream: &Mutex<Stream>) -> PreparedMsg {
Self {
stream_id: id,
message: Message::serialize(&msg, &stream.try_lock().unwrap()),
}
}
}

View File

@ -26,10 +26,7 @@ use std::convert::TryFrom;
use vek::*;
use world::util::Sampler;
use crate::{
login_provider::LoginProvider,
streams::{GetStream, InGameStream},
};
use crate::{client::Client, login_provider::LoginProvider};
use scan_fmt::{scan_fmt, scan_fmt_some};
use tracing::error;
@ -652,8 +649,7 @@ fn handle_spawn(
// Add to group system if a pet
if matches!(alignment, comp::Alignment::Owned { .. }) {
let state = server.state();
let mut in_game_streams =
state.ecs().write_storage::<InGameStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<Uid>();
let mut group_manager =
state.ecs().write_resource::<comp::group::GroupManager>();
@ -665,15 +661,15 @@ fn handle_spawn(
&state.ecs().read_storage(),
&uids,
&mut |entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| {
s.send_fallible(ServerGeneral::GroupUpdate(g));
.map(|(g, c)| {
c.send_fallible(ServerGeneral::GroupUpdate(g));
});
},
);

View File

@ -1,7 +1,4 @@
use crate::{
streams::{CharacterScreenStream, GeneralStream, InGameStream, PingStream, RegisterStream},
Client, ClientType, ServerInfo,
};
use crate::{Client, ClientType, ServerInfo};
use crossbeam::{bounded, unbounded, Receiver, Sender};
use futures_channel::oneshot;
use futures_executor::block_on;
@ -16,14 +13,7 @@ pub(crate) struct ServerInfoPacket {
pub time: f64,
}
pub(crate) struct IncomingClient {
pub client: Client,
pub general: GeneralStream,
pub ping: PingStream,
pub register: RegisterStream,
pub character: CharacterScreenStream,
pub in_game: InGameStream,
}
pub(crate) type IncomingClient = Client;
pub(crate) struct ConnectionHandler {
_network: Arc<Network>,
@ -136,24 +126,18 @@ impl ConnectionHandler {
Some(client_type) => client_type?,
};
let client = Client {
let client = Client::new(
client_type,
participant: Some(participant),
last_ping: server_data.time,
login_msg_sent: false,
terminate_msg_recv: false,
};
participant,
server_data.time,
general_stream,
ping_stream,
register_stream,
character_screen_stream,
in_game_stream,
);
let package = IncomingClient {
client,
general: GeneralStream(general_stream),
ping: PingStream(ping_stream),
register: RegisterStream(register_stream),
character: CharacterScreenStream(character_screen_stream),
in_game: InGameStream(in_game_stream),
};
client_sender.send(package)?;
client_sender.send(client)?;
Ok(())
}
}

View File

@ -1,7 +1,6 @@
use crate::{
client::Client,
comp::{biped_large, quadruped_medium, quadruped_small},
streams::{GetStream, InGameStream},
Server, SpawnPoint, StateExt,
};
use common::{
@ -43,9 +42,9 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3<f32>)
if let Some(vel) = velocities.get_mut(entity) {
vel.0 = impulse;
}
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
in_game_stream.send_fallible(ServerGeneral::Knockback(impulse));
let clients = state.ecs().read_storage::<Client>();
if let Some(client) = clients.get(entity) {
client.send_fallible(ServerGeneral::Knockback(impulse));
}
}

View File

@ -1,7 +1,4 @@
use crate::{
streams::{GeneralStream, GetStream, InGameStream},
Server,
};
use crate::{client::Client, Server};
use common::{
comp::{
self,
@ -28,13 +25,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
match manip {
GroupManip::Invite(uid) => {
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
let clients = state.ecs().read_storage::<Client>();
let invitee = match state.ecs().entity_from_uid(uid.into()) {
Some(t) => t,
None => {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("Invite failed, target does not exist.".to_owned()),
);
@ -65,7 +62,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
});
if already_in_same_group {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
if let Some(general_stream) = clients.get(entity) {
general_stream.send_fallible(ChatType::Meta.server_msg(
"Invite failed, can't invite someone already in your group".to_owned(),
));
@ -95,7 +92,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
>= max_group_size as usize;
if group_size_limit_reached {
// Inform inviter that they have reached the group size limit
if let Some(general_stream) = general_streams.get_mut(entity) {
if let Some(general_stream) = clients.get(entity) {
general_stream.send_fallible(
ChatType::Meta.server_msg(
"Invite failed, pending invites plus current group size have reached \
@ -112,8 +109,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if invites.contains(invitee) {
// Inform inviter that there is already an invite
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("This player already has a pending invite.".to_owned()),
);
@ -153,35 +150,32 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}
};
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
// If client comp
if let (Some(in_game_stream), Some(inviter)) =
(in_game_streams.get_mut(invitee), uids.get(entity).copied())
if let (Some(client), Some(inviter)) = (clients.get(invitee), uids.get(entity).copied())
{
if send_invite() {
in_game_stream.send_fallible(ServerGeneral::GroupInvite {
client.send_fallible(ServerGeneral::GroupInvite {
inviter,
timeout: PRESENTED_INVITE_TIMEOUT_DUR,
});
}
} else if agents.contains(invitee) {
send_invite();
} else if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
} else if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()),
);
}
// Notify inviter that the invite is pending
if invite_sent {
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
in_game_stream.send_fallible(ServerGeneral::InvitePending(uid));
if let Some(client) = clients.get(entity) {
client.send_fallible(ServerGeneral::InvitePending(uid));
}
}
},
GroupManip::Accept => {
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let mut invites = state.ecs().write_storage::<Invite>();
if let Some(inviter) = invites.remove(entity).and_then(|invite| {
@ -198,10 +192,10 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
Some(inviter)
}) {
if let (Some(in_game_stream), Some(target)) =
(in_game_streams.get_mut(inviter), uids.get(entity).copied())
if let (Some(client), Some(target)) =
(clients.get(inviter), uids.get(entity).copied())
{
in_game_stream.send_fallible(ServerGeneral::InviteComplete {
client.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::Accepted,
});
@ -215,20 +209,20 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&state.ecs().read_storage(),
&uids,
|entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
}
},
GroupManip::Decline => {
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let mut invites = state.ecs().write_storage::<Invite>();
if let Some(inviter) = invites.remove(entity).and_then(|invite| {
@ -246,10 +240,10 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
Some(inviter)
}) {
// Inform inviter of rejection
if let (Some(in_game_stream), Some(target)) =
(in_game_streams.get_mut(inviter), uids.get(entity).copied())
if let (Some(client), Some(target)) =
(clients.get(inviter), uids.get(entity).copied())
{
in_game_stream.send_fallible(ServerGeneral::InviteComplete {
client.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::Declined,
});
@ -257,7 +251,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}
},
GroupManip::Leave => {
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let mut group_manager = state.ecs().write_resource::<GroupManager>();
group_manager.leave_group(
@ -267,19 +261,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&uids,
&state.ecs().entities(),
&mut |entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
},
GroupManip::Kick(uid) => {
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let alignments = state.ecs().read_storage::<comp::Alignment>();
@ -287,8 +281,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
Some(t) => t,
None => {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("Kick failed, target does not exist.".to_owned()),
);
@ -300,7 +294,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Can't kick pet
if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner))
{
if let Some(general_stream) = general_streams.get_mut(entity) {
if let Some(general_stream) = clients.get(entity) {
general_stream.send_fallible(
ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()),
);
@ -309,8 +303,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}
// Can't kick yourself
if uids.get(entity).map_or(false, |u| *u == uid) {
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("Kick failed, you can't kick yourself.".to_owned()),
);
@ -320,7 +314,6 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
let mut groups = state.ecs().write_storage::<group::Group>();
let mut group_manager = state.ecs().write_resource::<GroupManager>();
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
// Make sure kicker is the group leader
match groups
.get(target)
@ -335,42 +328,42 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&uids,
&state.ecs().entities(),
&mut |entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
// Tell them the have been kicked
if let Some(general_stream) = general_streams.get_mut(target) {
general_stream.send_fallible(
if let Some(client) = clients.get(target) {
client.send_fallible(
ChatType::Meta
.server_msg("You were removed from the group.".to_owned()),
);
}
// Tell kicker that they were succesful
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream
if let Some(client) = clients.get(entity) {
client
.send_fallible(ChatType::Meta.server_msg("Player kicked.".to_owned()));
}
},
Some(_) => {
// Inform kicker that they are not the leader
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(ChatType::Meta.server_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(ChatType::Meta.server_msg(
"Kick failed: You are not the leader of the target's group.".to_owned(),
));
}
},
None => {
// Inform kicker that the target is not in a group
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta.server_msg(
"Kick failed: Your target is not in a group.".to_owned(),
),
@ -380,14 +373,14 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}
},
GroupManip::AssignLeader(uid) => {
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let target = match state.ecs().entity_from_uid(uid.into()) {
Some(t) => t,
None => {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(ChatType::Meta.server_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(ChatType::Meta.server_msg(
"Leadership transfer failed, target does not exist".to_owned(),
));
}
@ -396,7 +389,6 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
};
let groups = state.ecs().read_storage::<group::Group>();
let mut group_manager = state.ecs().write_resource::<GroupManager>();
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
// Make sure assigner is the group leader
match groups
.get(target)
@ -411,25 +403,25 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&state.ecs().read_storage(),
&uids,
|entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
// Tell them they are the leader
if let Some(general_stream) = general_streams.get_mut(target) {
general_stream.send_fallible(
if let Some(client) = clients.get(target) {
client.send_fallible(
ChatType::Meta.server_msg("You are the group leader now.".to_owned()),
);
}
// Tell the old leader that the transfer was succesful
if let Some(general_stream) = general_streams.get_mut(target) {
general_stream.send_fallible(
if let Some(client) = clients.get(target) {
client.send_fallible(
ChatType::Meta
.server_msg("You are no longer the group leader.".to_owned()),
);
@ -437,9 +429,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
},
Some(_) => {
// Inform transferer that they are not the leader
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta.server_msg(
"Transfer failed: You are not the leader of the target's group."
.to_owned(),
@ -449,9 +440,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
},
None => {
// Inform transferer that the target is not in a group
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_fallible(ChatType::Meta.server_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(ChatType::Meta.server_msg(
"Transfer failed: Your target is not in a group.".to_owned(),
));
}

View File

@ -1,11 +1,4 @@
use crate::{
client::Client,
presence::RegionSubscription,
streams::{
CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream,
},
Server,
};
use crate::{client::Client, presence::RegionSubscription, Server};
use common::{
comp::{self, item, Pos},
consts::MAX_MOUNT_RANGE,
@ -123,7 +116,6 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
}
let mut clients = ecs.write_storage::<Client>();
let mut general_streams = ecs.write_storage::<GeneralStream>();
if clients.get_mut(possesse).is_some() {
error!("can't possess other players");
@ -131,23 +123,8 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
}
match (|| -> Option<Result<(), specs::error::Error>> {
let mut ping_streams = ecs.write_storage::<PingStream>();
let mut register_streams = ecs.write_storage::<RegisterStream>();
let mut character_screen_streams = ecs.write_storage::<CharacterScreenStream>();
let mut in_game_streams = ecs.write_storage::<InGameStream>();
let c = clients.remove(possessor)?;
clients.insert(possesse, c).ok()?;
let s = general_streams.remove(possessor)?;
general_streams.insert(possesse, s).ok()?;
let s = ping_streams.remove(possessor)?;
ping_streams.insert(possesse, s).ok()?;
let s = register_streams.remove(possessor)?;
register_streams.insert(possesse, s).ok()?;
let s = character_screen_streams.remove(possessor)?;
character_screen_streams.insert(possesse, s).ok()?;
let s = in_game_streams.remove(possessor)?;
in_game_streams.insert(possesse, s).ok()?;
//optional entities
let mut players = ecs.write_storage::<comp::Player>();
let mut subscriptions = ecs.write_storage::<RegionSubscription>();
@ -179,9 +156,9 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
},
}
general_streams
clients
.get_mut(possesse)
.map(|s| s.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid)));
.map(|c| c.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid)));
// Put possess item into loadout
let mut loadouts = ecs.write_storage::<comp::Loadout>();

View File

@ -1,7 +1,4 @@
use crate::{
streams::{GetStream, InGameStream},
Server, StateExt,
};
use crate::{client::Client, Server, StateExt};
use common::{
comp::{
self, item,
@ -282,8 +279,7 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv
.insert(tameable_entity, comp::Alignment::Owned(uid));
// Add to group system
let mut in_game_streams =
state.ecs().write_storage::<InGameStream>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<Uid>();
let mut group_manager = state
.ecs()
@ -297,15 +293,15 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv
&state.ecs().read_storage(),
&uids,
&mut |entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| {
s.send(ServerGeneral::GroupUpdate(g))
.map(|(g, c)| {
c.send(ServerGeneral::GroupUpdate(g))
});
},
);

View File

@ -1,14 +1,7 @@
use super::Event;
use crate::{
client::Client,
login_provider::LoginProvider,
persistence,
presence::Presence,
state_ext::StateExt,
streams::{
CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream,
},
Server,
client::Client, login_provider::LoginProvider, persistence, presence::Presence,
state_ext::StateExt, Server,
};
use common::{
comp,
@ -37,42 +30,18 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
.get(entity)
.cloned();
if let Some((
client,
uid,
player,
general_stream,
ping_stream,
register_stream,
character_screen_stream,
mut in_game_stream,
)) = (|| {
if let Some((client, uid, player)) = (|| {
let ecs = state.ecs();
Some((
ecs.write_storage::<Client>().remove(entity)?,
ecs.write_storage::<Uid>().remove(entity)?,
ecs.write_storage::<comp::Player>().remove(entity)?,
ecs.write_storage::<GeneralStream>().remove(entity)?,
ecs.write_storage::<PingStream>().remove(entity)?,
ecs.write_storage::<RegisterStream>().remove(entity)?,
ecs.write_storage::<CharacterScreenStream>()
.remove(entity)?,
ecs.write_storage::<InGameStream>().remove(entity)?,
))
})() {
// Tell client its request was successful
in_game_stream.send_fallible(ServerGeneral::ExitInGameSuccess);
client.send_fallible(ServerGeneral::ExitInGameSuccess);
let entity_builder = state
.ecs_mut()
.create_entity()
.with(client)
.with(player)
.with(general_stream)
.with(ping_stream)
.with(register_stream)
.with(character_screen_stream)
.with(in_game_stream);
let entity_builder = state.ecs_mut().create_entity().with(client).with(player);
// Preserve group component if present
let entity_builder = match maybe_group {

View File

@ -20,7 +20,6 @@ pub mod persistence;
pub mod presence;
pub mod settings;
pub mod state_ext;
pub mod streams;
pub mod sys;
#[cfg(not(feature = "worldgen"))] mod test_world;
@ -43,9 +42,6 @@ use crate::{
login_provider::LoginProvider,
presence::{Presence, RegionSubscription},
state_ext::StateExt,
streams::{
CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream,
},
sys::sentinel::{DeletedEntities, TrackedComps},
};
use common::{
@ -193,11 +189,6 @@ impl Server {
state.ecs_mut().register::<RegionSubscription>();
state.ecs_mut().register::<Client>();
state.ecs_mut().register::<Presence>();
state.ecs_mut().register::<GeneralStream>();
state.ecs_mut().register::<PingStream>();
state.ecs_mut().register::<RegisterStream>();
state.ecs_mut().register::<CharacterScreenStream>();
state.ecs_mut().register::<InGameStream>();
//Alias validator
let banned_words_paths = &settings.banned_words_files;
@ -825,16 +816,14 @@ impl Server {
fn initialize_client(
&mut self,
mut incoming: crate::connection_handler::IncomingClient,
client: crate::connection_handler::IncomingClient,
) -> Result<Option<specs::Entity>, Error> {
let client = incoming.client;
if self.settings().max_players <= self.state.ecs().read_storage::<Client>().join().count() {
trace!(
?client.participant,
"to many players, wont allow participant to connect"
);
incoming.register.0.send(ServerInit::TooManyPlayers)?;
client.send(ServerInit::TooManyPlayers)?;
return Ok(None);
}
@ -843,11 +832,6 @@ impl Server {
.ecs_mut()
.create_entity_synced()
.with(client)
.with(incoming.general)
.with(incoming.ping)
.with(incoming.register)
.with(incoming.character)
.with(incoming.in_game)
.build();
self.state
.ecs()
@ -859,10 +843,9 @@ impl Server {
debug!("Starting initial sync with client.");
self.state
.ecs()
.write_storage::<RegisterStream>()
.get_mut(entity)
.read_storage::<Client>()
.get(entity)
.unwrap()
.0
.send(ServerInit::GameSync {
// Send client their entity
entity_package: TrackedComps::fetch(&self.state.ecs())
@ -905,73 +888,11 @@ impl Server {
where
S: Into<ServerMsg>,
{
const ERR: &str =
"Don't do that. Sending these messages is only done ONCE at connect and not by this fn";
match msg.into() {
ServerMsg::Info(_) => panic!(ERR),
ServerMsg::Init(_) => panic!(ERR),
ServerMsg::RegisterAnswer(msg) => {
self.state
.ecs()
.write_storage::<RegisterStream>()
.get_mut(entity)
.map(|s| s.send(msg));
},
ServerMsg::General(msg) => {
match &msg {
//Character Screen related
ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterSuccess => self
.state
.ecs()
.write_storage::<CharacterScreenStream>()
.get_mut(entity)
.map(|s| s.send(msg)),
//Ingame related
ServerGeneral::GroupUpdate(_)
| ServerGeneral::GroupInvite { .. }
| ServerGeneral::InvitePending(_)
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_) => self
.state
.ecs()
.write_storage::<InGameStream>()
.get_mut(entity)
.map(|s| s.send(msg)),
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
| ServerGeneral::SetPlayerEntity(_)
| ServerGeneral::TimeOfDay(_)
| ServerGeneral::EntitySync(_)
| ServerGeneral::CompSync(_)
| ServerGeneral::CreateEntity(_)
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => self
.state
.ecs()
.write_storage::<GeneralStream>()
.get_mut(entity)
.map(|s| s.send(msg)),
};
},
ServerMsg::Ping(msg) => {
self.state
.ecs()
.write_storage::<PingStream>()
.get_mut(entity)
.map(|s| s.send(msg));
},
}
self.state
.ecs()
.read_storage::<Client>()
.get(entity)
.map(|c| c.send(msg));
}
pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); }

View File

@ -1,9 +1,6 @@
use crate::{
persistence::PersistedComponents,
presence::Presence,
streams::{CharacterScreenStream, GeneralStream, GetStream, InGameStream},
sys::sentinel::DeletedEntities,
SpawnPoint,
client::Client, persistence::PersistedComponents, presence::Presence,
sys::sentinel::DeletedEntities, SpawnPoint,
};
use common::{
character::CharacterId,
@ -219,12 +216,8 @@ impl StateExt for State {
);
// Tell the client its request was successful.
if let Some(character_screen_stream) = self
.ecs()
.write_storage::<CharacterScreenStream>()
.get_mut(entity)
{
character_screen_stream.send_fallible(ServerGeneral::CharacterSuccess);
if let Some(client) = self.ecs().read_storage::<Client>().get(entity) {
client.send_fallible(ServerGeneral::CharacterSuccess);
}
}
@ -280,26 +273,20 @@ impl StateExt for State {
| comp::ChatType::Meta
| comp::ChatType::World(_) => self.notify_players(ServerGeneral::ChatMsg(resolved_msg)),
comp::ChatType::Online(u) => {
for (general_stream, uid) in (
&mut ecs.write_storage::<GeneralStream>(),
&ecs.read_storage::<Uid>(),
)
.join()
for (client, uid) in
(&ecs.read_storage::<Client>(), &ecs.read_storage::<Uid>()).join()
{
if uid != u {
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
comp::ChatType::Tell(u, t) => {
for (general_stream, uid) in (
&mut ecs.write_storage::<GeneralStream>(),
&ecs.read_storage::<Uid>(),
)
.join()
for (client, uid) in
(&ecs.read_storage::<Client>(), &ecs.read_storage::<Uid>()).join()
{
if uid == u || uid == t {
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -309,12 +296,9 @@ impl StateExt for State {
(*ecs.read_resource::<UidAllocator>()).retrieve_entity_internal(uid.0);
let positions = ecs.read_storage::<comp::Pos>();
if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) {
for (general_stream, pos) in
(&mut ecs.write_storage::<GeneralStream>(), &positions).join()
{
for (client, pos) in (&ecs.read_storage::<Client>(), &positions).join() {
if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) {
general_stream
.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -324,12 +308,9 @@ impl StateExt for State {
(*ecs.read_resource::<UidAllocator>()).retrieve_entity_internal(uid.0);
let positions = ecs.read_storage::<comp::Pos>();
if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) {
for (general_stream, pos) in
(&mut ecs.write_storage::<GeneralStream>(), &positions).join()
{
for (client, pos) in (&ecs.read_storage::<Client>(), &positions).join() {
if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) {
general_stream
.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -339,38 +320,35 @@ impl StateExt for State {
(*ecs.read_resource::<UidAllocator>()).retrieve_entity_internal(uid.0);
let positions = ecs.read_storage::<comp::Pos>();
if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) {
for (general_stream, pos) in
(&mut ecs.write_storage::<GeneralStream>(), &positions).join()
{
for (client, pos) in (&ecs.read_storage::<Client>(), &positions).join() {
if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) {
general_stream
.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
},
comp::ChatType::FactionMeta(s) | comp::ChatType::Faction(_, s) => {
for (general_stream, faction) in (
&mut ecs.write_storage::<GeneralStream>(),
for (client, faction) in (
&ecs.read_storage::<Client>(),
&ecs.read_storage::<comp::Faction>(),
)
.join()
{
if s == &faction.0 {
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
comp::ChatType::GroupMeta(g) | comp::ChatType::Group(_, g) => {
for (general_stream, group) in (
&mut ecs.write_storage::<GeneralStream>(),
for (client, group) in (
&ecs.read_storage::<Client>(),
&ecs.read_storage::<comp::Group>(),
)
.join()
{
if g == group {
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -379,37 +357,35 @@ impl StateExt for State {
/// Sends the message to all connected clients
fn notify_players(&self, msg: ServerGeneral) {
let mut msg = Some(msg);
let mut lazy_msg = None;
for (general_stream, _) in (
&mut self.ecs().write_storage::<GeneralStream>(),
for (client, _) in (
&self.ecs().read_storage::<Client>(),
&self.ecs().read_storage::<comp::Player>(),
)
.join()
{
if lazy_msg.is_none() {
lazy_msg = Some(general_stream.prepare(&msg));
lazy_msg = Some(client.prepare(msg.take().unwrap()));
}
lazy_msg
.as_ref()
.map(|ref msg| general_stream.0.send_raw(&msg));
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
/// Sends the message to all clients playing in game
fn notify_in_game_clients(&self, msg: ServerGeneral) {
let mut msg = Some(msg);
let mut lazy_msg = None;
for (general_stream, _) in (
&mut self.ecs().write_storage::<GeneralStream>(),
for (client, _) in (
&mut self.ecs().write_storage::<Client>(),
&self.ecs().read_storage::<Presence>(),
)
.join()
{
if lazy_msg.is_none() {
lazy_msg = Some(general_stream.prepare(&msg));
lazy_msg = Some(client.prepare(msg.take().unwrap()));
}
lazy_msg
.as_ref()
.map(|ref msg| general_stream.0.send_raw(&msg));
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
@ -419,7 +395,7 @@ impl StateExt for State {
) -> Result<(), specs::error::WrongGeneration> {
// Remove entity from a group if they are in one
{
let mut in_game_streams = self.ecs().write_storage::<InGameStream>();
let clients = self.ecs().read_storage::<Client>();
let uids = self.ecs().read_storage::<Uid>();
let mut group_manager = self.ecs().write_resource::<comp::group::GroupManager>();
group_manager.entity_deleted(
@ -429,14 +405,14 @@ impl StateExt for State {
&uids,
&self.ecs().entities(),
&mut |entity, group_change| {
in_game_streams
.get_mut(entity)
.and_then(|s| {
clients
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, s))
.map(|g| (g, c))
})
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
}

View File

@ -1,126 +0,0 @@
use common::msg::{ClientGeneral, ClientRegister, PingMsg, ServerGeneral, ServerRegisterAnswer};
use network::{Message, Stream, StreamError};
use serde::{de::DeserializeOwned, Serialize};
use specs::Component;
use specs_idvs::IdvStorage;
/// helped to reduce code duplication
pub(crate) trait GetStream {
type RecvMsg: DeserializeOwned;
type SendMsg: Serialize + core::fmt::Debug;
fn get_mut(&mut self) -> &mut Stream;
fn verify(msg: &Self::SendMsg) -> bool;
fn send(&mut self, msg: Self::SendMsg) -> Result<(), StreamError> {
if Self::verify(&msg) {
self.get_mut().send(msg)
} else {
unreachable!("sending this msg isn't allowed! got: {:?}", msg)
}
}
fn send_fallible(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); }
fn prepare(&mut self, msg: &Self::SendMsg) -> Message {
if Self::verify(&msg) {
Message::serialize(&msg, &self.get_mut())
} else {
unreachable!("sending this msg isn't allowed! got: {:?}", msg)
}
}
}
// Streams
// we ignore errors on send, and do unified error handling in recv
pub struct GeneralStream(pub(crate) Stream);
pub struct PingStream(pub(crate) Stream);
pub struct RegisterStream(pub(crate) Stream);
pub struct CharacterScreenStream(pub(crate) Stream);
pub struct InGameStream(pub(crate) Stream);
impl Component for GeneralStream {
type Storage = IdvStorage<Self>;
}
impl Component for PingStream {
type Storage = IdvStorage<Self>;
}
impl Component for RegisterStream {
type Storage = IdvStorage<Self>;
}
impl Component for CharacterScreenStream {
type Storage = IdvStorage<Self>;
}
impl Component for InGameStream {
type Storage = IdvStorage<Self>;
}
impl GetStream for GeneralStream {
type RecvMsg = ClientGeneral;
type SendMsg = ServerGeneral;
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
fn verify(msg: &Self::SendMsg) -> bool {
matches!(&msg, ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
| ServerGeneral::SetPlayerEntity(_)
| ServerGeneral::TimeOfDay(_)
| ServerGeneral::EntitySync(_)
| ServerGeneral::CompSync(_)
| ServerGeneral::CreateEntity(_)
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_))
}
}
impl GetStream for PingStream {
type RecvMsg = PingMsg;
type SendMsg = PingMsg;
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
fn verify(_: &Self::SendMsg) -> bool { true }
}
impl GetStream for RegisterStream {
type RecvMsg = ClientRegister;
type SendMsg = ServerRegisterAnswer;
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
fn verify(_: &Self::SendMsg) -> bool { true }
}
impl GetStream for CharacterScreenStream {
type RecvMsg = ClientGeneral;
type SendMsg = ServerGeneral;
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
fn verify(msg: &Self::SendMsg) -> bool {
matches!(&msg, ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterSuccess)
}
}
impl GetStream for InGameStream {
type RecvMsg = ClientGeneral;
type SendMsg = ServerGeneral;
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
fn verify(msg: &Self::SendMsg) -> bool {
matches!(&msg, ServerGeneral::GroupUpdate(_)
| ServerGeneral::GroupInvite { .. }
| ServerGeneral::InvitePending(_)
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_))
}
}

View File

@ -5,7 +5,6 @@ use super::{
use crate::{
client::Client,
presence::{Presence, RegionSubscription},
streams::{GeneralStream, GetStream, InGameStream},
Tick,
};
use common::{
@ -44,9 +43,7 @@ impl<'a> System<'a> for Sys {
WriteStorage<'a, Last<Pos>>,
WriteStorage<'a, Last<Vel>>,
WriteStorage<'a, Last<Ori>>,
WriteStorage<'a, Client>,
WriteStorage<'a, InGameStream>,
WriteStorage<'a, GeneralStream>,
ReadStorage<'a, Client>,
WriteStorage<'a, ForceUpdate>,
WriteStorage<'a, InventoryUpdate>,
Write<'a, DeletedEntities>,
@ -73,9 +70,7 @@ impl<'a> System<'a> for Sys {
mut last_pos,
mut last_vel,
mut last_ori,
mut clients,
mut in_game_streams,
mut general_streams,
clients,
mut force_updates,
mut inventory_updates,
mut deleted_entities,
@ -111,39 +106,20 @@ impl<'a> System<'a> for Sys {
// Assemble subscriber list for this region by iterating through clients and
// checking if they are subscribed to this region
let mut subscribers = (
&mut clients,
&clients,
&entities,
presences.maybe(),
&subscriptions,
&positions,
&mut in_game_streams,
&mut general_streams,
)
.join()
.filter_map(
|(
client,
entity,
presence,
subscription,
pos,
in_game_stream,
general_stream,
)| {
if presence.is_some() && subscription.regions.contains(&key) {
Some((
client,
&subscription.regions,
entity,
*pos,
in_game_stream,
general_stream,
))
} else {
None
}
},
)
.filter_map(|(client, entity, presence, subscription, pos)| {
if presence.is_some() && subscription.regions.contains(&key) {
Some((client, &subscription.regions, entity, *pos))
} else {
None
}
})
.collect::<Vec<_>>();
for event in region.events() {
@ -166,9 +142,7 @@ impl<'a> System<'a> for Sys {
vel.copied(),
ori.copied(),
));
for (_, regions, client_entity, _, _, general_stream) in
&mut subscribers
{
for (client, regions, client_entity, _) in &mut subscribers {
if maybe_key
.as_ref()
.map(|key| !regions.contains(key))
@ -176,7 +150,7 @@ impl<'a> System<'a> for Sys {
// Client doesn't need to know about itself
&& *client_entity != entity
{
general_stream.send_fallible(create_msg.clone());
client.send_fallible(create_msg.clone());
}
}
}
@ -184,13 +158,13 @@ impl<'a> System<'a> for Sys {
RegionEvent::Left(id, maybe_key) => {
// Lookup UID for entity
if let Some(&uid) = uids.get(entities.entity(*id)) {
for (_, regions, _, _, _, general_stream) in &mut subscribers {
for (client, regions, _, _) in &mut subscribers {
if maybe_key
.as_ref()
.map(|key| !regions.contains(key))
.unwrap_or(true)
{
general_stream.send_fallible(ServerGeneral::DeleteEntity(uid));
client.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
}
@ -211,32 +185,29 @@ impl<'a> System<'a> for Sys {
let mut comp_sync_package = Some(comp_sync_package);
let mut entity_sync_lazymsg = None;
let mut comp_sync_lazymsg = None;
subscribers
.iter_mut()
.for_each(move |(_, _, _, _, _, general_stream)| {
if entity_sync_lazymsg.is_none() {
entity_sync_lazymsg = Some(general_stream.prepare(
&ServerGeneral::EntitySync(entity_sync_package.take().unwrap()),
));
comp_sync_lazymsg =
Some(general_stream.prepare(&ServerGeneral::CompSync(
comp_sync_package.take().unwrap(),
)));
}
entity_sync_lazymsg
.as_ref()
.map(|msg| general_stream.0.send_raw(&msg));
comp_sync_lazymsg
.as_ref()
.map(|msg| general_stream.0.send_raw(&msg));
});
subscribers.iter_mut().for_each(move |(client, _, _, _)| {
if entity_sync_lazymsg.is_none() {
entity_sync_lazymsg = Some(client.prepare(ServerGeneral::EntitySync(
entity_sync_package.take().unwrap(),
)));
comp_sync_lazymsg = Some(
client.prepare(ServerGeneral::CompSync(comp_sync_package.take().unwrap())),
);
}
entity_sync_lazymsg
.as_ref()
.map(|msg| client.send_prepared(&msg));
comp_sync_lazymsg
.as_ref()
.map(|msg| client.send_prepared(&msg));
});
let mut send_general = |msg: ServerGeneral,
entity: EcsEntity,
pos: Pos,
force_update: Option<&ForceUpdate>,
throttle: bool| {
for (_, _, client_entity, client_pos, _, general_stream) in &mut subscribers {
for (client, _, client_entity, client_pos) in &mut subscribers {
if if client_entity == &entity {
// Don't send client physics updates about itself unless force update is set
force_update.is_some()
@ -262,7 +233,7 @@ impl<'a> System<'a> for Sys {
true // Closer than 100 blocks
}
} {
general_stream.send_fallible(msg.clone());
client.send_fallible(msg.clone());
}
}
};
@ -349,18 +320,18 @@ impl<'a> System<'a> for Sys {
// Handle entity deletion in regions that don't exist in RegionMap
// (theoretically none)
for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
for general_stream in (presences.maybe(), &subscriptions, &mut general_streams)
for client in (presences.maybe(), &subscriptions, &clients)
.join()
.filter_map(|(presence, subscription, general_stream)| {
.filter_map(|(presence, subscription, client)| {
if presence.is_some() && subscription.regions.contains(&region_key) {
Some(general_stream)
Some(client)
} else {
None
}
})
{
for uid in &deleted {
general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
}
}
}
@ -368,19 +339,15 @@ impl<'a> System<'a> for Sys {
// TODO: Sync clients that don't have a position?
// Sync inventories
for (inventory, update, in_game_stream) in
(&inventories, &inventory_updates, &mut in_game_streams).join()
{
in_game_stream.send_fallible(ServerGeneral::InventoryUpdate(
for (inventory, update, client) in (&inventories, &inventory_updates, &clients).join() {
client.send_fallible(ServerGeneral::InventoryUpdate(
inventory.clone(),
update.event(),
));
}
// Sync outcomes
for (presence, pos, in_game_stream) in
(presences.maybe(), positions.maybe(), &mut in_game_streams).join()
{
for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
let is_near = |o_pos: Vec3<f32>| {
pos.zip_with(presence, |pos, presence| {
pos.0.xy().distance_squared(o_pos.xy())
@ -395,7 +362,7 @@ impl<'a> System<'a> for Sys {
.cloned()
.collect::<Vec<_>>();
if !outcomes.is_empty() {
in_game_stream.send_fallible(ServerGeneral::Outcomes(outcomes));
client.send_fallible(ServerGeneral::Outcomes(outcomes));
}
}
outcomes.clear();
@ -408,13 +375,11 @@ impl<'a> System<'a> for Sys {
// TODO: doesn't really belong in this system (rename system or create another
// system?)
let mut tof_lazymsg = None;
for general_stream in (&mut general_streams).join() {
for client in (&clients).join() {
if tof_lazymsg.is_none() {
tof_lazymsg = Some(general_stream.prepare(&ServerGeneral::TimeOfDay(*time_of_day)));
tof_lazymsg = Some(client.prepare(ServerGeneral::TimeOfDay(*time_of_day)));
}
tof_lazymsg
.as_ref()
.map(|msg| general_stream.0.send_raw(&msg));
tof_lazymsg.as_ref().map(|msg| client.send_prepared(&msg));
}
timer.end();

View File

@ -1,5 +1,5 @@
use super::SysTimer;
use crate::streams::{GetStream, InGameStream};
use crate::client::Client;
use common::{
comp::group::{Invite, PendingInvites},
msg::{InviteAnswer, ServerGeneral},
@ -16,14 +16,14 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
WriteStorage<'a, Invite>,
WriteStorage<'a, PendingInvites>,
WriteStorage<'a, InGameStream>,
ReadStorage<'a, Client>,
ReadStorage<'a, Uid>,
Write<'a, SysTimer<Self>>,
);
fn run(
&mut self,
(entities, mut invites, mut pending_invites, mut in_game_streams, uids, mut timer): Self::SystemData,
(entities, mut invites, mut pending_invites, clients, uids, mut timer): Self::SystemData,
) {
span!(_guard, "run", "invite_timeout::Sys::run");
timer.start();
@ -51,11 +51,10 @@ impl<'a> System<'a> for Sys {
}
// Inform inviter of timeout
if let (Some(in_game_stream), Some(target)) = (
in_game_streams.get_mut(*inviter),
uids.get(invitee).copied(),
) {
in_game_stream.send_fallible(ServerGeneral::InviteComplete {
if let (Some(client), Some(target)) =
(clients.get(*inviter), uids.get(invitee).copied())
{
client.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::TimedOut,
});

View File

@ -1,22 +1,17 @@
use super::super::SysTimer;
use crate::{
alias_validator::AliasValidator,
character_creator,
client::Client,
persistence::character_loader::CharacterLoader,
presence::Presence,
streams::{CharacterScreenStream, GeneralStream, GetStream},
EditableSettings,
alias_validator::AliasValidator, character_creator, client::Client,
persistence::character_loader::CharacterLoader, presence::Presence, EditableSettings,
};
use common::{
comp::{ChatType, Player, UnresolvedChatMsg},
event::{EventBus, ServerEvent},
msg::{ClientGeneral, ServerGeneral},
span,
state::Time,
sync::Uid,
};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write};
use std::sync::atomic::Ordering;
use tracing::{debug, warn};
impl Sys {
@ -25,9 +20,7 @@ impl Sys {
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
entity: specs::Entity,
client: &mut Client,
character_screen_stream: &mut CharacterScreenStream,
general_stream: &mut GeneralStream,
client: &Client,
character_loader: &ReadExpect<'_, CharacterLoader>,
uids: &ReadStorage<'_, Uid>,
players: &ReadStorage<'_, Player>,
@ -68,27 +61,27 @@ impl Sys {
// Give the player a welcome message
if !editable_settings.server_description.is_empty() {
general_stream.send(ChatType::CommandInfo.server_msg(String::from(
client.send(ChatType::CommandInfo.server_msg(String::from(
&*editable_settings.server_description,
)))?;
}
if !client.login_msg_sent {
if !client.login_msg_sent.load(Ordering::Relaxed) {
if let Some(player_uid) = uids.get(entity) {
new_chat_msgs.push((None, UnresolvedChatMsg {
chat_type: ChatType::Online(*player_uid),
message: "".to_string(),
}));
client.login_msg_sent = true;
client.login_msg_sent.store(true, Ordering::Relaxed);
}
}
}
} else {
debug!("Client is not yet registered");
character_screen_stream.send(ServerGeneral::CharacterDataLoadError(
String::from("Failed to fetch player entity"),
))?
client.send(ServerGeneral::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))?
}
},
ClientGeneral::RequestCharacterList => {
@ -99,8 +92,7 @@ impl Sys {
ClientGeneral::CreateCharacter { alias, tool, body } => {
if let Err(error) = alias_validator.validate(&alias) {
debug!(?error, ?alias, "denied alias as it contained a banned word");
character_screen_stream
.send(ServerGeneral::CharacterActionError(error.to_string()))?;
client.send(ServerGeneral::CharacterActionError(error.to_string()))?;
} else if let Some(player) = players.get(entity) {
character_creator::create_character(
entity,
@ -134,15 +126,12 @@ impl<'a> System<'a> for Sys {
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>,
ReadExpect<'a, CharacterLoader>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
WriteStorage<'a, Client>,
ReadStorage<'a, Client>,
ReadStorage<'a, Player>,
ReadStorage<'a, Presence>,
WriteStorage<'a, CharacterScreenStream>,
WriteStorage<'a, GeneralStream>,
ReadExpect<'a, EditableSettings>,
ReadExpect<'a, AliasValidator>,
);
@ -152,15 +141,12 @@ impl<'a> System<'a> for Sys {
(
entities,
server_event_bus,
time,
character_loader,
mut timer,
uids,
mut clients,
clients,
players,
presences,
mut character_screen_streams,
mut general_streams,
editable_settings,
alias_validator,
): Self::SystemData,
@ -171,37 +157,22 @@ impl<'a> System<'a> for Sys {
let mut server_emitter = server_event_bus.emitter();
let mut new_chat_msgs = Vec::new();
for (entity, client, character_screen_stream, general_stream) in (
&entities,
&mut clients,
&mut character_screen_streams,
&mut general_streams,
)
.join()
{
let res =
super::try_recv_all(character_screen_stream, |character_screen_stream, msg| {
Self::handle_client_character_screen_msg(
&mut server_emitter,
&mut new_chat_msgs,
entity,
client,
character_screen_stream,
general_stream,
&character_loader,
&uids,
&players,
&presences,
&editable_settings,
&alias_validator,
msg,
)
});
if let Ok(1_u64..=u64::MAX) = res {
// Update client ping.
client.last_ping = time.0
}
for (entity, client) in (&entities, &clients).join() {
let _ = super::try_recv_all(client, 1, |client, msg| {
Self::handle_client_character_screen_msg(
&mut server_emitter,
&mut new_chat_msgs,
entity,
client,
&character_loader,
&uids,
&players,
&presences,
&editable_settings,
&alias_validator,
msg,
)
});
}
// Handle new chat messages.

View File

@ -1,5 +1,5 @@
use super::super::SysTimer;
use crate::{client::Client, metrics::PlayerMetrics, streams::GeneralStream};
use crate::{client::Client, metrics::PlayerMetrics};
use common::{
comp::{ChatMode, Player, UnresolvedChatMsg},
event::{EventBus, ServerEvent},
@ -8,7 +8,8 @@ use common::{
state::Time,
sync::Uid,
};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write};
use std::sync::atomic::Ordering;
use tracing::{debug, error, warn};
impl Sys {
@ -17,7 +18,7 @@ impl Sys {
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
entity: specs::Entity,
client: &mut Client,
client: &Client,
player: Option<&Player>,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
uids: &ReadStorage<'_, Uid>,
@ -51,7 +52,7 @@ impl Sys {
.clients_disconnected
.with_label_values(&["gracefully"])
.inc();
client.terminate_msg_recv = true;
client.terminate_msg_recv.store(true, Ordering::Relaxed);
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
_ => unreachable!("not a client_general msg"),
@ -73,8 +74,7 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Uid>,
ReadStorage<'a, ChatMode>,
ReadStorage<'a, Player>,
WriteStorage<'a, Client>,
WriteStorage<'a, GeneralStream>,
ReadStorage<'a, Client>,
);
fn run(
@ -88,8 +88,7 @@ impl<'a> System<'a> for Sys {
uids,
chat_modes,
players,
mut clients,
mut general_streams,
clients,
): Self::SystemData,
) {
span!(_guard, "run", "msg::general::Sys::run");
@ -98,15 +97,8 @@ impl<'a> System<'a> for Sys {
let mut server_emitter = server_event_bus.emitter();
let mut new_chat_msgs = Vec::new();
for (entity, client, player, general_stream) in (
&entities,
&mut clients,
(&players).maybe(),
&mut general_streams,
)
.join()
{
let res = super::try_recv_all(general_stream, |_, msg| {
for (entity, client, player) in (&entities, &clients, (&players).maybe()).join() {
let res = super::try_recv_all(client, 3, |client, msg| {
Self::handle_general_msg(
&mut server_emitter,
&mut new_chat_msgs,
@ -122,7 +114,7 @@ impl<'a> System<'a> for Sys {
if let Ok(1_u64..=u64::MAX) = res {
// Update client ping.
client.last_ping = time.0
*client.last_ping.lock().unwrap() = time.0
}
}

View File

@ -1,17 +1,11 @@
use super::super::SysTimer;
use crate::{
client::Client,
metrics::NetworkRequestMetrics,
presence::Presence,
streams::{GetStream, InGameStream},
Settings,
};
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings};
use common::{
comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Ori, Pos, Stats, Vel},
event::{EventBus, ServerEvent},
msg::{ClientGeneral, PresenceKind, ServerGeneral},
span,
state::{BlockChange, Time},
state::BlockChange,
terrain::{TerrainChunkSize, TerrainGrid},
vol::{ReadVol, RectVolSize},
};
@ -23,9 +17,8 @@ impl Sys {
fn handle_client_in_game_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
entity: specs::Entity,
_client: &Client,
client: &Client,
maybe_presence: &mut Option<&mut Presence>,
in_game_stream: &mut InGameStream,
terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
can_build: &ReadStorage<'_, CanBuild>,
@ -54,7 +47,7 @@ impl Sys {
// Go back to registered state (char selection screen)
ClientGeneral::ExitInGame => {
server_emitter.emit(ServerEvent::ExitIngame { entity });
in_game_stream.send(ServerGeneral::ExitInGameSuccess)?;
client.send(ServerGeneral::ExitInGameSuccess)?;
*maybe_presence = None;
},
ClientGeneral::SetViewDistance(view_distance) => {
@ -69,7 +62,7 @@ impl Sys {
.map(|max| view_distance > max)
.unwrap_or(false)
{
in_game_stream.send(ServerGeneral::SetViewDistance(
client.send(ServerGeneral::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
))?;
}
@ -135,7 +128,7 @@ impl Sys {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
in_game_stream.send(ServerGeneral::TerrainChunkUpdate {
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
@ -177,7 +170,6 @@ impl<'a> System<'a> for Sys {
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
Write<'a, SysTimer<Self>>,
@ -190,7 +182,6 @@ impl<'a> System<'a> for Sys {
WriteStorage<'a, Ori>,
WriteStorage<'a, Presence>,
WriteStorage<'a, Client>,
WriteStorage<'a, InGameStream>,
WriteStorage<'a, Controller>,
Read<'a, Settings>,
);
@ -200,7 +191,6 @@ impl<'a> System<'a> for Sys {
(
entities,
server_event_bus,
time,
terrain,
network_metrics,
mut timer,
@ -213,7 +203,6 @@ impl<'a> System<'a> for Sys {
mut orientations,
mut presences,
mut clients,
mut in_game_streams,
mut controllers,
settings,
): Self::SystemData,
@ -223,21 +212,15 @@ impl<'a> System<'a> for Sys {
let mut server_emitter = server_event_bus.emitter();
for (entity, client, mut presence, in_game_stream) in (
&entities,
&mut clients,
(&mut presences).maybe(),
&mut in_game_streams,
)
.join()
for (entity, client, mut maybe_presence) in
(&entities, &mut clients, (&mut presences).maybe()).join()
{
let res = super::try_recv_all(in_game_stream, |in_game_stream, msg| {
let _ = super::try_recv_all(client, 2, |client, msg| {
Self::handle_client_in_game_msg(
&mut server_emitter,
entity,
client,
&mut presence,
in_game_stream,
&mut maybe_presence,
&terrain,
&network_metrics,
&can_build,
@ -252,11 +235,6 @@ impl<'a> System<'a> for Sys {
msg,
)
});
if let Ok(1_u64..=u64::MAX) = res {
// Update client ping.
client.last_ping = time.0
}
}
timer.end()

View File

@ -4,26 +4,28 @@ pub mod in_game;
pub mod ping;
pub mod register;
use crate::streams::GetStream;
use crate::client::Client;
use serde::de::DeserializeOwned;
/// handles all send msg and calls a handle fn
/// Aborts when a error occurred returns cnt of successful msg otherwise
pub(in crate::sys::msg) fn try_recv_all<T, F>(
stream: &mut T,
pub(in crate::sys::msg) fn try_recv_all<M, F>(
client: &Client,
stream_id: u8,
mut f: F,
) -> Result<u64, crate::error::Error>
where
T: GetStream,
F: FnMut(&mut T, T::RecvMsg) -> Result<(), crate::error::Error>,
M: DeserializeOwned,
F: FnMut(&Client, M) -> Result<(), crate::error::Error>,
{
let mut cnt = 0u64;
loop {
let msg = match stream.get_mut().try_recv() {
let msg = match client.recv(stream_id) {
Ok(Some(msg)) => msg,
Ok(None) => break Ok(cnt),
Err(e) => break Err(e.into()),
};
if let Err(e) = f(stream, msg) {
if let Err(e) = f(client, msg) {
break Err(e);
}
cnt += 1;

View File

@ -1,26 +1,19 @@
use super::super::SysTimer;
use crate::{
client::Client,
metrics::PlayerMetrics,
streams::{GetStream, PingStream},
Settings,
};
use crate::{client::Client, metrics::PlayerMetrics, Settings};
use common::{
event::{EventBus, ServerEvent},
msg::PingMsg,
span,
state::Time,
};
use specs::{Entities, Join, Read, ReadExpect, System, Write, WriteStorage};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write};
use std::sync::atomic::Ordering;
use tracing::{debug, info};
impl Sys {
fn handle_ping_msg(
ping_stream: &mut PingStream,
msg: PingMsg,
) -> Result<(), crate::error::Error> {
fn handle_ping_msg(client: &Client, msg: PingMsg) -> Result<(), crate::error::Error> {
match msg {
PingMsg::Ping => ping_stream.send(PingMsg::Pong)?,
PingMsg::Ping => client.send(PingMsg::Pong)?,
PingMsg::Pong => {},
}
Ok(())
@ -37,8 +30,7 @@ impl<'a> System<'a> for Sys {
Read<'a, Time>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>,
WriteStorage<'a, Client>,
WriteStorage<'a, PingStream>,
ReadStorage<'a, Client>,
Read<'a, Settings>,
);
@ -50,8 +42,7 @@ impl<'a> System<'a> for Sys {
time,
player_metrics,
mut timer,
mut clients,
mut ping_streams,
clients,
settings,
): Self::SystemData,
) {
@ -60,14 +51,12 @@ impl<'a> System<'a> for Sys {
let mut server_emitter = server_event_bus.emitter();
for (entity, client, ping_stream) in (&entities, &mut clients, &mut ping_streams).join() {
let res = super::try_recv_all(ping_stream, |ping_stream, msg| {
Self::handle_ping_msg(ping_stream, msg)
});
for (entity, client) in (&entities, &clients).join() {
let res = super::try_recv_all(client, 4, Self::handle_ping_msg);
match res {
Err(e) => {
if !client.terminate_msg_recv {
if !client.terminate_msg_recv.load(Ordering::Relaxed) {
debug!(?entity, ?e, "network error with client, disconnecting");
player_metrics
.clients_disconnected
@ -78,13 +67,14 @@ impl<'a> System<'a> for Sys {
},
Ok(1_u64..=u64::MAX) => {
// Update client ping.
client.last_ping = time.0
*client.last_ping.lock().unwrap() = time.0
},
Ok(0) => {
if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64
let last_ping: f64 = *client.last_ping.lock().unwrap();
if time.0 - last_ping > settings.client_timeout.as_secs() as f64
// Timeout
{
if !client.terminate_msg_recv {
if !client.terminate_msg_recv.load(Ordering::Relaxed) {
info!(?entity, "timeout error with client, disconnecting");
player_metrics
.clients_disconnected
@ -92,11 +82,9 @@ impl<'a> System<'a> for Sys {
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
}
} else if time.0 - client.last_ping
> settings.client_timeout.as_secs() as f64 * 0.5
{
} else if time.0 - last_ping > settings.client_timeout.as_secs() as f64 * 0.5 {
// Try pinging the client if the timeout is nearing.
ping_stream.send_fallible(PingMsg::Ping);
client.send_fallible(PingMsg::Ping);
}
},
}

View File

@ -1,10 +1,6 @@
use super::super::SysTimer;
use crate::{
client::Client,
login_provider::LoginProvider,
metrics::PlayerMetrics,
streams::{GeneralStream, GetStream, RegisterStream},
EditableSettings,
client::Client, login_provider::LoginProvider, metrics::PlayerMetrics, EditableSettings,
};
use common::{
comp::{Admin, Player, Stats},
@ -13,13 +9,10 @@ use common::{
ServerRegisterAnswer,
},
span,
state::Time,
sync::Uid,
};
use hashbrown::HashMap;
use specs::{
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
};
use specs::{Entities, Join, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage};
impl Sys {
#[allow(clippy::too_many_arguments)]
@ -27,9 +20,7 @@ impl Sys {
player_list: &HashMap<Uid, PlayerInfo>,
new_players: &mut Vec<specs::Entity>,
entity: specs::Entity,
_client: &mut Client,
register_stream: &mut RegisterStream,
general_stream: &mut GeneralStream,
client: &Client,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
login_provider: &mut WriteExpect<'_, LoginProvider>,
admins: &mut WriteStorage<'_, Admin>,
@ -44,7 +35,7 @@ impl Sys {
&*editable_settings.banlist,
) {
Err(err) => {
register_stream.send(ServerRegisterAnswer::Err(err))?;
client.send(ServerRegisterAnswer::Err(err))?;
return Ok(());
},
Ok((username, uuid)) => (username, uuid),
@ -55,7 +46,7 @@ impl Sys {
if !player.is_valid() {
// Invalid player
register_stream.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?;
client.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?;
return Ok(());
}
@ -71,10 +62,10 @@ impl Sys {
}
// Tell the client its request was successful.
register_stream.send(ServerRegisterAnswer::Ok(()))?;
client.send(ServerRegisterAnswer::Ok(()))?;
// Send initial player list
general_stream.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)))?;
@ -92,17 +83,14 @@ impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, Time>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
WriteStorage<'a, Client>,
ReadStorage<'a, Client>,
WriteStorage<'a, Player>,
ReadStorage<'a, Stats>,
WriteExpect<'a, LoginProvider>,
WriteStorage<'a, Admin>,
WriteStorage<'a, RegisterStream>,
WriteStorage<'a, GeneralStream>,
ReadExpect<'a, EditableSettings>,
);
@ -110,17 +98,14 @@ impl<'a> System<'a> for Sys {
&mut self,
(
entities,
time,
player_metrics,
mut timer,
uids,
mut clients,
clients,
mut players,
stats,
mut login_provider,
mut admins,
mut register_streams,
mut general_streams,
editable_settings,
): Self::SystemData,
) {
@ -145,22 +130,13 @@ impl<'a> System<'a> for Sys {
// List of new players to update player lists of all clients.
let mut new_players = Vec::new();
for (entity, client, register_stream, general_stream) in (
&entities,
&mut clients,
&mut register_streams,
&mut general_streams,
)
.join()
{
let res = super::try_recv_all(register_stream, |register_stream, msg| {
for (entity, client) in (&entities, &clients).join() {
let _ = super::try_recv_all(client, 0, |client, msg| {
Self::handle_register_msg(
&player_list,
&mut new_players,
entity,
client,
register_stream,
general_stream,
&player_metrics,
&mut login_provider,
&mut admins,
@ -169,11 +145,6 @@ impl<'a> System<'a> for Sys {
msg,
)
});
if let Ok(1_u64..=u64::MAX) = res {
// Update client ping.
client.last_ping = time.0
}
}
// Handle new players.
@ -181,9 +152,9 @@ impl<'a> System<'a> for Sys {
for entity in new_players {
if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) {
let mut lazy_msg = None;
for (_, general_stream) in (&players, &mut general_streams).join() {
for (_, client) in (&players, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(general_stream.prepare(&ServerGeneral::PlayerListUpdate(
lazy_msg = Some(client.prepare(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::Add(*uid, PlayerInfo {
player_alias: player.alias.clone(),
is_online: true,
@ -192,9 +163,7 @@ impl<'a> System<'a> for Sys {
}),
)));
}
lazy_msg
.as_ref()
.map(|ref msg| general_stream.0.send_raw(&msg));
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}

View File

@ -5,7 +5,6 @@ use super::{
use crate::{
client::Client,
presence::{self, Presence, RegionSubscription},
streams::{GeneralStream, GetStream},
};
use common::{
comp::{Ori, Pos, Vel},
@ -37,7 +36,6 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Ori>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
WriteStorage<'a, GeneralStream>,
WriteStorage<'a, RegionSubscription>,
Write<'a, DeletedEntities>,
TrackedComps<'a>,
@ -55,8 +53,7 @@ impl<'a> System<'a> for Sys {
velocities,
orientations,
presences,
_clients,
mut general_streams,
clients,
mut subscriptions,
mut deleted_entities,
tracked_comps,
@ -77,12 +74,12 @@ impl<'a> System<'a> for Sys {
// 7. Determine list of regions that are in range and iterate through it
// - check if in hashset (hash calc) if not add it
let mut regions_to_remove = Vec::new();
for (subscription, pos, presence, client_entity, general_stream) in (
for (subscription, pos, presence, client_entity, client) in (
&mut subscriptions,
&positions,
&presences,
&entities,
&mut general_streams,
&clients,
)
.join()
{
@ -155,8 +152,7 @@ impl<'a> System<'a> for Sys {
.map(|key| subscription.regions.contains(key))
.unwrap_or(false)
{
general_stream
.send_fallible(ServerGeneral::DeleteEntity(uid));
client.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
},
@ -164,7 +160,7 @@ impl<'a> System<'a> for Sys {
}
// Tell client to delete entities in the region
for (&uid, _) in (&uids, region.entities()).join() {
let _ = general_stream.send(ServerGeneral::DeleteEntity(uid));
client.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
// Send deleted entities since they won't be processed for this client in entity
@ -174,7 +170,7 @@ impl<'a> System<'a> for Sys {
.iter()
.flat_map(|v| v.iter())
{
general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
}
}
@ -199,7 +195,7 @@ impl<'a> System<'a> for Sys {
{
// Send message to create entity and tracked components and physics
// components
general_stream.send_fallible(ServerGeneral::CreateEntity(
client.send_fallible(ServerGeneral::CreateEntity(
tracked_comps.create_entity_package(
entity,
Some(*pos),
@ -220,10 +216,10 @@ impl<'a> System<'a> for Sys {
/// Initialize region subscription
pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
if let (Some(client_pos), Some(presence), Some(general_stream)) = (
if let (Some(client_pos), Some(presence), Some(client)) = (
world.read_storage::<Pos>().get(entity),
world.read_storage::<Presence>().get(entity),
world.write_storage::<GeneralStream>().get_mut(entity),
world.write_storage::<Client>().get(entity),
) {
let fuzzy_chunk = (Vec2::<f32>::from(client_pos.0))
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
@ -248,7 +244,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
.join()
{
// Send message to create entity and tracked components and physics components
general_stream.send_fallible(ServerGeneral::CreateEntity(
client.send_fallible(ServerGeneral::CreateEntity(
tracked_comps.create_entity_package(
entity,
Some(*pos),

View File

@ -1,10 +1,5 @@
use super::SysTimer;
use crate::{
chunk_generator::ChunkGenerator,
presence::Presence,
streams::{GetStream, InGameStream},
Tick,
};
use crate::{chunk_generator::ChunkGenerator, client::Client, presence::Presence, Tick};
use common::{
comp::{self, bird_medium, Alignment, Pos},
event::{EventBus, ServerEvent},
@ -17,7 +12,7 @@ use common::{
LoadoutBuilder,
};
use rand::Rng;
use specs::{Join, Read, ReadStorage, System, Write, WriteExpect, WriteStorage};
use specs::{Join, Read, ReadStorage, System, Write, WriteExpect};
use std::sync::Arc;
use vek::*;
@ -39,7 +34,7 @@ impl<'a> System<'a> for Sys {
Write<'a, TerrainChanges>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>,
WriteStorage<'a, InGameStream>,
ReadStorage<'a, Client>,
);
fn run(
@ -53,7 +48,7 @@ impl<'a> System<'a> for Sys {
mut terrain_changes,
positions,
presences,
mut in_game_streams,
clients,
): Self::SystemData,
) {
span!(_guard, "run", "terrain::Sys::run");
@ -67,8 +62,8 @@ impl<'a> System<'a> for Sys {
let (chunk, supplement) = match res {
Ok((chunk, supplement)) => (chunk, supplement),
Err(Some(entity)) => {
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate {
if let Some(client) = clients.get(entity) {
client.send_fallible(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Err(()),
});
@ -80,9 +75,7 @@ impl<'a> System<'a> for Sys {
},
};
// Send the chunk to all nearby players.
for (presence, pos, in_game_stream) in
(&presences, &positions, &mut in_game_streams).join()
{
for (presence, pos, client) in (&presences, &positions, &clients).join() {
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
// Subtract 2 from the offset before computing squared magnitude
// 1 since chunks need neighbors to be meshed
@ -92,7 +85,7 @@ impl<'a> System<'a> for Sys {
.magnitude_squared();
if adjusted_dist_sqr <= presence.view_distance.pow(2) {
in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate {
client.send_fallible(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
});

View File

@ -1,10 +1,7 @@
use super::SysTimer;
use crate::{
presence::Presence,
streams::{GetStream, InGameStream},
};
use crate::{client::Client, presence::Presence};
use common::{comp::Pos, msg::ServerGeneral, span, state::TerrainChanges, terrain::TerrainGrid};
use specs::{Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
use specs::{Join, Read, ReadExpect, ReadStorage, System, Write};
/// This systems sends new chunks to clients as well as changes to existing
/// chunks
@ -17,12 +14,12 @@ impl<'a> System<'a> for Sys {
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>,
WriteStorage<'a, InGameStream>,
ReadStorage<'a, Client>,
);
fn run(
&mut self,
(terrain, terrain_changes, mut timer, positions, presences, mut in_game_streams): Self::SystemData,
(terrain, terrain_changes, mut timer, positions, presences, clients): Self::SystemData,
) {
span!(_guard, "run", "terrain_sync::Sys::run");
timer.start();
@ -31,24 +28,19 @@ impl<'a> System<'a> for Sys {
'chunk: for chunk_key in &terrain_changes.modified_chunks {
let mut lazy_msg = None;
for (presence, pos, in_game_stream) in
(&presences, &positions, &mut in_game_streams).join()
{
for (presence, pos, client) in (&presences, &positions, &clients).join() {
if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance)
{
if lazy_msg.is_none() {
lazy_msg =
Some(in_game_stream.prepare(&ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
Some(chunk) => chunk.clone(),
None => break 'chunk,
})),
}));
lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
Some(chunk) => chunk.clone(),
None => break 'chunk,
})),
}));
}
lazy_msg
.as_ref()
.map(|ref msg| in_game_stream.0.send_raw(&msg));
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}
@ -56,15 +48,13 @@ impl<'a> System<'a> for Sys {
// TODO: Don't send all changed blocks to all clients
// Sync changed blocks
let mut lazy_msg = None;
for (_, in_game_stream) in (&presences, &mut in_game_streams).join() {
for (_, client) in (&presences, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(in_game_stream.prepare(&ServerGeneral::TerrainBlockUpdates(
lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates(
terrain_changes.modified_blocks.clone(),
)));
}
lazy_msg
.as_ref()
.map(|ref msg| in_game_stream.0.send_raw(&msg));
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
timer.end();

View File

@ -1,5 +1,5 @@
use super::SysTimer;
use crate::streams::{GeneralStream, GetStream};
use crate::client::Client;
use common::{
comp::{Player, Pos, Waypoint, WaypointArea},
msg::{Notification, ServerGeneral},
@ -22,7 +22,7 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Player>,
ReadStorage<'a, WaypointArea>,
WriteStorage<'a, Waypoint>,
WriteStorage<'a, GeneralStream>,
ReadStorage<'a, Client>,
Read<'a, Time>,
Write<'a, SysTimer<Self>>,
);
@ -35,7 +35,7 @@ impl<'a> System<'a> for Sys {
players,
waypoint_areas,
mut waypoints,
mut general_streams,
clients,
time,
mut timer,
): Self::SystemData,
@ -43,15 +43,13 @@ impl<'a> System<'a> for Sys {
span!(_guard, "run", "waypoint::Sys::run");
timer.start();
for (entity, player_pos, _, general_stream) in
(&entities, &positions, &players, &mut general_streams).join()
{
for (entity, player_pos, _, client) in (&entities, &positions, &players, &clients).join() {
for (waypoint_pos, waypoint_area) in (&positions, &waypoint_areas).join() {
if player_pos.0.distance_squared(waypoint_pos.0) < waypoint_area.radius().powi(2) {
if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time))
{
if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) {
general_stream.send_fallible(ServerGeneral::Notification(
client.send_fallible(ServerGeneral::Notification(
Notification::WaypointSaved,
));
}