- fix wording in error msg
 - find better name for structs
 - unify errors and cleanup code with `(|| {foo?; Some(())})()` pattern
 - fix the negative PlayersOnline, it was caused by having a gracefull shutdown AND a timeout error. we now unregister the client when he issues TERMINATE
This commit is contained in:
Marcel Märtens 2020-10-21 12:23:34 +02:00
parent 9ba19a1cd9
commit 37d08e93ca
16 changed files with 193 additions and 211 deletions

View File

@ -673,7 +673,7 @@ fn handle_spawn(
.map(|g| (g, s))
})
.map(|(g, s)| {
s.send_unchecked(ServerGeneral::GroupUpdate(g));
s.send_fallible(ServerGeneral::GroupUpdate(g));
});
},
);

View File

@ -16,7 +16,7 @@ pub(crate) struct ServerInfoPacket {
pub time: f64,
}
pub(crate) struct ClientPackage {
pub(crate) struct IncomingClient {
pub client: Client,
pub general: GeneralStream,
pub ping: PingStream,
@ -28,7 +28,7 @@ pub(crate) struct ClientPackage {
pub(crate) struct ConnectionHandler {
_network: Arc<Network>,
thread_handle: Option<thread::JoinHandle<()>>,
pub client_receiver: Receiver<ClientPackage>,
pub client_receiver: Receiver<IncomingClient>,
pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
stop_sender: Option<oneshot::Sender<()>>,
}
@ -43,7 +43,7 @@ impl ConnectionHandler {
let network_clone = Arc::clone(&network);
let (stop_sender, stop_receiver) = oneshot::channel();
let (client_sender, client_receiver) = unbounded::<ClientPackage>();
let (client_sender, client_receiver) = unbounded::<IncomingClient>();
let (info_requester_sender, info_requester_receiver) =
bounded::<Sender<ServerInfoPacket>>(1);
@ -67,7 +67,7 @@ impl ConnectionHandler {
async fn work(
network: Arc<Network>,
client_sender: Sender<ClientPackage>,
client_sender: Sender<IncomingClient>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>,
stop_receiver: oneshot::Receiver<()>,
) {
@ -104,7 +104,7 @@ impl ConnectionHandler {
async fn init_participant(
participant: Participant,
client_sender: Sender<ClientPackage>,
client_sender: Sender<IncomingClient>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("New Participant connected to the server");
@ -130,7 +130,7 @@ impl ConnectionHandler {
t = register_stream.recv::<ClientType>().fuse() => Some(t),
) {
None => {
debug!("slow client connection detected, dropping it");
debug!("Timeout for incoming client elapsed, aborting connection");
return Ok(());
},
Some(client_type) => client_type?,
@ -145,7 +145,7 @@ impl ConnectionHandler {
login_msg_sent: false,
};
let package = ClientPackage {
let package = IncomingClient {
client,
general: GeneralStream(general_stream),
ping: PingStream(ping_stream),

View File

@ -45,7 +45,7 @@ pub fn handle_knockback(server: &Server, entity: EcsEntity, impulse: Vec3<f32>)
}
let mut in_game_streams = state.ecs().write_storage::<InGameStream>();
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
in_game_stream.send_unchecked(ServerGeneral::Knockback(impulse));
in_game_stream.send_fallible(ServerGeneral::Knockback(impulse));
}
}

View File

@ -34,7 +34,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
None => {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta
.server_msg("Invite failed, target does not exist.".to_owned()),
);
@ -66,7 +66,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if already_in_same_group {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(ChatType::Meta.server_msg(
general_stream.send_fallible(ChatType::Meta.server_msg(
"Invite failed, can't invite someone already in your group".to_owned(),
));
}
@ -96,7 +96,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if group_size_limit_reached {
// Inform inviter that they have reached the group size limit
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta.server_msg(
"Invite failed, pending invites plus current group size have reached \
the group size limit"
@ -113,7 +113,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if invites.contains(invitee) {
// Inform inviter that there is already an invite
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta
.server_msg("This player already has a pending invite.".to_owned()),
);
@ -160,7 +160,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
(in_game_streams.get_mut(invitee), uids.get(entity).copied())
{
if send_invite() {
in_game_stream.send_unchecked(ServerGeneral::GroupInvite {
in_game_stream.send_fallible(ServerGeneral::GroupInvite {
inviter,
timeout: PRESENTED_INVITE_TIMEOUT_DUR,
});
@ -168,7 +168,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
} else if agents.contains(invitee) {
send_invite();
} else if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta.server_msg("Can't invite, not a player or npc".to_owned()),
);
}
@ -176,7 +176,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Notify inviter that the invite is pending
if invite_sent {
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
in_game_stream.send_unchecked(ServerGeneral::InvitePending(uid));
in_game_stream.send_fallible(ServerGeneral::InvitePending(uid));
}
}
},
@ -201,7 +201,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if let (Some(in_game_stream), Some(target)) =
(in_game_streams.get_mut(inviter), uids.get(entity).copied())
{
in_game_stream.send_unchecked(ServerGeneral::InviteComplete {
in_game_stream.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::Accepted,
});
@ -249,7 +249,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if let (Some(in_game_stream), Some(target)) =
(in_game_streams.get_mut(inviter), uids.get(entity).copied())
{
in_game_stream.send_unchecked(ServerGeneral::InviteComplete {
in_game_stream.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::Declined,
});
@ -288,7 +288,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
None => {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta
.server_msg("Kick failed, target does not exist.".to_owned()),
);
@ -301,7 +301,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
if matches!(alignments.get(target), Some(comp::Alignment::Owned(owner)) if uids.get(target).map_or(true, |u| u != owner))
{
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta.server_msg("Kick failed, you can't kick pets.".to_owned()),
);
}
@ -310,7 +310,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Can't kick yourself
if uids.get(entity).map_or(false, |u| *u == uid) {
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta
.server_msg("Kick failed, you can't kick yourself.".to_owned()),
);
@ -348,7 +348,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Tell them the have been kicked
if let Some(general_stream) = general_streams.get_mut(target) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta
.server_msg("You were removed from the group.".to_owned()),
);
@ -356,13 +356,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Tell kicker that they were succesful
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream
.send_unchecked(ChatType::Meta.server_msg("Player kicked.".to_owned()));
.send_fallible(ChatType::Meta.server_msg("Player kicked.".to_owned()));
}
},
Some(_) => {
// Inform kicker that they are not the leader
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(ChatType::Meta.server_msg(
general_stream.send_fallible(ChatType::Meta.server_msg(
"Kick failed: You are not the leader of the target's group.".to_owned(),
));
}
@ -370,7 +370,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
None => {
// Inform kicker that the target is not in a group
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta.server_msg(
"Kick failed: Your target is not in a group.".to_owned(),
),
@ -387,7 +387,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
None => {
// Inform of failure
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(ChatType::Meta.server_msg(
general_stream.send_fallible(ChatType::Meta.server_msg(
"Leadership transfer failed, target does not exist".to_owned(),
));
}
@ -423,13 +423,13 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
);
// Tell them they are the leader
if let Some(general_stream) = general_streams.get_mut(target) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta.server_msg("You are the group leader now.".to_owned()),
);
}
// Tell the old leader that the transfer was succesful
if let Some(general_stream) = general_streams.get_mut(target) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta
.server_msg("You are no longer the group leader.".to_owned()),
);
@ -439,7 +439,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Inform transferer that they are not the leader
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(
general_stream.send_fallible(
ChatType::Meta.server_msg(
"Transfer failed: You are not the leader of the target's group."
.to_owned(),
@ -451,7 +451,7 @@ pub fn handle_group(server: &mut Server, entity: specs::Entity, manip: GroupMani
// Inform transferer that the target is not in a group
let mut general_streams = state.ecs().write_storage::<GeneralStream>();
if let Some(general_stream) = general_streams.get_mut(entity) {
general_stream.send_unchecked(ChatType::Meta.server_msg(
general_stream.send_fallible(ChatType::Meta.server_msg(
"Transfer failed: Your target is not in a group.".to_owned(),
));
}

View File

@ -121,124 +121,96 @@ pub fn handle_possess(server: &Server, possessor_uid: Uid, possesse_uid: Uid) {
return;
}
// You can't possess other players
let mut clients = ecs.write_storage::<Client>();
let mut general_streams = ecs.write_storage::<GeneralStream>();
let mut ping_streams = ecs.write_storage::<PingStream>();
let mut register_streams = ecs.write_storage::<RegisterStream>();
let mut character_screen_streams = ecs.write_storage::<CharacterScreenStream>();
let mut in_game_streams = ecs.write_storage::<InGameStream>();
if clients.get_mut(possesse).is_none() {
let client = match clients.remove(possessor) {
Some(c) => c,
None => return,
};
let mut general_stream = match general_streams.remove(possessor) {
Some(c) => c,
None => return,
};
let ping_stream = match ping_streams.remove(possessor) {
Some(c) => c,
None => return,
};
let register_stream = match register_streams.remove(possessor) {
Some(c) => c,
None => return,
};
let character_screen_stream = match character_screen_streams.remove(possessor) {
Some(c) => c,
None => return,
};
let in_game_stream = match in_game_streams.remove(possessor) {
Some(c) => c,
None => return,
};
general_stream.send_unchecked(ServerGeneral::SetPlayerEntity(possesse_uid));
let err_fn = |c, e: Option<specs::error::Error>| {
e.map(|e| error!(?e, "Error inserting {} component during possession", c));
};
err_fn("client", clients.insert(possesse, client).err());
err_fn(
"general_streams",
general_streams.insert(possesse, general_stream).err(),
);
err_fn(
"ping_streams",
ping_streams.insert(possesse, ping_stream).err(),
);
err_fn(
"register_streams",
register_streams.insert(possesse, register_stream).err(),
);
err_fn(
"character_screen_streams",
character_screen_streams
.insert(possesse, character_screen_stream)
.err(),
);
err_fn(
"in_game_streams",
in_game_streams.insert(possesse, in_game_stream).err(),
);
// Put possess item into loadout
let mut loadouts = ecs.write_storage::<comp::Loadout>();
let loadout = loadouts
.entry(possesse)
.expect("Could not read loadouts component while possessing")
.or_insert(comp::Loadout::default());
let item = comp::Item::new_from_asset_expect("common.items.debug.possess");
if let item::ItemKind::Tool(tool) = item.kind() {
let mut abilities = tool.get_abilities();
let mut ability_drain = abilities.drain(..);
let debug_item = comp::ItemConfig {
item,
ability1: ability_drain.next(),
ability2: ability_drain.next(),
ability3: ability_drain.next(),
block_ability: None,
dodge_ability: None,
};
std::mem::swap(&mut loadout.active_item, &mut loadout.second_item);
loadout.active_item = Some(debug_item);
}
// Move player component
{
let mut players = ecs.write_storage::<comp::Player>();
if let Some(player) = players.remove(possessor) {
err_fn("player", players.insert(possesse, player).err());
}
}
// Transfer region subscription
{
let mut subscriptions = ecs.write_storage::<RegionSubscription>();
if let Some(s) = subscriptions.remove(possessor) {
err_fn("subscription", subscriptions.insert(possesse, s).err());
}
}
// Remove will of the entity
ecs.write_storage::<comp::Agent>().remove(possesse);
// Reset controller of former shell
ecs.write_storage::<comp::Controller>()
.get_mut(possessor)
.map(|c| c.reset());
// Transfer admin powers
{
let mut admins = ecs.write_storage::<comp::Admin>();
if let Some(admin) = admins.remove(possessor) {
err_fn("admin", admins.insert(possesse, admin).err());
}
}
// Transfer waypoint
{
let mut waypoints = ecs.write_storage::<comp::Waypoint>();
if let Some(waypoint) = waypoints.remove(possessor) {
err_fn("waypoints", waypoints.insert(possesse, waypoint).err());
}
}
if clients.get_mut(possesse).is_some() {
error!("can't possess other players");
return;
}
match (|| -> Option<Result<(), specs::error::Error>> {
let mut ping_streams = ecs.write_storage::<PingStream>();
let mut register_streams = ecs.write_storage::<RegisterStream>();
let mut character_screen_streams = ecs.write_storage::<CharacterScreenStream>();
let mut in_game_streams = ecs.write_storage::<InGameStream>();
let c = clients.remove(possessor)?;
clients.insert(possesse, c).ok()?;
let s = general_streams.remove(possessor)?;
general_streams.insert(possesse, s).ok()?;
let s = ping_streams.remove(possessor)?;
ping_streams.insert(possesse, s).ok()?;
let s = register_streams.remove(possessor)?;
register_streams.insert(possesse, s).ok()?;
let s = character_screen_streams.remove(possessor)?;
character_screen_streams.insert(possesse, s).ok()?;
let s = in_game_streams.remove(possessor)?;
in_game_streams.insert(possesse, s).ok()?;
//optional entities
let mut players = ecs.write_storage::<comp::Player>();
let mut subscriptions = ecs.write_storage::<RegionSubscription>();
let mut admins = ecs.write_storage::<comp::Admin>();
let mut waypoints = ecs.write_storage::<comp::Waypoint>();
players
.remove(possessor)
.map(|p| players.insert(possesse, p).ok()?);
subscriptions
.remove(possessor)
.map(|s| subscriptions.insert(possesse, s).ok()?);
admins
.remove(possessor)
.map(|a| admins.insert(possesse, a).ok()?);
waypoints
.remove(possessor)
.map(|w| waypoints.insert(possesse, w).ok()?);
Some(Ok(()))
})() {
Some(Ok(())) => (),
Some(Err(e)) => {
error!(?e, ?possesse, "Error inserting component during possession");
return;
},
None => {
error!(?possessor, "Error removing component during possession");
return;
},
}
general_streams
.get_mut(possesse)
.map(|s| s.send_fallible(ServerGeneral::SetPlayerEntity(possesse_uid)));
// Put possess item into loadout
let mut loadouts = ecs.write_storage::<comp::Loadout>();
let loadout = loadouts
.entry(possesse)
.expect("Could not read loadouts component while possessing")
.or_insert(comp::Loadout::default());
let item = comp::Item::new_from_asset_expect("common.items.debug.possess");
if let item::ItemKind::Tool(tool) = item.kind() {
let mut abilities = tool.get_abilities();
let mut ability_drain = abilities.drain(..);
let debug_item = comp::ItemConfig {
item,
ability1: ability_drain.next(),
ability2: ability_drain.next(),
ability3: ability_drain.next(),
block_ability: None,
dodge_ability: None,
};
std::mem::swap(&mut loadout.active_item, &mut loadout.second_item);
loadout.active_item = Some(debug_item);
}
// Remove will of the entity
ecs.write_storage::<comp::Agent>().remove(possesse);
// Reset controller of former shell
ecs.write_storage::<comp::Controller>()
.get_mut(possessor)
.map(|c| c.reset());
}
}

View File

@ -28,46 +28,40 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
// components Easier than checking and removing all other known components
// Note: If other `ServerEvent`s are referring to this entity they will be
// disrupted
let maybe_client = state.ecs().write_storage::<Client>().remove(entity);
let maybe_uid = state.read_component_copied::<Uid>(entity);
let maybe_player = state.ecs().write_storage::<comp::Player>().remove(entity);
let maybe_admin = state.ecs().write_storage::<comp::Admin>().remove(entity);
let maybe_general_stream = state.ecs().write_storage::<GeneralStream>().remove(entity);
let maybe_ping_stream = state.ecs().write_storage::<PingStream>().remove(entity);
let maybe_register_stream = state.ecs().write_storage::<RegisterStream>().remove(entity);
let maybe_character_screen_stream = state
.ecs()
.write_storage::<CharacterScreenStream>()
.remove(entity);
let maybe_in_game_stream = state.ecs().write_storage::<InGameStream>().remove(entity);
let maybe_admin = state.ecs().write_storage::<comp::Admin>().remove(entity);
let maybe_group = state
.ecs()
.write_storage::<group::Group>()
.get(entity)
.cloned();
if let (
Some(mut client),
Some(uid),
Some(player),
Some(general_stream),
Some(ping_stream),
Some(register_stream),
Some(character_screen_stream),
Some(mut in_game_stream),
) = (
maybe_client,
maybe_uid,
maybe_player,
maybe_general_stream,
maybe_ping_stream,
maybe_register_stream,
maybe_character_screen_stream,
maybe_in_game_stream,
) {
if let Some((
mut client,
uid,
player,
general_stream,
ping_stream,
register_stream,
character_screen_stream,
mut in_game_stream,
)) = (|| {
let ecs = state.ecs();
Some((
ecs.write_storage::<Client>().remove(entity)?,
ecs.write_storage::<Uid>().remove(entity)?,
ecs.write_storage::<comp::Player>().remove(entity)?,
ecs.write_storage::<GeneralStream>().remove(entity)?,
ecs.write_storage::<PingStream>().remove(entity)?,
ecs.write_storage::<RegisterStream>().remove(entity)?,
ecs.write_storage::<CharacterScreenStream>()
.remove(entity)?,
ecs.write_storage::<InGameStream>().remove(entity)?,
))
})() {
// Tell client its request was successful
client.in_game = None;
in_game_stream.send_unchecked(ServerGeneral::ExitInGameSuccess);
in_game_stream.send_fallible(ServerGeneral::ExitInGameSuccess);
let entity_builder = state
.ecs_mut()

View File

@ -232,7 +232,7 @@ impl StateExt for State {
.get_mut(entity)
{
client.in_game = Some(ClientInGame::Character);
character_screen_stream.send_unchecked(ServerGeneral::CharacterSuccess);
character_screen_stream.send_fallible(ServerGeneral::CharacterSuccess);
}
}
}
@ -298,7 +298,7 @@ impl StateExt for State {
.join()
{
if uid != u {
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -310,7 +310,7 @@ impl StateExt for State {
.join()
{
if uid == u || uid == t {
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -325,7 +325,7 @@ impl StateExt for State {
{
if is_within(comp::ChatMsg::SAY_DISTANCE, pos, speaker_pos) {
general_stream
.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -340,7 +340,7 @@ impl StateExt for State {
{
if is_within(comp::ChatMsg::REGION_DISTANCE, pos, speaker_pos) {
general_stream
.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -355,7 +355,7 @@ impl StateExt for State {
{
if is_within(comp::ChatMsg::NPC_DISTANCE, pos, speaker_pos) {
general_stream
.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
}
@ -369,7 +369,7 @@ impl StateExt for State {
.join()
{
if s == &faction.0 {
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},
@ -381,7 +381,7 @@ impl StateExt for State {
.join()
{
if g == group {
general_stream.send_unchecked(ServerGeneral::ChatMsg(resolved_msg.clone()));
general_stream.send_fallible(ServerGeneral::ChatMsg(resolved_msg.clone()));
}
}
},

View File

@ -21,7 +21,7 @@ pub(crate) trait GetStream {
}
}
fn send_unchecked(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); }
fn send_fallible(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); }
fn prepare(&mut self, msg: &Self::SendMsg) -> Message {
if Self::verify(&msg) {

View File

@ -166,7 +166,7 @@ impl<'a> System<'a> for Sys {
// Client doesn't need to know about itself
&& *client_entity != entity
{
general_stream.send_unchecked(create_msg.clone());
general_stream.send_fallible(create_msg.clone());
}
}
}
@ -180,7 +180,7 @@ impl<'a> System<'a> for Sys {
.map(|key| !regions.contains(key))
.unwrap_or(true)
{
general_stream.send_unchecked(ServerGeneral::DeleteEntity(uid));
general_stream.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
}
@ -252,7 +252,7 @@ impl<'a> System<'a> for Sys {
true // Closer than 100 blocks
}
} {
general_stream.send_unchecked(msg.clone());
general_stream.send_fallible(msg.clone());
}
}
};
@ -350,7 +350,7 @@ impl<'a> System<'a> for Sys {
})
{
for uid in &deleted {
general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid)));
general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
}
}
}
@ -361,7 +361,7 @@ impl<'a> System<'a> for Sys {
for (inventory, update, in_game_stream) in
(&inventories, &inventory_updates, &mut in_game_streams).join()
{
in_game_stream.send_unchecked(ServerGeneral::InventoryUpdate(
in_game_stream.send_fallible(ServerGeneral::InventoryUpdate(
inventory.clone(),
update.event(),
));
@ -384,7 +384,7 @@ impl<'a> System<'a> for Sys {
.cloned()
.collect::<Vec<_>>();
if !outcomes.is_empty() {
in_game_stream.send_unchecked(ServerGeneral::Outcomes(outcomes));
in_game_stream.send_fallible(ServerGeneral::Outcomes(outcomes));
}
}
outcomes.clear();

View File

@ -55,7 +55,7 @@ impl<'a> System<'a> for Sys {
in_game_streams.get_mut(*inviter),
uids.get(invitee).copied(),
) {
in_game_stream.send_unchecked(ServerGeneral::InviteComplete {
in_game_stream.send_fallible(ServerGeneral::InviteComplete {
target,
answer: InviteAnswer::TimedOut,
});

View File

@ -61,6 +61,8 @@ impl Sys {
.clients_disconnected
.with_label_values(&["gracefully"])
.inc();
client.registered = false;
client.in_game = None;
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
_ => unreachable!("not a client_general msg"),

View File

@ -8,7 +8,10 @@ use crate::streams::GetStream;
/// handles all send msg and calls a handle fn
/// Aborts when a error occurred returns cnt of successful msg otherwise
pub(crate) fn try_recv_all<T, F>(stream: &mut T, mut f: F) -> Result<u64, crate::error::Error>
pub(in crate::sys::msg) fn try_recv_all<T, F>(
stream: &mut T,
mut f: F,
) -> Result<u64, crate::error::Error>
where
T: GetStream,
F: FnMut(&mut T, T::RecvMsg) -> Result<(), crate::error::Error>,

View File

@ -67,11 +67,19 @@ impl<'a> System<'a> for Sys {
match res {
Err(e) => {
debug!(?entity, ?e, "network error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["network_error"])
.inc();
let reg = client.registered;
debug!(
?entity,
?e,
?reg,
"network error with client, disconnecting"
);
if reg {
player_metrics
.clients_disconnected
.with_label_values(&["network_error"])
.inc();
}
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
Ok(1_u64..=u64::MAX) => {
@ -82,17 +90,20 @@ impl<'a> System<'a> for Sys {
if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64
// Timeout
{
info!(?entity, "timeout error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["timeout"])
.inc();
let reg = client.registered;
info!(?entity, ?reg, "timeout error with client, disconnecting");
if reg {
player_metrics
.clients_disconnected
.with_label_values(&["timeout"])
.inc();
}
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping
> settings.client_timeout.as_secs() as f64 * 0.5
{
// Try pinging the client if the timeout is nearing.
ping_stream.send_unchecked(PingMsg::Ping);
ping_stream.send_fallible(PingMsg::Ping);
}
},
}

View File

@ -160,7 +160,7 @@ impl<'a> System<'a> for Sys {
.unwrap_or(false)
{
general_stream
.send_unchecked(ServerGeneral::DeleteEntity(uid));
.send_fallible(ServerGeneral::DeleteEntity(uid));
}
}
},
@ -178,7 +178,7 @@ impl<'a> System<'a> for Sys {
.iter()
.flat_map(|v| v.iter())
{
general_stream.send_unchecked(ServerGeneral::DeleteEntity(Uid(*uid)));
general_stream.send_fallible(ServerGeneral::DeleteEntity(Uid(*uid)));
}
}
@ -203,7 +203,7 @@ impl<'a> System<'a> for Sys {
{
// Send message to create entity and tracked components and physics
// components
general_stream.send_unchecked(ServerGeneral::CreateEntity(
general_stream.send_fallible(ServerGeneral::CreateEntity(
tracked_comps.create_entity_package(
entity,
Some(*pos),
@ -256,7 +256,7 @@ pub fn initialize_region_subscription(world: &World, entity: specs::Entity) {
.join()
{
// Send message to create entity and tracked components and physics components
general_stream.send_unchecked(ServerGeneral::CreateEntity(
general_stream.send_fallible(ServerGeneral::CreateEntity(
tracked_comps.create_entity_package(
entity,
Some(*pos),

View File

@ -67,7 +67,7 @@ impl<'a> System<'a> for Sys {
Ok((chunk, supplement)) => (chunk, supplement),
Err(Some(entity)) => {
if let Some(in_game_stream) = in_game_streams.get_mut(entity) {
in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate {
in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Err(()),
});
@ -94,7 +94,7 @@ impl<'a> System<'a> for Sys {
.magnitude_squared();
if adjusted_dist_sqr <= view_distance.pow(2) {
in_game_stream.send_unchecked(ServerGeneral::TerrainChunkUpdate {
in_game_stream.send_fallible(ServerGeneral::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
});

View File

@ -51,7 +51,7 @@ impl<'a> System<'a> for Sys {
if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time))
{
if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) {
general_stream.send_unchecked(ServerGeneral::Notification(
general_stream.send_fallible(ServerGeneral::Notification(
Notification::WaypointSaved,
));
}