diff --git a/client/src/error.rs b/client/src/error.rs index ac61996040..e702a67b0f 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -3,6 +3,7 @@ use common::net::PostError; #[derive(Debug)] pub enum Error { Network(PostError), + ServerTimeout, ServerShutdown, Other(String), } @@ -10,7 +11,7 @@ pub enum Error { impl From for Error { fn from(err: PostError) -> Self { match err { - PostError::Disconnected => Error::ServerShutdown, + PostError::Disconnect => Error::ServerShutdown, err => Error::Network(err), } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 7c2fc8017e..6d6aca5cdc 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(label_break_value)] + pub mod error; pub mod input; @@ -27,6 +29,8 @@ use common::{ }; use world::World; +const SERVER_TIMEOUT: f64 = 5.0; // Seconds + pub enum Event { Chat(String), } @@ -39,7 +43,7 @@ pub struct Client { tick: u64, state: State, - player: Option, + player: Option, // Testing world: World, @@ -101,7 +105,7 @@ impl Client { pub fn state_mut(&mut self) -> &mut State { &mut self.state } /// Get an entity from its UID, creating it if it does not exists - pub fn get_or_create_entity(&mut self, uid: Uid) -> EcsEntity { + pub fn get_or_create_entity_from_uid(&mut self, uid: Uid) -> EcsEntity { // Find the ECS entity from its UID let ecs_entity = self.state().ecs_world() .read_resource::() @@ -126,7 +130,7 @@ impl Client { /// Get the player entity #[allow(dead_code)] - pub fn player(&self) -> Option { + pub fn player(&self) -> Option { self.player } @@ -138,8 +142,8 @@ impl Client { /// Send a chat message to the server #[allow(dead_code)] - pub fn send_chat(&mut self, msg: String) -> Result<(), Error> { - Ok(self.postbox.send(ClientMsg::Chat(msg))?) + pub fn send_chat(&mut self, msg: String) { + self.postbox.send(ClientMsg::Chat(msg)) } /// Execute a single client tick, handle input and update the game state by the given duration @@ -164,7 +168,7 @@ impl Client { frontend_events.append(&mut self.handle_new_messages()?); // Step 3 - if let Some(ecs_entity) = self.player.and_then(|uid| self.state().get_entity(uid)) { + if let Some(ecs_entity) = self.player { // TODO: remove this const PLAYER_VELOCITY: f32 = 100.0; @@ -175,6 +179,20 @@ impl Client { // Tick the client's LocalState (step 3) self.state.tick(dt); + // Update the server about the player's physics attributes + if let Some(ecs_entity) = self.player { + match ( + self.state.read_storage().get(ecs_entity).cloned(), + self.state.read_storage().get(ecs_entity).cloned(), + self.state.read_storage().get(ecs_entity).cloned(), + ) { + (Some(pos), Some(vel), Some(dir)) => { + self.postbox.send(ClientMsg::PlayerPhysics { pos, vel, dir }); + }, + _ => {}, + } + } + // Finish the tick, pass control back to the frontend (step 6) self.tick += 1; Ok(frontend_events) @@ -200,10 +218,17 @@ impl Client { for msg in new_msgs { match msg { ServerMsg::Shutdown => return Err(Error::ServerShutdown), + ServerMsg::Ping => self.postbox.send(ClientMsg::Pong), + ServerMsg::Pong => {}, ServerMsg::Chat(msg) => frontend_events.push(Event::Chat(msg)), - ServerMsg::SetPlayerEntity(uid) => self.player = Some(uid), + ServerMsg::SetPlayerEntity(uid) => { + println!("Ent!"); + let ecs_entity = self.get_or_create_entity_from_uid(uid); + self.player = Some(ecs_entity); + }, ServerMsg::EntityPhysics { uid, pos, vel, dir } => { - let ecs_entity = self.get_or_create_entity(uid); + println!("Phys!"); + let ecs_entity = self.get_or_create_entity_from_uid(uid); self.state.write_component(ecs_entity, pos); self.state.write_component(ecs_entity, vel); self.state.write_component(ecs_entity, dir); @@ -213,8 +238,12 @@ impl Client { }, } } - } else if let Some(err) = self.postbox.status() { + } else if let Some(err) = self.postbox.error() { return Err(err.into()); + } else if self.state.get_time() - self.last_ping > SERVER_TIMEOUT * 0.5 { + self.postbox.send(ClientMsg::Ping); + } else if self.state.get_time() - self.last_ping > SERVER_TIMEOUT { + return Err(Error::ServerTimeout); } Ok(frontend_events) @@ -223,6 +252,6 @@ impl Client { impl Drop for Client { fn drop(&mut self) { - self.postbox.send(ClientMsg::Disconnect).unwrap(); + self.postbox.send(ClientMsg::Disconnect); } } diff --git a/common/src/msg/client.rs b/common/src/msg/client.rs index 2e304e20bd..149cd07ad4 100644 --- a/common/src/msg/client.rs +++ b/common/src/msg/client.rs @@ -1,5 +1,17 @@ +use crate::comp::{ + Uid, + phys, +}; + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ClientMsg { + Ping, + Pong, Chat(String), + PlayerPhysics { + pos: phys::Pos, + vel: phys::Vel, + dir: phys::Dir, + }, Disconnect, } diff --git a/common/src/msg/server.rs b/common/src/msg/server.rs index b613b790d7..871215138e 100644 --- a/common/src/msg/server.rs +++ b/common/src/msg/server.rs @@ -6,6 +6,8 @@ use crate::comp::{ #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ServerMsg { Shutdown, + Ping, + Pong, Chat(String), SetPlayerEntity(Uid), EntityPhysics { diff --git a/common/src/net/mod.rs b/common/src/net/mod.rs index d9fe3929ce..8fbefac9a8 100644 --- a/common/src/net/mod.rs +++ b/common/src/net/mod.rs @@ -1,5 +1,6 @@ pub mod data; pub mod error; +pub mod post; pub mod postbox; pub mod postoffice; mod test; @@ -7,9 +8,11 @@ mod test; // Reexports pub use self::{ data::{ClientMsg, ServerMsg}, - error::PostError, - postbox::PostBox, - postoffice::PostOffice, + post::{ + Error as PostError, + PostBox, + PostOffice, + }, }; pub trait PostSend = 'static + serde::Serialize + std::marker::Send + std::fmt::Debug; diff --git a/common/src/net/post.rs b/common/src/net/post.rs new file mode 100644 index 0000000000..7e3be2100a --- /dev/null +++ b/common/src/net/post.rs @@ -0,0 +1,432 @@ +use std::{ + fmt, + thread, + net::{SocketAddr, Shutdown}, + sync::mpsc::TryRecvError, + io::{self, Read, Write}, + collections::VecDeque, + time::Duration, + convert::TryFrom, +}; +use serde; +use mio::{ + net::{TcpListener, TcpStream}, + Events, + Poll, + PollOpt, + Ready, + Token, +}; +use mio_extras::channel::{ + channel, + Receiver, + Sender, +}; +use bincode; + +#[derive(Clone, Debug, PartialEq)] +pub enum Error { + Disconnect, + Network, + InvalidMsg, + Internal, +} + +impl From for Error { + fn from(err: io::Error) -> Self { + Error::Network + } +} + +impl From for Error { + fn from(err: TryRecvError) -> Self { + Error::Internal + } +} + +impl From for Error { + fn from(err: bincode::ErrorKind) -> Self { + Error::InvalidMsg + } +} + +impl From> for Error { + fn from(err: mio_extras::channel::SendError) -> Self { + Error::Internal + } +} + +pub trait PostSend = 'static + serde::Serialize + Send + fmt::Debug; +pub trait PostRecv = 'static + serde::de::DeserializeOwned + Send + fmt::Debug; + +const TCP_TOK: Token = Token(0); +const CTRL_TOK: Token = Token(1); +const POSTBOX_TOK: Token = Token(2); +const SEND_TOK: Token = Token(3); +const RECV_TOK: Token = Token(4); + +const MAX_MSG_BYTES: usize = 1 << 20; + +enum CtrlMsg { + Shutdown, +} + +pub struct PostOffice { + worker: Option>>, + ctrl_tx: Sender, + postbox_rx: Receiver, Error>>, + poll: Poll, + err: Option, +} + +impl PostOffice { + pub fn bind>(addr: A) -> Result { + let tcp_listener = TcpListener::bind(&addr.into())?; + + let (ctrl_tx, ctrl_rx) = channel(); + let (postbox_tx, postbox_rx) = channel(); + + let worker_poll = Poll::new()?; + worker_poll.register(&tcp_listener, TCP_TOK, Ready::readable(), PollOpt::edge())?; + worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; + + let office_poll = Poll::new()?; + office_poll.register(&postbox_rx, POSTBOX_TOK, Ready::readable(), PollOpt::edge())?; + + let worker = thread::spawn(move || office_worker( + worker_poll, + tcp_listener, + ctrl_rx, + postbox_tx, + )); + + Ok(Self { + worker: Some(worker), + ctrl_tx, + postbox_rx, + poll: office_poll, + err: None, + }) + } + + pub fn error(&self) -> Option { + self.err.clone() + } + + pub fn new_connections(&mut self) -> impl ExactSizeIterator> { + let mut conns = VecDeque::new(); + + if let Some(_) = self.err { + return conns.into_iter(); + } + + let mut events = Events::with_capacity(64); + if let Err(err) = self.poll.poll( + &mut events, + Some(Duration::new(0, 0)), + ) { + self.err = Some(err.into()); + return conns.into_iter(); + } + + for event in events { + match event.token() { + // Keep reading new postboxes from the channel + POSTBOX_TOKEN => loop { + match self.postbox_rx.try_recv() { + Ok(Ok(conn)) => conns.push_back(conn), + Err(TryRecvError::Empty) => break, + Err(err) => { + self.err = Some(err.into()); + return conns.into_iter(); + }, + Ok(Err(err)) => { + self.err = Some(err.into()); + return conns.into_iter(); + }, + } + }, + tok => panic!("Unexpected event token '{:?}'", tok), + } + } + + conns.into_iter() + } +} + +impl Drop for PostOffice { + fn drop(&mut self) { + let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); + let _ = self.worker.take().map(|w| w.join()); + } +} + +fn office_worker( + poll: Poll, + tcp_listener: TcpListener, + ctrl_rx: Receiver, + postbox_tx: Sender, Error>>, +) -> Result<(), Error> { + let mut events = Events::with_capacity(64); + loop { + if let Err(err) = poll.poll(&mut events, None) { + postbox_tx.send(Err(err.into()))?; + return Ok(()); + } + + for event in &events { + match event.token() { + CTRL_TOK => loop { + match ctrl_rx.try_recv() { + Ok(CtrlMsg::Shutdown) => return Ok(()), + Err(TryRecvError::Empty) => {}, + Err(err) => { + postbox_tx.send(Err(err.into()))?; + return Ok(()); + }, + } + }, + TCP_TOK => postbox_tx.send( + match tcp_listener.accept() { + Ok((stream, _)) => PostBox::from_tcpstream(stream), + Err(err) => Err(err.into()), + } + )?, + tok => panic!("Unexpected event token '{:?}'", tok), + } + } + } +} + +pub struct PostBox { + worker: Option>>, + ctrl_tx: Sender, + send_tx: Sender, + recv_rx: Receiver>, + poll: Poll, + err: Option, +} + +impl PostBox { + pub fn to_server>(addr: A) -> Result { + Self::from_tcpstream(TcpStream::connect(&addr.into())?) + } + + fn from_tcpstream(tcp_stream: TcpStream) -> Result { + let (ctrl_tx, ctrl_rx) = channel(); + let (send_tx, send_rx) = channel(); + let (recv_tx, recv_rx) = channel(); + + let worker_poll = Poll::new()?; + worker_poll.register(&tcp_stream, TCP_TOK, Ready::readable(), PollOpt::edge())?; + worker_poll.register(&ctrl_rx, CTRL_TOK, Ready::readable(), PollOpt::edge())?; + worker_poll.register(&send_rx, SEND_TOK, Ready::readable(), PollOpt::edge())?; + + let postbox_poll = Poll::new()?; + postbox_poll.register(&recv_rx, RECV_TOK, Ready::readable(), PollOpt::edge())?; + + let worker = thread::spawn(move || postbox_worker( + worker_poll, + tcp_stream, + ctrl_rx, + send_rx, + recv_tx, + )); + + Ok(Self { + worker: Some(worker), + ctrl_tx, + send_tx, + recv_rx, + poll: postbox_poll, + err: None, + }) + } + + pub fn error(&self) -> Option { + self.err.clone() + } + + pub fn send(&mut self, data: S) { + let _ = self.send_tx.send(data); + } + + pub fn new_messages(&mut self) -> impl ExactSizeIterator { + let mut msgs = VecDeque::new(); + + if let Some(_) = self.err { + return msgs.into_iter(); + } + + let mut events = Events::with_capacity(64); + if let Err(err) = self.poll.poll( + &mut events, + Some(Duration::new(0, 0)), + ) { + self.err = Some(err.into()); + return msgs.into_iter(); + } + + for event in events { + match event.token() { + // Keep reading new messages from the channel + RECV_TOKEN => loop { + match self.recv_rx.try_recv() { + Ok(Ok(msg)) => msgs.push_back(msg), + Err(TryRecvError::Empty) => break, + Err(err) => { + self.err = Some(err.into()); + return msgs.into_iter(); + }, + Ok(Err(err)) => { + self.err = Some(err.into()); + return msgs.into_iter(); + }, + } + }, + tok => panic!("Unexpected event token '{:?}'", tok), + } + } + + msgs.into_iter() + } +} + +impl Drop for PostBox { + fn drop(&mut self) { + let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); + let _ = self.worker.take().map(|w| w.join()); + } +} + +fn postbox_worker( + poll: Poll, + mut tcp_stream: TcpStream, + ctrl_rx: Receiver, + send_rx: Receiver, + recv_tx: Sender>, +) -> Result<(), Error> { + enum RecvState { + ReadHead(Vec), + ReadBody(usize, Vec), + } + + let mut recv_state = RecvState::ReadHead(Vec::with_capacity(8)); + let mut events = Events::with_capacity(64); + + 'work: loop { + if let Err(err) = poll.poll(&mut events, None) { + recv_tx.send(Err(err.into()))?; + break 'work; + } + + for event in &events { + match event.token() { + CTRL_TOK => loop { + match ctrl_rx.try_recv() { + Ok(CtrlMsg::Shutdown) => { + break 'work; + }, + Err(TryRecvError::Empty) => break, + Err(err) => { + recv_tx.send(Err(err.into()))?; + break 'work; + }, + } + }, + SEND_TOK => loop { + match send_rx.try_recv() { + Ok(outgoing_msg) => { + let mut msg_bytes = match bincode::serialize(&outgoing_msg) { + Ok(bytes) => bytes, + Err(err) => { + recv_tx.send(Err((*err).into())); + break 'work; + }, + }; + + let mut packet = msg_bytes + .len() + .to_le_bytes() + .as_ref() + .to_vec(); + packet.append(&mut msg_bytes); + + match tcp_stream.write_all(&packet) { + Ok(()) => {}, + Err(err) => { + recv_tx.send(Err(err.into())); + break 'work; + }, + } + }, + Err(TryRecvError::Empty) => break, + Err(err) => Err(err)?, + } + }, + TCP_TOK => loop { + match tcp_stream.take_error() { + Ok(None) => {}, + Ok(Some(err)) => { + recv_tx.send(Err(err.into())); + break 'work; + }, + Err(err) => { + recv_tx.send(Err(err.into())); + break 'work; + }, + } + match &mut recv_state { + RecvState::ReadHead(head) => if head.len() == 8 { + let len = usize::from_le_bytes(<[u8; 8]>::try_from(head.as_slice()).unwrap()); + if len > MAX_MSG_BYTES { + recv_tx.send(Err(Error::InvalidMsg)); + break 'work; + } else if len == 0 { + recv_state = RecvState::ReadHead(Vec::with_capacity(8)); + break; + } else { + recv_state = RecvState::ReadBody( + len, + Vec::new(), + ); + } + } else { + let mut b = [0; 1]; + match tcp_stream.read(&mut b) { + Ok(_) => head.push(b[0]), + Err(_) => break, + } + }, + RecvState::ReadBody(len, body) => if body.len() == *len { + match bincode::deserialize(&body) { + Ok(msg) => { + recv_tx.send(Ok(msg))?; + recv_state = RecvState::ReadHead(Vec::with_capacity(8)); + }, + Err(err) => { + recv_tx.send(Err((*err).into()))?; + break 'work; + }, + } + } else { + let left = *len - body.len(); + let mut buf = vec![0; left]; + match tcp_stream.read(&mut buf) { + Ok(_) => body.append(&mut buf), + Err(err) => { + recv_tx.send(Err(err.into()))?; + break 'work; + }, + } + }, + } + }, + tok => panic!("Unexpected event token '{:?}'", tok), + } + } + } + + tcp_stream.shutdown(Shutdown::Both); + Ok(()) +} diff --git a/common/src/net/postbox.rs b/common/src/net/postbox.rs index 1b25d5abb2..21d9466637 100644 --- a/common/src/net/postbox.rs +++ b/common/src/net/postbox.rs @@ -120,14 +120,22 @@ where // If an error occured, or previously occured, just give up if let Some(_) = self.err { return items.into_iter(); + } else if let Err(err) = self.poll.poll(&mut events, Some(Duration::new(0, 0))) { + self.err = Some(err.into()); + return items.into_iter(); } - loop { - match self.recv.try_recv() { - Ok(Ok(item)) => items.push_back(item), - Ok(Err(err)) => self.err = Some(err.into()), - Err(TryRecvError::Empty) => break, - Err(err) => self.err = Some(err.into()), + for event in events { + match event.token() { + DATA_TOKEN => loop { + match self.recv.try_recv() { + Ok(Ok(item)) => items.push_back(item), + Err(TryRecvError::Empty) => break, + Err(err) => self.err = Some(err.into()), + Ok(Err(err)) => self.err = Some(err.into()), + } + }, + _ => (), } } @@ -145,15 +153,17 @@ fn postbox_thread( S: PostSend, R: PostRecv, { - let mut events = Events::with_capacity(64); // Receiving related variables + let mut events = Events::with_capacity(64); let mut recv_buff = Vec::new(); let mut recv_nextlen: u64 = 0; loop { let mut disconnected = false; - poll.poll(&mut events, None) + poll.poll(&mut events, Some(Duration::from_millis(20))) .expect("Failed to execute poll(), most likely fault of the OS"); + println!("FINISHED POLL!"); for event in events.iter() { + println!("EVENT!"); match event.token() { CTRL_TOKEN => match ctrl_rx.try_recv().unwrap() { ControlMsg::Shutdown => return, @@ -162,16 +172,17 @@ fn postbox_thread( Ok(_) => {} // Returned when all the data has been read Err(ref e) if e.kind() == ErrorKind::WouldBlock => {} - Err(e) => { - recv_tx.send(Err(e.into())).unwrap(); - }, + Err(e) => recv_tx.send(Err(e.into())).unwrap(), }, DATA_TOKEN => { - let mut packet = bincode::serialize(&send_rx.try_recv().unwrap()).unwrap(); + let msg = send_rx.try_recv().unwrap(); + println!("Send: {:?}", msg); + let mut packet = bincode::serialize(&msg).unwrap(); packet.splice(0..0, (packet.len() as u64).to_be_bytes().iter().cloned()); match connection.write_bufs(&[packet.as_slice().into()]) { - Ok(_) => {} + Ok(_) => { println!("Sent!"); } Err(e) => { + println!("Send error!"); recv_tx.send(Err(e.into())).unwrap(); } }; @@ -181,9 +192,9 @@ fn postbox_thread( } loop { if recv_nextlen == 0 && recv_buff.len() >= 8 { + println!("Read nextlen"); recv_nextlen = u64::from_be_bytes( - <[u8; 8]>::try_from(recv_buff.drain(0..8).collect::>().as_slice()) - .unwrap(), + <[u8; 8]>::try_from(recv_buff.drain(0..8).collect::>().as_slice()).unwrap(), ); if recv_nextlen > MESSAGE_SIZE_CAP { recv_tx.send(Err(PostErrorInternal::MsgSizeLimitExceeded)).unwrap(); @@ -202,15 +213,16 @@ fn postbox_thread( .collect::>() .as_slice()) { Ok(msg) => { + println!("Recv: {:?}", msg); recv_tx .send(Ok(msg)) .unwrap(); recv_nextlen = 0; } Err(e) => { + println!("Recv error: {:?}", e); recv_tx.send(Err(e.into())).unwrap(); recv_nextlen = 0; - continue } } } else { diff --git a/common/src/net/postoffice.rs b/common/src/net/postoffice.rs index ce25fbdcaf..e8d70330ad 100644 --- a/common/src/net/postoffice.rs +++ b/common/src/net/postoffice.rs @@ -4,6 +4,7 @@ use std::{ collections::VecDeque, net::SocketAddr, thread, + sync::mpsc::TryRecvError, }; // External @@ -93,11 +94,13 @@ where for event in events { match event.token() { - // Ignore recv error - DATA_TOKEN => match self.recv.try_recv() { - Ok(Ok(conn)) => conns.push_back(conn), - Err(err) => self.err = Some(err.into()), - Ok(Err(err)) => self.err = Some(err.into()), + DATA_TOKEN => loop { + match self.recv.try_recv() { + Ok(Ok(conn)) => conns.push_back(conn), + Err(TryRecvError::Empty) => break, + Err(err) => self.err = Some(err.into()), + Ok(Err(err)) => self.err = Some(err.into()), + } }, _ => (), } diff --git a/common/src/state.rs b/common/src/state.rs index de33bf6986..232c9d6645 100644 --- a/common/src/state.rs +++ b/common/src/state.rs @@ -116,11 +116,6 @@ impl State { let _ = self.ecs_world.write_storage().insert(entity, comp); } - /// Read a clone of a component attributed to a particular entity - pub fn read_component(&self, entity: EcsEntity) -> Option { - self.ecs_world.read_storage::().get(entity).cloned() - } - /// Get a read-only reference to the storage of a particular component type pub fn read_storage(&self) -> EcsStorage>> { self.ecs_world.read_storage::() diff --git a/server-cli/src/main.rs b/server-cli/src/main.rs index f959114998..c4dcdc43ab 100644 --- a/server-cli/src/main.rs +++ b/server-cli/src/main.rs @@ -24,9 +24,9 @@ fn main() { for event in events { match event { - Event::ClientConnected { uid } => info!("Client {} connected!", uid), - Event::ClientDisconnected { uid } => info!("Client {} disconnected!", uid), - Event::Chat { uid, msg } => info!("[Client {}] {}", uid, msg), + Event::ClientConnected { ecs_entity } => println!("Client connected!"), + Event::ClientDisconnected { ecs_entity } => println!("Client disconnected!"), + Event::Chat { ecs_entity, msg } => println!("[Client] {}", msg), } } diff --git a/server/src/client.rs b/server/src/client.rs index ffc876e1bc..8030fb03e5 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -7,7 +7,7 @@ use common::{ use crate::Error; pub struct Client { - pub uid: comp::Uid, + pub ecs_entity: EcsEntity, pub postbox: PostBox, pub last_ping: f64, } @@ -33,16 +33,14 @@ impl Clients { pub fn notify_all(&mut self, msg: ServerMsg) { for client in &mut self.clients { - // Consume any errors, deal with them later - let _ = client.postbox.send(msg.clone()); + client.postbox.send(msg.clone()); } } - pub fn notify_all_except(&mut self, uid: comp::Uid, msg: ServerMsg) { + pub fn notify_all_except(&mut self, ecs_entity: EcsEntity, msg: ServerMsg) { for client in &mut self.clients { - if client.uid != uid { - // Consume any errors, deal with them later - let _ = client.postbox.send(msg.clone()); + if client.ecs_entity != ecs_entity { + client.postbox.send(msg.clone()); } } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 856ab9635d..655066ad47 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -38,13 +38,13 @@ const CLIENT_TIMEOUT: f64 = 5.0; // Seconds pub enum Event { ClientConnected { - uid: comp::Uid, + ecs_entity: EcsEntity, }, ClientDisconnected { - uid: comp::Uid, + ecs_entity: EcsEntity, }, Chat { - uid: comp::Uid, + ecs_entity: EcsEntity, msg: String, }, } @@ -65,7 +65,7 @@ impl Server { state: State::new(), world: World::new(), - postoffice: PostOffice::new(SocketAddr::from(([0; 4], 59003)))?, + postoffice: PostOffice::bind(SocketAddr::from(([0; 4], 59003)))?, clients: Clients::empty(), }) } @@ -89,9 +89,7 @@ impl Server { .with(comp::phys::Pos(Vec3::zero())) .with(comp::phys::Vel(Vec3::zero())) .with(comp::phys::Dir(Vec3::unit_y())) - // When the player is first created, force a physics notification to everyone - // including themselves. - .with(comp::phys::UpdateKind::Force) + .with(comp::phys::UpdateKind::Passive) } /// Get a reference to the server's world. @@ -122,7 +120,7 @@ impl Server { let mut frontend_events = Vec::new(); // If networking has problems, handle them - if let Some(err) = self.postoffice.status() { + if let Some(err) = self.postoffice.error() { return Err(err.into()); } @@ -154,19 +152,23 @@ impl Server { let mut frontend_events = Vec::new(); for mut postbox in self.postoffice.new_connections() { - let ecs_entity = self.build_player().build(); - let uid = self.state.read_component(ecs_entity).unwrap(); + let ecs_entity = self.build_player() + // When the player is first created, force a physics notification to everyone + // including themselves. + .with(comp::phys::UpdateKind::Force) + .build(); + let uid = self.state.read_storage().get(ecs_entity).cloned().unwrap(); - let _ = postbox.send(ServerMsg::SetPlayerEntity(uid)); + postbox.send(ServerMsg::SetPlayerEntity(uid)); self.clients.add(Client { - uid, + ecs_entity, postbox, last_ping: self.state.get_time(), }); frontend_events.push(Event::ClientConnected { - uid, + ecs_entity, }); } @@ -192,19 +194,26 @@ impl Server { // Process incoming messages for msg in new_msgs { match msg { - ClientMsg::Chat(msg) => new_chat_msgs.push((client.uid, msg)), + ClientMsg::Ping => client.postbox.send(ServerMsg::Pong), + ClientMsg::Pong => {}, + ClientMsg::Chat(msg) => new_chat_msgs.push((client.ecs_entity, msg)), + ClientMsg::PlayerPhysics { pos, vel, dir } => { + state.write_component(client.ecs_entity, pos); + state.write_component(client.ecs_entity, vel); + state.write_component(client.ecs_entity, dir); + }, ClientMsg::Disconnect => disconnected = true, } } } else if state.get_time() - client.last_ping > CLIENT_TIMEOUT || // Timeout - client.postbox.status().is_some() // Postbox eror + client.postbox.error().is_some() // Postbox eror { disconnected = true; } if disconnected { - disconnected_clients.push(client.uid); + disconnected_clients.push(client.ecs_entity); true } else { false @@ -212,24 +221,24 @@ impl Server { }); // Handle new chat messages - for (uid, msg) in new_chat_msgs { + for (ecs_entity, msg) in new_chat_msgs { self.clients.notify_all(ServerMsg::Chat(msg.clone())); frontend_events.push(Event::Chat { - uid, + ecs_entity, msg, }); } // Handle client disconnects - for uid in disconnected_clients { - self.clients.notify_all(ServerMsg::EntityDeleted(uid)); + for ecs_entity in disconnected_clients { + self.clients.notify_all(ServerMsg::EntityDeleted(state.read_storage().get(ecs_entity).cloned().unwrap())); frontend_events.push(Event::ClientDisconnected { - uid, + ecs_entity, }); - state.delete_entity(uid); + state.ecs_world_mut().delete_entity(ecs_entity); } Ok(frontend_events) @@ -237,7 +246,8 @@ impl Server { /// Sync client states with the most up to date information fn sync_clients(&mut self) { - for (&uid, &pos, &vel, &dir, update_kind) in ( + for (entity, &uid, &pos, &vel, &dir, update_kind) in ( + &self.state.ecs_world().entities(), &self.state.ecs_world().read_storage::(), &self.state.ecs_world().read_storage::(), &self.state.ecs_world().read_storage::(), @@ -255,7 +265,7 @@ impl Server { // everyone, including the player themselves, of their new physics information. match update_kind { comp::phys::UpdateKind::Force => self.clients.notify_all(msg), - comp::phys::UpdateKind::Passive => self.clients.notify_all_except(uid, msg), + comp::phys::UpdateKind::Passive => self.clients.notify_all_except(entity, msg), } // Now that the update has occured, default to a passive update @@ -263,3 +273,9 @@ impl Server { } } } + +impl Drop for Server { + fn drop(&mut self) { + self.clients.notify_all(ServerMsg::Shutdown); + } +} diff --git a/voxygen/src/scene/mod.rs b/voxygen/src/scene/mod.rs index 4b1fbf0f4d..40f7cb0e2b 100644 --- a/voxygen/src/scene/mod.rs +++ b/voxygen/src/scene/mod.rs @@ -135,18 +135,17 @@ impl Scene { /// Maintain data such as GPU constant buffers, models, etc. To be called once per tick. pub fn maintain(&mut self, renderer: &mut Renderer, client: &Client) { // Get player position - let player_pos = match client.player().and_then(|uid| client.state().get_entity(uid)) { - Some(ecs_entity) => { - client - .state() - .ecs_world() - .read_storage::() - .get(ecs_entity) - .expect("There was no position component on the player entity!") - .0 - } - None => Vec3::default(), - }; + let player_pos = client + .player() + .and_then(|ent| client + .state() + .ecs_world() + .read_storage::() + .get(ent) + .map(|pos| pos.0) + ) + .unwrap_or(Vec3::zero()); + // Alter camera position to match player self.camera.set_focus_pos(player_pos);