Rewrote netcode, added basic chunk synching

Former-commit-id: e9f76f7fa9dbe0c81cd4c998bf0f0b3eec9235cb
This commit is contained in:
Joshua Barretto 2019-04-11 23:26:43 +01:00
parent f27b5fa975
commit 507c47e771
13 changed files with 608 additions and 120 deletions

View File

@ -18,7 +18,7 @@ fn main() {
let mut clock = Clock::new();
// Create client
let mut client = Client::new(([127, 0, 0, 1], 59003), comp::Player::new("test".to_string()), None)
let mut client = Client::new(([127, 0, 0, 1], 59003), comp::Player::new("test".to_string()), None, 300)
.expect("Failed to create client instance");
client.send_chat("Hello!".to_string());

View File

@ -11,9 +11,6 @@ pub enum Error {
impl From<PostError> for Error {
fn from(err: PostError) -> Self {
match err {
PostError::Disconnect => Error::ServerShutdown,
err => Error::Network(err),
}
Error::Network(err)
}
}

View File

@ -13,6 +13,7 @@ pub use crate::{
use std::{
time::Duration,
net::SocketAddr,
collections::HashSet,
};
use vek::*;
use threadpool::ThreadPool;
@ -41,6 +42,8 @@ pub struct Client {
state: State,
player: Option<EcsEntity>,
view_distance: u64,
pending_chunks: HashSet<Vec3<i32>>,
}
impl Client {
@ -53,10 +56,10 @@ impl Client {
view_distance: u64,
) -> Result<Self, Error> {
let mut postbox = PostBox::to_server(addr)?;
let mut postbox = PostBox::to(addr)?;
// Send connection request
postbox.send(ClientMsg::Connect {
postbox.send_message(ClientMsg::Connect {
player,
character,
});
@ -83,6 +86,8 @@ impl Client {
state,
player,
view_distance,
pending_chunks: HashSet::new(),
})
}
@ -115,7 +120,7 @@ impl Client {
/// Send a chat message to the server
#[allow(dead_code)]
pub fn send_chat(&mut self, msg: String) {
self.postbox.send(ClientMsg::Chat(msg))
self.postbox.send_message(ClientMsg::Chat(msg))
}
/// Execute a single client tick, handle input and update the game state by the given duration
@ -161,12 +166,31 @@ impl Client {
self.state.read_storage().get(ecs_entity).cloned(),
) {
(Some(pos), Some(vel), Some(dir)) => {
self.postbox.send(ClientMsg::PlayerPhysics { pos, vel, dir });
self.postbox.send_message(ClientMsg::PlayerPhysics { pos, vel, dir });
},
_ => {},
}
}
// Request chunks from the server
if let Some(player_entity) = self.player {
if let Some(pos) = self.state.read_storage::<comp::phys::Pos>().get(player_entity) {
let chunk_pos = self.state.terrain().pos_key(pos.0.map(|e| e as i32));
for i in chunk_pos.x - 0..chunk_pos.x + 1 {
for j in chunk_pos.y - 0..chunk_pos.y + 1 {
for k in 0..3 {
let key = chunk_pos + Vec3::new(i, j, k);
if self.state.terrain().get_key(key).is_none() && !self.pending_chunks.contains(&key) {
self.postbox.send_message(ClientMsg::TerrainChunkRequest { key });
self.pending_chunks.insert(key);
}
}
}
}
}
}
// Finish the tick, pass control back to the frontend (step 6)
self.tick += 1;
Ok(frontend_events)
@ -193,12 +217,15 @@ impl Client {
match msg {
ServerMsg::Handshake { .. } => return Err(Error::ServerWentMad),
ServerMsg::Shutdown => return Err(Error::ServerShutdown),
ServerMsg::Ping => self.postbox.send(ClientMsg::Pong),
ServerMsg::Ping => self.postbox.send_message(ClientMsg::Pong),
ServerMsg::Pong => {},
ServerMsg::Chat(msg) => frontend_events.push(Event::Chat(msg)),
ServerMsg::SetPlayerEntity(uid) => self.player = Some(self.state.ecs().entity_from_uid(uid).unwrap()), // TODO: Don't unwrap here!
ServerMsg::EcsSync(sync_package) => self.state.ecs_mut().sync_with_package(sync_package),
ServerMsg::TerrainChunkUpdate { key, chunk } => self.state.insert_chunk(key, chunk),
ServerMsg::TerrainChunkUpdate { key, chunk } => {
self.state.insert_chunk(key, *chunk);
self.pending_chunks.remove(&key);
},
}
}
} else if let Some(err) = self.postbox.error() {
@ -207,7 +234,7 @@ impl Client {
return Err(Error::ServerTimeout);
} else if self.state.get_time() - self.last_ping > SERVER_TIMEOUT * 0.5 {
// Try pinging the server if the timeout is nearing
self.postbox.send(ClientMsg::Ping);
self.postbox.send_message(ClientMsg::Ping);
}
Ok(frontend_events)
@ -216,6 +243,6 @@ impl Client {
impl Drop for Client {
fn drop(&mut self) {
self.postbox.send(ClientMsg::Disconnect);
self.postbox.send_message(ClientMsg::Disconnect);
}
}

View File

@ -18,4 +18,3 @@ serde = "1.0"
serde_derive = "1.0"
bincode = "1.0"
log = "0.4"
pretty_env_logger = "0.3"

View File

@ -1,4 +1,4 @@
#![feature(euclidean_division, duration_float, trait_alias)]
#![feature(euclidean_division, duration_float, trait_alias, bind_by_move_pattern_guards)]
#[macro_use]
extern crate serde_derive;

View File

@ -20,3 +20,5 @@ pub enum ClientMsg {
},
Disconnect,
}
impl middleman::Message for ClientMsg {}

View File

@ -16,6 +16,8 @@ pub enum ServerMsg {
EcsSync(sphynx::SyncPackage<EcsPacket>),
TerrainChunkUpdate {
key: Vec3<i32>,
chunk: TerrainChunk,
chunk: Box<TerrainChunk>,
},
}
impl middleman::Message for ServerMsg {}

View File

@ -1,5 +1,8 @@
pub mod data;
pub mod post;
//pub mod post;
pub mod post2;
pub use post2 as post;
// Reexports
pub use self::{

View File

@ -23,6 +23,7 @@ use mio_extras::channel::{
Sender,
};
use bincode;
use middleman::Middleman;
#[derive(Clone, Debug, PartialEq)]
pub enum Error {
@ -56,14 +57,15 @@ impl<T> From<mio_extras::channel::SendError<T>> for Error {
}
}
pub trait PostSend = 'static + serde::Serialize + Send;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send;
pub trait PostSend = 'static + serde::Serialize + Send + middleman::Message;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + middleman::Message;
const TCP_TOK: Token = Token(0);
const CTRL_TOK: Token = Token(1);
const POSTBOX_TOK: Token = Token(2);
const SEND_TOK: Token = Token(3);
const RECV_TOK: Token = Token(4);
const TCP_TOK: Token = Token(0);
const CTRL_TOK: Token = Token(1);
const POSTBOX_TOK: Token = Token(2);
const SEND_TOK: Token = Token(3);
const RECV_TOK: Token = Token(4);
const MIDDLEMAN_TOK: Token = Token(5);
const MAX_MSG_BYTES: usize = 1 << 20;
@ -218,7 +220,7 @@ impl<S: PostSend, R: PostRecv> PostBox<S, R> {
let (recv_tx, recv_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(&tcp_stream, TCP_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&tcp_stream, TCP_TOK, Ready::readable() | Ready::writable(), PollOpt::edge())?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?;
@ -345,12 +347,38 @@ fn postbox_worker<S: PostSend, R: PostRecv>(
send_rx: Receiver<S>,
recv_tx: Sender<Result<R, Error>>,
) -> Result<(), Error> {
fn try_tcp_send(tcp_stream: &mut TcpStream, chunks: &mut VecDeque<Vec<u8>>) -> Result<(), Error> {
loop {
let chunk = match chunks.pop_front() {
Some(chunk) => chunk,
None => break,
};
match tcp_stream.write_all(&chunk) {
Ok(()) => {},
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
chunks.push_front(chunk);
break;
},
Err(err) => {
println!("Error: {:?}", err);
return Err(err.into());
},
}
}
Ok(())
}
enum RecvState {
ReadHead(Vec<u8>),
ReadBody(usize, Vec<u8>),
}
let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8));
let mut recv_state = RecvState::ReadHead(Vec::new());
let mut chunks = VecDeque::new();
//let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8));
let mut events = Events::with_capacity(64);
'work: loop {
@ -383,18 +411,23 @@ fn postbox_worker<S: PostSend, R: PostRecv>(
},
};
let mut packet = msg_bytes
let mut bytes = msg_bytes
.len()
.to_le_bytes()
.as_ref()
.to_vec();
packet.append(&mut msg_bytes);
bytes.append(&mut msg_bytes);
match tcp_stream.write_all(&packet) {
Ok(()) => {},
bytes
.chunks(1024)
.map(|chunk| chunk.to_vec())
.for_each(|chunk| chunks.push_back(chunk));
match try_tcp_send(&mut tcp_stream, &mut chunks) {
Ok(_) => {},
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
return Err(Error::Network);
},
}
},
@ -402,61 +435,75 @@ fn postbox_worker<S: PostSend, R: PostRecv>(
Err(err) => Err(err)?,
}
},
TCP_TOK => loop {
match tcp_stream.take_error() {
Ok(None) => {},
Ok(Some(err)) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
TCP_TOK => {
loop {
// Check TCP error
match tcp_stream.take_error() {
Ok(None) => {},
Ok(Some(err)) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
match &mut recv_state {
RecvState::ReadHead(head) => if head.len() == 8 {
let len = usize::from_le_bytes(<[u8; 8]>::try_from(head.as_slice()).unwrap());
if len > MAX_MSG_BYTES {
println!("TOO BIG! {:x}", len);
recv_tx.send(Err(Error::InvalidMsg))?;
break 'work;
} else if len == 0 {
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
break;
} else {
recv_state = RecvState::ReadBody(
len,
Vec::new(),
);
}
} else {
let mut b = [0; 1];
match tcp_stream.read(&mut b) {
Ok(0) => {},
Ok(_) => head.push(b[0]),
Err(_) => break,
}
},
RecvState::ReadBody(len, body) => if body.len() == *len {
match bincode::deserialize(&body) {
Ok(msg) => {
recv_tx.send(Ok(msg))?;
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
},
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
}
} else {
let left = *len - body.len();
let mut buf = vec![0; left];
match tcp_stream.read(&mut buf) {
Ok(_) => body.append(&mut buf),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
},
}
}
// Now, try sending TCP stuff
match try_tcp_send(&mut tcp_stream, &mut chunks) {
Ok(_) => {},
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
match &mut recv_state {
RecvState::ReadHead(head) => if head.len() == 8 {
let len = usize::from_le_bytes(<[u8; 8]>::try_from(head.as_slice()).unwrap());
if len > MAX_MSG_BYTES {
recv_tx.send(Err(Error::InvalidMsg))?;
break 'work;
} else if len == 0 {
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
break;
} else {
recv_state = RecvState::ReadBody(
len,
Vec::new(),
);
}
} else {
let mut b = [0; 1];
match tcp_stream.read(&mut b) {
Ok(_) => head.push(b[0]),
Err(_) => break,
}
},
RecvState::ReadBody(len, body) => if body.len() == *len {
match bincode::deserialize(&body) {
Ok(msg) => {
recv_tx.send(Ok(msg))?;
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
},
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
}
} else {
let left = *len - body.len();
let mut buf = vec![0; left];
match tcp_stream.read(&mut buf) {
Ok(_) => body.append(&mut buf),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
return Err(Error::Network);
},
}
},
@ -465,24 +512,28 @@ fn postbox_worker<S: PostSend, R: PostRecv>(
}
}
tcp_stream.shutdown(Shutdown::Both)?;
//tcp_stream.shutdown(Shutdown::Both)?;
Ok(())
}
// TESTS
/*
#[derive(Serialize, Deserialize)]
struct TestMsg<T>(T);
#[test]
fn connect() {
let srv_addr = ([127, 0, 0, 1], 12345);
let mut postoffice = PostOffice::<u32, f32>::bind(srv_addr).unwrap();
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
// We should start off with 0 incoming connections
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
let postbox = PostBox::<f32, u32>::to_server(srv_addr).unwrap();
let postbox = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
// Now a postbox has been created, we should have 1 new
thread::sleep(Duration::from_millis(250));
@ -496,21 +547,21 @@ fn connect_fail() {
let listen_addr = ([0; 4], 12345);
let connect_addr = ([127, 0, 0, 1], 12212);
let mut postoffice = PostOffice::<u32, f32>::bind(listen_addr).unwrap();
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(listen_addr).unwrap();
// We should start off with 0 incoming connections
thread::sleep(Duration::from_millis(250));
assert_eq!(postoffice.new_connections().len(), 0);
assert_eq!(postoffice.error(), None);
assert!(PostBox::<f32, u32>::to_server(connect_addr).is_err());
assert!(PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(connect_addr).is_err());
}
#[test]
fn connection_count() {
let srv_addr = ([127, 0, 0, 1], 12346);
let mut postoffice = PostOffice::<u32, f32>::bind(srv_addr).unwrap();
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut postboxes = Vec::new();
// We should start off with 0 incoming connections
@ -519,7 +570,7 @@ fn connection_count() {
assert_eq!(postoffice.error(), None);
for _ in 0..5 {
postboxes.push(PostBox::<f32, u32>::to_server(srv_addr).unwrap());
postboxes.push(PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap());
}
// 5 postboxes created, we should have 5
@ -533,10 +584,10 @@ fn connection_count() {
fn disconnect() {
let srv_addr = ([127, 0, 0, 1], 12347);
let mut postoffice = PostOffice::<u32, f32>::bind(srv_addr).unwrap();
let mut postoffice = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut server_postbox = {
let mut client_postbox = PostBox::<f32, u32>::to_server(srv_addr).unwrap();
let mut client_postbox = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut incoming = postoffice.new_connections();
@ -558,52 +609,53 @@ fn disconnect() {
fn client_to_server() {
let srv_addr = ([127, 0, 0, 1], 12348);
let mut po = PostOffice::<u32, f32>::bind(srv_addr).unwrap();
let mut po = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut client_pb = PostBox::<f32, u32>::to_server(srv_addr).unwrap();
let mut client_pb = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut server_pb = po.new_connections().next().unwrap();
client_pb.send(1337.0);
client_pb.send(9821.0);
client_pb.send(-3.2);
client_pb.send(17.0);
client_pb.send(TestMsg(1337.0));
client_pb.send(TestMsg(9821.0));
client_pb.send(TestMsg(-3.2));
client_pb.send(TestMsg(17.0));
thread::sleep(Duration::from_millis(250));
let mut incoming_msgs = server_pb.new_messages();
assert_eq!(incoming_msgs.len(), 4);
assert_eq!(incoming_msgs.next().unwrap(), 1337.0);
assert_eq!(incoming_msgs.next().unwrap(), 9821.0);
assert_eq!(incoming_msgs.next().unwrap(), -3.2);
assert_eq!(incoming_msgs.next().unwrap(), 17.0);
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337.0));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821.0));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(-3.2));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17.0));
}
#[test]
fn server_to_client() {
let srv_addr = ([127, 0, 0, 1], 12349);
let mut po = PostOffice::<u32, f32>::bind(srv_addr).unwrap();
let mut po = PostOffice::<TestMsg<u32>, TestMsg<f32>>::bind(srv_addr).unwrap();
let mut client_pb = PostBox::<f32, u32>::to_server(srv_addr).unwrap();
let mut client_pb = PostBox::<TestMsg<f32>, TestMsg<u32>>::to_server(srv_addr).unwrap();
thread::sleep(Duration::from_millis(250));
let mut server_pb = po.new_connections().next().unwrap();
server_pb.send(1337);
server_pb.send(9821);
server_pb.send(39999999);
server_pb.send(17);
server_pb.send(TestMsg(1337));
server_pb.send(TestMsg(9821));
server_pb.send(TestMsg(39999999));
server_pb.send(TestMsg(17));
thread::sleep(Duration::from_millis(250));
let mut incoming_msgs = client_pb.new_messages();
assert_eq!(incoming_msgs.len(), 4);
assert_eq!(incoming_msgs.next().unwrap(), 1337);
assert_eq!(incoming_msgs.next().unwrap(), 9821);
assert_eq!(incoming_msgs.next().unwrap(), 39999999);
assert_eq!(incoming_msgs.next().unwrap(), 17);
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(1337));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(9821));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(39999999));
assert_eq!(incoming_msgs.next().unwrap(), TestMsg(17));
}
*/

402
common/src/net/post2.rs Normal file
View File

@ -0,0 +1,402 @@
use std::{
io::{self, Read, Write},
net::{TcpListener, TcpStream, SocketAddr, Shutdown},
time::{Instant, Duration},
marker::PhantomData,
sync::mpsc,
thread,
collections::VecDeque,
convert::TryFrom,
};
use serde::{Serialize, de::DeserializeOwned};
#[derive(Clone, Debug)]
pub enum Error {
Io, //Io(io::Error),
Bincode, //Bincode(bincode::Error),
ChannelFailure,
InvalidMessage,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self {
Error::Io //(err)
}
}
impl From<bincode::Error> for Error {
fn from(err: bincode::Error) -> Self {
Error::Bincode //(err)
}
}
impl From<mpsc::TryRecvError> for Error {
fn from(error: mpsc::TryRecvError) -> Self {
Error::ChannelFailure
}
}
pub trait PostMsg = Serialize + DeserializeOwned + 'static + Send;
const MAX_MSG_SIZE: usize = 1 << 20;
pub struct PostOffice<S: PostMsg, R: PostMsg> {
listener: TcpListener,
error: Option<Error>,
phantom: PhantomData<(S, R)>,
}
impl<S: PostMsg, R: PostMsg> PostOffice<S, R> {
pub fn bind<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
let mut listener = TcpListener::bind(addr.into())?;
listener.set_nonblocking(true)?;
Ok(Self {
listener,
error: None,
phantom: PhantomData,
})
}
pub fn error(&self) -> Option<Error> {
self.error.clone()
}
pub fn new_postboxes(&mut self) -> impl ExactSizeIterator<Item=PostBox<S, R>> {
let mut new = Vec::new();
if self.error.is_some() {
return new.into_iter();
}
loop {
match self.listener.accept() {
Ok((stream, sock)) => new.push(PostBox::from_stream(stream).unwrap()),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => {
self.error = Some(e.into());
break;
},
}
}
new.into_iter()
}
}
pub struct PostBox<S: PostMsg, R: PostMsg> {
send_tx: mpsc::Sender<Option<S>>,
recv_rx: mpsc::Receiver<Result<R, Error>>,
worker: Option<thread::JoinHandle<()>>,
error: Option<Error>,
}
impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
pub fn to<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
Self::from_stream(TcpStream::connect(addr.into())?)
}
fn from_stream(stream: TcpStream) -> Result<Self, Error> {
stream.set_nonblocking(true)?;
let (send_tx, send_rx) = mpsc::channel();
let (recv_tx, recv_rx) = mpsc::channel();
let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx));
Ok(Self {
send_tx,
recv_rx,
worker: Some(worker),
error: None,
})
}
pub fn error(&self) -> Option<Error> {
self.error.clone()
}
pub fn send_message(&mut self, msg: S) {
let _ = self.send_tx.send(Some(msg));
}
pub fn next_message(&mut self) -> Option<R> {
if self.error.is_some() {
return None;
}
match self.recv_rx.recv().ok()? {
Ok(msg) => Some(msg),
Err(e) => {
self.error = Some(e);
None
},
}
}
pub fn new_messages(&mut self) -> impl ExactSizeIterator<Item=R> {
let mut new = Vec::new();
if self.error.is_some() {
return new.into_iter();
}
loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => new.push(msg),
Err(mpsc::TryRecvError::Empty) => break,
Err(e) => {
self.error = Some(e.into());
break;
},
Ok(Err(e)) => {
self.error = Some(e);
break;
},
}
}
new.into_iter()
}
fn worker(mut stream: TcpStream, send_rx: mpsc::Receiver<Option<S>>, recv_tx: mpsc::Sender<Result<R, Error>>) {
let mut outgoing_chunks = VecDeque::new();
let mut incoming_buf = Vec::new();
'work: loop {
// Get stream errors
match stream.take_error() {
Ok(Some(e)) | Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
Ok(None) => {},
}
// Try getting messages from the send channel
loop {
match send_rx.try_recv() {
Ok(Some(send_msg)) => {
// Serialize message
let mut msg_bytes = bincode::serialize(&send_msg).unwrap();
// Assemble into packet
let mut packet_bytes = msg_bytes
.len()
.to_le_bytes()
.as_ref()
.to_vec();
packet_bytes.append(&mut msg_bytes);
// Split packet into chunks
packet_bytes
.chunks(4096)
.map(|chunk| chunk.to_vec())
.for_each(|chunk| outgoing_chunks.push_back(chunk))
},
// Shut down worker
Ok(None) => break 'work,
Err(mpsc::TryRecvError::Empty) => break,
// Worker error
Err(e) => {
let _ = recv_tx.send(Err(e.into()));
break 'work;
},
}
}
// Try sending bytes through the TCP stream
loop {
//println!("HERE! Outgoing len: {}", outgoing_chunks.len());
match outgoing_chunks.pop_front() {
Some(chunk) => match stream.write_all(&chunk) {
Ok(()) => {},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// Return chunk to the queue to try again later
outgoing_chunks.push_front(chunk);
break;
},
// Worker error
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
},
None => break,
}
}
// Try receiving bytes from the TCP stream
loop {
let mut buf = [0; 1024];
match stream.read(&mut buf) {
Ok(n) => incoming_buf.extend_from_slice(&buf[0..n]),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
// Worker error
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
break 'work;
},
}
}
// Try turning bytes into messages
loop {
match incoming_buf.get(0..8) {
Some(len_bytes) => {
let len = usize::from_le_bytes(<[u8; 8]>::try_from(len_bytes).unwrap()); // Can't fail
if len > MAX_MSG_SIZE {
recv_tx.send(Err(Error::InvalidMessage)).unwrap();
break 'work;
} else if incoming_buf.len() >= len + 8 {
let deserialize_result = bincode::deserialize(&incoming_buf[8..len + 8]);
if let Err(e) = &deserialize_result {
println!("DESERIALIZE ERROR: {:?}", e);
}
recv_tx.send(deserialize_result.map_err(|e| e.into()));
incoming_buf = incoming_buf.split_off(len + 8);
} else {
break;
}
},
None => break,
}
}
thread::sleep(Duration::from_millis(10));
}
stream.shutdown(Shutdown::Both);
}
}
impl<S: PostMsg, R: PostMsg> Drop for PostBox<S, R> {
fn drop(&mut self) {
let _ = self.send_tx.send(None);
// TODO: Cleanly join!
//self.worker.take().map(|handle| handle.join());
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_postoffice<S: PostMsg, R: PostMsg>(id: u16) -> Result<(PostOffice<S, R>, SocketAddr), Error> {
let sock = ([0; 4], 12345 + id).into();
Ok((PostOffice::bind(sock)?, sock))
}
fn loop_for<F: FnMut()>(duration: Duration, mut f: F) {
let start = Instant::now();
while start.elapsed() < duration {
f();
}
}
#[test]
fn connect() {
let (mut postoffice, sock) = create_postoffice::<(), ()>(0).unwrap();
let _client0 = PostBox::<(), ()>::to(sock).unwrap();
let _client1 = PostBox::<(), ()>::to(sock).unwrap();
let _client2 = PostBox::<(), ()>::to(sock).unwrap();
let mut new_clients = 0;
loop_for(Duration::from_millis(250), || {
new_clients += postoffice.new_postboxes().count();
});
assert_eq!(new_clients, 3);
}
/*
#[test]
fn disconnect() {
let (mut postoffice, sock) = create_postoffice::<(), ()>(1).unwrap();
let mut client = PostBox::<i32, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().unwrap().next().unwrap();
drop(client);
loop_for(Duration::from_millis(300), || ());
assert!(server.new_messages().is_err());
}
*/
#[test]
fn send_recv() {
let (mut postoffice, sock) = create_postoffice::<(), i32>(2).unwrap();
let test_msgs = vec![1, 1337, 42, -48];
let mut client = PostBox::<i32, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
for msg in &test_msgs {
client.send_message(msg.clone());
}
let mut recv_msgs = Vec::new();
loop_for(Duration::from_millis(250), || server
.new_messages()
.for_each(|msg| recv_msgs.push(msg)));
assert_eq!(test_msgs, recv_msgs);
}
#[test]
fn send_recv_huge() {
let (mut postoffice, sock) = create_postoffice::<(), Vec<i32>>(3).unwrap();
let test_msgs: Vec<Vec<i32>> = (0..5).map(|i| (0..100000).map(|j| i * 2 + j).collect()).collect();
let mut client = PostBox::<Vec<i32>, ()>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
for msg in &test_msgs {
client.send_message(msg.clone());
}
let mut recv_msgs = Vec::new();
loop_for(Duration::from_millis(2000), || server
.new_messages()
.for_each(|msg| recv_msgs.push(msg)));
assert_eq!(test_msgs.len(), recv_msgs.len());
assert!(test_msgs == recv_msgs);
}
#[test]
fn send_recv_both() {
let (mut postoffice, sock) = create_postoffice::<u32, u32>(4).unwrap();
let test_msgs = vec![1, 1337, 42, -48];
let mut client = PostBox::<u32, u32>::to(sock).unwrap();
loop_for(Duration::from_millis(250), || ());
let mut server = postoffice.new_postboxes().next().unwrap();
let test_msgs = vec![
(0xDEADBEAD, 0xBEEEEEEF),
(0x1BADB002, 0xBAADF00D),
(0xBAADA555, 0xC0DED00D),
(0xCAFEBABE, 0xDEADC0DE),
];
for (to, from) in test_msgs {
client.send_message(to);
server.send_message(from);
loop_for(Duration::from_millis(250), || ());
assert_eq!(client.new_messages().next().unwrap(), from);
assert_eq!(server.new_messages().next().unwrap(), to);
}
}
}

View File

@ -137,4 +137,8 @@ impl<V: Vox, S: VolSize, M> VolMap<V, S, M> {
pub fn key_pos(&self, key: Vec3<i32>) -> Vec3<i32> {
key * S::SIZE.map(|e| e as i32)
}
pub fn pos_key(&self, pos: Vec3<i32>) -> Vec3<i32> {
Self::chunk_key(pos)
}
}

View File

@ -21,7 +21,7 @@ pub struct Client {
impl Client {
pub fn notify(&mut self, msg: ServerMsg) {
self.postbox.send(msg);
self.postbox.send_message(msg);
}
}

View File

@ -151,7 +151,7 @@ impl Server {
self.clients.notify(entity, ServerMsg::TerrainChunkUpdate {
key,
chunk: chunk.clone(),
chunk: Box::new(chunk.clone()),
});
}
@ -176,7 +176,7 @@ impl Server {
fn handle_new_connections(&mut self) -> Result<Vec<Event>, Error> {
let mut frontend_events = Vec::new();
for mut postbox in self.postoffice.new_connections() {
for mut postbox in self.postoffice.new_postboxes() {
let entity = self.state
.ecs_mut()
.create_entity_synced()
@ -245,7 +245,7 @@ impl Server {
ClientState::Connected => match msg {
ClientMsg::Connect { .. } => disconnect = true, // Not allowed when already connected
ClientMsg::Disconnect => disconnect = true,
ClientMsg::Ping => client.postbox.send(ServerMsg::Pong),
ClientMsg::Ping => client.postbox.send_message(ServerMsg::Pong),
ClientMsg::Pong => {},
ClientMsg::Chat(msg) => new_chat_msgs.push((entity, msg)),
ClientMsg::PlayerPhysics { pos, vel, dir } => {
@ -254,9 +254,9 @@ impl Server {
state.write_component(entity, dir);
},
ClientMsg::TerrainChunkRequest { key } => match state.terrain().get_key(key) {
Some(chunk) => client.postbox.send(ServerMsg::TerrainChunkUpdate {
Some(chunk) => client.postbox.send_message(ServerMsg::TerrainChunkUpdate {
key,
chunk: chunk.clone(),
chunk: Box::new(chunk.clone()),
}),
None => requested_chunks.push(key),
},
@ -270,7 +270,7 @@ impl Server {
disconnect = true;
} else if state.get_time() - client.last_ping > CLIENT_TIMEOUT * 0.5 {
// Try pinging the client if the timeout is nearing
client.postbox.send(ServerMsg::Ping);
client.postbox.send_message(ServerMsg::Ping);
}
if disconnect {