diff --git a/Cargo.lock b/Cargo.lock index 97153bac97..b2d47b3ea8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ dependencies = [ "crossbeam-utils 0.7.2", "futures-core", "futures-io", - "futures-timer", + "futures-timer 2.0.2", "kv-log-macro", "log", "memchr", @@ -1370,6 +1370,12 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.5" @@ -4448,6 +4454,9 @@ version = "0.6.0" dependencies = [ "authc", "byteorder 1.3.4", + "futures-executor", + "futures-timer 3.0.2", + "futures-util", "hashbrown", "image", "num_cpus", @@ -4456,6 +4465,7 @@ dependencies = [ "uvth 3.1.1", "vek", "veloren-common", + "veloren_network", ] [[package]] @@ -4499,6 +4509,9 @@ dependencies = [ "diesel", "diesel_migrations", "dotenv", + "futures-executor", + "futures-timer 3.0.2", + "futures-util", "hashbrown", "lazy_static", "libsqlite3-sys", @@ -4519,6 +4532,7 @@ dependencies = [ "vek", "veloren-common", "veloren-world", + "veloren_network", ] [[package]] diff --git a/client/Cargo.toml b/client/Cargo.toml index 73fe3dc06e..7a3c36edfa 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -6,9 +6,13 @@ edition = "2018" [dependencies] common = { package = "veloren-common", path = "../common", features = ["no-assets"] } +network = { package = "veloren_network", path = "../network", default-features = false } byteorder = "1.3.2" uvth = "3.1.1" +futures-util = "0.3" +futures-executor = "0.3" +futures-timer = "3.0" image = { version = "0.22.3", default-features = false, features = ["png"] } num_cpus = "1.10.1" tracing = { version = "0.1", default-features = false } diff --git a/client/src/error.rs b/client/src/error.rs index 873b1ce2e8..cd816026e4 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -1,9 +1,11 @@ use authc::AuthClientError; -use common::net::PostError; +use network::{NetworkError, ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { - Network(PostError), + NetworkErr(NetworkError), + ParticipantErr(ParticipantError), + StreamErr(StreamError), ServerWentMad, ServerTimeout, ServerShutdown, @@ -19,8 +21,16 @@ pub enum Error { Other(String), } -impl From for Error { - fn from(err: PostError) -> Self { Self::Network(err) } +impl From for Error { + fn from(err: NetworkError) -> Self { Self::NetworkErr(err) } +} + +impl From for Error { + fn from(err: ParticipantError) -> Self { Self::ParticipantErr(err) } +} + +impl From for Error { + fn from(err: StreamError) -> Self { Self::StreamErr(err) } } impl From for Error { diff --git a/client/src/lib.rs b/client/src/lib.rs index 64a75024f3..4719f2cdd7 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -25,12 +25,12 @@ use common::{ PlayerInfo, PlayerListUpdate, RegisterError, RequestStateError, ServerInfo, ServerMsg, MAX_BYTES_CHAT_MSG, }, - net::PostBox, state::State, sync::{Uid, UidAllocator, WorldSyncExt}, terrain::{block::Block, TerrainChunk, TerrainChunkSize}, vol::RectVolSize, }; +use network::{Network, Participant, Stream, Address, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; use hashbrown::HashMap; use image::DynamicImage; use std::{ @@ -38,6 +38,9 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use futures_util::{select, FutureExt}; +use futures_executor::block_on; +use futures_timer::Delay; use tracing::{debug, error, warn}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -69,7 +72,9 @@ pub struct Client { pub character_list: CharacterList, pub active_character_id: Option, - postbox: PostBox, + _network: Network, + _participant: Arc, + singleton_stream: Stream, last_server_ping: f64, last_server_pong: f64, @@ -99,64 +104,80 @@ impl Client { /// Create a new `Client`. pub fn new>(addr: A, view_distance: Option) -> Result { let client_state = ClientState::Connected; - let mut postbox = PostBox::to(addr)?; + + let mut thread_pool = ThreadPoolBuilder::new() + .name("veloren-worker".into()) + .build(); + // We reduce the thread count by 1 to keep rendering smooth + thread_pool.set_num_threads((num_cpus::get() - 1).max(1)); + + let (network, f) = Network::new(Pid::new(), None); + thread_pool.execute(f); + + let participant = block_on(network.connect(Address::Tcp(addr.into())))?; + let mut stream = block_on(participant.open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY))?; // Wait for initial sync - let (state, entity, server_info, world_map) = match postbox.next_message()? { - ServerMsg::InitialSync { - entity_package, - server_info, - time_of_day, - world_map: (map_size, world_map), - } => { - // TODO: Display that versions don't match in Voxygen - if &server_info.git_hash != *common::util::GIT_HASH { - warn!( - "Server is running {}[{}], you are running {}[{}], versions might be \ + let (state, entity, server_info, world_map) = block_on(async{ + loop { + match stream.recv().await? { + ServerMsg::InitialSync { + entity_package, + server_info, + time_of_day, + world_map: (map_size, world_map), + } => { + // TODO: Display that versions don't match in Voxygen + if &server_info.git_hash != *common::util::GIT_HASH { + warn!( + "Server is running {}[{}], you are running {}[{}], versions might be \ incompatible!", server_info.git_hash, server_info.git_date, common::util::GIT_HASH.to_string(), common::util::GIT_DATE.to_string(), ); - } + } - debug!("Auth Server: {:?}", server_info.auth_provider); + debug!("Auth Server: {:?}", server_info.auth_provider); - // Initialize `State` - let mut state = State::default(); - // Client-only components - state - .ecs_mut() - .register::>(); + // Initialize `State` + let mut state = State::default(); + // Client-only components + state + .ecs_mut() + .register::>(); - let entity = state.ecs_mut().apply_entity_package(entity_package); - *state.ecs_mut().write_resource() = time_of_day; + let entity = state.ecs_mut().apply_entity_package(entity_package); + *state.ecs_mut().write_resource() = time_of_day; - assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); - let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; - LittleEndian::write_u32_into(&world_map, &mut world_map_raw); - debug!("Preparing image..."); - let world_map = Arc::new( - image::DynamicImage::ImageRgba8({ - // Should not fail if the dimensions are correct. - let world_map = - image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw); - world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? - }) - // Flip the image, since Voxygen uses an orientation where rotation from - // positive x axis to positive y axis is counterclockwise around the z axis. - .flipv(), - ); - debug!("Done preparing image..."); + assert_eq!(world_map.len(), (map_size.x * map_size.y) as usize); + let mut world_map_raw = vec![0u8; 4 * world_map.len()/*map_size.x * map_size.y*/]; + LittleEndian::write_u32_into(&world_map, &mut world_map_raw); + debug!("Preparing image..."); + let world_map = Arc::new( + image::DynamicImage::ImageRgba8({ + // Should not fail if the dimensions are correct. + let world_map = + image::ImageBuffer::from_raw(map_size.x, map_size.y, world_map_raw); + world_map.ok_or_else(|| Error::Other("Server sent a bad world map image".into()))? + }) + // Flip the image, since Voxygen uses an orientation where rotation from + // positive x axis to positive y axis is counterclockwise around the z axis. + .flipv(), + ); + debug!("Done preparing image..."); - (state, entity, server_info, (world_map, map_size)) - }, - ServerMsg::TooManyPlayers => return Err(Error::TooManyPlayers), - _ => return Err(Error::ServerWentMad), - }; + break Ok((state, entity, server_info, (world_map, map_size))) + }, + ServerMsg::TooManyPlayers => break Err(Error::TooManyPlayers), + err => { + warn!("whoops, server mad {:?}, ignoring", err); + }, + }} + })?; - postbox.send_message(ClientMsg::Ping); + stream.send(ClientMsg::Ping)?; let mut thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".into()) @@ -173,7 +194,9 @@ impl Client { character_list: CharacterList::default(), active_character_id: None, - postbox, + _network: network, + _participant: participant, + singleton_stream: stream, last_server_ping: 0.0, last_server_pong: 0.0, @@ -213,33 +236,39 @@ impl Client { } ).unwrap_or(Ok(username))?; - self.postbox.send_message(ClientMsg::Register { + self.singleton_stream.send(ClientMsg::Register { view_distance: self.view_distance, token_or_username, - }); + })?; self.client_state = ClientState::Pending; - loop { - match self.postbox.next_message()? { - ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => { - self.client_state = state; - break Err(match err { - RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, - RegisterError::AuthError(err) => Error::AuthErr(err), - RegisterError::InvalidCharacter => Error::InvalidCharacter, - RegisterError::NotOnWhitelist => Error::NotOnWhitelist, - }); - }, - ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), - _ => {}, + block_on( + async { + loop { + match self.singleton_stream.recv().await? { + ServerMsg::StateAnswer(Err((RequestStateError::RegisterDenied(err), state))) => { + self.client_state = state; + break Err(match err { + RegisterError::AlreadyLoggedIn => Error::AlreadyLoggedIn, + RegisterError::AuthError(err) => Error::AuthErr(err), + RegisterError::InvalidCharacter => Error::InvalidCharacter, + }) + }, + ServerMsg::StateAnswer(Ok(ClientState::Registered)) => break Ok(()), + ignore => { + warn!("Ignoring what the server send till registered: {:? }", ignore); + //return Err(Error::ServerWentMad) + }, + } + } } - } + ) } /// Request a state transition to `ClientState::Character`. pub fn request_character(&mut self, character_id: i32) { - self.postbox - .send_message(ClientMsg::Character(character_id)); + self.singleton_stream + .send(ClientMsg::Character(character_id)).unwrap(); self.active_character_id = Some(character_id); self.client_state = ClientState::Pending; @@ -248,73 +277,75 @@ impl Client { /// Load the current players character list pub fn load_character_list(&mut self) { self.character_list.loading = true; - self.postbox.send_message(ClientMsg::RequestCharacterList); + self.singleton_stream.send(ClientMsg::RequestCharacterList).unwrap(); } /// New character creation pub fn create_character(&mut self, alias: String, tool: Option, body: comp::Body) { self.character_list.loading = true; - self.postbox - .send_message(ClientMsg::CreateCharacter { alias, tool, body }); + self.singleton_stream + .send(ClientMsg::CreateCharacter { alias, tool, body }).unwrap(); } /// Character deletion pub fn delete_character(&mut self, character_id: i32) { self.character_list.loading = true; - self.postbox - .send_message(ClientMsg::DeleteCharacter(character_id)); + self.singleton_stream + .send(ClientMsg::DeleteCharacter(character_id)).unwrap(); } /// Send disconnect message to the server - pub fn request_logout(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); } + pub fn request_logout(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + error!(?e, "couldn't send disconnect package to server, did server close already?"); + }} /// Request a state transition to `ClientState::Registered` from an ingame /// state. pub fn request_remove_character(&mut self) { - self.postbox.send_message(ClientMsg::ExitIngame); + self.singleton_stream.send(ClientMsg::ExitIngame).unwrap(); self.client_state = ClientState::Pending; } pub fn set_view_distance(&mut self, view_distance: u32) { self.view_distance = Some(view_distance.max(1).min(65)); - self.postbox - .send_message(ClientMsg::SetViewDistance(self.view_distance.unwrap())); + self.singleton_stream + .send(ClientMsg::SetViewDistance(self.view_distance.unwrap())).unwrap(); // Can't fail } pub fn use_slot(&mut self, slot: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Use(slot), - ))); + ))).unwrap(); } pub fn swap_slots(&mut self, a: comp::slot::Slot, b: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Swap(a, b), - ))); + ))).unwrap(); } pub fn drop_slot(&mut self, slot: comp::slot::Slot) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Drop(slot), - ))); + ))).unwrap(); } pub fn pick_up(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Pickup(uid), - ))); + ))).unwrap(); } } pub fn toggle_lantern(&mut self) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::ToggleLantern)).unwrap(); } pub fn is_mounted(&self) -> bool { @@ -327,8 +358,8 @@ impl Client { pub fn mount(&mut self, entity: EcsEntity) { if let Some(uid) = self.state.ecs().read_storage::().get(entity).copied() { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Mount(uid))); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Mount(uid))).unwrap(); } } @@ -345,8 +376,8 @@ impl Client { .get(self.entity) .map_or(false, |s| s.is_dead) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::Respawn)); + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::Respawn)).unwrap(); } } @@ -428,8 +459,8 @@ impl Client { { controller.actions.push(control_action); } - self.postbox - .send_message(ClientMsg::ControlAction(control_action)); + self.singleton_stream + .send(ClientMsg::ControlAction(control_action)).unwrap(); } pub fn view_distance(&self) -> Option { self.view_distance } @@ -473,18 +504,18 @@ impl Client { } pub fn place_block(&mut self, pos: Vec3, block: Block) { - self.postbox.send_message(ClientMsg::PlaceBlock(pos, block)); + self.singleton_stream.send(ClientMsg::PlaceBlock(pos, block)).unwrap(); } pub fn remove_block(&mut self, pos: Vec3) { - self.postbox.send_message(ClientMsg::BreakBlock(pos)); + self.singleton_stream.send(ClientMsg::BreakBlock(pos)).unwrap(); } pub fn collect_block(&mut self, pos: Vec3) { - self.postbox - .send_message(ClientMsg::ControlEvent(ControlEvent::InventoryManip( + self.singleton_stream + .send(ClientMsg::ControlEvent(ControlEvent::InventoryManip( InventoryManip::Collect(pos), - ))); + ))).unwrap(); } /// Execute a single client tick, handle input and update the game state by @@ -539,8 +570,7 @@ impl Client { "Couldn't access controller component on client entity" ); } - self.postbox - .send_message(ClientMsg::ControllerInputs(inputs)); + self.singleton_stream.send(ClientMsg::ControllerInputs(inputs)).unwrap(); } // 2) Build up a list of events for this frame, to be passed to the frontend. @@ -637,8 +667,7 @@ impl Client { if self.state.terrain().get_key(*key).is_none() { if !skip_mode && !self.pending_chunks.contains_key(key) { if self.pending_chunks.len() < 4 { - self.postbox - .send_message(ClientMsg::TerrainChunkRequest { key: *key }); + self.singleton_stream.send(ClientMsg::TerrainChunkRequest { key: *key })?; self.pending_chunks.insert(*key, Instant::now()); } else { skip_mode = true; @@ -670,7 +699,7 @@ impl Client { // Send a ping to the server once every second if self.state.get_time() - self.last_server_ping > 1. { - self.postbox.send_message(ClientMsg::Ping); + self.singleton_stream.send(ClientMsg::Ping)?; self.last_server_ping = self.state.get_time(); } @@ -681,8 +710,7 @@ impl Client { self.state.read_storage().get(self.entity).cloned(), self.state.read_storage().get(self.entity).cloned(), ) { - self.postbox - .send_message(ClientMsg::PlayerPhysics { pos, vel, ori }); + self.singleton_stream.send(ClientMsg::PlayerPhysics { pos, vel, ori })?; } } @@ -1072,5 +1100,7 @@ impl Client { } impl Drop for Client { - fn drop(&mut self) { self.postbox.send_message(ClientMsg::Disconnect); } + fn drop(&mut self) { if let Err(e) = self.singleton_stream.send(ClientMsg::Disconnect) { + warn!("error during drop of client, couldn't send disconnect package, is the connection already closed? : {}", e); + } } } diff --git a/common/src/lib.rs b/common/src/lib.rs index a53cba2716..d3211b3abf 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -39,30 +39,4 @@ pub mod util; pub mod vol; pub mod volumes; -pub use loadout_builder::LoadoutBuilder; - -/// The networking module containing high-level wrappers of `TcpListener` and -/// `TcpStream` (`PostOffice` and `PostBox` respectively) and data types used by -/// both the server and client. # Examples -/// ``` -/// use std::net::SocketAddr; -/// use veloren_common::net::{PostBox, PostOffice}; -/// -/// let listen_addr = SocketAddr::from(([0, 0, 0, 0], 12345u16)); -/// let conn_addr = SocketAddr::from(([127, 0, 0, 1], 12345u16)); -/// -/// let mut server: PostOffice = PostOffice::bind(listen_addr).unwrap(); -/// let mut client: PostBox = PostBox::to(conn_addr).unwrap(); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// let mut scon = server.new_postboxes().next().unwrap(); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// scon.send_message(String::from("foo")); -/// client.send_message(String::from("bar")); -/// std::thread::sleep(std::time::Duration::from_millis(100)); -/// -/// assert_eq!("foo", client.next_message().unwrap()); -/// assert_eq!("bar", scon.next_message().unwrap()); -/// ``` -pub mod net; +pub use loadout_builder::LoadoutBuilder; \ No newline at end of file diff --git a/common/src/net/data.rs b/common/src/net/data.rs deleted file mode 100644 index ff16b1fe94..0000000000 --- a/common/src/net/data.rs +++ /dev/null @@ -1,19 +0,0 @@ -/// Messages server sends to client. -#[derive(Deserialize, Serialize, Debug)] -pub enum ServerMsg { - // VersionInfo MUST always stay first in this struct. - VersionInfo {}, -} - -/// Messages client sends to server. -#[derive(Deserialize, Serialize, Debug)] -pub enum ClientMsg { - // VersionInfo MUST always stay first in this struct. - VersionInfo {}, -} - -/// Control message type, used in [PostBox](super::PostBox) and -/// [PostOffice](super::PostOffice) to control threads. -pub enum ControlMsg { - Shutdown, -} diff --git a/common/src/net/mod.rs b/common/src/net/mod.rs deleted file mode 100644 index a27e61bbf2..0000000000 --- a/common/src/net/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub mod data; -//pub mod post; -pub mod post2; - -pub use post2 as post; - -// Reexports -pub use self::{ - data::{ClientMsg, ServerMsg}, - post::{Error as PostError, PostBox, PostOffice}, -}; - -pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + std::marker::Send + std::fmt::Debug; diff --git a/common/src/net/post.rs b/common/src/net/post.rs deleted file mode 100644 index 5912e323e9..0000000000 --- a/common/src/net/post.rs +++ /dev/null @@ -1,628 +0,0 @@ -use bincode; -use middleman::Middleman; -use mio::{ - net::{TcpListener, TcpStream}, - Events, Poll, PollOpt, Ready, Token, -}; -use mio_extras::channel::{channel, Receiver, Sender}; -use serde; -use std::{ - collections::VecDeque, - convert::TryFrom, - fmt, - io::{self, Read, Write}, - net::{Shutdown, SocketAddr}, - sync::mpsc::TryRecvError, - thread, - time::Duration, -}; - -#[derive(Clone, Debug, PartialEq)] -pub enum Error { - Disconnect, - Network, - InvalidMsg, - Internal, -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Network } -} - -impl From for Error { - fn from(err: TryRecvError) -> Self { Error::Internal } -} - -impl From for Error { - fn from(err: bincode::ErrorKind) -> Self { Error::InvalidMsg } -} - -impl From> for Error { - fn from(err: mio_extras::channel::SendError) -> Self { Error::Internal } -} - -pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message; - -const TCP_TOK: Token = Token(0); -const CTRL_TOK: Token = Token(1); -const POSTBOX_TOK: Token = Token(2); -const SEND_TOK: Token = Token(3); -const RECV_TOK: Token = Token(4); -const MIDDLEMAN_TOK: Token = Token(5); - -const MAX_MSG_BYTES: usize = 1 << 28; - -enum CtrlMsg { - Shutdown, -} - -pub struct PostOffice { - worker: Option>>, - ctrl_tx: Sender, - postbox_rx: Receiver, Error>>, - poll: Poll, - err: Option, -} - -impl PostOffice { - pub fn bind>(addr: A) -> Result { - let tcp_listener = TcpListener::bind(&addr.into())?; - - let (ctrl_tx, ctrl_rx) = channel(); - let (postbox_tx, postbox_rx) = channel(); - - let worker_poll = Poll::new()?; - worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?; - worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; - - let office_poll = Poll::new()?; - office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?; - - let worker = - thread::spawn(move || office_worker(worker_poll, tcp_listener, ctrl_rx, postbox_tx)); - - Ok(Self { - worker: Some(worker), - ctrl_tx, - postbox_rx, - poll: office_poll, - err: None, - }) - } - - pub fn error(&self) -> Option { self.err.clone() } - - pub fn new_connections(&mut self) -> impl ExactSizeIterator> { - let mut conns = VecDeque::new(); - - if self.err.is_some() { - return conns.into_iter(); - } - - let mut events = Events::with_capacity(64); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return conns.into_iter(); - } - - for event in events { - match event.token() { - // Keep reading new postboxes from the channel - POSTBOX_TOK => loop { - match self.postbox_rx.try_recv() { - Ok(Ok(conn)) => conns.push_back(conn), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return conns.into_iter(); - }, - Ok(Err(err)) => { - self.err = Some(err); - return conns.into_iter(); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - - conns.into_iter() - } -} - -impl Drop for PostOffice { - fn drop(&mut self) { - let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); - let _ = self.worker.take().map(|w| w.join()); - } -} - -fn office_worker( - poll: Poll, - tcp_listener: TcpListener, - ctrl_rx: Receiver, - postbox_tx: Sender, Error>>, -) -> Result<(), Error> { - let mut events = Events::with_capacity(64); - loop { - if let Err(err) = poll.poll(&mut events, None) { - postbox_tx.send(Err(err.into()))?; - return Ok(()); - } - - for event in &events { - match event.token() { - CTRL_TOK => loop { - match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => return Ok(()), - Err(TryRecvError::Empty) => {}, - Err(err) => { - postbox_tx.send(Err(err.into()))?; - return Ok(()); - }, - } - }, - TCP_TOK => postbox_tx.send(match tcp_listener.accept() { - Ok((stream, _)) => PostBox::from_tcpstream(stream), - Err(err) => Err(err.into()), - })?, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } -} - -pub struct PostBox { - worker: Option>>, - ctrl_tx: Sender, - send_tx: Sender, - recv_rx: Receiver>, - poll: Poll, - err: Option, -} - -impl PostBox { - pub fn to_server>(addr: A) -> Result { - Self::from_tcpstream(TcpStream::from_stream(std::net::TcpStream::connect( - &addr.into(), - )?)?) - } - - fn from_tcpstream(tcp_stream: TcpStream) -> Result { - let (ctrl_tx, ctrl_rx) = channel(); - let (send_tx, send_rx) = channel(); - let (recv_tx, recv_rx) = channel(); - - let worker_poll = Poll::new()?; - worker_poll.register( - &tcp_stream, - TCP_TOK, - Ready::readable() | Ready::writable(), - PollOpt::edge(), - )?; - worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; - worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?; - - let postbox_poll = Poll::new()?; - postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?; - - let worker = thread::spawn(move || { - postbox_worker(worker_poll, tcp_stream, ctrl_rx, send_rx, recv_tx) - }); - - Ok(Self { - worker: Some(worker), - ctrl_tx, - send_tx, - recv_rx, - poll: postbox_poll, - err: None, - }) - } - - pub fn error(&self) -> Option { self.err.clone() } - - pub fn send(&mut self, data: S) { let _ = self.send_tx.send(data); } - - // TODO: This method is super messy. - pub fn next_message(&mut self) -> Option { - if self.err.is_some() { - return None; - } - - loop { - let mut events = Events::with_capacity(10); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return None; - } - - for event in events { - match event.token() { - // Keep reading new messages from the channel - RECV_TOK => loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => return Some(msg), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return None; - }, - Ok(Err(err)) => { - self.err = Some(err); - return None; - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } - } - - pub fn new_messages(&mut self) -> impl ExactSizeIterator { - let mut msgs = VecDeque::new(); - - if self.err.is_some() { - return msgs.into_iter(); - } - - let mut events = Events::with_capacity(64); - if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { - self.err = Some(err.into()); - return msgs.into_iter(); - } - - for event in events { - match event.token() { - // Keep reading new messages from the channel - RECV_TOK => loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => msgs.push_back(msg), - Err(TryRecvError::Empty) => break, - Err(err) => { - self.err = Some(err.into()); - return msgs.into_iter(); - }, - Ok(Err(err)) => { - self.err = Some(err); - return msgs.into_iter(); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - - msgs.into_iter() - } -} - -impl Drop for PostBox { - fn drop(&mut self) { - let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); - let _ = self.worker.take().map(|w| w.join()); - } -} - -fn postbox_worker( - poll: Poll, - mut tcp_stream: TcpStream, - ctrl_rx: Receiver, - send_rx: Receiver, - recv_tx: Sender>, -) -> Result<(), Error> { - fn try_tcp_send( - tcp_stream: &mut TcpStream, - chunks: &mut VecDeque>, - ) -> Result<(), Error> { - loop { - let chunk = match chunks.pop_front() { - Some(chunk) => chunk, - None => break, - }; - - match tcp_stream.write_all(&chunk) { - Ok(()) => {}, - Err(err) if err.kind() == io::ErrorKind::WouldBlock => { - chunks.push_front(chunk); - break; - }, - Err(err) => { - println!("Error: {:?}", err); - return Err(err.into()); - }, - } - } - - Ok(()) - } - - enum RecvState { - ReadHead(Vec), - ReadBody(usize, Vec), - } - - let mut recv_state = RecvState::ReadHead(Vec::new()); - let mut chunks = VecDeque::new(); - - //let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - let mut events = Events::with_capacity(64); - - 'work: loop { - if let Err(err) = poll.poll(&mut events, None) { - recv_tx.send(Err(err.into()))?; - break 'work; - } - - for event in &events { - match event.token() { - CTRL_TOK => match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => { - break 'work; - }, - Err(TryRecvError::Empty) => (), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - }, - SEND_TOK => loop { - match send_rx.try_recv() { - Ok(outgoing_msg) => { - let mut msg_bytes = match bincode::serialize(&outgoing_msg) { - Ok(bytes) => bytes, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - }; - - let mut bytes = msg_bytes.len().to_le_bytes().as_ref().to_vec(); - bytes.append(&mut msg_bytes); - - bytes - .chunks(1024) - .map(|chunk| chunk.to_vec()) - .for_each(|chunk| chunks.push_back(chunk)); - - match try_tcp_send(&mut tcp_stream, &mut chunks) { - Ok(_) => {}, - Err(err) => { - recv_tx.send(Err(err.into()))?; - return Err(Error::Network); - }, - } - }, - Err(TryRecvError::Empty) => break, - Err(err) => Err(err)?, - } - }, - TCP_TOK => { - loop { - // Check TCP error - match tcp_stream.take_error() { - Ok(None) => {}, - Ok(Some(err)) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - match &mut recv_state { - RecvState::ReadHead(head) => { - if head.len() == 8 { - let len = usize::from_le_bytes( - <[u8; 8]>::try_from(head.as_slice()).unwrap(), - ); - if len > MAX_MSG_BYTES { - println!("TOO BIG! {:x}", len); - recv_tx.send(Err(Error::InvalidMsg))?; - break 'work; - } else if len == 0 { - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - break; - } else { - recv_state = RecvState::ReadBody(len, Vec::new()); - } - } else { - let mut b = [0; 1]; - match tcp_stream.read(&mut b) { - Ok(0) => {}, - Ok(_) => head.push(b[0]), - Err(_) => break, - } - } - }, - RecvState::ReadBody(len, body) => { - if body.len() == *len { - match bincode::deserialize(&body) { - Ok(msg) => { - recv_tx.send(Ok(msg))?; - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - }, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - } - } else { - let left = *len - body.len(); - let mut buf = vec![0; left]; - match tcp_stream.read(&mut buf) { - Ok(_) => body.append(&mut buf), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - } - }, - } - } - - // Now, try sending TCP stuff - match try_tcp_send(&mut tcp_stream, &mut chunks) { - Ok(_) => {}, - Err(err) => { - recv_tx.send(Err(err.into()))?; - return Err(Error::Network); - }, - } - }, - tok => panic!("Unexpected event token '{:?}'", tok), - } - } - } - - //tcp_stream.shutdown(Shutdown::Both)?; - Ok(()) -} - -// TESTS - -/* -#[derive(Serialize, Deserialize)] -struct TestMsg(T); - -#[test] -fn connect() { - let srv_addr = ([127, 0, 0, 1], 12345); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - let postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - // Now a postbox has been created, we should have 1 new. - thread::sleep(Duration::from_millis(250)); - let incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 1); - assert_eq!(postoffice.error(), None); -} - -#[test] -fn connect_fail() { - let listen_addr = ([0; 4], 12345); - let connect_addr = ([127, 0, 0, 1], 12212); - - let mut postoffice = PostOffice::, TestMsg>::bind(listen_addr).unwrap(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - assert!(PostBox::, TestMsg>::to_server(connect_addr).is_err()); -} - -#[test] -fn connection_count() { - let srv_addr = ([127, 0, 0, 1], 12346); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - let mut postboxes = Vec::new(); - - // We should start off with 0 incoming connections. - thread::sleep(Duration::from_millis(250)); - assert_eq!(postoffice.new_connections().len(), 0); - assert_eq!(postoffice.error(), None); - - for _ in 0..5 { - postboxes.push(PostBox::, TestMsg>::to_server(srv_addr).unwrap()); - } - - // 5 postboxes created, we should have 5. - thread::sleep(Duration::from_millis(3500)); - let incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 5); - assert_eq!(postoffice.error(), None); -} - -#[test] -fn disconnect() { - let srv_addr = ([127, 0, 0, 1], 12347); - - let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut server_postbox = { - let mut client_postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - let mut incoming = postoffice.new_connections(); - assert_eq!(incoming.len(), 1); - assert_eq!(postoffice.error(), None); - - incoming.next().unwrap() - }; - - // The client postbox has since been disconnected. - thread::sleep(Duration::from_millis(2050)); - let incoming_msgs = server_postbox.new_messages(); - assert_eq!(incoming_msgs.len(), 0); - // TODO - // assert_eq!(server_postbox.error(), Some(Error::Disconnect)); -} - -#[test] -fn client_to_server() { - let srv_addr = ([127, 0, 0, 1], 12348); - - let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - - let mut server_pb = po.new_connections().next().unwrap(); - - client_pb.send(TestMsg(1337.0)); - client_pb.send(TestMsg(9821.0)); - client_pb.send(TestMsg(-3.2)); - client_pb.send(TestMsg(17.0)); - - thread::sleep(Duration::from_millis(250)); - - let mut incoming_msgs = server_pb.new_messages(); - assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0)); -} - -#[test] -fn server_to_client() { - let srv_addr = ([127, 0, 0, 1], 12349); - - let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - - let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); - - thread::sleep(Duration::from_millis(250)); - - let mut server_pb = po.new_connections().next().unwrap(); - - server_pb.send(TestMsg(1337)); - server_pb.send(TestMsg(9821)); - server_pb.send(TestMsg(39999999)); - server_pb.send(TestMsg(17)); - - thread::sleep(Duration::from_millis(250)); - - let mut incoming_msgs = client_pb.new_messages(); - assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999)); - assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17)); -} -*/ diff --git a/common/src/net/post2.rs b/common/src/net/post2.rs deleted file mode 100644 index e4ba5b31cb..0000000000 --- a/common/src/net/post2.rs +++ /dev/null @@ -1,439 +0,0 @@ -use crossbeam::channel; -use serde::{de::DeserializeOwned, Serialize}; -use std::{ - collections::VecDeque, - convert::TryFrom, - io::{self, Read, Write}, - marker::PhantomData, - net::{Shutdown, SocketAddr, TcpListener, TcpStream}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; -use tracing::warn; - -#[derive(Clone, Debug)] -pub enum Error { - Io(Arc), - Bincode(Arc), - ChannelFailure, - InvalidMessage, -} - -impl From for Error { - fn from(err: io::Error) -> Self { Error::Io(Arc::new(err)) } -} - -impl From for Error { - fn from(err: bincode::Error) -> Self { Error::Bincode(Arc::new(err)) } -} - -impl From for Error { - fn from(_error: channel::TryRecvError) -> Self { Error::ChannelFailure } -} - -pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send; - -const MAX_MSG_SIZE: usize = 1 << 28; - -pub struct PostOffice { - listener: TcpListener, - error: Option, - phantom: PhantomData<(S, R)>, -} - -impl PostOffice { - pub fn bind>(addr: A) -> Result { - let listener = TcpListener::bind(addr.into())?; - listener.set_nonblocking(true)?; - - Ok(Self { - listener, - error: None, - phantom: PhantomData, - }) - } - - pub fn error(&self) -> Option { self.error.clone() } - - pub fn new_postboxes(&mut self) -> impl ExactSizeIterator> { - let mut new = Vec::new(); - - if self.error.is_some() { - return new.into_iter(); - } - - loop { - match self.listener.accept() { - Ok((stream, _sock)) => new.push(PostBox::from_stream(stream).unwrap()), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => {}, - Err(e) => { - self.error = Some(e.into()); - break; - }, - } - } - - new.into_iter() - } -} - -pub struct PostBox { - send_tx: channel::Sender, - recv_rx: channel::Receiver>, - worker: Option>, - running: Arc, - error: Option, -} - -impl PostBox { - pub fn to>(addr: A) -> Result { - Self::from_stream(TcpStream::connect(addr.into())?) - } - - fn from_stream(stream: TcpStream) -> Result { - stream.set_nonblocking(true)?; - - let running = Arc::new(AtomicBool::new(true)); - let worker_running = running.clone(); - - let (send_tx, send_rx) = channel::unbounded(); - let (recv_tx, recv_rx) = channel::unbounded(); - - let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running)); - - Ok(Self { - send_tx, - recv_rx, - worker: Some(worker), - running, - error: None, - }) - } - - pub fn error(&self) -> Option { self.error.clone() } - - pub fn send_message(&mut self, msg: S) { let _ = self.send_tx.send(msg); } - - pub fn next_message(&mut self) -> Result { - if let Some(e) = self.error.clone() { - return Err(e); - } - - match self.recv_rx.recv().map_err(|_| Error::ChannelFailure)? { - Ok(msg) => Ok(msg), - Err(e) => { - self.error = Some(e.clone()); - Err(e) - }, - } - } - - pub fn new_messages(&mut self) -> impl ExactSizeIterator { - let mut new = Vec::new(); - - if self.error.is_some() { - return new.into_iter(); - } - - loop { - match self.recv_rx.try_recv() { - Ok(Ok(msg)) => new.push(msg), - Err(channel::TryRecvError::Empty) => break, - Err(e) => { - self.error = Some(e.into()); - break; - }, - Ok(Err(e)) => { - self.error = Some(e); - break; - }, - } - } - - new.into_iter() - } - - fn worker( - mut stream: TcpStream, - send_rx: channel::Receiver, - recv_tx: channel::Sender>, - running: Arc, - ) { - let mut outgoing_chunks = VecDeque::new(); - let mut incoming_buf = Vec::new(); - - 'work: while running.load(Ordering::Relaxed) { - for _ in 0..30 { - // Get stream errors. - match stream.take_error() { - Ok(Some(e)) | Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - Ok(None) => {}, - } - - // Try getting messages from the send channel. - for _ in 0..1000 { - match send_rx.try_recv() { - Ok(send_msg) => { - // Serialize message - let msg_bytes = bincode::serialize(&send_msg).unwrap(); - let mut msg_bytes = lz4_compress::compress(&msg_bytes); - - /* - if msg_bytes.len() > 512 { - println!("MSG SIZE: {}", msg_bytes.len()); - } - */ - - // Assemble into packet. - let mut packet_bytes = - (msg_bytes.len() as u64).to_le_bytes().as_ref().to_vec(); - packet_bytes.push(msg_bytes.iter().fold(0, |a, x| a ^ *x)); - packet_bytes.append(&mut msg_bytes); - - // Split packet into chunks. - packet_bytes - .chunks(4096) - .map(|chunk| chunk.to_vec()) - .for_each(|chunk| outgoing_chunks.push_back(chunk)) - }, - Err(channel::TryRecvError::Empty) => break, - // Worker error - Err(e) => { - let _ = recv_tx.send(Err(e.into())); - break 'work; - }, - } - } - - // Try sending bytes through the TCP stream. - for _ in 0..1000 { - match outgoing_chunks.pop_front() { - Some(mut chunk) => match stream.write(&chunk) { - Ok(n) if n == chunk.len() => {}, - Ok(n) => { - outgoing_chunks.push_front(chunk.split_off(n)); - break; - }, - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // Return chunk to the queue to try again later. - outgoing_chunks.push_front(chunk); - break; - }, - // Worker error - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - }, - None => break, - } - } - - // Try receiving bytes from the TCP stream. - for _ in 0..1000 { - let mut buf = [0; 4096]; - - match stream.read(&mut buf) { - Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == io::ErrorKind::Interrupted => {}, - // Worker error - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - break 'work; - }, - } - } - - // Try turning bytes into messages. - for _ in 0..1000 { - match incoming_buf.get(0..9) { - Some(len_bytes) => { - let len = - u64::from_le_bytes(<[u8; 8]>::try_from(&len_bytes[0..8]).unwrap()) - as usize; // Can't fail - - if len > MAX_MSG_SIZE { - recv_tx.send(Err(Error::InvalidMessage)).unwrap(); - break 'work; - } else if incoming_buf.len() >= len + 9 { - let checksum_found = - incoming_buf[9..len + 9].iter().fold(0, |a, x| a ^ *x); - let checksum_expected = len_bytes[8]; - - assert_eq!( - checksum_found, checksum_expected, - "Message checksum failed!" - ); - - let msg_bytes = - lz4_compress::decompress(&incoming_buf[9..len + 9]).unwrap(); - - match bincode::deserialize(&msg_bytes) { - Ok(msg) => recv_tx.send(Ok(msg)).unwrap(), - Err(err) => { - println!("BINCODE ERROR: {:?}", err); - recv_tx.send(Err(err.into())).unwrap() - }, - } - - incoming_buf = incoming_buf.split_off(len + 9); - } else { - break; - } - }, - None => break, - } - } - } - - thread::sleep(Duration::from_millis(10)); - } - - if let Err(err) = stream.shutdown(Shutdown::Both) { - warn!("TCP worker stream shutdown failed: {:?}", err); - } - } -} - -impl Drop for PostBox { - fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); - self.worker.take().map(|handle| handle.join()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Instant; - - fn create_postoffice( - id: u16, - ) -> Result<(PostOffice, SocketAddr), Error> { - let sock = ([0; 4], 12345 + id).into(); - Ok((PostOffice::bind(sock)?, sock)) - } - - fn loop_for(duration: Duration, mut f: F) { - let start = Instant::now(); - while start.elapsed() < duration { - f(); - } - } - - #[test] - fn connect() { - let (mut postoffice, bound) = create_postoffice::<(), ()>(0).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - - let _client0 = PostBox::<(), ()>::to(sock).unwrap(); - let _client1 = PostBox::<(), ()>::to(sock).unwrap(); - let _client2 = PostBox::<(), ()>::to(sock).unwrap(); - - let mut new_clients = 0; - loop_for(Duration::from_millis(250), || { - new_clients += postoffice.new_postboxes().count(); - }); - - assert_eq!(new_clients, 3); - } - - /* - #[test] - fn disconnect() { - let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap(); - - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().unwrap().next().unwrap(); - - drop(client); - loop_for(Duration::from_millis(300), || ()); - - assert!(server.new_messages().is_err()); - } - */ - - #[test] - fn send_recv() { - let (mut postoffice, bound) = create_postoffice::<(), i32>(2).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let test_msgs = vec![1, 1337, 42, -48]; - - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - for msg in &test_msgs { - client.send_message(*msg); - } - - let mut recv_msgs = Vec::new(); - loop_for(Duration::from_millis(250), || { - server.new_messages().for_each(|msg| recv_msgs.push(msg)) - }); - - assert_eq!(test_msgs, recv_msgs); - } - - #[test] - #[ignore] - fn send_recv_huge() { - let (mut postoffice, bound) = create_postoffice::<(), Vec>(3).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let test_msgs: Vec> = (0..5) - .map(|i| (0..100000).map(|j| i * 2 + j).collect()) - .collect(); - - let mut client = PostBox::, ()>::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - for msg in &test_msgs { - client.send_message(msg.clone()); - } - - let mut recv_msgs = Vec::new(); - loop_for(Duration::from_millis(3000), || { - server.new_messages().for_each(|msg| recv_msgs.push(msg)) - }); - - assert_eq!(test_msgs.len(), recv_msgs.len()); - assert!(test_msgs == recv_msgs); - } - - #[test] - fn send_recv_both() { - let (mut postoffice, bound) = create_postoffice::(4).unwrap(); - let sock = (std::net::Ipv4Addr::LOCALHOST, bound.port()); - let mut client = PostBox::::to(sock).unwrap(); - loop_for(Duration::from_millis(250), || ()); - let mut server = postoffice.new_postboxes().next().unwrap(); - - let test_msgs = vec![ - (0xDEADBEAD, 0xBEEEEEEF), - (0x1BADB002, 0xBAADF00D), - (0xBAADA555, 0xC0DED00D), - (0xCAFEBABE, 0xDEADC0DE), - ]; - - for (to, from) in test_msgs { - client.send_message(to); - server.send_message(from); - - loop_for(Duration::from_millis(250), || ()); - - assert_eq!(client.new_messages().next().unwrap(), from); - assert_eq!(server.new_messages().next().unwrap(), to); - } - } -} diff --git a/server/Cargo.toml b/server/Cargo.toml index 0a33b95e71..ade46b9c6c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,6 +11,7 @@ default = ["worldgen"] [dependencies] common = { package = "veloren-common", path = "../common" } world = { package = "veloren-world", path = "../world" } +network = { package = "veloren_network", path = "../network", default-features = false } specs-idvs = { git = "https://gitlab.com/veloren/specs-idvs.git" } @@ -18,6 +19,9 @@ tracing = "0.1" specs = { version = "0.15.1", features = ["shred-derive"] } vek = "0.11.0" uvth = "3.1.1" +futures-util = "0.3" +futures-executor = "0.3" +futures-timer = "3.0" lazy_static = "1.4.0" scan_fmt = "0.2.4" ron = { version = "0.6", default-features = false } diff --git a/server/src/client.rs b/server/src/client.rs index 994a5126ff..e02a48392d 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -1,7 +1,7 @@ use common::{ - msg::{ClientMsg, ClientState, RequestStateError, ServerMsg}, - net::PostBox, + msg::{ClientState, RequestStateError, ServerMsg}, }; +use network::Stream; use hashbrown::HashSet; use specs::{Component, FlaggedStorage}; use specs_idvs::IDVStorage; @@ -9,7 +9,7 @@ use vek::*; pub struct Client { pub client_state: ClientState, - pub postbox: PostBox, + pub singleton_stream: std::sync::Mutex, pub last_ping: f64, pub login_msg_sent: bool, } @@ -19,7 +19,7 @@ impl Component for Client { } impl Client { - pub fn notify(&mut self, msg: ServerMsg) { self.postbox.send_message(msg); } + pub fn notify(&mut self, msg: ServerMsg) { self.singleton_stream.lock().unwrap().send(msg); } pub fn is_registered(&self) -> bool { match self.client_state { @@ -37,13 +37,13 @@ impl Client { pub fn allow_state(&mut self, new_state: ClientState) { self.client_state = new_state; - self.postbox - .send_message(ServerMsg::StateAnswer(Ok(new_state))); + self.singleton_stream + .lock().unwrap().send(ServerMsg::StateAnswer(Ok(new_state))); } pub fn error_state(&mut self, error: RequestStateError) { - self.postbox - .send_message(ServerMsg::StateAnswer(Err((error, self.client_state)))); + self.singleton_stream + .lock().unwrap().send(ServerMsg::StateAnswer(Err((error, self.client_state)))); } } diff --git a/server/src/error.rs b/server/src/error.rs index e98ec644f7..a231b3ec34 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -1,11 +1,21 @@ -use common::net::PostError; +use network::{NetworkError, ParticipantError, StreamError}; #[derive(Debug)] pub enum Error { - Network(PostError), + NetworkErr(NetworkError), + ParticipantErr(ParticipantError), + StreamErr(StreamError), Other(String), } -impl From for Error { - fn from(err: PostError) -> Self { Error::Network(err) } +impl From for Error { + fn from(err: NetworkError) -> Self { Error::NetworkErr(err) } +} + +impl From for Error { + fn from(err: ParticipantError) -> Self { Error::ParticipantErr(err) } +} + +impl From for Error { + fn from(err: StreamError) -> Self { Error::StreamErr(err) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 288537277a..898004616d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -31,13 +31,13 @@ use common::{ cmd::ChatCommand, comp::{self, ChatType}, event::{EventBus, ServerEvent}, - msg::{ClientMsg, ClientState, ServerInfo, ServerMsg}, - net::PostOffice, + msg::{ClientState, ServerInfo, ServerMsg}, state::{State, TimeOfDay}, sync::WorldSyncExt, terrain::TerrainChunkSize, vol::{ReadVol, RectVolSize}, }; +use network::{Network, Address, Pid}; use metrics::{ServerMetrics, TickMetrics}; use persistence::character::{CharacterLoader, CharacterLoaderResponseType, CharacterUpdater}; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; @@ -47,6 +47,9 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use futures_util::{select, FutureExt}; +use futures_executor::block_on; +use futures_timer::Delay; #[cfg(not(feature = "worldgen"))] use test_world::{World, WORLD_SIZE}; use tracing::{debug, error, info}; @@ -77,7 +80,7 @@ pub struct Server { world: Arc, map: Vec, - postoffice: PostOffice, + network: Network, thread_pool: ThreadPool, @@ -233,16 +236,19 @@ impl Server { .run(settings.metrics_address) .expect("Failed to initialize server metrics submodule."); + let thread_pool = ThreadPoolBuilder::new().name("veloren-worker".to_string()).build(); + let (network, f) = Network::new(Pid::new(), None); + thread_pool.execute(f); + block_on(network.listen(Address::Tcp(settings.gameserver_address)))?; + let this = Self { state, world: Arc::new(world), map, - postoffice: PostOffice::bind(settings.gameserver_address)?, + network, - thread_pool: ThreadPoolBuilder::new() - .name("veloren-worker".into()) - .build(), + thread_pool, metrics, tick_metrics, @@ -329,17 +335,18 @@ impl Server { // 1) Build up a list of events for this frame, to be passed to the frontend. let mut frontend_events = Vec::new(); - // If networking has problems, handle them. - if let Some(err) = self.postoffice.error() { - return Err(err.into()); - } - // 2) let before_new_connections = Instant::now(); // 3) Handle inputs from clients - frontend_events.append(&mut self.handle_new_connections()?); + block_on(async{ + //TIMEOUT 0.01 ms for msg handling + let x = select!( + _ = Delay::new(std::time::Duration::from_micros(10)).fuse() => Ok(()), + err = self.handle_new_connections(&mut frontend_events).fuse() => err, + ); + }); let before_message_system = Instant::now(); @@ -582,13 +589,14 @@ impl Server { } /// Handle new client connections. - fn handle_new_connections(&mut self) -> Result, Error> { - let mut frontend_events = Vec::new(); + async fn handle_new_connections(&mut self, frontend_events: &mut Vec) -> Result<(), Error> { + loop { + let participant = self.network.connected().await?; + let singleton_stream = participant.opened().await?; - for postbox in self.postoffice.new_postboxes() { let mut client = Client { client_state: ClientState::Connected, - postbox, + singleton_stream: std::sync::Mutex::new(singleton_stream), last_ping: self.state.get_time(), login_msg_sent: false, }; @@ -626,8 +634,6 @@ impl Server { frontend_events.push(Event::ClientConnected { entity }); } } - - Ok(frontend_events) } pub fn notify_client(&self, entity: EcsEntity, msg: S) diff --git a/server/src/sys/waypoint.rs b/server/src/sys/waypoint.rs index c7b3ebbfb7..7ee8ff1253 100644 --- a/server/src/sys/waypoint.rs +++ b/server/src/sys/waypoint.rs @@ -41,8 +41,7 @@ impl<'a> System<'a> for Sys { { if wp_old.map_or(true, |w| w.elapsed(*time) > NOTIFY_TIME) { client - .postbox - .send_message(ServerMsg::Notification(Notification::WaypointSaved)); + .notify(ServerMsg::Notification(Notification::WaypointSaved)); } } } diff --git a/voxygen/src/menu/main/client_init.rs b/voxygen/src/menu/main/client_init.rs index 02c171ac5b..455ced5066 100644 --- a/voxygen/src/menu/main/client_init.rs +++ b/voxygen/src/menu/main/client_init.rs @@ -1,5 +1,4 @@ use client::{error::Error as ClientError, Client}; -use common::net::PostError; use crossbeam::channel::{unbounded, Receiver, Sender, TryRecvError}; use std::{ net::ToSocketAddrs, @@ -98,12 +97,10 @@ impl ClientInit { }, Err(err) => { match err { - ClientError::Network(PostError::Bincode(_)) => { + ClientError::NetworkErr(_) => { last_err = Some(Error::ClientError(err)); break 'tries; - }, - // Assume the connection failed and try again soon - ClientError::Network(_) => {}, + } // Non-connection error, stop attempts err => { last_err = Some(Error::ClientError(err)); diff --git a/voxygen/src/menu/main/mod.rs b/voxygen/src/menu/main/mod.rs index 0cdc9b6e17..cdd9410545 100644 --- a/voxygen/src/menu/main/mod.rs +++ b/voxygen/src/menu/main/mod.rs @@ -109,7 +109,17 @@ impl PlayState for MainMenuState { client::Error::NotOnWhitelist => { localized_strings.get("main.login.not_on_whitelist").into() }, - client::Error::Network(e) => format!( + client::Error::NetworkErr(e) => format!( + "{}: {:?}", + localized_strings.get("main.login.network_error"), + e + ), + client::Error::ParticipantErr(e) => format!( + "{}: {:?}", + localized_strings.get("main.login.network_error"), + e + ), + client::Error::StreamErr(e) => format!( "{}: {:?}", localized_strings.get("main.login.network_error"), e diff --git a/voxygen/src/singleplayer.rs b/voxygen/src/singleplayer.rs index aee967caf4..7184d39dd5 100644 --- a/voxygen/src/singleplayer.rs +++ b/voxygen/src/singleplayer.rs @@ -47,14 +47,14 @@ impl Singleplayer { let paused = Arc::new(AtomicBool::new(false)); let paused1 = paused.clone(); + let server = Server::new(settings2).expect("Failed to create server instance!"); + + let server = match thread_pool { + Some(pool) => server.with_thread_pool(pool), + None => server, + }; + let thread = thread::spawn(move || { - let server = Server::new(settings2).expect("Failed to create server instance!"); - - let server = match thread_pool { - Some(pool) => server.with_thread_pool(pool), - None => server, - }; - run_server(server, receiver, paused1); });