mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
replace the single message
system with 5 message systems. one per stream to handle less ECS systems PER msg system.
As the MAIN message system was already on 25 of the max of 26 possible Ressources
This commit is contained in:
parent
9459ccf61b
commit
e9be36c993
@ -1,34 +1,10 @@
|
||||
use common::msg::{ClientInGame, ClientType};
|
||||
use hashbrown::HashSet;
|
||||
use network::{Participant, Stream};
|
||||
use network::Participant;
|
||||
use specs::{Component, FlaggedStorage};
|
||||
use specs_idvs::IdvStorage;
|
||||
use vek::*;
|
||||
|
||||
// Streams
|
||||
// we ignore errors on send, and do unified error handling in recv
|
||||
pub struct GeneralStream(pub Stream);
|
||||
pub struct PingStream(pub Stream);
|
||||
pub struct RegisterStream(pub Stream);
|
||||
pub struct CharacterScreenStream(pub Stream);
|
||||
pub struct InGameStream(pub Stream);
|
||||
|
||||
impl Component for GeneralStream {
|
||||
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
|
||||
}
|
||||
impl Component for PingStream {
|
||||
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
|
||||
}
|
||||
impl Component for RegisterStream {
|
||||
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
|
||||
}
|
||||
impl Component for CharacterScreenStream {
|
||||
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
|
||||
}
|
||||
impl Component for InGameStream {
|
||||
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
pub registered: bool,
|
||||
pub client_type: ClientType,
|
||||
|
@ -26,7 +26,10 @@ use std::convert::TryFrom;
|
||||
use vek::*;
|
||||
use world::util::Sampler;
|
||||
|
||||
use crate::{client::InGameStream, login_provider::LoginProvider};
|
||||
use crate::{
|
||||
login_provider::LoginProvider,
|
||||
streams::{GetStream, InGameStream},
|
||||
};
|
||||
use scan_fmt::{scan_fmt, scan_fmt_some};
|
||||
use tracing::error;
|
||||
|
||||
@ -670,7 +673,7 @@ fn handle_spawn(
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| {
|
||||
let _ = s.0.send(ServerGeneral::GroupUpdate(g));
|
||||
s.send_unchecked(ServerGeneral::GroupUpdate(g));
|
||||
});
|
||||
},
|
||||
);
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
CharacterScreenStream, Client, ClientType, GeneralStream, InGameStream, PingStream,
|
||||
RegisterStream, ServerInfo,
|
||||
streams::{CharacterScreenStream, GeneralStream, InGameStream, PingStream, RegisterStream},
|
||||
Client, ClientType, ServerInfo,
|
||||
};
|
||||
use crossbeam::{bounded, unbounded, Receiver, Sender};
|
||||
use futures_channel::oneshot;
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::{
|
||||
client::{Client, InGameStream},
|
||||
client::Client,
|
||||
comp::{biped_large, quadruped_medium, quadruped_small},
|
||||
streams::{GetStream, InGameStream},
|
||||
Server, SpawnPoint, StateExt,
|
||||
};
|
||||
use common::{
|
||||
@ -44,7 +45,7 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3<f32>)
|
||||
}
|
||||
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
|
||||
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::Knockback(impulse));
|
||||
in_game_stream.send_unchecked(ServerGeneral::Knockback(impulse));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
client::{GeneralStream, InGameStream},
|
||||
streams::{GeneralStream, GetStream, InGameStream},
|
||||
Server,
|
||||
};
|
||||
use common::{
|
||||
@ -29,20 +29,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
match manip {
|
||||
GroupManip::Invite(uid) => {
|
||||
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
|
||||
let invitee =
|
||||
match state.ecs().entity_from_uid(uid.into()) {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
// Inform of failure
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ =
|
||||
general_stream.0.send(ChatType::Meta.server_msg(
|
||||
"Invite failed, target does not exist.".to_owned(),
|
||||
));
|
||||
}
|
||||
return;
|
||||
},
|
||||
};
|
||||
let invitee = match state.ecs().entity_from_uid(uid.into()) {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
// Inform of failure
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta
|
||||
.server_msg("Invite failed, target does not exist.".to_owned()),
|
||||
);
|
||||
}
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
let uids = state.ecs().read_storage::<sync::Uid>();
|
||||
|
||||
@ -67,7 +66,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
if already_in_same_group {
|
||||
// Inform of failure
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(ChatType::Meta.server_msg(
|
||||
general_stream.send_unchecked(ChatType::Meta.server_msg(
|
||||
"Invite failed, can't invite someone already in your group".to_owned(),
|
||||
));
|
||||
}
|
||||
@ -97,7 +96,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
if group_size_limit_reached {
|
||||
// Inform inviter that they have reached the group size limit
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta.server_msg(
|
||||
"Invite failed, pending invites plus current group size have reached \
|
||||
the group size limit"
|
||||
@ -114,12 +113,10 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
if invites.contains(invitee) {
|
||||
// Inform inviter that there is already an invite
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ =
|
||||
general_stream
|
||||
.0
|
||||
.send(ChatType::Meta.server_msg(
|
||||
"This player already has a pending invite.".to_owned(),
|
||||
));
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta
|
||||
.server_msg("This player already has a pending invite.".to_owned()),
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -163,7 +160,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
(in_game_streams.get_mut(invitee), uids.get(entity).copied())
|
||||
{
|
||||
if send_invite() {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::GroupInvite {
|
||||
in_game_stream.send_unchecked(ServerGeneral::GroupInvite {
|
||||
inviter,
|
||||
timeout: PRESENTED_INVITE_TIMEOUT_DUR,
|
||||
});
|
||||
@ -171,7 +168,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
} else if agents.contains(invitee) {
|
||||
send_invite();
|
||||
} else if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()),
|
||||
);
|
||||
}
|
||||
@ -179,7 +176,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
// Notify inviter that the invite is pending
|
||||
if invite_sent {
|
||||
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::InvitePending(uid));
|
||||
in_game_stream.send_unchecked(ServerGeneral::InvitePending(uid));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -204,7 +201,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
if let (Some(in_game_stream), Some(target)) =
|
||||
(in_game_streams.get_mut(inviter), uids.get(entity).copied())
|
||||
{
|
||||
let _ = in_game_stream.0.send(ServerGeneral::InviteComplete {
|
||||
in_game_stream.send_unchecked(ServerGeneral::InviteComplete {
|
||||
target,
|
||||
answer: InviteAnswer::Accepted,
|
||||
});
|
||||
@ -225,7 +222,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
.try_map(|e| uids.get(e).copied())
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g)));
|
||||
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
|
||||
},
|
||||
);
|
||||
}
|
||||
@ -252,7 +249,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
if let (Some(in_game_stream), Some(target)) =
|
||||
(in_game_streams.get_mut(inviter), uids.get(entity).copied())
|
||||
{
|
||||
let _ = in_game_stream.0.send(ServerGeneral::InviteComplete {
|
||||
in_game_stream.send_unchecked(ServerGeneral::InviteComplete {
|
||||
target,
|
||||
answer: InviteAnswer::Declined,
|
||||
});
|
||||
@ -277,7 +274,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
.try_map(|e| uids.get(e).copied())
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g)));
|
||||
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
|
||||
},
|
||||
);
|
||||
},
|
||||
@ -291,7 +288,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
None => {
|
||||
// Inform of failure
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta
|
||||
.server_msg("Kick failed, target does not exist.".to_owned()),
|
||||
);
|
||||
@ -304,7 +301,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner))
|
||||
{
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()),
|
||||
);
|
||||
}
|
||||
@ -313,7 +310,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
// Can't kick yourself
|
||||
if uids.get(entity).map_or(false, |u| *u == uid) {
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta
|
||||
.server_msg("Kick failed, you can't kick yourself.".to_owned()),
|
||||
);
|
||||
@ -345,28 +342,27 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
.try_map(|e| uids.get(e).copied())
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g)));
|
||||
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
|
||||
},
|
||||
);
|
||||
|
||||
// Tell them the have been kicked
|
||||
if let Some(general_stream) = general_streams.get_mut(target) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta
|
||||
.server_msg("You were removed from the group.".to_owned()),
|
||||
);
|
||||
}
|
||||
// Tell kicker that they were succesful
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ChatType::Meta.server_msg("Player kicked.".to_owned()));
|
||||
general_stream
|
||||
.send_unchecked(ChatType::Meta.server_msg("Player kicked.".to_owned()));
|
||||
}
|
||||
},
|
||||
Some(_) => {
|
||||
// Inform kicker that they are not the leader
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(ChatType::Meta.server_msg(
|
||||
general_stream.send_unchecked(ChatType::Meta.server_msg(
|
||||
"Kick failed: You are not the leader of the target's group.".to_owned(),
|
||||
));
|
||||
}
|
||||
@ -374,10 +370,11 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
None => {
|
||||
// Inform kicker that the target is not in a group
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ =
|
||||
general_stream.0.send(ChatType::Meta.server_msg(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta.server_msg(
|
||||
"Kick failed: Your target is not in a group.".to_owned(),
|
||||
));
|
||||
),
|
||||
);
|
||||
}
|
||||
},
|
||||
}
|
||||
@ -390,7 +387,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
None => {
|
||||
// Inform of failure
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(ChatType::Meta.server_msg(
|
||||
general_stream.send_unchecked(ChatType::Meta.server_msg(
|
||||
"Leadership transfer failed, target does not exist".to_owned(),
|
||||
));
|
||||
}
|
||||
@ -421,18 +418,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
.try_map(|e| uids.get(e).copied())
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g)));
|
||||
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
|
||||
},
|
||||
);
|
||||
// Tell them they are the leader
|
||||
if let Some(general_stream) = general_streams.get_mut(target) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta.server_msg("You are the group leader now.".to_owned()),
|
||||
);
|
||||
}
|
||||
// Tell the old leader that the transfer was succesful
|
||||
if let Some(general_stream) = general_streams.get_mut(target) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta
|
||||
.server_msg("You are no longer the group leader.".to_owned()),
|
||||
);
|
||||
@ -442,7 +439,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
// Inform transferer that they are not the leader
|
||||
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(
|
||||
general_stream.send_unchecked(
|
||||
ChatType::Meta.server_msg(
|
||||
"Transfer failed: You are not the leader of the target's group."
|
||||
.to_owned(),
|
||||
@ -454,7 +451,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
|
||||
// Inform transferer that the target is not in a group
|
||||
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
|
||||
if let Some(general_stream) = general_streams.get_mut(entity) {
|
||||
let _ = general_stream.0.send(ChatType::Meta.server_msg(
|
||||
general_stream.send_unchecked(ChatType::Meta.server_msg(
|
||||
"Transfer failed: Your target is not in a group.".to_owned(),
|
||||
));
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
client::{
|
||||
CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription,
|
||||
RegisterStream,
|
||||
client::{Client, RegionSubscription},
|
||||
streams::{
|
||||
CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream,
|
||||
},
|
||||
Server,
|
||||
};
|
||||
@ -153,55 +153,34 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
|
||||
Some(c) => c,
|
||||
None => return,
|
||||
};
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::SetPlayerEntity(possesse_uid));
|
||||
clients
|
||||
.insert(possesse, client)
|
||||
.err()
|
||||
.map(|e| error!(?e, "Error inserting client component during possession"));
|
||||
general_streams
|
||||
.insert(possesse, general_stream)
|
||||
.err()
|
||||
.map(|e| {
|
||||
error!(
|
||||
?e,
|
||||
"Error inserting general_streams component during possession"
|
||||
)
|
||||
});
|
||||
ping_streams.insert(possesse, ping_stream).err().map(|e| {
|
||||
error!(
|
||||
?e,
|
||||
"Error inserting ping_streams component during possession"
|
||||
)
|
||||
});
|
||||
register_streams
|
||||
.insert(possesse, register_stream)
|
||||
.err()
|
||||
.map(|e| {
|
||||
error!(
|
||||
?e,
|
||||
"Error inserting register_streams component during possession"
|
||||
)
|
||||
});
|
||||
character_screen_streams
|
||||
.insert(possesse, character_screen_stream)
|
||||
.err()
|
||||
.map(|e| {
|
||||
error!(
|
||||
?e,
|
||||
"Error inserting character_screen_streams component during possession"
|
||||
)
|
||||
});
|
||||
in_game_streams
|
||||
.insert(possesse, in_game_stream)
|
||||
.err()
|
||||
.map(|e| {
|
||||
error!(
|
||||
?e,
|
||||
"Error inserting in_game_streams component during possession"
|
||||
)
|
||||
});
|
||||
general_stream.send_unchecked(ServerGeneral::SetPlayerEntity(possesse_uid));
|
||||
let err_fn = |c, e: Option<specs::error::Error>| {
|
||||
e.map(|e| error!(?e, "Error inserting {} component during possession", c));
|
||||
};
|
||||
|
||||
err_fn("client", clients.insert(possesse, client).err());
|
||||
err_fn(
|
||||
"general_streams",
|
||||
general_streams.insert(possesse, general_stream).err(),
|
||||
);
|
||||
err_fn(
|
||||
"ping_streams",
|
||||
ping_streams.insert(possesse, ping_stream).err(),
|
||||
);
|
||||
err_fn(
|
||||
"register_streams",
|
||||
register_streams.insert(possesse, register_stream).err(),
|
||||
);
|
||||
err_fn(
|
||||
"character_screen_streams",
|
||||
character_screen_streams
|
||||
.insert(possesse, character_screen_stream)
|
||||
.err(),
|
||||
);
|
||||
err_fn(
|
||||
"in_game_streams",
|
||||
in_game_streams.insert(possesse, in_game_stream).err(),
|
||||
);
|
||||
// Put possess item into loadout
|
||||
let mut loadouts = ecs.write_storage::<comp::Loadout>();
|
||||
let loadout = loadouts
|
||||
@ -229,22 +208,14 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
|
||||
{
|
||||
let mut players = ecs.write_storage::<comp::Player>();
|
||||
if let Some(player) = players.remove(possessor) {
|
||||
players
|
||||
.insert(possesse, player)
|
||||
.err()
|
||||
.map(|e| error!(?e, "Error inserting player component during possession"));
|
||||
err_fn("player", players.insert(possesse, player).err());
|
||||
}
|
||||
}
|
||||
// Transfer region subscription
|
||||
{
|
||||
let mut subscriptions = ecs.write_storage::<RegionSubscription>();
|
||||
if let Some(s) = subscriptions.remove(possessor) {
|
||||
subscriptions.insert(possesse, s).err().map(|e| {
|
||||
error!(
|
||||
?e,
|
||||
"Error inserting subscription component during possession"
|
||||
)
|
||||
});
|
||||
err_fn("subscription", subscriptions.insert(possesse, s).err());
|
||||
}
|
||||
}
|
||||
// Remove will of the entity
|
||||
@ -257,19 +228,14 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
|
||||
{
|
||||
let mut admins = ecs.write_storage::<comp::Admin>();
|
||||
if let Some(admin) = admins.remove(possessor) {
|
||||
admins
|
||||
.insert(possesse, admin)
|
||||
.err()
|
||||
.map(|e| error!(?e, "Error inserting admin component during possession"));
|
||||
err_fn("admin", admins.insert(possesse, admin).err());
|
||||
}
|
||||
}
|
||||
// Transfer waypoint
|
||||
{
|
||||
let mut waypoints = ecs.write_storage::<comp::Waypoint>();
|
||||
if let Some(waypoint) = waypoints.remove(possessor) {
|
||||
waypoints.insert(possesse, waypoint).err().map(|e| {
|
||||
error!(?e, "Error inserting waypoint component during possession",)
|
||||
});
|
||||
err_fn("waypoints", waypoints.insert(possesse, waypoint).err());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,7 @@
|
||||
use crate::{client::InGameStream, Server, StateExt};
|
||||
use crate::{
|
||||
streams::{GetStream, InGameStream},
|
||||
Server, StateExt,
|
||||
};
|
||||
use common::{
|
||||
comp::{
|
||||
self, item,
|
||||
@ -302,7 +305,7 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| {
|
||||
s.0.send(ServerGeneral::GroupUpdate(g))
|
||||
s.send(ServerGeneral::GroupUpdate(g))
|
||||
});
|
||||
},
|
||||
);
|
||||
|
@ -1,11 +1,12 @@
|
||||
use super::Event;
|
||||
use crate::{
|
||||
client::{
|
||||
CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream,
|
||||
},
|
||||
client::Client,
|
||||
login_provider::LoginProvider,
|
||||
persistence,
|
||||
state_ext::StateExt,
|
||||
streams::{
|
||||
CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream,
|
||||
},
|
||||
Server,
|
||||
};
|
||||
use common::{
|
||||
@ -49,11 +50,11 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
|
||||
Some(mut client),
|
||||
Some(uid),
|
||||
Some(player),
|
||||
Some(mut general_stream),
|
||||
Some(general_stream),
|
||||
Some(ping_stream),
|
||||
Some(register_stream),
|
||||
Some(character_screen_stream),
|
||||
Some(in_game_stream),
|
||||
Some(mut in_game_stream),
|
||||
) = (
|
||||
maybe_client,
|
||||
maybe_uid,
|
||||
@ -66,7 +67,7 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
|
||||
) {
|
||||
// Tell client its request was successful
|
||||
client.in_game = None;
|
||||
let _ = general_stream.0.send(ServerGeneral::ExitInGameSuccess);
|
||||
in_game_stream.send_unchecked(ServerGeneral::ExitInGameSuccess);
|
||||
|
||||
let entity_builder = state
|
||||
.ecs_mut()
|
||||
|
@ -19,6 +19,7 @@ pub mod metrics;
|
||||
pub mod persistence;
|
||||
pub mod settings;
|
||||
pub mod state_ext;
|
||||
pub mod streams;
|
||||
pub mod sys;
|
||||
#[cfg(not(feature = "worldgen"))] mod test_world;
|
||||
|
||||
@ -34,15 +35,15 @@ pub use crate::{
|
||||
use crate::{
|
||||
alias_validator::AliasValidator,
|
||||
chunk_generator::ChunkGenerator,
|
||||
client::{
|
||||
CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegionSubscription,
|
||||
RegisterStream,
|
||||
},
|
||||
client::{Client, RegionSubscription},
|
||||
cmd::ChatCommandExt,
|
||||
connection_handler::ConnectionHandler,
|
||||
data_dir::DataDir,
|
||||
login_provider::LoginProvider,
|
||||
state_ext::StateExt,
|
||||
streams::{
|
||||
CharacterScreenStream, GeneralStream, GetStream, InGameStream, PingStream, RegisterStream,
|
||||
},
|
||||
sys::sentinel::{DeletedEntities, TrackedComps},
|
||||
};
|
||||
use common::{
|
||||
@ -166,7 +167,13 @@ impl Server {
|
||||
|
||||
// System timers for performance monitoring
|
||||
state.ecs_mut().insert(sys::EntitySyncTimer::default());
|
||||
state.ecs_mut().insert(sys::MessageTimer::default());
|
||||
state.ecs_mut().insert(sys::GeneralMsgTimer::default());
|
||||
state.ecs_mut().insert(sys::PingMsgTimer::default());
|
||||
state.ecs_mut().insert(sys::RegisterMsgTimer::default());
|
||||
state
|
||||
.ecs_mut()
|
||||
.insert(sys::CharacterScreenMsgTimer::default());
|
||||
state.ecs_mut().insert(sys::InGameMsgTimer::default());
|
||||
state.ecs_mut().insert(sys::SentinelTimer::default());
|
||||
state.ecs_mut().insert(sys::SubscriptionTimer::default());
|
||||
state.ecs_mut().insert(sys::TerrainSyncTimer::default());
|
||||
@ -466,7 +473,12 @@ impl Server {
|
||||
|
||||
// Run message receiving sys before the systems in common for decreased latency
|
||||
// (e.g. run before controller system)
|
||||
sys::message::Sys.run_now(&self.state.ecs());
|
||||
//TODO: run in parallel
|
||||
sys::msg::general::Sys.run_now(&self.state.ecs());
|
||||
sys::msg::register::Sys.run_now(&self.state.ecs());
|
||||
sys::msg::character_screen::Sys.run_now(&self.state.ecs());
|
||||
sys::msg::in_game::Sys.run_now(&self.state.ecs());
|
||||
sys::msg::ping::Sys.run_now(&self.state.ecs());
|
||||
|
||||
let before_state_tick = Instant::now();
|
||||
|
||||
@ -615,7 +627,14 @@ impl Server {
|
||||
.ecs()
|
||||
.read_resource::<sys::EntitySyncTimer>()
|
||||
.nanos as i64;
|
||||
let message_nanos = self.state.ecs().read_resource::<sys::MessageTimer>().nanos as i64;
|
||||
let message_nanos = {
|
||||
let state = self.state.ecs();
|
||||
(state.read_resource::<sys::GeneralMsgTimer>().nanos
|
||||
+ state.read_resource::<sys::PingMsgTimer>().nanos
|
||||
+ state.read_resource::<sys::RegisterMsgTimer>().nanos
|
||||
+ state.read_resource::<sys::CharacterScreenMsgTimer>().nanos
|
||||
+ state.read_resource::<sys::InGameMsgTimer>().nanos) as i64
|
||||
};
|
||||
let sentinel_nanos = self.state.ecs().read_resource::<sys::SentinelTimer>().nanos as i64;
|
||||
let subscription_nanos = self
|
||||
.state
|
||||
@ -882,7 +901,7 @@ impl Server {
|
||||
.ecs()
|
||||
.write_storage::<RegisterStream>()
|
||||
.get_mut(entity)
|
||||
.map(|s| s.0.send(msg));
|
||||
.map(|s| s.send(msg));
|
||||
},
|
||||
ServerMsg::General(msg) => {
|
||||
match &msg {
|
||||
@ -895,7 +914,7 @@ impl Server {
|
||||
.ecs()
|
||||
.write_storage::<CharacterScreenStream>()
|
||||
.get_mut(entity)
|
||||
.map(|s| s.0.send(msg)),
|
||||
.map(|s| s.send(msg)),
|
||||
//Ingame related
|
||||
ServerGeneral::GroupUpdate(_)
|
||||
| ServerGeneral::GroupInvite { .. }
|
||||
@ -912,7 +931,7 @@ impl Server {
|
||||
.ecs()
|
||||
.write_storage::<InGameStream>()
|
||||
.get_mut(entity)
|
||||
.map(|s| s.0.send(msg)),
|
||||
.map(|s| s.send(msg)),
|
||||
// Always possible
|
||||
ServerGeneral::PlayerListUpdate(_)
|
||||
| ServerGeneral::ChatMsg(_)
|
||||
@ -928,7 +947,7 @@ impl Server {
|
||||
.ecs()
|
||||
.write_storage::<GeneralStream>()
|
||||
.get_mut(entity)
|
||||
.map(|s| s.0.send(msg)),
|
||||
.map(|s| s.send(msg)),
|
||||
};
|
||||
},
|
||||
ServerMsg::Ping(msg) => {
|
||||
@ -936,7 +955,7 @@ impl Server {
|
||||
.ecs()
|
||||
.write_storage::<PingStream>()
|
||||
.get_mut(entity)
|
||||
.map(|s| s.0.send(msg));
|
||||
.map(|s| s.send(msg));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::{
|
||||
client::{CharacterScreenStream, Client, GeneralStream},
|
||||
client::Client,
|
||||
persistence::PersistedComponents,
|
||||
streams::{CharacterScreenStream, GeneralStream, GetStream, InGameStream},
|
||||
sys::sentinel::DeletedEntities,
|
||||
SpawnPoint,
|
||||
};
|
||||
@ -231,9 +232,7 @@ impl StateExt for State {
|
||||
.get_mut(entity)
|
||||
{
|
||||
client.in_game = Some(ClientInGame::Character);
|
||||
let _ = character_screen_stream
|
||||
.0
|
||||
.send(ServerGeneral::CharacterSuccess);
|
||||
character_screen_stream.send_unchecked(ServerGeneral::CharacterSuccess);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -292,14 +291,14 @@ impl StateExt for State {
|
||||
self.notify_registered_clients(ServerGeneral::ChatMsg(resolved_msg))
|
||||
},
|
||||
comp::ChatType::Online(u) => {
|
||||
for (client, uid) in (
|
||||
&mut ecs.write_storage::<Client>(),
|
||||
for (general_stream, uid) in (
|
||||
&mut ecs.write_storage::<GeneralStream>(),
|
||||
&ecs.read_storage::<Uid>(),
|
||||
)
|
||||
.join()
|
||||
{
|
||||
if uid != u {
|
||||
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -311,9 +310,7 @@ impl StateExt for State {
|
||||
.join()
|
||||
{
|
||||
if uid == u || uid == t {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -327,9 +324,8 @@ impl StateExt for State {
|
||||
(&mut ecs.write_storage::<GeneralStream>(), &positions).join()
|
||||
{
|
||||
if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream
|
||||
.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -343,9 +339,8 @@ impl StateExt for State {
|
||||
(&mut ecs.write_storage::<GeneralStream>(), &positions).join()
|
||||
{
|
||||
if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream
|
||||
.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -359,9 +354,8 @@ impl StateExt for State {
|
||||
(&mut ecs.write_storage::<GeneralStream>(), &positions).join()
|
||||
{
|
||||
if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream
|
||||
.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -375,9 +369,7 @@ impl StateExt for State {
|
||||
.join()
|
||||
{
|
||||
if s == &faction.0 {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -389,9 +381,7 @@ impl StateExt for State {
|
||||
.join()
|
||||
{
|
||||
if g == group {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -407,7 +397,7 @@ impl StateExt for State {
|
||||
.join()
|
||||
.filter(|(_, c)| c.registered)
|
||||
{
|
||||
let _ = general_stream.0.send(msg.clone());
|
||||
general_stream.send_unchecked(msg.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@ -420,7 +410,7 @@ impl StateExt for State {
|
||||
.join()
|
||||
.filter(|(_, c)| c.in_game.is_some())
|
||||
{
|
||||
let _ = general_stream.0.send(msg.clone());
|
||||
general_stream.send_unchecked(msg.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@ -430,7 +420,7 @@ impl StateExt for State {
|
||||
) -> Result<(), specs::error::WrongGeneration> {
|
||||
// Remove entity from a group if they are in one
|
||||
{
|
||||
let mut general_streams = self.ecs().write_storage::<GeneralStream>();
|
||||
let mut in_game_streams = self.ecs().write_storage::<InGameStream>();
|
||||
let uids = self.ecs().read_storage::<Uid>();
|
||||
let mut group_manager = self.ecs().write_resource::<comp::group::GroupManager>();
|
||||
group_manager.entity_deleted(
|
||||
@ -440,14 +430,14 @@ impl StateExt for State {
|
||||
&uids,
|
||||
&self.ecs().entities(),
|
||||
&mut |entity, group_change| {
|
||||
general_streams
|
||||
in_game_streams
|
||||
.get_mut(entity)
|
||||
.and_then(|s| {
|
||||
group_change
|
||||
.try_map(|e| uids.get(e).copied())
|
||||
.map(|g| (g, s))
|
||||
})
|
||||
.map(|(g, s)| s.0.send(ServerGeneral::GroupUpdate(g)));
|
||||
.map(|(g, s)| s.send(ServerGeneral::GroupUpdate(g)));
|
||||
},
|
||||
);
|
||||
}
|
||||
|
118
server/src/streams.rs
Normal file
118
server/src/streams.rs
Normal file
@ -0,0 +1,118 @@
|
||||
use common::msg::{ClientGeneral, ClientRegister, PingMsg, ServerGeneral, ServerRegisterAnswer};
|
||||
|
||||
use network::{Stream, StreamError};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
use specs::Component;
|
||||
use specs_idvs::IdvStorage;
|
||||
|
||||
/// helped to reduce code duplication
|
||||
pub(crate) trait GetStream {
|
||||
type RecvMsg: DeserializeOwned;
|
||||
type SendMsg: Serialize + core::fmt::Debug;
|
||||
fn get_mut(&mut self) -> &mut Stream;
|
||||
fn verify(msg: &Self::SendMsg) -> bool;
|
||||
|
||||
fn send(&mut self, msg: Self::SendMsg) -> Result<(), StreamError> {
|
||||
if Self::verify(&msg) {
|
||||
self.get_mut().send(msg)
|
||||
} else {
|
||||
unreachable!("sending this msg isn't allowed! got: {:?}", msg)
|
||||
}
|
||||
}
|
||||
|
||||
fn send_unchecked(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); }
|
||||
}
|
||||
|
||||
// Streams
|
||||
// we ignore errors on send, and do unified error handling in recv
|
||||
pub struct GeneralStream(pub(crate) Stream);
|
||||
pub struct PingStream(pub(crate) Stream);
|
||||
pub struct RegisterStream(pub(crate) Stream);
|
||||
pub struct CharacterScreenStream(pub(crate) Stream);
|
||||
pub struct InGameStream(pub(crate) Stream);
|
||||
|
||||
impl Component for GeneralStream {
|
||||
type Storage = IdvStorage<Self>;
|
||||
}
|
||||
impl Component for PingStream {
|
||||
type Storage = IdvStorage<Self>;
|
||||
}
|
||||
impl Component for RegisterStream {
|
||||
type Storage = IdvStorage<Self>;
|
||||
}
|
||||
impl Component for CharacterScreenStream {
|
||||
type Storage = IdvStorage<Self>;
|
||||
}
|
||||
impl Component for InGameStream {
|
||||
type Storage = IdvStorage<Self>;
|
||||
}
|
||||
|
||||
impl GetStream for GeneralStream {
|
||||
type RecvMsg = ClientGeneral;
|
||||
type SendMsg = ServerGeneral;
|
||||
|
||||
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
|
||||
|
||||
fn verify(msg: &Self::SendMsg) -> bool {
|
||||
matches!(&msg, ServerGeneral::PlayerListUpdate(_)
|
||||
| ServerGeneral::ChatMsg(_)
|
||||
| ServerGeneral::SetPlayerEntity(_)
|
||||
| ServerGeneral::TimeOfDay(_)
|
||||
| ServerGeneral::EntitySync(_)
|
||||
| ServerGeneral::CompSync(_)
|
||||
| ServerGeneral::CreateEntity(_)
|
||||
| ServerGeneral::DeleteEntity(_)
|
||||
| ServerGeneral::Disconnect(_)
|
||||
| ServerGeneral::Notification(_))
|
||||
}
|
||||
}
|
||||
impl GetStream for PingStream {
|
||||
type RecvMsg = PingMsg;
|
||||
type SendMsg = PingMsg;
|
||||
|
||||
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
|
||||
|
||||
fn verify(_: &Self::SendMsg) -> bool { true }
|
||||
}
|
||||
impl GetStream for RegisterStream {
|
||||
type RecvMsg = ClientRegister;
|
||||
type SendMsg = ServerRegisterAnswer;
|
||||
|
||||
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
|
||||
|
||||
fn verify(_: &Self::SendMsg) -> bool { true }
|
||||
}
|
||||
impl GetStream for CharacterScreenStream {
|
||||
type RecvMsg = ClientGeneral;
|
||||
type SendMsg = ServerGeneral;
|
||||
|
||||
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
|
||||
|
||||
fn verify(msg: &Self::SendMsg) -> bool {
|
||||
matches!(&msg, ServerGeneral::CharacterDataLoadError(_)
|
||||
| ServerGeneral::CharacterListUpdate(_)
|
||||
| ServerGeneral::CharacterActionError(_)
|
||||
| ServerGeneral::CharacterSuccess)
|
||||
}
|
||||
}
|
||||
impl GetStream for InGameStream {
|
||||
type RecvMsg = ClientGeneral;
|
||||
type SendMsg = ServerGeneral;
|
||||
|
||||
fn get_mut(&mut self) -> &mut Stream { &mut self.0 }
|
||||
|
||||
fn verify(msg: &Self::SendMsg) -> bool {
|
||||
matches!(&msg, ServerGeneral::GroupUpdate(_)
|
||||
| ServerGeneral::GroupInvite { .. }
|
||||
| ServerGeneral::InvitePending(_)
|
||||
| ServerGeneral::InviteComplete { .. }
|
||||
| ServerGeneral::ExitInGameSuccess
|
||||
| ServerGeneral::InventoryUpdate(_, _)
|
||||
| ServerGeneral::TerrainChunkUpdate { .. }
|
||||
| ServerGeneral::TerrainBlockUpdates(_)
|
||||
| ServerGeneral::SetViewDistance(_)
|
||||
| ServerGeneral::Outcomes(_)
|
||||
| ServerGeneral::Knockback(_))
|
||||
}
|
||||
}
|
@ -3,7 +3,8 @@ use super::{
|
||||
SysTimer,
|
||||
};
|
||||
use crate::{
|
||||
client::{Client, GeneralStream, InGameStream, RegionSubscription},
|
||||
client::{Client, RegionSubscription},
|
||||
streams::{GeneralStream, GetStream, InGameStream},
|
||||
Tick,
|
||||
};
|
||||
use common::{
|
||||
@ -165,7 +166,7 @@ impl<'a> System<'a> for Sys {
|
||||
// Client doesn't need to know about itself
|
||||
&& *client_entity != entity
|
||||
{
|
||||
let _ = general_stream.0.send(create_msg.clone());
|
||||
general_stream.send_unchecked(create_msg.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -179,7 +180,7 @@ impl<'a> System<'a> for Sys {
|
||||
.map(|key| !regions.contains(key))
|
||||
.unwrap_or(true)
|
||||
{
|
||||
let _ = general_stream.0.send(ServerGeneral::DeleteEntity(uid));
|
||||
general_stream.send_unchecked(ServerGeneral::DeleteEntity(uid));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -201,8 +202,8 @@ impl<'a> System<'a> for Sys {
|
||||
subscribers
|
||||
.iter_mut()
|
||||
.for_each(move |(_, _, _, _, _, general_stream)| {
|
||||
let _ = general_stream.0.send(entity_sync_msg.clone());
|
||||
let _ = general_stream.0.send(comp_sync_msg.clone());
|
||||
general_stream.send_unchecked(entity_sync_msg.clone());
|
||||
general_stream.send_unchecked(comp_sync_msg.clone());
|
||||
});
|
||||
|
||||
let mut send_general = |msg: ServerGeneral,
|
||||
@ -236,7 +237,7 @@ impl<'a> System<'a> for Sys {
|
||||
true // Closer than 100 blocks
|
||||
}
|
||||
} {
|
||||
let _ = general_stream.0.send(msg.clone());
|
||||
general_stream.send_unchecked(msg.clone());
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -334,9 +335,7 @@ impl<'a> System<'a> for Sys {
|
||||
})
|
||||
{
|
||||
for uid in &deleted {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::DeleteEntity(Uid(*uid)));
|
||||
general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -347,7 +346,7 @@ impl<'a> System<'a> for Sys {
|
||||
for (inventory, update, in_game_stream) in
|
||||
(&inventories, &inventory_updates, &mut in_game_streams).join()
|
||||
{
|
||||
let _ = in_game_stream.0.send(ServerGeneral::InventoryUpdate(
|
||||
in_game_stream.send_unchecked(ServerGeneral::InventoryUpdate(
|
||||
inventory.clone(),
|
||||
update.event(),
|
||||
));
|
||||
@ -370,7 +369,7 @@ impl<'a> System<'a> for Sys {
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
if !outcomes.is_empty() {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::Outcomes(outcomes));
|
||||
in_game_stream.send_unchecked(ServerGeneral::Outcomes(outcomes));
|
||||
}
|
||||
}
|
||||
outcomes.clear();
|
||||
@ -384,7 +383,7 @@ impl<'a> System<'a> for Sys {
|
||||
// system?)
|
||||
let tof_msg = ServerGeneral::TimeOfDay(*time_of_day);
|
||||
for general_stream in (&mut general_streams).join() {
|
||||
let _ = general_stream.0.send(tof_msg.clone());
|
||||
general_stream.send_unchecked(tof_msg.clone());
|
||||
}
|
||||
|
||||
timer.end();
|
||||
|
@ -1,5 +1,5 @@
|
||||
use super::SysTimer;
|
||||
use crate::client::InGameStream;
|
||||
use crate::streams::{GetStream, InGameStream};
|
||||
use common::{
|
||||
comp::group::{Invite, PendingInvites},
|
||||
msg::{InviteAnswer, ServerGeneral},
|
||||
@ -55,7 +55,7 @@ impl<'a> System<'a> for Sys {
|
||||
in_game_streams.get_mut(*inviter),
|
||||
uids.get(invitee).copied(),
|
||||
) {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::InviteComplete {
|
||||
in_game_stream.send_unchecked(ServerGeneral::InviteComplete {
|
||||
target,
|
||||
answer: InviteAnswer::TimedOut,
|
||||
});
|
||||
|
@ -1,764 +0,0 @@
|
||||
use super::SysTimer;
|
||||
use crate::{
|
||||
alias_validator::AliasValidator,
|
||||
character_creator,
|
||||
client::{
|
||||
CharacterScreenStream, Client, GeneralStream, InGameStream, PingStream, RegisterStream,
|
||||
},
|
||||
login_provider::LoginProvider,
|
||||
metrics::{NetworkRequestMetrics, PlayerMetrics},
|
||||
persistence::character_loader::CharacterLoader,
|
||||
EditableSettings, Settings,
|
||||
};
|
||||
use common::{
|
||||
comp::{
|
||||
Admin, CanBuild, ChatMode, ChatType, ControlEvent, Controller, ForceUpdate, Ori, Player,
|
||||
Pos, Stats, UnresolvedChatMsg, Vel,
|
||||
},
|
||||
event::{EventBus, ServerEvent},
|
||||
msg::{
|
||||
validate_chat_msg, CharacterInfo, ChatMsgValidationError, ClientGeneral, ClientInGame,
|
||||
ClientRegister, DisconnectReason, PingMsg, PlayerInfo, PlayerListUpdate, RegisterError,
|
||||
ServerGeneral, ServerRegisterAnswer, MAX_BYTES_CHAT_MSG,
|
||||
},
|
||||
span,
|
||||
state::{BlockChange, Time},
|
||||
sync::Uid,
|
||||
terrain::{TerrainChunkSize, TerrainGrid},
|
||||
vol::{ReadVol, RectVolSize},
|
||||
};
|
||||
use futures_executor::block_on;
|
||||
use futures_timer::Delay;
|
||||
use futures_util::{select, FutureExt};
|
||||
use hashbrown::HashMap;
|
||||
use specs::{
|
||||
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
|
||||
};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
impl Sys {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_client_msg(
|
||||
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
|
||||
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
general_stream: &mut GeneralStream,
|
||||
player_metrics: &ReadExpect<'_, PlayerMetrics>,
|
||||
uids: &ReadStorage<'_, Uid>,
|
||||
chat_modes: &ReadStorage<'_, ChatMode>,
|
||||
msg: ClientGeneral,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
match msg {
|
||||
ClientGeneral::ChatMsg(message) => {
|
||||
if client.registered {
|
||||
match validate_chat_msg(&message) {
|
||||
Ok(()) => {
|
||||
if let Some(from) = uids.get(entity) {
|
||||
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
|
||||
let msg = mode.new_message(*from, message);
|
||||
new_chat_msgs.push((Some(entity), msg));
|
||||
} else {
|
||||
error!("Could not send message. Missing player uid");
|
||||
}
|
||||
},
|
||||
Err(ChatMsgValidationError::TooLong) => {
|
||||
let max = MAX_BYTES_CHAT_MSG;
|
||||
let len = message.len();
|
||||
warn!(?len, ?max, "Received a chat message that's too long")
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
ClientGeneral::Disconnect => {
|
||||
general_stream
|
||||
.0
|
||||
.send(ServerGeneral::Disconnect(DisconnectReason::Requested))?;
|
||||
},
|
||||
ClientGeneral::Terminate => {
|
||||
debug!(?entity, "Client send message to termitate session");
|
||||
player_metrics
|
||||
.clients_disconnected
|
||||
.with_label_values(&["gracefully"])
|
||||
.inc();
|
||||
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
|
||||
},
|
||||
_ => unreachable!("not a client_general msg"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_client_in_game_msg(
|
||||
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
in_game_stream: &mut InGameStream,
|
||||
terrain: &ReadExpect<'_, TerrainGrid>,
|
||||
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
|
||||
can_build: &ReadStorage<'_, CanBuild>,
|
||||
force_updates: &ReadStorage<'_, ForceUpdate>,
|
||||
stats: &mut WriteStorage<'_, Stats>,
|
||||
block_changes: &mut Write<'_, BlockChange>,
|
||||
positions: &mut WriteStorage<'_, Pos>,
|
||||
velocities: &mut WriteStorage<'_, Vel>,
|
||||
orientations: &mut WriteStorage<'_, Ori>,
|
||||
players: &mut WriteStorage<'_, Player>,
|
||||
controllers: &mut WriteStorage<'_, Controller>,
|
||||
settings: &Read<'_, Settings>,
|
||||
msg: ClientGeneral,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
if client.in_game.is_none() {
|
||||
debug!(?entity, "client is not in_game, ignoring msg");
|
||||
trace!(?msg, "ignored msg content");
|
||||
if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) {
|
||||
network_metrics.chunks_request_dropped.inc();
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
match msg {
|
||||
// Go back to registered state (char selection screen)
|
||||
ClientGeneral::ExitInGame => {
|
||||
client.in_game = None;
|
||||
server_emitter.emit(ServerEvent::ExitIngame { entity });
|
||||
in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?;
|
||||
},
|
||||
ClientGeneral::SetViewDistance(view_distance) => {
|
||||
players.get_mut(entity).map(|player| {
|
||||
player.view_distance = Some(
|
||||
settings
|
||||
.max_view_distance
|
||||
.map(|max| view_distance.min(max))
|
||||
.unwrap_or(view_distance),
|
||||
)
|
||||
});
|
||||
|
||||
//correct client if its VD is to high
|
||||
if settings
|
||||
.max_view_distance
|
||||
.map(|max| view_distance > max)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
in_game_stream.0.send(ServerGeneral::SetViewDistance(
|
||||
settings.max_view_distance.unwrap_or(0),
|
||||
))?;
|
||||
}
|
||||
},
|
||||
ClientGeneral::ControllerInputs(inputs) => {
|
||||
if let Some(ClientInGame::Character) = client.in_game {
|
||||
if let Some(controller) = controllers.get_mut(entity) {
|
||||
controller.inputs.update_with_new(inputs);
|
||||
}
|
||||
}
|
||||
},
|
||||
ClientGeneral::ControlEvent(event) => {
|
||||
if let Some(ClientInGame::Character) = client.in_game {
|
||||
// Skip respawn if client entity is alive
|
||||
if let ControlEvent::Respawn = event {
|
||||
if stats.get(entity).map_or(true, |s| !s.is_dead) {
|
||||
//Todo: comment why return!
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
if let Some(controller) = controllers.get_mut(entity) {
|
||||
controller.events.push(event);
|
||||
}
|
||||
}
|
||||
},
|
||||
ClientGeneral::ControlAction(event) => {
|
||||
if let Some(ClientInGame::Character) = client.in_game {
|
||||
if let Some(controller) = controllers.get_mut(entity) {
|
||||
controller.actions.push(event);
|
||||
}
|
||||
}
|
||||
},
|
||||
ClientGeneral::PlayerPhysics { pos, vel, ori } => {
|
||||
if let Some(ClientInGame::Character) = client.in_game {
|
||||
if force_updates.get(entity).is_none()
|
||||
&& stats.get(entity).map_or(true, |s| !s.is_dead)
|
||||
{
|
||||
let _ = positions.insert(entity, pos);
|
||||
let _ = velocities.insert(entity, vel);
|
||||
let _ = orientations.insert(entity, ori);
|
||||
}
|
||||
}
|
||||
},
|
||||
ClientGeneral::BreakBlock(pos) => {
|
||||
if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) {
|
||||
block_changes.set(pos, block.into_vacant());
|
||||
}
|
||||
},
|
||||
ClientGeneral::PlaceBlock(pos, block) => {
|
||||
if can_build.get(entity).is_some() {
|
||||
block_changes.try_set(pos, block);
|
||||
}
|
||||
},
|
||||
ClientGeneral::TerrainChunkRequest { key } => {
|
||||
let in_vd = if let (Some(view_distance), Some(pos)) = (
|
||||
players.get(entity).and_then(|p| p.view_distance),
|
||||
positions.get(entity),
|
||||
) {
|
||||
pos.0.xy().map(|e| e as f64).distance(
|
||||
key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
|
||||
) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
|
||||
* TerrainChunkSize::RECT_SIZE.x as f64
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if in_vd {
|
||||
match terrain.get_key(key) {
|
||||
Some(chunk) => {
|
||||
network_metrics.chunks_served_from_memory.inc();
|
||||
in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate {
|
||||
key,
|
||||
chunk: Ok(Box::new(chunk.clone())),
|
||||
})?
|
||||
},
|
||||
None => {
|
||||
network_metrics.chunks_generation_triggered.inc();
|
||||
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
|
||||
},
|
||||
}
|
||||
} else {
|
||||
network_metrics.chunks_request_dropped.inc();
|
||||
}
|
||||
},
|
||||
ClientGeneral::UnlockSkill(skill) => {
|
||||
stats
|
||||
.get_mut(entity)
|
||||
.map(|s| s.skill_set.unlock_skill(skill));
|
||||
},
|
||||
ClientGeneral::RefundSkill(skill) => {
|
||||
stats
|
||||
.get_mut(entity)
|
||||
.map(|s| s.skill_set.refund_skill(skill));
|
||||
},
|
||||
ClientGeneral::UnlockSkillGroup(skill_group_type) => {
|
||||
stats
|
||||
.get_mut(entity)
|
||||
.map(|s| s.skill_set.unlock_skill_group(skill_group_type));
|
||||
},
|
||||
_ => unreachable!("not a client_in_game msg"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
*/
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_client_character_screen_msg(
|
||||
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
|
||||
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
character_screen_stream: &mut CharacterScreenStream,
|
||||
character_loader: &ReadExpect<'_, CharacterLoader>,
|
||||
uids: &ReadStorage<'_, Uid>,
|
||||
players: &mut WriteStorage<'_, Player>,
|
||||
editable_settings: &ReadExpect<'_, EditableSettings>,
|
||||
alias_validator: &ReadExpect<'_, AliasValidator>,
|
||||
msg: ClientGeneral,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
match msg {
|
||||
// Request spectator state
|
||||
ClientGeneral::Spectate if client.registered => {
|
||||
client.in_game = Some(ClientInGame::Spectator)
|
||||
},
|
||||
ClientGeneral::Spectate => debug!("dropped Spectate msg from unregistered client"),
|
||||
ClientGeneral::Character(character_id)
|
||||
if client.registered && client.in_game.is_none() =>
|
||||
{
|
||||
if let Some(player) = players.get(entity) {
|
||||
// Send a request to load the character's component data from the
|
||||
// DB. Once loaded, persisted components such as stats and inventory
|
||||
// will be inserted for the entity
|
||||
character_loader.load_character_data(
|
||||
entity,
|
||||
player.uuid().to_string(),
|
||||
character_id,
|
||||
);
|
||||
|
||||
// Start inserting non-persisted/default components for the entity
|
||||
// while we load the DB data
|
||||
server_emitter.emit(ServerEvent::InitCharacterData {
|
||||
entity,
|
||||
character_id,
|
||||
});
|
||||
|
||||
// Give the player a welcome message
|
||||
if !editable_settings.server_description.is_empty() {
|
||||
character_screen_stream
|
||||
.0
|
||||
.send(ChatType::CommandInfo.server_msg(String::from(
|
||||
&*editable_settings.server_description,
|
||||
)))?;
|
||||
}
|
||||
|
||||
if !client.login_msg_sent {
|
||||
if let Some(player_uid) = uids.get(entity) {
|
||||
new_chat_msgs.push((None, UnresolvedChatMsg {
|
||||
chat_type: ChatType::Online(*player_uid),
|
||||
message: "".to_string(),
|
||||
}));
|
||||
|
||||
client.login_msg_sent = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
character_screen_stream
|
||||
.0
|
||||
.send(ServerGeneral::CharacterDataLoadError(String::from(
|
||||
"Failed to fetch player entity",
|
||||
)))?
|
||||
}
|
||||
}
|
||||
ClientGeneral::Character(_) => {
|
||||
let registered = client.registered;
|
||||
let in_game = client.in_game;
|
||||
debug!(?registered, ?in_game, "dropped Character msg from client");
|
||||
},
|
||||
ClientGeneral::RequestCharacterList => {
|
||||
if let Some(player) = players.get(entity) {
|
||||
character_loader.load_character_list(entity, player.uuid().to_string())
|
||||
}
|
||||
},
|
||||
ClientGeneral::CreateCharacter { alias, tool, body } => {
|
||||
if let Err(error) = alias_validator.validate(&alias) {
|
||||
debug!(?error, ?alias, "denied alias as it contained a banned word");
|
||||
character_screen_stream
|
||||
.0
|
||||
.send(ServerGeneral::CharacterActionError(error.to_string()))?;
|
||||
} else if let Some(player) = players.get(entity) {
|
||||
character_creator::create_character(
|
||||
entity,
|
||||
player.uuid().to_string(),
|
||||
alias,
|
||||
tool,
|
||||
body,
|
||||
character_loader,
|
||||
);
|
||||
}
|
||||
},
|
||||
ClientGeneral::DeleteCharacter(character_id) => {
|
||||
if let Some(player) = players.get(entity) {
|
||||
character_loader.delete_character(
|
||||
entity,
|
||||
player.uuid().to_string(),
|
||||
character_id,
|
||||
);
|
||||
}
|
||||
},
|
||||
_ => unreachable!("not a client_character_screen msg"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_ping_msg(
|
||||
client: &mut Client,
|
||||
ping_stream: &mut PingStream,
|
||||
msg: PingMsg,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
match msg {
|
||||
PingMsg::Ping => ping_stream.0.send(PingMsg::Pong)?,
|
||||
PingMsg::Pong => {},
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_register_msg(
|
||||
player_list: &HashMap<Uid, PlayerInfo>,
|
||||
new_players: &mut Vec<specs::Entity>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
register_stream: &mut RegisterStream,
|
||||
player_metrics: &ReadExpect<'_, PlayerMetrics>,
|
||||
login_provider: &mut WriteExpect<'_, LoginProvider>,
|
||||
admins: &mut WriteStorage<'_, Admin>,
|
||||
players: &mut WriteStorage<'_, Player>,
|
||||
editable_settings: &ReadExpect<'_, EditableSettings>,
|
||||
msg: ClientRegister,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
let (username, uuid) = match login_provider.try_login(
|
||||
&msg.token_or_username,
|
||||
&*editable_settings.admins,
|
||||
&*editable_settings.whitelist,
|
||||
&*editable_settings.banlist,
|
||||
) {
|
||||
Err(err) => {
|
||||
register_stream.0.send(ServerRegisterAnswer::Err(err))?;
|
||||
return Ok(());
|
||||
},
|
||||
Ok((username, uuid)) => (username, uuid),
|
||||
};
|
||||
|
||||
const INITIAL_VD: Option<u32> = Some(5); //will be changed after login
|
||||
let player = Player::new(username, None, INITIAL_VD, uuid);
|
||||
let is_admin = editable_settings.admins.contains(&uuid);
|
||||
|
||||
if !player.is_valid() {
|
||||
// Invalid player
|
||||
register_stream
|
||||
.0
|
||||
.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !client.registered && client.in_game.is_none() {
|
||||
// Add Player component to this client
|
||||
let _ = players.insert(entity, player);
|
||||
player_metrics.players_connected.inc();
|
||||
|
||||
// Give the Admin component to the player if their name exists in
|
||||
// admin list
|
||||
if is_admin {
|
||||
let _ = admins.insert(entity, Admin);
|
||||
}
|
||||
|
||||
// Tell the client its request was successful.
|
||||
client.registered = true;
|
||||
register_stream.0.send(ServerRegisterAnswer::Ok(()))?;
|
||||
|
||||
// Send initial player list
|
||||
register_stream
|
||||
.0
|
||||
.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
|
||||
player_list.clone(),
|
||||
)))?;
|
||||
|
||||
// Add to list to notify all clients of the new player
|
||||
new_players.push(entity);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///We needed to move this to a async fn, if we would use a async closures
|
||||
/// the compiler generates to much recursion and fails to compile this
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_messages(
|
||||
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
|
||||
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
|
||||
player_list: &HashMap<Uid, PlayerInfo>,
|
||||
new_players: &mut Vec<specs::Entity>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
cnt: &mut u64,
|
||||
character_loader: &ReadExpect<'_, CharacterLoader>,
|
||||
terrain: &ReadExpect<'_, TerrainGrid>,
|
||||
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
|
||||
player_metrics: &ReadExpect<'_, PlayerMetrics>,
|
||||
uids: &ReadStorage<'_, Uid>,
|
||||
can_build: &ReadStorage<'_, CanBuild>,
|
||||
force_updates: &ReadStorage<'_, ForceUpdate>,
|
||||
stats: &mut WriteStorage<'_, Stats>,
|
||||
chat_modes: &ReadStorage<'_, ChatMode>,
|
||||
login_provider: &mut WriteExpect<'_, LoginProvider>,
|
||||
block_changes: &mut Write<'_, BlockChange>,
|
||||
admins: &mut WriteStorage<'_, Admin>,
|
||||
//positions: &mut WriteStorage<'_, Pos>,
|
||||
//velocities: &mut WriteStorage<'_, Vel>,
|
||||
//orientations: &mut WriteStorage<'_, Ori>,
|
||||
players: &mut WriteStorage<'_, Player>,
|
||||
general_stream: &mut GeneralStream,
|
||||
ping_stream: &mut PingStream,
|
||||
register_stream: &mut RegisterStream,
|
||||
character_screen_stream: &mut CharacterScreenStream,
|
||||
//in_game_stream: &mut InGameStream,
|
||||
controllers: &mut WriteStorage<'_, Controller>,
|
||||
settings: &Read<'_, Settings>,
|
||||
editable_settings: &ReadExpect<'_, EditableSettings>,
|
||||
alias_validator: &ReadExpect<'_, AliasValidator>,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
loop {
|
||||
if let Some(msg) = general_stream.0.try_recv()? {
|
||||
Self::handle_client_msg(
|
||||
server_emitter,
|
||||
new_chat_msgs,
|
||||
entity,
|
||||
client,
|
||||
general_stream,
|
||||
player_metrics,
|
||||
uids,
|
||||
chat_modes,
|
||||
msg,
|
||||
)?;
|
||||
*cnt += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(msg) = ping_stream.0.try_recv()? {
|
||||
Self::handle_ping_msg(client, ping_stream, msg)?;
|
||||
*cnt += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(msg) = register_stream.0.try_recv()? {
|
||||
Self::handle_register_msg(
|
||||
player_list,
|
||||
new_players,
|
||||
entity,
|
||||
client,
|
||||
register_stream,
|
||||
player_metrics,
|
||||
login_provider,
|
||||
admins,
|
||||
players,
|
||||
editable_settings,
|
||||
msg,
|
||||
)?;
|
||||
*cnt += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(msg) = character_screen_stream.0.try_recv()? {
|
||||
Self::handle_client_character_screen_msg(
|
||||
server_emitter,
|
||||
new_chat_msgs,
|
||||
entity,
|
||||
client,
|
||||
character_screen_stream,
|
||||
character_loader,
|
||||
uids,
|
||||
players,
|
||||
editable_settings,
|
||||
alias_validator,
|
||||
msg,
|
||||
)?;
|
||||
*cnt += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
break Ok(())
|
||||
/*
|
||||
if let Some(msg) = m2 {
|
||||
client.network_error |= b2;
|
||||
Self::handle_client_in_game_msg(
|
||||
server_emitter,
|
||||
entity,
|
||||
client,
|
||||
terrain,
|
||||
network_metrics,
|
||||
can_build,
|
||||
force_updates,
|
||||
stats,
|
||||
block_changes,
|
||||
positions,
|
||||
velocities,
|
||||
orientations,
|
||||
players,
|
||||
controllers,
|
||||
settings,
|
||||
msg?,
|
||||
)?;
|
||||
}*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle new messages from clients
|
||||
pub struct Sys;
|
||||
impl<'a> System<'a> for Sys {
|
||||
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
|
||||
type SystemData = (
|
||||
Entities<'a>,
|
||||
Read<'a, EventBus<ServerEvent>>,
|
||||
Read<'a, Time>,
|
||||
ReadExpect<'a, CharacterLoader>,
|
||||
ReadExpect<'a, TerrainGrid>,
|
||||
ReadExpect<'a, NetworkRequestMetrics>,
|
||||
ReadExpect<'a, PlayerMetrics>,
|
||||
Write<'a, SysTimer<Self>>,
|
||||
ReadStorage<'a, Uid>,
|
||||
ReadStorage<'a, CanBuild>,
|
||||
ReadStorage<'a, ForceUpdate>,
|
||||
WriteStorage<'a, Stats>,
|
||||
ReadStorage<'a, ChatMode>,
|
||||
WriteExpect<'a, LoginProvider>,
|
||||
Write<'a, BlockChange>,
|
||||
WriteStorage<'a, Admin>,
|
||||
//WriteStorage<'a, Pos>,
|
||||
//WriteStorage<'a, Vel>,
|
||||
//WriteStorage<'a, Ori>,
|
||||
WriteStorage<'a, Player>,
|
||||
WriteStorage<'a, Client>,
|
||||
WriteStorage<'a, GeneralStream>,
|
||||
WriteStorage<'a, PingStream>,
|
||||
WriteStorage<'a, RegisterStream>,
|
||||
WriteStorage<'a, CharacterScreenStream>,
|
||||
//WriteStorage<'a, InGameStream>,
|
||||
WriteStorage<'a, Controller>,
|
||||
Read<'a, Settings>,
|
||||
ReadExpect<'a, EditableSettings>,
|
||||
ReadExpect<'a, AliasValidator>,
|
||||
);
|
||||
|
||||
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_char_pattern)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_match)] // TODO: Pending review in #587
|
||||
fn run(
|
||||
&mut self,
|
||||
(
|
||||
entities,
|
||||
server_event_bus,
|
||||
time,
|
||||
character_loader,
|
||||
terrain,
|
||||
network_metrics,
|
||||
player_metrics,
|
||||
mut timer,
|
||||
uids,
|
||||
can_build,
|
||||
force_updates,
|
||||
mut stats,
|
||||
chat_modes,
|
||||
mut accounts,
|
||||
mut block_changes,
|
||||
mut admins,
|
||||
//mut positions,
|
||||
//mut velocities,
|
||||
//mut orientations,
|
||||
mut players,
|
||||
mut clients,
|
||||
mut general_streams,
|
||||
mut ping_streams,
|
||||
mut register_streams,
|
||||
mut character_screen_streams,
|
||||
//mut in_game_streams,
|
||||
mut controllers,
|
||||
settings,
|
||||
editable_settings,
|
||||
alias_validator,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
span!(_guard, "run", "message::Sys::run");
|
||||
timer.start();
|
||||
|
||||
let mut server_emitter = server_event_bus.emitter();
|
||||
|
||||
let mut new_chat_msgs = Vec::new();
|
||||
|
||||
// Player list to send new players.
|
||||
let player_list = (&uids, &players, stats.maybe(), admins.maybe())
|
||||
.join()
|
||||
.map(|(uid, player, stats, admin)| {
|
||||
(*uid, PlayerInfo {
|
||||
is_online: true,
|
||||
is_admin: admin.is_some(),
|
||||
player_alias: player.alias.clone(),
|
||||
character: stats.map(|stats| CharacterInfo {
|
||||
name: stats.name.clone(),
|
||||
level: stats.level.level(),
|
||||
}),
|
||||
})
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
// List of new players to update player lists of all clients.
|
||||
let mut new_players = Vec::new();
|
||||
|
||||
for (entity, client, general_stream, ping_stream, register_stream, character_screen_stream) in (&entities, &mut clients, &mut general_streams, &mut ping_streams, &mut register_streams, &mut character_screen_streams).join() {
|
||||
let mut cnt = 0;
|
||||
|
||||
let network_err: Result<(), crate::error::Error> = block_on(async {
|
||||
//TIMEOUT 0.02 ms for msg handling
|
||||
let work_future = Self::handle_messages(
|
||||
&mut server_emitter,
|
||||
&mut new_chat_msgs,
|
||||
&player_list,
|
||||
&mut new_players,
|
||||
entity,
|
||||
client,
|
||||
&mut cnt,
|
||||
&character_loader,
|
||||
&terrain,
|
||||
&network_metrics,
|
||||
&player_metrics,
|
||||
&uids,
|
||||
&can_build,
|
||||
&force_updates,
|
||||
&mut stats,
|
||||
&chat_modes,
|
||||
&mut accounts,
|
||||
&mut block_changes,
|
||||
&mut admins,
|
||||
//&mut positions,
|
||||
//&mut velocities,
|
||||
//&mut orientations,
|
||||
&mut players,
|
||||
general_stream,
|
||||
ping_stream,
|
||||
register_stream,
|
||||
character_screen_stream,
|
||||
&mut controllers,
|
||||
&settings,
|
||||
&editable_settings,
|
||||
&alias_validator,
|
||||
);
|
||||
select!(
|
||||
_ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()),
|
||||
err = work_future.fuse() => err,
|
||||
)
|
||||
});
|
||||
|
||||
// Network error
|
||||
if network_err.is_err() {
|
||||
debug!(?entity, "postbox error with client, disconnecting");
|
||||
player_metrics
|
||||
.clients_disconnected
|
||||
.with_label_values(&["network_error"])
|
||||
.inc();
|
||||
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
|
||||
} else if cnt > 0 {
|
||||
// Update client ping.
|
||||
client.last_ping = time.0
|
||||
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64
|
||||
// Timeout
|
||||
{
|
||||
info!(?entity, "timeout error with client, disconnecting");
|
||||
player_metrics
|
||||
.clients_disconnected
|
||||
.with_label_values(&["timeout"])
|
||||
.inc();
|
||||
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
|
||||
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 {
|
||||
// Try pinging the client if the timeout is nearing.
|
||||
//FIXME
|
||||
//client.send_msg(PingMsg::Ping);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle new players.
|
||||
// Tell all clients to add them to the player list.
|
||||
for entity in new_players {
|
||||
if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) {
|
||||
let msg =
|
||||
ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo {
|
||||
player_alias: player.alias.clone(),
|
||||
is_online: true,
|
||||
is_admin: admins.get(entity).is_some(),
|
||||
character: None, // new players will be on character select.
|
||||
}));
|
||||
for client in (&mut clients).join().filter(|c| c.registered) {
|
||||
//FIXME
|
||||
//client.send_msg(msg.clone())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle new chat messages.
|
||||
for (entity, msg) in new_chat_msgs {
|
||||
// Handle chat commands.
|
||||
if msg.message.starts_with("/") {
|
||||
if let (Some(entity), true) = (entity, msg.message.len() > 1) {
|
||||
let argv = String::from(&msg.message[1..]);
|
||||
server_emitter.emit(ServerEvent::ChatCmd(entity, argv));
|
||||
}
|
||||
} else {
|
||||
// Send chat message
|
||||
server_emitter.emit(ServerEvent::Chat(msg));
|
||||
}
|
||||
}
|
||||
|
||||
timer.end()
|
||||
}
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
pub mod entity_sync;
|
||||
pub mod invite_timeout;
|
||||
pub mod msg;
|
||||
pub mod message;
|
||||
pub mod object;
|
||||
pub mod persistence;
|
||||
pub mod sentinel;
|
||||
@ -17,7 +16,11 @@ use std::{
|
||||
};
|
||||
|
||||
pub type EntitySyncTimer = SysTimer<entity_sync::Sys>;
|
||||
pub type MessageTimer = SysTimer<message::Sys>;
|
||||
pub type GeneralMsgTimer = SysTimer<msg::general::Sys>;
|
||||
pub type PingMsgTimer = SysTimer<msg::ping::Sys>;
|
||||
pub type RegisterMsgTimer = SysTimer<msg::register::Sys>;
|
||||
pub type CharacterScreenMsgTimer = SysTimer<msg::character_screen::Sys>;
|
||||
pub type InGameMsgTimer = SysTimer<msg::in_game::Sys>;
|
||||
pub type SentinelTimer = SysTimer<sentinel::Sys>;
|
||||
pub type SubscriptionTimer = SysTimer<subscription::Sys>;
|
||||
pub type TerrainTimer = SysTimer<terrain::Sys>;
|
||||
|
224
server/src/sys/msg/character_screen.rs
Normal file
224
server/src/sys/msg/character_screen.rs
Normal file
@ -0,0 +1,224 @@
|
||||
use super::super::SysTimer;
|
||||
use crate::{
|
||||
alias_validator::AliasValidator,
|
||||
character_creator,
|
||||
client::Client,
|
||||
persistence::character_loader::CharacterLoader,
|
||||
streams::{CharacterScreenStream, GeneralStream, GetStream},
|
||||
EditableSettings,
|
||||
};
|
||||
use common::{
|
||||
comp::{ChatType, Player, UnresolvedChatMsg},
|
||||
event::{EventBus, ServerEvent},
|
||||
msg::{ClientGeneral, ClientInGame, ServerGeneral},
|
||||
span,
|
||||
state::Time,
|
||||
sync::Uid,
|
||||
};
|
||||
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
|
||||
use tracing::debug;
|
||||
|
||||
impl Sys {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_client_character_screen_msg(
|
||||
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
|
||||
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
character_screen_stream: &mut CharacterScreenStream,
|
||||
general_stream: &mut GeneralStream,
|
||||
character_loader: &ReadExpect<'_, CharacterLoader>,
|
||||
uids: &ReadStorage<'_, Uid>,
|
||||
players: &ReadStorage<'_, Player>,
|
||||
editable_settings: &ReadExpect<'_, EditableSettings>,
|
||||
alias_validator: &ReadExpect<'_, AliasValidator>,
|
||||
msg: ClientGeneral,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
match msg {
|
||||
// Request spectator state
|
||||
ClientGeneral::Spectate if client.registered => {
|
||||
client.in_game = Some(ClientInGame::Spectator)
|
||||
},
|
||||
ClientGeneral::Spectate => debug!("dropped Spectate msg from unregistered client"),
|
||||
ClientGeneral::Character(character_id)
|
||||
if client.registered && client.in_game.is_none() =>
|
||||
{
|
||||
if let Some(player) = players.get(entity) {
|
||||
// Send a request to load the character's component data from the
|
||||
// DB. Once loaded, persisted components such as stats and inventory
|
||||
// will be inserted for the entity
|
||||
character_loader.load_character_data(
|
||||
entity,
|
||||
player.uuid().to_string(),
|
||||
character_id,
|
||||
);
|
||||
|
||||
// Start inserting non-persisted/default components for the entity
|
||||
// while we load the DB data
|
||||
server_emitter.emit(ServerEvent::InitCharacterData {
|
||||
entity,
|
||||
character_id,
|
||||
});
|
||||
|
||||
// Give the player a welcome message
|
||||
if !editable_settings.server_description.is_empty() {
|
||||
general_stream
|
||||
.send(ChatType::CommandInfo.server_msg(String::from(
|
||||
&*editable_settings.server_description,
|
||||
)))?;
|
||||
}
|
||||
|
||||
if !client.login_msg_sent {
|
||||
if let Some(player_uid) = uids.get(entity) {
|
||||
new_chat_msgs.push((None, UnresolvedChatMsg {
|
||||
chat_type: ChatType::Online(*player_uid),
|
||||
message: "".to_string(),
|
||||
}));
|
||||
|
||||
client.login_msg_sent = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
character_screen_stream.send(ServerGeneral::CharacterDataLoadError(
|
||||
String::from("Failed to fetch player entity"),
|
||||
))?
|
||||
}
|
||||
}
|
||||
ClientGeneral::Character(_) => {
|
||||
let registered = client.registered;
|
||||
let in_game = client.in_game;
|
||||
debug!(?registered, ?in_game, "dropped Character msg from client");
|
||||
},
|
||||
ClientGeneral::RequestCharacterList => {
|
||||
if let Some(player) = players.get(entity) {
|
||||
character_loader.load_character_list(entity, player.uuid().to_string())
|
||||
}
|
||||
},
|
||||
ClientGeneral::CreateCharacter { alias, tool, body } => {
|
||||
if let Err(error) = alias_validator.validate(&alias) {
|
||||
debug!(?error, ?alias, "denied alias as it contained a banned word");
|
||||
character_screen_stream
|
||||
.send(ServerGeneral::CharacterActionError(error.to_string()))?;
|
||||
} else if let Some(player) = players.get(entity) {
|
||||
character_creator::create_character(
|
||||
entity,
|
||||
player.uuid().to_string(),
|
||||
alias,
|
||||
tool,
|
||||
body,
|
||||
character_loader,
|
||||
);
|
||||
}
|
||||
},
|
||||
ClientGeneral::DeleteCharacter(character_id) => {
|
||||
if let Some(player) = players.get(entity) {
|
||||
character_loader.delete_character(
|
||||
entity,
|
||||
player.uuid().to_string(),
|
||||
character_id,
|
||||
);
|
||||
}
|
||||
},
|
||||
_ => unreachable!("not a client_character_screen msg"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle new messages from clients
|
||||
pub struct Sys;
|
||||
impl<'a> System<'a> for Sys {
|
||||
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
|
||||
type SystemData = (
|
||||
Entities<'a>,
|
||||
Read<'a, EventBus<ServerEvent>>,
|
||||
Read<'a, Time>,
|
||||
ReadExpect<'a, CharacterLoader>,
|
||||
Write<'a, SysTimer<Self>>,
|
||||
ReadStorage<'a, Uid>,
|
||||
WriteStorage<'a, Client>,
|
||||
ReadStorage<'a, Player>,
|
||||
WriteStorage<'a, CharacterScreenStream>,
|
||||
WriteStorage<'a, GeneralStream>,
|
||||
ReadExpect<'a, EditableSettings>,
|
||||
ReadExpect<'a, AliasValidator>,
|
||||
);
|
||||
|
||||
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_char_pattern)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_match)] // TODO: Pending review in #587
|
||||
fn run(
|
||||
&mut self,
|
||||
(
|
||||
entities,
|
||||
server_event_bus,
|
||||
time,
|
||||
character_loader,
|
||||
mut timer,
|
||||
uids,
|
||||
mut clients,
|
||||
players,
|
||||
mut character_screen_streams,
|
||||
mut general_streams,
|
||||
editable_settings,
|
||||
alias_validator,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
span!(_guard, "run", "msg::character_screen::Sys::run");
|
||||
timer.start();
|
||||
|
||||
let mut server_emitter = server_event_bus.emitter();
|
||||
let mut new_chat_msgs = Vec::new();
|
||||
|
||||
for (entity, client, character_screen_stream, general_stream) in (
|
||||
&entities,
|
||||
&mut clients,
|
||||
&mut character_screen_streams,
|
||||
&mut general_streams,
|
||||
)
|
||||
.join()
|
||||
{
|
||||
let res =
|
||||
super::try_recv_all(character_screen_stream, |character_screen_stream, msg| {
|
||||
Self::handle_client_character_screen_msg(
|
||||
&mut server_emitter,
|
||||
&mut new_chat_msgs,
|
||||
entity,
|
||||
client,
|
||||
character_screen_stream,
|
||||
general_stream,
|
||||
&character_loader,
|
||||
&uids,
|
||||
&players,
|
||||
&editable_settings,
|
||||
&alias_validator,
|
||||
msg,
|
||||
)
|
||||
});
|
||||
|
||||
match res {
|
||||
Ok(1_u64..=u64::MAX) => {
|
||||
// Update client ping.
|
||||
client.last_ping = time.0
|
||||
},
|
||||
_ => (/*handled by ping*/),
|
||||
}
|
||||
}
|
||||
|
||||
// Handle new chat messages.
|
||||
for (entity, msg) in new_chat_msgs {
|
||||
// Handle chat commands.
|
||||
if msg.message.starts_with("/") {
|
||||
if let (Some(entity), true) = (entity, msg.message.len() > 1) {
|
||||
let argv = String::from(&msg.message[1..]);
|
||||
server_emitter.emit(ServerEvent::ChatCmd(entity, argv));
|
||||
}
|
||||
} else {
|
||||
// Send chat message
|
||||
server_emitter.emit(ServerEvent::Chat(msg));
|
||||
}
|
||||
}
|
||||
|
||||
timer.end()
|
||||
}
|
||||
}
|
153
server/src/sys/msg/general.rs
Normal file
153
server/src/sys/msg/general.rs
Normal file
@ -0,0 +1,153 @@
|
||||
use super::super::SysTimer;
|
||||
use crate::{
|
||||
client::Client,
|
||||
metrics::PlayerMetrics,
|
||||
streams::{GeneralStream, GetStream},
|
||||
};
|
||||
use common::{
|
||||
comp::{ChatMode, UnresolvedChatMsg},
|
||||
event::{EventBus, ServerEvent},
|
||||
msg::{
|
||||
validate_chat_msg, ChatMsgValidationError, ClientGeneral, DisconnectReason, ServerGeneral,
|
||||
MAX_BYTES_CHAT_MSG,
|
||||
},
|
||||
span,
|
||||
state::Time,
|
||||
sync::Uid,
|
||||
};
|
||||
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
|
||||
use tracing::{debug, error, warn};
|
||||
|
||||
impl Sys {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_general_msg(
|
||||
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
|
||||
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
general_stream: &mut GeneralStream,
|
||||
player_metrics: &ReadExpect<'_, PlayerMetrics>,
|
||||
uids: &ReadStorage<'_, Uid>,
|
||||
chat_modes: &ReadStorage<'_, ChatMode>,
|
||||
msg: ClientGeneral,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
match msg {
|
||||
ClientGeneral::ChatMsg(message) => {
|
||||
if client.registered {
|
||||
match validate_chat_msg(&message) {
|
||||
Ok(()) => {
|
||||
if let Some(from) = uids.get(entity) {
|
||||
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
|
||||
let msg = mode.new_message(*from, message);
|
||||
new_chat_msgs.push((Some(entity), msg));
|
||||
} else {
|
||||
error!("Could not send message. Missing player uid");
|
||||
}
|
||||
},
|
||||
Err(ChatMsgValidationError::TooLong) => {
|
||||
let max = MAX_BYTES_CHAT_MSG;
|
||||
let len = message.len();
|
||||
warn!(?len, ?max, "Received a chat message that's too long")
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
ClientGeneral::Disconnect => {
|
||||
general_stream.send(ServerGeneral::Disconnect(DisconnectReason::Requested))?;
|
||||
},
|
||||
ClientGeneral::Terminate => {
|
||||
debug!(?entity, "Client send message to termitate session");
|
||||
player_metrics
|
||||
.clients_disconnected
|
||||
.with_label_values(&["gracefully"])
|
||||
.inc();
|
||||
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
|
||||
},
|
||||
_ => unreachable!("not a client_general msg"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle new messages from clients
|
||||
pub struct Sys;
|
||||
impl<'a> System<'a> for Sys {
|
||||
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
|
||||
type SystemData = (
|
||||
Entities<'a>,
|
||||
Read<'a, EventBus<ServerEvent>>,
|
||||
Read<'a, Time>,
|
||||
ReadExpect<'a, PlayerMetrics>,
|
||||
Write<'a, SysTimer<Self>>,
|
||||
ReadStorage<'a, Uid>,
|
||||
ReadStorage<'a, ChatMode>,
|
||||
WriteStorage<'a, Client>,
|
||||
WriteStorage<'a, GeneralStream>,
|
||||
);
|
||||
|
||||
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_char_pattern)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_match)] // TODO: Pending review in #587
|
||||
fn run(
|
||||
&mut self,
|
||||
(
|
||||
entities,
|
||||
server_event_bus,
|
||||
time,
|
||||
player_metrics,
|
||||
mut timer,
|
||||
uids,
|
||||
chat_modes,
|
||||
mut clients,
|
||||
mut general_streams,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
span!(_guard, "run", "msg::general::Sys::run");
|
||||
timer.start();
|
||||
|
||||
let mut server_emitter = server_event_bus.emitter();
|
||||
let mut new_chat_msgs = Vec::new();
|
||||
|
||||
for (entity, client, general_stream) in
|
||||
(&entities, &mut clients, &mut general_streams).join()
|
||||
{
|
||||
let res = super::try_recv_all(general_stream, |general_stream, msg| {
|
||||
Self::handle_general_msg(
|
||||
&mut server_emitter,
|
||||
&mut new_chat_msgs,
|
||||
entity,
|
||||
client,
|
||||
general_stream,
|
||||
&player_metrics,
|
||||
&uids,
|
||||
&chat_modes,
|
||||
msg,
|
||||
)
|
||||
});
|
||||
|
||||
match res {
|
||||
Ok(1_u64..=u64::MAX) => {
|
||||
// Update client ping.
|
||||
client.last_ping = time.0
|
||||
},
|
||||
_ => (/*handled by ping*/),
|
||||
}
|
||||
}
|
||||
|
||||
// Handle new chat messages.
|
||||
for (entity, msg) in new_chat_msgs {
|
||||
// Handle chat commands.
|
||||
if msg.message.starts_with("/") {
|
||||
if let (Some(entity), true) = (entity, msg.message.len() > 1) {
|
||||
let argv = String::from(&msg.message[1..]);
|
||||
server_emitter.emit(ServerEvent::ChatCmd(entity, argv));
|
||||
}
|
||||
} else {
|
||||
// Send chat message
|
||||
server_emitter.emit(ServerEvent::Chat(msg));
|
||||
}
|
||||
}
|
||||
|
||||
timer.end()
|
||||
}
|
||||
}
|
@ -1,33 +1,20 @@
|
||||
use super::super::SysTimer;
|
||||
use crate::{
|
||||
alias_validator::AliasValidator,
|
||||
client::{
|
||||
Client, InGameStream,
|
||||
},
|
||||
login_provider::LoginProvider,
|
||||
metrics::{NetworkRequestMetrics, PlayerMetrics},
|
||||
persistence::character_loader::CharacterLoader,
|
||||
EditableSettings, Settings,
|
||||
client::Client,
|
||||
metrics::NetworkRequestMetrics,
|
||||
streams::{GetStream, InGameStream},
|
||||
Settings,
|
||||
};
|
||||
use common::{
|
||||
comp::{
|
||||
Admin, CanBuild, ChatMode, ControlEvent, Controller, ForceUpdate, Ori, Player,
|
||||
Pos, Stats, Vel,
|
||||
},
|
||||
comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Ori, Player, Pos, Stats, Vel},
|
||||
event::{EventBus, ServerEvent},
|
||||
msg::{
|
||||
ClientGeneral, ClientInGame,
|
||||
ServerGeneral,
|
||||
},
|
||||
msg::{ClientGeneral, ClientInGame, ServerGeneral},
|
||||
span,
|
||||
state::{BlockChange, Time},
|
||||
sync::Uid,
|
||||
terrain::{TerrainChunkSize, TerrainGrid},
|
||||
vol::{ReadVol, RectVolSize},
|
||||
};
|
||||
use specs::{
|
||||
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
|
||||
};
|
||||
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
impl Sys {
|
||||
@ -64,7 +51,7 @@ impl Sys {
|
||||
ClientGeneral::ExitInGame => {
|
||||
client.in_game = None;
|
||||
server_emitter.emit(ServerEvent::ExitIngame { entity });
|
||||
in_game_stream.0.send(ServerGeneral::ExitInGameSuccess)?;
|
||||
in_game_stream.send(ServerGeneral::ExitInGameSuccess)?;
|
||||
},
|
||||
ClientGeneral::SetViewDistance(view_distance) => {
|
||||
players.get_mut(entity).map(|player| {
|
||||
@ -82,7 +69,7 @@ impl Sys {
|
||||
.map(|max| view_distance > max)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
in_game_stream.0.send(ServerGeneral::SetViewDistance(
|
||||
in_game_stream.send(ServerGeneral::SetViewDistance(
|
||||
settings.max_view_distance.unwrap_or(0),
|
||||
))?;
|
||||
}
|
||||
@ -152,7 +139,7 @@ impl Sys {
|
||||
match terrain.get_key(key) {
|
||||
Some(chunk) => {
|
||||
network_metrics.chunks_served_from_memory.inc();
|
||||
in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate {
|
||||
in_game_stream.send(ServerGeneral::TerrainChunkUpdate {
|
||||
key,
|
||||
chunk: Ok(Box::new(chunk.clone())),
|
||||
})?
|
||||
@ -195,19 +182,13 @@ impl<'a> System<'a> for Sys {
|
||||
Entities<'a>,
|
||||
Read<'a, EventBus<ServerEvent>>,
|
||||
Read<'a, Time>,
|
||||
ReadExpect<'a, CharacterLoader>,
|
||||
ReadExpect<'a, TerrainGrid>,
|
||||
ReadExpect<'a, NetworkRequestMetrics>,
|
||||
ReadExpect<'a, PlayerMetrics>,
|
||||
Write<'a, SysTimer<Self>>,
|
||||
ReadStorage<'a, Uid>,
|
||||
ReadStorage<'a, CanBuild>,
|
||||
ReadStorage<'a, ForceUpdate>,
|
||||
WriteStorage<'a, Stats>,
|
||||
ReadStorage<'a, ChatMode>,
|
||||
WriteExpect<'a, LoginProvider>,
|
||||
Write<'a, BlockChange>,
|
||||
WriteStorage<'a, Admin>,
|
||||
WriteStorage<'a, Pos>,
|
||||
WriteStorage<'a, Vel>,
|
||||
WriteStorage<'a, Ori>,
|
||||
@ -216,8 +197,6 @@ impl<'a> System<'a> for Sys {
|
||||
WriteStorage<'a, InGameStream>,
|
||||
WriteStorage<'a, Controller>,
|
||||
Read<'a, Settings>,
|
||||
ReadExpect<'a, EditableSettings>,
|
||||
ReadExpect<'a, AliasValidator>,
|
||||
);
|
||||
|
||||
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
|
||||
@ -229,19 +208,13 @@ impl<'a> System<'a> for Sys {
|
||||
entities,
|
||||
server_event_bus,
|
||||
time,
|
||||
character_loader,
|
||||
terrain,
|
||||
network_metrics,
|
||||
player_metrics,
|
||||
mut timer,
|
||||
uids,
|
||||
can_build,
|
||||
force_updates,
|
||||
mut stats,
|
||||
chat_modes,
|
||||
mut accounts,
|
||||
mut block_changes,
|
||||
mut admins,
|
||||
mut positions,
|
||||
mut velocities,
|
||||
mut orientations,
|
||||
@ -250,8 +223,6 @@ impl<'a> System<'a> for Sys {
|
||||
mut in_game_streams,
|
||||
mut controllers,
|
||||
settings,
|
||||
editable_settings,
|
||||
alias_validator,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
span!(_guard, "run", "msg::in_game::Sys::run");
|
||||
@ -259,43 +230,37 @@ impl<'a> System<'a> for Sys {
|
||||
|
||||
let mut server_emitter = server_event_bus.emitter();
|
||||
|
||||
for (entity, client, mut in_game_stream) in (&entities, &mut clients, &mut in_game_streams).join() {
|
||||
let mut cnt = 0;
|
||||
for (entity, client, in_game_stream) in
|
||||
(&entities, &mut clients, &mut in_game_streams).join()
|
||||
{
|
||||
let res = super::try_recv_all(in_game_stream, |in_game_stream, msg| {
|
||||
Self::handle_client_in_game_msg(
|
||||
&mut server_emitter,
|
||||
entity,
|
||||
client,
|
||||
in_game_stream,
|
||||
&terrain,
|
||||
&network_metrics,
|
||||
&can_build,
|
||||
&force_updates,
|
||||
&mut stats,
|
||||
&mut block_changes,
|
||||
&mut positions,
|
||||
&mut velocities,
|
||||
&mut orientations,
|
||||
&mut players,
|
||||
&mut controllers,
|
||||
&settings,
|
||||
msg,
|
||||
)
|
||||
});
|
||||
|
||||
let network_err: Result<(), crate::error::Error> = {
|
||||
loop {
|
||||
let msg = match in_game_stream.0.try_recv() {
|
||||
Ok(Some(msg)) => msg,
|
||||
Ok(None) => break Ok(()),
|
||||
Err(e) => break Err(e.into()),
|
||||
};
|
||||
if let Err(e) = Self::handle_client_in_game_msg(
|
||||
&mut server_emitter,
|
||||
entity,
|
||||
client,
|
||||
&mut in_game_stream,
|
||||
&terrain,
|
||||
&network_metrics,
|
||||
&can_build,
|
||||
&force_updates,
|
||||
&mut stats,
|
||||
&mut block_changes,
|
||||
&mut positions,
|
||||
&mut velocities,
|
||||
&mut orientations,
|
||||
&mut players,
|
||||
&mut controllers,
|
||||
&settings,
|
||||
msg,
|
||||
) {
|
||||
break Err(e);
|
||||
}
|
||||
cnt += 1;
|
||||
}
|
||||
};
|
||||
|
||||
if cnt > 0 { // Update client ping.
|
||||
client.last_ping = time.0
|
||||
match res {
|
||||
Ok(1_u64..=u64::MAX) => {
|
||||
// Update client ping.
|
||||
client.last_ping = time.0
|
||||
},
|
||||
_ => (/*handled by ping*/),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1 +1,28 @@
|
||||
mod in_game;
|
||||
pub mod character_screen;
|
||||
pub mod general;
|
||||
pub mod in_game;
|
||||
pub mod ping;
|
||||
pub mod register;
|
||||
|
||||
use crate::streams::GetStream;
|
||||
|
||||
/// handles all send msg and calls a handle fn
|
||||
/// Aborts when a error occurred returns cnt of successful msg otherwise
|
||||
pub(crate) fn try_recv_all<T, F>(stream: &mut T, mut f: F) -> Result<u64, crate::error::Error>
|
||||
where
|
||||
T: GetStream,
|
||||
F: FnMut(&mut T, T::RecvMsg) -> Result<(), crate::error::Error>,
|
||||
{
|
||||
let mut cnt = 0u64;
|
||||
loop {
|
||||
let msg = match stream.get_mut().try_recv() {
|
||||
Ok(Some(msg)) => msg,
|
||||
Ok(None) => break Ok(cnt),
|
||||
Err(e) => break Err(e.into()),
|
||||
};
|
||||
if let Err(e) = f(stream, msg) {
|
||||
break Err(e);
|
||||
}
|
||||
cnt += 1;
|
||||
}
|
||||
}
|
||||
|
106
server/src/sys/msg/ping.rs
Normal file
106
server/src/sys/msg/ping.rs
Normal file
@ -0,0 +1,106 @@
|
||||
use super::super::SysTimer;
|
||||
use crate::{
|
||||
client::Client,
|
||||
metrics::PlayerMetrics,
|
||||
streams::{GetStream, PingStream},
|
||||
Settings,
|
||||
};
|
||||
use common::{
|
||||
event::{EventBus, ServerEvent},
|
||||
msg::PingMsg,
|
||||
span,
|
||||
state::Time,
|
||||
};
|
||||
use specs::{Entities, Join, Read, ReadExpect, System, Write, WriteStorage};
|
||||
use tracing::{debug, info};
|
||||
|
||||
impl Sys {
|
||||
fn handle_ping_msg(
|
||||
ping_stream: &mut PingStream,
|
||||
msg: PingMsg,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
match msg {
|
||||
PingMsg::Ping => ping_stream.send(PingMsg::Pong)?,
|
||||
PingMsg::Pong => {},
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle new messages from clients
|
||||
pub struct Sys;
|
||||
impl<'a> System<'a> for Sys {
|
||||
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
|
||||
type SystemData = (
|
||||
Entities<'a>,
|
||||
Read<'a, EventBus<ServerEvent>>,
|
||||
Read<'a, Time>,
|
||||
ReadExpect<'a, PlayerMetrics>,
|
||||
Write<'a, SysTimer<Self>>,
|
||||
WriteStorage<'a, Client>,
|
||||
WriteStorage<'a, PingStream>,
|
||||
Read<'a, Settings>,
|
||||
);
|
||||
|
||||
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_char_pattern)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_match)] // TODO: Pending review in #587
|
||||
fn run(
|
||||
&mut self,
|
||||
(
|
||||
entities,
|
||||
server_event_bus,
|
||||
time,
|
||||
player_metrics,
|
||||
mut timer,
|
||||
mut clients,
|
||||
mut ping_streams,
|
||||
settings,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
span!(_guard, "run", "msg::ping::Sys::run");
|
||||
timer.start();
|
||||
|
||||
let mut server_emitter = server_event_bus.emitter();
|
||||
|
||||
for (entity, client, ping_stream) in (&entities, &mut clients, &mut ping_streams).join() {
|
||||
let res = super::try_recv_all(ping_stream, |ping_stream, msg| {
|
||||
Self::handle_ping_msg(ping_stream, msg)
|
||||
});
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
debug!(?entity, ?e, "network error with client, disconnecting");
|
||||
player_metrics
|
||||
.clients_disconnected
|
||||
.with_label_values(&["network_error"])
|
||||
.inc();
|
||||
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
|
||||
},
|
||||
Ok(1_u64..=u64::MAX) => {
|
||||
// Update client ping.
|
||||
client.last_ping = time.0
|
||||
},
|
||||
Ok(0) => {
|
||||
if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64
|
||||
// Timeout
|
||||
{
|
||||
info!(?entity, "timeout error with client, disconnecting");
|
||||
player_metrics
|
||||
.clients_disconnected
|
||||
.with_label_values(&["timeout"])
|
||||
.inc();
|
||||
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
|
||||
} else if time.0 - client.last_ping
|
||||
> settings.client_timeout.as_secs() as f64 * 0.5
|
||||
{
|
||||
// Try pinging the client if the timeout is nearing.
|
||||
ping_stream.send_unchecked(PingMsg::Ping);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
timer.end()
|
||||
}
|
||||
}
|
208
server/src/sys/msg/register.rs
Normal file
208
server/src/sys/msg/register.rs
Normal file
@ -0,0 +1,208 @@
|
||||
use super::super::SysTimer;
|
||||
use crate::{
|
||||
client::Client,
|
||||
login_provider::LoginProvider,
|
||||
metrics::PlayerMetrics,
|
||||
streams::{GeneralStream, GetStream, RegisterStream},
|
||||
EditableSettings,
|
||||
};
|
||||
use common::{
|
||||
comp::{Admin, Player, Stats},
|
||||
msg::{
|
||||
CharacterInfo, ClientRegister, PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral,
|
||||
ServerRegisterAnswer,
|
||||
},
|
||||
span,
|
||||
state::Time,
|
||||
sync::Uid,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use specs::{
|
||||
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
|
||||
};
|
||||
|
||||
impl Sys {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_register_msg(
|
||||
player_list: &HashMap<Uid, PlayerInfo>,
|
||||
new_players: &mut Vec<specs::Entity>,
|
||||
entity: specs::Entity,
|
||||
client: &mut Client,
|
||||
register_stream: &mut RegisterStream,
|
||||
general_stream: &mut GeneralStream,
|
||||
player_metrics: &ReadExpect<'_, PlayerMetrics>,
|
||||
login_provider: &mut WriteExpect<'_, LoginProvider>,
|
||||
admins: &mut WriteStorage<'_, Admin>,
|
||||
players: &mut WriteStorage<'_, Player>,
|
||||
editable_settings: &ReadExpect<'_, EditableSettings>,
|
||||
msg: ClientRegister,
|
||||
) -> Result<(), crate::error::Error> {
|
||||
let (username, uuid) = match login_provider.try_login(
|
||||
&msg.token_or_username,
|
||||
&*editable_settings.admins,
|
||||
&*editable_settings.whitelist,
|
||||
&*editable_settings.banlist,
|
||||
) {
|
||||
Err(err) => {
|
||||
register_stream.send(ServerRegisterAnswer::Err(err))?;
|
||||
return Ok(());
|
||||
},
|
||||
Ok((username, uuid)) => (username, uuid),
|
||||
};
|
||||
|
||||
const INITIAL_VD: Option<u32> = Some(5); //will be changed after login
|
||||
let player = Player::new(username, None, INITIAL_VD, uuid);
|
||||
let is_admin = editable_settings.admins.contains(&uuid);
|
||||
|
||||
if !player.is_valid() {
|
||||
// Invalid player
|
||||
register_stream.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !client.registered && client.in_game.is_none() {
|
||||
// Add Player component to this client
|
||||
let _ = players.insert(entity, player);
|
||||
player_metrics.players_connected.inc();
|
||||
|
||||
// Give the Admin component to the player if their name exists in
|
||||
// admin list
|
||||
if is_admin {
|
||||
let _ = admins.insert(entity, Admin);
|
||||
}
|
||||
|
||||
// Tell the client its request was successful.
|
||||
client.registered = true;
|
||||
register_stream.send(ServerRegisterAnswer::Ok(()))?;
|
||||
|
||||
// Send initial player list
|
||||
general_stream.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
|
||||
player_list.clone(),
|
||||
)))?;
|
||||
|
||||
// Add to list to notify all clients of the new player
|
||||
new_players.push(entity);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This system will handle new messages from clients
|
||||
pub struct Sys;
|
||||
impl<'a> System<'a> for Sys {
|
||||
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
|
||||
type SystemData = (
|
||||
Entities<'a>,
|
||||
Read<'a, Time>,
|
||||
ReadExpect<'a, PlayerMetrics>,
|
||||
Write<'a, SysTimer<Self>>,
|
||||
ReadStorage<'a, Uid>,
|
||||
WriteStorage<'a, Client>,
|
||||
WriteStorage<'a, Player>,
|
||||
ReadStorage<'a, Stats>,
|
||||
WriteExpect<'a, LoginProvider>,
|
||||
WriteStorage<'a, Admin>,
|
||||
WriteStorage<'a, RegisterStream>,
|
||||
WriteStorage<'a, GeneralStream>,
|
||||
ReadExpect<'a, EditableSettings>,
|
||||
);
|
||||
|
||||
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_char_pattern)] // TODO: Pending review in #587
|
||||
#[allow(clippy::single_match)] // TODO: Pending review in #587
|
||||
fn run(
|
||||
&mut self,
|
||||
(
|
||||
entities,
|
||||
time,
|
||||
player_metrics,
|
||||
mut timer,
|
||||
uids,
|
||||
mut clients,
|
||||
mut players,
|
||||
stats,
|
||||
mut login_provider,
|
||||
mut admins,
|
||||
mut register_streams,
|
||||
mut general_streams,
|
||||
editable_settings,
|
||||
): Self::SystemData,
|
||||
) {
|
||||
span!(_guard, "run", "msg::register::Sys::run");
|
||||
timer.start();
|
||||
|
||||
// Player list to send new players.
|
||||
let player_list = (&uids, &players, stats.maybe(), admins.maybe())
|
||||
.join()
|
||||
.map(|(uid, player, stats, admin)| {
|
||||
(*uid, PlayerInfo {
|
||||
is_online: true,
|
||||
is_admin: admin.is_some(),
|
||||
player_alias: player.alias.clone(),
|
||||
character: stats.map(|stats| CharacterInfo {
|
||||
name: stats.name.clone(),
|
||||
level: stats.level.level(),
|
||||
}),
|
||||
})
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
// List of new players to update player lists of all clients.
|
||||
let mut new_players = Vec::new();
|
||||
|
||||
for (entity, client, register_stream, general_stream) in (
|
||||
&entities,
|
||||
&mut clients,
|
||||
&mut register_streams,
|
||||
&mut general_streams,
|
||||
)
|
||||
.join()
|
||||
{
|
||||
let res = super::try_recv_all(register_stream, |register_stream, msg| {
|
||||
Self::handle_register_msg(
|
||||
&player_list,
|
||||
&mut new_players,
|
||||
entity,
|
||||
client,
|
||||
register_stream,
|
||||
general_stream,
|
||||
&player_metrics,
|
||||
&mut login_provider,
|
||||
&mut admins,
|
||||
&mut players,
|
||||
&editable_settings,
|
||||
msg,
|
||||
)
|
||||
});
|
||||
|
||||
match res {
|
||||
Ok(1_u64..=u64::MAX) => {
|
||||
// Update client ping.
|
||||
client.last_ping = time.0
|
||||
},
|
||||
_ => (/*handled by ping*/),
|
||||
}
|
||||
}
|
||||
|
||||
// Handle new players.
|
||||
// Tell all clients to add them to the player list.
|
||||
for entity in new_players {
|
||||
if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) {
|
||||
let msg =
|
||||
ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo {
|
||||
player_alias: player.alias.clone(),
|
||||
is_online: true,
|
||||
is_admin: admins.get(entity).is_some(),
|
||||
character: None, // new players will be on character select.
|
||||
}));
|
||||
for (_, general_stream) in (&mut clients, &mut general_streams)
|
||||
.join()
|
||||
.filter(|(c, _)| c.registered)
|
||||
{
|
||||
let _ = general_stream.send(msg.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
timer.end()
|
||||
}
|
||||
}
|
@ -2,7 +2,10 @@ use super::{
|
||||
sentinel::{DeletedEntities, TrackedComps},
|
||||
SysTimer,
|
||||
};
|
||||
use crate::client::{self, Client, InGameStream, RegionSubscription};
|
||||
use crate::{
|
||||
client::{self, Client, RegionSubscription},
|
||||
streams::{GeneralStream, GetStream},
|
||||
};
|
||||
use common::{
|
||||
comp::{Ori, Player, Pos, Vel},
|
||||
msg::ServerGeneral,
|
||||
@ -33,7 +36,7 @@ impl<'a> System<'a> for Sys {
|
||||
ReadStorage<'a, Ori>,
|
||||
ReadStorage<'a, Player>,
|
||||
ReadStorage<'a, Client>,
|
||||
WriteStorage<'a, InGameStream>,
|
||||
WriteStorage<'a, GeneralStream>,
|
||||
WriteStorage<'a, RegionSubscription>,
|
||||
Write<'a, DeletedEntities>,
|
||||
TrackedComps<'a>,
|
||||
@ -52,7 +55,7 @@ impl<'a> System<'a> for Sys {
|
||||
orientations,
|
||||
players,
|
||||
clients,
|
||||
mut in_game_streams,
|
||||
mut general_streams,
|
||||
mut subscriptions,
|
||||
mut deleted_entities,
|
||||
tracked_comps,
|
||||
@ -73,13 +76,13 @@ impl<'a> System<'a> for Sys {
|
||||
// 7. Determine list of regions that are in range and iterate through it
|
||||
// - check if in hashset (hash calc) if not add it
|
||||
let mut regions_to_remove = Vec::new();
|
||||
for (_, subscription, pos, vd, client_entity, in_game_stream) in (
|
||||
for (_, subscription, pos, vd, client_entity, general_stream) in (
|
||||
&clients,
|
||||
&mut subscriptions,
|
||||
&positions,
|
||||
&players,
|
||||
&entities,
|
||||
&mut in_game_streams,
|
||||
&mut general_streams,
|
||||
)
|
||||
.join()
|
||||
.filter_map(|(client, s, pos, player, e, stream)| {
|
||||
@ -156,9 +159,8 @@ impl<'a> System<'a> for Sys {
|
||||
.map(|key| subscription.regions.contains(key))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let _ = in_game_stream
|
||||
.0
|
||||
.send(ServerGeneral::DeleteEntity(uid));
|
||||
general_stream
|
||||
.send_unchecked(ServerGeneral::DeleteEntity(uid));
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -166,7 +168,7 @@ impl<'a> System<'a> for Sys {
|
||||
}
|
||||
// Tell client to delete entities in the region
|
||||
for (&uid, _) in (&uids, region.entities()).join() {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::DeleteEntity(uid));
|
||||
let _ = general_stream.send(ServerGeneral::DeleteEntity(uid));
|
||||
}
|
||||
}
|
||||
// Send deleted entities since they won't be processed for this client in entity
|
||||
@ -176,9 +178,7 @@ impl<'a> System<'a> for Sys {
|
||||
.iter()
|
||||
.flat_map(|v| v.iter())
|
||||
{
|
||||
let _ = in_game_stream
|
||||
.0
|
||||
.send(ServerGeneral::DeleteEntity(Uid(*uid)));
|
||||
general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -203,7 +203,7 @@ impl<'a> System<'a> for Sys {
|
||||
{
|
||||
// Send message to create entity and tracked components and physics
|
||||
// components
|
||||
let _ = in_game_stream.0.send(ServerGeneral::CreateEntity(
|
||||
general_stream.send_unchecked(ServerGeneral::CreateEntity(
|
||||
tracked_comps.create_entity_package(
|
||||
entity,
|
||||
Some(*pos),
|
||||
@ -224,14 +224,14 @@ impl<'a> System<'a> for Sys {
|
||||
|
||||
/// Initialize region subscription
|
||||
pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
|
||||
if let (Some(client_pos), Some(client_vd), Some(in_game_stream)) = (
|
||||
if let (Some(client_pos), Some(client_vd), Some(general_stream)) = (
|
||||
world.read_storage::<Pos>().get(entity),
|
||||
world
|
||||
.read_storage::<Player>()
|
||||
.get(entity)
|
||||
.map(|pl| pl.view_distance)
|
||||
.and_then(|v| v),
|
||||
world.write_storage::<InGameStream>().get_mut(entity),
|
||||
world.write_storage::<GeneralStream>().get_mut(entity),
|
||||
) {
|
||||
let fuzzy_chunk = (Vec2::<f32>::from(client_pos.0))
|
||||
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
|
||||
@ -256,7 +256,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
|
||||
.join()
|
||||
{
|
||||
// Send message to create entity and tracked components and physics components
|
||||
let _ = in_game_stream.0.send(ServerGeneral::CreateEntity(
|
||||
general_stream.send_unchecked(ServerGeneral::CreateEntity(
|
||||
tracked_comps.create_entity_package(
|
||||
entity,
|
||||
Some(*pos),
|
||||
|
@ -1,5 +1,9 @@
|
||||
use super::SysTimer;
|
||||
use crate::{chunk_generator::ChunkGenerator, client::InGameStream, Tick};
|
||||
use crate::{
|
||||
chunk_generator::ChunkGenerator,
|
||||
streams::{GetStream, InGameStream},
|
||||
Tick,
|
||||
};
|
||||
use common::{
|
||||
comp::{self, bird_medium, Alignment, Player, Pos},
|
||||
event::{EventBus, ServerEvent},
|
||||
@ -63,7 +67,7 @@ impl<'a> System<'a> for Sys {
|
||||
Ok((chunk, supplement)) => (chunk, supplement),
|
||||
Err(Some(entity)) => {
|
||||
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate {
|
||||
in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate {
|
||||
key,
|
||||
chunk: Err(()),
|
||||
});
|
||||
@ -90,7 +94,7 @@ impl<'a> System<'a> for Sys {
|
||||
.magnitude_squared();
|
||||
|
||||
if adjusted_dist_sqr <= view_distance.pow(2) {
|
||||
let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate {
|
||||
in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate {
|
||||
key,
|
||||
chunk: Ok(Box::new(chunk.clone())),
|
||||
});
|
||||
|
@ -1,5 +1,5 @@
|
||||
use super::SysTimer;
|
||||
use crate::client::InGameStream;
|
||||
use crate::streams::{GetStream, InGameStream};
|
||||
use common::{
|
||||
comp::{Player, Pos},
|
||||
msg::ServerGeneral,
|
||||
@ -39,7 +39,7 @@ impl<'a> System<'a> for Sys {
|
||||
.map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let _ = in_game_stream.0.send(ServerGeneral::TerrainChunkUpdate {
|
||||
let _ = in_game_stream.send(ServerGeneral::TerrainChunkUpdate {
|
||||
key: *chunk_key,
|
||||
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
|
||||
Some(chunk) => chunk.clone(),
|
||||
@ -55,7 +55,7 @@ impl<'a> System<'a> for Sys {
|
||||
let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone());
|
||||
for (player, in_game_stream) in (&players, &mut in_game_streams).join() {
|
||||
if player.view_distance.is_some() {
|
||||
let _ = in_game_stream.0.send(msg.clone());
|
||||
in_game_stream.send_unchecked(msg.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
use super::SysTimer;
|
||||
use crate::client::GeneralStream;
|
||||
use crate::streams::{GeneralStream, GetStream};
|
||||
use common::{
|
||||
comp::{Player, Pos, Waypoint, WaypointArea},
|
||||
msg::{Notification, ServerGeneral},
|
||||
@ -51,9 +51,9 @@ impl<'a> System<'a> for Sys {
|
||||
if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time))
|
||||
{
|
||||
if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) {
|
||||
let _ = general_stream
|
||||
.0
|
||||
.send(ServerGeneral::Notification(Notification::WaypointSaved));
|
||||
general_stream.send_unchecked(ServerGeneral::Notification(
|
||||
Notification::WaypointSaved,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user