doing a simple 1:1 swap out network coding

this is the part which prob has less Merge conflics and is easy to rebase
the next commit will have prob alot of merge conflics
followed by a fmt commit
This commit is contained in:
Marcel Märtens 2020-07-01 09:30:38 +02:00
parent 66f139eb93
commit 77c90b2c7c
17 changed files with 243 additions and 1285 deletions

16
Cargo.lock generated
View File

@ -163,7 +163,7 @@ dependencies = [
"crossbeam-utils 0.7.2",
"futures-core",
"futures-io",
"futures-timer",
"futures-timer 2.0.2",
"kv-log-macro",
"log",
"memchr",
@ -1370,6 +1370,12 @@ version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.5"
@ -4448,6 +4454,9 @@ version = "0.6.0"
dependencies = [
"authc",
"byteorder 1.3.4",
"futures-executor",
"futures-timer 3.0.2",
"futures-util",
"hashbrown",
"image",
"num_cpus",
@ -4456,6 +4465,7 @@ dependencies = [
"uvth 3.1.1",
"vek",
"veloren-common",
"veloren_network",
]
[[package]]
@ -4499,6 +4509,9 @@ dependencies = [
"diesel",
"diesel_migrations",
"dotenv",
"futures-executor",
"futures-timer 3.0.2",
"futures-util",
"hashbrown",
"lazy_static",
"libsqlite3-sys",
@ -4519,6 +4532,7 @@ dependencies = [
"vek",
"veloren-common",
"veloren-world",
"veloren_network",
]
[[package]]

View File

@ -6,9 +6,13 @@ edition = "2018"
[dependencies]
common = { package = "veloren-common", path = "../common", features = ["no-assets"] }
network = { package = "veloren_network", path = "../network", default-features = false }
byteorder = "1.3.2"
uvth = "3.1.1"
futures-util = "0.3"
futures-executor = "0.3"
futures-timer = "3.0"
image = { version = "0.22.3", default-features = false, features = ["png"] }
num_cpus = "1.10.1"
tracing = { version = "0.1", default-features = false }

View File

@ -1,9 +1,11 @@
use authc::AuthClientError;
use common::net::PostError;
use network::{NetworkError, ParticipantError, StreamError};
#[derive(Debug)]
pub enum Error {
Network(PostError),
NetworkErr(NetworkError),
ParticipantErr(ParticipantError),
StreamErr(StreamError),
ServerWentMad,
ServerTimeout,
ServerShutdown,
@ -19,8 +21,16 @@ pub enum Error {
Other(String),
}
impl From<PostError> for Error {
fn from(err: PostError) -> Self { Self::Network(err) }
impl From<NetworkError> for Error {
fn from(err: NetworkError) -> Self { Self::NetworkErr(err) }
}
impl From<ParticipantError> for Error {
fn from(err: ParticipantError) -> Self { Self::ParticipantErr(err) }
}
impl From<StreamError> for Error {
fn from(err: StreamError) -> Self { Self::StreamErr(err) }
}
impl From<AuthClientError> for Error {

View File

@ -25,12 +25,12 @@ use common::{
PlayerInfo, PlayerListUpdate, RegisterError, RequestStateError, ServerInfo, ServerMsg,
MAX_BYTES_CHAT_MSG,
},
net::PostBox,
state::State,
sync::{Uid, UidAllocator, WorldSyncExt},
terrain::{block::Block, TerrainChunk, TerrainChunkSize},
vol::RectVolSize,
};
use network::{Network, Participant, Stream, Address, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
use hashbrown::HashMap;
use image::DynamicImage;
use std::{
@ -38,6 +38,9 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use futures_util::{select, FutureExt};
use futures_executor::block_on;
use futures_timer::Delay;
use tracing::{debug, error, warn};
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*;
@ -69,7 +72,9 @@ pub struct Client {
pub character_list: CharacterList,
pub active_character_id: Option<i32>,
postbox: PostBox<ClientMsg, ServerMsg>,
_network: Network,
_participant: Arc<Participant>,
singleton_stream: Stream,
last_server_ping: f64,
last_server_pong: f64,
@ -99,64 +104,80 @@ impl Client {
/// Create a new `Client`.
pub fn new<A: Into<SocketAddr>>(addr: A, view_distance: Option<u32>) -> Result<Self, Error> {
let client_state = ClientState::Connected;
let mut postbox = PostBox::to(addr)?;
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build();
// We reduce the thread count by 1 to keep rendering smooth
thread_pool.set_num_threads((num_cpus::get() - 1).max(1));
let (network, f) = Network::new(Pid::new(), None);
thread_pool.execute(f);
let participant = block_on(network.connect(Address::Tcp(addr.into())))?;
let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?;
// Wait for initial sync
let (state, entity, server_info, world_map) = match postbox.next_message()? {
ServerMsg::InitialSync {
entity_package,
server_info,
time_of_day,
world_map: (map_size, world_map),
} => {
// TODO: Display that versions don't match in Voxygen
if &server_info.git_hash != *common::util::GIT_HASH {
warn!(
"Server is running {}[{}], you are running {}[{}], versions might be \
let (state, entity, server_info, world_map) = block_on(async{
loop {
match stream.recv().await? {
ServerMsg::InitialSync {
entity_package,
server_info,
time_of_day,
world_map: (map_size, world_map),
} => {
// TODO: Display that versions don't match in Voxygen
if &server_info.git_hash != *common::util::GIT_HASH {
warn!(
"Server is running {}[{}], you are running {}[{}], versions might be \
incompatible!",
server_info.git_hash,
server_info.git_date,
common::util::GIT_HASH.to_string(),
common::util::GIT_DATE.to_string(),
);
}
}
debug!("Auth Server: {:?}", server_info.auth_provider);
debug!("Auth Server: {:?}", server_info.auth_provider);
// Initialize `State`
let mut state = State::default();
// Client-only components
state
.ecs_mut()
.register::<comp::Last<comp::CharacterState>>();
// Initialize `State`
let mut state = State::default();
// Client-only components
state
.ecs_mut()
.register::<comp::Last<comp::CharacterState>>();
let entity = state.ecs_mut().apply_entity_package(entity_package);
*state.ecs_mut().write_resource() = time_of_day;
let entity = state.ecs_mut().apply_entity_package(entity_package);
*state.ecs_mut().write_resource() = time_of_day;
assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize);
let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/];
LittleEndian::write_u32_into(&world_map, &mut world_map_raw);
debug!("Preparing image...");
let world_map = Arc::new(
image::DynamicImage::ImageRgba8({
// Should not fail if the dimensions are correct.
let world_map =
image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw);
world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))?
})
// Flip the image, since Voxygen uses an orientation where rotation from
// positive x axis to positive y axis is counterclockwise around the z axis.
.flipv(),
);
debug!("Done preparing image...");
assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize);
let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/];
LittleEndian::write_u32_into(&world_map, &mut world_map_raw);
debug!("Preparing image...");
let world_map = Arc::new(
image::DynamicImage::ImageRgba8({
// Should not fail if the dimensions are correct.
let world_map =
image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw);
world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))?
})
// Flip the image, since Voxygen uses an orientation where rotation from
// positive x axis to positive y axis is counterclockwise around the z axis.
.flipv(),
);
debug!("Done preparing image...");
(state, entity, server_info, (world_map, map_size))
},
ServerMsg::TooManyPlayers => return Err(Error::TooManyPlayers),
_ => return Err(Error::ServerWentMad),
};
break Ok((state, entity, server_info, (world_map, map_size)))
},
ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers),
err => {
warn!("whoops, server mad {:?}, ignoring", err);
},
}}
})?;
postbox.send_message(ClientMsg::Ping);
stream.send(ClientMsg::Ping)?;
let mut thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".into())
@ -173,7 +194,9 @@ impl Client {
character_list: CharacterList::default(),
active_character_id: None,
postbox,
_network: network,
_participant: participant,
singleton_stream: stream,
last_server_ping: 0.0,
last_server_pong: 0.0,
@ -213,33 +236,39 @@ impl Client {
}
).unwrap_or(Ok(username))?;
self.postbox.send_message(ClientMsg::Register {
self.singleton_stream.send(ClientMsg::Register {
view_distance: self.view_distance,
token_or_username,
});
})?;
self.client_state = ClientState::Pending;
loop {
match self.postbox.next_message()? {
ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => {
self.client_state = state;
break Err(match err {
RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn,
RegisterError::AuthError(err) => Error::AuthErr(err),
RegisterError::InvalidCharacter => Error::InvalidCharacter,
RegisterError::NotOnWhitelist => Error::NotOnWhitelist,
});
},
ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()),
_ => {},
block_on(
async {
loop {
match self.singleton_stream.recv().await? {
ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => {
self.client_state = state;
break Err(match err {
RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn,
RegisterError::AuthError(err) => Error::AuthErr(err),
RegisterError::InvalidCharacter => Error::InvalidCharacter,
})
},
ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()),
ignore => {
warn!("Ignoring what the server send till registered: {:? }", ignore);
//return Err(Error::ServerWentMad)
},
}
}
}
}
)
}
/// Request a state transition to `ClientState::Character`.
pub fn request_character(&mut self, character_id: i32) {
self.postbox
.send_message(ClientMsg::Character(character_id));
self.singleton_stream
.send(ClientMsg::Character(character_id)).unwrap();
self.active_character_id = Some(character_id);
self.client_state = ClientState::Pending;
@ -248,73 +277,75 @@ impl Client {
/// Load the current players character list
pub fn load_character_list(&mut self) {
self.character_list.loading = true;
self.postbox.send_message(ClientMsg::RequestCharacterList);
self.singleton_stream.send(ClientMsg::RequestCharacterList).unwrap();
}
/// New character creation
pub fn create_character(&mut self, alias: String, tool: Option<String>, body: comp::Body) {
self.character_list.loading = true;
self.postbox
.send_message(ClientMsg::CreateCharacter { alias, tool, body });
self.singleton_stream
.send(ClientMsg::CreateCharacter { alias, tool, body }).unwrap();
}
/// Character deletion
pub fn delete_character(&mut self, character_id: i32) {
self.character_list.loading = true;
self.postbox
.send_message(ClientMsg::DeleteCharacter(character_id));
self.singleton_stream
.send(ClientMsg::DeleteCharacter(character_id)).unwrap();
}
/// Send disconnect message to the server
pub fn request_logout(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); }
pub fn request_logout(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) {
error!(?e, "couldn't send disconnect package to server, did server close already?");
}}
/// Request a state transition to `ClientState::Registered` from an ingame
/// state.
pub fn request_remove_character(&mut self) {
self.postbox.send_message(ClientMsg::ExitIngame);
self.singleton_stream.send(ClientMsg::ExitIngame).unwrap();
self.client_state = ClientState::Pending;
}
pub fn set_view_distance(&mut self, view_distance: u32) {
self.view_distance = Some(view_distance.max(1).min(65));
self.postbox
.send_message(ClientMsg::SetViewDistance(self.view_distance.unwrap()));
self.singleton_stream
.send(ClientMsg::SetViewDistance(self.view_distance.unwrap())).unwrap();
// Can't fail
}
pub fn use_slot(&mut self, slot: comp::slot::Slot) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Use(slot),
)));
))).unwrap();
}
pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Swap(a, b),
)));
))).unwrap();
}
pub fn drop_slot(&mut self, slot: comp::slot::Slot) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Drop(slot),
)));
))).unwrap();
}
pub fn pick_up(&mut self, entity: EcsEntity) {
if let Some(uid) = self.state.ecs().read_storage::<Uid>().get(entity).copied() {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Pickup(uid),
)));
))).unwrap();
}
}
pub fn toggle_lantern(&mut self) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::ToggleLantern));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)).unwrap();
}
pub fn is_mounted(&self) -> bool {
@ -327,8 +358,8 @@ impl Client {
pub fn mount(&mut self, entity: EcsEntity) {
if let Some(uid) = self.state.ecs().read_storage::<Uid>().get(entity).copied() {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::Mount(uid)));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))).unwrap();
}
}
@ -345,8 +376,8 @@ impl Client {
.get(self.entity)
.map_or(false, |s| s.is_dead)
{
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::Respawn));
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::Respawn)).unwrap();
}
}
@ -428,8 +459,8 @@ impl Client {
{
controller.actions.push(control_action);
}
self.postbox
.send_message(ClientMsg::ControlAction(control_action));
self.singleton_stream
.send(ClientMsg::ControlAction(control_action)).unwrap();
}
pub fn view_distance(&self) -> Option<u32> { self.view_distance }
@ -473,18 +504,18 @@ impl Client {
}
pub fn place_block(&mut self, pos: Vec3<i32>, block: Block) {
self.postbox.send_message(ClientMsg::PlaceBlock(pos, block));
self.singleton_stream.send(ClientMsg::PlaceBlock(pos, block)).unwrap();
}
pub fn remove_block(&mut self, pos: Vec3<i32>) {
self.postbox.send_message(ClientMsg::BreakBlock(pos));
self.singleton_stream.send(ClientMsg::BreakBlock(pos)).unwrap();
}
pub fn collect_block(&mut self, pos: Vec3<i32>) {
self.postbox
.send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
self.singleton_stream
.send(ClientMsg::ControlEvent(ControlEvent::InventoryManip(
InventoryManip::Collect(pos),
)));
))).unwrap();
}
/// Execute a single client tick, handle input and update the game state by
@ -539,8 +570,7 @@ impl Client {
"Couldn't access controller component on client entity"
);
}
self.postbox
.send_message(ClientMsg::ControllerInputs(inputs));
self.singleton_stream.send(ClientMsg::ControllerInputs(inputs)).unwrap();
}
// 2) Build up a list of events for this frame, to be passed to the frontend.
@ -637,8 +667,7 @@ impl Client {
if self.state.terrain().get_key(*key).is_none() {
if !skip_mode && !self.pending_chunks.contains_key(key) {
if self.pending_chunks.len() < 4 {
self.postbox
.send_message(ClientMsg::TerrainChunkRequest { key: *key });
self.singleton_stream.send(ClientMsg::TerrainChunkRequest { key: *key })?;
self.pending_chunks.insert(*key, Instant::now());
} else {
skip_mode = true;
@ -670,7 +699,7 @@ impl Client {
// Send a ping to the server once every second
if self.state.get_time() - self.last_server_ping > 1. {
self.postbox.send_message(ClientMsg::Ping);
self.singleton_stream.send(ClientMsg::Ping)?;
self.last_server_ping = self.state.get_time();
}
@ -681,8 +710,7 @@ impl Client {
self.state.read_storage().get(self.entity).cloned(),
self.state.read_storage().get(self.entity).cloned(),
) {
self.postbox
.send_message(ClientMsg::PlayerPhysics { pos, vel, ori });
self.singleton_stream.send(ClientMsg::PlayerPhysics { pos, vel, ori })?;
}
}
@ -1072,5 +1100,7 @@ impl Client {
}
impl Drop for Client {
fn drop(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); }
fn drop(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) {
warn!("error during drop of client, couldn't send disconnect package, is the connection already closed? : {}", e);
} }
}

View File

@ -39,30 +39,4 @@ pub mod util;
pub mod vol;
pub mod volumes;
pub use loadout_builder::LoadoutBuilder;
/// The networking module containing high-level wrappers of `TcpListener` and
/// `TcpStream` (`PostOffice` and `PostBox` respectively) and data types used by
/// both the server and client. # Examples
/// ```
/// use std::net::SocketAddr;
/// use veloren_common::net::{PostBox, PostOffice};
///
/// let listen_addr = SocketAddr::from(([0, 0, 0, 0], 12345u16));
/// let conn_addr = SocketAddr::from(([127, 0, 0, 1], 12345u16));
///
/// let mut server: PostOffice<String, String> = PostOffice::bind(listen_addr).unwrap();
/// let mut client: PostBox<String, String> = PostBox::to(conn_addr).unwrap();
/// std::thread::sleep(std::time::Duration::from_millis(100));
///
/// let mut scon = server.new_postboxes().next().unwrap();
/// std::thread::sleep(std::time::Duration::from_millis(100));
///
/// scon.send_message(String::from("foo"));
/// client.send_message(String::from("bar"));
/// std::thread::sleep(std::time::Duration::from_millis(100));
///
/// assert_eq!("foo", client.next_message().unwrap());
/// assert_eq!("bar", scon.next_message().unwrap());
/// ```
pub mod net;
pub use loadout_builder::LoadoutBuilder;

View File

@ -1,19 +0,0 @@
/// Messages server sends to client.
#[derive(Deserialize, Serialize, Debug)]
pub enum ServerMsg {
// VersionInfo MUST always stay first in this struct.
VersionInfo {},
}
/// Messages client sends to server.
#[derive(Deserialize, Serialize, Debug)]
pub enum ClientMsg {
// VersionInfo MUST always stay first in this struct.
VersionInfo {},
}
/// Control message type, used in [PostBox](super::PostBox) and
/// [PostOffice](super::PostOffice) to control threads.
pub enum ControlMsg {
Shutdown,
}

View File

@ -1,14 +0,0 @@
pub mod data;
//pub mod post;
pub mod post2;
pub use post2 as post;
// Reexports
pub use self::{
data::{ClientMsg, ServerMsg},
post::{Error as PostError, PostBox, PostOffice},
};
pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + std::marker::Send + std::fmt::Debug;

View File

@ -1,628 +0,0 @@
use bincode;
use middleman::Middleman;
use mio::{
net::{TcpListener, TcpStream},
Events, Poll, PollOpt, Ready, Token,
};
use mio_extras::channel::{channel, Receiver, Sender};
use serde;
use std::{
collections::VecDeque,
convert::TryFrom,
fmt,
io::{self, Read, Write},
net::{Shutdown, SocketAddr},
sync::mpsc::TryRecvError,
thread,
time::Duration,
};
#[derive(Clone, Debug, PartialEq)]
pub enum Error {
Disconnect,
Network,
InvalidMsg,
Internal,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Network }
}
impl From<TryRecvError> for Error {
fn from(err: TryRecvError) -> Self { Error::Internal }
}
impl From<bincode::ErrorKind> for Error {
fn from(err: bincode::ErrorKind) -> Self { Error::InvalidMsg }
}
impl<T> From<mio_extras::channel::SendError<T>> for Error {
fn from(err: mio_extras::channel::SendError<T>) -> Self { Error::Internal }
}
pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message;
const TCP_TOK: Token = Token(0);
const CTRL_TOK: Token = Token(1);
const POSTBOX_TOK: Token = Token(2);
const SEND_TOK: Token = Token(3);
const RECV_TOK: Token = Token(4);
const MIDDLEMAN_TOK: Token = Token(5);
const MAX_MSG_BYTES: usize = 1 << 28;
enum CtrlMsg {
Shutdown,
}
pub struct PostOffice<S: PostSend, R: PostRecv> {
worker: Option<thread::JoinHandle<Result<(), Error>>>,
ctrl_tx: Sender<CtrlMsg>,
postbox_rx: Receiver<Result<PostBox<S, R>, Error>>,
poll: Poll,
err: Option<Error>,
}
impl<S: PostSend, R: PostRecv> PostOffice<S, R> {
pub fn bind<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
let tcp_listener = TcpListener::bind(&addr.into())?;
let (ctrl_tx, ctrl_rx) = channel();
let (postbox_tx, postbox_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
let office_poll = Poll::new()?;
office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?;
let worker =
thread::spawn(move || office_worker(worker_poll, tcp_listener, ctrl_rx, postbox_tx));
Ok(Self {
worker: Some(worker),
ctrl_tx,
postbox_rx,
poll: office_poll,
err: None,
})
}
pub fn error(&self) -> Option<Error> { self.err.clone() }
pub fn new_connections(&mut self) -> impl ExactSizeIterator<Item = PostBox<S, R>> {
let mut conns = VecDeque::new();
if self.err.is_some() {
return conns.into_iter();
}
let mut events = Events::with_capacity(64);
if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return conns.into_iter();
}
for event in events {
match event.token() {
// Keep reading new postboxes from the channel
POSTBOX_TOK => loop {
match self.postbox_rx.try_recv() {
Ok(Ok(conn)) => conns.push_back(conn),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return conns.into_iter();
},
Ok(Err(err)) => {
self.err = Some(err);
return conns.into_iter();
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
conns.into_iter()
}
}
impl<S: PostSend, R: PostRecv> Drop for PostOffice<S, R> {
fn drop(&mut self) {
let _ = self.ctrl_tx.send(CtrlMsg::Shutdown);
let _ = self.worker.take().map(|w| w.join());
}
}
fn office_worker<S: PostSend, R: PostRecv>(
poll: Poll,
tcp_listener: TcpListener,
ctrl_rx: Receiver<CtrlMsg>,
postbox_tx: Sender<Result<PostBox<S, R>, Error>>,
) -> Result<(), Error> {
let mut events = Events::with_capacity(64);
loop {
if let Err(err) = poll.poll(&mut events, None) {
postbox_tx.send(Err(err.into()))?;
return Ok(());
}
for event in &events {
match event.token() {
CTRL_TOK => loop {
match ctrl_rx.try_recv() {
Ok(CtrlMsg::Shutdown) => return Ok(()),
Err(TryRecvError::Empty) => {},
Err(err) => {
postbox_tx.send(Err(err.into()))?;
return Ok(());
},
}
},
TCP_TOK => postbox_tx.send(match tcp_listener.accept() {
Ok((stream, _)) => PostBox::from_tcpstream(stream),
Err(err) => Err(err.into()),
})?,
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
}
pub struct PostBox<S: PostSend, R: PostRecv> {
worker: Option<thread::JoinHandle<Result<(), Error>>>,
ctrl_tx: Sender<CtrlMsg>,
send_tx: Sender<S>,
recv_rx: Receiver<Result<R, Error>>,
poll: Poll,
err: Option<Error>,
}
impl<S: PostSend, R: PostRecv> PostBox<S, R> {
pub fn to_server<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
Self::from_tcpstream(TcpStream::from_stream(std::net::TcpStream::connect(
&addr.into(),
)?)?)
}
fn from_tcpstream(tcp_stream: TcpStream) -> Result<Self, Error> {
let (ctrl_tx, ctrl_rx) = channel();
let (send_tx, send_rx) = channel();
let (recv_tx, recv_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(
&tcp_stream,
TCP_TOK,
Ready::readable() | Ready::writable(),
PollOpt::edge(),
)?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?;
let postbox_poll = Poll::new()?;
postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?;
let worker = thread::spawn(move || {
postbox_worker(worker_poll, tcp_stream, ctrl_rx, send_rx, recv_tx)
});
Ok(Self {
worker: Some(worker),
ctrl_tx,
send_tx,
recv_rx,
poll: postbox_poll,
err: None,
})
}
pub fn error(&self) -> Option<Error> { self.err.clone() }
pub fn send(&mut self, data: S) { let _ = self.send_tx.send(data); }
// TODO: This method is super messy.
pub fn next_message(&mut self) -> Option<R> {
if self.err.is_some() {
return None;
}
loop {
let mut events = Events::with_capacity(10);
if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return None;
}
for event in events {
match event.token() {
// Keep reading new messages from the channel
RECV_TOK => loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => return Some(msg),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return None;
},
Ok(Err(err)) => {
self.err = Some(err);
return None;
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
}
pub fn new_messages(&mut self) -> impl ExactSizeIterator<Item = R> {
let mut msgs = VecDeque::new();
if self.err.is_some() {
return msgs.into_iter();
}
let mut events = Events::with_capacity(64);
if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return msgs.into_iter();
}
for event in events {
match event.token() {
// Keep reading new messages from the channel
RECV_TOK => loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => msgs.push_back(msg),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return msgs.into_iter();
},
Ok(Err(err)) => {
self.err = Some(err);
return msgs.into_iter();
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
msgs.into_iter()
}
}
impl<S: PostSend, R: PostRecv> Drop for PostBox<S, R> {
fn drop(&mut self) {
let _ = self.ctrl_tx.send(CtrlMsg::Shutdown);
let _ = self.worker.take().map(|w| w.join());
}
}
fn postbox_worker<S: PostSend, R: PostRecv>(
poll: Poll,
mut tcp_stream: TcpStream,
ctrl_rx: Receiver<CtrlMsg>,
send_rx: Receiver<S>,
recv_tx: Sender<Result<R, Error>>,
) -> Result<(), Error> {
fn try_tcp_send(
tcp_stream: &mut TcpStream,
chunks: &mut VecDeque<Vec<u8>>,
) -> Result<(), Error> {
loop {
let chunk = match chunks.pop_front() {
Some(chunk) => chunk,
None => break,
};
match tcp_stream.write_all(&chunk) {
Ok(()) => {},
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
chunks.push_front(chunk);
break;
},
Err(err) => {
println!("Error: {:?}", err);
return Err(err.into());
},
}
}
Ok(())
}
enum RecvState {
ReadHead(Vec<u8>),
ReadBody(usize, Vec<u8>),
}
let mut recv_state = RecvState::ReadHead(Vec::new());
let mut chunks = VecDeque::new();
//let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8));
let mut events = Events::with_capacity(64);
'work: loop {
if let Err(err) = poll.poll(&mut events, None) {
recv_tx.send(Err(err.into()))?;
break 'work;
}
for event in &events {
match event.token() {
CTRL_TOK => match ctrl_rx.try_recv() {
Ok(CtrlMsg::Shutdown) => {
break 'work;
},
Err(TryRecvError::Empty) => (),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
},
SEND_TOK => loop {
match send_rx.try_recv() {
Ok(outgoing_msg) => {
let mut msg_bytes = match bincode::serialize(&outgoing_msg) {
Ok(bytes) => bytes,
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
};
let mut bytes = msg_bytes.len().to_le_bytes().as_ref().to_vec();
bytes.append(&mut msg_bytes);
bytes
.chunks(1024)
.map(|chunk| chunk.to_vec())
.for_each(|chunk| chunks.push_back(chunk));
match try_tcp_send(&mut tcp_stream, &mut chunks) {
Ok(_) => {},
Err(err) => {
recv_tx.send(Err(err.into()))?;
return Err(Error::Network);
},
}
},
Err(TryRecvError::Empty) => break,
Err(err) => Err(err)?,
}
},
TCP_TOK => {
loop {
// Check TCP error
match tcp_stream.take_error() {
Ok(None) => {},
Ok(Some(err)) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
match &mut recv_state {
RecvState::ReadHead(head) => {
if head.len() == 8 {
let len = usize::from_le_bytes(
<[u8; 8]>::try_from(head.as_slice()).unwrap(),
);
if len > MAX_MSG_BYTES {
println!("TOO BIG! {:x}", len);
recv_tx.send(Err(Error::InvalidMsg))?;
break 'work;
} else if len == 0 {
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
break;
} else {
recv_state = RecvState::ReadBody(len, Vec::new());
}
} else {
let mut b = [0; 1];
match tcp_stream.read(&mut b) {
Ok(0) => {},
Ok(_) => head.push(b[0]),
Err(_) => break,
}
}
},
RecvState::ReadBody(len, body) => {
if body.len() == *len {
match bincode::deserialize(&body) {
Ok(msg) => {
recv_tx.send(Ok(msg))?;
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
},
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
}
} else {
let left = *len - body.len();
let mut buf = vec![0; left];
match tcp_stream.read(&mut buf) {
Ok(_) => body.append(&mut buf),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
}
},
}
}
// Now, try sending TCP stuff
match try_tcp_send(&mut tcp_stream, &mut chunks) {
Ok(_) => {},
Err(err) => {
recv_tx.send(Err(err.into()))?;
return Err(Error::Network);
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
//tcp_stream.shutdown(Shutdown::Both)?;
Ok(())
}
// TESTS
/*
#[derive(Serialize, Deserialize)]
struct TestMsg<T>(T);
#[test]
fn connect() {
let srv_addr = ([127, 0, 0, 1], 12345);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
// We should start off with 0 incoming connections.
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
let postbox = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
// Now a postbox has been created, we should have 1 new.
thread::sleep(Duration::from_millis(250));
let incoming = postoffice.new_connections();
assert_eq!(incoming.len(), 1);
assert_eq!(postoffice.error(), None);
}
#[test]
fn connect_fail() {
let listen_addr = ([0; 4], 12345);
let connect_addr = ([127, 0, 0, 1], 12212);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(listen_addr).unwrap();
// We should start off with 0 incoming connections.
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
assert!(PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(connect_addr).is_err());
}
#[test]
fn connection_count() {
let srv_addr = ([127, 0, 0, 1], 12346);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut postboxes = Vec::new();
// We should start off with 0 incoming connections.
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
for _ in 0..5 {
postboxes.push(PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap());
}
// 5 postboxes created, we should have 5.
thread::sleep(Duration::from_millis(3500));
let incoming = postoffice.new_connections();
assert_eq!(incoming.len(), 5);
assert_eq!(postoffice.error(), None);
}
#[test]
fn disconnect() {
let srv_addr = ([127, 0, 0, 1], 12347);
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut server_postbox = {
let mut client_postbox = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut incoming = postoffice.new_connections();
assert_eq!(incoming.len(), 1);
assert_eq!(postoffice.error(), None);
incoming.next().unwrap()
};
// The client postbox has since been disconnected.
thread::sleep(Duration::from_millis(2050));
let incoming_msgs = server_postbox.new_messages();
assert_eq!(incoming_msgs.len(), 0);
// TODO
// assert_eq!(server_postbox.error(), Some(Error::Disconnect));
}
#[test]
fn client_to_server() {
let srv_addr = ([127, 0, 0, 1], 12348);
let mut po = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut client_pb = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut server_pb = po.new_connections().next().unwrap();
client_pb.send(TestMsg(1337.0));
client_pb.send(TestMsg(9821.0));
client_pb.send(TestMsg(-3.2));
client_pb.send(TestMsg(17.0));
thread::sleep(Duration::from_millis(250));
let mut incoming_msgs = server_pb.new_messages();
assert_eq!(incoming_msgs.len(), 4);
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0));
}
#[test]
fn server_to_client() {
let srv_addr = ([127, 0, 0, 1], 12349);
let mut po = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut client_pb = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut server_pb = po.new_connections().next().unwrap();
server_pb.send(TestMsg(1337));
server_pb.send(TestMsg(9821));
server_pb.send(TestMsg(39999999));
server_pb.send(TestMsg(17));
thread::sleep(Duration::from_millis(250));
let mut incoming_msgs = client_pb.new_messages();
assert_eq!(incoming_msgs.len(), 4);
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17));
}
*/

View File

@ -1,439 +0,0 @@
use crossbeam::channel;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::VecDeque,
convert::TryFrom,
io::{self, Read, Write},
marker::PhantomData,
net::{Shutdown, SocketAddr, TcpListener, TcpStream},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use tracing::warn;
#[derive(Clone, Debug)]
pub enum Error {
Io(Arc<io::Error>),
Bincode(Arc<bincode::Error>),
ChannelFailure,
InvalidMessage,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self { Error::Io(Arc::new(err)) }
}
impl From<bincode::Error> for Error {
fn from(err: bincode::Error) -> Self { Error::Bincode(Arc::new(err)) }
}
impl From<channel::TryRecvError> for Error {
fn from(_error: channel::TryRecvError) -> Self { Error::ChannelFailure }
}
pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send;
const MAX_MSG_SIZE: usize = 1 << 28;
pub struct PostOffice<S: PostMsg, R: PostMsg> {
listener: TcpListener,
error: Option<Error>,
phantom: PhantomData<(S, R)>,
}
impl<S: PostMsg, R: PostMsg> PostOffice<S, R> {
pub fn bind<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
let listener = TcpListener::bind(addr.into())?;
listener.set_nonblocking(true)?;
Ok(Self {
listener,
error: None,
phantom: PhantomData,
})
}
pub fn error(&self) -> Option<Error> { self.error.clone() }
pub fn new_postboxes(&mut self) -> impl ExactSizeIterator<Item = PostBox<S, R>> {
let mut new = Vec::new();
if self.error.is_some() {
return new.into_iter();
}
loop {
match self.listener.accept() {
Ok((stream, _sock)) => new.push(PostBox::from_stream(stream).unwrap()),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) if e.kind() == io::ErrorKind::Interrupted => {},
Err(e) => {
self.error = Some(e.into());
break;
},
}
}
new.into_iter()
}
}
pub struct PostBox<S: PostMsg, R: PostMsg> {
send_tx: channel::Sender<S>,
recv_rx: channel::Receiver<Result<R, Error>>,
worker: Option<thread::JoinHandle<()>>,
running: Arc<AtomicBool>,
error: Option<Error>,
}
impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
pub fn to<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
Self::from_stream(TcpStream::connect(addr.into())?)
}
fn from_stream(stream: TcpStream) -> Result<Self, Error> {
stream.set_nonblocking(true)?;
let running = Arc::new(AtomicBool::new(true));
let worker_running = running.clone();
let (send_tx, send_rx) = channel::unbounded();
let (recv_tx, recv_rx) = channel::unbounded();
let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running));
Ok(Self {
send_tx,
recv_rx,
worker: Some(worker),
running,
error: None,
})
}
pub fn error(&self) -> Option<Error> { self.error.clone() }
pub fn send_message(&mut self, msg: S) { let _ = self.send_tx.send(msg); }
pub fn next_message(&mut self) -> Result<R, Error> {
if let Some(e) = self.error.clone() {
return Err(e);
}
match self.recv_rx.recv().map_err(|_| Error::ChannelFailure)? {
Ok(msg) => Ok(msg),
Err(e) => {
self.error = Some(e.clone());
Err(e)
},
}
}
pub fn new_messages(&mut self) -> impl ExactSizeIterator<Item = R> {
let mut new = Vec::new();
if self.error.is_some() {
return new.into_iter();
}
loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => new.push(msg),
Err(channel::TryRecvError::Empty) => break,
Err(e) => {
self.error = Some(e.into());
break;
},
Ok(Err(e)) => {
self.error = Some(e);
break;
},
}
}
new.into_iter()
}
fn worker(
mut stream: TcpStream,
send_rx: channel::Receiver<S>,
recv_tx: channel::Sender<Result<R, Error>>,
running: Arc<AtomicBool>,
) {
let mut outgoing_chunks = VecDeque::new();
let mut incoming_buf = Vec::new();
'work: while running.load(Ordering::Relaxed) {
for _ in 0..30 {
// Get stream errors.
match stream.take_error() {
Ok(Some(e)) | Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
Ok(None) => {},
}
// Try getting messages from the send channel.
for _ in 0..1000 {
match send_rx.try_recv() {
Ok(send_msg) => {
// Serialize message
let msg_bytes = bincode::serialize(&send_msg).unwrap();
let mut msg_bytes = lz4_compress::compress(&msg_bytes);
/*
if msg_bytes.len() > 512 {
println!("MSG SIZE: {}", msg_bytes.len());
}
*/
// Assemble into packet.
let mut packet_bytes =
(msg_bytes.len() as u64).to_le_bytes().as_ref().to_vec();
packet_bytes.push(msg_bytes.iter().fold(0, |a, x| a ^ *x));
packet_bytes.append(&mut msg_bytes);
// Split packet into chunks.
packet_bytes
.chunks(4096)
.map(|chunk| chunk.to_vec())
.for_each(|chunk| outgoing_chunks.push_back(chunk))
},
Err(channel::TryRecvError::Empty) => break,
// Worker error
Err(e) => {
let _ = recv_tx.send(Err(e.into()));
break 'work;
},
}
}
// Try sending bytes through the TCP stream.
for _ in 0..1000 {
match outgoing_chunks.pop_front() {
Some(mut chunk) => match stream.write(&chunk) {
Ok(n) if n == chunk.len() => {},
Ok(n) => {
outgoing_chunks.push_front(chunk.split_off(n));
break;
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// Return chunk to the queue to try again later.
outgoing_chunks.push_front(chunk);
break;
},
// Worker error
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
},
None => break,
}
}
// Try receiving bytes from the TCP stream.
for _ in 0..1000 {
let mut buf = [0; 4096];
match stream.read(&mut buf) {
Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) if e.kind() == io::ErrorKind::Interrupted => {},
// Worker error
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
}
}
// Try turning bytes into messages.
for _ in 0..1000 {
match incoming_buf.get(0..9) {
Some(len_bytes) => {
let len =
u64::from_le_bytes(<[u8; 8]>::try_from(&len_bytes[0..8]).unwrap())
as usize; // Can't fail
if len > MAX_MSG_SIZE {
recv_tx.send(Err(Error::InvalidMessage)).unwrap();
break 'work;
} else if incoming_buf.len() >= len + 9 {
let checksum_found =
incoming_buf[9..len + 9].iter().fold(0, |a, x| a ^ *x);
let checksum_expected = len_bytes[8];
assert_eq!(
checksum_found, checksum_expected,
"Message checksum failed!"
);
let msg_bytes =
lz4_compress::decompress(&incoming_buf[9..len + 9]).unwrap();
match bincode::deserialize(&msg_bytes) {
Ok(msg) => recv_tx.send(Ok(msg)).unwrap(),
Err(err) => {
println!("BINCODE ERROR: {:?}", err);
recv_tx.send(Err(err.into())).unwrap()
},
}
incoming_buf = incoming_buf.split_off(len + 9);
} else {
break;
}
},
None => break,
}
}
}
thread::sleep(Duration::from_millis(10));
}
if let Err(err) = stream.shutdown(Shutdown::Both) {
warn!("TCP worker stream shutdown failed: {:?}", err);
}
}
}
impl<S: PostMsg, R: PostMsg> Drop for PostBox<S, R> {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
self.worker.take().map(|handle| handle.join());
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
fn create_postoffice<S: PostMsg, R: PostMsg>(
id: u16,
) -> Result<(PostOffice<S, R>, SocketAddr), Error> {
let sock = ([0; 4], 12345 + id).into();
Ok((PostOffice::bind(sock)?, sock))
}
fn loop_for<F: FnMut()>(duration: Duration, mut f: F) {
let start = Instant::now();
while start.elapsed() < duration {
f();
}
}
#[test]
fn connect() {
let (mut postoffice, bound) = create_postoffice::<(), ()>(0).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let _client0 = PostBox::<(), ()>::to(sock).unwrap();
let _client1 = PostBox::<(), ()>::to(sock).unwrap();
let _client2 = PostBox::<(), ()>::to(sock).unwrap();
let mut new_clients = 0;
loop_for(Duration::from_millis(250), || {
new_clients += postoffice.new_postboxes().count();
});
assert_eq!(new_clients, 3);
}
/*
#[test]
fn disconnect() {
let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap();
let mut client = PostBox::<i32, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().unwrap().next().unwrap();
drop(client);
loop_for(Duration::from_millis(300), || ());
assert!(server.new_messages().is_err());
}
*/
#[test]
fn send_recv() {
let (mut postoffice, bound) = create_postoffice::<(), i32>(2).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let test_msgs = vec![1, 1337, 42, -48];
let mut client = PostBox::<i32, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
for msg in &test_msgs {
client.send_message(*msg);
}
let mut recv_msgs = Vec::new();
loop_for(Duration::from_millis(250), || {
server.new_messages().for_each(|msg| recv_msgs.push(msg))
});
assert_eq!(test_msgs, recv_msgs);
}
#[test]
#[ignore]
fn send_recv_huge() {
let (mut postoffice, bound) = create_postoffice::<(), Vec<i32>>(3).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let test_msgs: Vec<Vec<i32>> = (0..5)
.map(|i| (0..100000).map(|j| i * 2 + j).collect())
.collect();
let mut client = PostBox::<Vec<i32>, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
for msg in &test_msgs {
client.send_message(msg.clone());
}
let mut recv_msgs = Vec::new();
loop_for(Duration::from_millis(3000), || {
server.new_messages().for_each(|msg| recv_msgs.push(msg))
});
assert_eq!(test_msgs.len(), recv_msgs.len());
assert!(test_msgs == recv_msgs);
}
#[test]
fn send_recv_both() {
let (mut postoffice, bound) = create_postoffice::<u32, u32>(4).unwrap();
let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port());
let mut client = PostBox::<u32, u32>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
let test_msgs = vec![
(0xDEADBEAD, 0xBEEEEEEF),
(0x1BADB002, 0xBAADF00D),
(0xBAADA555, 0xC0DED00D),
(0xCAFEBABE, 0xDEADC0DE),
];
for (to, from) in test_msgs {
client.send_message(to);
server.send_message(from);
loop_for(Duration::from_millis(250), || ());
assert_eq!(client.new_messages().next().unwrap(), from);
assert_eq!(server.new_messages().next().unwrap(), to);
}
}
}

View File

@ -11,6 +11,7 @@ default = ["worldgen"]
[dependencies]
common = { package = "veloren-common", path = "../common" }
world = { package = "veloren-world", path = "../world" }
network = { package = "veloren_network", path = "../network", default-features = false }
specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git" }
@ -18,6 +19,9 @@ tracing = "0.1"
specs = { version = "0.15.1", features = ["shred-derive"] }
vek = "0.11.0"
uvth = "3.1.1"
futures-util = "0.3"
futures-executor = "0.3"
futures-timer = "3.0"
lazy_static = "1.4.0"
scan_fmt = "0.2.4"
ron = { version = "0.6", default-features = false }

View File

@ -1,7 +1,7 @@
use common::{
msg::{ClientMsg, ClientState, RequestStateError, ServerMsg},
net::PostBox,
msg::{ClientState, RequestStateError, ServerMsg},
};
use network::Stream;
use hashbrown::HashSet;
use specs::{Component, FlaggedStorage};
use specs_idvs::IDVStorage;
@ -9,7 +9,7 @@ use vek::*;
pub struct Client {
pub client_state: ClientState,
pub postbox: PostBox<ServerMsg, ClientMsg>,
pub singleton_stream: std::sync::Mutex<Stream>,
pub last_ping: f64,
pub login_msg_sent: bool,
}
@ -19,7 +19,7 @@ impl Component for Client {
}
impl Client {
pub fn notify(&mut self, msg: ServerMsg) { self.postbox.send_message(msg); }
pub fn notify(&mut self, msg: ServerMsg) { self.singleton_stream.lock().unwrap().send(msg); }
pub fn is_registered(&self) -> bool {
match self.client_state {
@ -37,13 +37,13 @@ impl Client {
pub fn allow_state(&mut self, new_state: ClientState) {
self.client_state = new_state;
self.postbox
.send_message(ServerMsg::StateAnswer(Ok(new_state)));
self.singleton_stream
.lock().unwrap().send(ServerMsg::StateAnswer(Ok(new_state)));
}
pub fn error_state(&mut self, error: RequestStateError) {
self.postbox
.send_message(ServerMsg::StateAnswer(Err((error, self.client_state))));
self.singleton_stream
.lock().unwrap().send(ServerMsg::StateAnswer(Err((error, self.client_state))));
}
}

View File

@ -1,11 +1,21 @@
use common::net::PostError;
use network::{NetworkError, ParticipantError, StreamError};
#[derive(Debug)]
pub enum Error {
Network(PostError),
NetworkErr(NetworkError),
ParticipantErr(ParticipantError),
StreamErr(StreamError),
Other(String),
}
impl From<PostError> for Error {
fn from(err: PostError) -> Self { Error::Network(err) }
impl From<NetworkError> for Error {
fn from(err: NetworkError) -> Self { Error::NetworkErr(err) }
}
impl From<ParticipantError> for Error {
fn from(err: ParticipantError) -> Self { Error::ParticipantErr(err) }
}
impl From<StreamError> for Error {
fn from(err: StreamError) -> Self { Error::StreamErr(err) }
}

View File

@ -31,13 +31,13 @@ use common::{
cmd::ChatCommand,
comp::{self, ChatType},
event::{EventBus, ServerEvent},
msg::{ClientMsg, ClientState, ServerInfo, ServerMsg},
net::PostOffice,
msg::{ClientState, ServerInfo, ServerMsg},
state::{State, TimeOfDay},
sync::WorldSyncExt,
terrain::TerrainChunkSize,
vol::{ReadVol, RectVolSize},
};
use network::{Network, Address, Pid};
use metrics::{ServerMetrics, TickMetrics};
use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater};
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
@ -47,6 +47,9 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use futures_util::{select, FutureExt};
use futures_executor::block_on;
use futures_timer::Delay;
#[cfg(not(feature = "worldgen"))]
use test_world::{World, WORLD_SIZE};
use tracing::{debug, error, info};
@ -77,7 +80,7 @@ pub struct Server {
world: Arc<World>,
map: Vec<u32>,
postoffice: PostOffice<ServerMsg, ClientMsg>,
network: Network,
thread_pool: ThreadPool,
@ -233,16 +236,19 @@ impl Server {
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
let thread_pool = ThreadPoolBuilder::new().name("veloren-worker".to_string()).build();
let (network, f) = Network::new(Pid::new(), None);
thread_pool.execute(f);
block_on(network.listen(Address::Tcp(settings.gameserver_address)))?;
let this = Self {
state,
world: Arc::new(world),
map,
postoffice: PostOffice::bind(settings.gameserver_address)?,
network,
thread_pool: ThreadPoolBuilder::new()
.name("veloren-worker".into())
.build(),
thread_pool,
metrics,
tick_metrics,
@ -329,17 +335,18 @@ impl Server {
// 1) Build up a list of events for this frame, to be passed to the frontend.
let mut frontend_events = Vec::new();
// If networking has problems, handle them.
if let Some(err) = self.postoffice.error() {
return Err(err.into());
}
// 2)
let before_new_connections = Instant::now();
// 3) Handle inputs from clients
frontend_events.append(&mut self.handle_new_connections()?);
block_on(async{
//TIMEOUT 0.01 ms for msg handling
let x = select!(
_ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()),
err = self.handle_new_connections(&mut frontend_events).fuse() => err,
);
});
let before_message_system = Instant::now();
@ -582,13 +589,14 @@ impl Server {
}
/// Handle new client connections.
fn handle_new_connections(&mut self) -> Result<Vec<Event>, Error> {
let mut frontend_events = Vec::new();
async fn handle_new_connections(&mut self, frontend_events: &mut Vec<Event>) -> Result<(), Error> {
loop {
let participant = self.network.connected().await?;
let singleton_stream = participant.opened().await?;
for postbox in self.postoffice.new_postboxes() {
let mut client = Client {
client_state: ClientState::Connected,
postbox,
singleton_stream: std::sync::Mutex::new(singleton_stream),
last_ping: self.state.get_time(),
login_msg_sent: false,
};
@ -626,8 +634,6 @@ impl Server {
frontend_events.push(Event::ClientConnected { entity });
}
}
Ok(frontend_events)
}
pub fn notify_client<S>(&self, entity: EcsEntity, msg: S)

View File

@ -41,8 +41,7 @@ impl<'a> System<'a> for Sys {
{
if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) {
client
.postbox
.send_message(ServerMsg::Notification(Notification::WaypointSaved));
.notify(ServerMsg::Notification(Notification::WaypointSaved));
}
}
}

View File

@ -1,5 +1,4 @@
use client::{error::Error as ClientError, Client};
use common::net::PostError;
use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError};
use std::{
net::ToSocketAddrs,
@ -98,12 +97,10 @@ impl ClientInit {
},
Err(err) => {
match err {
ClientError::Network(PostError::Bincode(_)) => {
ClientError::NetworkErr(_) => {
last_err = Some(Error::ClientError(err));
break 'tries;
},
// Assume the connection failed and try again soon
ClientError::Network(_) => {},
}
// Non-connection error, stop attempts
err => {
last_err = Some(Error::ClientError(err));

View File

@ -109,7 +109,17 @@ impl PlayState for MainMenuState {
client::Error::NotOnWhitelist => {
localized_strings.get("main.login.not_on_whitelist").into()
},
client::Error::Network(e) => format!(
client::Error::NetworkErr(e) => format!(
"{}: {:?}",
localized_strings.get("main.login.network_error"),
e
),
client::Error::ParticipantErr(e) => format!(
"{}: {:?}",
localized_strings.get("main.login.network_error"),
e
),
client::Error::StreamErr(e) => format!(
"{}: {:?}",
localized_strings.get("main.login.network_error"),
e

View File

@ -47,14 +47,14 @@ impl Singleplayer {
let paused = Arc::new(AtomicBool::new(false));
let paused1 = paused.clone();
let server = Server::new(settings2).expect("Failed to create server instance!");
let server = match thread_pool {
Some(pool) => server.with_thread_pool(pool),
None => server,
};
let thread = thread::spawn(move || {
let server = Server::new(settings2).expect("Failed to create server instance!");
let server = match thread_pool {
Some(pool) => server.with_thread_pool(pool),
None => server,
};
run_server(server, receiver, paused1);
});