Merge branch 'xMAC94x/netfixD' into 'master'

xMAC94x/netfixD

See merge request veloren/veloren!1444
This commit is contained in:
Marcel 2020-11-03 13:07:33 +00:00
commit 9e905b297a
35 changed files with 1638 additions and 1358 deletions

View File

@ -25,9 +25,9 @@ use common::{
},
event::{EventBus, LocalEvent},
msg::{
validate_chat_msg, ChatMsgValidationError, ClientGeneral, ClientInGame, ClientMsg,
ClientRegister, ClientType, DisconnectReason, InviteAnswer, Notification, PingMsg,
PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral, ServerInfo, ServerInit,
validate_chat_msg, ChatMsgValidationError, ClientGeneral, ClientMsg, ClientRegister,
ClientType, DisconnectReason, InviteAnswer, Notification, PingMsg, PlayerInfo,
PlayerListUpdate, PresenceKind, RegisterError, ServerGeneral, ServerInfo, ServerInit,
ServerRegisterAnswer, MAX_BYTES_CHAT_MSG,
},
outcome::Outcome,
@ -71,7 +71,7 @@ pub enum Event {
pub struct Client {
registered: bool,
in_game: Option<ClientInGame>,
presence: Option<PresenceKind>,
thread_pool: ThreadPool,
pub server_info: ServerInfo,
/// Just the "base" layer for LOD; currently includes colors and nothing
@ -98,7 +98,6 @@ pub struct Client {
pub world_map: (Arc<DynamicImage>, Vec2<u16>, Vec2<f32>),
pub player_list: HashMap<Uid, PlayerInfo>,
pub character_list: CharacterList,
pub active_character_id: Option<CharacterId>,
recipe_book: RecipeBook,
available_recipes: HashSet<String>,
@ -376,7 +375,7 @@ impl Client {
Ok(Self {
registered: false,
in_game: None,
presence: None,
thread_pool,
server_info,
world_map,
@ -385,7 +384,6 @@ impl Client {
lod_horizon,
player_list: HashMap::new(),
character_list: CharacterList::default(),
active_character_id: None,
recipe_book,
available_recipes: HashSet::default(),
@ -467,12 +465,12 @@ impl Client {
#[cfg(debug_assertions)]
{
const C_TYPE: ClientType = ClientType::Game;
let verified = msg.verify(C_TYPE, self.registered, self.in_game);
let verified = msg.verify(C_TYPE, self.registered, self.presence);
assert!(
verified,
format!(
"c_type: {:?}, registered: {}, in_game: {:?}, msg: {:?}",
C_TYPE, self.registered, self.in_game, msg
"c_type: {:?}, registered: {}, presence: {:?}, msg: {:?}",
C_TYPE, self.registered, self.presence, msg
)
);
}
@ -500,9 +498,9 @@ impl Client {
| ClientGeneral::RefundSkill(_)
| ClientGeneral::UnlockSkillGroup(_) => &mut self.in_game_stream,
//Always possible
ClientGeneral::ChatMsg(_)
| ClientGeneral::Disconnect
| ClientGeneral::Terminate => &mut self.general_stream,
ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => {
&mut self.general_stream
},
};
stream.send(msg)
},
@ -528,9 +526,7 @@ impl Client {
self.send_msg(ClientGeneral::Character(character_id));
//Assume we are in_game unless server tells us otherwise
self.in_game = Some(ClientInGame::Character);
self.active_character_id = Some(character_id);
self.presence = Some(PresenceKind::Character(character_id));
}
/// Load the current players character list
@ -552,9 +548,11 @@ impl Client {
}
/// Send disconnect message to the server
pub fn request_logout(&mut self) {
debug!("Requesting logout from server");
self.send_msg(ClientGeneral::Disconnect);
pub fn logout(&mut self) {
debug!("Sending logout from server");
self.send_msg(ClientGeneral::Terminate);
self.registered = false;
self.presence = None;
}
/// Request a state transition to `ClientState::Registered` from an ingame
@ -920,7 +918,7 @@ impl Client {
// 1) Handle input from frontend.
// Pass character actions from frontend input to the player's entity.
if self.in_game.is_some() {
if self.presence.is_some() {
if let Err(e) = self
.state
.ecs()
@ -1091,7 +1089,7 @@ impl Client {
}
// 6) Update the server about the player's physics attributes.
if self.in_game.is_some() {
if self.presence.is_some() {
if let (Some(pos), Some(vel), Some(ori)) = (
self.state.read_storage().get(self.entity).cloned(),
self.state.read_storage().get(self.entity).cloned(),
@ -1133,11 +1131,6 @@ impl Client {
match msg {
ServerGeneral::Disconnect(reason) => match reason {
DisconnectReason::Shutdown => return Err(Error::ServerShutdown),
DisconnectReason::Requested => {
debug!("finally sending ClientMsg::Terminate");
frontend_events.push(Event::Disconnect);
self.send_msg_err(ClientGeneral::Terminate)?;
},
DisconnectReason::Kicked(reason) => {
debug!("sending ClientMsg::Terminate because we got kicked");
frontend_events.push(Event::Kicked(reason));
@ -1378,9 +1371,9 @@ impl Client {
};
frontend_events.push(Event::Chat(comp::ChatType::Meta.chat_msg(msg)));
},
// Cleanup for when the client goes back to the `in_game = None`
// Cleanup for when the client goes back to the `presence = None`
ServerGeneral::ExitInGameSuccess => {
self.in_game = None;
self.presence = None;
self.clean_state();
},
ServerGeneral::InventoryUpdate(mut inventory, event) => {
@ -1441,7 +1434,7 @@ impl Client {
},
ServerGeneral::CharacterDataLoadError(error) => {
trace!("Handling join error by server");
self.in_game = None;
self.presence = None;
self.clean_state();
self.character_list.error = Some(error);
},
@ -1551,7 +1544,7 @@ impl Client {
pub fn uid(&self) -> Option<Uid> { self.state.read_component_copied(self.entity) }
pub fn in_game(&self) -> Option<ClientInGame> { self.in_game }
pub fn presence(&self) -> Option<PresenceKind> { self.presence }
pub fn registered(&self) -> bool { self.registered }
@ -1840,7 +1833,7 @@ impl Drop for Client {
fn drop(&mut self) {
trace!("Dropping client");
if self.registered {
if let Err(e) = self.send_msg_err(ClientGeneral::Disconnect) {
if let Err(e) = self.send_msg_err(ClientGeneral::Terminate) {
warn!(
?e,
"Error during drop of client, couldn't send disconnect package, is the \

View File

@ -1,4 +1,3 @@
use crate::character::CharacterId;
use authc::Uuid;
use serde::{Deserialize, Serialize};
use specs::{Component, FlaggedStorage, NullStorage};
@ -9,25 +8,11 @@ const MAX_ALIAS_LEN: usize = 32;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Player {
pub alias: String,
pub character_id: Option<CharacterId>,
pub view_distance: Option<u32>,
uuid: Uuid,
}
impl Player {
pub fn new(
alias: String,
character_id: Option<CharacterId>,
view_distance: Option<u32>,
uuid: Uuid,
) -> Self {
Self {
alias,
character_id,
view_distance,
uuid,
}
}
pub fn new(alias: String, uuid: Uuid) -> Self { Self { alias, uuid } }
pub fn is_valid(&self) -> bool { Self::alias_is_valid(&self.alias) }

View File

@ -78,7 +78,6 @@ pub enum ClientGeneral {
UnlockSkillGroup(SkillGroupType),
//Always possible
ChatMsg(String),
Disconnect,
Terminate,
}
@ -87,21 +86,21 @@ impl ClientMsg {
&self,
c_type: ClientType,
registered: bool,
in_game: Option<super::ClientInGame>,
presence: Option<super::PresenceKind>,
) -> bool {
match self {
ClientMsg::Type(t) => c_type == *t,
ClientMsg::Register(_) => !registered && in_game.is_none(),
ClientMsg::Register(_) => !registered && presence.is_none(),
ClientMsg::General(g) => {
registered
&& match g {
ClientGeneral::RequestCharacterList
| ClientGeneral::CreateCharacter { .. }
| ClientGeneral::DeleteCharacter(_) => {
c_type != ClientType::ChatOnly && in_game.is_none()
c_type != ClientType::ChatOnly && presence.is_none()
},
ClientGeneral::Character(_) | ClientGeneral::Spectate => {
c_type == ClientType::Game && in_game.is_none()
c_type == ClientType::Game && presence.is_none()
},
//Only in game
ClientGeneral::ControllerInputs(_)
@ -116,12 +115,10 @@ impl ClientMsg {
| ClientGeneral::UnlockSkill(_)
| ClientGeneral::RefundSkill(_)
| ClientGeneral::UnlockSkillGroup(_) => {
c_type == ClientType::Game && in_game.is_some()
c_type == ClientType::Game && presence.is_some()
},
//Always possible
ClientGeneral::ChatMsg(_)
| ClientGeneral::Disconnect
| ClientGeneral::Terminate => true,
ClientGeneral::ChatMsg(_) | ClientGeneral::Terminate => true,
}
},
ClientMsg::Ping(_) => true,

View File

@ -13,12 +13,13 @@ pub use self::{
},
world_msg::WorldMapMsg,
};
use crate::character::CharacterId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum ClientInGame {
pub enum PresenceKind {
Spectator,
Character,
Character(CharacterId),
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]

View File

@ -163,8 +163,6 @@ pub enum Notification {
pub enum DisconnectReason {
/// Server shut down
Shutdown,
/// Client sent disconnect message
Requested,
/// Client was kicked
Kicked(String),
}
@ -184,11 +182,11 @@ impl ServerMsg {
&self,
c_type: ClientType,
registered: bool,
in_game: Option<super::ClientInGame>,
presence: Option<super::PresenceKind>,
) -> bool {
match self {
ServerMsg::Info(_) | ServerMsg::Init(_) | ServerMsg::RegisterAnswer(_) => {
!registered && in_game.is_none()
!registered && presence.is_none()
},
ServerMsg::General(g) => {
registered
@ -197,10 +195,10 @@ impl ServerMsg {
ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_) => {
c_type != ClientType::ChatOnly && in_game.is_none()
c_type != ClientType::ChatOnly && presence.is_none()
},
ServerGeneral::CharacterSuccess => {
c_type == ClientType::Game && in_game.is_none()
c_type == ClientType::Game && presence.is_none()
},
//Ingame related
ServerGeneral::GroupUpdate(_)
@ -214,7 +212,7 @@ impl ServerMsg {
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_) => {
c_type == ClientType::Game && in_game.is_some()
c_type == ClientType::Game && presence.is_some()
},
// Always possible
ServerGeneral::PlayerListUpdate(_)

View File

@ -155,7 +155,7 @@ impl ShutdownCoordinator {
/// Logs and sends a message to all connected clients
fn send_msg(server: &mut Server, msg: String) {
info!("{}", &msg);
server.notify_registered_clients(ChatType::CommandError.server_msg(msg));
server.notify_players(ChatType::CommandError.server_msg(msg));
}
/// Converts a `Duration` into text in the format XsXm for example 1 minute

View File

@ -1,72 +1,81 @@
use crate::error::Error;
use common::msg::{ClientInGame, ClientType, ServerGeneral, ServerMsg};
use hashbrown::HashSet;
use network::{Participant, Stream};
use common::msg::{ClientType, ServerGeneral, ServerMsg};
use network::{Message, Participant, Stream, StreamError};
use serde::{de::DeserializeOwned, Serialize};
use specs::{Component, FlaggedStorage};
use specs::Component;
use specs_idvs::IdvStorage;
use tracing::debug;
use vek::*;
use std::sync::{atomic::AtomicBool, Mutex};
/// Client handles ALL network related information of everything that connects
/// to the server Client DOES NOT handle game states
/// Client DOES NOT handle network information that is only relevant to some
/// "things" connecting to the server (there is currently no such case). First a
/// Client connects to the game, when it registers, it gets the `Player`
/// component, when he enters the game he gets the `InGame` component.
pub struct Client {
pub registered: bool,
pub client_type: ClientType,
pub in_game: Option<ClientInGame>,
pub participant: Option<Participant>,
pub general_stream: Stream,
pub ping_stream: Stream,
pub register_stream: Stream,
pub character_screen_stream: Stream,
pub in_game_stream: Stream,
pub network_error: bool,
pub last_ping: f64,
pub login_msg_sent: bool,
pub last_ping: Mutex<f64>,
pub login_msg_sent: AtomicBool,
pub terminate_msg_recv: AtomicBool,
//TODO: improve network crate so that `send` is no longer `&mut self` and we can get rid of
// this Mutex. This Mutex is just to please the compiler as we do not get into contention
general_stream: Mutex<Stream>,
ping_stream: Mutex<Stream>,
register_stream: Mutex<Stream>,
character_screen_stream: Mutex<Stream>,
in_game_stream: Mutex<Stream>,
}
pub struct PreparedMsg {
stream_id: u8,
message: Message,
}
impl Component for Client {
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
type Storage = IdvStorage<Self>;
}
impl Client {
fn internal_send<M: Serialize>(err: &mut bool, s: &mut Stream, msg: M) {
if !*err {
if let Err(e) = s.send(msg) {
debug!(?e, "got a network error with client");
*err = true;
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
client_type: ClientType,
participant: Participant,
last_ping: f64,
general_stream: Stream,
ping_stream: Stream,
register_stream: Stream,
character_screen_stream: Stream,
in_game_stream: Stream,
) -> Self {
Client {
client_type,
participant: Some(participant),
last_ping: Mutex::new(last_ping),
login_msg_sent: AtomicBool::new(false),
terminate_msg_recv: AtomicBool::new(false),
general_stream: Mutex::new(general_stream),
ping_stream: Mutex::new(ping_stream),
register_stream: Mutex::new(register_stream),
character_screen_stream: Mutex::new(character_screen_stream),
in_game_stream: Mutex::new(in_game_stream),
}
}
/*
fn internal_send_raw(b: &AtomicBool, s: &mut Stream, msg: Arc<MessageBuffer>) {
if !b.load(Ordering::Relaxed) {
if let Err(e) = s.send_raw(msg) {
debug!(?e, "got a network error with client");
b.store(true, Ordering::Relaxed);
}
}
}
*/
pub fn send_msg<S>(&mut self, msg: S)
where
S: Into<ServerMsg>,
{
const ERR: &str =
"Don't do that. Sending these messages is only done ONCE at connect and not by this fn";
pub(crate) fn send<M: Into<ServerMsg>>(&self, msg: M) -> Result<(), StreamError> {
match msg.into() {
ServerMsg::Info(_) => panic!(ERR),
ServerMsg::Init(_) => panic!(ERR),
ServerMsg::RegisterAnswer(msg) => {
Self::internal_send(&mut self.network_error, &mut self.register_stream, &msg)
},
ServerMsg::General(msg) => {
let stream = match &msg {
ServerMsg::Info(m) => self.register_stream.try_lock().unwrap().send(m),
ServerMsg::Init(m) => self.register_stream.try_lock().unwrap().send(m),
ServerMsg::RegisterAnswer(m) => self.register_stream.try_lock().unwrap().send(m),
ServerMsg::General(g) => {
match g {
//Character Screen related
ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterSuccess => &mut self.character_screen_stream,
| ServerGeneral::CharacterSuccess => {
self.character_screen_stream.try_lock().unwrap().send(g)
},
//Ingame related
ServerGeneral::GroupUpdate(_)
| ServerGeneral::GroupInvite { .. }
@ -78,7 +87,9 @@ impl Client {
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_) => &mut self.in_game_stream,
| ServerGeneral::Knockback(_) => {
self.in_game_stream.try_lock().unwrap().send(g)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
@ -89,46 +100,109 @@ impl Client {
| ServerGeneral::CreateEntity(_)
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => &mut self.general_stream,
};
Self::internal_send(&mut self.network_error, stream, &msg)
| ServerGeneral::Notification(_) => {
self.general_stream.try_lock().unwrap().send(g)
},
}
},
ServerMsg::Ping(msg) => {
Self::internal_send(&mut self.network_error, &mut self.ping_stream, &msg)
},
};
ServerMsg::Ping(m) => self.ping_stream.try_lock().unwrap().send(m),
}
}
pub async fn internal_recv<M: DeserializeOwned>(
err: &mut bool,
s: &mut Stream,
) -> Result<M, Error> {
if !*err {
match s.recv().await {
Ok(r) => Ok(r),
Err(e) => {
debug!(?e, "got a network error with client while recv");
*err = true;
Err(Error::StreamErr(e))
},
}
} else {
Err(Error::StreamErr(network::StreamError::StreamClosed))
pub(crate) fn send_fallible<M: Into<ServerMsg>>(&self, msg: M) { let _ = self.send(msg); }
pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> {
match msg.stream_id {
0 => self
.register_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
1 => self
.character_screen_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
2 => self
.in_game_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
3 => self
.general_stream
.try_lock()
.unwrap()
.send_raw(&msg.message),
4 => self.ping_stream.try_lock().unwrap().send_raw(&msg.message),
_ => unreachable!("invalid stream id"),
}
}
pub(crate) fn prepare<M: Into<ServerMsg>>(&self, msg: M) -> PreparedMsg {
match msg.into() {
ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::General(g) => {
match g {
//Character Screen related
ServerGeneral::CharacterDataLoadError(_)
| ServerGeneral::CharacterListUpdate(_)
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterSuccess => {
PreparedMsg::new(1, &g, &self.character_screen_stream)
},
//Ingame related
ServerGeneral::GroupUpdate(_)
| ServerGeneral::GroupInvite { .. }
| ServerGeneral::InvitePending(_)
| ServerGeneral::InviteComplete { .. }
| ServerGeneral::ExitInGameSuccess
| ServerGeneral::InventoryUpdate(_, _)
| ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_)
| ServerGeneral::SetViewDistance(_)
| ServerGeneral::Outcomes(_)
| ServerGeneral::Knockback(_) => PreparedMsg::new(2, &g, &self.in_game_stream),
// Always possible
ServerGeneral::PlayerListUpdate(_)
| ServerGeneral::ChatMsg(_)
| ServerGeneral::SetPlayerEntity(_)
| ServerGeneral::TimeOfDay(_)
| ServerGeneral::EntitySync(_)
| ServerGeneral::CompSync(_)
| ServerGeneral::CreateEntity(_)
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => {
PreparedMsg::new(3, &g, &self.general_stream)
},
}
},
ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream),
}
}
pub(crate) fn recv<M: DeserializeOwned>(
&self,
stream_id: u8,
) -> Result<Option<M>, StreamError> {
match stream_id {
0 => self.register_stream.try_lock().unwrap().try_recv(),
1 => self.character_screen_stream.try_lock().unwrap().try_recv(),
2 => self.in_game_stream.try_lock().unwrap().try_recv(),
3 => self.general_stream.try_lock().unwrap().try_recv(),
4 => self.ping_stream.try_lock().unwrap().try_recv(),
_ => unreachable!("invalid stream id"),
}
}
}
// Distance from fuzzy_chunk before snapping to current chunk
pub const CHUNK_FUZZ: u32 = 2;
// Distance out of the range of a region before removing it from subscriptions
pub const REGION_FUZZ: u32 = 16;
#[derive(Clone, Debug)]
pub struct RegionSubscription {
pub fuzzy_chunk: Vec2<i32>,
pub regions: HashSet<Vec2<i32>>,
}
impl Component for RegionSubscription {
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
impl PreparedMsg {
fn new<M: Serialize + ?Sized>(id: u8, msg: &M, stream: &Mutex<Stream>) -> PreparedMsg {
Self {
stream_id: id,
message: Message::serialize(&msg, &stream.try_lock().unwrap()),
}
}
}

View File

@ -3,7 +3,6 @@
//! `CHAT_COMMANDS` and provide a handler function.
use crate::{
client::Client,
settings::{BanRecord, EditableSetting},
Server, StateExt,
};
@ -27,7 +26,7 @@ use std::convert::TryFrom;
use vek::*;
use world::util::Sampler;
use crate::login_provider::LoginProvider;
use crate::{client::Client, login_provider::LoginProvider};
use scan_fmt::{scan_fmt, scan_fmt_some};
use tracing::error;
@ -508,11 +507,11 @@ fn handle_alias(
*uid,
player.alias.clone(),
));
server.state.notify_registered_clients(msg);
server.state.notify_players(msg);
// Announce alias change if target has a Body.
if ecs.read_storage::<comp::Body>().get(target).is_some() {
server.state.notify_registered_clients(
server.state.notify_players(
ChatType::CommandInfo
.server_msg(format!("{} is now known as {}.", old_alias, player.alias)),
);
@ -650,7 +649,7 @@ fn handle_spawn(
// Add to group system if a pet
if matches!(alignment, comp::Alignment::Owned { .. }) {
let state = server.state();
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<Uid>();
let mut group_manager =
state.ecs().write_resource::<comp::group::GroupManager>();
@ -663,14 +662,14 @@ fn handle_spawn(
&uids,
&mut |entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| {
c.send_msg(ServerGeneral::GroupUpdate(g))
c.send_fallible(ServerGeneral::GroupUpdate(g));
});
},
);
@ -1211,7 +1210,7 @@ fn handle_adminify(
.expect("Player should have uid"),
is_admin,
));
server.state.notify_registered_clients(msg);
server.state.notify_players(msg);
},
None => {
server.notify_client(
@ -1668,11 +1667,9 @@ fn handle_set_level(
.read_storage::<Uid>()
.get(player)
.expect("Failed to get uid for player");
server
.state
.notify_registered_clients(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::LevelChange(uid, lvl),
));
server.state.notify_players(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::LevelChange(uid, lvl),
));
if let Some(stats) = server
.state

View File

@ -13,10 +13,12 @@ pub(crate) struct ServerInfoPacket {
pub time: f64,
}
pub(crate) type IncomingClient = Client;
pub(crate) struct ConnectionHandler {
_network: Arc<Network>,
thread_handle: Option<thread::JoinHandle<()>>,
pub client_receiver: Receiver<Client>,
pub client_receiver: Receiver<IncomingClient>,
pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
stop_sender: Option<oneshot::Sender<()>>,
}
@ -31,7 +33,7 @@ impl ConnectionHandler {
let network_clone = Arc::clone(&network);
let (stop_sender, stop_receiver) = oneshot::channel();
let (client_sender, client_receiver) = unbounded::<Client>();
let (client_sender, client_receiver) = unbounded::<IncomingClient>();
let (info_requester_sender, info_requester_receiver) =
bounded::<Sender<ServerInfoPacket>>(1);
@ -55,7 +57,7 @@ impl ConnectionHandler {
async fn work(
network: Arc<Network>,
client_sender: Sender<Client>,
client_sender: Sender<IncomingClient>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>,
stop_receiver: oneshot::Receiver<()>,
) {
@ -92,7 +94,7 @@ impl ConnectionHandler {
async fn init_participant(
participant: Participant,
client_sender: Sender<Client>,
client_sender: Sender<IncomingClient>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("New Participant connected to the server");
@ -118,26 +120,22 @@ impl ConnectionHandler {
t = register_stream.recv::<ClientType>().fuse() => Some(t),
) {
None => {
debug!("slow client connection detected, dropping it");
debug!("Timeout for incoming client elapsed, aborting connection");
return Ok(());
},
Some(client_type) => client_type?,
};
let client = Client {
registered: false,
let client = Client::new(
client_type,
in_game: None,
participant: Some(participant),
participant,
server_data.time,
general_stream,
ping_stream,
register_stream,
in_game_stream,
character_screen_stream,
network_error: false,
last_ping: server_data.time,
login_msg_sent: false,
};
in_game_stream,
);
client_sender.send(client)?;
Ok(())

View File

@ -42,9 +42,9 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3<f32>)
if let Some(vel) = velocities.get_mut(entity) {
vel.0 = impulse;
}
let mut clients = state.ecs().write_storage::<Client>();
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ServerGeneral::Knockback(impulse));
let clients = state.ecs().read_storage::<Client>();
if let Some(client) = clients.get(entity) {
client.send_fallible(ServerGeneral::Knockback(impulse));
}
}
@ -199,9 +199,8 @@ pub fn handle_destroy(server: &mut Server, entity: EcsEntity, cause: HealthSourc
| HealthSource::Healing { by: _ }
| HealthSource::Unknown => KillSource::Other,
};
state.notify_registered_clients(
comp::ChatType::Kill(kill_source, *uid).server_msg("".to_string()),
);
state
.notify_players(comp::ChatType::Kill(kill_source, *uid).server_msg("".to_string()));
}
}
@ -667,11 +666,9 @@ pub fn handle_level_up(server: &mut Server, entity: EcsEntity, new_level: u32) {
.get(entity)
.expect("Failed to fetch uid component for entity.");
server
.state
.notify_registered_clients(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::LevelChange(*uid, new_level),
));
server.state.notify_players(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::LevelChange(*uid, new_level),
));
}
pub fn handle_buff(server: &mut Server, entity: EcsEntity, buff_change: buff::BuffChange) {

View File

@ -25,13 +25,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
match manip {
GroupManip::Invite(uid) => {
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let invitee = match state.ecs().entity_from_uid(uid.into()) {
Some(t) => t,
None => {
// Inform of failure
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("Invite failed, target does not exist.".to_owned()),
);
@ -62,8 +62,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
});
if already_in_same_group {
// Inform of failure
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ChatType::Meta.server_msg(
if let Some(general_stream) = clients.get(entity) {
general_stream.send_fallible(ChatType::Meta.server_msg(
"Invite failed, can't invite someone already in your group".to_owned(),
));
}
@ -92,8 +92,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
>= max_group_size as usize;
if group_size_limit_reached {
// Inform inviter that they have reached the group size limit
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(general_stream) = clients.get(entity) {
general_stream.send_fallible(
ChatType::Meta.server_msg(
"Invite failed, pending invites plus current group size have reached \
the group size limit"
@ -109,8 +109,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if invites.contains(invitee) {
// Inform inviter that there is already an invite
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("This player already has a pending invite.".to_owned()),
);
@ -151,32 +151,31 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
};
// If client comp
if let (Some(client), Some(inviter)) =
(clients.get_mut(invitee), uids.get(entity).copied())
if let (Some(client), Some(inviter)) = (clients.get(invitee), uids.get(entity).copied())
{
if send_invite() {
client.send_msg(ServerGeneral::GroupInvite {
client.send_fallible(ServerGeneral::GroupInvite {
inviter,
timeout: PRESENTED_INVITE_TIMEOUT_DUR,
});
}
} else if agents.contains(invitee) {
send_invite();
} else if let Some(client) = clients.get_mut(entity) {
client.send_msg(
} else if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()),
);
}
// Notify inviter that the invite is pending
if invite_sent {
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ServerGeneral::InvitePending(uid));
if let Some(client) = clients.get(entity) {
client.send_fallible(ServerGeneral::InvitePending(uid));
}
}
},
GroupManip::Accept => {
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let mut invites = state.ecs().write_storage::<Invite>();
if let Some(inviter) = invites.remove(entity).and_then(|invite| {
@ -194,12 +193,12 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
Some(inviter)
}) {
if let (Some(client), Some(target)) =
(clients.get_mut(inviter), uids.get(entity).copied())
(clients.get(inviter), uids.get(entity).copied())
{
client.send_msg(ServerGeneral::InviteComplete {
client.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::Accepted,
})
});
}
let mut group_manager = state.ecs().write_resource::<GroupManager>();
group_manager.add_group_member(
@ -211,19 +210,19 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&uids,
|entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
}
},
GroupManip::Decline => {
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let mut invites = state.ecs().write_storage::<Invite>();
if let Some(inviter) = invites.remove(entity).and_then(|invite| {
@ -242,17 +241,17 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}) {
// Inform inviter of rejection
if let (Some(client), Some(target)) =
(clients.get_mut(inviter), uids.get(entity).copied())
(clients.get(inviter), uids.get(entity).copied())
{
client.send_msg(ServerGeneral::InviteComplete {
client.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::Declined,
})
});
}
}
},
GroupManip::Leave => {
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let mut group_manager = state.ecs().write_resource::<GroupManager>();
group_manager.leave_group(
@ -263,18 +262,18 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&state.ecs().entities(),
&mut |entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
},
GroupManip::Kick(uid) => {
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let alignments = state.ecs().read_storage::<comp::Alignment>();
@ -282,8 +281,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
Some(t) => t,
None => {
// Inform of failure
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("Kick failed, target does not exist.".to_owned()),
);
@ -295,8 +294,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Can't kick pet
if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner))
{
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(general_stream) = clients.get(entity) {
general_stream.send_fallible(
ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()),
);
}
@ -304,8 +303,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}
// Can't kick yourself
if uids.get(entity).map_or(false, |u| *u == uid) {
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta
.server_msg("Kick failed, you can't kick yourself.".to_owned()),
);
@ -330,40 +329,41 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&state.ecs().entities(),
&mut |entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
// Tell them the have been kicked
if let Some(client) = clients.get_mut(target) {
client.send_msg(
if let Some(client) = clients.get(target) {
client.send_fallible(
ChatType::Meta
.server_msg("You were removed from the group.".to_owned()),
);
}
// Tell kicker that they were succesful
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ChatType::Meta.server_msg("Player kicked.".to_owned()));
if let Some(client) = clients.get(entity) {
client
.send_fallible(ChatType::Meta.server_msg("Player kicked.".to_owned()));
}
},
Some(_) => {
// Inform kicker that they are not the leader
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ChatType::Meta.server_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(ChatType::Meta.server_msg(
"Kick failed: You are not the leader of the target's group.".to_owned(),
));
}
},
None => {
// Inform kicker that the target is not in a group
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta.server_msg(
"Kick failed: Your target is not in a group.".to_owned(),
),
@ -373,14 +373,14 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
}
},
GroupManip::AssignLeader(uid) => {
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<sync::Uid>();
let target = match state.ecs().entity_from_uid(uid.into()) {
Some(t) => t,
None => {
// Inform of failure
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ChatType::Meta.server_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(ChatType::Meta.server_msg(
"Leadership transfer failed, target does not exist".to_owned(),
));
}
@ -404,24 +404,24 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
&uids,
|entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
// Tell them they are the leader
if let Some(client) = clients.get_mut(target) {
client.send_msg(
if let Some(client) = clients.get(target) {
client.send_fallible(
ChatType::Meta.server_msg("You are the group leader now.".to_owned()),
);
}
// Tell the old leader that the transfer was succesful
if let Some(client) = clients.get_mut(target) {
client.send_msg(
if let Some(client) = clients.get(target) {
client.send_fallible(
ChatType::Meta
.server_msg("You are no longer the group leader.".to_owned()),
);
@ -429,8 +429,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
},
Some(_) => {
// Inform transferer that they are not the leader
if let Some(client) = clients.get_mut(entity) {
client.send_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(
ChatType::Meta.server_msg(
"Transfer failed: You are not the leader of the target's group."
.to_owned(),
@ -440,8 +440,8 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
},
None => {
// Inform transferer that the target is not in a group
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ChatType::Meta.server_msg(
if let Some(client) = clients.get(entity) {
client.send_fallible(ChatType::Meta.server_msg(
"Transfer failed: Your target is not in a group.".to_owned(),
));
}

View File

@ -1,7 +1,4 @@
use crate::{
client::{Client, RegionSubscription},
Server,
};
use crate::{client::Client, presence::RegionSubscription, Server};
use common::{
comp::{self, item, Pos},
consts::MAX_MOUNT_RANGE,
@ -118,85 +115,80 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
return;
}
// You can't possess other players
let mut clients = ecs.write_storage::<Client>();
if clients.get_mut(possesse).is_none() {
if let Some(mut client) = clients.remove(possessor) {
client.send_msg(ServerGeneral::SetPlayerEntity(possesse_uid));
clients
.insert(possesse, client)
.err()
.map(|e| error!(?e, "Error inserting client component during possession"));
// Put possess item into loadout
let mut loadouts = ecs.write_storage::<comp::Loadout>();
let loadout = loadouts
.entry(possesse)
.expect("Could not read loadouts component while possessing")
.or_insert(comp::Loadout::default());
let item = comp::Item::new_from_asset_expect("common.items.debug.possess");
if let item::ItemKind::Tool(tool) = item.kind() {
let mut abilities = tool.get_abilities();
let mut ability_drain = abilities.drain(..);
let debug_item = comp::ItemConfig {
item,
ability1: ability_drain.next(),
ability2: ability_drain.next(),
ability3: ability_drain.next(),
block_ability: None,
dodge_ability: None,
};
std::mem::swap(&mut loadout.active_item, &mut loadout.second_item);
loadout.active_item = Some(debug_item);
}
// Move player component
{
let mut players = ecs.write_storage::<comp::Player>();
if let Some(player) = players.remove(possessor) {
players.insert(possesse, player).err().map(|e| {
error!(?e, "Error inserting player component during possession")
});
}
}
// Transfer region subscription
{
let mut subscriptions = ecs.write_storage::<RegionSubscription>();
if let Some(s) = subscriptions.remove(possessor) {
subscriptions.insert(possesse, s).err().map(|e| {
error!(
?e,
"Error inserting subscription component during possession"
)
});
}
}
// Remove will of the entity
ecs.write_storage::<comp::Agent>().remove(possesse);
// Reset controller of former shell
ecs.write_storage::<comp::Controller>()
.get_mut(possessor)
.map(|c| c.reset());
// Transfer admin powers
{
let mut admins = ecs.write_storage::<comp::Admin>();
if let Some(admin) = admins.remove(possessor) {
admins.insert(possesse, admin).err().map(|e| {
error!(?e, "Error inserting admin component during possession")
});
}
}
// Transfer waypoint
{
let mut waypoints = ecs.write_storage::<comp::Waypoint>();
if let Some(waypoint) = waypoints.remove(possessor) {
waypoints.insert(possesse, waypoint).err().map(|e| {
error!(?e, "Error inserting waypoint component during possession",)
});
}
}
}
if clients.get_mut(possesse).is_some() {
error!("can't possess other players");
return;
}
match (|| -> Option<Result<(), specs::error::Error>> {
let c = clients.remove(possessor)?;
clients.insert(possesse, c).ok()?;
//optional entities
let mut players = ecs.write_storage::<comp::Player>();
let mut subscriptions = ecs.write_storage::<RegionSubscription>();
let mut admins = ecs.write_storage::<comp::Admin>();
let mut waypoints = ecs.write_storage::<comp::Waypoint>();
players
.remove(possessor)
.map(|p| players.insert(possesse, p).ok()?);
subscriptions
.remove(possessor)
.map(|s| subscriptions.insert(possesse, s).ok()?);
admins
.remove(possessor)
.map(|a| admins.insert(possesse, a).ok()?);
waypoints
.remove(possessor)
.map(|w| waypoints.insert(possesse, w).ok()?);
Some(Ok(()))
})() {
Some(Ok(())) => (),
Some(Err(e)) => {
error!(?e, ?possesse, "Error inserting component during possession");
return;
},
None => {
error!(?possessor, "Error removing component during possession");
return;
},
}
clients
.get_mut(possesse)
.map(|c| c.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid)));
// Put possess item into loadout
let mut loadouts = ecs.write_storage::<comp::Loadout>();
let loadout = loadouts
.entry(possesse)
.expect("Could not read loadouts component while possessing")
.or_insert(comp::Loadout::default());
let item = comp::Item::new_from_asset_expect("common.items.debug.possess");
if let item::ItemKind::Tool(tool) = item.kind() {
let mut abilities = tool.get_abilities();
let mut ability_drain = abilities.drain(..);
let debug_item = comp::ItemConfig {
item,
ability1: ability_drain.next(),
ability2: ability_drain.next(),
ability3: ability_drain.next(),
block_ability: None,
dodge_ability: None,
};
std::mem::swap(&mut loadout.active_item, &mut loadout.second_item);
loadout.active_item = Some(debug_item);
}
// Remove will of the entity
ecs.write_storage::<comp::Agent>().remove(possesse);
// Reset controller of former shell
ecs.write_storage::<comp::Controller>()
.get_mut(possessor)
.map(|c| c.reset());
}
}

View File

@ -279,7 +279,7 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv
.insert(tameable_entity, comp::Alignment::Owned(uid));
// Add to group system
let mut clients = state.ecs().write_storage::<Client>();
let clients = state.ecs().read_storage::<Client>();
let uids = state.ecs().read_storage::<Uid>();
let mut group_manager = state
.ecs()
@ -294,14 +294,14 @@ pub fn handle_inventory(server: &mut Server, entity: EcsEntity, manip: comp::Inv
&uids,
&mut |entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| {
c.send_msg(ServerGeneral::GroupUpdate(g))
c.send(ServerGeneral::GroupUpdate(g))
});
},
);

View File

@ -1,11 +1,12 @@
use super::Event;
use crate::{
client::Client, login_provider::LoginProvider, persistence, state_ext::StateExt, Server,
client::Client, login_provider::LoginProvider, persistence, presence::Presence,
state_ext::StateExt, Server,
};
use common::{
comp,
comp::{group, Player},
msg::{PlayerListUpdate, ServerGeneral},
msg::{PlayerListUpdate, PresenceKind, ServerGeneral},
span,
sync::{Uid, UidAllocator},
};
@ -17,24 +18,28 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
span!(_guard, "handle_exit_ingame");
let state = server.state_mut();
// Create new entity with just `Client`, `Uid`, and `Player` components
// Easier than checking and removing all other known components
// Create new entity with just `Client`, `Uid`, `Player`, and `...Stream`
// components Easier than checking and removing all other known components
// Note: If other `ServerEvent`s are referring to this entity they will be
// disrupted
let maybe_client = state.ecs().write_storage::<Client>().remove(entity);
let maybe_uid = state.read_component_copied::<Uid>(entity);
let maybe_player = state.ecs().write_storage::<comp::Player>().remove(entity);
let maybe_admin = state.ecs().write_storage::<comp::Admin>().remove(entity);
let maybe_admin = state.ecs().write_storage::<comp::Admin>().remove(entity);
let maybe_group = state
.ecs()
.write_storage::<group::Group>()
.get(entity)
.cloned();
if let (Some(mut client), Some(uid), Some(player)) = (maybe_client, maybe_uid, maybe_player) {
if let Some((client, uid, player)) = (|| {
let ecs = state.ecs();
Some((
ecs.write_storage::<Client>().remove(entity)?,
ecs.write_storage::<Uid>().remove(entity)?,
ecs.write_storage::<comp::Player>().remove(entity)?,
))
})() {
// Tell client its request was successful
client.in_game = None;
client.send_msg(ServerGeneral::ExitInGameSuccess);
client.send_fallible(ServerGeneral::ExitInGameSuccess);
let entity_builder = state.ecs_mut().create_entity().with(client).with(player);
@ -127,9 +132,9 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
state.read_storage::<Uid>().get(entity),
state.read_storage::<comp::Player>().get(entity),
) {
state.notify_registered_clients(comp::ChatType::Offline(*uid).server_msg(""));
state.notify_players(comp::ChatType::Offline(*uid).server_msg(""));
state.notify_registered_clients(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Remove(
state.notify_players(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Remove(
*uid,
)));
}
@ -141,8 +146,8 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
}
// Sync the player's character data to the database
if let (Some(player), Some(stats), Some(inventory), Some(loadout), updater) = (
state.read_storage::<Player>().get(entity),
if let (Some(presences), Some(stats), Some(inventory), Some(loadout), updater) = (
state.read_storage::<Presence>().get(entity),
state.read_storage::<comp::Stats>().get(entity),
state.read_storage::<comp::Inventory>().get(entity),
state.read_storage::<comp::Loadout>().get(entity),
@ -150,7 +155,7 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
.ecs()
.read_resource::<persistence::character_updater::CharacterUpdater>(),
) {
if let Some(character_id) = player.character_id {
if let PresenceKind::Character(character_id) = presences.kind {
updater.update(character_id, stats, inventory, loadout);
}
}

View File

@ -17,6 +17,7 @@ pub mod input;
pub mod login_provider;
pub mod metrics;
pub mod persistence;
pub mod presence;
pub mod settings;
pub mod state_ext;
pub mod sys;
@ -34,11 +35,12 @@ pub use crate::{
use crate::{
alias_validator::AliasValidator,
chunk_generator::ChunkGenerator,
client::{Client, RegionSubscription},
client::Client,
cmd::ChatCommandExt,
connection_handler::ConnectionHandler,
data_dir::DataDir,
login_provider::LoginProvider,
presence::{Presence, RegionSubscription},
state_ext::StateExt,
sys::sentinel::{DeletedEntities, TrackedComps},
};
@ -163,7 +165,13 @@ impl Server {
// System timers for performance monitoring
state.ecs_mut().insert(sys::EntitySyncTimer::default());
state.ecs_mut().insert(sys::MessageTimer::default());
state.ecs_mut().insert(sys::GeneralMsgTimer::default());
state.ecs_mut().insert(sys::PingMsgTimer::default());
state.ecs_mut().insert(sys::RegisterMsgTimer::default());
state
.ecs_mut()
.insert(sys::CharacterScreenMsgTimer::default());
state.ecs_mut().insert(sys::InGameMsgTimer::default());
state.ecs_mut().insert(sys::SentinelTimer::default());
state.ecs_mut().insert(sys::SubscriptionTimer::default());
state.ecs_mut().insert(sys::TerrainSyncTimer::default());
@ -180,6 +188,7 @@ impl Server {
// Server-only components
state.ecs_mut().register::<RegionSubscription>();
state.ecs_mut().register::<Client>();
state.ecs_mut().register::<Presence>();
//Alias validator
let banned_words_paths = &settings.banned_words_files;
@ -452,13 +461,18 @@ impl Server {
let before_new_connections = Instant::now();
// 3) Handle inputs from clients
self.handle_new_connections(&mut frontend_events)?;
self.handle_new_connections(&mut frontend_events);
let before_message_system = Instant::now();
// Run message receiving sys before the systems in common for decreased latency
// (e.g. run before controller system)
sys::message::Sys.run_now(&self.state.ecs());
//TODO: run in parallel
sys::msg::general::Sys.run_now(&self.state.ecs());
sys::msg::register::Sys.run_now(&self.state.ecs());
sys::msg::character_screen::Sys.run_now(&self.state.ecs());
sys::msg::in_game::Sys.run_now(&self.state.ecs());
sys::msg::ping::Sys.run_now(&self.state.ecs());
let before_state_tick = Instant::now();
@ -607,7 +621,14 @@ impl Server {
.ecs()
.read_resource::<sys::EntitySyncTimer>()
.nanos as i64;
let message_nanos = self.state.ecs().read_resource::<sys::MessageTimer>().nanos as i64;
let message_nanos = {
let state = self.state.ecs();
(state.read_resource::<sys::GeneralMsgTimer>().nanos
+ state.read_resource::<sys::PingMsgTimer>().nanos
+ state.read_resource::<sys::RegisterMsgTimer>().nanos
+ state.read_resource::<sys::CharacterScreenMsgTimer>().nanos
+ state.read_resource::<sys::InGameMsgTimer>().nanos) as i64
};
let sentinel_nanos = self.state.ecs().read_resource::<sys::SentinelTimer>().nanos as i64;
let subscription_nanos = self
.state
@ -793,8 +814,53 @@ impl Server {
self.state.cleanup();
}
fn initialize_client(
&mut self,
client: crate::connection_handler::IncomingClient,
) -> Result<Option<specs::Entity>, Error> {
if self.settings().max_players <= self.state.ecs().read_storage::<Client>().join().count() {
trace!(
?client.participant,
"to many players, wont allow participant to connect"
);
client.send(ServerInit::TooManyPlayers)?;
return Ok(None);
}
let entity = self
.state
.ecs_mut()
.create_entity_synced()
.with(client)
.build();
self.state
.ecs()
.read_resource::<metrics::PlayerMetrics>()
.clients_connected
.inc();
// Send client all the tracked components currently attached to its entity as
// well as synced resources (currently only `TimeOfDay`)
debug!("Starting initial sync with client.");
self.state
.ecs()
.read_storage::<Client>()
.get(entity)
.unwrap()
.send(ServerInit::GameSync {
// Send client their entity
entity_package: TrackedComps::fetch(&self.state.ecs())
.create_entity_package(entity, None, None, None),
time_of_day: *self.state.ecs().read_resource(),
max_group_size: self.settings().max_player_group_size,
client_timeout: self.settings().client_timeout,
world_map: self.map.clone(),
recipe_book: (&*default_recipe_book()).clone(),
})?;
Ok(Some(entity))
}
/// Handle new client connections.
fn handle_new_connections(&mut self, frontend_events: &mut Vec<Event>) -> Result<(), Error> {
fn handle_new_connections(&mut self, frontend_events: &mut Vec<Event>) {
while let Ok(sender) = self.connection_handler.info_requester_receiver.try_recv() {
// can fail, e.g. due to timeout or network prob.
trace!("sending info to connection_handler");
@ -804,69 +870,32 @@ impl Server {
});
}
while let Ok(data) = self.connection_handler.client_receiver.try_recv() {
let mut client = data;
if self.settings().max_players
<= self.state.ecs().read_storage::<Client>().join().count()
{
trace!(
?client.participant,
"to many players, wont allow participant to connect"
);
client.register_stream.send(ServerInit::TooManyPlayers)?;
continue;
while let Ok(incoming) = self.connection_handler.client_receiver.try_recv() {
match self.initialize_client(incoming) {
Ok(None) => (),
Ok(Some(entity)) => {
frontend_events.push(Event::ClientConnected { entity });
debug!("Done initial sync with client.");
},
Err(e) => {
debug!(?e, "failed initializing a new client");
},
}
let entity = self
.state
.ecs_mut()
.create_entity_synced()
.with(client)
.build();
self.state
.ecs()
.read_resource::<metrics::PlayerMetrics>()
.clients_connected
.inc();
// Send client all the tracked components currently attached to its entity as
// well as synced resources (currently only `TimeOfDay`)
debug!("Starting initial sync with client.");
self.state
.ecs()
.write_storage::<Client>()
.get_mut(entity)
.unwrap()
.register_stream
.send(ServerInit::GameSync {
// Send client their entity
entity_package: TrackedComps::fetch(&self.state.ecs())
.create_entity_package(entity, None, None, None),
time_of_day: *self.state.ecs().read_resource(),
max_group_size: self.settings().max_player_group_size,
client_timeout: self.settings().client_timeout,
world_map: self.map.clone(),
recipe_book: (&*default_recipe_book()).clone(),
})?;
frontend_events.push(Event::ClientConnected { entity });
debug!("Done initial sync with client.");
}
Ok(())
}
pub fn notify_client<S>(&self, entity: EcsEntity, msg: S)
where
S: Into<ServerMsg>,
{
if let Some(client) = self.state.ecs().write_storage::<Client>().get_mut(entity) {
client.send_msg(msg.into())
}
self.state
.ecs()
.read_storage::<Client>()
.get(entity)
.map(|c| c.send(msg));
}
pub fn notify_registered_clients(&mut self, msg: ServerGeneral) {
self.state.notify_registered_clients(msg);
}
pub fn notify_players(&mut self, msg: ServerGeneral) { self.state.notify_players(msg); }
pub fn generate_chunk(&mut self, entity: EcsEntity, key: Vec2<i32>) {
self.state
@ -944,7 +973,7 @@ impl Server {
impl Drop for Server {
fn drop(&mut self) {
self.state
.notify_registered_clients(ServerGeneral::Disconnect(DisconnectReason::Shutdown));
.notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown));
}
}

40
server/src/presence.rs Normal file
View File

@ -0,0 +1,40 @@
use common::msg::PresenceKind;
use hashbrown::HashSet;
use serde::{Deserialize, Serialize};
use specs::{Component, FlaggedStorage};
use specs_idvs::IdvStorage;
use vek::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Presence {
pub view_distance: u32,
pub kind: PresenceKind,
}
impl Presence {
pub fn new(view_distance: u32, kind: PresenceKind) -> Self {
Self {
view_distance,
kind,
}
}
}
impl Component for Presence {
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
}
// Distance from fuzzy_chunk before snapping to current chunk
pub const CHUNK_FUZZ: u32 = 2;
// Distance out of the range of a region before removing it from subscriptions
pub const REGION_FUZZ: u32 = 16;
#[derive(Clone, Debug)]
pub struct RegionSubscription {
pub fuzzy_chunk: Vec2<i32>,
pub regions: HashSet<Vec2<i32>>,
}
impl Component for RegionSubscription {
type Storage = FlaggedStorage<Self, IdvStorage<Self>>;
}

View File

@ -1,11 +1,12 @@
use crate::{
client::Client, persistence::PersistedComponents, sys::sentinel::DeletedEntities, SpawnPoint,
client::Client, persistence::PersistedComponents, presence::Presence,
sys::sentinel::DeletedEntities, SpawnPoint,
};
use common::{
character::CharacterId,
comp,
effect::Effect,
msg::{CharacterInfo, ClientInGame, PlayerListUpdate, ServerGeneral, ServerMsg},
msg::{CharacterInfo, PlayerListUpdate, PresenceKind, ServerGeneral},
state::State,
sync::{Uid, UidAllocator, WorldSyncExt},
util::Dir,
@ -59,7 +60,7 @@ pub trait StateExt {
fn update_character_data(&mut self, entity: EcsEntity, components: PersistedComponents);
/// Iterates over registered clients and send each `ServerMsg`
fn send_chat(&self, msg: comp::UnresolvedChatMsg);
fn notify_registered_clients(&self, msg: ServerGeneral);
fn notify_players(&self, msg: ServerGeneral);
fn notify_in_game_clients(&self, msg: ServerGeneral);
/// Delete an entity, recording the deletion in [`DeletedEntities`]
fn delete_entity_recorded(
@ -208,22 +209,15 @@ impl StateExt for State {
// Make sure physics components are updated
self.write_component(entity, comp::ForceUpdate);
// Set the character id for the player
// TODO this results in a warning in the console: "Error modifying synced
// component, it doesn't seem to exist"
// It appears to be caused by the player not yet existing on the client at this
// point, despite being able to write the data on the server
self.ecs()
.write_storage::<comp::Player>()
.get_mut(entity)
.map(|player| {
player.character_id = Some(character_id);
});
const INITIAL_VD: u32 = 5; //will be changed after login
self.write_component(
entity,
Presence::new(INITIAL_VD, PresenceKind::Character(character_id)),
);
// Tell the client its request was successful.
if let Some(client) = self.ecs().write_storage::<Client>().get_mut(entity) {
client.in_game = Some(ClientInGame::Character);
client.send_msg(ServerGeneral::CharacterSuccess)
if let Some(client) = self.ecs().read_storage::<Client>().get(entity) {
client.send_fallible(ServerGeneral::CharacterSuccess);
}
}
@ -232,7 +226,7 @@ impl StateExt for State {
if let Some(player_uid) = self.read_component_copied::<Uid>(entity) {
// Notify clients of a player list update
self.notify_registered_clients(ServerGeneral::PlayerListUpdate(
self.notify_players(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::SelectedCharacter(player_uid, CharacterInfo {
name: String::from(&stats.name),
level: stats.level.level(),
@ -277,30 +271,22 @@ impl StateExt for State {
| comp::ChatType::Loot
| comp::ChatType::Kill(_, _)
| comp::ChatType::Meta
| comp::ChatType::World(_) => {
self.notify_registered_clients(ServerGeneral::ChatMsg(resolved_msg))
},
| comp::ChatType::World(_) => self.notify_players(ServerGeneral::ChatMsg(resolved_msg)),
comp::ChatType::Online(u) => {
for (client, uid) in (
&mut ecs.write_storage::<Client>(),
&ecs.read_storage::<Uid>(),
)
.join()
for (client, uid) in
(&ecs.read_storage::<Client>(), &ecs.read_storage::<Uid>()).join()
{
if uid != u {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
comp::ChatType::Tell(u, t) => {
for (client, uid) in (
&mut ecs.write_storage::<Client>(),
&ecs.read_storage::<Uid>(),
)
.join()
for (client, uid) in
(&ecs.read_storage::<Client>(), &ecs.read_storage::<Uid>()).join()
{
if uid == u || uid == t {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -310,9 +296,9 @@ impl StateExt for State {
(*ecs.read_resource::<UidAllocator>()).retrieve_entity_internal(uid.0);
let positions = ecs.read_storage::<comp::Pos>();
if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) {
for (client, pos) in (&mut ecs.write_storage::<Client>(), &positions).join() {
for (client, pos) in (&ecs.read_storage::<Client>(), &positions).join() {
if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -322,9 +308,9 @@ impl StateExt for State {
(*ecs.read_resource::<UidAllocator>()).retrieve_entity_internal(uid.0);
let positions = ecs.read_storage::<comp::Pos>();
if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) {
for (client, pos) in (&mut ecs.write_storage::<Client>(), &positions).join() {
for (client, pos) in (&ecs.read_storage::<Client>(), &positions).join() {
if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -334,9 +320,9 @@ impl StateExt for State {
(*ecs.read_resource::<UidAllocator>()).retrieve_entity_internal(uid.0);
let positions = ecs.read_storage::<comp::Pos>();
if let Some(speaker_pos) = entity_opt.and_then(|e| positions.get(e)) {
for (client, pos) in (&mut ecs.write_storage::<Client>(), &positions).join() {
for (client, pos) in (&ecs.read_storage::<Client>(), &positions).join() {
if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -344,25 +330,25 @@ impl StateExt for State {
comp::ChatType::FactionMeta(s) | comp::ChatType::Faction(_, s) => {
for (client, faction) in (
&mut ecs.write_storage::<Client>(),
&ecs.read_storage::<Client>(),
&ecs.read_storage::<comp::Faction>(),
)
.join()
{
if s == &faction.0 {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
comp::ChatType::GroupMeta(g) | comp::ChatType::Group(_, g) => {
for (client, group) in (
&mut ecs.write_storage::<Client>(),
&ecs.read_storage::<Client>(),
&ecs.read_storage::<comp::Group>(),
)
.join()
{
if g == group {
client.send_msg(ServerGeneral::ChatMsg(resolved_msg.clone()));
client.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -370,24 +356,36 @@ impl StateExt for State {
}
/// Sends the message to all connected clients
fn notify_registered_clients(&self, msg: ServerGeneral) {
let msg: ServerMsg = msg.into();
for client in (&mut self.ecs().write_storage::<Client>())
fn notify_players(&self, msg: ServerGeneral) {
let mut msg = Some(msg);
let mut lazy_msg = None;
for (client, _) in (
&self.ecs().read_storage::<Client>(),
&self.ecs().read_storage::<comp::Player>(),
)
.join()
.filter(|c| c.registered)
{
client.send_msg(msg.clone());
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(msg.take().unwrap()));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
/// Sends the message to all clients playing in game
fn notify_in_game_clients(&self, msg: ServerGeneral) {
let msg: ServerMsg = msg.into();
for client in (&mut self.ecs().write_storage::<Client>())
let mut msg = Some(msg);
let mut lazy_msg = None;
for (client, _) in (
&mut self.ecs().write_storage::<Client>(),
&self.ecs().read_storage::<Presence>(),
)
.join()
.filter(|c| c.in_game.is_some())
{
client.send_msg(msg.clone());
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(msg.take().unwrap()));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
@ -397,7 +395,7 @@ impl StateExt for State {
) -> Result<(), specs::error::WrongGeneration> {
// Remove entity from a group if they are in one
{
let mut clients = self.ecs().write_storage::<Client>();
let clients = self.ecs().read_storage::<Client>();
let uids = self.ecs().read_storage::<Uid>();
let mut group_manager = self.ecs().write_resource::<comp::group::GroupManager>();
group_manager.entity_deleted(
@ -408,13 +406,13 @@ impl StateExt for State {
&self.ecs().entities(),
&mut |entity, group_change| {
clients
.get_mut(entity)
.get(entity)
.and_then(|c| {
group_change
.try_map(|e| uids.get(e).copied())
.map(|g| (g, c))
})
.map(|(g, c)| c.send_msg(ServerGeneral::GroupUpdate(g)));
.map(|(g, c)| c.send(ServerGeneral::GroupUpdate(g)));
},
);
}

View File

@ -3,11 +3,12 @@ use super::{
SysTimer,
};
use crate::{
client::{Client, RegionSubscription},
client::Client,
presence::{Presence, RegionSubscription},
Tick,
};
use common::{
comp::{ForceUpdate, Inventory, InventoryUpdate, Last, Ori, Player, Pos, Vel},
comp::{ForceUpdate, Inventory, InventoryUpdate, Last, Ori, Pos, Vel},
msg::ServerGeneral,
outcome::Outcome,
region::{Event as RegionEvent, RegionMap},
@ -38,11 +39,11 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Ori>,
ReadStorage<'a, Inventory>,
ReadStorage<'a, RegionSubscription>,
ReadStorage<'a, Player>,
ReadStorage<'a, Presence>,
WriteStorage<'a, Last<Pos>>,
WriteStorage<'a, Last<Vel>>,
WriteStorage<'a, Last<Ori>>,
WriteStorage<'a, Client>,
ReadStorage<'a, Client>,
WriteStorage<'a, ForceUpdate>,
WriteStorage<'a, InventoryUpdate>,
Write<'a, DeletedEntities>,
@ -65,11 +66,11 @@ impl<'a> System<'a> for Sys {
orientations,
inventories,
subscriptions,
players,
presences,
mut last_pos,
mut last_vel,
mut last_ori,
mut clients,
clients,
mut force_updates,
mut inventory_updates,
mut deleted_entities,
@ -104,10 +105,16 @@ impl<'a> System<'a> for Sys {
for (key, region) in region_map.iter() {
// Assemble subscriber list for this region by iterating through clients and
// checking if they are subscribed to this region
let mut subscribers = (&mut clients, &entities, &subscriptions, &positions)
let mut subscribers = (
&clients,
&entities,
presences.maybe(),
&subscriptions,
&positions,
)
.join()
.filter_map(|(client, entity, subscription, pos)| {
if client.in_game.is_some() && subscription.regions.contains(&key) {
.filter_map(|(client, entity, presence, subscription, pos)| {
if presence.is_some() && subscription.regions.contains(&key) {
Some((client, &subscription.regions, entity, *pos))
} else {
None
@ -143,7 +150,7 @@ impl<'a> System<'a> for Sys {
// Client doesn't need to know about itself
&& *client_entity != entity
{
client.send_msg(create_msg.clone());
client.send_fallible(create_msg.clone());
}
}
}
@ -157,7 +164,7 @@ impl<'a> System<'a> for Sys {
.map(|key| !regions.contains(key))
.unwrap_or(true)
{
client.send_msg(ServerGeneral::DeleteEntity(uid));
client.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
}
@ -174,18 +181,32 @@ impl<'a> System<'a> for Sys {
.take_deleted_in_region(key)
.unwrap_or_default(),
);
let entity_sync_msg = ServerGeneral::EntitySync(entity_sync_package);
let comp_sync_msg = ServerGeneral::CompSync(comp_sync_package);
let mut entity_sync_package = Some(entity_sync_package);
let mut comp_sync_package = Some(comp_sync_package);
let mut entity_sync_lazymsg = None;
let mut comp_sync_lazymsg = None;
subscribers.iter_mut().for_each(move |(client, _, _, _)| {
client.send_msg(entity_sync_msg.clone());
client.send_msg(comp_sync_msg.clone());
if entity_sync_lazymsg.is_none() {
entity_sync_lazymsg = Some(client.prepare(ServerGeneral::EntitySync(
entity_sync_package.take().unwrap(),
)));
comp_sync_lazymsg = Some(
client.prepare(ServerGeneral::CompSync(comp_sync_package.take().unwrap())),
);
}
entity_sync_lazymsg
.as_ref()
.map(|msg| client.send_prepared(&msg));
comp_sync_lazymsg
.as_ref()
.map(|msg| client.send_prepared(&msg));
});
let mut send_msg = |msg: ServerGeneral,
entity: EcsEntity,
pos: Pos,
force_update: Option<&ForceUpdate>,
throttle: bool| {
let mut send_general = |msg: ServerGeneral,
entity: EcsEntity,
pos: Pos,
force_update: Option<&ForceUpdate>,
throttle: bool| {
for (client, _, client_entity, client_pos) in &mut subscribers {
if if client_entity == &entity {
// Don't send client physics updates about itself unless force update is set
@ -212,7 +233,7 @@ impl<'a> System<'a> for Sys {
true // Closer than 100 blocks
}
} {
client.send_msg(msg.clone());
client.send_fallible(msg.clone());
}
}
};
@ -286,7 +307,7 @@ impl<'a> System<'a> for Sys {
comp_sync_package.comp_removed::<Ori>(uid);
}
send_msg(
send_general(
ServerGeneral::CompSync(comp_sync_package),
entity,
pos,
@ -299,19 +320,18 @@ impl<'a> System<'a> for Sys {
// Handle entity deletion in regions that don't exist in RegionMap
// (theoretically none)
for (region_key, deleted) in deleted_entities.take_remaining_deleted() {
for client in
(&mut clients, &subscriptions)
.join()
.filter_map(|(client, subscription)| {
if client.in_game.is_some() && subscription.regions.contains(&region_key) {
Some(client)
} else {
None
}
})
for client in (presences.maybe(), &subscriptions, &clients)
.join()
.filter_map(|(presence, subscription, client)| {
if presence.is_some() && subscription.regions.contains(&region_key) {
Some(client)
} else {
None
}
})
{
for uid in &deleted {
client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid)));
client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
}
}
}
@ -319,19 +339,20 @@ impl<'a> System<'a> for Sys {
// TODO: Sync clients that don't have a position?
// Sync inventories
for (client, inventory, update) in (&mut clients, &inventories, &inventory_updates).join() {
client.send_msg(ServerGeneral::InventoryUpdate(
for (inventory, update, client) in (&inventories, &inventory_updates, &clients).join() {
client.send_fallible(ServerGeneral::InventoryUpdate(
inventory.clone(),
update.event(),
));
}
// Sync outcomes
for (client, player, pos) in (&mut clients, &players, positions.maybe()).join() {
for (presence, pos, client) in (presences.maybe(), positions.maybe(), &clients).join() {
let is_near = |o_pos: Vec3<f32>| {
pos.zip_with(player.view_distance, |pos, vd| {
pos.zip_with(presence, |pos, presence| {
pos.0.xy().distance_squared(o_pos.xy())
< (vd as f32 * TerrainChunkSize::RECT_SIZE.x as f32).powf(2.0)
< (presence.view_distance as f32 * TerrainChunkSize::RECT_SIZE.x as f32)
.powf(2.0)
})
};
@ -341,7 +362,7 @@ impl<'a> System<'a> for Sys {
.cloned()
.collect::<Vec<_>>();
if !outcomes.is_empty() {
client.send_msg(ServerGeneral::Outcomes(outcomes));
client.send_fallible(ServerGeneral::Outcomes(outcomes));
}
}
outcomes.clear();
@ -353,9 +374,12 @@ impl<'a> System<'a> for Sys {
// Sync resources
// TODO: doesn't really belong in this system (rename system or create another
// system?)
let tof_msg = ServerGeneral::TimeOfDay(*time_of_day);
for client in (&mut clients).join() {
client.send_msg(tof_msg.clone());
let mut tof_lazymsg = None;
for client in (&clients).join() {
if tof_lazymsg.is_none() {
tof_lazymsg = Some(client.prepare(ServerGeneral::TimeOfDay(*time_of_day)));
}
tof_lazymsg.as_ref().map(|msg| client.send_prepared(&msg));
}
timer.end();

View File

@ -16,14 +16,14 @@ impl<'a> System<'a> for Sys {
Entities<'a>,
WriteStorage<'a, Invite>,
WriteStorage<'a, PendingInvites>,
WriteStorage<'a, Client>,
ReadStorage<'a, Client>,
ReadStorage<'a, Uid>,
Write<'a, SysTimer<Self>>,
);
fn run(
&mut self,
(entities, mut invites, mut pending_invites, mut clients, uids, mut timer): Self::SystemData,
(entities, mut invites, mut pending_invites, clients, uids, mut timer): Self::SystemData,
) {
span!(_guard, "run", "invite_timeout::Sys::run");
timer.start();
@ -52,12 +52,12 @@ impl<'a> System<'a> for Sys {
// Inform inviter of timeout
if let (Some(client), Some(target)) =
(clients.get_mut(*inviter), uids.get(invitee).copied())
(clients.get(*inviter), uids.get(invitee).copied())
{
client.send_msg(ServerGeneral::InviteComplete {
client.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::TimedOut,
})
});
}
Some(invitee)

View File

@ -1,737 +0,0 @@
use super::SysTimer;
use crate::{
alias_validator::AliasValidator,
character_creator,
client::Client,
login_provider::LoginProvider,
metrics::{NetworkRequestMetrics, PlayerMetrics},
persistence::character_loader::CharacterLoader,
EditableSettings, Settings,
};
use common::{
comp::{
Admin, CanBuild, ChatMode, ChatType, ControlEvent, Controller, ForceUpdate, Ori, Player,
Pos, Stats, UnresolvedChatMsg, Vel,
},
event::{EventBus, ServerEvent},
msg::{
validate_chat_msg, CharacterInfo, ChatMsgValidationError, ClientGeneral, ClientInGame,
ClientRegister, DisconnectReason, PingMsg, PlayerInfo, PlayerListUpdate, RegisterError,
ServerGeneral, ServerRegisterAnswer, MAX_BYTES_CHAT_MSG,
},
span,
state::{BlockChange, Time},
sync::Uid,
terrain::{TerrainChunkSize, TerrainGrid},
vol::{ReadVol, RectVolSize},
};
use futures_executor::block_on;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use hashbrown::HashMap;
use specs::{
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
};
use tracing::{debug, error, info, trace, warn};
impl Sys {
#[allow(clippy::too_many_arguments)]
fn handle_client_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
entity: specs::Entity,
client: &mut Client,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
uids: &ReadStorage<'_, Uid>,
chat_modes: &ReadStorage<'_, ChatMode>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
match msg {
ClientGeneral::ChatMsg(message) => {
if client.registered {
match validate_chat_msg(&message) {
Ok(()) => {
if let Some(from) = uids.get(entity) {
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
warn!(?len, ?max, "Received a chat message that's too long")
},
}
}
},
ClientGeneral::Disconnect => {
client.send_msg(ServerGeneral::Disconnect(DisconnectReason::Requested));
},
ClientGeneral::Terminate => {
debug!(?entity, "Client send message to termitate session");
player_metrics
.clients_disconnected
.with_label_values(&["gracefully"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
_ => unreachable!("not a client_general msg"),
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn handle_client_in_game_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
entity: specs::Entity,
client: &mut Client,
terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &mut WriteStorage<'_, Stats>,
block_changes: &mut Write<'_, BlockChange>,
positions: &mut WriteStorage<'_, Pos>,
velocities: &mut WriteStorage<'_, Vel>,
orientations: &mut WriteStorage<'_, Ori>,
players: &mut WriteStorage<'_, Player>,
controllers: &mut WriteStorage<'_, Controller>,
settings: &Read<'_, Settings>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
if client.in_game.is_none() {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
}
match msg {
// Go back to registered state (char selection screen)
ClientGeneral::ExitInGame => {
client.in_game = None;
server_emitter.emit(ServerEvent::ExitIngame { entity });
client.send_msg(ServerGeneral::ExitInGameSuccess);
},
ClientGeneral::SetViewDistance(view_distance) => {
players.get_mut(entity).map(|player| {
player.view_distance = Some(
settings
.max_view_distance
.map(|max| view_distance.min(max))
.unwrap_or(view_distance),
)
});
//correct client if its VD is to high
if settings
.max_view_distance
.map(|max| view_distance > max)
.unwrap_or(false)
{
client.send_msg(ServerGeneral::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
}
},
ClientGeneral::ControllerInputs(inputs) => {
if let Some(ClientInGame::Character) = client.in_game {
if let Some(controller) = controllers.get_mut(entity) {
controller.inputs.update_with_new(inputs);
}
}
},
ClientGeneral::ControlEvent(event) => {
if let Some(ClientInGame::Character) = client.in_game {
// Skip respawn if client entity is alive
if let ControlEvent::Respawn = event {
if stats.get(entity).map_or(true, |s| !s.is_dead) {
//Todo: comment why return!
return Ok(());
}
}
if let Some(controller) = controllers.get_mut(entity) {
controller.events.push(event);
}
}
},
ClientGeneral::ControlAction(event) => {
if let Some(ClientInGame::Character) = client.in_game {
if let Some(controller) = controllers.get_mut(entity) {
controller.actions.push(event);
}
}
},
ClientGeneral::PlayerPhysics { pos, vel, ori } => {
if let Some(ClientInGame::Character) = client.in_game {
if force_updates.get(entity).is_none()
&& stats.get(entity).map_or(true, |s| !s.is_dead)
{
let _ = positions.insert(entity, pos);
let _ = velocities.insert(entity, vel);
let _ = orientations.insert(entity, ori);
}
}
},
ClientGeneral::BreakBlock(pos) => {
if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) {
block_changes.set(pos, block.into_vacant());
}
},
ClientGeneral::PlaceBlock(pos, block) => {
if can_build.get(entity).is_some() {
block_changes.try_set(pos, block);
}
},
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let (Some(view_distance), Some(pos)) = (
players.get(entity).and_then(|p| p.view_distance),
positions.get(entity),
) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send_msg(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
ClientGeneral::UnlockSkill(skill) => {
stats
.get_mut(entity)
.map(|s| s.skill_set.unlock_skill(skill));
},
ClientGeneral::RefundSkill(skill) => {
stats
.get_mut(entity)
.map(|s| s.skill_set.refund_skill(skill));
},
ClientGeneral::UnlockSkillGroup(skill_group_type) => {
stats
.get_mut(entity)
.map(|s| s.skill_set.unlock_skill_group(skill_group_type));
},
_ => unreachable!("not a client_in_game msg"),
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn handle_client_character_screen_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
entity: specs::Entity,
client: &mut Client,
character_loader: &ReadExpect<'_, CharacterLoader>,
uids: &ReadStorage<'_, Uid>,
players: &mut WriteStorage<'_, Player>,
editable_settings: &ReadExpect<'_, EditableSettings>,
alias_validator: &ReadExpect<'_, AliasValidator>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
match msg {
// Request spectator state
ClientGeneral::Spectate if client.registered => {
client.in_game = Some(ClientInGame::Spectator)
},
ClientGeneral::Spectate => debug!("dropped Spectate msg from unregistered client"),
ClientGeneral::Character(character_id)
if client.registered && client.in_game.is_none() =>
{
if let Some(player) = players.get(entity) {
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if !editable_settings.server_description.is_empty() {
client.send_msg(
ChatType::CommandInfo
.server_msg(String::from(&*editable_settings.server_description)),
);
}
if !client.login_msg_sent {
if let Some(player_uid) = uids.get(entity) {
new_chat_msgs.push((None, UnresolvedChatMsg {
chat_type: ChatType::Online(*player_uid),
message: "".to_string(),
}));
client.login_msg_sent = true;
}
}
} else {
client.send_msg(ServerGeneral::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))
}
}
ClientGeneral::Character(_) => {
let registered = client.registered;
let in_game = client.in_game;
debug!(?registered, ?in_game, "dropped Character msg from client");
},
ClientGeneral::RequestCharacterList => {
if let Some(player) = players.get(entity) {
character_loader.load_character_list(entity, player.uuid().to_string())
}
},
ClientGeneral::CreateCharacter { alias, tool, body } => {
if let Err(error) = alias_validator.validate(&alias) {
debug!(?error, ?alias, "denied alias as it contained a banned word");
client.send_msg(ServerGeneral::CharacterActionError(error.to_string()));
} else if let Some(player) = players.get(entity) {
character_creator::create_character(
entity,
player.uuid().to_string(),
alias,
tool,
body,
character_loader,
);
}
},
ClientGeneral::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
character_loader.delete_character(
entity,
player.uuid().to_string(),
character_id,
);
}
},
_ => unreachable!("not a client_character_screen msg"),
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn handle_ping_msg(client: &mut Client, msg: PingMsg) -> Result<(), crate::error::Error> {
match msg {
PingMsg::Ping => client.send_msg(PingMsg::Pong),
PingMsg::Pong => {},
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn handle_register_msg(
player_list: &HashMap<Uid, PlayerInfo>,
new_players: &mut Vec<specs::Entity>,
entity: specs::Entity,
client: &mut Client,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
login_provider: &mut WriteExpect<'_, LoginProvider>,
admins: &mut WriteStorage<'_, Admin>,
players: &mut WriteStorage<'_, Player>,
editable_settings: &ReadExpect<'_, EditableSettings>,
msg: ClientRegister,
) -> Result<(), crate::error::Error> {
let (username, uuid) = match login_provider.try_login(
&msg.token_or_username,
&*editable_settings.admins,
&*editable_settings.whitelist,
&*editable_settings.banlist,
) {
Err(err) => {
client
.register_stream
.send(ServerRegisterAnswer::Err(err))?;
return Ok(());
},
Ok((username, uuid)) => (username, uuid),
};
const INITIAL_VD: Option<u32> = Some(5); //will be changed after login
let player = Player::new(username, None, INITIAL_VD, uuid);
let is_admin = editable_settings.admins.contains(&uuid);
if !player.is_valid() {
// Invalid player
client
.register_stream
.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?;
return Ok(());
}
if !client.registered && client.in_game.is_none() {
// Add Player component to this client
let _ = players.insert(entity, player);
player_metrics.players_connected.inc();
// Give the Admin component to the player if their name exists in
// admin list
if is_admin {
let _ = admins.insert(entity, Admin);
}
// Tell the client its request was successful.
client.registered = true;
client.register_stream.send(ServerRegisterAnswer::Ok(()))?;
// Send initial player list
client.send_msg(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)));
// Add to list to notify all clients of the new player
new_players.push(entity);
}
Ok(())
}
///We needed to move this to a async fn, if we would use a async closures
/// the compiler generates to much recursion and fails to compile this
#[allow(clippy::too_many_arguments)]
async fn handle_messages(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
player_list: &HashMap<Uid, PlayerInfo>,
new_players: &mut Vec<specs::Entity>,
entity: specs::Entity,
client: &mut Client,
cnt: &mut u64,
character_loader: &ReadExpect<'_, CharacterLoader>,
terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
uids: &ReadStorage<'_, Uid>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &mut WriteStorage<'_, Stats>,
chat_modes: &ReadStorage<'_, ChatMode>,
login_provider: &mut WriteExpect<'_, LoginProvider>,
block_changes: &mut Write<'_, BlockChange>,
admins: &mut WriteStorage<'_, Admin>,
positions: &mut WriteStorage<'_, Pos>,
velocities: &mut WriteStorage<'_, Vel>,
orientations: &mut WriteStorage<'_, Ori>,
players: &mut WriteStorage<'_, Player>,
controllers: &mut WriteStorage<'_, Controller>,
settings: &Read<'_, Settings>,
editable_settings: &ReadExpect<'_, EditableSettings>,
alias_validator: &ReadExpect<'_, AliasValidator>,
) -> Result<(), crate::error::Error> {
let (mut b1, mut b2, mut b3, mut b4, mut b5) = (
client.network_error,
client.network_error,
client.network_error,
client.network_error,
client.network_error,
);
loop {
/*
waiting for 1 of the 5 streams to return a massage asynchronous.
If so, handle that msg type. This code will be refactored soon
*/
let q1 = Client::internal_recv(&mut b1, &mut client.general_stream);
let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream);
let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream);
let q4 = Client::internal_recv(&mut b4, &mut client.ping_stream);
let q5 = Client::internal_recv(&mut b5, &mut client.register_stream);
let (m1, m2, m3, m4, m5) = select!(
msg = q1.fuse() => (Some(msg), None, None, None, None),
msg = q2.fuse() => (None, Some(msg), None, None, None),
msg = q3.fuse() => (None, None, Some(msg), None, None),
msg = q4.fuse() => (None, None, None, Some(msg), None),
msg = q5.fuse() => (None, None, None, None,Some(msg)),
);
*cnt += 1;
if let Some(msg) = m1 {
client.network_error |= b1;
Self::handle_client_msg(
server_emitter,
new_chat_msgs,
entity,
client,
player_metrics,
uids,
chat_modes,
msg?,
)?;
}
if let Some(msg) = m2 {
client.network_error |= b2;
Self::handle_client_in_game_msg(
server_emitter,
entity,
client,
terrain,
network_metrics,
can_build,
force_updates,
stats,
block_changes,
positions,
velocities,
orientations,
players,
controllers,
settings,
msg?,
)?;
}
if let Some(msg) = m3 {
client.network_error |= b3;
Self::handle_client_character_screen_msg(
server_emitter,
new_chat_msgs,
entity,
client,
character_loader,
uids,
players,
editable_settings,
alias_validator,
msg?,
)?;
}
if let Some(msg) = m4 {
client.network_error |= b4;
Self::handle_ping_msg(client, msg?)?;
}
if let Some(msg) = m5 {
client.network_error |= b5;
Self::handle_register_msg(
player_list,
new_players,
entity,
client,
player_metrics,
login_provider,
admins,
players,
editable_settings,
msg?,
)?;
}
}
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>,
ReadExpect<'a, CharacterLoader>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
ReadStorage<'a, CanBuild>,
ReadStorage<'a, ForceUpdate>,
WriteStorage<'a, Stats>,
ReadStorage<'a, ChatMode>,
WriteExpect<'a, LoginProvider>,
Write<'a, BlockChange>,
WriteStorage<'a, Admin>,
WriteStorage<'a, Pos>,
WriteStorage<'a, Vel>,
WriteStorage<'a, Ori>,
WriteStorage<'a, Player>,
WriteStorage<'a, Client>,
WriteStorage<'a, Controller>,
Read<'a, Settings>,
ReadExpect<'a, EditableSettings>,
ReadExpect<'a, AliasValidator>,
);
#[allow(clippy::match_ref_pats)] // TODO: Pending review in #587
#[allow(clippy::single_char_pattern)] // TODO: Pending review in #587
#[allow(clippy::single_match)] // TODO: Pending review in #587
fn run(
&mut self,
(
entities,
server_event_bus,
time,
character_loader,
terrain,
network_metrics,
player_metrics,
mut timer,
uids,
can_build,
force_updates,
mut stats,
chat_modes,
mut accounts,
mut block_changes,
mut admins,
mut positions,
mut velocities,
mut orientations,
mut players,
mut clients,
mut controllers,
settings,
editable_settings,
alias_validator,
): Self::SystemData,
) {
span!(_guard, "run", "message::Sys::run");
timer.start();
let mut server_emitter = server_event_bus.emitter();
let mut new_chat_msgs = Vec::new();
// Player list to send new players.
let player_list = (&uids, &players, stats.maybe(), admins.maybe())
.join()
.map(|(uid, player, stats, admin)| {
(*uid, PlayerInfo {
is_online: true,
is_admin: admin.is_some(),
player_alias: player.alias.clone(),
character: stats.map(|stats| CharacterInfo {
name: stats.name.clone(),
level: stats.level.level(),
}),
})
})
.collect::<HashMap<_, _>>();
// List of new players to update player lists of all clients.
let mut new_players = Vec::new();
for (entity, client) in (&entities, &mut clients).join() {
let mut cnt = 0;
let network_err: Result<(), crate::error::Error> = block_on(async {
//TIMEOUT 0.02 ms for msg handling
let work_future = Self::handle_messages(
&mut server_emitter,
&mut new_chat_msgs,
&player_list,
&mut new_players,
entity,
client,
&mut cnt,
&character_loader,
&terrain,
&network_metrics,
&player_metrics,
&uids,
&can_build,
&force_updates,
&mut stats,
&chat_modes,
&mut accounts,
&mut block_changes,
&mut admins,
&mut positions,
&mut velocities,
&mut orientations,
&mut players,
&mut controllers,
&settings,
&editable_settings,
&alias_validator,
);
select!(
_ = Delay::new(std::time::Duration::from_micros(20)).fuse() => Ok(()),
err = work_future.fuse() => err,
)
});
// Network error
if network_err.is_err() {
debug!(?entity, "postbox error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["network_error"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if cnt > 0 {
// Update client ping.
client.last_ping = time.0
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64
// Timeout
{
info!(?entity, "timeout error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["timeout"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 {
// Try pinging the client if the timeout is nearing.
client.send_msg(PingMsg::Ping);
}
}
// Handle new players.
// Tell all clients to add them to the player list.
for entity in new_players {
if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) {
let msg =
ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo {
player_alias: player.alias.clone(),
is_online: true,
is_admin: admins.get(entity).is_some(),
character: None, // new players will be on character select.
}));
for client in (&mut clients).join().filter(|c| c.registered) {
client.send_msg(msg.clone())
}
}
}
// Handle new chat messages.
for (entity, msg) in new_chat_msgs {
// Handle chat commands.
if msg.message.starts_with("/") {
if let (Some(entity), true) = (entity, msg.message.len() > 1) {
let argv = String::from(&msg.message[1..]);
server_emitter.emit(ServerEvent::ChatCmd(entity, argv));
}
} else {
// Send chat message
server_emitter.emit(ServerEvent::Chat(msg));
}
}
timer.end()
}
}

View File

@ -1,6 +1,6 @@
pub mod entity_sync;
pub mod invite_timeout;
pub mod message;
pub mod msg;
pub mod object;
pub mod persistence;
pub mod sentinel;
@ -16,7 +16,11 @@ use std::{
};
pub type EntitySyncTimer = SysTimer<entity_sync::Sys>;
pub type MessageTimer = SysTimer<message::Sys>;
pub type GeneralMsgTimer = SysTimer<msg::general::Sys>;
pub type PingMsgTimer = SysTimer<msg::ping::Sys>;
pub type RegisterMsgTimer = SysTimer<msg::register::Sys>;
pub type CharacterScreenMsgTimer = SysTimer<msg::character_screen::Sys>;
pub type InGameMsgTimer = SysTimer<msg::in_game::Sys>;
pub type SentinelTimer = SysTimer<sentinel::Sys>;
pub type SubscriptionTimer = SysTimer<subscription::Sys>;
pub type TerrainTimer = SysTimer<terrain::Sys>;

View File

@ -0,0 +1,194 @@
use super::super::SysTimer;
use crate::{
alias_validator::AliasValidator, character_creator, client::Client,
persistence::character_loader::CharacterLoader, presence::Presence, EditableSettings,
};
use common::{
comp::{ChatType, Player, UnresolvedChatMsg},
event::{EventBus, ServerEvent},
msg::{ClientGeneral, ServerGeneral},
span,
sync::Uid,
};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write};
use std::sync::atomic::Ordering;
use tracing::{debug, warn};
impl Sys {
#[allow(clippy::too_many_arguments)]
fn handle_client_character_screen_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
entity: specs::Entity,
client: &Client,
character_loader: &ReadExpect<'_, CharacterLoader>,
uids: &ReadStorage<'_, Uid>,
players: &ReadStorage<'_, Player>,
presences: &ReadStorage<'_, Presence>,
editable_settings: &ReadExpect<'_, EditableSettings>,
alias_validator: &ReadExpect<'_, AliasValidator>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
match msg {
// Request spectator state
ClientGeneral::Spectate => {
if players.contains(entity) {
warn!("Spectator mode not yet implemented on server");
} else {
debug!("dropped Spectate msg from unregistered client")
}
},
ClientGeneral::Character(character_id) => {
if let Some(player) = players.get(entity) {
if presences.contains(entity) {
debug!("player already ingame, aborting");
} else {
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if !editable_settings.server_description.is_empty() {
client.send(ChatType::CommandInfo.server_msg(String::from(
&*editable_settings.server_description,
)))?;
}
if !client.login_msg_sent.load(Ordering::Relaxed) {
if let Some(player_uid) = uids.get(entity) {
new_chat_msgs.push((None, UnresolvedChatMsg {
chat_type: ChatType::Online(*player_uid),
message: "".to_string(),
}));
client.login_msg_sent.store(true, Ordering::Relaxed);
}
}
}
} else {
debug!("Client is not yet registered");
client.send(ServerGeneral::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))?
}
},
ClientGeneral::RequestCharacterList => {
if let Some(player) = players.get(entity) {
character_loader.load_character_list(entity, player.uuid().to_string())
}
},
ClientGeneral::CreateCharacter { alias, tool, body } => {
if let Err(error) = alias_validator.validate(&alias) {
debug!(?error, ?alias, "denied alias as it contained a banned word");
client.send(ServerGeneral::CharacterActionError(error.to_string()))?;
} else if let Some(player) = players.get(entity) {
character_creator::create_character(
entity,
player.uuid().to_string(),
alias,
tool,
body,
character_loader,
);
}
},
ClientGeneral::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
character_loader.delete_character(
entity,
player.uuid().to_string(),
character_id,
);
}
},
_ => unreachable!("not a client_character_screen msg"),
}
Ok(())
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, CharacterLoader>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
ReadStorage<'a, Client>,
ReadStorage<'a, Player>,
ReadStorage<'a, Presence>,
ReadExpect<'a, EditableSettings>,
ReadExpect<'a, AliasValidator>,
);
fn run(
&mut self,
(
entities,
server_event_bus,
character_loader,
mut timer,
uids,
clients,
players,
presences,
editable_settings,
alias_validator,
): Self::SystemData,
) {
span!(_guard, "run", "msg::character_screen::Sys::run");
timer.start();
let mut server_emitter = server_event_bus.emitter();
let mut new_chat_msgs = Vec::new();
for (entity, client) in (&entities, &clients).join() {
let _ = super::try_recv_all(client, 1, |client, msg| {
Self::handle_client_character_screen_msg(
&mut server_emitter,
&mut new_chat_msgs,
entity,
client,
&character_loader,
&uids,
&players,
&presences,
&editable_settings,
&alias_validator,
msg,
)
});
}
// Handle new chat messages.
for (entity, msg) in new_chat_msgs {
// Handle chat commands.
if msg.message.starts_with('/') {
if let (Some(entity), true) = (entity, msg.message.len() > 1) {
let argv = String::from(&msg.message[1..]);
server_emitter.emit(ServerEvent::ChatCmd(entity, argv));
}
} else {
// Send chat message
server_emitter.emit(ServerEvent::Chat(msg));
}
}
timer.end()
}
}

View File

@ -0,0 +1,137 @@
use super::super::SysTimer;
use crate::{client::Client, metrics::PlayerMetrics};
use common::{
comp::{ChatMode, Player, UnresolvedChatMsg},
event::{EventBus, ServerEvent},
msg::{validate_chat_msg, ChatMsgValidationError, ClientGeneral, MAX_BYTES_CHAT_MSG},
span,
state::Time,
sync::Uid,
};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write};
use std::sync::atomic::Ordering;
use tracing::{debug, error, warn};
impl Sys {
#[allow(clippy::too_many_arguments)]
fn handle_general_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, UnresolvedChatMsg)>,
entity: specs::Entity,
client: &Client,
player: Option<&Player>,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
uids: &ReadStorage<'_, Uid>,
chat_modes: &ReadStorage<'_, ChatMode>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
match msg {
ClientGeneral::ChatMsg(message) => {
if player.is_some() {
match validate_chat_msg(&message) {
Ok(()) => {
if let Some(from) = uids.get(entity) {
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
warn!(?len, ?max, "Received a chat message that's too long")
},
}
}
},
ClientGeneral::Terminate => {
debug!(?entity, "Client send message to termitate session");
player_metrics
.clients_disconnected
.with_label_values(&["gracefully"])
.inc();
client.terminate_msg_recv.store(true, Ordering::Relaxed);
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
_ => unreachable!("not a client_general msg"),
}
Ok(())
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
ReadStorage<'a, ChatMode>,
ReadStorage<'a, Player>,
ReadStorage<'a, Client>,
);
fn run(
&mut self,
(
entities,
server_event_bus,
time,
player_metrics,
mut timer,
uids,
chat_modes,
players,
clients,
): Self::SystemData,
) {
span!(_guard, "run", "msg::general::Sys::run");
timer.start();
let mut server_emitter = server_event_bus.emitter();
let mut new_chat_msgs = Vec::new();
for (entity, client, player) in (&entities, &clients, (&players).maybe()).join() {
let res = super::try_recv_all(client, 3, |client, msg| {
Self::handle_general_msg(
&mut server_emitter,
&mut new_chat_msgs,
entity,
client,
player,
&player_metrics,
&uids,
&chat_modes,
msg,
)
});
if let Ok(1_u64..=u64::MAX) = res {
// Update client ping.
*client.last_ping.lock().unwrap() = time.0
}
}
// Handle new chat messages.
for (entity, msg) in new_chat_msgs {
// Handle chat commands.
if msg.message.starts_with('/') {
if let (Some(entity), true) = (entity, msg.message.len() > 1) {
let argv = String::from(&msg.message[1..]);
server_emitter.emit(ServerEvent::ChatCmd(entity, argv));
}
} else {
// Send chat message
server_emitter.emit(ServerEvent::Chat(msg));
}
}
timer.end()
}
}

View File

@ -0,0 +1,242 @@
use super::super::SysTimer;
use crate::{client::Client, metrics::NetworkRequestMetrics, presence::Presence, Settings};
use common::{
comp::{CanBuild, ControlEvent, Controller, ForceUpdate, Ori, Pos, Stats, Vel},
event::{EventBus, ServerEvent},
msg::{ClientGeneral, PresenceKind, ServerGeneral},
span,
state::BlockChange,
terrain::{TerrainChunkSize, TerrainGrid},
vol::{ReadVol, RectVolSize},
};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
use tracing::{debug, trace};
impl Sys {
#[allow(clippy::too_many_arguments)]
fn handle_client_in_game_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
entity: specs::Entity,
client: &Client,
maybe_presence: &mut Option<&mut Presence>,
terrain: &ReadExpect<'_, TerrainGrid>,
network_metrics: &ReadExpect<'_, NetworkRequestMetrics>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &mut WriteStorage<'_, Stats>,
block_changes: &mut Write<'_, BlockChange>,
positions: &mut WriteStorage<'_, Pos>,
velocities: &mut WriteStorage<'_, Vel>,
orientations: &mut WriteStorage<'_, Ori>,
controllers: &mut WriteStorage<'_, Controller>,
settings: &Read<'_, Settings>,
msg: ClientGeneral,
) -> Result<(), crate::error::Error> {
let presence = match maybe_presence {
Some(g) => g,
None => {
debug!(?entity, "client is not in_game, ignoring msg");
trace!(?msg, "ignored msg content");
if matches!(msg, ClientGeneral::TerrainChunkRequest{ .. }) {
network_metrics.chunks_request_dropped.inc();
}
return Ok(());
},
};
match msg {
// Go back to registered state (char selection screen)
ClientGeneral::ExitInGame => {
server_emitter.emit(ServerEvent::ExitIngame { entity });
client.send(ServerGeneral::ExitInGameSuccess)?;
*maybe_presence = None;
},
ClientGeneral::SetViewDistance(view_distance) => {
presence.view_distance = settings
.max_view_distance
.map(|max| view_distance.min(max))
.unwrap_or(view_distance);
//correct client if its VD is to high
if settings
.max_view_distance
.map(|max| view_distance > max)
.unwrap_or(false)
{
client.send(ServerGeneral::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
))?;
}
},
ClientGeneral::ControllerInputs(inputs) => {
if matches!(presence.kind, PresenceKind::Character(_)) {
if let Some(controller) = controllers.get_mut(entity) {
controller.inputs.update_with_new(inputs);
}
}
},
ClientGeneral::ControlEvent(event) => {
if matches!(presence.kind, PresenceKind::Character(_)) {
// Skip respawn if client entity is alive
if let ControlEvent::Respawn = event {
if stats.get(entity).map_or(true, |s| !s.is_dead) {
//Todo: comment why return!
return Ok(());
}
}
if let Some(controller) = controllers.get_mut(entity) {
controller.events.push(event);
}
}
},
ClientGeneral::ControlAction(event) => {
if matches!(presence.kind, PresenceKind::Character(_)) {
if let Some(controller) = controllers.get_mut(entity) {
controller.actions.push(event);
}
}
},
ClientGeneral::PlayerPhysics { pos, vel, ori } => {
if matches!(presence.kind, PresenceKind::Character(_))
&& force_updates.get(entity).is_none()
&& stats.get(entity).map_or(true, |s| !s.is_dead)
{
let _ = positions.insert(entity, pos);
let _ = velocities.insert(entity, vel);
let _ = orientations.insert(entity, ori);
}
},
ClientGeneral::BreakBlock(pos) => {
if let Some(block) = can_build.get(entity).and_then(|_| terrain.get(pos).ok()) {
block_changes.set(pos, block.into_vacant());
}
},
ClientGeneral::PlaceBlock(pos, block) => {
if can_build.get(entity).is_some() {
block_changes.try_set(pos, block);
}
},
ClientGeneral::TerrainChunkRequest { key } => {
let in_vd = if let Some(pos) = positions.get(entity) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5) * TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (presence.view_distance as f64 - 1.0 + 2.5 * 2.0_f64.sqrt())
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
network_metrics.chunks_served_from_memory.inc();
client.send(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})?
},
None => {
network_metrics.chunks_generation_triggered.inc();
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
} else {
network_metrics.chunks_request_dropped.inc();
}
},
ClientGeneral::UnlockSkill(skill) => {
stats
.get_mut(entity)
.map(|s| s.skill_set.unlock_skill(skill));
},
ClientGeneral::RefundSkill(skill) => {
stats
.get_mut(entity)
.map(|s| s.skill_set.refund_skill(skill));
},
ClientGeneral::UnlockSkillGroup(skill_group_type) => {
stats
.get_mut(entity)
.map(|s| s.skill_set.unlock_skill_group(skill_group_type));
},
_ => unreachable!("not a client_in_game msg"),
}
Ok(())
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
ReadExpect<'a, TerrainGrid>,
ReadExpect<'a, NetworkRequestMetrics>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, CanBuild>,
ReadStorage<'a, ForceUpdate>,
WriteStorage<'a, Stats>,
Write<'a, BlockChange>,
WriteStorage<'a, Pos>,
WriteStorage<'a, Vel>,
WriteStorage<'a, Ori>,
WriteStorage<'a, Presence>,
WriteStorage<'a, Client>,
WriteStorage<'a, Controller>,
Read<'a, Settings>,
);
fn run(
&mut self,
(
entities,
server_event_bus,
terrain,
network_metrics,
mut timer,
can_build,
force_updates,
mut stats,
mut block_changes,
mut positions,
mut velocities,
mut orientations,
mut presences,
mut clients,
mut controllers,
settings,
): Self::SystemData,
) {
span!(_guard, "run", "msg::in_game::Sys::run");
timer.start();
let mut server_emitter = server_event_bus.emitter();
for (entity, client, mut maybe_presence) in
(&entities, &mut clients, (&mut presences).maybe()).join()
{
let _ = super::try_recv_all(client, 2, |client, msg| {
Self::handle_client_in_game_msg(
&mut server_emitter,
entity,
client,
&mut maybe_presence,
&terrain,
&network_metrics,
&can_build,
&force_updates,
&mut stats,
&mut block_changes,
&mut positions,
&mut velocities,
&mut orientations,
&mut controllers,
&settings,
msg,
)
});
}
timer.end()
}
}

33
server/src/sys/msg/mod.rs Normal file
View File

@ -0,0 +1,33 @@
pub mod character_screen;
pub mod general;
pub mod in_game;
pub mod ping;
pub mod register;
use crate::client::Client;
use serde::de::DeserializeOwned;
/// handles all send msg and calls a handle fn
/// Aborts when a error occurred returns cnt of successful msg otherwise
pub(in crate::sys::msg) fn try_recv_all<M, F>(
client: &Client,
stream_id: u8,
mut f: F,
) -> Result<u64, crate::error::Error>
where
M: DeserializeOwned,
F: FnMut(&Client, M) -> Result<(), crate::error::Error>,
{
let mut cnt = 0u64;
loop {
let msg = match client.recv(stream_id) {
Ok(Some(msg)) => msg,
Ok(None) => break Ok(cnt),
Err(e) => break Err(e.into()),
};
if let Err(e) = f(client, msg) {
break Err(e);
}
cnt += 1;
}
}

View File

@ -0,0 +1,95 @@
use super::super::SysTimer;
use crate::{client::Client, metrics::PlayerMetrics, Settings};
use common::{
event::{EventBus, ServerEvent},
msg::PingMsg,
span,
state::Time,
};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, System, Write};
use std::sync::atomic::Ordering;
use tracing::{debug, info};
impl Sys {
fn handle_ping_msg(client: &Client, msg: PingMsg) -> Result<(), crate::error::Error> {
match msg {
PingMsg::Ping => client.send(PingMsg::Pong)?,
PingMsg::Pong => {},
}
Ok(())
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Client>,
Read<'a, Settings>,
);
fn run(
&mut self,
(
entities,
server_event_bus,
time,
player_metrics,
mut timer,
clients,
settings,
): Self::SystemData,
) {
span!(_guard, "run", "msg::ping::Sys::run");
timer.start();
let mut server_emitter = server_event_bus.emitter();
for (entity, client) in (&entities, &clients).join() {
let res = super::try_recv_all(client, 4, Self::handle_ping_msg);
match res {
Err(e) => {
if !client.terminate_msg_recv.load(Ordering::Relaxed) {
debug!(?entity, ?e, "network error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["network_error"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
}
},
Ok(1_u64..=u64::MAX) => {
// Update client ping.
*client.last_ping.lock().unwrap() = time.0
},
Ok(0) => {
let last_ping: f64 = *client.last_ping.lock().unwrap();
if time.0 - last_ping > settings.client_timeout.as_secs() as f64
// Timeout
{
if !client.terminate_msg_recv.load(Ordering::Relaxed) {
info!(?entity, "timeout error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["timeout"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
}
} else if time.0 - last_ping > settings.client_timeout.as_secs() as f64 * 0.5 {
// Try pinging the client if the timeout is nearing.
client.send_fallible(PingMsg::Ping);
}
},
}
}
timer.end()
}
}

View File

@ -0,0 +1,173 @@
use super::super::SysTimer;
use crate::{
client::Client, login_provider::LoginProvider, metrics::PlayerMetrics, EditableSettings,
};
use common::{
comp::{Admin, Player, Stats},
msg::{
CharacterInfo, ClientRegister, PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral,
ServerRegisterAnswer,
},
span,
sync::Uid,
};
use hashbrown::HashMap;
use specs::{Entities, Join, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage};
impl Sys {
#[allow(clippy::too_many_arguments)]
fn handle_register_msg(
player_list: &HashMap<Uid, PlayerInfo>,
new_players: &mut Vec<specs::Entity>,
entity: specs::Entity,
client: &Client,
player_metrics: &ReadExpect<'_, PlayerMetrics>,
login_provider: &mut WriteExpect<'_, LoginProvider>,
admins: &mut WriteStorage<'_, Admin>,
players: &mut WriteStorage<'_, Player>,
editable_settings: &ReadExpect<'_, EditableSettings>,
msg: ClientRegister,
) -> Result<(), crate::error::Error> {
let (username, uuid) = match login_provider.try_login(
&msg.token_or_username,
&*editable_settings.admins,
&*editable_settings.whitelist,
&*editable_settings.banlist,
) {
Err(err) => {
client.send(ServerRegisterAnswer::Err(err))?;
return Ok(());
},
Ok((username, uuid)) => (username, uuid),
};
let player = Player::new(username, uuid);
let is_admin = editable_settings.admins.contains(&uuid);
if !player.is_valid() {
// Invalid player
client.send(ServerRegisterAnswer::Err(RegisterError::InvalidCharacter))?;
return Ok(());
}
if !players.contains(entity) {
// Add Player component to this client
let _ = players.insert(entity, player);
player_metrics.players_connected.inc();
// Give the Admin component to the player if their name exists in
// admin list
if is_admin {
let _ = admins.insert(entity, Admin);
}
// Tell the client its request was successful.
client.send(ServerRegisterAnswer::Ok(()))?;
// Send initial player list
client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)))?;
// Add to list to notify all clients of the new player
new_players.push(entity);
}
Ok(())
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)]
type SystemData = (
Entities<'a>,
ReadExpect<'a, PlayerMetrics>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Uid>,
ReadStorage<'a, Client>,
WriteStorage<'a, Player>,
ReadStorage<'a, Stats>,
WriteExpect<'a, LoginProvider>,
WriteStorage<'a, Admin>,
ReadExpect<'a, EditableSettings>,
);
fn run(
&mut self,
(
entities,
player_metrics,
mut timer,
uids,
clients,
mut players,
stats,
mut login_provider,
mut admins,
editable_settings,
): Self::SystemData,
) {
span!(_guard, "run", "msg::register::Sys::run");
timer.start();
// Player list to send new players.
let player_list = (&uids, &players, stats.maybe(), admins.maybe())
.join()
.map(|(uid, player, stats, admin)| {
(*uid, PlayerInfo {
is_online: true,
is_admin: admin.is_some(),
player_alias: player.alias.clone(),
character: stats.map(|stats| CharacterInfo {
name: stats.name.clone(),
level: stats.level.level(),
}),
})
})
.collect::<HashMap<_, _>>();
// List of new players to update player lists of all clients.
let mut new_players = Vec::new();
for (entity, client) in (&entities, &clients).join() {
let _ = super::try_recv_all(client, 0, |client, msg| {
Self::handle_register_msg(
&player_list,
&mut new_players,
entity,
client,
&player_metrics,
&mut login_provider,
&mut admins,
&mut players,
&editable_settings,
msg,
)
});
}
// Handle new players.
// Tell all clients to add them to the player list.
for entity in new_players {
if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) {
let mut lazy_msg = None;
for (_, client) in (&players, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::Add(*uid, PlayerInfo {
player_alias: player.alias.clone(),
is_online: true,
is_admin: admins.get(entity).is_some(),
character: None, // new players will be on character select.
}),
)));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}
timer.end()
}
}

View File

@ -1,9 +1,11 @@
use crate::{
persistence::character_updater,
presence::Presence,
sys::{SysScheduler, SysTimer},
};
use common::{
comp::{Inventory, Loadout, Player, Stats},
comp::{Inventory, Loadout, Stats},
msg::PresenceKind,
span,
};
use specs::{Join, ReadExpect, ReadStorage, System, Write};
@ -13,7 +15,7 @@ pub struct Sys;
impl<'a> System<'a> for Sys {
#[allow(clippy::type_complexity)] // TODO: Pending review in #587
type SystemData = (
ReadStorage<'a, Player>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Stats>,
ReadStorage<'a, Inventory>,
ReadStorage<'a, Loadout>,
@ -25,7 +27,7 @@ impl<'a> System<'a> for Sys {
fn run(
&mut self,
(
players,
presences,
player_stats,
player_inventories,
player_loadouts,
@ -39,17 +41,18 @@ impl<'a> System<'a> for Sys {
timer.start();
updater.batch_update(
(
&players,
&presences,
&player_stats,
&player_inventories,
&player_loadouts,
)
.join()
.filter_map(|(player, stats, inventory, loadout)| {
player
.character_id
.map(|id| (id, stats, inventory, loadout))
}),
.filter_map(
|(presence, stats, inventory, loadout)| match presence.kind {
PresenceKind::Character(id) => Some((id, stats, inventory, loadout)),
PresenceKind::Spectator => None,
},
),
);
timer.end();
}

View File

@ -2,9 +2,12 @@ use super::{
sentinel::{DeletedEntities, TrackedComps},
SysTimer,
};
use crate::client::{self, Client, RegionSubscription};
use crate::{
client::Client,
presence::{self, Presence, RegionSubscription},
};
use common::{
comp::{Ori, Player, Pos, Vel},
comp::{Ori, Pos, Vel},
msg::ServerGeneral,
region::{region_in_vd, regions_in_vd, Event as RegionEvent, RegionMap},
span,
@ -31,8 +34,8 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Pos>,
ReadStorage<'a, Vel>,
ReadStorage<'a, Ori>,
ReadStorage<'a, Player>,
WriteStorage<'a, Client>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
WriteStorage<'a, RegionSubscription>,
Write<'a, DeletedEntities>,
TrackedComps<'a>,
@ -49,8 +52,8 @@ impl<'a> System<'a> for Sys {
positions,
velocities,
orientations,
players,
mut clients,
presences,
clients,
mut subscriptions,
mut deleted_entities,
tracked_comps,
@ -71,22 +74,16 @@ impl<'a> System<'a> for Sys {
// 7. Determine list of regions that are in range and iterate through it
// - check if in hashset (hash calc) if not add it
let mut regions_to_remove = Vec::new();
for (client, subscription, pos, vd, client_entity) in (
&mut clients,
for (subscription, pos, presence, client_entity, client) in (
&mut subscriptions,
&positions,
&players,
&presences,
&entities,
&clients,
)
.join()
.filter_map(|(client, s, pos, player, e)| {
if client.in_game.is_some() {
player.view_distance.map(|v| (client, s, pos, v, e))
} else {
None
}
})
{
let vd = presence.view_distance;
// Calculate current chunk
let chunk = (Vec2::<f32>::from(pos.0))
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
@ -101,7 +98,7 @@ impl<'a> System<'a> for Sys {
})
- Vec2::from(pos.0))
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| {
e.abs() > (sz / 2 + client::CHUNK_FUZZ) as f32
e.abs() > (sz / 2 + presence::CHUNK_FUZZ) as f32
})
.reduce_or()
{
@ -117,7 +114,9 @@ impl<'a> System<'a> for Sys {
*key,
pos.0,
(vd as f32 * chunk_size)
+ (client::CHUNK_FUZZ as f32 + client::REGION_FUZZ as f32 + chunk_size)
+ (presence::CHUNK_FUZZ as f32
+ presence::REGION_FUZZ as f32
+ chunk_size)
* 2.0f32.sqrt(),
) {
// Add to the list of regions to remove
@ -153,7 +152,7 @@ impl<'a> System<'a> for Sys {
.map(|key| subscription.regions.contains(key))
.unwrap_or(false)
{
client.send_msg(ServerGeneral::DeleteEntity(uid));
client.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
},
@ -161,7 +160,7 @@ impl<'a> System<'a> for Sys {
}
// Tell client to delete entities in the region
for (&uid, _) in (&uids, region.entities()).join() {
client.send_msg(ServerGeneral::DeleteEntity(uid));
client.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
// Send deleted entities since they won't be processed for this client in entity
@ -171,14 +170,14 @@ impl<'a> System<'a> for Sys {
.iter()
.flat_map(|v| v.iter())
{
client.send_msg(ServerGeneral::DeleteEntity(Uid(*uid)));
client.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
}
}
for key in regions_in_vd(
pos.0,
(vd as f32 * chunk_size)
+ (client::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(),
+ (presence::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(),
) {
// Send client initial info about the entities in this region if it was not
// already within the set of subscribed regions
@ -196,7 +195,7 @@ impl<'a> System<'a> for Sys {
{
// Send message to create entity and tracked components and physics
// components
client.send_msg(ServerGeneral::CreateEntity(
client.send_fallible(ServerGeneral::CreateEntity(
tracked_comps.create_entity_package(
entity,
Some(*pos),
@ -217,22 +216,18 @@ impl<'a> System<'a> for Sys {
/// Initialize region subscription
pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
if let (Some(client_pos), Some(client_vd), Some(client)) = (
if let (Some(client_pos), Some(presence), Some(client)) = (
world.read_storage::<Pos>().get(entity),
world
.read_storage::<Player>()
.get(entity)
.map(|pl| pl.view_distance)
.and_then(|v| v),
world.write_storage::<Client>().get_mut(entity),
world.read_storage::<Presence>().get(entity),
world.write_storage::<Client>().get(entity),
) {
let fuzzy_chunk = (Vec2::<f32>::from(client_pos.0))
.map2(TerrainChunkSize::RECT_SIZE, |e, sz| e as i32 / sz as i32);
let chunk_size = TerrainChunkSize::RECT_SIZE.reduce_max() as f32;
let regions = common::region::regions_in_vd(
client_pos.0,
(client_vd as f32 * chunk_size) as f32
+ (client::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(),
(presence.view_distance as f32 * chunk_size) as f32
+ (presence::CHUNK_FUZZ as f32 + chunk_size) * 2.0f32.sqrt(),
);
let region_map = world.read_resource::<RegionMap>();
@ -249,7 +244,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
.join()
{
// Send message to create entity and tracked components and physics components
client.send_msg(ServerGeneral::CreateEntity(
client.send_fallible(ServerGeneral::CreateEntity(
tracked_comps.create_entity_package(
entity,
Some(*pos),

View File

@ -1,7 +1,7 @@
use super::SysTimer;
use crate::{chunk_generator::ChunkGenerator, client::Client, Tick};
use crate::{chunk_generator::ChunkGenerator, client::Client, presence::Presence, Tick};
use common::{
comp::{self, bird_medium, Alignment, Player, Pos},
comp::{self, bird_medium, Alignment, Pos},
event::{EventBus, ServerEvent},
generation::get_npc_name,
msg::ServerGeneral,
@ -12,7 +12,7 @@ use common::{
LoadoutBuilder,
};
use rand::Rng;
use specs::{Join, Read, ReadStorage, System, Write, WriteExpect, WriteStorage};
use specs::{Join, Read, ReadStorage, System, Write, WriteExpect};
use std::sync::Arc;
use vek::*;
@ -33,8 +33,8 @@ impl<'a> System<'a> for Sys {
WriteExpect<'a, TerrainGrid>,
Write<'a, TerrainChanges>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Player>,
WriteStorage<'a, Client>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
);
fn run(
@ -47,8 +47,8 @@ impl<'a> System<'a> for Sys {
mut terrain,
mut terrain_changes,
positions,
players,
mut clients,
presences,
clients,
): Self::SystemData,
) {
span!(_guard, "run", "terrain::Sys::run");
@ -62,8 +62,8 @@ impl<'a> System<'a> for Sys {
let (chunk, supplement) = match res {
Ok((chunk, supplement)) => (chunk, supplement),
Err(Some(entity)) => {
if let Some(client) = clients.get_mut(entity) {
client.send_msg(ServerGeneral::TerrainChunkUpdate {
if let Some(client) = clients.get(entity) {
client.send_fallible(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Err(()),
});
@ -75,12 +75,7 @@ impl<'a> System<'a> for Sys {
},
};
// Send the chunk to all nearby players.
for (view_distance, pos, client) in (&players, &positions, &mut clients)
.join()
.filter_map(|(player, pos, client)| {
player.view_distance.map(|vd| (vd, pos, client))
})
{
for (presence, pos, client) in (&presences, &positions, &clients).join() {
let chunk_pos = terrain.pos_key(pos.0.map(|e| e as i32));
// Subtract 2 from the offset before computing squared magnitude
// 1 since chunks need neighbors to be meshed
@ -89,8 +84,8 @@ impl<'a> System<'a> for Sys {
.map(|e: i32| (e.abs() as u32).saturating_sub(2))
.magnitude_squared();
if adjusted_dist_sqr <= view_distance.pow(2) {
client.send_msg(ServerGeneral::TerrainChunkUpdate {
if adjusted_dist_sqr <= presence.view_distance.pow(2) {
client.send_fallible(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
});
@ -206,12 +201,8 @@ impl<'a> System<'a> for Sys {
let mut should_drop = true;
// For each player with a position, calculate the distance.
for (player, pos) in (&players, &positions).join() {
if player
.view_distance
.map(|vd| chunk_in_vd(pos.0, chunk_key, &terrain, vd))
.unwrap_or(false)
{
for (presence, pos) in (&presences, &positions).join() {
if chunk_in_vd(pos.0, chunk_key, &terrain, presence.view_distance) {
should_drop = false;
break;
}

View File

@ -1,13 +1,7 @@
use super::SysTimer;
use crate::client::Client;
use common::{
comp::{Player, Pos},
msg::ServerGeneral,
span,
state::TerrainChanges,
terrain::TerrainGrid,
};
use specs::{Join, Read, ReadExpect, ReadStorage, System, Write, WriteStorage};
use crate::{client::Client, presence::Presence};
use common::{comp::Pos, msg::ServerGeneral, span, state::TerrainChanges, terrain::TerrainGrid};
use specs::{Join, Read, ReadExpect, ReadStorage, System, Write};
/// This systems sends new chunks to clients as well as changes to existing
/// chunks
@ -19,43 +13,48 @@ impl<'a> System<'a> for Sys {
Read<'a, TerrainChanges>,
Write<'a, SysTimer<Self>>,
ReadStorage<'a, Pos>,
ReadStorage<'a, Player>,
WriteStorage<'a, Client>,
ReadStorage<'a, Presence>,
ReadStorage<'a, Client>,
);
fn run(
&mut self,
(terrain, terrain_changes, mut timer, positions, players, mut clients): Self::SystemData,
(terrain, terrain_changes, mut timer, positions, presences, clients): Self::SystemData,
) {
span!(_guard, "run", "terrain_sync::Sys::run");
timer.start();
// Sync changed chunks
'chunk: for chunk_key in &terrain_changes.modified_chunks {
for (player, pos, client) in (&players, &positions, &mut clients).join() {
if player
.view_distance
.map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd))
.unwrap_or(false)
let mut lazy_msg = None;
for (presence, pos, client) in (&presences, &positions, &clients).join() {
if super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, presence.view_distance)
{
client.send_msg(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
Some(chunk) => chunk.clone(),
None => break 'chunk,
})),
});
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
Some(chunk) => chunk.clone(),
None => break 'chunk,
})),
}));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
}
}
// TODO: Don't send all changed blocks to all clients
// Sync changed blocks
let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone());
for (player, client) in (&players, &mut clients).join() {
if player.view_distance.is_some() {
client.send_msg(msg.clone());
let mut lazy_msg = None;
for (_, client) in (&presences, &clients).join() {
if lazy_msg.is_none() {
lazy_msg = Some(client.prepare(ServerGeneral::TerrainBlockUpdates(
terrain_changes.modified_blocks.clone(),
)));
}
lazy_msg.as_ref().map(|ref msg| client.send_prepared(&msg));
}
timer.end();

View File

@ -22,28 +22,36 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Player>,
ReadStorage<'a, WaypointArea>,
WriteStorage<'a, Waypoint>,
WriteStorage<'a, Client>,
ReadStorage<'a, Client>,
Read<'a, Time>,
Write<'a, SysTimer<Self>>,
);
fn run(
&mut self,
(entities, positions, players, waypoint_areas, mut waypoints, mut clients, time, mut timer): Self::SystemData,
(
entities,
positions,
players,
waypoint_areas,
mut waypoints,
clients,
time,
mut timer,
): Self::SystemData,
) {
span!(_guard, "run", "waypoint::Sys::run");
timer.start();
for (entity, player_pos, _, client) in
(&entities, &positions, &players, &mut clients).join()
{
for (entity, player_pos, _, client) in (&entities, &positions, &players, &clients).join() {
for (waypoint_pos, waypoint_area) in (&positions, &waypoint_areas).join() {
if player_pos.0.distance_squared(waypoint_pos.0) < waypoint_area.radius().powi(2) {
if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time))
{
if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) {
client
.send_msg(ServerGeneral::Notification(Notification::WaypointSaved));
client.send_fallible(ServerGeneral::Notification(
Notification::WaypointSaved,
));
}
}
}

View File

@ -64,6 +64,7 @@ use common::{
item::{ItemDesc, Quality},
BuffKind,
},
msg::PresenceKind,
span,
sync::Uid,
terrain::TerrainChunk,
@ -658,7 +659,12 @@ impl Hud {
let server = &client.server_info.name;
// Get the id, unwrap is safe because this CANNOT be None at this
// point.
let character_id = client.active_character_id.unwrap();
let character_id = match client.presence().unwrap() {
PresenceKind::Character(id) => id,
PresenceKind::Spectator => unreachable!("HUD creation in Spectator mode!"),
};
// Create a new HotbarState from the persisted slots.
let hotbar_state =
HotbarState::new(global_state.profile.get_hotbar_slots(server, character_id));

View File

@ -61,11 +61,11 @@ impl PlayState for CharSelectionState {
fn tick(&mut self, global_state: &mut GlobalState, events: Vec<WinEvent>) -> PlayStateResult {
span!(_guard, "tick", "<CharSelectionState as PlayState>::tick");
let (client_in_game, client_registered) = {
let (client_presence, client_registered) = {
let client = self.client.borrow();
(client.in_game(), client.registered())
(client.presence(), client.registered())
};
if client_in_game.is_none() && client_registered {
if client_presence.is_none() && client_registered {
// Handle window events
for event in events {
if self.char_selection_ui.handle_event(event.clone()) {

View File

@ -18,6 +18,7 @@ use common::{
comp::{ChatMsg, ChatType, InventoryUpdateEvent, Pos, Vel},
consts::{MAX_MOUNT_RANGE, MAX_PICKUP_RANGE},
event::EventBus,
msg::PresenceKind,
outcome::Outcome,
span,
terrain::{Block, BlockKind},
@ -211,11 +212,11 @@ impl PlayState for SessionState {
));
// TODO: can this be a method on the session or are there borrowcheck issues?
let (client_in_game, client_registered) = {
let (client_presence, client_registered) = {
let client = self.client.borrow();
(client.in_game(), client.registered())
(client.presence(), client.registered())
};
if client_in_game.is_some() {
if client_presence.is_some() {
// Update MyEntity
// Note: Alternatively, the client could emit an event when the entity changes
// which may or may not be more elegant
@ -763,7 +764,10 @@ impl PlayState for SessionState {
HudEvent::CharacterSelection => {
self.client.borrow_mut().request_remove_character()
},
HudEvent::Logout => self.client.borrow_mut().request_logout(),
HudEvent::Logout => {
self.client.borrow_mut().logout();
return PlayStateResult::Pop;
},
HudEvent::Quit => {
return PlayStateResult::Shutdown;
},
@ -924,7 +928,12 @@ impl PlayState for SessionState {
let server = &client.server_info.name;
// If we are changing the hotbar state this CANNOT be None.
let character_id = client.active_character_id.unwrap();
let character_id = match client.presence().unwrap() {
PresenceKind::Character(id) => id,
PresenceKind::Spectator => {
unreachable!("HUD adaption in Spectator mode!")
},
};
// Get or update the ServerProfile.
global_state
@ -1077,7 +1086,7 @@ impl PlayState for SessionState {
self.cleanup();
PlayStateResult::Continue
} else if client_registered && client_in_game.is_none() {
} else if client_registered && client_presence.is_none() {
PlayStateResult::Switch(Box::new(CharSelectionState::new(
global_state,
Rc::clone(&self.client),