various small fixes according to the MR

This commit is contained in:
Marcel Märtens 2020-10-12 00:14:04 +02:00
parent 00c66b5b9c
commit 55b59fbe07
7 changed files with 90 additions and 98 deletions

13
Cargo.lock generated
View File

@ -1393,9 +1393,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
checksum = "a7a4d35f7401e948629c9c3d6638fb9bf94e0b2121e96c3b428cc4e631f3eb74"
dependencies = [
"futures-core",
"futures-sink",
@ -1403,9 +1403,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
checksum = "d674eaa0056896d5ada519900dbf97ead2e46a7b6621e8160d79e2f2e1e2784b"
[[package]]
name = "futures-cpupool"
@ -1449,9 +1449,9 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
checksum = "0d8764258ed64ebc5d9ed185cf86a95db5cac810269c5d20ececb32e0088abbd"
[[package]]
name = "futures-task"
@ -4689,6 +4689,7 @@ dependencies = [
"diesel",
"diesel_migrations",
"dotenv",
"futures-channel",
"futures-executor",
"futures-timer",
"futures-util",

View File

@ -466,7 +466,7 @@ impl Client {
let msg: ClientMsg = msg.into();
#[cfg(debug_assertions)]
{
//There assertions veriy that the state is correct when a msg is send!
// These assertions verify that the state is correct when a message is sent!
match &msg {
ClientMsg::Type(_) | ClientMsg::Register(_) => assert!(
!self.registered,
@ -1514,19 +1514,14 @@ impl Client {
Ok(frontend_events)
}
/// Get the player's entity.
pub fn entity(&self) -> EcsEntity { self.entity }
/// Get the player's Uid.
pub fn uid(&self) -> Option<Uid> { self.state.read_component_copied(self.entity) }
pub fn get_client_type(&self) -> ClientType { ClientType::Game }
pub fn in_game(&self) -> Option<ClientIngame> { self.in_game }
pub fn get_in_game(&self) -> Option<ClientIngame> { self.in_game }
pub fn registered(&self) -> bool { self.registered }
pub fn get_registered(&self) -> bool { self.registered }
/// Get the current tick number.
pub fn get_tick(&self) -> u64 { self.tick }
pub fn get_ping_ms(&self) -> f64 { self.last_ping_delta * 1000.0 }

View File

@ -22,6 +22,7 @@ uvth = "3.1.1"
futures-util = "0.3"
futures-executor = "0.3"
futures-timer = "2.0"
futures-channel = "0.3"
itertools = "0.9"
lazy_static = "1.4.0"
scan_fmt = "0.2.4"

View File

@ -1,17 +1,11 @@
use crate::{Client, ClientType, ServerInfo};
use crossbeam::{bounded, unbounded, Receiver, Sender};
use futures_channel::oneshot;
use futures_executor::block_on;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use network::{Network, Participant, Promises};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use std::{sync::Arc, thread, time::Duration};
use tracing::{debug, error, trace, warn};
pub(crate) struct ServerInfoPacket {
@ -19,28 +13,25 @@ pub(crate) struct ServerInfoPacket {
pub time: f64,
}
pub(crate) type ConnectionDataPacket = Client;
pub(crate) struct ConnectionHandler {
_network: Arc<Network>,
thread_handle: Option<thread::JoinHandle<()>>,
pub client_receiver: Receiver<ConnectionDataPacket>,
pub client_receiver: Receiver<Client>,
pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
running: Arc<AtomicBool>,
stop_sender: Option<oneshot::Sender<()>>,
}
/// Instead of waiting the main loop we are handling connections, especially
/// their slow network .await part on a different thread. We need to communicate
/// to the Server main thread sometimes tough to get the current server_info and
/// time
/// to the Server main thread sometimes though to get the current server_info
/// and time
impl ConnectionHandler {
pub fn new(network: Network) -> Self {
let network = Arc::new(network);
let network_clone = Arc::clone(&network);
let running = Arc::new(AtomicBool::new(true));
let running_clone = Arc::clone(&running);
let (stop_sender, stop_receiver) = oneshot::channel();
let (client_sender, client_receiver) = unbounded::<ConnectionDataPacket>();
let (client_sender, client_receiver) = unbounded::<Client>();
let (info_requester_sender, info_requester_receiver) =
bounded::<Sender<ServerInfoPacket>>(1);
@ -49,7 +40,7 @@ impl ConnectionHandler {
network_clone,
client_sender,
info_requester_sender,
running_clone,
stop_receiver,
));
}));
@ -58,23 +49,23 @@ impl ConnectionHandler {
thread_handle,
client_receiver,
info_requester_receiver,
running,
stop_sender: Some(stop_sender),
}
}
async fn work(
network: Arc<Network>,
client_sender: Sender<ConnectionDataPacket>,
client_sender: Sender<Client>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>,
running: Arc<AtomicBool>,
stop_receiver: oneshot::Receiver<()>,
) {
while running.load(Ordering::Relaxed) {
const TIMEOUT: Duration = Duration::from_secs(5);
let mut stop_receiver = stop_receiver.fuse();
loop {
let participant = match select!(
_ = Delay::new(TIMEOUT).fuse() => None,
_ = stop_receiver => None,
p = network.connected().fuse() => Some(p),
) {
None => continue, //check condition
None => break,
Some(Ok(p)) => p,
Some(Err(e)) => {
error!(
@ -88,16 +79,20 @@ impl ConnectionHandler {
let client_sender = client_sender.clone();
let info_requester_sender = info_requester_sender.clone();
match Self::init_participant(participant, client_sender, info_requester_sender).await {
Ok(_) => (),
Err(e) => warn!(?e, "drop new participant, because an error occurred"),
match select!(
_ = stop_receiver => None,
e = Self::init_participant(participant, client_sender, info_requester_sender).fuse() => Some(e),
) {
None => break,
Some(Ok(())) => (),
Some(Err(e)) => warn!(?e, "drop new participant, because an error occurred"),
}
}
}
async fn init_participant(
participant: Participant,
client_sender: Sender<ConnectionDataPacket>,
client_sender: Sender<Client>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("New Participant connected to the server");
@ -151,7 +146,7 @@ impl ConnectionHandler {
impl Drop for ConnectionHandler {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
let _ = self.stop_sender.take().unwrap().send(());
trace!("blocking till ConnectionHandler is closed");
self.thread_handle
.take()

View File

@ -251,65 +251,60 @@ impl Sys {
) -> Result<(), crate::error::Error> {
match msg {
// Request spectator state
ClientCharacterScreen::Spectate => {
if client.registered {
client.in_game = Some(ClientIngame::Spectator)
} else {
debug!("dropped Spectate msg from unregistered client");
}
ClientCharacterScreen::Spectate if client.registered => {
client.in_game = Some(ClientIngame::Spectator)
},
ClientCharacterScreen::Character(character_id) => {
if client.registered && client.in_game.is_none() {
// Only send login message if it wasn't already
// sent previously
if let Some(player) = players.get(entity) {
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
ClientCharacterScreen::Spectate => {
debug!("dropped Spectate msg from unregistered client")
},
ClientCharacterScreen::Character(character_id)
if client.registered && client.in_game.is_none() =>
{
if let Some(player) = players.get(entity) {
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if !editable_settings.server_description.is_empty() {
client.send_msg(
ChatType::CommandInfo
.server_msg(String::from(&*editable_settings.server_description)),
);
}
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
if !client.login_msg_sent {
if let Some(player_uid) = uids.get(entity) {
new_chat_msgs.push((None, UnresolvedChatMsg {
chat_type: ChatType::Online(*player_uid),
message: "".to_string(),
}));
// Give the player a welcome message
if !editable_settings.server_description.is_empty() {
client.send_msg(
ChatType::CommandInfo.server_msg(String::from(
&*editable_settings.server_description,
)),
);
client.login_msg_sent = true;
}
// Only send login message if it wasn't already
// sent previously
if !client.login_msg_sent {
if let Some(player_uid) = uids.get(entity) {
new_chat_msgs.push((None, UnresolvedChatMsg {
chat_type: ChatType::Online(*player_uid),
message: "".to_string(),
}));
client.login_msg_sent = true;
}
}
} else {
client.send_msg(ServerCharacterScreen::CharacterDataLoadError(
String::from("Failed to fetch player entity"),
))
}
} else {
let registered = client.registered;
let in_game = client.in_game;
debug!(?registered, ?in_game, "dropped Character msg from client");
client.send_msg(ServerCharacterScreen::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))
}
}
ClientCharacterScreen::Character(_) => {
let registered = client.registered;
let in_game = client.in_game;
debug!(?registered, ?in_game, "dropped Character msg from client");
},
ClientCharacterScreen::RequestCharacterList => {
if let Some(player) = players.get(entity) {
@ -461,6 +456,11 @@ impl Sys {
client.network_error,
);
loop {
/*
waiting for 1 of the 5 streams to return a massage asynchronous.
If so, handle that msg type. This code will be refactored soon
*/
let q1 = Client::internal_recv(&mut b1, &mut client.general_stream);
let q2 = Client::internal_recv(&mut b2, &mut client.in_game_stream);
let q3 = Client::internal_recv(&mut b3, &mut client.character_screen_stream);
@ -677,7 +677,7 @@ impl<'a> System<'a> for Sys {
)
});
// Postbox error
// Network error
if network_err.is_err() {
debug!(?entity, "postbox error with client, disconnecting");
player_metrics

View File

@ -63,7 +63,7 @@ impl PlayState for CharSelectionState {
span!(_guard, "tick", "<CharSelectionState as PlayState>::tick");
let (client_in_game, client_registered) = {
let client = self.client.borrow();
(client.get_in_game(), client.get_registered())
(client.in_game(), client.registered())
};
if client_in_game.is_none() && client_registered {
// Handle window events

View File

@ -213,7 +213,7 @@ impl PlayState for SessionState {
// TODO: can this be a method on the session or are there borrowcheck issues?
let (client_in_game, client_registered) = {
let client = self.client.borrow();
(client.get_in_game(), client.get_registered())
(client.in_game(), client.registered())
};
if client_in_game.is_some() {
// Update MyEntity