Patched in netcode refactor

Former-commit-id: 34a3b9a95129edad6dc8280d9a3cb41581d4d386
This commit is contained in:
Joshua Barretto 2019-03-05 18:39:18 +00:00
parent a8b0039898
commit 664f28a972
13 changed files with 588 additions and 86 deletions

View File

@ -3,6 +3,7 @@ use common::net::PostError;
#[derive(Debug)]
pub enum Error {
Network(PostError),
ServerTimeout,
ServerShutdown,
Other(String),
}
@ -10,7 +11,7 @@ pub enum Error {
impl From<PostError> for Error {
fn from(err: PostError) -> Self {
match err {
PostError::Disconnected => Error::ServerShutdown,
PostError::Disconnect => Error::ServerShutdown,
err => Error::Network(err),
}
}

View File

@ -1,3 +1,5 @@
#![feature(label_break_value)]
pub mod error;
pub mod input;
@ -27,6 +29,8 @@ use common::{
};
use world::World;
const SERVER_TIMEOUT: f64 = 5.0; // Seconds
pub enum Event {
Chat(String),
}
@ -39,7 +43,7 @@ pub struct Client {
tick: u64,
state: State,
player: Option<Uid>,
player: Option<EcsEntity>,
// Testing
world: World,
@ -101,7 +105,7 @@ impl Client {
pub fn state_mut(&mut self) -> &mut State { &mut self.state }
/// Get an entity from its UID, creating it if it does not exists
pub fn get_or_create_entity(&mut self, uid: Uid) -> EcsEntity {
pub fn get_or_create_entity_from_uid(&mut self, uid: Uid) -> EcsEntity {
// Find the ECS entity from its UID
let ecs_entity = self.state().ecs_world()
.read_resource::<comp::UidAllocator>()
@ -126,7 +130,7 @@ impl Client {
/// Get the player entity
#[allow(dead_code)]
pub fn player(&self) -> Option<Uid> {
pub fn player(&self) -> Option<EcsEntity> {
self.player
}
@ -138,8 +142,8 @@ impl Client {
/// Send a chat message to the server
#[allow(dead_code)]
pub fn send_chat(&mut self, msg: String) -> Result<(), Error> {
Ok(self.postbox.send(ClientMsg::Chat(msg))?)
pub fn send_chat(&mut self, msg: String) {
self.postbox.send(ClientMsg::Chat(msg))
}
/// Execute a single client tick, handle input and update the game state by the given duration
@ -164,7 +168,7 @@ impl Client {
frontend_events.append(&mut self.handle_new_messages()?);
// Step 3
if let Some(ecs_entity) = self.player.and_then(|uid| self.state().get_entity(uid)) {
if let Some(ecs_entity) = self.player {
// TODO: remove this
const PLAYER_VELOCITY: f32 = 100.0;
@ -175,6 +179,20 @@ impl Client {
// Tick the client's LocalState (step 3)
self.state.tick(dt);
// Update the server about the player's physics attributes
if let Some(ecs_entity) = self.player {
match (
self.state.read_storage().get(ecs_entity).cloned(),
self.state.read_storage().get(ecs_entity).cloned(),
self.state.read_storage().get(ecs_entity).cloned(),
) {
(Some(pos), Some(vel), Some(dir)) => {
self.postbox.send(ClientMsg::PlayerPhysics { pos, vel, dir });
},
_ => {},
}
}
// Finish the tick, pass control back to the frontend (step 6)
self.tick += 1;
Ok(frontend_events)
@ -200,10 +218,17 @@ impl Client {
for msg in new_msgs {
match msg {
ServerMsg::Shutdown => return Err(Error::ServerShutdown),
ServerMsg::Ping => self.postbox.send(ClientMsg::Pong),
ServerMsg::Pong => {},
ServerMsg::Chat(msg) => frontend_events.push(Event::Chat(msg)),
ServerMsg::SetPlayerEntity(uid) => self.player = Some(uid),
ServerMsg::SetPlayerEntity(uid) => {
println!("Ent!");
let ecs_entity = self.get_or_create_entity_from_uid(uid);
self.player = Some(ecs_entity);
},
ServerMsg::EntityPhysics { uid, pos, vel, dir } => {
let ecs_entity = self.get_or_create_entity(uid);
println!("Phys!");
let ecs_entity = self.get_or_create_entity_from_uid(uid);
self.state.write_component(ecs_entity, pos);
self.state.write_component(ecs_entity, vel);
self.state.write_component(ecs_entity, dir);
@ -213,8 +238,12 @@ impl Client {
},
}
}
} else if let Some(err) = self.postbox.status() {
} else if let Some(err) = self.postbox.error() {
return Err(err.into());
} else if self.state.get_time() - self.last_ping > SERVER_TIMEOUT * 0.5 {
self.postbox.send(ClientMsg::Ping);
} else if self.state.get_time() - self.last_ping > SERVER_TIMEOUT {
return Err(Error::ServerTimeout);
}
Ok(frontend_events)
@ -223,6 +252,6 @@ impl Client {
impl Drop for Client {
fn drop(&mut self) {
self.postbox.send(ClientMsg::Disconnect).unwrap();
self.postbox.send(ClientMsg::Disconnect);
}
}

View File

@ -1,5 +1,17 @@
use crate::comp::{
Uid,
phys,
};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ClientMsg {
Ping,
Pong,
Chat(String),
PlayerPhysics {
pos: phys::Pos,
vel: phys::Vel,
dir: phys::Dir,
},
Disconnect,
}

View File

@ -6,6 +6,8 @@ use crate::comp::{
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ServerMsg {
Shutdown,
Ping,
Pong,
Chat(String),
SetPlayerEntity(Uid),
EntityPhysics {

View File

@ -1,5 +1,6 @@
pub mod data;
pub mod error;
pub mod post;
pub mod postbox;
pub mod postoffice;
mod test;
@ -7,9 +8,11 @@ mod test;
// Reexports
pub use self::{
data::{ClientMsg, ServerMsg},
error::PostError,
postbox::PostBox,
postoffice::PostOffice,
post::{
Error as PostError,
PostBox,
PostOffice,
},
};
pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug;

432
common/src/net/post.rs Normal file
View File

@ -0,0 +1,432 @@
use std::{
fmt,
thread,
net::{SocketAddr, Shutdown},
sync::mpsc::TryRecvError,
io::{self, Read, Write},
collections::VecDeque,
time::Duration,
convert::TryFrom,
};
use serde;
use mio::{
net::{TcpListener, TcpStream},
Events,
Poll,
PollOpt,
Ready,
Token,
};
use mio_extras::channel::{
channel,
Receiver,
Sender,
};
use bincode;
#[derive(Clone, Debug, PartialEq)]
pub enum Error {
Disconnect,
Network,
InvalidMsg,
Internal,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Self {
Error::Network
}
}
impl From<TryRecvError> for Error {
fn from(err: TryRecvError) -> Self {
Error::Internal
}
}
impl From<bincode::ErrorKind> for Error {
fn from(err: bincode::ErrorKind) -> Self {
Error::InvalidMsg
}
}
impl<T> From<mio_extras::channel::SendError<T>> for Error {
fn from(err: mio_extras::channel::SendError<T>) -> Self {
Error::Internal
}
}
pub trait PostSend = 'static + serde::Serialize + Send + fmt::Debug;
pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + fmt::Debug;
const TCP_TOK: Token = Token(0);
const CTRL_TOK: Token = Token(1);
const POSTBOX_TOK: Token = Token(2);
const SEND_TOK: Token = Token(3);
const RECV_TOK: Token = Token(4);
const MAX_MSG_BYTES: usize = 1 << 20;
enum CtrlMsg {
Shutdown,
}
pub struct PostOffice<S: PostSend, R: PostRecv> {
worker: Option<thread::JoinHandle<Result<(), Error>>>,
ctrl_tx: Sender<CtrlMsg>,
postbox_rx: Receiver<Result<PostBox<S, R>, Error>>,
poll: Poll,
err: Option<Error>,
}
impl<S: PostSend, R: PostRecv> PostOffice<S, R> {
pub fn bind<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
let tcp_listener = TcpListener::bind(&addr.into())?;
let (ctrl_tx, ctrl_rx) = channel();
let (postbox_tx, postbox_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
let office_poll = Poll::new()?;
office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?;
let worker = thread::spawn(move || office_worker(
worker_poll,
tcp_listener,
ctrl_rx,
postbox_tx,
));
Ok(Self {
worker: Some(worker),
ctrl_tx,
postbox_rx,
poll: office_poll,
err: None,
})
}
pub fn error(&self) -> Option<Error> {
self.err.clone()
}
pub fn new_connections(&mut self) -> impl ExactSizeIterator<Item=PostBox<S, R>> {
let mut conns = VecDeque::new();
if let Some(_) = self.err {
return conns.into_iter();
}
let mut events = Events::with_capacity(64);
if let Err(err) = self.poll.poll(
&mut events,
Some(Duration::new(0, 0)),
) {
self.err = Some(err.into());
return conns.into_iter();
}
for event in events {
match event.token() {
// Keep reading new postboxes from the channel
POSTBOX_TOKEN => loop {
match self.postbox_rx.try_recv() {
Ok(Ok(conn)) => conns.push_back(conn),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return conns.into_iter();
},
Ok(Err(err)) => {
self.err = Some(err.into());
return conns.into_iter();
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
conns.into_iter()
}
}
impl<S: PostSend, R: PostRecv> Drop for PostOffice<S, R> {
fn drop(&mut self) {
let _ = self.ctrl_tx.send(CtrlMsg::Shutdown);
let _ = self.worker.take().map(|w| w.join());
}
}
fn office_worker<S: PostSend, R: PostRecv>(
poll: Poll,
tcp_listener: TcpListener,
ctrl_rx: Receiver<CtrlMsg>,
postbox_tx: Sender<Result<PostBox<S, R>, Error>>,
) -> Result<(), Error> {
let mut events = Events::with_capacity(64);
loop {
if let Err(err) = poll.poll(&mut events, None) {
postbox_tx.send(Err(err.into()))?;
return Ok(());
}
for event in &events {
match event.token() {
CTRL_TOK => loop {
match ctrl_rx.try_recv() {
Ok(CtrlMsg::Shutdown) => return Ok(()),
Err(TryRecvError::Empty) => {},
Err(err) => {
postbox_tx.send(Err(err.into()))?;
return Ok(());
},
}
},
TCP_TOK => postbox_tx.send(
match tcp_listener.accept() {
Ok((stream, _)) => PostBox::from_tcpstream(stream),
Err(err) => Err(err.into()),
}
)?,
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
}
pub struct PostBox<S: PostSend, R: PostRecv> {
worker: Option<thread::JoinHandle<Result<(), Error>>>,
ctrl_tx: Sender<CtrlMsg>,
send_tx: Sender<S>,
recv_rx: Receiver<Result<R, Error>>,
poll: Poll,
err: Option<Error>,
}
impl<S: PostSend, R: PostRecv> PostBox<S, R> {
pub fn to_server<A: Into<SocketAddr>>(addr: A) -> Result<Self, Error> {
Self::from_tcpstream(TcpStream::connect(&addr.into())?)
}
fn from_tcpstream(tcp_stream: TcpStream) -> Result<Self, Error> {
let (ctrl_tx, ctrl_rx) = channel();
let (send_tx, send_rx) = channel();
let (recv_tx, recv_rx) = channel();
let worker_poll = Poll::new()?;
worker_poll.register(&tcp_stream, TCP_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?;
worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?;
let postbox_poll = Poll::new()?;
postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?;
let worker = thread::spawn(move || postbox_worker(
worker_poll,
tcp_stream,
ctrl_rx,
send_rx,
recv_tx,
));
Ok(Self {
worker: Some(worker),
ctrl_tx,
send_tx,
recv_rx,
poll: postbox_poll,
err: None,
})
}
pub fn error(&self) -> Option<Error> {
self.err.clone()
}
pub fn send(&mut self, data: S) {
let _ = self.send_tx.send(data);
}
pub fn new_messages(&mut self) -> impl ExactSizeIterator<Item=R> {
let mut msgs = VecDeque::new();
if let Some(_) = self.err {
return msgs.into_iter();
}
let mut events = Events::with_capacity(64);
if let Err(err) = self.poll.poll(
&mut events,
Some(Duration::new(0, 0)),
) {
self.err = Some(err.into());
return msgs.into_iter();
}
for event in events {
match event.token() {
// Keep reading new messages from the channel
RECV_TOKEN => loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => msgs.push_back(msg),
Err(TryRecvError::Empty) => break,
Err(err) => {
self.err = Some(err.into());
return msgs.into_iter();
},
Ok(Err(err)) => {
self.err = Some(err.into());
return msgs.into_iter();
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
msgs.into_iter()
}
}
impl<S: PostSend, R: PostRecv> Drop for PostBox<S, R> {
fn drop(&mut self) {
let _ = self.ctrl_tx.send(CtrlMsg::Shutdown);
let _ = self.worker.take().map(|w| w.join());
}
}
fn postbox_worker<S: PostSend, R: PostRecv>(
poll: Poll,
mut tcp_stream: TcpStream,
ctrl_rx: Receiver<CtrlMsg>,
send_rx: Receiver<S>,
recv_tx: Sender<Result<R, Error>>,
) -> Result<(), Error> {
enum RecvState {
ReadHead(Vec<u8>),
ReadBody(usize, Vec<u8>),
}
let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8));
let mut events = Events::with_capacity(64);
'work: loop {
if let Err(err) = poll.poll(&mut events, None) {
recv_tx.send(Err(err.into()))?;
break 'work;
}
for event in &events {
match event.token() {
CTRL_TOK => loop {
match ctrl_rx.try_recv() {
Ok(CtrlMsg::Shutdown) => {
break 'work;
},
Err(TryRecvError::Empty) => break,
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
},
SEND_TOK => loop {
match send_rx.try_recv() {
Ok(outgoing_msg) => {
let mut msg_bytes = match bincode::serialize(&outgoing_msg) {
Ok(bytes) => bytes,
Err(err) => {
recv_tx.send(Err((*err).into()));
break 'work;
},
};
let mut packet = msg_bytes
.len()
.to_le_bytes()
.as_ref()
.to_vec();
packet.append(&mut msg_bytes);
match tcp_stream.write_all(&packet) {
Ok(()) => {},
Err(err) => {
recv_tx.send(Err(err.into()));
break 'work;
},
}
},
Err(TryRecvError::Empty) => break,
Err(err) => Err(err)?,
}
},
TCP_TOK => loop {
match tcp_stream.take_error() {
Ok(None) => {},
Ok(Some(err)) => {
recv_tx.send(Err(err.into()));
break 'work;
},
Err(err) => {
recv_tx.send(Err(err.into()));
break 'work;
},
}
match &mut recv_state {
RecvState::ReadHead(head) => if head.len() == 8 {
let len = usize::from_le_bytes(<[u8; 8]>::try_from(head.as_slice()).unwrap());
if len > MAX_MSG_BYTES {
recv_tx.send(Err(Error::InvalidMsg));
break 'work;
} else if len == 0 {
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
break;
} else {
recv_state = RecvState::ReadBody(
len,
Vec::new(),
);
}
} else {
let mut b = [0; 1];
match tcp_stream.read(&mut b) {
Ok(_) => head.push(b[0]),
Err(_) => break,
}
},
RecvState::ReadBody(len, body) => if body.len() == *len {
match bincode::deserialize(&body) {
Ok(msg) => {
recv_tx.send(Ok(msg))?;
recv_state = RecvState::ReadHead(Vec::with_capacity(8));
},
Err(err) => {
recv_tx.send(Err((*err).into()))?;
break 'work;
},
}
} else {
let left = *len - body.len();
let mut buf = vec![0; left];
match tcp_stream.read(&mut buf) {
Ok(_) => body.append(&mut buf),
Err(err) => {
recv_tx.send(Err(err.into()))?;
break 'work;
},
}
},
}
},
tok => panic!("Unexpected event token '{:?}'", tok),
}
}
}
tcp_stream.shutdown(Shutdown::Both);
Ok(())
}

View File

@ -120,14 +120,22 @@ where
// If an error occured, or previously occured, just give up
if let Some(_) = self.err {
return items.into_iter();
} else if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) {
self.err = Some(err.into());
return items.into_iter();
}
loop {
match self.recv.try_recv() {
Ok(Ok(item)) => items.push_back(item),
Ok(Err(err)) => self.err = Some(err.into()),
Err(TryRecvError::Empty) => break,
Err(err) => self.err = Some(err.into()),
for event in events {
match event.token() {
DATA_TOKEN => loop {
match self.recv.try_recv() {
Ok(Ok(item)) => items.push_back(item),
Err(TryRecvError::Empty) => break,
Err(err) => self.err = Some(err.into()),
Ok(Err(err)) => self.err = Some(err.into()),
}
},
_ => (),
}
}
@ -145,15 +153,17 @@ fn postbox_thread<S, R>(
S: PostSend,
R: PostRecv,
{
let mut events = Events::with_capacity(64);
// Receiving related variables
let mut events = Events::with_capacity(64);
let mut recv_buff = Vec::new();
let mut recv_nextlen: u64 = 0;
loop {
let mut disconnected = false;
poll.poll(&mut events, None)
poll.poll(&mut events, Some(Duration::from_millis(20)))
.expect("Failed to execute poll(), most likely fault of the OS");
println!("FINISHED POLL!");
for event in events.iter() {
println!("EVENT!");
match event.token() {
CTRL_TOKEN => match ctrl_rx.try_recv().unwrap() {
ControlMsg::Shutdown => return,
@ -162,16 +172,17 @@ fn postbox_thread<S, R>(
Ok(_) => {}
// Returned when all the data has been read
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
Err(e) => {
recv_tx.send(Err(e.into())).unwrap();
},
Err(e) => recv_tx.send(Err(e.into())).unwrap(),
},
DATA_TOKEN => {
let mut packet = bincode::serialize(&send_rx.try_recv().unwrap()).unwrap();
let msg = send_rx.try_recv().unwrap();
println!("Send: {:?}", msg);
let mut packet = bincode::serialize(&msg).unwrap();
packet.splice(0..0, (packet.len() as u64).to_be_bytes().iter().cloned());
match connection.write_bufs(&[packet.as_slice().into()]) {
Ok(_) => {}
Ok(_) => { println!("Sent!"); }
Err(e) => {
println!("Send error!");
recv_tx.send(Err(e.into())).unwrap();
}
};
@ -181,9 +192,9 @@ fn postbox_thread<S, R>(
}
loop {
if recv_nextlen == 0 && recv_buff.len() >= 8 {
println!("Read nextlen");
recv_nextlen = u64::from_be_bytes(
<[u8; 8]>::try_from(recv_buff.drain(0..8).collect::<Vec<u8>>().as_slice())
.unwrap(),
<[u8; 8]>::try_from(recv_buff.drain(0..8).collect::<Vec<u8>>().as_slice()).unwrap(),
);
if recv_nextlen > MESSAGE_SIZE_CAP {
recv_tx.send(Err(PostErrorInternal::MsgSizeLimitExceeded)).unwrap();
@ -202,15 +213,16 @@ fn postbox_thread<S, R>(
.collect::<Vec<u8>>()
.as_slice()) {
Ok(msg) => {
println!("Recv: {:?}", msg);
recv_tx
.send(Ok(msg))
.unwrap();
recv_nextlen = 0;
}
Err(e) => {
println!("Recv error: {:?}", e);
recv_tx.send(Err(e.into())).unwrap();
recv_nextlen = 0;
continue
}
}
} else {

View File

@ -4,6 +4,7 @@ use std::{
collections::VecDeque,
net::SocketAddr,
thread,
sync::mpsc::TryRecvError,
};
// External
@ -93,11 +94,13 @@ where
for event in events {
match event.token() {
// Ignore recv error
DATA_TOKEN => match self.recv.try_recv() {
Ok(Ok(conn)) => conns.push_back(conn),
Err(err) => self.err = Some(err.into()),
Ok(Err(err)) => self.err = Some(err.into()),
DATA_TOKEN => loop {
match self.recv.try_recv() {
Ok(Ok(conn)) => conns.push_back(conn),
Err(TryRecvError::Empty) => break,
Err(err) => self.err = Some(err.into()),
Ok(Err(err)) => self.err = Some(err.into()),
}
},
_ => (),
}

View File

@ -116,11 +116,6 @@ impl State {
let _ = self.ecs_world.write_storage().insert(entity, comp);
}
/// Read a clone of a component attributed to a particular entity
pub fn read_component<C: Component + Clone>(&self, entity: EcsEntity) -> Option<C> {
self.ecs_world.read_storage::<C>().get(entity).cloned()
}
/// Get a read-only reference to the storage of a particular component type
pub fn read_storage<C: Component>(&self) -> EcsStorage<C, Fetch<EcsMaskedStorage<C>>> {
self.ecs_world.read_storage::<C>()

View File

@ -24,9 +24,9 @@ fn main() {
for event in events {
match event {
Event::ClientConnected { uid } => info!("Client {} connected!", uid),
Event::ClientDisconnected { uid } => info!("Client {} disconnected!", uid),
Event::Chat { uid, msg } => info!("[Client {}] {}", uid, msg),
Event::ClientConnected { ecs_entity } => println!("Client connected!"),
Event::ClientDisconnected { ecs_entity } => println!("Client disconnected!"),
Event::Chat { ecs_entity, msg } => println!("[Client] {}", msg),
}
}

View File

@ -7,7 +7,7 @@ use common::{
use crate::Error;
pub struct Client {
pub uid: comp::Uid,
pub ecs_entity: EcsEntity,
pub postbox: PostBox<ServerMsg, ClientMsg>,
pub last_ping: f64,
}
@ -33,16 +33,14 @@ impl Clients {
pub fn notify_all(&mut self, msg: ServerMsg) {
for client in &mut self.clients {
// Consume any errors, deal with them later
let _ = client.postbox.send(msg.clone());
client.postbox.send(msg.clone());
}
}
pub fn notify_all_except(&mut self, uid: comp::Uid, msg: ServerMsg) {
pub fn notify_all_except(&mut self, ecs_entity: EcsEntity, msg: ServerMsg) {
for client in &mut self.clients {
if client.uid != uid {
// Consume any errors, deal with them later
let _ = client.postbox.send(msg.clone());
if client.ecs_entity != ecs_entity {
client.postbox.send(msg.clone());
}
}
}

View File

@ -38,13 +38,13 @@ const CLIENT_TIMEOUT: f64 = 5.0; // Seconds
pub enum Event {
ClientConnected {
uid: comp::Uid,
ecs_entity: EcsEntity,
},
ClientDisconnected {
uid: comp::Uid,
ecs_entity: EcsEntity,
},
Chat {
uid: comp::Uid,
ecs_entity: EcsEntity,
msg: String,
},
}
@ -65,7 +65,7 @@ impl Server {
state: State::new(),
world: World::new(),
postoffice: PostOffice::new(SocketAddr::from(([0; 4], 59003)))?,
postoffice: PostOffice::bind(SocketAddr::from(([0; 4], 59003)))?,
clients: Clients::empty(),
})
}
@ -89,9 +89,7 @@ impl Server {
.with(comp::phys::Pos(Vec3::zero()))
.with(comp::phys::Vel(Vec3::zero()))
.with(comp::phys::Dir(Vec3::unit_y()))
// When the player is first created, force a physics notification to everyone
// including themselves.
.with(comp::phys::UpdateKind::Force)
.with(comp::phys::UpdateKind::Passive)
}
/// Get a reference to the server's world.
@ -122,7 +120,7 @@ impl Server {
let mut frontend_events = Vec::new();
// If networking has problems, handle them
if let Some(err) = self.postoffice.status() {
if let Some(err) = self.postoffice.error() {
return Err(err.into());
}
@ -154,19 +152,23 @@ impl Server {
let mut frontend_events = Vec::new();
for mut postbox in self.postoffice.new_connections() {
let ecs_entity = self.build_player().build();
let uid = self.state.read_component(ecs_entity).unwrap();
let ecs_entity = self.build_player()
// When the player is first created, force a physics notification to everyone
// including themselves.
.with(comp::phys::UpdateKind::Force)
.build();
let uid = self.state.read_storage().get(ecs_entity).cloned().unwrap();
let _ = postbox.send(ServerMsg::SetPlayerEntity(uid));
postbox.send(ServerMsg::SetPlayerEntity(uid));
self.clients.add(Client {
uid,
ecs_entity,
postbox,
last_ping: self.state.get_time(),
});
frontend_events.push(Event::ClientConnected {
uid,
ecs_entity,
});
}
@ -192,19 +194,26 @@ impl Server {
// Process incoming messages
for msg in new_msgs {
match msg {
ClientMsg::Chat(msg) => new_chat_msgs.push((client.uid, msg)),
ClientMsg::Ping => client.postbox.send(ServerMsg::Pong),
ClientMsg::Pong => {},
ClientMsg::Chat(msg) => new_chat_msgs.push((client.ecs_entity, msg)),
ClientMsg::PlayerPhysics { pos, vel, dir } => {
state.write_component(client.ecs_entity, pos);
state.write_component(client.ecs_entity, vel);
state.write_component(client.ecs_entity, dir);
},
ClientMsg::Disconnect => disconnected = true,
}
}
} else if
state.get_time() - client.last_ping > CLIENT_TIMEOUT || // Timeout
client.postbox.status().is_some() // Postbox eror
client.postbox.error().is_some() // Postbox eror
{
disconnected = true;
}
if disconnected {
disconnected_clients.push(client.uid);
disconnected_clients.push(client.ecs_entity);
true
} else {
false
@ -212,24 +221,24 @@ impl Server {
});
// Handle new chat messages
for (uid, msg) in new_chat_msgs {
for (ecs_entity, msg) in new_chat_msgs {
self.clients.notify_all(ServerMsg::Chat(msg.clone()));
frontend_events.push(Event::Chat {
uid,
ecs_entity,
msg,
});
}
// Handle client disconnects
for uid in disconnected_clients {
self.clients.notify_all(ServerMsg::EntityDeleted(uid));
for ecs_entity in disconnected_clients {
self.clients.notify_all(ServerMsg::EntityDeleted(state.read_storage().get(ecs_entity).cloned().unwrap()));
frontend_events.push(Event::ClientDisconnected {
uid,
ecs_entity,
});
state.delete_entity(uid);
state.ecs_world_mut().delete_entity(ecs_entity);
}
Ok(frontend_events)
@ -237,7 +246,8 @@ impl Server {
/// Sync client states with the most up to date information
fn sync_clients(&mut self) {
for (&uid, &pos, &vel, &dir, update_kind) in (
for (entity, &uid, &pos, &vel, &dir, update_kind) in (
&self.state.ecs_world().entities(),
&self.state.ecs_world().read_storage::<comp::Uid>(),
&self.state.ecs_world().read_storage::<comp::phys::Pos>(),
&self.state.ecs_world().read_storage::<comp::phys::Vel>(),
@ -255,7 +265,7 @@ impl Server {
// everyone, including the player themselves, of their new physics information.
match update_kind {
comp::phys::UpdateKind::Force => self.clients.notify_all(msg),
comp::phys::UpdateKind::Passive => self.clients.notify_all_except(uid, msg),
comp::phys::UpdateKind::Passive => self.clients.notify_all_except(entity, msg),
}
// Now that the update has occured, default to a passive update
@ -263,3 +273,9 @@ impl Server {
}
}
}
impl Drop for Server {
fn drop(&mut self) {
self.clients.notify_all(ServerMsg::Shutdown);
}
}

View File

@ -135,18 +135,17 @@ impl Scene {
/// Maintain data such as GPU constant buffers, models, etc. To be called once per tick.
pub fn maintain(&mut self, renderer: &mut Renderer, client: &Client) {
// Get player position
let player_pos = match client.player().and_then(|uid| client.state().get_entity(uid)) {
Some(ecs_entity) => {
client
.state()
.ecs_world()
.read_storage::<comp::phys::Pos>()
.get(ecs_entity)
.expect("There was no position component on the player entity!")
.0
}
None => Vec3::default(),
};
let player_pos = client
.player()
.and_then(|ent| client
.state()
.ecs_world()
.read_storage::<comp::phys::Pos>()
.get(ent)
.map(|pos| pos.0)
)
.unwrap_or(Vec3::zero());
// Alter camera position to match player
self.camera.set_focus_pos(player_pos);