Merge branch 'xMAC94x/network-switch' into 'master'

xmac94x/network switch

See merge request veloren/veloren!1139
This commit is contained in:
Marcel 2020-07-03 17:13:58 +00:00
commit 1f05446ce6
18 changed files with 923 additions and 1841 deletions

16
Cargo.lock generated
View File

@ -163,7 +163,7 @@ dependencies = [
"crossbeam-utils 0.7.2",
"futures-core",
"futures-io",
"futures-timer",
"futures-timer 2.0.2",
"kv-log-macro",
"log",
"memchr",
@ -1370,6 +1370,12 @@ version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.5"
@ -4448,6 +4454,9 @@ version = "0.6.0"
dependencies = [
"authc",
"byteorder 1.3.4",
"futures-executor",
"futures-timer 3.0.2",
"futures-util",
"hashbrown",
"image",
"num_cpus",
@ -4456,6 +4465,7 @@ dependencies = [
"uvth 3.1.1",
"vek",
"veloren-common",
"veloren_network",
]
[[package]]
@ -4499,6 +4509,9 @@ dependencies = [
"diesel",
"diesel_migrations",
"dotenv",
"futures-executor",
"futures-timer 3.0.2",
"futures-util",
"hashbrown",
"lazy_static",
"libsqlite3-sys",
@ -4519,6 +4532,7 @@ dependencies = [
"vek",
"veloren-common",
"veloren-world",
"veloren_network",
]
[[package]]

View File

@ -6,9 +6,13 @@ edition = "2018"
[dependencies]
common = { package = "veloren-common", path = "../common", features = ["no-assets"] }
network = { package = "veloren_network", path = "../network", default-features = false }
byteorder = "1.3.2"
uvth = "3.1.1"
futures-util = "0.3"
futures-executor = "0.3"
futures-timer = "3.0"
image = { version = "0.22.3", default-features = false, features = ["png"] }
num_cpus = "1.10.1"
tracing = { version = "0.1", default-features = false }

View File

@ -1,9 +1,12 @@
use authc::AuthClientError;
use common::net::PostError;
pub use network::NetworkError;
use network::{ParticipantError, StreamError};
#[derive(Debug)]
pub enum Error {
Network(PostError),
NetworkErr(NetworkError),
ParticipantErr(ParticipantError),
StreamErr(StreamError),
ServerWentMad,
ServerTimeout,
ServerShutdown,
@ -19,8 +22,16 @@ pub enum Error {
Other(String),
}
impl From<PostError> for Error {
fn from(err: PostError) -> Self { Self::Network(err) }
impl From<NetworkError> for Error {
fn from(err: NetworkError) -> Self { Self::NetworkErr(err) }
}
impl From<ParticipantError> for Error {
fn from(err: ParticipantError) -> Self { Self::ParticipantErr(err) }
}
impl From<StreamError> for Error {
fn from(err: StreamError) -> Self { Self::StreamErr(err) }
}
impl From<AuthClientError> for Error {

View File

@ -25,14 +25,17 @@ use common::{
PlayerInfo, PlayerListUpdate, RegisterError, RequestStateError, ServerInfo, ServerMsg,
MAX_BYTES_CHAT_MSG,
},
net::PostBox,
state::State,
sync::{Uid, UidAllocator, WorldSyncExt},
terrain::{block::Block, TerrainChunk, TerrainChunkSize},
vol::RectVolSize,
};
use futures_executor::block_on;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use hashbrown::HashMap;
use image::DynamicImage;
use network::{Address, Network, Participant, Pid, Stream, PROMISES_CONSISTENCY, PROMISES_ORDERED};
use std::{
net::SocketAddr,
sync::Arc,
@ -69,7 +72,9 @@ pub struct Client {
pub character_list: CharacterList,
pub active_character_id: Option<i32>,
postbox: PostBox<ClientMsg, ServerMsg>,
_network: Network,
_participant: Arc<Participant>,
singleton_stream: Stream,
last_server_ping: f64,
last_server_pong: f64,
@ -99,64 +104,82 @@ impl Client {
/// Create a new `Client`.
pub fn new<A: Into<SocketAddr>>(addr: A, view_distance: Option<u32>) -> Result<Self, Error> {
let client_state = ClientState::Connected;
let mut postbox = PostBox::to(addr)?;
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build();
// We reduce the thread count by 1 to keep rendering smooth
thread_pool.set_num_threads((num_cpus::get() - 1).max(1));
let (network, f) = Network::new(Pid::new(), None);
thread_pool.execute(f);
let participant = block_on(network.connect(Address::Tcp(addr.into())))?;
let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?;
// Wait for initial sync
let (state, entity, server_info, world_map) = match postbox.next_message()? {
ServerMsg::InitialSync {
entity_package,
server_info,
time_of_day,
world_map: (map_size, world_map),
} => {
// TODO: Display that versions don't match in Voxygen
if &server_info.git_hash != *common::util::GIT_HASH {
warn!(
"Server is running {}[{}], you are running {}[{}], versions might be \
incompatible!",
server_info.git_hash,
server_info.git_date,
common::util::GIT_HASH.to_string(),
common::util::GIT_DATE.to_string(),
);
let (state, entity, server_info, world_map) = block_on(async {
loop {
match stream.recv().await? {
ServerMsg::InitialSync {
entity_package,
server_info,
time_of_day,
world_map: (map_size, world_map),
} => {
// TODO: Display that versions don't match in Voxygen
if &server_info.git_hash != *common::util::GIT_HASH {
warn!(
"Server is running {}[{}], you are running {}[{}], versions might \
be incompatible!",
server_info.git_hash,
server_info.git_date,
common::util::GIT_HASH.to_string(),
common::util::GIT_DATE.to_string(),
);
}
debug!("Auth Server: {:?}", server_info.auth_provider);
// Initialize `State`
let mut state = State::default();
// Client-only components
state
.ecs_mut()
.register::<comp::Last<comp::CharacterState>>();
let entity = state.ecs_mut().apply_entity_package(entity_package);
*state.ecs_mut().write_resource() = time_of_day;
assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize);
let mut world_map_raw =
vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/];
LittleEndian::write_u32_into(&world_map, &mut world_map_raw);
debug!("Preparing image...");
let world_map = Arc::new(
image::DynamicImage::ImageRgba8({
// Should not fail if the dimensions are correct.
let world_map =
image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw);
world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))?
})
// Flip the image, since Voxygen uses an orientation where rotation from
// positive x axis to positive y axis is counterclockwise around the z axis.
.flipv(),
);
debug!("Done preparing image...");
break Ok((state, entity, server_info, (world_map, map_size)));
},
ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers),
err => {
warn!("whoops, server mad {:?}, ignoring", err);
},
}
}
})?;
debug!("Auth Server: {:?}", server_info.auth_provider);
// Initialize `State`
let mut state = State::default();
// Client-only components
state
.ecs_mut()
.register::<comp::Last<comp::CharacterState>>();
let entity = state.ecs_mut().apply_entity_package(entity_package);
*state.ecs_mut().write_resource() = time_of_day;
assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize);
let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/];
LittleEndian::write_u32_into(&world_map, &mut world_map_raw);
debug!("Preparing image...");
let world_map = Arc::new(
image::DynamicImage::ImageRgba8({
// Should not fail if the dimensions are correct.
let world_map =
image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw);
world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))?
})
// Flip the image, since Voxygen uses an orientation where rotation from
// positive x axis to positive y axis is counterclockwise around the z axis.
.flipv(),
);
debug!("Done preparing image...");
(state, entity, server_info, (world_map, map_size))
},
ServerMsg::TooManyPlayers => return Err(Error::TooManyPlayers),
_ => return Err(Error::ServerWentMad),
};
postbox.send_message(ClientMsg::Ping);
stream.send(ClientMsg::Ping)?;
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
@ -173,7 +196,9 @@ impl Client {
character_list: CharacterList::default(),
active_character_id: None,
postbox,
_network: network,
_participant: participant,
singleton_stream: stream,
last_server_ping: 0.0,
last_server_pong: 0.0,
@ -213,33 +238,45 @@ impl Client {
}
).unwrap_or(Ok(username))?;
self.postbox.send_message(ClientMsg::Register {
self.singleton_stream.send(ClientMsg::Register {
view_distance: self.view_distance,
token_or_username,
});
})?;
self.client_state = ClientState::Pending;
loop {
match self.postbox.next_message()? {
ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => {
self.client_state = state;
break Err(match err {
RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn,
RegisterError::AuthError(err) => Error::AuthErr(err),
RegisterError::InvalidCharacter => Error::InvalidCharacter,
RegisterError::NotOnWhitelist => Error::NotOnWhitelist,
});
},
ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()),
_ => {},
block_on(async {
loop {
match self.singleton_stream.recv().await? {
ServerMsg::StateAnswer(Err((
RequestStateError::RegisterDenied(err),
state,
))) => {
self.client_state = state;
break Err(match err {
RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn,
RegisterError::AuthError(err) => Error::AuthErr(err),
RegisterError::InvalidCharacter => Error::InvalidCharacter,
RegisterError::NotOnWhitelist => Error::NotOnWhitelist,
});
},
ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()),
ignore => {
warn!(
"Ignoring what the server send till registered: {:? }",
ignore
);
//return Err(Error::ServerWentMad)
},
}
}
}
})
}
/// Request a state transition to `ClientState::Character`.
pub fn request_character(&mut self, character_id: i32) {
self.postbox
.send_message(ClientMsg::Character(character_id));
self.singleton_stream
.send(ClientMsg::Character(character_id))
.unwrap();
self.active_character_id = Some(character_id);
self.client_state = ClientState::Pending;
@ -248,73 +285,90 @@ impl Client {
/// Load the current players character list
pub fn load_character_list(&mut self) {
self.character_list.loading = true;
self.postbox.send_message(ClientMsg::RequestCharacterList);
self.singleton_stream
.send(ClientMsg::RequestCharacterList)
.unwrap();
}
/// New character creation
pub fn create_character(&mut self, alias: String, tool: Option<String>, body: comp::Body) {
self.character_list.loading = true;
self.postbox
.send_message(ClientMsg::CreateCharacter { alias, tool, body });
self.singleton_stream
.send(ClientMsg::CreateCharacter { alias, tool, body })
.unwrap();
}
/// Character deletion
pub fn delete_character(&mut self, character_id: i32) {
self.character_list.loading = true;
self.postbox
.send_message(ClientMsg::DeleteCharacter(character_id));
self.singleton_stream
.send(ClientMsg::DeleteCharacter(character_id))
.unwrap();
}
/// Send disconnect message to the server
pub fn request_logout(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); }
pub fn request_logout(&mut self) {
if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) {
error!(
?e,
"couldn't send disconnect package to server, did server close already?"
);
}
}
/// Request a state transition to `ClientState::Registered` from an ingame
/// state.
pub fn request_remove_character(&mut self) {
self.postbox.send_message(ClientMsg::ExitIngame);
self.singleton_stream.send(ClientMsg::ExitIngame).unwrap();
self.client_state = ClientState::Pending;
}
pub fn set_view_distance(&mut self, view_distance: u32) {
self.view_distance = Some(view_distance.max(1).min(65));
self.postbox
.send_message(ClientMsg::SetViewDistance(self.view_distance.unwrap()));
self.singleton_stream
.send(ClientMsg::SetViewDistance(self.view_distance.unwrap()))
.unwrap();
// Can't fail
}
pub fn use_slot(&mut self, slot: comp::slot::Slot) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Use(slot),
)));
)))
.unwrap();
}
pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Swap(a, b),
)));
)))
.unwrap();
}
pub fn drop_slot(&mut self, slot: comp::slot::Slot) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Drop(slot),
)));
)))
.unwrap();
}
pub fn pick_up(&mut self, entity: EcsEntity) {
if let Some(uid) = self.state.ecs().read_storage::<Uid>().get(entity).copied() {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Pickup(uid),
)));
)))
.unwrap();
}
}
pub fn toggle_lantern(&mut self) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::ToggleLantern));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern))
.unwrap();
}
pub fn is_mounted(&self) -> bool {
@ -327,14 +381,16 @@ impl Client {
pub fn mount(&mut self, entity: EcsEntity) {
if let Some(uid) = self.state.ecs().read_storage::<Uid>().get(entity).copied() {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::Mount(uid)));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::Mount(uid)))
.unwrap();
}
}
pub fn unmount(&mut self) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::Unmount));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::Unmount))
.unwrap();
}
pub fn respawn(&mut self) {
@ -345,8 +401,9 @@ impl Client {
.get(self.entity)
.map_or(false, |s| s.is_dead)
{
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::Respawn));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::Respawn))
.unwrap();
}
}
@ -428,8 +485,9 @@ impl Client {
{
controller.actions.push(control_action);
}
self.postbox
.send_message(ClientMsg::ControlAction(control_action));
self.singleton_stream
.send(ClientMsg::ControlAction(control_action))
.unwrap();
}
pub fn view_distance(&self) -> Option<u32> { self.view_distance }
@ -458,7 +516,10 @@ impl Client {
/// Send a chat message to the server.
pub fn send_chat(&mut self, message: String) {
match validate_chat_msg(&message) {
Ok(()) => self.postbox.send_message(ClientMsg::ChatMsg(message)),
Ok(()) => self
.singleton_stream
.send(ClientMsg::ChatMsg(message))
.unwrap(),
Err(ChatMsgValidationError::TooLong) => tracing::warn!(
"Attempted to send a message that's too long (Over {} bytes)",
MAX_BYTES_CHAT_MSG
@ -473,18 +534,23 @@ impl Client {
}
pub fn place_block(&mut self, pos: Vec3<i32>, block: Block) {
self.postbox.send_message(ClientMsg::PlaceBlock(pos, block));
self.singleton_stream
.send(ClientMsg::PlaceBlock(pos, block))
.unwrap();
}
pub fn remove_block(&mut self, pos: Vec3<i32>) {
self.postbox.send_message(ClientMsg::BreakBlock(pos));
self.singleton_stream
.send(ClientMsg::BreakBlock(pos))
.unwrap();
}
pub fn collect_block(&mut self, pos: Vec3<i32>) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Collect(pos),
)));
)))
.unwrap();
}
/// Execute a single client tick, handle input and update the game state by
@ -539,8 +605,9 @@ impl Client {
"Couldn't access controller component on client entity"
);
}
self.postbox
.send_message(ClientMsg::ControllerInputs(inputs));
self.singleton_stream
.send(ClientMsg::ControllerInputs(inputs))
.unwrap();
}
// 2) Build up a list of events for this frame, to be passed to the frontend.
@ -637,8 +704,8 @@ impl Client {
if self.state.terrain().get_key(*key).is_none() {
if !skip_mode && !self.pending_chunks.contains_key(key) {
if self.pending_chunks.len() < 4 {
self.postbox
.send_message(ClientMsg::TerrainChunkRequest { key: *key });
self.singleton_stream
.send(ClientMsg::TerrainChunkRequest { key: *key })?;
self.pending_chunks.insert(*key, Instant::now());
} else {
skip_mode = true;
@ -670,7 +737,7 @@ impl Client {
// Send a ping to the server once every second
if self.state.get_time() - self.last_server_ping > 1. {
self.postbox.send_message(ClientMsg::Ping);
self.singleton_stream.send(ClientMsg::Ping)?;
self.last_server_ping = self.state.get_time();
}
@ -681,8 +748,8 @@ impl Client {
self.state.read_storage().get(self.entity).cloned(),
self.state.read_storage().get(self.entity).cloned(),
) {
self.postbox
.send_message(ClientMsg::PlayerPhysics { pos, vel, ori });
self.singleton_stream
.send(ClientMsg::PlayerPhysics { pos, vel, ori })?;
}
}
@ -709,8 +776,216 @@ impl Client {
self.state.cleanup();
}
/// Handle new server messages.
async fn handle_message(
&mut self,
frontend_events: &mut Vec<Event>,
cnt: &mut u64,
) -> Result<(), Error> {
loop {
let msg = self.singleton_stream.recv().await?;
*cnt += 1;
match msg {
ServerMsg::TooManyPlayers => {
return Err(Error::ServerWentMad);
},
ServerMsg::Shutdown => return Err(Error::ServerShutdown),
ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad),
ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => {
self.player_list = list
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => {
if let Some(old_player_info) = self.player_list.insert(uid, player_info.clone())
{
warn!(
"Received msg to insert {} with uid {} into the player list but there \
was already an entry for {} with the same uid that was overwritten!",
player_info.player_alias, uid, old_player_info.player_alias
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.is_admin = admin;
} else {
warn!(
"Received msg to update admin status of uid {}, but they were not in \
the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter(
uid,
char_info,
)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = Some(char_info);
} else {
warn!(
"Received msg to update character info for uid {}, but they were not \
in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = match &player_info.character {
Some(character) => Some(common::msg::CharacterInfo {
name: character.name.to_string(),
level: next_level,
}),
None => {
warn!(
"Received msg to update character level info to {} for uid \
{}, but this player's character is None.",
next_level, uid
);
None
},
};
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => {
// Instead of removing players, mark them as offline because we need to
// remember the names of disconnected players in chat.
//
// TODO the server should re-use uids of players that log out and log back
// in.
if let Some(player_info) = self.player_list.get_mut(&uid) {
if player_info.is_online {
player_info.is_online = false;
} else {
warn!(
"Received msg to remove uid {} from the player list by they were \
already marked offline",
uid
);
}
} else {
warn!(
"Received msg to remove uid {} from the player list by they weren't \
in the list!",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.player_alias = new_name;
} else {
warn!(
"Received msg to alias player with uid {} to {} but this uid is not \
in the player list",
uid, new_name
);
}
},
ServerMsg::Ping => {
self.singleton_stream.send(ClientMsg::Pong)?;
},
ServerMsg::Pong => {
self.last_server_pong = self.state.get_time();
self.last_ping_delta = (self.state.get_time() - self.last_server_ping).round();
},
ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)),
ServerMsg::SetPlayerEntity(uid) => {
if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) {
self.entity = entity;
} else {
return Err(Error::Other("Failed to find entity from uid.".to_owned()));
}
},
ServerMsg::TimeOfDay(time_of_day) => {
*self.state.ecs_mut().write_resource() = time_of_day;
},
ServerMsg::EntitySync(entity_sync_package) => {
self.state
.ecs_mut()
.apply_entity_sync_package(entity_sync_package);
},
ServerMsg::CompSync(comp_sync_package) => {
self.state
.ecs_mut()
.apply_comp_sync_package(comp_sync_package);
},
ServerMsg::CreateEntity(entity_package) => {
self.state.ecs_mut().apply_entity_package(entity_package);
},
ServerMsg::DeleteEntity(entity) => {
if self.state.read_component_cloned::<Uid>(self.entity) != Some(entity) {
self.state
.ecs_mut()
.delete_entity_and_clear_from_uid_allocator(entity.0);
}
},
// Cleanup for when the client goes back to the `Registered` state
ServerMsg::ExitIngameCleanup => {
self.clean_state();
},
ServerMsg::InventoryUpdate(inventory, event) => {
match event {
InventoryUpdateEvent::CollectFailed => {},
_ => {
// Push the updated inventory component to the client
self.state.write_component(self.entity, inventory);
},
}
frontend_events.push(Event::InventoryUpdated(event));
},
ServerMsg::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerMsg::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
ServerMsg::StateAnswer(Ok(state)) => {
self.client_state = state;
},
ServerMsg::StateAnswer(Err((error, state))) => {
warn!(
"StateAnswer: {:?}. Server thinks client is in state {:?}.",
error, state
);
},
ServerMsg::Disconnect => {
frontend_events.push(Event::Disconnect);
self.singleton_stream.send(ClientMsg::Terminate)?;
},
ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list;
self.character_list.loading = false;
},
ServerMsg::CharacterActionError(error) => {
warn!("CharacterActionError: {:?}.", error);
self.character_list.error = Some(error);
},
ServerMsg::Notification(n) => {
frontend_events.push(Event::Notification(n));
},
ServerMsg::CharacterDataLoadError(error) => {
self.clean_state();
self.character_list.error = Some(error);
},
ServerMsg::SetViewDistance(vd) => {
self.view_distance = Some(vd);
frontend_events.push(Event::SetViewDistance(vd));
},
}
}
}
/// Handle new server messages.
fn handle_new_messages(&mut self) -> Result<Vec<Event>, Error> {
let mut frontend_events = Vec::new();
@ -730,216 +1005,20 @@ impl Client {
}
}
let new_msgs = self.postbox.new_messages();
let mut handles_msg = 0;
if new_msgs.len() > 0 {
for msg in new_msgs {
match msg {
ServerMsg::TooManyPlayers => {
return Err(Error::ServerWentMad);
},
ServerMsg::Shutdown => return Err(Error::ServerShutdown),
ServerMsg::InitialSync { .. } => return Err(Error::ServerWentMad),
ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(list)) => {
self.player_list = list
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Add(uid, player_info)) => {
if let Some(old_player_info) =
self.player_list.insert(uid, player_info.clone())
{
warn!(
"Received msg to insert {} with uid {} into the player list but \
there was already an entry for {} with the same uid that was \
overwritten!",
player_info.player_alias, uid, old_player_info.player_alias
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Admin(uid, admin)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.is_admin = admin;
} else {
warn!(
"Received msg to update admin status of uid {}, but they were not \
in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::SelectedCharacter(
uid,
char_info,
)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = Some(char_info);
} else {
warn!(
"Received msg to update character info for uid {}, but they were \
not in the list.",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::LevelChange(uid, next_level)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.character = match &player_info.character {
Some(character) => Some(common::msg::CharacterInfo {
name: character.name.to_string(),
level: next_level,
}),
None => {
warn!(
"Received msg to update character level info to {} for \
uid {}, but this player's character is None.",
next_level, uid
);
block_on(async {
//TIMEOUT 0.01 ms for msg handling
select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = self.handle_message(&mut frontend_events, &mut handles_msg).fuse() => err,
)
})?;
None
},
};
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Remove(uid)) => {
// Instead of removing players, mark them as offline because we need to
// remember the names of disconnected players in chat.
//
// TODO the server should re-use uids of players that log out and log back
// in.
if let Some(player_info) = self.player_list.get_mut(&uid) {
if player_info.is_online {
player_info.is_online = false;
} else {
warn!(
"Received msg to remove uid {} from the player list by they \
were already marked offline",
uid
);
}
} else {
warn!(
"Received msg to remove uid {} from the player list by they \
weren't in the list!",
uid
);
}
},
ServerMsg::PlayerListUpdate(PlayerListUpdate::Alias(uid, new_name)) => {
if let Some(player_info) = self.player_list.get_mut(&uid) {
player_info.player_alias = new_name;
} else {
warn!(
"Received msg to alias player with uid {} to {} but this uid is \
not in the player list",
uid, new_name
);
}
},
ServerMsg::Ping => self.postbox.send_message(ClientMsg::Pong),
ServerMsg::Pong => {
self.last_server_pong = self.state.get_time();
self.last_ping_delta =
(self.state.get_time() - self.last_server_ping).round();
},
ServerMsg::ChatMsg(m) => frontend_events.push(Event::Chat(m)),
ServerMsg::SetPlayerEntity(uid) => {
if let Some(entity) = self.state.ecs().entity_from_uid(uid.0) {
self.entity = entity;
} else {
return Err(Error::Other("Failed to find entity from uid.".to_owned()));
}
},
ServerMsg::TimeOfDay(time_of_day) => {
*self.state.ecs_mut().write_resource() = time_of_day;
},
ServerMsg::EntitySync(entity_sync_package) => {
self.state
.ecs_mut()
.apply_entity_sync_package(entity_sync_package);
},
ServerMsg::CompSync(comp_sync_package) => {
self.state
.ecs_mut()
.apply_comp_sync_package(comp_sync_package);
},
ServerMsg::CreateEntity(entity_package) => {
self.state.ecs_mut().apply_entity_package(entity_package);
},
ServerMsg::DeleteEntity(entity) => {
if self.state.read_component_cloned::<Uid>(self.entity) != Some(entity) {
self.state
.ecs_mut()
.delete_entity_and_clear_from_uid_allocator(entity.0);
}
},
// Cleanup for when the client goes back to the `Registered` state
ServerMsg::ExitIngameCleanup => {
self.clean_state();
},
ServerMsg::InventoryUpdate(inventory, event) => {
match event {
InventoryUpdateEvent::CollectFailed => {},
_ => {
// Push the updated inventory component to the client
self.state.write_component(self.entity, inventory);
},
}
frontend_events.push(Event::InventoryUpdated(event));
},
ServerMsg::TerrainChunkUpdate { key, chunk } => {
if let Ok(chunk) = chunk {
self.state.insert_chunk(key, *chunk);
}
self.pending_chunks.remove(&key);
},
ServerMsg::TerrainBlockUpdates(mut blocks) => {
blocks.drain().for_each(|(pos, block)| {
self.state.set_block(pos, block);
});
},
ServerMsg::StateAnswer(Ok(state)) => {
self.client_state = state;
},
ServerMsg::StateAnswer(Err((error, state))) => {
warn!(
"StateAnswer: {:?}. Server thinks client is in state {:?}.",
error, state
);
},
ServerMsg::Disconnect => {
frontend_events.push(Event::Disconnect);
self.postbox.send_message(ClientMsg::Terminate);
},
ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list;
self.character_list.loading = false;
},
ServerMsg::CharacterActionError(error) => {
warn!("CharacterActionError: {:?}.", error);
self.character_list.error = Some(error);
},
ServerMsg::Notification(n) => {
frontend_events.push(Event::Notification(n));
},
ServerMsg::CharacterDataLoadError(error) => {
self.clean_state();
self.character_list.error = Some(error);
},
ServerMsg::SetViewDistance(vd) => {
self.view_distance = Some(vd);
frontend_events.push(Event::SetViewDistance(vd));
},
}
}
} else if let Some(err) = self.postbox.error() {
return Err(err.into());
// We regularily ping in the tick method
} else if self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT {
if handles_msg == 0 && self.state.get_time() - self.last_server_pong > SERVER_TIMEOUT {
return Err(Error::ServerTimeout);
}
Ok(frontend_events)
}
@ -1072,5 +1151,13 @@ impl Client {
}
impl Drop for Client {
fn drop(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); }
fn drop(&mut self) {
if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) {
warn!(
"error during drop of client, couldn't send disconnect package, is the connection \
already closed? : {}",
e
);
}
}
}

View File

@ -40,29 +40,3 @@ pub mod vol;
pub mod volumes;
pub use loadout_builder::LoadoutBuilder;
/// The networking module containing high-level wrappers of `TcpListener` and
/// `TcpStream` (`PostOffice` and `PostBox` respectively) and data types used by
/// both the server and client. # Examples
/// ```
/// use std::net::SocketAddr;
/// use veloren_common::net::{PostBox, PostOffice};
///
/// let listen_addr = SocketAddr::from(([0, 0, 0, 0], 12345u16));
/// let conn_addr = SocketAddr::from(([127, 0, 0, 1], 12345u16));
///
/// let mut server: PostOffice<String, String> = PostOffice::bind(listen_addr).unwrap();
/// let mut client: PostBox<String, String> = PostBox::to(conn_addr).unwrap();
/// std::thread::sleep(std::time::Duration::from_millis(100));
///
/// let mut scon = server.new_postboxes().next().unwrap();
/// std::thread::sleep(std::time::Duration::from_millis(100));
///
/// scon.send_message(String::from("foo"));
/// client.send_message(String::from("bar"));
/// std::thread::sleep(std::time::Duration::from_millis(100));
///
/// assert_eq!("foo", client.next_message().unwrap());
/// assert_eq!("bar", scon.next_message().unwrap());
/// ```
pub mod net;

View File

@ -1,19 +0,0 @@
/// Messages server sends to client.
#[derive(Deserialize, Serialize, Debug)]
pub enum ServerMsg {
// VersionInfo MUST always stay first in this struct.
VersionInfo {},
}
/// Messages client sends to server.
#[derive(Deserialize, Serialize, Debug)]
pub enum ClientMsg {
// VersionInfo MUST always stay first in this struct.
VersionInfo {},
}
/// Control message type, used in [PostBox](super::PostBox) and
/// [PostOffice](super::PostOffice) to control threads.
pub enum ControlMsg {
Shutdown,
}

View File

@ -1,14 +0,0 @@
pub mod data;
//pub mod post;
pub mod post2;
pub use post2 as post;
// Reexports
pub use self::{
data::{ClientMsg, ServerMsg},
post::{Error as PostError, PostBox, PostOffice},
};
pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + std::marker::Send + std::fmt::Debug;

View File

@ -1,628 +0,0 @@
use bincode;
use middleman::Middleman;
use mio::{
net::{TcpListener, TcpStream},
Events, Poll, PollOpt, Ready, Token,
};
use mio_extras::channel::{channel, Receiver, Sender};
use serde;
use std::{
collections::VecDeque,
convert::TryFrom,
fmt,
io::{self, Read, Write},
net::{Shutdown, SocketAddr},
sync::mpsc::TryRecvError,
thread,
time::Duration,
};
#[derive(Clone, Debug, PartialEq)]
pub enum Error {
Disconnect,
Network,
InvalidMsg,
Internal,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Network }
}
impl From<TryRecvError> for Error {
fn from(err: TryRecvError) -> Self { Error::Internal }
}
impl From<bincode::ErrorKind> for Error {
fn from(err: bincode::ErrorKind) -> Self { Error::InvalidMsg }
}
impl<T> From<mio_extras::channel::SendError<T>> for Error {
fn from(err: mio_extras::channel::SendError<T>) -> Self { Error::Internal }
}
pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message;
const TCP_TOK: Token = Token(0);
const CTRL_TOK: Token = Token(1);
const POSTBOX_TOK: Token = Token(2);
const SEND_TOK: Token = Token(3);
const RECV_TOK: Token = Token(4);
const MIDDLEMAN_TOK: Token = Token(5);
const MAX_MSG_BYTES: usize = 1 << 28;
enum CtrlMsg {
Shutdown,
}
pub struct PostOffice<S: PostSend, R: PostRecv> {
worker: Option<thread::JoinHandle<Result<(), Error>>>,
ctrl_tx: Sender<CtrlMsg>,
postbox_rx: Receiver<Result<PostBox<S, R>, Error>>,
poll: Poll,
err: Option<Error>,
}
impl<S: PostSend, R: PostRecv> PostOffice<S, R> {
pub fn bind<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
let tcp_listener = TcpListener::bind(&addr.into())?;
let (ctrl_tx, ctrl_rx) = channel();
let (postbox_tx, postbox_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
let office_poll = Poll::new()?;
office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?;
let worker =
thread::spawn(move || office_worker(worker_poll, tcp_listener, ctrl_rx, postbox_tx));
Ok(Self {
worker: Some(worker),
ctrl_tx,
postbox_rx,
poll: office_poll,
err: None,
})
}
pub fn error(&self) -> Option<Error> { self.err.clone() }
pub fn new_connections(&mut self) -> impl ExactSizeIterator<Item = PostBox<S, R>> {
let mut conns = VecDeque::new();
if self.err.is_some() {
return conns.into_iter();
}
let mut events = Events::with_capacity(64);
if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return conns.into_iter();
}
for event in events {
match event.token() {
// Keep reading new postboxes from the channel
POSTBOX_TOK => loop {
match self.postbox_rx.try_recv() {
Ok(Ok(conn)) => conns.push_back(conn),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return conns.into_iter();
},
Ok(Err(err)) => {
self.err = Some(err);
return conns.into_iter();
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
conns.into_iter()
}
}
impl<S: PostSend, R: PostRecv> Drop for PostOffice<S, R> {
fn drop(&mut self) {
let _ = self.ctrl_tx.send(CtrlMsg::Shutdown);
let _ = self.worker.take().map(|w| w.join());
}
}
fn office_worker<S: PostSend, R: PostRecv>(
poll: Poll,
tcp_listener: TcpListener,
ctrl_rx: Receiver<CtrlMsg>,
postbox_tx: Sender<Result<PostBox<S, R>, Error>>,
) -> Result<(), Error> {
let mut events = Events::with_capacity(64);
loop {
if let Err(err) = poll.poll(&mut events, None) {
postbox_tx.send(Err(err.into()))?;
return Ok(());
}
for event in &events {
match event.token() {
CTRL_TOK => loop {
match ctrl_rx.try_recv() {
Ok(CtrlMsg::Shutdown) => return Ok(()),
Err(TryRecvError::Empty) => {},
Err(err) => {
postbox_tx.send(Err(err.into()))?;
return Ok(());
},
}
},
TCP_TOK => postbox_tx.send(match tcp_listener.accept() {
Ok((stream, _)) => PostBox::from_tcpstream(stream),
Err(err) => Err(err.into()),
})?,
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
}
pub struct PostBox<S: PostSend, R: PostRecv> {
worker: Option<thread::JoinHandle<Result<(), Error>>>,
ctrl_tx: Sender<CtrlMsg>,
send_tx: Sender<S>,
recv_rx: Receiver<Result<R, Error>>,
poll: Poll,
err: Option<Error>,
}
impl<S: PostSend, R: PostRecv> PostBox<S, R> {
pub fn to_server<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
Self::from_tcpstream(TcpStream::from_stream(std::net::TcpStream::connect(
&addr.into(),
)?)?)
}
fn from_tcpstream(tcp_stream: TcpStream) -> Result<Self, Error> {
let (ctrl_tx, ctrl_rx) = channel();
let (send_tx, send_rx) = channel();
let (recv_tx, recv_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(
&tcp_stream,
TCP_TOK,
Ready::readable() | Ready::writable(),
PollOpt::edge(),
)?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?;
let postbox_poll = Poll::new()?;
postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?;
let worker = thread::spawn(move || {
postbox_worker(worker_poll, tcp_stream, ctrl_rx, send_rx, recv_tx)
});
Ok(Self {
worker: Some(worker),
ctrl_tx,
send_tx,
recv_rx,
poll: postbox_poll,
err: None,
})
}
pub fn error(&self) -> Option<Error> { self.err.clone() }
pub fn send(&mut self, data: S) { let _ = self.send_tx.send(data); }
// TODO: This method is super messy.
pub fn next_message(&mut self) -> Option<R> {
if self.err.is_some() {
return None;
}
loop {
let mut events = Events::with_capacity(10);
if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return None;
}
for event in events {
match event.token() {
// Keep reading new messages from the channel
RECV_TOK => loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => return Some(msg),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return None;
},
Ok(Err(err)) => {
self.err = Some(err);
return None;
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
}
pub fn new_messages(&mut self) -> impl ExactSizeIterator<Item = R> {
let mut msgs = VecDeque::new();
if self.err.is_some() {
return msgs.into_iter();
}
let mut events = Events::with_capacity(64);
if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return msgs.into_iter();
}
for event in events {
match event.token() {
// Keep reading new messages from the channel
RECV_TOK => loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => msgs.push_back(msg),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return msgs.into_iter();
},
Ok(Err(err)) => {
self.err = Some(err);
return msgs.into_iter();
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
msgs.into_iter()
}
}
impl<S: PostSend, R: PostRecv> Drop for PostBox<S, R> {
fn drop(&mut self) {
let _ = self.ctrl_tx.send(CtrlMsg::Shutdown);
let _ = self.worker.take().map(|w| w.join());
}
}
fn postbox_worker<S: PostSend, R: PostRecv>(
poll: Poll,
mut tcp_stream: TcpStream,
ctrl_rx: Receiver<CtrlMsg>,
send_rx: Receiver<S>,
recv_tx: Sender<Result<R, Error>>,
) -> Result<(), Error> {
fn try_tcp_send(
tcp_stream: &mut TcpStream,
chunks: &mut VecDeque<Vec<u8>>,
) -> Result<(), Error> {
loop {
let chunk = match chunks.pop_front() {
Some(chunk) => chunk,
None => break,
};
match tcp_stream.write_all(&chunk) {
Ok(()) => {},
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
chunks.push_front(chunk);
break;
},
Err(err) => {
println!("Error: {:?}", err);
return Err(err.into());
},
}
}
Ok(())
}
enum RecvState {
ReadHead(Vec<u8>),
ReadBody(usize, Vec<u8>),
}
let mut recv_state = RecvState::ReadHead(Vec::new());
let mut chunks = VecDeque::new();
//let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8));
let mut events = Events::with_capacity(64);
'work: loop {
if let Err(err) = poll.poll(&mut events, None) {
recv_tx.send(Err(err.into()))?;
break 'work;
}
for event in &events {
match event.token() {
CTRL_TOK => match ctrl_rx.try_recv() {
Ok(CtrlMsg::Shutdown) => {
break 'work;
},
Err(TryRecvError::Empty) => (),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
},
SEND_TOK => loop {
match send_rx.try_recv() {
Ok(outgoing_msg) => {
let mut msg_bytes = match bincode::serialize(&outgoing_msg) {
Ok(bytes) => bytes,
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
};
let mut bytes = msg_bytes.len().to_le_bytes().as_ref().to_vec();
bytes.append(&mut msg_bytes);
bytes
.chunks(1024)
.map(|chunk| chunk.to_vec())
.for_each(|chunk| chunks.push_back(chunk));
match try_tcp_send(&mut tcp_stream, &mut chunks) {
Ok(_) => {},
Err(err) => {
recv_tx.send(Err(err.into()))?;
return Err(Error::Network);
},
}
},
Err(TryRecvError::Empty) => break,
Err(err) => Err(err)?,
}
},
TCP_TOK => {
loop {
// Check TCP error
match tcp_stream.take_error() {
Ok(None) => {},
Ok(Some(err)) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
match &mut recv_state {
RecvState::ReadHead(head) => {
if head.len() == 8 {
let len = usize::from_le_bytes(
<[u8; 8]>::try_from(head.as_slice()).unwrap(),
);
if len > MAX_MSG_BYTES {
println!("TOO BIG! {:x}", len);
recv_tx.send(Err(Error::InvalidMsg))?;
break 'work;
} else if len == 0 {
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
break;
} else {
recv_state = RecvState::ReadBody(len, Vec::new());
}
} else {
let mut b = [0; 1];
match tcp_stream.read(&mut b) {
Ok(0) => {},
Ok(_) => head.push(b[0]),
Err(_) => break,
}
}
},
RecvState::ReadBody(len, body) => {
if body.len() == *len {
match bincode::deserialize(&body) {
Ok(msg) => {
recv_tx.send(Ok(msg))?;
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
},
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
}
} else {
let left = *len - body.len();
let mut buf = vec![0; left];
match tcp_stream.read(&mut buf) {
Ok(_) => body.append(&mut buf),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
}
},
}
}
// Now, try sending TCP stuff
match try_tcp_send(&mut tcp_stream, &mut chunks) {
Ok(_) => {},
Err(err) => {
recv_tx.send(Err(err.into()))?;
return Err(Error::Network);
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
//tcp_stream.shutdown(Shutdown::Both)?;
Ok(())
}
// TESTS
/*
#[derive(Serialize, Deserialize)]
struct TestMsg<T>(T);
#[test]
fn connect() {
let srv_addr = ([127, 0, 0, 1], 12345);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
// We should start off with 0 incoming connections.
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
let postbox = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
// Now a postbox has been created, we should have 1 new.
thread::sleep(Duration::from_millis(250));
let incoming = postoffice.new_connections();
assert_eq!(incoming.len(), 1);
assert_eq!(postoffice.error(), None);
}
#[test]
fn connect_fail() {
let listen_addr = ([0; 4], 12345);
let connect_addr = ([127, 0, 0, 1], 12212);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(listen_addr).unwrap();
// We should start off with 0 incoming connections.
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
assert!(PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(connect_addr).is_err());
}
#[test]
fn connection_count() {
let srv_addr = ([127, 0, 0, 1], 12346);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut postboxes = Vec::new();
// We should start off with 0 incoming connections.
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
for _ in 0..5 {
postboxes.push(PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap());
}
// 5 postboxes created, we should have 5.
thread::sleep(Duration::from_millis(3500));
let incoming = postoffice.new_connections();
assert_eq!(incoming.len(), 5);
assert_eq!(postoffice.error(), None);
}
#[test]
fn disconnect() {
let srv_addr = ([127, 0, 0, 1], 12347);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut server_postbox = {
let mut client_postbox = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut incoming = postoffice.new_connections();
assert_eq!(incoming.len(), 1);
assert_eq!(postoffice.error(), None);
incoming.next().unwrap()
};
// The client postbox has since been disconnected.
thread::sleep(Duration::from_millis(2050));
let incoming_msgs = server_postbox.new_messages();
assert_eq!(incoming_msgs.len(), 0);
// TODO
// assert_eq!(server_postbox.error(), Some(Error::Disconnect));
}
#[test]
fn client_to_server() {
let srv_addr = ([127, 0, 0, 1], 12348);
let mut po = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut client_pb = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut server_pb = po.new_connections().next().unwrap();
client_pb.send(TestMsg(1337.0));
client_pb.send(TestMsg(9821.0));
client_pb.send(TestMsg(-3.2));
client_pb.send(TestMsg(17.0));
thread::sleep(Duration::from_millis(250));
let mut incoming_msgs = server_pb.new_messages();
assert_eq!(incoming_msgs.len(), 4);
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0));
}
#[test]
fn server_to_client() {
let srv_addr = ([127, 0, 0, 1], 12349);
let mut po = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut client_pb = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut server_pb = po.new_connections().next().unwrap();
server_pb.send(TestMsg(1337));
server_pb.send(TestMsg(9821));
server_pb.send(TestMsg(39999999));
server_pb.send(TestMsg(17));
thread::sleep(Duration::from_millis(250));
let mut incoming_msgs = client_pb.new_messages();
assert_eq!(incoming_msgs.len(), 4);
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17));
}
*/

View File

@ -1,439 +0,0 @@
use crossbeam::channel;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::VecDeque,
convert::TryFrom,
io::{self, Read, Write},
marker::PhantomData,
net::{Shutdown, SocketAddr, TcpListener, TcpStream},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use tracing::warn;
#[derive(Clone, Debug)]
pub enum Error {
Io(Arc<io::Error>),
Bincode(Arc<bincode::Error>),
ChannelFailure,
InvalidMessage,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Io(Arc::new(err)) }
}
impl From<bincode::Error> for Error {
fn from(err: bincode::Error) -> Self { Error::Bincode(Arc::new(err)) }
}
impl From<channel::TryRecvError> for Error {
fn from(_error: channel::TryRecvError) -> Self { Error::ChannelFailure }
}
pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send;
const MAX_MSG_SIZE: usize = 1 << 28;
pub struct PostOffice<S: PostMsg, R: PostMsg> {
listener: TcpListener,
error: Option<Error>,
phantom: PhantomData<(S, R)>,
}
impl<S: PostMsg, R: PostMsg> PostOffice<S, R> {
pub fn bind<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
let listener = TcpListener::bind(addr.into())?;
listener.set_nonblocking(true)?;
Ok(Self {
listener,
error: None,
phantom: PhantomData,
})
}
pub fn error(&self) -> Option<Error> { self.error.clone() }
pub fn new_postboxes(&mut self) -> impl ExactSizeIterator<Item = PostBox<S, R>> {
let mut new = Vec::new();
if self.error.is_some() {
return new.into_iter();
}
loop {
match self.listener.accept() {
Ok((stream, _sock)) => new.push(PostBox::from_stream(stream).unwrap()),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) if e.kind() == io::ErrorKind::Interrupted => {},
Err(e) => {
self.error = Some(e.into());
break;
},
}
}
new.into_iter()
}
}
pub struct PostBox<S: PostMsg, R: PostMsg> {
send_tx: channel::Sender<S>,
recv_rx: channel::Receiver<Result<R, Error>>,
worker: Option<thread::JoinHandle<()>>,
running: Arc<AtomicBool>,
error: Option<Error>,
}
impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
pub fn to<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
Self::from_stream(TcpStream::connect(addr.into())?)
}
fn from_stream(stream: TcpStream) -> Result<Self, Error> {
stream.set_nonblocking(true)?;
let running = Arc::new(AtomicBool::new(true));
let worker_running = running.clone();
let (send_tx, send_rx) = channel::unbounded();
let (recv_tx, recv_rx) = channel::unbounded();
let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running));
Ok(Self {
send_tx,
recv_rx,
worker: Some(worker),
running,
error: None,
})
}
pub fn error(&self) -> Option<Error> { self.error.clone() }
pub fn send_message(&mut self, msg: S) { let _ = self.send_tx.send(msg); }
pub fn next_message(&mut self) -> Result<R, Error> {
if let Some(e) = self.error.clone() {
return Err(e);
}
match self.recv_rx.recv().map_err(|_| Error::ChannelFailure)? {
Ok(msg) => Ok(msg),
Err(e) => {
self.error = Some(e.clone());
Err(e)
},
}
}
pub fn new_messages(&mut self) -> impl ExactSizeIterator<Item = R> {
let mut new = Vec::new();
if self.error.is_some() {
return new.into_iter();
}
loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => new.push(msg),
Err(channel::TryRecvError::Empty) => break,
Err(e) => {
self.error = Some(e.into());
break;
},
Ok(Err(e)) => {
self.error = Some(e);
break;
},
}
}
new.into_iter()
}
fn worker(
mut stream: TcpStream,
send_rx: channel::Receiver<S>,
recv_tx: channel::Sender<Result<R, Error>>,
running: Arc<AtomicBool>,
) {
let mut outgoing_chunks = VecDeque::new();
let mut incoming_buf = Vec::new();
'work: while running.load(Ordering::Relaxed) {
for _ in 0..30 {
// Get stream errors.
match stream.take_error() {
Ok(Some(e)) | Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
Ok(None) => {},
}
// Try getting messages from the send channel.
for _ in 0..1000 {
match send_rx.try_recv() {
Ok(send_msg) => {
// Serialize message
let msg_bytes = bincode::serialize(&send_msg).unwrap();
let mut msg_bytes = lz4_compress::compress(&msg_bytes);
/*
if msg_bytes.len() > 512 {
println!("MSG SIZE: {}", msg_bytes.len());
}
*/
// Assemble into packet.
let mut packet_bytes =
(msg_bytes.len() as u64).to_le_bytes().as_ref().to_vec();
packet_bytes.push(msg_bytes.iter().fold(0, |a, x| a ^ *x));
packet_bytes.append(&mut msg_bytes);
// Split packet into chunks.
packet_bytes
.chunks(4096)
.map(|chunk| chunk.to_vec())
.for_each(|chunk| outgoing_chunks.push_back(chunk))
},
Err(channel::TryRecvError::Empty) => break,
// Worker error
Err(e) => {
let _ = recv_tx.send(Err(e.into()));
break 'work;
},
}
}
// Try sending bytes through the TCP stream.
for _ in 0..1000 {
match outgoing_chunks.pop_front() {
Some(mut chunk) => match stream.write(&chunk) {
Ok(n) if n == chunk.len() => {},
Ok(n) => {
outgoing_chunks.push_front(chunk.split_off(n));
break;
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// Return chunk to the queue to try again later.
outgoing_chunks.push_front(chunk);
break;
},
// Worker error
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
},
None => break,
}
}
// Try receiving bytes from the TCP stream.
for _ in 0..1000 {
let mut buf = [0; 4096];
match stream.read(&mut buf) {
Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) if e.kind() == io::ErrorKind::Interrupted => {},
// Worker error
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
}
}
// Try turning bytes into messages.
for _ in 0..1000 {
match incoming_buf.get(0..9) {
Some(len_bytes) => {
let len =
u64::from_le_bytes(<[u8; 8]>::try_from(&len_bytes[0..8]).unwrap())
as usize; // Can't fail
if len > MAX_MSG_SIZE {
recv_tx.send(Err(Error::InvalidMessage)).unwrap();
break 'work;
} else if incoming_buf.len() >= len + 9 {
let checksum_found =
incoming_buf[9..len + 9].iter().fold(0, |a, x| a ^ *x);
let checksum_expected = len_bytes[8];
assert_eq!(
checksum_found, checksum_expected,
"Message checksum failed!"
);
let msg_bytes =
lz4_compress::decompress(&incoming_buf[9..len + 9]).unwrap();
match bincode::deserialize(&msg_bytes) {
Ok(msg) => recv_tx.send(Ok(msg)).unwrap(),
Err(err) => {
println!("BINCODE ERROR: {:?}", err);
recv_tx.send(Err(err.into())).unwrap()
},
}
incoming_buf = incoming_buf.split_off(len + 9);
} else {
break;
}
},
None => break,
}
}
}
thread::sleep(Duration::from_millis(10));
}
if let Err(err) = stream.shutdown(Shutdown::Both) {
warn!("TCP worker stream shutdown failed: {:?}", err);
}
}
}
impl<S: PostMsg, R: PostMsg> Drop for PostBox<S, R> {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
self.worker.take().map(|handle| handle.join());
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
fn create_postoffice<S: PostMsg, R: PostMsg>(
id: u16,
) -> Result<(PostOffice<S, R>, SocketAddr), Error> {
let sock = ([0; 4], 12345 + id).into();
Ok((PostOffice::bind(sock)?, sock))
}
fn loop_for<F: FnMut()>(duration: Duration, mut f: F) {
let start = Instant::now();
while start.elapsed() < duration {
f();
}
}
#[test]
fn connect() {
let (mut postoffice, bound) = create_postoffice::<(), ()>(0).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let _client0 = PostBox::<(), ()>::to(sock).unwrap();
let _client1 = PostBox::<(), ()>::to(sock).unwrap();
let _client2 = PostBox::<(), ()>::to(sock).unwrap();
let mut new_clients = 0;
loop_for(Duration::from_millis(250), || {
new_clients += postoffice.new_postboxes().count();
});
assert_eq!(new_clients, 3);
}
/*
#[test]
fn disconnect() {
let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap();
let mut client = PostBox::<i32, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().unwrap().next().unwrap();
drop(client);
loop_for(Duration::from_millis(300), || ());
assert!(server.new_messages().is_err());
}
*/
#[test]
fn send_recv() {
let (mut postoffice, bound) = create_postoffice::<(), i32>(2).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let test_msgs = vec![1, 1337, 42, -48];
let mut client = PostBox::<i32, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
for msg in &test_msgs {
client.send_message(*msg);
}
let mut recv_msgs = Vec::new();
loop_for(Duration::from_millis(250), || {
server.new_messages().for_each(|msg| recv_msgs.push(msg))
});
assert_eq!(test_msgs, recv_msgs);
}
#[test]
#[ignore]
fn send_recv_huge() {
let (mut postoffice, bound) = create_postoffice::<(), Vec<i32>>(3).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let test_msgs: Vec<Vec<i32>> = (0..5)
.map(|i| (0..100000).map(|j| i * 2 + j).collect())
.collect();
let mut client = PostBox::<Vec<i32>, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
for msg in &test_msgs {
client.send_message(msg.clone());
}
let mut recv_msgs = Vec::new();
loop_for(Duration::from_millis(3000), || {
server.new_messages().for_each(|msg| recv_msgs.push(msg))
});
assert_eq!(test_msgs.len(), recv_msgs.len());
assert!(test_msgs == recv_msgs);
}
#[test]
fn send_recv_both() {
let (mut postoffice, bound) = create_postoffice::<u32, u32>(4).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let mut client = PostBox::<u32, u32>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
let test_msgs = vec![
(0xDEADBEAD, 0xBEEEEEEF),
(0x1BADB002, 0xBAADF00D),
(0xBAADA555, 0xC0DED00D),
(0xCAFEBABE, 0xDEADC0DE),
];
for (to, from) in test_msgs {
client.send_message(to);
server.send_message(from);
loop_for(Duration::from_millis(250), || ());
assert_eq!(client.new_messages().next().unwrap(), from);
assert_eq!(server.new_messages().next().unwrap(), to);
}
}
}

View File

@ -11,6 +11,7 @@ default = ["worldgen"]
[dependencies]
common = { package = "veloren-common", path = "../common" }
world = { package = "veloren-world", path = "../world" }
network = { package = "veloren_network", path = "../network", default-features = false }
specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git" }
@ -18,6 +19,9 @@ tracing = "0.1"
specs = { version = "0.15.1", features = ["shred-derive"] }
vek = "0.11.0"
uvth = "3.1.1"
futures-util = "0.3"
futures-executor = "0.3"
futures-timer = "3.0"
lazy_static = "1.4.0"
scan_fmt = "0.2.4"
ron = { version = "0.6", default-features = false }

View File

@ -1,15 +1,13 @@
use common::{
msg::{ClientMsg, ClientState, RequestStateError, ServerMsg},
net::PostBox,
};
use common::msg::{ClientState, RequestStateError, ServerMsg};
use hashbrown::HashSet;
use network::Stream;
use specs::{Component, FlaggedStorage};
use specs_idvs::IDVStorage;
use vek::*;
pub struct Client {
pub client_state: ClientState,
pub postbox: PostBox<ServerMsg, ClientMsg>,
pub singleton_stream: std::sync::Mutex<Stream>,
pub last_ping: f64,
pub login_msg_sent: bool,
}
@ -19,7 +17,9 @@ impl Component for Client {
}
impl Client {
pub fn notify(&mut self, msg: ServerMsg) { self.postbox.send_message(msg); }
pub fn notify(&mut self, msg: ServerMsg) {
let _ = self.singleton_stream.lock().unwrap().send(msg);
}
pub fn is_registered(&self) -> bool {
match self.client_state {
@ -37,13 +37,19 @@ impl Client {
pub fn allow_state(&mut self, new_state: ClientState) {
self.client_state = new_state;
self.postbox
.send_message(ServerMsg::StateAnswer(Ok(new_state)));
let _ = self
.singleton_stream
.lock()
.unwrap()
.send(ServerMsg::StateAnswer(Ok(new_state)));
}
pub fn error_state(&mut self, error: RequestStateError) {
self.postbox
.send_message(ServerMsg::StateAnswer(Err((error, self.client_state))));
let _ = self
.singleton_stream
.lock()
.unwrap()
.send(ServerMsg::StateAnswer(Err((error, self.client_state))));
}
}

View File

@ -1,11 +1,21 @@
use common::net::PostError;
use network::{NetworkError, ParticipantError, StreamError};
#[derive(Debug)]
pub enum Error {
Network(PostError),
NetworkErr(NetworkError),
ParticipantErr(ParticipantError),
StreamErr(StreamError),
Other(String),
}
impl From<PostError> for Error {
fn from(err: PostError) -> Self { Error::Network(err) }
impl From<NetworkError> for Error {
fn from(err: NetworkError) -> Self { Error::NetworkErr(err) }
}
impl From<ParticipantError> for Error {
fn from(err: ParticipantError) -> Self { Error::ParticipantErr(err) }
}
impl From<StreamError> for Error {
fn from(err: StreamError) -> Self { Error::StreamErr(err) }
}

View File

@ -31,14 +31,17 @@ use common::{
cmd::ChatCommand,
comp::{self, ChatType},
event::{EventBus, ServerEvent},
msg::{ClientMsg, ClientState, ServerInfo, ServerMsg},
net::PostOffice,
msg::{ClientState, ServerInfo, ServerMsg},
state::{State, TimeOfDay},
sync::WorldSyncExt,
terrain::TerrainChunkSize,
vol::{ReadVol, RectVolSize},
};
use futures_executor::block_on;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use metrics::{ServerMetrics, TickMetrics};
use network::{Address, Network, Pid};
use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater};
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
use std::{
@ -77,7 +80,7 @@ pub struct Server {
world: Arc<World>,
map: Vec<u32>,
postoffice: PostOffice<ServerMsg, ClientMsg>,
network: Network,
thread_pool: ThreadPool,
@ -233,16 +236,21 @@ impl Server {
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string())
.build();
let (network, f) = Network::new(Pid::new(), None);
thread_pool.execute(f);
block_on(network.listen(Address::Tcp(settings.gameserver_address)))?;
let this = Self {
state,
world: Arc::new(world),
map,
postoffice: PostOffice::bind(settings.gameserver_address)?,
network,
thread_pool: ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build(),
thread_pool,
metrics,
tick_metrics,
@ -329,17 +337,18 @@ impl Server {
// 1) Build up a list of events for this frame, to be passed to the frontend.
let mut frontend_events = Vec::new();
// If networking has problems, handle them.
if let Some(err) = self.postoffice.error() {
return Err(err.into());
}
// 2)
let before_new_connections = Instant::now();
// 3) Handle inputs from clients
frontend_events.append(&mut self.handle_new_connections()?);
block_on(async {
//TIMEOUT 0.01 ms for msg handling
select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = self.handle_new_connections(&mut frontend_events).fuse() => err,
)
})?;
let before_message_system = Instant::now();
@ -582,13 +591,17 @@ impl Server {
}
/// Handle new client connections.
fn handle_new_connections(&mut self) -> Result<Vec<Event>, Error> {
let mut frontend_events = Vec::new();
async fn handle_new_connections(
&mut self,
frontend_events: &mut Vec<Event>,
) -> Result<(), Error> {
loop {
let participant = self.network.connected().await?;
let singleton_stream = participant.opened().await?;
for postbox in self.postoffice.new_postboxes() {
let mut client = Client {
client_state: ClientState::Connected,
postbox,
singleton_stream: std::sync::Mutex::new(singleton_stream),
last_ping: self.state.get_time(),
login_msg_sent: false,
};
@ -626,8 +639,6 @@ impl Server {
frontend_events.push(Event::ClientConnected { entity });
}
}
Ok(frontend_events)
}
pub fn notify_client<S>(&self, entity: EcsEntity, msg: S)

View File

@ -18,11 +18,360 @@ use common::{
terrain::{Block, TerrainChunkSize, TerrainGrid},
vol::{RectVolSize, Vox},
};
use futures_executor::block_on;
use futures_timer::Delay;
use futures_util::{select, FutureExt};
use hashbrown::HashMap;
use specs::{
Entities, Join, Read, ReadExpect, ReadStorage, System, Write, WriteExpect, WriteStorage,
};
impl Sys {
///We need to move this to a async fn, otherwise the compiler generates to
/// much recursive fn, and async closures dont work yet
#[allow(clippy::too_many_arguments)]
async fn handle_client_msg(
server_emitter: &mut common::event::Emitter<'_, ServerEvent>,
new_chat_msgs: &mut Vec<(Option<specs::Entity>, ChatMsg)>,
player_list: &HashMap<Uid, PlayerInfo>,
new_players: &mut Vec<specs::Entity>,
entity: specs::Entity,
client: &mut Client,
cnt: &mut u64,
character_loader: &ReadExpect<'_, CharacterLoader>,
terrain: &ReadExpect<'_, TerrainGrid>,
uids: &ReadStorage<'_, Uid>,
can_build: &ReadStorage<'_, CanBuild>,
force_updates: &ReadStorage<'_, ForceUpdate>,
stats: &ReadStorage<'_, Stats>,
chat_modes: &ReadStorage<'_, ChatMode>,
accounts: &mut WriteExpect<'_, AuthProvider>,
block_changes: &mut Write<'_, BlockChange>,
admin_list: &ReadExpect<'_, AdminList>,
admins: &mut WriteStorage<'_, Admin>,
positions: &mut WriteStorage<'_, Pos>,
velocities: &mut WriteStorage<'_, Vel>,
orientations: &mut WriteStorage<'_, Ori>,
players: &mut WriteStorage<'_, Player>,
controllers: &mut WriteStorage<'_, Controller>,
settings: &Read<'_, ServerSettings>,
) -> Result<(), crate::error::Error> {
loop {
let msg = client.singleton_stream.lock().unwrap().recv().await?;
*cnt += 1;
match msg {
// Go back to registered state (char selection screen)
ClientMsg::ExitIngame => match client.client_state {
// Use ClientMsg::Register instead.
ClientState::Connected => client.error_state(RequestStateError::WrongMessage),
ClientState::Registered => client.error_state(RequestStateError::Already),
ClientState::Spectator | ClientState::Character => {
server_emitter.emit(ServerEvent::ExitIngame { entity });
},
ClientState::Pending => {},
},
// Request spectator state
ClientMsg::Spectate => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Spectator => client.error_state(RequestStateError::Already),
ClientState::Registered | ClientState::Character => {
client.allow_state(ClientState::Spectator)
},
ClientState::Pending => {},
},
// Request registered state (login)
ClientMsg::Register {
view_distance,
token_or_username,
} => {
let (username, uuid) = match accounts.query(token_or_username.clone()) {
Err(err) => {
client.error_state(RequestStateError::RegisterDenied(err));
break Ok(());
},
Ok((username, uuid)) => (username, uuid),
};
let vd =
view_distance.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd)));
let player = Player::new(username.clone(), None, vd, uuid);
let is_admin = admin_list.contains(&username);
if !player.is_valid() {
// Invalid player
client.error_state(RequestStateError::Impossible);
break Ok(());
}
match client.client_state {
ClientState::Connected => {
// Add Player component to this client
let _ = players.insert(entity, player);
// Give the Admin component to the player if their name exists in
// admin list
if is_admin {
let _ = admins.insert(entity, Admin);
}
// Tell the client its request was successful.
client.allow_state(ClientState::Registered);
// Send initial player list
client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)));
// Add to list to notify all clients of the new player
new_players.push(entity);
},
// Use RequestState instead (No need to send `player` again).
_ => client.error_state(RequestStateError::Impossible),
}
//client.allow_state(ClientState::Registered);
// Limit view distance if it's too high
// This comes after state registration so that the client actually hears it
if settings
.max_view_distance
.zip(view_distance)
.map(|(vd, max)| vd > max)
.unwrap_or(false)
{
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
};
},
ClientMsg::SetViewDistance(view_distance) => {
if let ClientState::Character { .. } = client.client_state {
if settings
.max_view_distance
.map(|max| view_distance <= max)
.unwrap_or(true)
{
players.get_mut(entity).map(|player| {
player.view_distance = Some(
settings
.max_view_distance
.map(|max| view_distance.min(max))
.unwrap_or(view_distance),
)
});
} else {
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
}
}
},
ClientMsg::Character(character_id) => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered | ClientState::Spectator => {
// Only send login message if it wasn't already
// sent previously
if let (Some(player), false) = (players.get(entity), client.login_msg_sent)
{
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if settings.server_description.len() > 0 {
client.notify(
ChatType::CommandInfo
.server_msg(settings.server_description.clone()),
);
}
// Only send login message if it wasn't already
// sent previously
if !client.login_msg_sent {
new_chat_msgs.push((None, ChatMsg {
chat_type: ChatType::Online,
message: format!("[{}] is now online.", &player.alias), // TODO: Localize this
}));
client.login_msg_sent = true;
}
} else {
client.notify(ServerMsg::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))
}
},
ClientState::Character => client.error_state(RequestStateError::Already),
ClientState::Pending => {},
},
ClientMsg::ControllerInputs(inputs) => match client.client_state {
ClientState::Connected | ClientState::Registered | ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.inputs.update_with_new(inputs);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlEvent(event) => match client.client_state {
ClientState::Connected | ClientState::Registered | ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
// Skip respawn if client entity is alive
if let ControlEvent::Respawn = event {
if stats.get(entity).map_or(true, |s| !s.is_dead) {
continue;
}
}
if let Some(controller) = controllers.get_mut(entity) {
controller.events.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlAction(event) => match client.client_state {
ClientState::Connected | ClientState::Registered | ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.actions.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ChatMsg(message) => match client.client_state {
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered | ClientState::Spectator | ClientState::Character => {
match validate_chat_msg(&message) {
Ok(()) => {
if let Some(from) = uids.get(entity) {
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
tracing::error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
tracing::warn!(
?len,
?max,
"Recieved a chat message that's too long"
)
},
}
},
ClientState::Pending => {},
},
ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state {
ClientState::Character => {
if force_updates.get(entity).is_none()
&& stats.get(entity).map_or(true, |s| !s.is_dead)
{
let _ = positions.insert(entity, pos);
let _ = velocities.insert(entity, vel);
let _ = orientations.insert(entity, ori);
}
},
// Only characters can send positions.
_ => client.error_state(RequestStateError::Impossible),
},
ClientMsg::BreakBlock(pos) => {
if can_build.get(entity).is_some() {
block_changes.set(pos, Block::empty());
}
},
ClientMsg::PlaceBlock(pos, block) => {
if can_build.get(entity).is_some() {
block_changes.try_set(pos, block);
}
},
ClientMsg::TerrainChunkRequest { key } => match client.client_state {
ClientState::Connected | ClientState::Registered => {
client.error_state(RequestStateError::Impossible);
},
ClientState::Spectator | ClientState::Character => {
let in_vd = if let (Some(view_distance), Some(pos)) = (
players.get(entity).and_then(|p| p.view_distance),
positions.get(entity),
) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (view_distance as f64 + 1.5) * TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => client.notify(ServerMsg::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
}),
None => server_emitter.emit(ServerEvent::ChunkRequest(entity, key)),
}
}
},
ClientState::Pending => {},
},
// Always possible.
ClientMsg::Ping => client.notify(ServerMsg::Pong),
ClientMsg::Pong => {},
ClientMsg::Disconnect => {
client.notify(ServerMsg::Disconnect);
},
ClientMsg::Terminate => {
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) {
character_loader.load_character_list(entity, player.uuid().to_string())
}
},
ClientMsg::CreateCharacter { alias, tool, body } => {
if let Some(player) = players.get(entity) {
character_loader.create_character(
entity,
player.uuid().to_string(),
alias,
tool,
body,
);
}
},
ClientMsg::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
character_loader.delete_character(
entity,
player.uuid().to_string(),
character_id,
);
}
},
}
}
}
}
/// This system will handle new messages from clients
pub struct Sys;
impl<'a> System<'a> for Sys {
@ -107,347 +456,53 @@ impl<'a> System<'a> for Sys {
let mut new_players = Vec::new();
for (entity, client) in (&entities, &mut clients).join() {
let new_msgs = client.postbox.new_messages();
let mut cnt = 0;
let network_err: Result<(), crate::error::Error> = block_on(async {
//TIMEOUT 0.01 ms for msg handling
select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = Self::handle_client_msg(
&mut server_emitter,
&mut new_chat_msgs,
&player_list,
&mut new_players,
entity,
client,
&mut cnt,
&character_loader,
&terrain,
&uids,
&can_build,
&force_updates,
&stats,
&chat_modes,
&mut accounts,
&mut block_changes,
&admin_list,
&mut admins,
&mut positions,
&mut velocities,
&mut orientations,
&mut players,
&mut controllers,
&settings,
).fuse() => err,
)
});
// Update client ping.
if new_msgs.len() > 0 {
if cnt > 0 {
client.last_ping = time.0
} else if time.0 - client.last_ping > CLIENT_TIMEOUT // Timeout
|| client.postbox.error().is_some()
|| network_err.is_err()
// Postbox error
{
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
} else if time.0 - client.last_ping > CLIENT_TIMEOUT * 0.5 {
// Try pinging the client if the timeout is nearing.
client.postbox.send_message(ServerMsg::Ping);
}
// Process incoming messages.
for msg in new_msgs {
match msg {
// Go back to registered state (char selection screen)
ClientMsg::ExitIngame => match client.client_state {
// Use ClientMsg::Register instead.
ClientState::Connected => {
client.error_state(RequestStateError::WrongMessage)
},
ClientState::Registered => client.error_state(RequestStateError::Already),
ClientState::Spectator | ClientState::Character => {
server_emitter.emit(ServerEvent::ExitIngame { entity });
},
ClientState::Pending => {},
},
// Request spectator state
ClientMsg::Spectate => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Spectator => client.error_state(RequestStateError::Already),
ClientState::Registered | ClientState::Character => {
client.allow_state(ClientState::Spectator)
},
ClientState::Pending => {},
},
// Request registered state (login)
ClientMsg::Register {
view_distance,
token_or_username,
} => {
let (username, uuid) = match accounts.query(token_or_username.clone()) {
Err(err) => {
client.error_state(RequestStateError::RegisterDenied(err));
break;
},
Ok((username, uuid)) => (username, uuid),
};
let vd = view_distance
.map(|vd| vd.min(settings.max_view_distance.unwrap_or(vd)));
let player = Player::new(username.clone(), None, vd, uuid);
let is_admin = admin_list.contains(&username);
if !player.is_valid() {
// Invalid player
client.error_state(RequestStateError::Impossible);
break;
}
match client.client_state {
ClientState::Connected => {
// Add Player component to this client
let _ = players.insert(entity, player);
// Give the Admin component to the player if their name exists in
// admin list
if is_admin {
let _ = admins.insert(entity, Admin);
}
// Tell the client its request was successful.
client.allow_state(ClientState::Registered);
// Send initial player list
client.notify(ServerMsg::PlayerListUpdate(PlayerListUpdate::Init(
player_list.clone(),
)));
// Add to list to notify all clients of the new player
new_players.push(entity);
},
// Use RequestState instead (No need to send `player` again).
_ => client.error_state(RequestStateError::Impossible),
}
//client.allow_state(ClientState::Registered);
// Limit view distance if it's too high
// This comes after state registration so that the client actually hears it
if settings
.max_view_distance
.zip(view_distance)
.map(|(vd, max)| vd > max)
.unwrap_or(false)
{
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
};
},
ClientMsg::SetViewDistance(view_distance) => match client.client_state {
ClientState::Character { .. } => {
if settings
.max_view_distance
.map(|max| view_distance <= max)
.unwrap_or(true)
{
players.get_mut(entity).map(|player| {
player.view_distance = Some(
settings
.max_view_distance
.map(|max| view_distance.min(max))
.unwrap_or(view_distance),
)
});
} else {
client.notify(ServerMsg::SetViewDistance(
settings.max_view_distance.unwrap_or(0),
));
}
},
_ => {},
},
ClientMsg::Character(character_id) => match client.client_state {
// Become Registered first.
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered | ClientState::Spectator => {
// Only send login message if it wasn't already
// sent previously
if let (Some(player), false) =
(players.get(entity), client.login_msg_sent)
{
// Send a request to load the character's component data from the
// DB. Once loaded, persisted components such as stats and inventory
// will be inserted for the entity
character_loader.load_character_data(
entity,
player.uuid().to_string(),
character_id,
);
// Start inserting non-persisted/default components for the entity
// while we load the DB data
server_emitter.emit(ServerEvent::InitCharacterData {
entity,
character_id,
});
// Give the player a welcome message
if settings.server_description.len() > 0 {
client.notify(
ChatType::CommandInfo
.server_msg(settings.server_description.clone()),
);
}
// Only send login message if it wasn't already
// sent previously
if !client.login_msg_sent {
new_chat_msgs.push((None, ChatMsg {
chat_type: ChatType::Online,
message: format!("[{}] is now online.", &player.alias), // TODO: Localize this
}));
client.login_msg_sent = true;
}
} else {
client.notify(ServerMsg::CharacterDataLoadError(String::from(
"Failed to fetch player entity",
)))
}
},
ClientState::Character => client.error_state(RequestStateError::Already),
ClientState::Pending => {},
},
ClientMsg::ControllerInputs(inputs) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.inputs.update_with_new(inputs);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlEvent(event) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
// Skip respawn if client entity is alive
if let &ControlEvent::Respawn = &event {
if stats.get(entity).map_or(true, |s| !s.is_dead) {
continue;
}
}
if let Some(controller) = controllers.get_mut(entity) {
controller.events.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ControlAction(event) => match client.client_state {
ClientState::Connected
| ClientState::Registered
| ClientState::Spectator => {
client.error_state(RequestStateError::Impossible)
},
ClientState::Character => {
if let Some(controller) = controllers.get_mut(entity) {
controller.actions.push(event);
}
},
ClientState::Pending => {},
},
ClientMsg::ChatMsg(message) => match client.client_state {
ClientState::Connected => client.error_state(RequestStateError::Impossible),
ClientState::Registered
| ClientState::Spectator
| ClientState::Character => match validate_chat_msg(&message) {
Ok(()) => {
if let Some(from) = uids.get(entity) {
let mode = chat_modes.get(entity).cloned().unwrap_or_default();
let msg = mode.new_message(*from, message);
new_chat_msgs.push((Some(entity), msg));
} else {
tracing::error!("Could not send message. Missing player uid");
}
},
Err(ChatMsgValidationError::TooLong) => {
let max = MAX_BYTES_CHAT_MSG;
let len = message.len();
tracing::warn!(
?len,
?max,
"Recieved a chat message that's too long"
)
},
},
ClientState::Pending => {},
},
ClientMsg::PlayerPhysics { pos, vel, ori } => match client.client_state {
ClientState::Character => {
if force_updates.get(entity).is_none()
&& stats.get(entity).map_or(true, |s| !s.is_dead)
{
let _ = positions.insert(entity, pos);
let _ = velocities.insert(entity, vel);
let _ = orientations.insert(entity, ori);
}
},
// Only characters can send positions.
_ => client.error_state(RequestStateError::Impossible),
},
ClientMsg::BreakBlock(pos) => {
if can_build.get(entity).is_some() {
block_changes.set(pos, Block::empty());
}
},
ClientMsg::PlaceBlock(pos, block) => {
if can_build.get(entity).is_some() {
block_changes.try_set(pos, block);
}
},
ClientMsg::TerrainChunkRequest { key } => match client.client_state {
ClientState::Connected | ClientState::Registered => {
client.error_state(RequestStateError::Impossible);
},
ClientState::Spectator | ClientState::Character => {
let in_vd = if let (Some(view_distance), Some(pos)) = (
players.get(entity).and_then(|p| p.view_distance),
positions.get(entity),
) {
pos.0.xy().map(|e| e as f64).distance(
key.map(|e| e as f64 + 0.5)
* TerrainChunkSize::RECT_SIZE.map(|e| e as f64),
) < (view_distance as f64 + 1.5)
* TerrainChunkSize::RECT_SIZE.x as f64
} else {
true
};
if in_vd {
match terrain.get_key(key) {
Some(chunk) => {
client.postbox.send_message(ServerMsg::TerrainChunkUpdate {
key,
chunk: Ok(Box::new(chunk.clone())),
})
},
None => {
server_emitter.emit(ServerEvent::ChunkRequest(entity, key))
},
}
}
},
ClientState::Pending => {},
},
// Always possible.
ClientMsg::Ping => client.postbox.send_message(ServerMsg::Pong),
ClientMsg::Pong => {},
ClientMsg::Disconnect => {
client.postbox.send_message(ServerMsg::Disconnect);
},
ClientMsg::Terminate => {
server_emitter.emit(ServerEvent::ClientDisconnect(entity));
},
ClientMsg::RequestCharacterList => {
if let Some(player) = players.get(entity) {
character_loader.load_character_list(entity, player.uuid().to_string())
}
},
ClientMsg::CreateCharacter { alias, tool, body } => {
if let Some(player) = players.get(entity) {
character_loader.create_character(
entity,
player.uuid().to_string(),
alias,
tool,
body,
);
}
},
ClientMsg::DeleteCharacter(character_id) => {
if let Some(player) = players.get(entity) {
character_loader.delete_character(
entity,
player.uuid().to_string(),
character_id,
);
}
},
}
client.notify(ServerMsg::Ping);
}
}

View File

@ -40,9 +40,7 @@ impl<'a> System<'a> for Sys {
if let Ok(wp_old) = waypoints.insert(entity, Waypoint::new(player_pos.0, *time))
{
if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) {
client
.postbox
.send_message(ServerMsg::Notification(Notification::WaypointSaved));
client.notify(ServerMsg::Notification(Notification::WaypointSaved));
}
}
}

View File

@ -1,5 +1,7 @@
use client::{error::Error as ClientError, Client};
use common::net::PostError;
use client::{
error::{Error as ClientError, NetworkError},
Client,
};
use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError};
use std::{
net::ToSocketAddrs,
@ -98,12 +100,8 @@ impl ClientInit {
},
Err(err) => {
match err {
ClientError::Network(PostError::Bincode(_)) => {
last_err = Some(Error::ClientError(err));
break 'tries;
ClientError::NetworkErr(NetworkError::ListenFailed(..)) => {
},
// Assume the connection failed and try again soon
ClientError::Network(_) => {},
// Non-connection error, stop attempts
err => {
last_err = Some(Error::ClientError(err));

View File

@ -109,7 +109,17 @@ impl PlayState for MainMenuState {
client::Error::NotOnWhitelist => {
localized_strings.get("main.login.not_on_whitelist").into()
},
client::Error::Network(e) => format!(
client::Error::NetworkErr(e) => format!(
"{}: {:?}",
localized_strings.get("main.login.network_error"),
e
),
client::Error::ParticipantErr(e) => format!(
"{}: {:?}",
localized_strings.get("main.login.network_error"),
e
),
client::Error::StreamErr(e) => format!(
"{}: {:?}",
localized_strings.get("main.login.network_error"),
e

View File

@ -47,14 +47,14 @@ impl Singleplayer {
let paused = Arc::new(AtomicBool::new(false));
let paused1 = paused.clone();
let server = Server::new(settings2).expect("Failed to create server instance!");
let server = match thread_pool {
Some(pool) => server.with_thread_pool(pool),
None => server,
};
let thread = thread::spawn(move || {
let server = Server::new(settings2).expect("Failed to create server instance!");
let server = match thread_pool {
Some(pool) => server.with_thread_pool(pool),
None => server,
};
run_server(server, receiver, paused1);
});