From 55b59fbe070520eae66d64bff393717c2a8f8354 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 12 Oct 2020 00:14:04 +0200 Subject: [PATCH] various small fixes according to the MR --- Cargo.lock | 13 ++-- client/src/lib.rs | 11 +-- server/Cargo.toml | 1 + server/src/connection_handler.rs | 55 ++++++------- server/src/sys/message.rs | 104 ++++++++++++------------- voxygen/src/menu/char_selection/mod.rs | 2 +- voxygen/src/session.rs | 2 +- 7 files changed, 90 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 177e2ef47e..17330dfba1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1393,9 +1393,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +checksum = "a7a4d35f7401e948629c9c3d6638fb9bf94e0b2121e96c3b428cc4e631f3eb74" dependencies = [ "futures-core", "futures-sink", @@ -1403,9 +1403,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "d674eaa0056896d5ada519900dbf97ead2e46a7b6621e8160d79e2f2e1e2784b" [[package]] name = "futures-cpupool" @@ -1449,9 +1449,9 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" +checksum = "0d8764258ed64ebc5d9ed185cf86a95db5cac810269c5d20ececb32e0088abbd" [[package]] name = "futures-task" @@ -4689,6 +4689,7 @@ dependencies = [ "diesel", "diesel_migrations", "dotenv", + "futures-channel", "futures-executor", "futures-timer", "futures-util", diff --git a/client/src/lib.rs b/client/src/lib.rs index 8efadb48a4..2f22d94242 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -466,7 +466,7 @@ impl Client { let msg: ClientMsg = msg.into(); #[cfg(debug_assertions)] { - //There assertions veriy that the state is correct when a msg is send! + // These assertions verify that the state is correct when a message is sent! match &msg { ClientMsg::Type(_) | ClientMsg::Register(_) => assert!( !self.registered, @@ -1514,19 +1514,14 @@ impl Client { Ok(frontend_events) } - /// Get the player's entity. pub fn entity(&self) -> EcsEntity { self.entity } - /// Get the player's Uid. pub fn uid(&self) -> Option { self.state.read_component_copied(self.entity) } - pub fn get_client_type(&self) -> ClientType { ClientType::Game } + pub fn in_game(&self) -> Option { self.in_game } - pub fn get_in_game(&self) -> Option { self.in_game } + pub fn registered(&self) -> bool { self.registered } - pub fn get_registered(&self) -> bool { self.registered } - - /// Get the current tick number. pub fn get_tick(&self) -> u64 { self.tick } pub fn get_ping_ms(&self) -> f64 { self.last_ping_delta * 1000.0 } diff --git a/server/Cargo.toml b/server/Cargo.toml index 7eb4747105..952973055c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -22,6 +22,7 @@ uvth = "3.1.1" futures-util = "0.3" futures-executor = "0.3" futures-timer = "2.0" +futures-channel = "0.3" itertools = "0.9" lazy_static = "1.4.0" scan_fmt = "0.2.4" diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index 6617a78214..2f79f1795d 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -1,17 +1,11 @@ use crate::{Client, ClientType, ServerInfo}; use crossbeam::{bounded, unbounded, Receiver, Sender}; +use futures_channel::oneshot; use futures_executor::block_on; use futures_timer::Delay; use futures_util::{select, FutureExt}; use network::{Network, Participant, Promises}; -use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; +use std::{sync::Arc, thread, time::Duration}; use tracing::{debug, error, trace, warn}; pub(crate) struct ServerInfoPacket { @@ -19,28 +13,25 @@ pub(crate) struct ServerInfoPacket { pub time: f64, } -pub(crate) type ConnectionDataPacket = Client; - pub(crate) struct ConnectionHandler { _network: Arc, thread_handle: Option>, - pub client_receiver: Receiver, + pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, - running: Arc, + stop_sender: Option>, } /// Instead of waiting the main loop we are handling connections, especially /// their slow network .await part on a different thread. We need to communicate -/// to the Server main thread sometimes tough to get the current server_info and -/// time +/// to the Server main thread sometimes though to get the current server_info +/// and time impl ConnectionHandler { pub fn new(network: Network) -> Self { let network = Arc::new(network); let network_clone = Arc::clone(&network); - let running = Arc::new(AtomicBool::new(true)); - let running_clone = Arc::clone(&running); + let (stop_sender, stop_receiver) = oneshot::channel(); - let (client_sender, client_receiver) = unbounded::(); + let (client_sender, client_receiver) = unbounded::(); let (info_requester_sender, info_requester_receiver) = bounded::>(1); @@ -49,7 +40,7 @@ impl ConnectionHandler { network_clone, client_sender, info_requester_sender, - running_clone, + stop_receiver, )); })); @@ -58,23 +49,23 @@ impl ConnectionHandler { thread_handle, client_receiver, info_requester_receiver, - running, + stop_sender: Some(stop_sender), } } async fn work( network: Arc, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, - running: Arc, + stop_receiver: oneshot::Receiver<()>, ) { - while running.load(Ordering::Relaxed) { - const TIMEOUT: Duration = Duration::from_secs(5); + let mut stop_receiver = stop_receiver.fuse(); + loop { let participant = match select!( - _ = Delay::new(TIMEOUT).fuse() => None, + _ = stop_receiver => None, p = network.connected().fuse() => Some(p), ) { - None => continue, //check condition + None => break, Some(Ok(p)) => p, Some(Err(e)) => { error!( @@ -88,16 +79,20 @@ impl ConnectionHandler { let client_sender = client_sender.clone(); let info_requester_sender = info_requester_sender.clone(); - match Self::init_participant(participant, client_sender, info_requester_sender).await { - Ok(_) => (), - Err(e) => warn!(?e, "drop new participant, because an error occurred"), + match select!( + _ = stop_receiver => None, + e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e), + ) { + None => break, + Some(Ok(())) => (), + Some(Err(e)) => warn!(?e, "drop new participant, because an error occurred"), } } } async fn init_participant( participant: Participant, - client_sender: Sender, + client_sender: Sender, info_requester_sender: Sender>, ) -> Result<(), Box> { debug!("New Participant connected to the server"); @@ -151,7 +146,7 @@ impl ConnectionHandler { impl Drop for ConnectionHandler { fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); + let _ = self.stop_sender.take().unwrap().send(()); trace!("blocking till ConnectionHandler is closed"); self.thread_handle .take() diff --git a/server/src/sys/message.rs b/server/src/sys/message.rs index 07badca7cc..16631c6234 100644 --- a/server/src/sys/message.rs +++ b/server/src/sys/message.rs @@ -251,65 +251,60 @@ impl Sys { ) -> Result<(), crate::error::Error> { match msg { // Request spectator state - ClientCharacterScreen::Spectate => { - if client.registered { - client.in_game = Some(ClientIngame::Spectator) - } else { - debug!("dropped Spectate msg from unregistered client"); - } + ClientCharacterScreen::Spectate if client.registered => { + client.in_game = Some(ClientIngame::Spectator) }, - ClientCharacterScreen::Character(character_id) => { - if client.registered && client.in_game.is_none() { - // Only send login message if it wasn't already - // sent previously - if let Some(player) = players.get(entity) { - // Send a request to load the character's component data from the - // DB. Once loaded, persisted components such as stats and inventory - // will be inserted for the entity - character_loader.load_character_data( - entity, - player.uuid().to_string(), - character_id, + ClientCharacterScreen::Spectate => { + debug!("dropped Spectate msg from unregistered client") + }, + ClientCharacterScreen::Character(character_id) + if client.registered && client.in_game.is_none() => + { + if let Some(player) = players.get(entity) { + // Send a request to load the character's component data from the + // DB. Once loaded, persisted components such as stats and inventory + // will be inserted for the entity + character_loader.load_character_data( + entity, + player.uuid().to_string(), + character_id, + ); + + // Start inserting non-persisted/default components for the entity + // while we load the DB data + server_emitter.emit(ServerEvent::InitCharacterData { + entity, + character_id, + }); + + // Give the player a welcome message + if !editable_settings.server_description.is_empty() { + client.send_msg( + ChatType::CommandInfo + .server_msg(String::from(&*editable_settings.server_description)), ); + } - // Start inserting non-persisted/default components for the entity - // while we load the DB data - server_emitter.emit(ServerEvent::InitCharacterData { - entity, - character_id, - }); + if !client.login_msg_sent { + if let Some(player_uid) = uids.get(entity) { + new_chat_msgs.push((None, UnresolvedChatMsg { + chat_type: ChatType::Online(*player_uid), + message: "".to_string(), + })); - // Give the player a welcome message - if !editable_settings.server_description.is_empty() { - client.send_msg( - ChatType::CommandInfo.server_msg(String::from( - &*editable_settings.server_description, - )), - ); + client.login_msg_sent = true; } - - // Only send login message if it wasn't already - // sent previously - if !client.login_msg_sent { - if let Some(player_uid) = uids.get(entity) { - new_chat_msgs.push((None, UnresolvedChatMsg { - chat_type: ChatType::Online(*player_uid), - message: "".to_string(), - })); - - client.login_msg_sent = true; - } - } - } else { - client.send_msg(ServerCharacterScreen::CharacterDataLoadError( - String::from("Failed to fetch player entity"), - )) } } else { - let registered = client.registered; - let in_game = client.in_game; - debug!(?registered, ?in_game, "dropped Character msg from client"); + client.send_msg(ServerCharacterScreen::CharacterDataLoadError(String::from( + "Failed to fetch player entity", + ))) } + } + ClientCharacterScreen::Character(_) => { + let registered = client.registered; + let in_game = client.in_game; + debug!(?registered, ?in_game, "dropped Character msg from client"); }, ClientCharacterScreen::RequestCharacterList => { if let Some(player) = players.get(entity) { @@ -461,6 +456,11 @@ impl Sys { client.network_error, ); loop { + /* + waiting for 1 of the 5 streams to return a massage asynchronous. + If so, handle that msg type. This code will be refactored soon + */ + let q1 = Client::internal_recv(&mut b1, &mut client.general_stream); let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream); let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream); @@ -677,7 +677,7 @@ impl<'a> System<'a> for Sys { ) }); - // Postbox error + // Network error if network_err.is_err() { debug!(?entity, "postbox error with client, disconnecting"); player_metrics diff --git a/voxygen/src/menu/char_selection/mod.rs b/voxygen/src/menu/char_selection/mod.rs index 434c9c565f..fe80710b07 100644 --- a/voxygen/src/menu/char_selection/mod.rs +++ b/voxygen/src/menu/char_selection/mod.rs @@ -63,7 +63,7 @@ impl PlayState for CharSelectionState { span!(_guard, "tick", "::tick"); let (client_in_game, client_registered) = { let client = self.client.borrow(); - (client.get_in_game(), client.get_registered()) + (client.in_game(), client.registered()) }; if client_in_game.is_none() && client_registered { // Handle window events diff --git a/voxygen/src/session.rs b/voxygen/src/session.rs index f4538232a8..6162696608 100644 --- a/voxygen/src/session.rs +++ b/voxygen/src/session.rs @@ -213,7 +213,7 @@ impl PlayState for SessionState { // TODO: can this be a method on the session or are there borrowcheck issues? let (client_in_game, client_registered) = { let client = self.client.borrow(); - (client.get_in_game(), client.get_registered()) + (client.in_game(), client.registered()) }; if client_in_game.is_some() { // Update MyEntity