network layer switch, doing the stuff that might confict.

- mostly its the message handling put now in a async wrapper
 - add some fixes to pass without warnings and make clippy happy
 - some network error handling is ignored, this can be improved but is no blocker
This commit is contained in:
Marcel Märtens 2020-07-01 11:45:39 +02:00
parent 77c90b2c7c
commit 4e92c0160e
6 changed files with 627 additions and 554 deletions

View File

@ -1,5 +1,6 @@
use authc::AuthClientError;
use network::{NetworkError, ParticipantError, StreamError};
use network::{ParticipantError, StreamError};
pub use network::NetworkError;
#[derive(Debug)]
pub enum Error {

View File

@ -252,6 +252,7 @@ impl Client {
RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn,
RegisterError::AuthError(err) => Error::AuthErr(err),
RegisterError::InvalidCharacter => Error::InvalidCharacter,
RegisterError::NotOnWhitelist => Error::NotOnWhitelist,
})
},
ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()),
@ -364,8 +365,7 @@ impl Client {
}
pub fn unmount(&mut self) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::Unmount));
self.singleton_stream.send(ClientMsg::ControlEvent(ControlEvent::Unmount)).unwrap();
}
pub fn respawn(&mut self) {
@ -489,7 +489,7 @@ impl Client {
/// Send a chat message to the server.
pub fn send_chat(&mut self, message: String) {
match validate_chat_msg(&message) {
Ok(()) => self.postbox.send_message(ClientMsg::ChatMsg(message)),
Ok(()) => self.singleton_stream.send(ClientMsg::ChatMsg(message)).unwrap(),
Err(ChatMsgValidationError::TooLong) => tracing::warn!(
"Attempted to send a message that's too long (Over {} bytes)",
MAX_BYTES_CHAT_MSG
@ -737,8 +737,213 @@ impl Client {
self.state.cleanup();
}
/// Handle new server messages.
async fn handle_message(&mut self, frontend_events: &mut Vec<Event>, cnt: &mut u64) -> Result<(), Error> {
loop {
let msg = self.singleton_stream.recv().await?;
*cnt += 1;
match msg {
ServerMsg::TooManyPlayers => {
return Err(Error::ServerWentMad);
},
ServerMsg::Shutdown => return Err(Error::ServerShutdown),
ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad),
ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => {
self.player_list = list
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => {
if let Some(old_player_info) =
self.player_list.insert(uid, player_info.clone())
{
warn!(
"Received msg to insert {} with uid {} into the player list but \
there was already an entry for {} with the same uid that was \
overwritten!",
player_info.player_alias, uid, old_player_info.player_alias
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.is_admin = admin;
} else {
warn!(
"Received msg to update admin status of uid {}, but they were not \
in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter(
uid,
char_info,
)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = Some(char_info);
} else {
warn!(
"Received msg to update character info for uid {}, but they were \
not in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = match &player_info.character {
Some(character) => Some(common::msg::CharacterInfo {
name: character.name.to_string(),
level: next_level,
}),
None => {
warn!(
"Received msg to update character level info to {} for \
uid {}, but this player's character is None.",
next_level, uid
);
None
},
};
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => {
// Instead of removing players, mark them as offline because we need to
// remember the names of disconnected players in chat.
//
// TODO the server should re-use uids of players that log out and log back
// in.
if let Some(player_info) = self.player_list.get_mut(&uid) {
if player_info.is_online {
player_info.is_online = false;
} else {
warn!(
"Received msg to remove uid {} from the player list by they \
were already marked offline",
uid
);
}
} else {
warn!(
"Received msg to remove uid {} from the player list by they \
weren't in the list!",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.player_alias = new_name;
} else {
warn!(
"Received msg to alias player with uid {} to {} but this uid is \
not in the player list",
uid, new_name
);
}
},
ServerMsg::Ping => {self.singleton_stream.send(ClientMsg::Pong)?;},
ServerMsg::Pong => {
self.last_server_pong = self.state.get_time();
self.last_ping_delta =
(self.state.get_time() - self.last_server_ping).round();
},
ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)),
ServerMsg::SetPlayerEntity(uid) => {
if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) {
self.entity = entity;
} else {
return Err(Error::Other("Failed to find entity from uid.".to_owned()));
}
},
ServerMsg::TimeOfDay(time_of_day) => {
*self.state.ecs_mut().write_resource() = time_of_day;
},
ServerMsg::EntitySync(entity_sync_package) => {
self.state
.ecs_mut()
.apply_entity_sync_package(entity_sync_package);
},
ServerMsg::CompSync(comp_sync_package) => {
self.state
.ecs_mut()
.apply_comp_sync_package(comp_sync_package);
},
ServerMsg::CreateEntity(entity_package) => {
self.state.ecs_mut().apply_entity_package(entity_package);
},
ServerMsg::DeleteEntity(entity) => {
if self.state.read_component_cloned::<Uid>(self.entity) != Some(entity) {
self.state
.ecs_mut()
.delete_entity_and_clear_from_uid_allocator(entity.0);
}
},
// Cleanup for when the client goes back to the `Registered` state
ServerMsg::ExitIngameCleanup => {
self.clean_state();
},
ServerMsg::InventoryUpdate(inventory, event) => {
match event {
InventoryUpdateEvent::CollectFailed => {},
_ => {
// Push the updated inventory component to the client
self.state.write_component(self.entity, inventory);
},
}
frontend_events.push(Event::InventoryUpdated(event));
},
ServerMsg::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerMsg::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
ServerMsg::StateAnswer(Ok(state)) => {
self.client_state = state;
},
ServerMsg::StateAnswer(Err((error, state))) => {
warn!(
"StateAnswer: {:?}. Server thinks client is in state {:?}.",
error, state
);
},
ServerMsg::Disconnect => {
frontend_events.push(Event::Disconnect);
self.singleton_stream.send(ClientMsg::Terminate)?;
},
ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list;
self.character_list.loading = false;
},
ServerMsg::CharacterActionError(error) => {
warn!("CharacterActionError: {:?}.", error);
self.character_list.error = Some(error);
},
ServerMsg::Notification(n) => {
frontend_events.push(Event::Notification(n));
},
ServerMsg::CharacterDataLoadError(error) => {
self.clean_state();
self.character_list.error = Some(error);
},
ServerMsg::SetViewDistance(vd) => {
self.view_distance = Some(vd);
frontend_events.push(Event::SetViewDistance(vd));
},
}
}
}
/// Handle new server messages.
fn handle_new_messages(&mut self) -> Result<Vec<Event>, Error> {
let mut frontend_events = Vec::new();
@ -758,216 +963,20 @@ impl Client {
}
}
let new_msgs = self.postbox.new_messages();
let mut handles_msg = 0;
if new_msgs.len() > 0 {
for msg in new_msgs {
match msg {
ServerMsg::TooManyPlayers => {
return Err(Error::ServerWentMad);
},
ServerMsg::Shutdown => return Err(Error::ServerShutdown),
ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad),
ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => {
self.player_list = list
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => {
if let Some(old_player_info) =
self.player_list.insert(uid, player_info.clone())
{
warn!(
"Received msg to insert {} with uid {} into the player list but \
there was already an entry for {} with the same uid that was \
overwritten!",
player_info.player_alias, uid, old_player_info.player_alias
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.is_admin = admin;
} else {
warn!(
"Received msg to update admin status of uid {}, but they were not \
in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter(
uid,
char_info,
)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = Some(char_info);
} else {
warn!(
"Received msg to update character info for uid {}, but they were \
not in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = match &player_info.character {
Some(character) => Some(common::msg::CharacterInfo {
name: character.name.to_string(),
level: next_level,
}),
None => {
warn!(
"Received msg to update character level info to {} for \
uid {}, but this player's character is None.",
next_level, uid
);
block_on(async{
//TIMEOUT 0.01 ms for msg handling
select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = self.handle_message(&mut frontend_events, &mut handles_msg).fuse() => err,
)
})?;
None
},
};
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => {
// Instead of removing players, mark them as offline because we need to
// remember the names of disconnected players in chat.
//
// TODO the server should re-use uids of players that log out and log back
// in.
if let Some(player_info) = self.player_list.get_mut(&uid) {
if player_info.is_online {
player_info.is_online = false;
} else {
warn!(
"Received msg to remove uid {} from the player list by they \
were already marked offline",
uid
);
}
} else {
warn!(
"Received msg to remove uid {} from the player list by they \
weren't in the list!",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.player_alias = new_name;
} else {
warn!(
"Received msg to alias player with uid {} to {} but this uid is \
not in the player list",
uid, new_name
);
}
},
ServerMsg::Ping => self.postbox.send_message(ClientMsg::Pong),
ServerMsg::Pong => {
self.last_server_pong = self.state.get_time();
self.last_ping_delta =
(self.state.get_time() - self.last_server_ping).round();
},
ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)),
ServerMsg::SetPlayerEntity(uid) => {
if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) {
self.entity = entity;
} else {
return Err(Error::Other("Failed to find entity from uid.".to_owned()));
}
},
ServerMsg::TimeOfDay(time_of_day) => {
*self.state.ecs_mut().write_resource() = time_of_day;
},
ServerMsg::EntitySync(entity_sync_package) => {
self.state
.ecs_mut()
.apply_entity_sync_package(entity_sync_package);
},
ServerMsg::CompSync(comp_sync_package) => {
self.state
.ecs_mut()
.apply_comp_sync_package(comp_sync_package);
},
ServerMsg::CreateEntity(entity_package) => {
self.state.ecs_mut().apply_entity_package(entity_package);
},
ServerMsg::DeleteEntity(entity) => {
if self.state.read_component_cloned::<Uid>(self.entity) != Some(entity) {
self.state
.ecs_mut()
.delete_entity_and_clear_from_uid_allocator(entity.0);
}
},
// Cleanup for when the client goes back to the `Registered` state
ServerMsg::ExitIngameCleanup => {
self.clean_state();
},
ServerMsg::InventoryUpdate(inventory, event) => {
match event {
InventoryUpdateEvent::CollectFailed => {},
_ => {
// Push the updated inventory component to the client
self.state.write_component(self.entity, inventory);
},
}
frontend_events.push(Event::InventoryUpdated(event));
},
ServerMsg::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerMsg::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
ServerMsg::StateAnswer(Ok(state)) => {
self.client_state = state;
},
ServerMsg::StateAnswer(Err((error, state))) => {
warn!(
"StateAnswer: {:?}. Server thinks client is in state {:?}.",
error, state
);
},
ServerMsg::Disconnect => {
frontend_events.push(Event::Disconnect);
self.postbox.send_message(ClientMsg::Terminate);
},
ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list;
self.character_list.loading = false;
},
ServerMsg::CharacterActionError(error) => {
warn!("CharacterActionError: {:?}.", error);
self.character_list.error = Some(error);
},
ServerMsg::Notification(n) => {
frontend_events.push(Event::Notification(n));
},
ServerMsg::CharacterDataLoadError(error) => {
self.clean_state();
self.character_list.error = Some(error);
},
ServerMsg::SetViewDistance(vd) => {
self.view_distance = Some(vd);
frontend_events.push(Event::SetViewDistance(vd));
},
}
}
} else if let Some(err) = self.postbox.error() {
return Err(err.into());
// We regularily ping in the tick method
} else if self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT {
if handles_msg == 0 && self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT {
return Err(Error::ServerTimeout);
}
Ok(frontend_events)
}

View File

@ -19,7 +19,7 @@ impl Component for Client {
}
impl Client {
pub fn notify(&mut self, msg: ServerMsg) { self.singleton_stream.lock().unwrap().send(msg); }
pub fn notify(&mut self, msg: ServerMsg) { let _ = self.singleton_stream.lock().unwrap().send(msg); }
pub fn is_registered(&self) -> bool {
match self.client_state {
@ -37,12 +37,12 @@ impl Client {
pub fn allow_state(&mut self, new_state: ClientState) {
self.client_state = new_state;
self.singleton_stream
let _ = self.singleton_stream
.lock().unwrap().send(ServerMsg::StateAnswer(Ok(new_state)));
}
pub fn error_state(&mut self, error: RequestStateError) {
self.singleton_stream
let _ = self.singleton_stream
.lock().unwrap().send(ServerMsg::StateAnswer(Err((error, self.client_state))));
}
}

View File

@ -342,11 +342,11 @@ impl Server {
// 3) Handle inputs from clients
block_on(async{
//TIMEOUT 0.01 ms for msg handling
let x = select!(
select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = self.handle_new_connections(&mut frontend_events).fuse() => err,
);
});
)
})?;
let before_message_system = Instant::now();

View File

@ -22,6 +22,366 @@ use hashbrown::HashMap;
use specs::{
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
};
use futures_util::{select, FutureExt};
use futures_executor::block_on;
use futures_timer::Delay;
impl Sys {
///We need to move this to a async fn, otherwise the compiler generates to much recursive fn, and async closures dont work yet
#[allow(clippy::too_many_arguments)]
async fn handle_client_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, ChatMsg)>,
player_list: &HashMap<Uid, PlayerInfo>,
new_players: &mut Vec<specs::Entity>,
entity: specs::Entity,
client: &mut Client,
cnt: &mut u64,
character_loader: &ReadExpect<'_, CharacterLoader>,
terrain: &ReadExpect<'_, TerrainGrid>,
uids: &ReadStorage<'_, Uid>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &ReadStorage<'_, Stats>,
chat_modes: &ReadStorage<'_, ChatMode>,
accounts: &mut WriteExpect<'_, AuthProvider>,
block_changes: &mut Write<'_, BlockChange>,
admin_list: &ReadExpect<'_, AdminList>,
admins: &mut WriteStorage<'_, Admin>,
positions: &mut WriteStorage<'_, Pos>,
velocities: &mut WriteStorage<'_, Vel>,
orientations: &mut WriteStorage<'_, Ori>,
players: &mut WriteStorage<'_, Player>,
controllers: &mut WriteStorage<'_, Controller>,
settings: &Read<'_, ServerSettings>,
) -> Result<(), crate::error::Error> {
loop {
let msg = client.singleton_stream.lock().unwrap().recv().await?;
*cnt += 1;
match msg {
// Go back to registered state (char selection screen)
ClientMsg::ExitIngame => match client.client_state {
// Use ClientMsg::Register instead.
ClientState::Connected => {
client.error_state(RequestStateError::WrongMessage)
},
ClientState::Registered => client.error_state(RequestStateError::Already),
ClientState::Spectator | ClientState::Character => {
server_emitter.emit(ServerEvent::ExitIngame { entity });
},
ClientState::Pending => {},
},
// Request spectator state
ClientMsg::Spectate => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Spectator => client.error_state(RequestStateError::Already),
ClientState::Registered | ClientState::Character => {
client.allow_state(ClientState::Spectator)
},
ClientState::Pending => {},
},
// Request registered state (login)
ClientMsg::Register {
view_distance,
token_or_username,
} => {
let (username, uuid) = match accounts.query(token_or_username.clone()) {
Err(err) => {
client.error_state(RequestStateError::RegisterDenied(err));
break Ok(());
},
Ok((username, uuid)) => (username, uuid),
};
let vd = view_distance
.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd)));
let player = Player::new(username.clone(), None, vd, uuid);
let is_admin = admin_list.contains(&username);
if !player.is_valid() {
// Invalid player
client.error_state(RequestStateError::Impossible);
break Ok(());
}
match client.client_state {
ClientState::Connected => {
// Add Player component to this client
let _ = players.insert(entity, player);
// Give the Admin component to the player if their name exists in
// admin list
if is_admin {
let _ = admins.insert(entity, Admin);
}
// Tell the client its request was successful.
client.allow_state(ClientState::Registered);
// Send initial player list
client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)));
// Add to list to notify all clients of the new player
new_players.push(entity);
},
// Use RequestState instead (No need to send `player` again).
_ => client.error_state(RequestStateError::Impossible),
}
//client.allow_state(ClientState::Registered);
// Limit view distance if it's too high
// This comes after state registration so that the client actually hears it
if settings
.max_view_distance
.zip(view_distance)
.map(|(vd, max)| vd > max)
.unwrap_or(false)
{
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
};
},
ClientMsg::SetViewDistance(view_distance) => if let ClientState::Character { .. } = client.client_state {
if settings
.max_view_distance
.map(|max| view_distance <= max)
.unwrap_or(true)
{
players.get_mut(entity).map(|player| {
player.view_distance = Some(
settings
.max_view_distance
.map(|max| view_distance.min(max))
.unwrap_or(view_distance),
)
});
} else {
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
}
},
ClientMsg::Character(character_id) => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered | ClientState::Spectator => {
// Only send login message if it wasn't already
// sent previously
if let (Some(player), false) =
(players.get(entity), client.login_msg_sent)
{
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if settings.server_description.len() > 0 {
client.notify(
ChatType::CommandInfo
.server_msg(settings.server_description.clone()),
);
}
// Only send login message if it wasn't already
// sent previously
if !client.login_msg_sent {
new_chat_msgs.push((None, ChatMsg {
chat_type: ChatType::Online,
message: format!("[{}] is now online.", &player.alias), // TODO: Localize this
}));
client.login_msg_sent = true;
}
} else {
client.notify(ServerMsg::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))
}
},
ClientState::Character => client.error_state(RequestStateError::Already),
ClientState::Pending => {},
},
ClientMsg::ControllerInputs(inputs) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.inputs.update_with_new(inputs);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlEvent(event) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
// Skip respawn if client entity is alive
if let ControlEvent::Respawn = event {
if stats.get(entity).map_or(true, |s| !s.is_dead) {
continue;
}
}
if let Some(controller) = controllers.get_mut(entity) {
controller.events.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlAction(event) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.actions.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ChatMsg(message) => match client.client_state {
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered
| ClientState::Spectator
| ClientState::Character => match validate_chat_msg(&message) {
Ok(()) => {
if let Some(from) = uids.get(entity) {
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
tracing::error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
tracing::warn!(
?len,
?max,
"Recieved a chat message that's too long"
)
},
},
ClientState::Pending => {},
},
ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state {
ClientState::Character => {
if force_updates.get(entity).is_none()
&& stats.get(entity).map_or(true, |s| !s.is_dead)
{
let _ = positions.insert(entity, pos);
let _ = velocities.insert(entity, vel);
let _ = orientations.insert(entity, ori);
}
},
// Only characters can send positions.
_ => client.error_state(RequestStateError::Impossible),
},
ClientMsg::BreakBlock(pos) => {
if can_build.get(entity).is_some() {
block_changes.set(pos, Block::empty());
}
},
ClientMsg::PlaceBlock(pos, block) => {
if can_build.get(entity).is_some() {
block_changes.try_set(pos, block);
}
},
ClientMsg::TerrainChunkRequest { key } => match client.client_state {
ClientState::Connected | ClientState::Registered => {
client.error_state(RequestStateError::Impossible);
},
ClientState::Spectator | ClientState::Character => {
let in_vd = if let (Some(view_distance), Some(pos)) = (
players.get(entity).and_then(|p| p.view_distance),
positions.get(entity),
) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (view_distance as f64 + 1.5)
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
client.notify(ServerMsg::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})
},
None => {
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
}
},
ClientState::Pending => {},
},
// Always possible.
ClientMsg::Ping => client.notify(ServerMsg::Pong),
ClientMsg::Pong => {},
ClientMsg::Disconnect => {
client.notify(ServerMsg::Disconnect);
},
ClientMsg::Terminate => {
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) {
character_loader.load_character_list(entity, player.uuid().to_string())
}
},
ClientMsg::CreateCharacter { alias, tool, body } => {
if let Some(player) = players.get(entity) {
character_loader.create_character(
entity,
player.uuid().to_string(),
alias,
tool,
body,
);
}
},
ClientMsg::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
character_loader.delete_character(
entity,
player.uuid().to_string(),
character_id,
);
}
},
}
}
}
}
/// This system will handle new messages from clients
pub struct Sys;
@ -107,347 +467,53 @@ impl<'a> System<'a> for Sys {
let mut new_players = Vec::new();
for (entity, client) in (&entities, &mut clients).join() {
let new_msgs = client.postbox.new_messages();
let mut cnt = 0;
let network_err: Result<(), crate::error::Error> = block_on(async{
//TIMEOUT 0.01 ms for msg handling
select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = Self::handle_client_msg(
&mut server_emitter,
&mut new_chat_msgs,
&player_list,
&mut new_players,
entity,
client,
&mut cnt,
&character_loader,
&terrain,
&uids,
&can_build,
&force_updates,
&stats,
&chat_modes,
&mut accounts,
&mut block_changes,
&admin_list,
&mut admins,
&mut positions,
&mut velocities,
&mut orientations,
&mut players,
&mut controllers,
&settings,
).fuse() => err,
)
});
// Update client ping.
if new_msgs.len() > 0 {
if cnt > 0 {
client.last_ping = time.0
} else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout
|| client.postbox.error().is_some()
|| network_err.is_err()
// Postbox error
{
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 {
// Try pinging the client if the timeout is nearing.
client.postbox.send_message(ServerMsg::Ping);
}
// Process incoming messages.
for msg in new_msgs {
match msg {
// Go back to registered state (char selection screen)
ClientMsg::ExitIngame => match client.client_state {
// Use ClientMsg::Register instead.
ClientState::Connected => {
client.error_state(RequestStateError::WrongMessage)
},
ClientState::Registered => client.error_state(RequestStateError::Already),
ClientState::Spectator | ClientState::Character => {
server_emitter.emit(ServerEvent::ExitIngame { entity });
},
ClientState::Pending => {},
},
// Request spectator state
ClientMsg::Spectate => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Spectator => client.error_state(RequestStateError::Already),
ClientState::Registered | ClientState::Character => {
client.allow_state(ClientState::Spectator)
},
ClientState::Pending => {},
},
// Request registered state (login)
ClientMsg::Register {
view_distance,
token_or_username,
} => {
let (username, uuid) = match accounts.query(token_or_username.clone()) {
Err(err) => {
client.error_state(RequestStateError::RegisterDenied(err));
break;
},
Ok((username, uuid)) => (username, uuid),
};
let vd = view_distance
.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd)));
let player = Player::new(username.clone(), None, vd, uuid);
let is_admin = admin_list.contains(&username);
if !player.is_valid() {
// Invalid player
client.error_state(RequestStateError::Impossible);
break;
}
match client.client_state {
ClientState::Connected => {
// Add Player component to this client
let _ = players.insert(entity, player);
// Give the Admin component to the player if their name exists in
// admin list
if is_admin {
let _ = admins.insert(entity, Admin);
}
// Tell the client its request was successful.
client.allow_state(ClientState::Registered);
// Send initial player list
client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)));
// Add to list to notify all clients of the new player
new_players.push(entity);
},
// Use RequestState instead (No need to send `player` again).
_ => client.error_state(RequestStateError::Impossible),
}
//client.allow_state(ClientState::Registered);
// Limit view distance if it's too high
// This comes after state registration so that the client actually hears it
if settings
.max_view_distance
.zip(view_distance)
.map(|(vd, max)| vd > max)
.unwrap_or(false)
{
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
};
},
ClientMsg::SetViewDistance(view_distance) => match client.client_state {
ClientState::Character { .. } => {
if settings
.max_view_distance
.map(|max| view_distance <= max)
.unwrap_or(true)
{
players.get_mut(entity).map(|player| {
player.view_distance = Some(
settings
.max_view_distance
.map(|max| view_distance.min(max))
.unwrap_or(view_distance),
)
});
} else {
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
}
},
_ => {},
},
ClientMsg::Character(character_id) => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered | ClientState::Spectator => {
// Only send login message if it wasn't already
// sent previously
if let (Some(player), false) =
(players.get(entity), client.login_msg_sent)
{
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if settings.server_description.len() > 0 {
client.notify(
ChatType::CommandInfo
.server_msg(settings.server_description.clone()),
);
}
// Only send login message if it wasn't already
// sent previously
if !client.login_msg_sent {
new_chat_msgs.push((None, ChatMsg {
chat_type: ChatType::Online,
message: format!("[{}] is now online.", &player.alias), // TODO: Localize this
}));
client.login_msg_sent = true;
}
} else {
client.notify(ServerMsg::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))
}
},
ClientState::Character => client.error_state(RequestStateError::Already),
ClientState::Pending => {},
},
ClientMsg::ControllerInputs(inputs) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.inputs.update_with_new(inputs);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlEvent(event) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
// Skip respawn if client entity is alive
if let &ControlEvent::Respawn = &event {
if stats.get(entity).map_or(true, |s| !s.is_dead) {
continue;
}
}
if let Some(controller) = controllers.get_mut(entity) {
controller.events.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlAction(event) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.actions.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ChatMsg(message) => match client.client_state {
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered
| ClientState::Spectator
| ClientState::Character => match validate_chat_msg(&message) {
Ok(()) => {
if let Some(from) = uids.get(entity) {
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
tracing::error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
tracing::warn!(
?len,
?max,
"Recieved a chat message that's too long"
)
},
},
ClientState::Pending => {},
},
ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state {
ClientState::Character => {
if force_updates.get(entity).is_none()
&& stats.get(entity).map_or(true, |s| !s.is_dead)
{
let _ = positions.insert(entity, pos);
let _ = velocities.insert(entity, vel);
let _ = orientations.insert(entity, ori);
}
},
// Only characters can send positions.
_ => client.error_state(RequestStateError::Impossible),
},
ClientMsg::BreakBlock(pos) => {
if can_build.get(entity).is_some() {
block_changes.set(pos, Block::empty());
}
},
ClientMsg::PlaceBlock(pos, block) => {
if can_build.get(entity).is_some() {
block_changes.try_set(pos, block);
}
},
ClientMsg::TerrainChunkRequest { key } => match client.client_state {
ClientState::Connected | ClientState::Registered => {
client.error_state(RequestStateError::Impossible);
},
ClientState::Spectator | ClientState::Character => {
let in_vd = if let (Some(view_distance), Some(pos)) = (
players.get(entity).and_then(|p| p.view_distance),
positions.get(entity),
) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (view_distance as f64 + 1.5)
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
client.postbox.send_message(ServerMsg::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})
},
None => {
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
}
},
ClientState::Pending => {},
},
// Always possible.
ClientMsg::Ping => client.postbox.send_message(ServerMsg::Pong),
ClientMsg::Pong => {},
ClientMsg::Disconnect => {
client.postbox.send_message(ServerMsg::Disconnect);
},
ClientMsg::Terminate => {
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) {
character_loader.load_character_list(entity, player.uuid().to_string())
}
},
ClientMsg::CreateCharacter { alias, tool, body } => {
if let Some(player) = players.get(entity) {
character_loader.create_character(
entity,
player.uuid().to_string(),
alias,
tool,
body,
);
}
},
ClientMsg::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
character_loader.delete_character(
entity,
player.uuid().to_string(),
character_id,
);
}
},
}
client.notify(ServerMsg::Ping);
}
}

View File

@ -1,4 +1,4 @@
use client::{error::Error as ClientError, Client};
use client::{error::Error as ClientError, Client, error::NetworkError};
use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError};
use std::{
net::ToSocketAddrs,
@ -97,10 +97,7 @@ impl ClientInit {
},
Err(err) => {
match err {
ClientError::NetworkErr(_) => {
last_err = Some(Error::ClientError(err));
break 'tries;
}
ClientError::NetworkErr(NetworkError::ListenFailed(..)) => {},
// Non-connection error, stop attempts
err => {
last_err = Some(Error::ClientError(err));