From f4ce38e58a10bb3d6a16f0b7292f7d1e666698a6 Mon Sep 17 00:00:00 2001 From: Joshua Barretto Date: Thu, 11 Apr 2019 23:26:43 +0100 Subject: [PATCH] Rewrote netcode, added basic chunk synching Former-commit-id: e9f76f7fa9dbe0c81cd4c998bf0f0b3eec9235cb --- chat-cli/src/main.rs | 2 +- client/src/error.rs | 5 +- client/src/lib.rs | 43 +++- common/Cargo.toml | 1 - common/src/lib.rs | 2 +- common/src/msg/client.rs | 2 + common/src/msg/server.rs | 4 +- common/src/net/mod.rs | 5 +- common/src/net/post.rs | 244 +++++++++++++-------- common/src/net/post2.rs | 402 ++++++++++++++++++++++++++++++++++ common/src/volumes/vol_map.rs | 4 + server/src/client.rs | 2 +- server/src/lib.rs | 12 +- 13 files changed, 608 insertions(+), 120 deletions(-) create mode 100644 common/src/net/post2.rs diff --git a/chat-cli/src/main.rs b/chat-cli/src/main.rs index deb5cb126c..2bbc51a388 100644 --- a/chat-cli/src/main.rs +++ b/chat-cli/src/main.rs @@ -18,7 +18,7 @@ fn main() { let mut clock = Clock::new(); // Create client - let mut client = Client::new(([127, 0, 0, 1], 59003), comp::Player::new("test".to_string()), None) + let mut client = Client::new(([127, 0, 0, 1], 59003), comp::Player::new("test".to_string()), None, 300) .expect("Failed to create client instance"); client.send_chat("Hello!".to_string()); diff --git a/client/src/error.rs b/client/src/error.rs index b4b3ce4742..1f74ae62b9 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -11,9 +11,6 @@ pub enum Error { impl From for Error { fn from(err: PostError) -> Self { - match err { - PostError::Disconnect => Error::ServerShutdown, - err => Error::Network(err), - } + Error::Network(err) } } diff --git a/client/src/lib.rs b/client/src/lib.rs index c20cca643c..1d5bdc1a00 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -13,6 +13,7 @@ pub use crate::{ use std::{ time::Duration, net::SocketAddr, + collections::HashSet, }; use vek::*; use threadpool::ThreadPool; @@ -41,6 +42,8 @@ pub struct Client { state: State, player: Option, view_distance: u64, + + pending_chunks: HashSet>, } impl Client { @@ -53,10 +56,10 @@ impl Client { view_distance: u64, ) -> Result { - let mut postbox = PostBox::to_server(addr)?; + let mut postbox = PostBox::to(addr)?; // Send connection request - postbox.send(ClientMsg::Connect { + postbox.send_message(ClientMsg::Connect { player, character, }); @@ -83,6 +86,8 @@ impl Client { state, player, view_distance, + + pending_chunks: HashSet::new(), }) } @@ -115,7 +120,7 @@ impl Client { /// Send a chat message to the server #[allow(dead_code)] pub fn send_chat(&mut self, msg: String) { - self.postbox.send(ClientMsg::Chat(msg)) + self.postbox.send_message(ClientMsg::Chat(msg)) } /// Execute a single client tick, handle input and update the game state by the given duration @@ -161,12 +166,31 @@ impl Client { self.state.read_storage().get(ecs_entity).cloned(), ) { (Some(pos), Some(vel), Some(dir)) => { - self.postbox.send(ClientMsg::PlayerPhysics { pos, vel, dir }); + self.postbox.send_message(ClientMsg::PlayerPhysics { pos, vel, dir }); }, _ => {}, } } + // Request chunks from the server + if let Some(player_entity) = self.player { + if let Some(pos) = self.state.read_storage::().get(player_entity) { + let chunk_pos = self.state.terrain().pos_key(pos.0.map(|e| e as i32)); + + for i in chunk_pos.x - 0..chunk_pos.x + 1 { + for j in chunk_pos.y - 0..chunk_pos.y + 1 { + for k in 0..3 { + let key = chunk_pos + Vec3::new(i, j, k); + if self.state.terrain().get_key(key).is_none() && !self.pending_chunks.contains(&key) { + self.postbox.send_message(ClientMsg::TerrainChunkRequest { key }); + self.pending_chunks.insert(key); + } + } + } + } + } + } + // Finish the tick, pass control back to the frontend (step 6) self.tick += 1; Ok(frontend_events) @@ -193,12 +217,15 @@ impl Client { match msg { ServerMsg::Handshake { .. } => return Err(Error::ServerWentMad), ServerMsg::Shutdown => return Err(Error::ServerShutdown), - ServerMsg::Ping => self.postbox.send(ClientMsg::Pong), + ServerMsg::Ping => self.postbox.send_message(ClientMsg::Pong), ServerMsg::Pong => {}, ServerMsg::Chat(msg) => frontend_events.push(Event::Chat(msg)), ServerMsg::SetPlayerEntity(uid) => self.player = Some(self.state.ecs().entity_from_uid(uid).unwrap()), // TODO: Don't unwrap here! ServerMsg::EcsSync(sync_package) => self.state.ecs_mut().sync_with_package(sync_package), - ServerMsg::TerrainChunkUpdate { key, chunk } => self.state.insert_chunk(key, chunk), + ServerMsg::TerrainChunkUpdate { key, chunk } => { + self.state.insert_chunk(key, *chunk); + self.pending_chunks.remove(&key); + }, } } } else if let Some(err) = self.postbox.error() { @@ -207,7 +234,7 @@ impl Client { return Err(Error::ServerTimeout); } else if self.state.get_time() - self.last_ping > SERVER_TIMEOUT * 0.5 { // Try pinging the server if the timeout is nearing - self.postbox.send(ClientMsg::Ping); + self.postbox.send_message(ClientMsg::Ping); } Ok(frontend_events) @@ -216,6 +243,6 @@ impl Client { impl Drop for Client { fn drop(&mut self) { - self.postbox.send(ClientMsg::Disconnect); + self.postbox.send_message(ClientMsg::Disconnect); } } diff --git a/common/Cargo.toml b/common/Cargo.toml index 75c032bf18..8c82e841a3 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -18,4 +18,3 @@ serde = "1.0" serde_derive = "1.0" bincode = "1.0" log = "0.4" -pretty_env_logger = "0.3" diff --git a/common/src/lib.rs b/common/src/lib.rs index 72bcade3f8..146e272351 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(euclidean_division, duration_float, trait_alias)] +#![feature(euclidean_division, duration_float, trait_alias, bind_by_move_pattern_guards)] #[macro_use] extern crate serde_derive; diff --git a/common/src/msg/client.rs b/common/src/msg/client.rs index 5a599633d7..ffc941b1b6 100644 --- a/common/src/msg/client.rs +++ b/common/src/msg/client.rs @@ -20,3 +20,5 @@ pub enum ClientMsg { }, Disconnect, } + +impl middleman::Message for ClientMsg {} diff --git a/common/src/msg/server.rs b/common/src/msg/server.rs index db0ae2ed81..bdd86945ca 100644 --- a/common/src/msg/server.rs +++ b/common/src/msg/server.rs @@ -16,6 +16,8 @@ pub enum ServerMsg { EcsSync(sphynx::SyncPackage), TerrainChunkUpdate { key: Vec3, - chunk: TerrainChunk, + chunk: Box, }, } + +impl middleman::Message for ServerMsg {} diff --git a/common/src/net/mod.rs b/common/src/net/mod.rs index c1c7fd4107..11829e32fa 100644 --- a/common/src/net/mod.rs +++ b/common/src/net/mod.rs @@ -1,5 +1,8 @@ pub mod data; -pub mod post; +//pub mod post; +pub mod post2; + +pub use post2 as post; // Reexports pub use self::{ diff --git a/common/src/net/post.rs b/common/src/net/post.rs index 6338424912..38a8533186 100644 --- a/common/src/net/post.rs +++ b/common/src/net/post.rs @@ -23,6 +23,7 @@ use mio_extras::channel::{ Sender, }; use bincode; +use middleman::Middleman; #[derive(Clone, Debug, PartialEq)] pub enum Error { @@ -56,14 +57,15 @@ impl From> for Error { } } -pub trait PostSend = 'static + serde::Serialize + Send; -pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send; +pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message; +pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message; -const TCP_TOK: Token = Token(0); -const CTRL_TOK: Token = Token(1); -const POSTBOX_TOK: Token = Token(2); -const SEND_TOK: Token = Token(3); -const RECV_TOK: Token = Token(4); +const TCP_TOK: Token = Token(0); +const CTRL_TOK: Token = Token(1); +const POSTBOX_TOK: Token = Token(2); +const SEND_TOK: Token = Token(3); +const RECV_TOK: Token = Token(4); +const MIDDLEMAN_TOK: Token = Token(5); const MAX_MSG_BYTES: usize = 1 << 20; @@ -218,7 +220,7 @@ impl PostBox { let (recv_tx, recv_rx) = channel(); let worker_poll = Poll::new()?; - worker_poll.register(&tcp_stream, TCP_TOK, Ready::readable(), PollOpt::edge())?; + worker_poll.register(&tcp_stream, TCP_TOK, Ready::readable() | Ready::writable(), PollOpt::edge())?; worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?; @@ -345,12 +347,38 @@ fn postbox_worker( send_rx: Receiver, recv_tx: Sender>, ) -> Result<(), Error> { + fn try_tcp_send(tcp_stream: &mut TcpStream, chunks: &mut VecDeque>) -> Result<(), Error> { + loop { + let chunk = match chunks.pop_front() { + Some(chunk) => chunk, + None => break, + }; + + match tcp_stream.write_all(&chunk) { + Ok(()) => {}, + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + chunks.push_front(chunk); + break; + }, + Err(err) => { + println!("Error: {:?}", err); + return Err(err.into()); + }, + } + } + + Ok(()) + } + enum RecvState { ReadHead(Vec), ReadBody(usize, Vec), } - let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8)); + let mut recv_state = RecvState::ReadHead(Vec::new()); + let mut chunks = VecDeque::new(); + + //let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8)); let mut events = Events::with_capacity(64); 'work: loop { @@ -383,18 +411,23 @@ fn postbox_worker( }, }; - let mut packet = msg_bytes + let mut bytes = msg_bytes .len() .to_le_bytes() .as_ref() .to_vec(); - packet.append(&mut msg_bytes); + bytes.append(&mut msg_bytes); - match tcp_stream.write_all(&packet) { - Ok(()) => {}, + bytes + .chunks(1024) + .map(|chunk| chunk.to_vec()) + .for_each(|chunk| chunks.push_back(chunk)); + + match try_tcp_send(&mut tcp_stream, &mut chunks) { + Ok(_) => {}, Err(err) => { recv_tx.send(Err(err.into()))?; - break 'work; + return Err(Error::Network); }, } }, @@ -402,61 +435,75 @@ fn postbox_worker( Err(err) => Err(err)?, } }, - TCP_TOK => loop { - match tcp_stream.take_error() { - Ok(None) => {}, - Ok(Some(err)) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, + TCP_TOK => { + loop { + // Check TCP error + match tcp_stream.take_error() { + Ok(None) => {}, + Ok(Some(err)) => { + recv_tx.send(Err(err.into()))?; + break 'work; + }, + Err(err) => { + recv_tx.send(Err(err.into()))?; + break 'work; + }, + } + match &mut recv_state { + RecvState::ReadHead(head) => if head.len() == 8 { + let len = usize::from_le_bytes(<[u8; 8]>::try_from(head.as_slice()).unwrap()); + if len > MAX_MSG_BYTES { + println!("TOO BIG! {:x}", len); + recv_tx.send(Err(Error::InvalidMsg))?; + break 'work; + } else if len == 0 { + recv_state = RecvState::ReadHead(Vec::with_capacity(8)); + break; + } else { + recv_state = RecvState::ReadBody( + len, + Vec::new(), + ); + } + } else { + let mut b = [0; 1]; + match tcp_stream.read(&mut b) { + Ok(0) => {}, + Ok(_) => head.push(b[0]), + Err(_) => break, + } + }, + RecvState::ReadBody(len, body) => if body.len() == *len { + match bincode::deserialize(&body) { + Ok(msg) => { + recv_tx.send(Ok(msg))?; + recv_state = RecvState::ReadHead(Vec::with_capacity(8)); + }, + Err(err) => { + recv_tx.send(Err((*err).into()))?; + break 'work; + }, + } + } else { + let left = *len - body.len(); + let mut buf = vec![0; left]; + match tcp_stream.read(&mut buf) { + Ok(_) => body.append(&mut buf), + Err(err) => { + recv_tx.send(Err(err.into()))?; + break 'work; + }, + } + }, + } + } + + // Now, try sending TCP stuff + match try_tcp_send(&mut tcp_stream, &mut chunks) { + Ok(_) => {}, Err(err) => { recv_tx.send(Err(err.into()))?; - break 'work; - }, - } - match &mut recv_state { - RecvState::ReadHead(head) => if head.len() == 8 { - let len = usize::from_le_bytes(<[u8; 8]>::try_from(head.as_slice()).unwrap()); - if len > MAX_MSG_BYTES { - recv_tx.send(Err(Error::InvalidMsg))?; - break 'work; - } else if len == 0 { - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - break; - } else { - recv_state = RecvState::ReadBody( - len, - Vec::new(), - ); - } - } else { - let mut b = [0; 1]; - match tcp_stream.read(&mut b) { - Ok(_) => head.push(b[0]), - Err(_) => break, - } - }, - RecvState::ReadBody(len, body) => if body.len() == *len { - match bincode::deserialize(&body) { - Ok(msg) => { - recv_tx.send(Ok(msg))?; - recv_state = RecvState::ReadHead(Vec::with_capacity(8)); - }, - Err(err) => { - recv_tx.send(Err((*err).into()))?; - break 'work; - }, - } - } else { - let left = *len - body.len(); - let mut buf = vec![0; left]; - match tcp_stream.read(&mut buf) { - Ok(_) => body.append(&mut buf), - Err(err) => { - recv_tx.send(Err(err.into()))?; - break 'work; - }, - } + return Err(Error::Network); }, } }, @@ -465,24 +512,28 @@ fn postbox_worker( } } - tcp_stream.shutdown(Shutdown::Both)?; + //tcp_stream.shutdown(Shutdown::Both)?; Ok(()) } // TESTS +/* +#[derive(Serialize, Deserialize)] +struct TestMsg(T); + #[test] fn connect() { let srv_addr = ([127, 0, 0, 1], 12345); - let mut postoffice = PostOffice::::bind(srv_addr).unwrap(); + let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); // We should start off with 0 incoming connections thread::sleep(Duration::from_millis(250)); assert_eq!(postoffice.new_connections().len(), 0); assert_eq!(postoffice.error(), None); - let postbox = PostBox::::to_server(srv_addr).unwrap(); + let postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); // Now a postbox has been created, we should have 1 new thread::sleep(Duration::from_millis(250)); @@ -496,21 +547,21 @@ fn connect_fail() { let listen_addr = ([0; 4], 12345); let connect_addr = ([127, 0, 0, 1], 12212); - let mut postoffice = PostOffice::::bind(listen_addr).unwrap(); + let mut postoffice = PostOffice::, TestMsg>::bind(listen_addr).unwrap(); // We should start off with 0 incoming connections thread::sleep(Duration::from_millis(250)); assert_eq!(postoffice.new_connections().len(), 0); assert_eq!(postoffice.error(), None); - assert!(PostBox::::to_server(connect_addr).is_err()); + assert!(PostBox::, TestMsg>::to_server(connect_addr).is_err()); } #[test] fn connection_count() { let srv_addr = ([127, 0, 0, 1], 12346); - let mut postoffice = PostOffice::::bind(srv_addr).unwrap(); + let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); let mut postboxes = Vec::new(); // We should start off with 0 incoming connections @@ -519,7 +570,7 @@ fn connection_count() { assert_eq!(postoffice.error(), None); for _ in 0..5 { - postboxes.push(PostBox::::to_server(srv_addr).unwrap()); + postboxes.push(PostBox::, TestMsg>::to_server(srv_addr).unwrap()); } // 5 postboxes created, we should have 5 @@ -533,10 +584,10 @@ fn connection_count() { fn disconnect() { let srv_addr = ([127, 0, 0, 1], 12347); - let mut postoffice = PostOffice::::bind(srv_addr).unwrap(); + let mut postoffice = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); let mut server_postbox = { - let mut client_postbox = PostBox::::to_server(srv_addr).unwrap(); + let mut client_postbox = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); thread::sleep(Duration::from_millis(250)); let mut incoming = postoffice.new_connections(); @@ -558,52 +609,53 @@ fn disconnect() { fn client_to_server() { let srv_addr = ([127, 0, 0, 1], 12348); - let mut po = PostOffice::::bind(srv_addr).unwrap(); + let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - let mut client_pb = PostBox::::to_server(srv_addr).unwrap(); + let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); thread::sleep(Duration::from_millis(250)); let mut server_pb = po.new_connections().next().unwrap(); - client_pb.send(1337.0); - client_pb.send(9821.0); - client_pb.send(-3.2); - client_pb.send(17.0); + client_pb.send(TestMsg(1337.0)); + client_pb.send(TestMsg(9821.0)); + client_pb.send(TestMsg(-3.2)); + client_pb.send(TestMsg(17.0)); thread::sleep(Duration::from_millis(250)); let mut incoming_msgs = server_pb.new_messages(); assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), 1337.0); - assert_eq!(incoming_msgs.next().unwrap(), 9821.0); - assert_eq!(incoming_msgs.next().unwrap(), -3.2); - assert_eq!(incoming_msgs.next().unwrap(), 17.0); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0)); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0)); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2)); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0)); } #[test] fn server_to_client() { let srv_addr = ([127, 0, 0, 1], 12349); - let mut po = PostOffice::::bind(srv_addr).unwrap(); + let mut po = PostOffice::, TestMsg>::bind(srv_addr).unwrap(); - let mut client_pb = PostBox::::to_server(srv_addr).unwrap(); + let mut client_pb = PostBox::, TestMsg>::to_server(srv_addr).unwrap(); thread::sleep(Duration::from_millis(250)); let mut server_pb = po.new_connections().next().unwrap(); - server_pb.send(1337); - server_pb.send(9821); - server_pb.send(39999999); - server_pb.send(17); + server_pb.send(TestMsg(1337)); + server_pb.send(TestMsg(9821)); + server_pb.send(TestMsg(39999999)); + server_pb.send(TestMsg(17)); thread::sleep(Duration::from_millis(250)); let mut incoming_msgs = client_pb.new_messages(); assert_eq!(incoming_msgs.len(), 4); - assert_eq!(incoming_msgs.next().unwrap(), 1337); - assert_eq!(incoming_msgs.next().unwrap(), 9821); - assert_eq!(incoming_msgs.next().unwrap(), 39999999); - assert_eq!(incoming_msgs.next().unwrap(), 17); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337)); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821)); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999)); + assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17)); } +*/ diff --git a/common/src/net/post2.rs b/common/src/net/post2.rs new file mode 100644 index 0000000000..16a9bb985a --- /dev/null +++ b/common/src/net/post2.rs @@ -0,0 +1,402 @@ +use std::{ + io::{self, Read, Write}, + net::{TcpListener, TcpStream, SocketAddr, Shutdown}, + time::{Instant, Duration}, + marker::PhantomData, + sync::mpsc, + thread, + collections::VecDeque, + convert::TryFrom, +}; +use serde::{Serialize, de::DeserializeOwned}; + +#[derive(Clone, Debug)] +pub enum Error { + Io, //Io(io::Error), + Bincode, //Bincode(bincode::Error), + ChannelFailure, + InvalidMessage, +} + +impl From for Error { + fn from(err: io::Error) -> Self { + Error::Io //(err) + } +} + +impl From for Error { + fn from(err: bincode::Error) -> Self { + Error::Bincode //(err) + } +} + +impl From for Error { + fn from(error: mpsc::TryRecvError) -> Self { + Error::ChannelFailure + } +} + +pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send; + +const MAX_MSG_SIZE: usize = 1 << 20; + +pub struct PostOffice { + listener: TcpListener, + error: Option, + phantom: PhantomData<(S, R)>, +} + +impl PostOffice { + pub fn bind>(addr: A) -> Result { + let mut listener = TcpListener::bind(addr.into())?; + listener.set_nonblocking(true)?; + + Ok(Self { + listener, + error: None, + phantom: PhantomData, + }) + } + + pub fn error(&self) -> Option { + self.error.clone() + } + + pub fn new_postboxes(&mut self) -> impl ExactSizeIterator> { + let mut new = Vec::new(); + + if self.error.is_some() { + return new.into_iter(); + } + + loop { + match self.listener.accept() { + Ok((stream, sock)) => new.push(PostBox::from_stream(stream).unwrap()), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => { + self.error = Some(e.into()); + break; + }, + } + } + + new.into_iter() + } +} + +pub struct PostBox { + send_tx: mpsc::Sender>, + recv_rx: mpsc::Receiver>, + worker: Option>, + error: Option, +} + +impl PostBox { + pub fn to>(addr: A) -> Result { + Self::from_stream(TcpStream::connect(addr.into())?) + } + + fn from_stream(stream: TcpStream) -> Result { + stream.set_nonblocking(true)?; + + let (send_tx, send_rx) = mpsc::channel(); + let (recv_tx, recv_rx) = mpsc::channel(); + + let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx)); + + Ok(Self { + send_tx, + recv_rx, + worker: Some(worker), + error: None, + }) + } + + pub fn error(&self) -> Option { + self.error.clone() + } + + pub fn send_message(&mut self, msg: S) { + let _ = self.send_tx.send(Some(msg)); + } + + pub fn next_message(&mut self) -> Option { + if self.error.is_some() { + return None; + } + + match self.recv_rx.recv().ok()? { + Ok(msg) => Some(msg), + Err(e) => { + self.error = Some(e); + None + }, + } + } + + pub fn new_messages(&mut self) -> impl ExactSizeIterator { + let mut new = Vec::new(); + + if self.error.is_some() { + return new.into_iter(); + } + + loop { + match self.recv_rx.try_recv() { + Ok(Ok(msg)) => new.push(msg), + Err(mpsc::TryRecvError::Empty) => break, + Err(e) => { + self.error = Some(e.into()); + break; + }, + Ok(Err(e)) => { + self.error = Some(e); + break; + }, + } + } + + new.into_iter() + } + + fn worker(mut stream: TcpStream, send_rx: mpsc::Receiver>, recv_tx: mpsc::Sender>) { + let mut outgoing_chunks = VecDeque::new(); + let mut incoming_buf = Vec::new(); + + 'work: loop { + // Get stream errors + match stream.take_error() { + Ok(Some(e)) | Err(e) => { + recv_tx.send(Err(e.into())).unwrap(); + break 'work; + }, + Ok(None) => {}, + } + + // Try getting messages from the send channel + loop { + match send_rx.try_recv() { + Ok(Some(send_msg)) => { + // Serialize message + let mut msg_bytes = bincode::serialize(&send_msg).unwrap(); + + // Assemble into packet + let mut packet_bytes = msg_bytes + .len() + .to_le_bytes() + .as_ref() + .to_vec(); + packet_bytes.append(&mut msg_bytes); + + // Split packet into chunks + packet_bytes + .chunks(4096) + .map(|chunk| chunk.to_vec()) + .for_each(|chunk| outgoing_chunks.push_back(chunk)) + }, + // Shut down worker + Ok(None) => break 'work, + Err(mpsc::TryRecvError::Empty) => break, + // Worker error + Err(e) => { + let _ = recv_tx.send(Err(e.into())); + break 'work; + }, + } + } + + // Try sending bytes through the TCP stream + loop { + //println!("HERE! Outgoing len: {}", outgoing_chunks.len()); + match outgoing_chunks.pop_front() { + Some(chunk) => match stream.write_all(&chunk) { + Ok(()) => {}, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + // Return chunk to the queue to try again later + outgoing_chunks.push_front(chunk); + break; + }, + // Worker error + Err(e) => { + recv_tx.send(Err(e.into())).unwrap(); + break 'work; + }, + }, + None => break, + } + } + + // Try receiving bytes from the TCP stream + loop { + let mut buf = [0; 1024]; + + match stream.read(&mut buf) { + Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + // Worker error + Err(e) => { + recv_tx.send(Err(e.into())).unwrap(); + break 'work; + }, + } + } + + // Try turning bytes into messages + loop { + match incoming_buf.get(0..8) { + Some(len_bytes) => { + let len = usize::from_le_bytes(<[u8; 8]>::try_from(len_bytes).unwrap()); // Can't fail + + if len > MAX_MSG_SIZE { + recv_tx.send(Err(Error::InvalidMessage)).unwrap(); + break 'work; + } else if incoming_buf.len() >= len + 8 { + let deserialize_result = bincode::deserialize(&incoming_buf[8..len + 8]); + + if let Err(e) = &deserialize_result { + println!("DESERIALIZE ERROR: {:?}", e); + } + + recv_tx.send(deserialize_result.map_err(|e| e.into())); + incoming_buf = incoming_buf.split_off(len + 8); + } else { + break; + } + }, + None => break, + } + } + + thread::sleep(Duration::from_millis(10)); + } + + stream.shutdown(Shutdown::Both); + } +} + +impl Drop for PostBox { + fn drop(&mut self) { + let _ = self.send_tx.send(None); + // TODO: Cleanly join! + //self.worker.take().map(|handle| handle.join()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_postoffice(id: u16) -> Result<(PostOffice, SocketAddr), Error> { + let sock = ([0; 4], 12345 + id).into(); + Ok((PostOffice::bind(sock)?, sock)) + } + + fn loop_for(duration: Duration, mut f: F) { + let start = Instant::now(); + while start.elapsed() < duration { + f(); + } + } + + #[test] + fn connect() { + let (mut postoffice, sock) = create_postoffice::<(), ()>(0).unwrap(); + + let _client0 = PostBox::<(), ()>::to(sock).unwrap(); + let _client1 = PostBox::<(), ()>::to(sock).unwrap(); + let _client2 = PostBox::<(), ()>::to(sock).unwrap(); + + let mut new_clients = 0; + loop_for(Duration::from_millis(250), || { + new_clients += postoffice.new_postboxes().count(); + }); + + assert_eq!(new_clients, 3); + } + + /* + #[test] + fn disconnect() { + let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap(); + + let mut client = PostBox::::to(sock).unwrap(); + loop_for(Duration::from_millis(250), || ()); + let mut server = postoffice.new_postboxes().unwrap().next().unwrap(); + + drop(client); + loop_for(Duration::from_millis(300), || ()); + + assert!(server.new_messages().is_err()); + } + */ + + #[test] + fn send_recv() { + let (mut postoffice, sock) = create_postoffice::<(), i32>(2).unwrap(); + let test_msgs = vec![1, 1337, 42, -48]; + + let mut client = PostBox::::to(sock).unwrap(); + loop_for(Duration::from_millis(250), || ()); + let mut server = postoffice.new_postboxes().next().unwrap(); + + for msg in &test_msgs { + client.send_message(msg.clone()); + } + + let mut recv_msgs = Vec::new(); + loop_for(Duration::from_millis(250), || server + .new_messages() + .for_each(|msg| recv_msgs.push(msg))); + + assert_eq!(test_msgs, recv_msgs); + } + + #[test] + fn send_recv_huge() { + let (mut postoffice, sock) = create_postoffice::<(), Vec>(3).unwrap(); + let test_msgs: Vec> = (0..5).map(|i| (0..100000).map(|j| i * 2 + j).collect()).collect(); + + let mut client = PostBox::, ()>::to(sock).unwrap(); + loop_for(Duration::from_millis(250), || ()); + let mut server = postoffice.new_postboxes().next().unwrap(); + + for msg in &test_msgs { + client.send_message(msg.clone()); + } + + let mut recv_msgs = Vec::new(); + loop_for(Duration::from_millis(2000), || server + .new_messages() + .for_each(|msg| recv_msgs.push(msg))); + + assert_eq!(test_msgs.len(), recv_msgs.len()); + assert!(test_msgs == recv_msgs); + } + + #[test] + fn send_recv_both() { + let (mut postoffice, sock) = create_postoffice::(4).unwrap(); + let test_msgs = vec![1, 1337, 42, -48]; + + let mut client = PostBox::::to(sock).unwrap(); + loop_for(Duration::from_millis(250), || ()); + let mut server = postoffice.new_postboxes().next().unwrap(); + + let test_msgs = vec![ + (0xDEADBEAD, 0xBEEEEEEF), + (0x1BADB002, 0xBAADF00D), + (0xBAADA555, 0xC0DED00D), + (0xCAFEBABE, 0xDEADC0DE), + ]; + + for (to, from) in test_msgs { + client.send_message(to); + server.send_message(from); + + loop_for(Duration::from_millis(250), || ()); + + assert_eq!(client.new_messages().next().unwrap(), from); + assert_eq!(server.new_messages().next().unwrap(), to); + } + } +} diff --git a/common/src/volumes/vol_map.rs b/common/src/volumes/vol_map.rs index ab2947a416..507c009aea 100644 --- a/common/src/volumes/vol_map.rs +++ b/common/src/volumes/vol_map.rs @@ -137,4 +137,8 @@ impl VolMap { pub fn key_pos(&self, key: Vec3) -> Vec3 { key * S::SIZE.map(|e| e as i32) } + + pub fn pos_key(&self, pos: Vec3) -> Vec3 { + Self::chunk_key(pos) + } } diff --git a/server/src/client.rs b/server/src/client.rs index 71d8aae6ab..c0e2eafe90 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -21,7 +21,7 @@ pub struct Client { impl Client { pub fn notify(&mut self, msg: ServerMsg) { - self.postbox.send(msg); + self.postbox.send_message(msg); } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 9313505a71..f8ebba4b3c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -151,7 +151,7 @@ impl Server { self.clients.notify(entity, ServerMsg::TerrainChunkUpdate { key, - chunk: chunk.clone(), + chunk: Box::new(chunk.clone()), }); } @@ -176,7 +176,7 @@ impl Server { fn handle_new_connections(&mut self) -> Result, Error> { let mut frontend_events = Vec::new(); - for mut postbox in self.postoffice.new_connections() { + for mut postbox in self.postoffice.new_postboxes() { let entity = self.state .ecs_mut() .create_entity_synced() @@ -245,7 +245,7 @@ impl Server { ClientState::Connected => match msg { ClientMsg::Connect { .. } => disconnect = true, // Not allowed when already connected ClientMsg::Disconnect => disconnect = true, - ClientMsg::Ping => client.postbox.send(ServerMsg::Pong), + ClientMsg::Ping => client.postbox.send_message(ServerMsg::Pong), ClientMsg::Pong => {}, ClientMsg::Chat(msg) => new_chat_msgs.push((entity, msg)), ClientMsg::PlayerPhysics { pos, vel, dir } => { @@ -254,9 +254,9 @@ impl Server { state.write_component(entity, dir); }, ClientMsg::TerrainChunkRequest { key } => match state.terrain().get_key(key) { - Some(chunk) => client.postbox.send(ServerMsg::TerrainChunkUpdate { + Some(chunk) => client.postbox.send_message(ServerMsg::TerrainChunkUpdate { key, - chunk: chunk.clone(), + chunk: Box::new(chunk.clone()), }), None => requested_chunks.push(key), }, @@ -270,7 +270,7 @@ impl Server { disconnect = true; } else if state.get_time() - client.last_ping > CLIENT_TIMEOUT * 0.5 { // Try pinging the client if the timeout is nearing - client.postbox.send(ServerMsg::Ping); + client.postbox.send_message(ServerMsg::Ping); } if disconnect {