remove a Mutex and AtomicBool

This commit is contained in:
Marcel Märtens 2020-10-07 13:59:14 +02:00
parent ff374eab59
commit 00c66b5b9c
4 changed files with 64 additions and 76 deletions

View File

@ -1,17 +1,10 @@
use crate::error::Error; use crate::error::Error;
use common::msg::{ use common::msg::{ClientIngame, ClientType, ServerMsg};
ClientCharacterScreen, ClientGeneral, ClientInGame, ClientIngame, ClientType, PingMsg,
ServerMsg,
};
use hashbrown::HashSet; use hashbrown::HashSet;
use network::{Participant, Stream}; use network::{Participant, Stream};
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use specs::{Component, FlaggedStorage}; use specs::{Component, FlaggedStorage};
use specs_idvs::IdvStorage; use specs_idvs::IdvStorage;
use std::sync::{
atomic::{AtomicBool, Ordering},
Mutex,
};
use tracing::debug; use tracing::debug;
use vek::*; use vek::*;
@ -19,13 +12,13 @@ pub struct Client {
pub registered: bool, pub registered: bool,
pub client_type: ClientType, pub client_type: ClientType,
pub in_game: Option<ClientIngame>, pub in_game: Option<ClientIngame>,
pub participant: Mutex<Option<Participant>>, pub participant: Option<Participant>,
pub general_stream: Stream, pub general_stream: Stream,
pub ping_stream: Stream, pub ping_stream: Stream,
pub register_stream: Stream, pub register_stream: Stream,
pub character_screen_stream: Stream, pub character_screen_stream: Stream,
pub in_game_stream: Stream, pub in_game_stream: Stream,
pub network_error: AtomicBool, pub network_error: bool,
pub last_ping: f64, pub last_ping: f64,
pub login_msg_sent: bool, pub login_msg_sent: bool,
} }
@ -35,11 +28,11 @@ impl Component for Client {
} }
impl Client { impl Client {
fn internal_send<M: Serialize>(b: &AtomicBool, s: &mut Stream, msg: M) { fn internal_send<M: Serialize>(err: &mut bool, s: &mut Stream, msg: M) {
if !b.load(Ordering::Relaxed) { if !*err {
if let Err(e) = s.send(msg) { if let Err(e) = s.send(msg) {
debug!(?e, "got a network error with client"); debug!(?e, "got a network error with client");
b.store(true, Ordering::Relaxed); *err = true;
} }
} }
} }
@ -64,33 +57,35 @@ impl Client {
ServerMsg::Info(_) => panic!(ERR), ServerMsg::Info(_) => panic!(ERR),
ServerMsg::Init(_) => panic!(ERR), ServerMsg::Init(_) => panic!(ERR),
ServerMsg::RegisterAnswer(msg) => { ServerMsg::RegisterAnswer(msg) => {
Self::internal_send(&self.network_error, &mut self.register_stream, &msg) Self::internal_send(&mut self.network_error, &mut self.register_stream, &msg)
},
ServerMsg::CharacterScreen(msg) => {
Self::internal_send(&self.network_error, &mut self.character_screen_stream, &msg)
}, },
ServerMsg::CharacterScreen(msg) => Self::internal_send(
&mut self.network_error,
&mut self.character_screen_stream,
&msg,
),
ServerMsg::InGame(msg) => { ServerMsg::InGame(msg) => {
Self::internal_send(&self.network_error, &mut self.in_game_stream, &msg) Self::internal_send(&mut self.network_error, &mut self.in_game_stream, &msg)
}, },
ServerMsg::General(msg) => { ServerMsg::General(msg) => {
Self::internal_send(&self.network_error, &mut self.general_stream, &msg) Self::internal_send(&mut self.network_error, &mut self.general_stream, &msg)
}, },
ServerMsg::Ping(msg) => { ServerMsg::Ping(msg) => {
Self::internal_send(&self.network_error, &mut self.ping_stream, &msg) Self::internal_send(&mut self.network_error, &mut self.ping_stream, &msg)
}, },
}; };
} }
pub async fn internal_recv<M: DeserializeOwned>( pub async fn internal_recv<M: DeserializeOwned>(
b: &AtomicBool, err: &mut bool,
s: &mut Stream, s: &mut Stream,
) -> Result<M, Error> { ) -> Result<M, Error> {
if !b.load(Ordering::Relaxed) { if !*err {
match s.recv().await { match s.recv().await {
Ok(r) => Ok(r), Ok(r) => Ok(r),
Err(e) => { Err(e) => {
debug!(?e, "got a network error with client while recv"); debug!(?e, "got a network error with client while recv");
b.store(true, Ordering::Relaxed); *err = true;
Err(Error::StreamErr(e)) Err(Error::StreamErr(e))
}, },
} }
@ -98,22 +93,6 @@ impl Client {
Err(Error::StreamErr(network::StreamError::StreamClosed)) Err(Error::StreamErr(network::StreamError::StreamClosed))
} }
} }
pub async fn recv_msg(&mut self) -> Result<ClientGeneral, Error> {
Self::internal_recv(&self.network_error, &mut self.general_stream).await
}
pub async fn recv_in_game_msg(&mut self) -> Result<ClientInGame, Error> {
Self::internal_recv(&self.network_error, &mut self.in_game_stream).await
}
pub async fn recv_character_screen_msg(&mut self) -> Result<ClientCharacterScreen, Error> {
Self::internal_recv(&self.network_error, &mut self.character_screen_stream).await
}
pub async fn recv_ping_msg(&mut self) -> Result<PingMsg, Error> {
Self::internal_recv(&self.network_error, &mut self.ping_stream).await
}
} }
// Distance from fuzzy_chunk before snapping to current chunk // Distance from fuzzy_chunk before snapping to current chunk

View File

@ -133,13 +133,13 @@ impl ConnectionHandler {
registered: false, registered: false,
client_type, client_type,
in_game: None, in_game: None,
participant: std::sync::Mutex::new(Some(participant)), participant: Some(participant),
general_stream, general_stream,
ping_stream, ping_stream,
register_stream, register_stream,
in_game_stream, in_game_stream,
character_screen_stream, character_screen_stream,
network_error: std::sync::atomic::AtomicBool::new(false), network_error: false,
last_ping: server_data.time, last_ping: server_data.time,
login_msg_sent: false, login_msg_sent: false,
}; };

View File

@ -89,14 +89,13 @@ pub fn handle_exit_ingame(server: &mut Server, entity: EcsEntity) {
pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event { pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event {
span!(_guard, "handle_client_disconnect"); span!(_guard, "handle_client_disconnect");
if let Some(client) = server.state().read_storage::<Client>().get(entity) { if let Some(client) = server
let participant = match client.participant.try_lock() { .state()
Ok(mut p) => p.take().unwrap(), .ecs()
Err(e) => { .write_storage::<Client>()
error!(?e, ?entity, "couldn't lock participant for removal"); .get_mut(entity)
return Event::ClientDisconnected { entity }; {
}, let participant = client.participant.take().unwrap();
};
let pid = participant.remote_pid(); let pid = participant.remote_pid();
std::thread::spawn(move || { std::thread::spawn(move || {
let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity); let span = tracing::span!(tracing::Level::DEBUG, "client_disconnect", ?pid, ?entity);

View File

@ -453,23 +453,30 @@ impl Sys {
editable_settings: &ReadExpect<'_, EditableSettings>, editable_settings: &ReadExpect<'_, EditableSettings>,
alias_validator: &ReadExpect<'_, AliasValidator>, alias_validator: &ReadExpect<'_, AliasValidator>,
) -> Result<(), crate::error::Error> { ) -> Result<(), crate::error::Error> {
let (mut b1, mut b2, mut b3, mut b4, mut b5) = (
client.network_error,
client.network_error,
client.network_error,
client.network_error,
client.network_error,
);
loop { loop {
let q1 = Client::internal_recv(&client.network_error, &mut client.general_stream); let q1 = Client::internal_recv(&mut b1, &mut client.general_stream);
let q2 = Client::internal_recv(&client.network_error, &mut client.in_game_stream); let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream);
let q3 = let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream);
Client::internal_recv(&client.network_error, &mut client.character_screen_stream); let q4 = Client::internal_recv(&mut b4, &mut client.ping_stream);
let q4 = Client::internal_recv(&client.network_error, &mut client.ping_stream); let q5 = Client::internal_recv(&mut b5, &mut client.register_stream);
let q5 = Client::internal_recv(&client.network_error, &mut client.register_stream);
let (m1, m2, m3, m4, m5) = select!( let (m1, m2, m3, m4, m5) = select!(
msg = q1.fuse() => (Some(msg?), None, None, None, None), msg = q1.fuse() => (Some(msg), None, None, None, None),
msg = q2.fuse() => (None, Some(msg?), None, None, None), msg = q2.fuse() => (None, Some(msg), None, None, None),
msg = q3.fuse() => (None, None, Some(msg?), None, None), msg = q3.fuse() => (None, None, Some(msg), None, None),
msg = q4.fuse() => (None, None, None, Some(msg?), None), msg = q4.fuse() => (None, None, None, Some(msg), None),
msg = q5.fuse() => (None, None, None, None,Some(msg?)), msg = q5.fuse() => (None, None, None, None,Some(msg)),
); );
*cnt += 1; *cnt += 1;
if let Some(msg) = m1 { if let Some(msg) = m1 {
client.network_error |= b1;
Self::handle_client_msg( Self::handle_client_msg(
server_emitter, server_emitter,
new_chat_msgs, new_chat_msgs,
@ -478,10 +485,11 @@ impl Sys {
player_metrics, player_metrics,
uids, uids,
chat_modes, chat_modes,
msg, msg?,
)?; )?;
} }
if let Some(msg) = m2 { if let Some(msg) = m2 {
client.network_error |= b2;
Self::handle_client_in_game_msg( Self::handle_client_in_game_msg(
server_emitter, server_emitter,
entity, entity,
@ -498,10 +506,11 @@ impl Sys {
players, players,
controllers, controllers,
settings, settings,
msg, msg?,
)?; )?;
} }
if let Some(msg) = m3 { if let Some(msg) = m3 {
client.network_error |= b3;
Self::handle_client_character_screen_msg( Self::handle_client_character_screen_msg(
server_emitter, server_emitter,
new_chat_msgs, new_chat_msgs,
@ -512,13 +521,15 @@ impl Sys {
players, players,
editable_settings, editable_settings,
alias_validator, alias_validator,
msg, msg?,
)?; )?;
} }
if let Some(msg) = m4 { if let Some(msg) = m4 {
Self::handle_ping_msg(client, msg)?; client.network_error |= b4;
Self::handle_ping_msg(client, msg?)?;
} }
if let Some(msg) = m5 { if let Some(msg) = m5 {
client.network_error |= b5;
Self::handle_register_msg( Self::handle_register_msg(
player_list, player_list,
new_players, new_players,
@ -529,7 +540,7 @@ impl Sys {
admins, admins,
players, players,
editable_settings, editable_settings,
msg, msg?,
)?; )?;
} }
} }
@ -666,8 +677,16 @@ impl<'a> System<'a> for Sys {
) )
}); });
// Postbox error
if network_err.is_err() {
debug!(?entity, "postbox error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["network_error"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if cnt > 0 {
// Update client ping. // Update client ping.
if cnt > 0 {
client.last_ping = time.0 client.last_ping = time.0
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64
// Timeout // Timeout
@ -678,15 +697,6 @@ impl<'a> System<'a> for Sys {
.with_label_values(&["timeout"]) .with_label_values(&["timeout"])
.inc(); .inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity)); server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if network_err.is_err()
// Postbox error
{
debug!(?entity, "postbox error with client, disconnecting");
player_metrics
.clients_disconnected
.with_label_values(&["network_error"])
.inc();
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { } else if time.0 - client.last_ping > settings.client_timeout.as_secs() as f64 * 0.5 {
// Try pinging the client if the timeout is nearing. // Try pinging the client if the timeout is nearing.
client.send_msg(PingMsg::Ping); client.send_msg(PingMsg::Ping);