From c78f496fca32ad370ca8e2bab2b70842167b0ca4 Mon Sep 17 00:00:00 2001 From: Joshua Yanovski Date: Fri, 9 Sep 2022 08:29:43 -0700 Subject: [PATCH] Reduce overhead of messaging systems. --- Cargo.lock | 1 + client/src/lib.rs | 6 +- network/benches/speed.rs | 6 +- network/examples/chat.rs | 26 +- network/examples/fileshare/server.rs | 64 ++-- network/examples/network-speed/main.rs | 6 +- network/src/api.rs | 77 ++--- network/tests/closing.rs | 74 ++-- network/tests/helper.rs | 4 +- network/tests/integration.rs | 40 +-- server/Cargo.toml | 1 + server/src/client.rs | 65 ++-- server/src/connection_handler.rs | 10 +- server/src/login_provider.rs | 18 +- server/src/sys/msg/character_screen.rs | 8 +- server/src/sys/msg/general.rs | 48 +-- server/src/sys/msg/mod.rs | 2 +- server/src/sys/msg/ping.rs | 77 +++-- server/src/sys/msg/register.rs | 459 +++++++++++++++++-------- server/src/sys/msg/terrain.rs | 10 +- server/src/sys/sentinel.rs | 20 +- 21 files changed, 602 insertions(+), 420 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 118e68ba02..05edd1c188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6844,6 +6844,7 @@ dependencies = [ "lazy_static", "noise", "num_cpus", + "parking_lot 0.12.0", "portpicker", "prometheus", "prometheus-hyper", diff --git a/client/src/lib.rs b/client/src/lib.rs index d63c27add0..f9aed44a8d 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -293,7 +293,7 @@ impl Client { ) -> Result { let network = Network::new(Pid::new(), &runtime); - let participant = match addr { + let mut participant = match addr { ConnectionArgs::Tcp { hostname, prefer_ipv6, @@ -316,7 +316,7 @@ impl Client { }; let stream = participant.opened().await?; - let mut ping_stream = participant.opened().await?; + let ping_stream = participant.opened().await?; let mut register_stream = participant.opened().await?; let character_screen_stream = participant.opened().await?; let in_game_stream = participant.opened().await?; @@ -2514,7 +2514,7 @@ impl Client { } // ignore network events - while let Some(Ok(Some(event))) = self.participant.as_ref().map(|p| p.try_fetch_event()) { + while let Some(Ok(Some(event))) = self.participant.as_mut().map(|p| p.try_fetch_event()) { trace!(?event, "received network event"); } diff --git a/network/benches/speed.rs b/network/benches/speed.rs index d7f0f2b63c..ffc8de16d9 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -11,7 +11,7 @@ async fn stream_msg(s1_a: Arc>, s1_b: Arc>, data: &[ let mut s1_b = s1_b.lock().await; let m = Message::serialize(&data, s1_b.params()); std::thread::spawn(move || { - let mut s1_a = s1_a.try_lock().unwrap(); + let s1_a = s1_a.try_lock().unwrap(); for _ in 0..cnt { s1_a.send_raw(&m).unwrap(); } @@ -130,11 +130,11 @@ pub fn network_participant_stream( ) { let runtime = Runtime::new().unwrap(); let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { - let n_a = Network::new(Pid::fake(0), &runtime); + let mut n_a = Network::new(Pid::fake(0), &runtime); let n_b = Network::new(Pid::fake(1), &runtime); n_a.listen(addr.0).await.unwrap(); - let p1_b = n_b.connect(addr.1).await.unwrap(); + let mut p1_b = n_b.connect(addr.1).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap(); diff --git a/network/examples/chat.rs b/network/examples/chat.rs index d52db59d3a..5f7675d90c 100644 --- a/network/examples/chat.rs +++ b/network/examples/chat.rs @@ -8,7 +8,7 @@ use std::{sync::Arc, thread, time::Duration}; use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock}; use tracing::*; use tracing_subscriber::EnvFilter; -use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises}; +use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises, Stream}; ///This example contains a simple chatserver, that allows to send messages /// between participants, it's neither pretty nor perfect, but it should show @@ -106,26 +106,20 @@ fn main() { fn server(address: ListenAddr) { let r = Arc::new(Runtime::new().unwrap()); - let server = Network::new(Pid::new(), &r); - let server = Arc::new(server); + let mut server = Network::new(Pid::new(), &r); let participants = Arc::new(RwLock::new(Vec::new())); r.block_on(async { server.listen(address).await.unwrap(); loop { - let p1 = Arc::new(server.connected().await.unwrap()); - let server1 = server.clone(); - participants.write().await.push(p1.clone()); - tokio::spawn(client_connection(server1, p1, participants.clone())); + let mut p1 = server.connected().await.unwrap(); + let s1 = p1.opened().await.unwrap(); + participants.write().await.push(p1); + tokio::spawn(client_connection(s1, participants.clone())); } }); } -async fn client_connection( - _network: Arc, - participant: Arc, - participants: Arc>>>, -) { - let mut s1 = participant.opened().await.unwrap(); +async fn client_connection(mut s1: Stream, participants: Arc>>) { let username = s1.recv::().await.unwrap(); println!("[{}] connected", username); loop { @@ -141,7 +135,7 @@ async fn client_connection( .await { Err(_) => info!("error talking to client, //TODO drop it"), - Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(), + Ok(s) => s.send((username.clone(), msg.clone())).unwrap(), }; } }, @@ -156,7 +150,7 @@ fn client(address: ConnectAddr) { r.block_on(async { let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1 - let mut s1 = p1 + let s1 = p1 .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .await .unwrap(); //remote representation of s1 @@ -188,7 +182,7 @@ fn client(address: ConnectAddr) { // receiving i open and close a stream per message. this can be done easier but // this allows me to be quite lazy on the server side and just get a list of // all participants and send to them... -async fn read_messages(participant: Participant) { +async fn read_messages(mut participant: Participant) { while let Ok(mut s) = participant.opened().await { let (username, message) = s.recv::<(String, String)>().await.unwrap(); println!("[{}]: {}", username, message); diff --git a/network/examples/fileshare/server.rs b/network/examples/fileshare/server.rs index 4069b3ea6b..f6be103012 100644 --- a/network/examples/fileshare/server.rs +++ b/network/examples/fileshare/server.rs @@ -1,5 +1,5 @@ use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo}; -use futures_util::{FutureExt, StreamExt}; +use futures_util::StreamExt; use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tokio::{ fs, join, @@ -15,49 +15,65 @@ struct ControlChannels { command_receiver: mpsc::UnboundedReceiver, } -pub struct Server { - run_channels: Option, - network: Network, +struct Shared { served: RwLock>, remotes: RwLock>>>, receiving_files: Mutex>>, } +pub struct Server { + run_channels: ControlChannels, + server: Network, + client: Network, + shared: Shared, +} + impl Server { pub fn new(runtime: Arc) -> (Self, mpsc::UnboundedSender) { let (command_sender, command_receiver) = mpsc::unbounded_channel(); - let network = Network::new(Pid::new(), &runtime); + let server = Network::new(Pid::new(), &runtime); + let client = Network::new(Pid::new(), &runtime); - let run_channels = Some(ControlChannels { command_receiver }); + let run_channels = ControlChannels { command_receiver }; ( Server { run_channels, - network, - served: RwLock::new(vec![]), - remotes: RwLock::new(HashMap::new()), - receiving_files: Mutex::new(HashMap::new()), + server, + client, + shared: Shared { + served: RwLock::new(vec![]), + remotes: RwLock::new(HashMap::new()), + receiving_files: Mutex::new(HashMap::new()), + }, }, command_sender, ) } - pub async fn run(mut self, address: ListenAddr) { - let run_channels = self.run_channels.take().unwrap(); + pub async fn run(self, address: ListenAddr) { + let run_channels = self.run_channels; - self.network.listen(address).await.unwrap(); + self.server.listen(address).await.unwrap(); join!( - self.command_manager(run_channels.command_receiver,), - self.connect_manager(), + self.shared + .command_manager(self.client, run_channels.command_receiver), + self.shared.connect_manager(self.server), ); } +} - async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver) { +impl Shared { + async fn command_manager( + &self, + client: Network, + command_receiver: mpsc::UnboundedReceiver, + ) { trace!("Start command_manager"); let command_receiver = UnboundedReceiverStream::new(command_receiver); command_receiver - .for_each_concurrent(None, async move |cmd| { + .for_each_concurrent(None, |cmd| async { match cmd { LocalCommand::Shutdown => println!("Shutting down service"), LocalCommand::Disconnect => { @@ -66,7 +82,7 @@ impl Server { }, LocalCommand::Connect(addr) => { println!("Trying to connect to: {:?}", &addr); - match self.network.connect(addr.clone()).await { + match client.connect(addr.clone()).await { Ok(p) => self.loop_participant(p).await, Err(e) => println!("Failed to connect to {:?}, err: {:?}", &addr, e), } @@ -89,7 +105,7 @@ impl Server { LocalCommand::Get(id, path) => { // i dont know the owner, just broadcast, i am laaaazyyy for ri in self.remotes.read().await.values() { - let mut ri = ri.lock().await; + let ri = ri.lock().await; if ri.get_info(id).is_some() { //found provider, send request. self.receiving_files.lock().await.insert(id, path.clone()); @@ -105,20 +121,20 @@ impl Server { trace!("Stop command_manager"); } - async fn connect_manager(&self) { + async fn connect_manager(&self, network: Network) { trace!("Start connect_manager"); - let iter = futures_util::stream::unfold((), |_| { - self.network.connected().map(|r| r.ok().map(|v| (v, ()))) + let iter = futures_util::stream::unfold(network, async move |mut network| { + network.connected().await.ok().map(|v| (v, network)) }); - iter.for_each_concurrent(/* limit */ None, async move |participant| { + iter.for_each_concurrent(/* limit */ None, |participant| async { self.loop_participant(participant).await; }) .await; trace!("Stop connect_manager"); } - async fn loop_participant(&self, p: Participant) { + async fn loop_participant(&self, mut p: Participant) { if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = ( p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0) .await, diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index 931989c2ac..8b7485172b 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -129,7 +129,7 @@ fn main() { fn server(address: ListenAddr, runtime: Arc) { let registry = Arc::new(Registry::new()); - let server = Network::new_with_registry(Pid::new(), &runtime, ®istry); + let mut server = Network::new_with_registry(Pid::new(), &runtime, ®istry); runtime.spawn(Server::run( Arc::clone(®istry), SocketAddr::from(([0; 4], 59112)), @@ -140,7 +140,7 @@ fn server(address: ListenAddr, runtime: Arc) { loop { info!("----"); info!("Waiting for participant to connect"); - let p1 = runtime.block_on(server.connected()).unwrap(); //remote representation of p1 + let mut p1 = runtime.block_on(server.connected()).unwrap(); //remote representation of p1 let mut s1 = runtime.block_on(p1.opened()).unwrap(); //remote representation of s1 runtime.block_on(async { let mut last = Instant::now(); @@ -169,7 +169,7 @@ fn client(address: ConnectAddr, runtime: Arc) { )); let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1 - let mut s1 = runtime + let s1 = runtime .block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)) .unwrap(); //remote representation of s1 let mut last = Instant::now(); diff --git a/network/src/api.rs b/network/src/api.rs index 46e5a8259e..2a1ee7c63f 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -67,9 +67,9 @@ pub enum ParticipantEvent { pub struct Participant { local_pid: Pid, remote_pid: Pid, - a2b_open_stream_s: Mutex>, - b2a_stream_opened_r: Mutex>, - b2a_event_r: Mutex>, + a2b_open_stream_s: mpsc::UnboundedSender, + b2a_stream_opened_r: mpsc::UnboundedReceiver, + b2a_event_r: mpsc::UnboundedReceiver, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -195,9 +195,9 @@ pub struct StreamParams { pub struct Network { local_pid: Pid, participant_disconnect_sender: Arc>>, - listen_sender: Mutex>)>>, - connect_sender: Mutex>, - connected_receiver: Mutex>, + listen_sender: mpsc::UnboundedSender<(ListenAddr, oneshot::Sender>)>, + connect_sender: mpsc::UnboundedSender, + connected_receiver: mpsc::UnboundedReceiver, shutdown_network_s: Option>>, } @@ -300,9 +300,9 @@ impl Network { Self { local_pid: participant_id, participant_disconnect_sender, - listen_sender: Mutex::new(listen_sender), - connect_sender: Mutex::new(connect_sender), - connected_receiver: Mutex::new(connected_receiver), + listen_sender, + connect_sender, + connected_receiver, shutdown_network_s: Some(shutdown_network_s), } } @@ -342,10 +342,7 @@ impl Network { pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> { let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); debug!(?address, "listening on address"); - self.listen_sender - .lock() - .await - .send((address, s2a_result_s))?; + self.listen_sender.send((address, s2a_result_s))?; match s2a_result_r.await? { //waiting guarantees that we either listened successfully or get an error like port in // use @@ -401,10 +398,7 @@ impl Network { let (pid_sender, pid_receiver) = oneshot::channel::>(); debug!(?address, "Connect to address"); - self.connect_sender - .lock() - .await - .send((address, pid_sender))?; + self.connect_sender.send((address, pid_sender))?; let participant = match pid_receiver.await? { Ok(p) => p, Err(e) => return Err(NetworkError::ConnectFailed(e)), @@ -454,11 +448,9 @@ impl Network { /// [`listen`]: crate::api::Network::listen /// [`ListenAddr`]: crate::api::ListenAddr #[instrument(name="network", skip(self), fields(p = %self.local_pid))] - pub async fn connected(&self) -> Result { + pub async fn connected(&mut self) -> Result { let participant = self .connected_receiver - .lock() - .await .recv() .await .ok_or(NetworkError::NetworkClosed)?; @@ -536,9 +528,9 @@ impl Participant { Self { local_pid, remote_pid, - a2b_open_stream_s: Mutex::new(a2b_open_stream_s), - b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), - b2a_event_r: Mutex::new(b2a_event_r), + a2b_open_stream_s, + b2a_stream_opened_r, + b2a_event_r, b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } @@ -600,12 +592,10 @@ impl Participant { ) -> Result { debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio"); let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::(); - if let Err(e) = self.a2b_open_stream_s.lock().await.send(( - prio, - promises, - bandwidth, - p2a_return_stream_s, - )) { + if let Err(e) = + self.a2b_open_stream_s + .send((prio, promises, bandwidth, p2a_return_stream_s)) + { debug!(?e, "bParticipant is already closed, notifying"); return Err(ParticipantError::ParticipantDisconnected); } @@ -657,8 +647,8 @@ impl Participant { /// [`connected`]: Network::connected /// [`open`]: Participant::open #[instrument(name="network", skip(self), fields(p = %self.local_pid))] - pub async fn opened(&self) -> Result { - match self.b2a_stream_opened_r.lock().await.recv().await { + pub async fn opened(&mut self) -> Result { + match self.b2a_stream_opened_r.recv().await { Some(stream) => { let sid = stream.sid; debug!(?sid, "Receive opened stream"); @@ -794,8 +784,8 @@ impl Participant { /// ``` /// /// [`ParticipantEvent`]: crate::api::ParticipantEvent - pub async fn fetch_event(&self) -> Result { - match self.b2a_event_r.lock().await.recv().await { + pub async fn fetch_event(&mut self) -> Result { + match self.b2a_event_r.recv().await { Some(event) => Ok(event), None => { debug!("event_receiver failed, closing participant"); @@ -811,16 +801,13 @@ impl Participant { /// /// [`ParticipantEvent`]: crate::api::ParticipantEvent /// [`fetch_event`]: Participant::fetch_event - pub fn try_fetch_event(&self) -> Result, ParticipantError> { - match &mut self.b2a_event_r.try_lock() { - Ok(b2a_event_r) => match b2a_event_r.try_recv() { - Ok(event) => Ok(Some(event)), - Err(mpsc::error::TryRecvError::Empty) => Ok(None), - Err(mpsc::error::TryRecvError::Disconnected) => { - Err(ParticipantError::ParticipantDisconnected) - }, + pub fn try_fetch_event(&mut self) -> Result, ParticipantError> { + match self.b2a_event_r.try_recv() { + Ok(event) => Ok(Some(event)), + Err(mpsc::error::TryRecvError::Empty) => Ok(None), + Err(mpsc::error::TryRecvError::Disconnected) => { + Err(ParticipantError::ParticipantDisconnected) }, - Err(_) => Ok(None), } } @@ -914,7 +901,7 @@ impl Stream { /// [`recv`]: Stream::recv /// [`Serialized`]: Serialize #[inline] - pub fn send(&mut self, msg: M) -> Result<(), StreamError> { + pub fn send(&self, msg: M) -> Result<(), StreamError> { self.send_raw_move(Message::serialize(&msg, self.params())) } @@ -966,7 +953,7 @@ impl Stream { /// [`compress`]: lz_fear::raw::compress2 /// [`Message::serialize`]: crate::message::Message::serialize #[inline] - pub fn send_raw(&mut self, message: &Message) -> Result<(), StreamError> { + pub fn send_raw(&self, message: &Message) -> Result<(), StreamError> { self.send_raw_move(Message { data: message.data.clone(), #[cfg(feature = "compression")] @@ -974,7 +961,7 @@ impl Stream { }) } - fn send_raw_move(&mut self, message: Message) -> Result<(), StreamError> { + fn send_raw_move(&self, message: Message) -> Result<(), StreamError> { if self.send_closed.load(Ordering::Relaxed) { return Err(StreamError::StreamClosed); } diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 88eedc9439..c72870b78d 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -28,7 +28,7 @@ use helper::{network_participant_stream, tcp, SLEEP_EXTERNAL, SLEEP_INTERNAL}; #[test] fn close_network() { let (_, _) = helper::setup(false, 0); - let (r, _, _p1_a, mut s1_a, _, _p1_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _, _p1_a, s1_a, _, _p1_b, mut s1_b) = network_participant_stream(tcp()); std::thread::sleep(SLEEP_INTERNAL); @@ -40,7 +40,7 @@ fn close_network() { #[test] fn close_participant() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, p1_a, s1_a, _n_b, p1_b, mut s1_b) = network_participant_stream(tcp()); r.block_on(p1_a.disconnect()).unwrap(); r.block_on(p1_b.disconnect()).unwrap(); @@ -75,7 +75,7 @@ fn close_streams_in_block_on() { let (r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = network_participant_stream(tcp()); r.block_on(async { //make it locally so that they are dropped later - let mut s1_a = s1_a; + let s1_a = s1_a; let mut s1_b = s1_b; s1_a.send("ping").unwrap(); assert_eq!(s1_b.recv().await, Ok("ping".to_string())); @@ -87,7 +87,7 @@ fn close_streams_in_block_on() { #[test] fn stream_simple_3msg_then_close() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(1u8).unwrap(); s1_a.send(42).unwrap(); @@ -104,7 +104,7 @@ fn stream_simple_3msg_then_close() { fn stream_send_first_then_receive() { // recv should still be possible even if stream got closed if they are in queue let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(1u8).unwrap(); s1_a.send(42).unwrap(); @@ -120,7 +120,7 @@ fn stream_send_first_then_receive() { #[test] fn stream_send_1_then_close_stream() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("this message must be received, even if stream is closed already!") .unwrap(); drop(s1_a); @@ -133,7 +133,7 @@ fn stream_send_1_then_close_stream() { #[test] fn stream_send_100000_then_close_stream() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -151,7 +151,7 @@ fn stream_send_100000_then_close_stream() { #[test] fn stream_send_100000_then_close_stream_remote() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -164,7 +164,7 @@ fn stream_send_100000_then_close_stream_remote() { #[test] fn stream_send_100000_then_close_stream_remote2() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -178,7 +178,7 @@ fn stream_send_100000_then_close_stream_remote2() { #[test] fn stream_send_100000_then_close_stream_remote3() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -192,7 +192,7 @@ fn stream_send_100000_then_close_stream_remote3() { #[test] fn close_part_then_network() { let (_, _) = helper::setup(false, 0); - let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -205,7 +205,7 @@ fn close_part_then_network() { #[test] fn close_network_then_part() { let (_, _) = helper::setup(false, 0); - let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -218,7 +218,7 @@ fn close_network_then_part() { #[test] fn close_network_then_disconnect_part() { let (_, _) = helper::setup(false, 0); - let (r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (r, n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -231,7 +231,7 @@ fn close_network_then_disconnect_part() { #[test] fn close_runtime_then_network() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -244,7 +244,7 @@ fn close_runtime_then_network() { #[test] fn close_runtime_then_part() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -258,7 +258,7 @@ fn close_runtime_then_part() { #[test] fn close_network_from_async() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -271,7 +271,7 @@ fn close_network_from_async() { #[test] fn close_part_from_async() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -285,8 +285,8 @@ fn close_part_from_async() { #[test] fn opened_stream_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); - let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); + let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp()); + let s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); s2_a.send("HelloWorld").unwrap(); let mut s2_b = r.block_on(p_b.opened()).unwrap(); drop(p_a); @@ -298,8 +298,8 @@ fn opened_stream_before_remote_part_is_closed() { #[test] fn opened_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); - let mut s2_a = r.block_on(p_a.open(3, Promises::empty(), 0)).unwrap(); + let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp()); + let s2_a = r.block_on(p_a.open(3, Promises::empty(), 0)).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(SLEEP_EXTERNAL); @@ -315,8 +315,8 @@ fn opened_stream_after_remote_part_is_closed() { #[test] fn open_stream_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); - let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); + let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp()); + let s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); s2_a.send("HelloWorld").unwrap(); drop(p_a); std::thread::sleep(SLEEP_EXTERNAL); @@ -332,7 +332,7 @@ fn open_stream_after_remote_part_is_closed() { #[test] fn failed_stream_open_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); + let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp()); drop(p_a); assert_eq!( r.block_on(p_b.opened()).unwrap_err(), @@ -345,14 +345,14 @@ fn failed_stream_open_after_remote_part_is_closed() { fn open_participant_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let n_a = Network::new(Pid::fake(0), &r); + let mut n_a = Network::new(Pid::fake(0), &r); let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); r.block_on(n_a.listen(addr.0)).unwrap(); let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); - let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); + let s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); - let p_a = r.block_on(n_a.connected()).unwrap(); + let mut p_a = r.block_on(n_a.connected()).unwrap(); drop(s1_b); drop(p_b); drop(n_b); @@ -365,18 +365,18 @@ fn open_participant_before_remote_part_is_closed() { fn open_participant_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let n_a = Network::new(Pid::fake(0), &r); + let mut n_a = Network::new(Pid::fake(0), &r); let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); r.block_on(n_a.listen(addr.0)).unwrap(); let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); - let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); + let s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); drop(s1_b); drop(p_b); drop(n_b); std::thread::sleep(SLEEP_EXTERNAL); - let p_a = r.block_on(n_a.connected()).unwrap(); + let mut p_a = r.block_on(n_a.connected()).unwrap(); let mut s1_a = r.block_on(p_a.opened()).unwrap(); assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string())); } @@ -385,19 +385,19 @@ fn open_participant_after_remote_part_is_closed() { fn close_network_scheduler_completely() { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let n_a = Network::new(Pid::fake(0), &r); + let mut n_a = Network::new(Pid::fake(0), &r); let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); r.block_on(n_a.listen(addr.0)).unwrap(); - let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); + let mut p_b = r.block_on(n_b.connect(addr.1)).unwrap(); assert_matches!( r.block_on(p_b.fetch_event()), Ok(ParticipantEvent::ChannelCreated(_)) ); - let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); + let s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); - let p_a = r.block_on(n_a.connected()).unwrap(); + let mut p_a = r.block_on(n_a.connected()).unwrap(); assert_matches!( r.block_on(p_a.fetch_event()), Ok(ParticipantEvent::ChannelCreated(_)) @@ -429,7 +429,7 @@ fn close_network_scheduler_completely() { #[test] fn dont_panic_on_multiply_recv_after_close() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); @@ -444,7 +444,7 @@ fn dont_panic_on_multiply_recv_after_close() { #[test] fn dont_panic_on_recv_send_after_close() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); @@ -457,7 +457,7 @@ fn dont_panic_on_recv_send_after_close() { #[test] fn dont_panic_on_multiple_send_after_close() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index acea2434a5..dc3498dd0c 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -67,11 +67,11 @@ pub fn network_participant_stream( ) { let runtime = Arc::new(Runtime::new().unwrap()); let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { - let n_a = Network::new(Pid::fake(0), &runtime); + let mut n_a = Network::new(Pid::fake(0), &runtime); let n_b = Network::new(Pid::fake(1), &runtime); n_a.listen(addr.0).await.unwrap(); - let p1_b = n_b.connect(addr.1).await.unwrap(); + let mut p1_b = n_b.connect(addr.1).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); let s1_a = p1_a.open(4, Promises::ORDERED, 0).await.unwrap(); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 7571ff483e..d95f6ed33c 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -10,7 +10,7 @@ use veloren_network::{ConnectAddr, ListenAddr, Network, ParticipantEvent, Pid, P #[test] fn stream_simple() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("Hello World").unwrap(); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); @@ -20,7 +20,7 @@ fn stream_simple() { #[test] fn stream_try_recv() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(4242u32).unwrap(); std::thread::sleep(SLEEP_EXTERNAL); @@ -31,7 +31,7 @@ fn stream_try_recv() { #[test] fn stream_simple_3msg() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); @@ -45,7 +45,7 @@ fn stream_simple_3msg() { #[test] fn stream_simple_mpsc() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); s1_a.send("Hello World").unwrap(); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); @@ -55,7 +55,7 @@ fn stream_simple_mpsc() { #[test] fn stream_simple_mpsc_3msg() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); @@ -69,7 +69,7 @@ fn stream_simple_mpsc_3msg() { #[test] fn stream_simple_quic() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); s1_a.send("Hello World").unwrap(); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); @@ -79,7 +79,7 @@ fn stream_simple_quic() { #[test] fn stream_simple_quic_3msg() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); @@ -94,7 +94,7 @@ fn stream_simple_quic_3msg() { #[ignore] fn stream_simple_udp() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); s1_a.send("Hello World").unwrap(); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); @@ -105,7 +105,7 @@ fn stream_simple_udp() { #[ignore] fn stream_simple_udp_3msg() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); @@ -184,7 +184,7 @@ fn api_stream_send_main() -> Result<(), Box> { let network = Network::new(Pid::new(), &r); let remote = Network::new(Pid::new(), &r); r.block_on(async { - let network = network; + let mut network = network; let remote = remote; network .listen(ListenAddr::Tcp("127.0.0.1:1200".parse().unwrap())) @@ -196,8 +196,8 @@ fn api_stream_send_main() -> Result<(), Box> { let _stream_p = remote_p .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .await?; - let participant_a = network.connected().await?; - let mut stream_a = participant_a.opened().await?; + let mut participant_a = network.connected().await?; + let stream_a = participant_a.opened().await?; //Send Message stream_a.send("Hello World")?; Ok(()) @@ -213,7 +213,7 @@ fn api_stream_recv_main() -> Result<(), Box> { let network = Network::new(Pid::new(), &r); let remote = Network::new(Pid::new(), &r); r.block_on(async { - let network = network; + let mut network = network; let remote = remote; network .listen(ListenAddr::Tcp("127.0.0.1:1220".parse().unwrap())) @@ -221,11 +221,11 @@ fn api_stream_recv_main() -> Result<(), Box> { let remote_p = remote .connect(ConnectAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .await?; - let mut stream_p = remote_p + let stream_p = remote_p .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .await?; stream_p.send("Hello World")?; - let participant_a = network.connected().await?; + let mut participant_a = network.connected().await?; let mut stream_a = participant_a.opened().await?; //Send Message assert_eq!("Hello World".to_string(), stream_a.recv::().await?); @@ -236,7 +236,7 @@ fn api_stream_recv_main() -> Result<(), Box> { #[test] fn wrong_parse() { let (_, _) = helper::setup(false, 0); - let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(1337).unwrap(); match r.block_on(s1_b.recv::()) { @@ -249,7 +249,7 @@ fn wrong_parse() { #[test] fn multiple_try_recv() { let (_, _) = helper::setup(false, 0); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("asd").unwrap(); s1_a.send(11u32).unwrap(); @@ -295,9 +295,9 @@ fn listen_on_ipv6_doesnt_block_ipv4() { ))), ); - let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcpv6); + let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcpv6); std::thread::sleep(SLEEP_EXTERNAL); - let (_r2, _n_a2, _p_a2, mut s1_a2, _n_b2, _p_b2, mut s1_b2) = network_participant_stream(tcpv4); + let (_r2, _n_a2, _p_a2, s1_a2, _n_b2, _p_b2, mut s1_b2) = network_participant_stream(tcpv4); s1_a.send(42u32).unwrap(); s1_a2.send(1337u32).unwrap(); @@ -313,7 +313,7 @@ fn listen_on_ipv6_doesnt_block_ipv4() { fn check_correct_channel_events() { let (_, _) = helper::setup(false, 0); let con_addr = tcp(); - let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(con_addr.clone()); + let (r, _n_a, mut p_a, _, _n_b, mut p_b, _) = network_participant_stream(con_addr.clone()); let event_a = r.block_on(p_a.fetch_event()).unwrap(); let event_b = r.block_on(p_b.fetch_event()).unwrap(); diff --git a/server/Cargo.toml b/server/Cargo.toml index 03bf506937..c766b06aa7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -50,6 +50,7 @@ serde = { version = "1.0.110", features = ["derive"] } serde_json = "1.0.50" rand = { version = "0.8", features = ["small_rng"] } hashbrown = { version = "0.12", features = ["rayon", "serde", "nightly"] } +parking_lot = { version = "0.12" } rayon = "1.5" crossbeam-channel = "0.5" prometheus = { version = "0.13", default-features = false} diff --git a/server/src/client.rs b/server/src/client.rs index fc679ac010..f4fbc073f7 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -3,7 +3,7 @@ use network::{Message, Participant, Stream, StreamError, StreamParams}; use serde::{de::DeserializeOwned, Serialize}; use specs::Component; use specs_idvs::IdvStorage; -use std::sync::{atomic::AtomicBool, Mutex}; +use std::sync::atomic::AtomicBool; /// Client handles ALL network related information of everything that connects /// to the server Client DOES NOT handle game states @@ -14,17 +14,18 @@ use std::sync::{atomic::AtomicBool, Mutex}; pub struct Client { pub client_type: ClientType, pub participant: Option, - pub last_ping: Mutex, + pub last_ping: f64, pub login_msg_sent: AtomicBool, - //TODO: improve network crate so that `send` is no longer `&mut self` and we can get rid of - // this Mutex. This Mutex is just to please the compiler as we do not get into contention - general_stream: Mutex, - ping_stream: Mutex, - register_stream: Mutex, - character_screen_stream: Mutex, - in_game_stream: Mutex, - terrain_stream: Mutex, + //TODO: Consider splitting each of these out into their own components so all the message + //processing systems can run in parallel with each other (though it may turn out not to + //matter that much). + general_stream: Stream, + ping_stream: Stream, + register_stream: Stream, + character_screen_stream: Stream, + in_game_stream: Stream, + terrain_stream: Stream, general_stream_params: StreamParams, ping_stream_params: StreamParams, @@ -64,14 +65,14 @@ impl Client { Client { client_type, participant: Some(participant), - last_ping: Mutex::new(last_ping), + last_ping, login_msg_sent: AtomicBool::new(false), - general_stream: Mutex::new(general_stream), - ping_stream: Mutex::new(ping_stream), - register_stream: Mutex::new(register_stream), - character_screen_stream: Mutex::new(character_screen_stream), - in_game_stream: Mutex::new(in_game_stream), - terrain_stream: Mutex::new(terrain_stream), + general_stream, + ping_stream, + register_stream, + character_screen_stream, + in_game_stream, + terrain_stream, general_stream_params, ping_stream_params, register_stream_params, @@ -145,16 +146,12 @@ impl Client { pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> { match msg.stream_id { - 0 => self.register_stream.lock().unwrap().send_raw(&msg.message), - 1 => self - .character_screen_stream - .lock() - .unwrap() - .send_raw(&msg.message), - 2 => self.in_game_stream.lock().unwrap().send_raw(&msg.message), - 3 => self.general_stream.lock().unwrap().send_raw(&msg.message), - 4 => self.ping_stream.lock().unwrap().send_raw(&msg.message), - 5 => self.terrain_stream.lock().unwrap().send_raw(&msg.message), + 0 => self.register_stream.send_raw(&msg.message), + 1 => self.character_screen_stream.send_raw(&msg.message), + 2 => self.in_game_stream.send_raw(&msg.message), + 3 => self.general_stream.send_raw(&msg.message), + 4 => self.ping_stream.send_raw(&msg.message), + 5 => self.terrain_stream.send_raw(&msg.message), _ => unreachable!("invalid stream id"), } } @@ -236,17 +233,17 @@ impl Client { } pub(crate) fn recv( - &self, + &mut self, stream_id: u8, ) -> Result, StreamError> { // TODO: are two systems using the same stream?? why is there contention here? match stream_id { - 0 => self.register_stream.lock().unwrap().try_recv(), - 1 => self.character_screen_stream.lock().unwrap().try_recv(), - 2 => self.in_game_stream.lock().unwrap().try_recv(), - 3 => self.general_stream.lock().unwrap().try_recv(), - 4 => self.ping_stream.lock().unwrap().try_recv(), - 5 => self.terrain_stream.lock().unwrap().try_recv(), + 0 => self.register_stream.try_recv(), + 1 => self.character_screen_stream.try_recv(), + 2 => self.in_game_stream.try_recv(), + 3 => self.general_stream.try_recv(), + 4 => self.ping_stream.try_recv(), + 5 => self.terrain_stream.try_recv(), _ => unreachable!("invalid stream id"), } } diff --git a/server/src/connection_handler.rs b/server/src/connection_handler.rs index e2f9aa368c..8efb2f9d06 100644 --- a/server/src/connection_handler.rs +++ b/server/src/connection_handler.rs @@ -2,7 +2,7 @@ use crate::{Client, ClientType, ServerInfo}; use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use futures_util::future::FutureExt; use network::{Network, Participant, Promises}; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use tokio::{runtime::Runtime, select, sync::oneshot}; use tracing::{debug, error, trace, warn}; @@ -14,7 +14,6 @@ pub(crate) struct ServerInfoPacket { pub(crate) type IncomingClient = Client; pub(crate) struct ConnectionHandler { - _network: Arc, thread_handle: Option>, pub client_receiver: Receiver, pub info_requester_receiver: Receiver>, @@ -27,8 +26,6 @@ pub(crate) struct ConnectionHandler { /// and time impl ConnectionHandler { pub fn new(network: Network, runtime: &Runtime) -> Self { - let network = Arc::new(network); - let network_clone = Arc::clone(&network); let (stop_sender, stop_receiver) = oneshot::channel(); let (client_sender, client_receiver) = unbounded::(); @@ -36,14 +33,13 @@ impl ConnectionHandler { bounded::>(1); let thread_handle = Some(runtime.spawn(Self::work( - network_clone, + network, client_sender, info_requester_sender, stop_receiver, ))); Self { - _network: network, thread_handle, client_receiver, info_requester_receiver, @@ -52,7 +48,7 @@ impl ConnectionHandler { } async fn work( - network: Arc, + mut network: Network, client_sender: Sender, info_requester_sender: Sender>, stop_receiver: oneshot::Receiver<()>, diff --git a/server/src/login_provider.rs b/server/src/login_provider.rs index b66d1481b1..ae56176206 100644 --- a/server/src/login_provider.rs +++ b/server/src/login_provider.rs @@ -98,15 +98,15 @@ impl LoginProvider { PendingLogin { pending_r } } - pub fn login( - &mut self, + pub(crate) fn login( pending: &mut PendingLogin, #[cfg(feature = "plugins")] world: &EcsWorld, #[cfg(feature = "plugins")] plugin_manager: &PluginMgr, admins: &HashMap, whitelist: &HashMap, banlist: &HashMap, - ) -> Option> { + player_count_exceeded: impl FnOnce(String, Uuid) -> (bool, R), + ) -> Option> { match pending.pending_r.try_recv() { Ok(Err(e)) => Some(Err(e)), Ok(Ok((username, uuid))) => { @@ -142,6 +142,9 @@ impl LoginProvider { { // Plugin player join hooks execute for all players, but are only allowed to // filter non-admins. + // + // We also run it before checking player count, to avoid lock contention in the + // plugin. match plugin_manager.execute_event(world, &PlayerJoinEvent { player_name: username.clone(), player_id: *uuid.as_bytes(), @@ -161,8 +164,13 @@ impl LoginProvider { }; } - info!(?username, "New User"); - Some(Ok((username, uuid))) + // non-admins can only join if the player count has not been exceeded. + let (player_count_exceeded, res) = player_count_exceeded(username, uuid); + if admin.is_none() && player_count_exceeded { + return Some(Err(RegisterError::TooManyPlayers)); + } + + Some(Ok(res)) }, Err(oneshot::error::TryRecvError::Closed) => { error!("channel got closed to early, this shouldn't happen"); diff --git a/server/src/sys/msg/character_screen.rs b/server/src/sys/msg/character_screen.rs index 296abb3da0..bf69cab24c 100644 --- a/server/src/sys/msg/character_screen.rs +++ b/server/src/sys/msg/character_screen.rs @@ -13,7 +13,7 @@ use common::{ }; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; -use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteExpect}; +use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteExpect, WriteStorage}; use std::sync::atomic::Ordering; use tracing::{debug, warn}; @@ -197,7 +197,7 @@ impl<'a> System<'a> for Sys { ReadExpect<'a, CharacterLoader>, WriteExpect<'a, CharacterUpdater>, ReadStorage<'a, Uid>, - ReadStorage<'a, Client>, + WriteStorage<'a, Client>, ReadStorage<'a, Player>, ReadStorage<'a, Presence>, ReadExpect<'a, EditableSettings>, @@ -216,7 +216,7 @@ impl<'a> System<'a> for Sys { character_loader, mut character_updater, uids, - clients, + mut clients, players, presences, editable_settings, @@ -225,7 +225,7 @@ impl<'a> System<'a> for Sys { ) { let mut server_emitter = server_event_bus.emitter(); - for (entity, client) in (&entities, &clients).join() { + for (entity, client) in (&entities, &mut clients).join() { let _ = super::try_recv_all(client, 1, |client, msg| { Self::handle_client_character_screen_msg( &mut server_emitter, diff --git a/server/src/sys/msg/general.rs b/server/src/sys/msg/general.rs index 0527da7dcf..0cd49d3d8e 100644 --- a/server/src/sys/msg/general.rs +++ b/server/src/sys/msg/general.rs @@ -9,7 +9,8 @@ use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::{ validate_chat_msg, ChatMsgValidationError, ClientGeneral, MAX_BYTES_CHAT_MSG, }; -use specs::{Entities, Join, Read, ReadStorage}; +use rayon::prelude::*; +use specs::{Entities, Join, ParJoin, Read, ReadStorage, WriteStorage}; use tracing::{debug, error, warn}; impl Sys { @@ -80,7 +81,7 @@ impl<'a> System<'a> for Sys { ReadStorage<'a, Uid>, ReadStorage<'a, ChatMode>, ReadStorage<'a, Player>, - ReadStorage<'a, Client>, + WriteStorage<'a, Client>, ); const NAME: &'static str = "msg::general"; @@ -89,27 +90,30 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - (entities, server_event_bus, time, uids, chat_modes, players, clients): Self::SystemData, + (entities, server_event_bus, time, uids, chat_modes, players, mut clients): Self::SystemData, ) { - let mut server_emitter = server_event_bus.emitter(); + (&entities, &mut clients, players.maybe()) + .par_join() + .for_each_init( + || server_event_bus.emitter(), + |server_emitter, (entity, client, player)| { + let res = super::try_recv_all(client, 3, |client, msg| { + Self::handle_general_msg( + server_emitter, + entity, + client, + player, + &uids, + &chat_modes, + msg, + ) + }); - for (entity, client, player) in (&entities, &clients, (&players).maybe()).join() { - let res = super::try_recv_all(client, 3, |client, msg| { - Self::handle_general_msg( - &mut server_emitter, - entity, - client, - player, - &uids, - &chat_modes, - msg, - ) - }); - - if let Ok(1_u64..=u64::MAX) = res { - // Update client ping. - *client.last_ping.lock().unwrap() = time.0 - } - } + if let Ok(1_u64..=u64::MAX) = res { + // Update client ping. + client.last_ping = time.0 + } + }, + ); } } diff --git a/server/src/sys/msg/mod.rs b/server/src/sys/msg/mod.rs index 25ef7d89cd..a76691f384 100644 --- a/server/src/sys/msg/mod.rs +++ b/server/src/sys/msg/mod.rs @@ -35,7 +35,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) { /// handles all send msg and calls a handle fn /// Aborts when a error occurred returns cnt of successful msg otherwise pub(crate) fn try_recv_all( - client: &Client, + client: &mut Client, stream_id: u8, mut f: F, ) -> Result diff --git a/server/src/sys/msg/ping.rs b/server/src/sys/msg/ping.rs index 8224133d3e..cf56910326 100644 --- a/server/src/sys/msg/ping.rs +++ b/server/src/sys/msg/ping.rs @@ -5,7 +5,8 @@ use common::{ }; use common_ecs::{Job, Origin, Phase, System}; use common_net::msg::PingMsg; -use specs::{Entities, Join, Read, ReadStorage}; +use rayon::prelude::*; +use specs::{Entities, ParJoin, Read, WriteStorage}; use tracing::{debug, info}; impl Sys { @@ -26,7 +27,7 @@ impl<'a> System<'a> for Sys { Entities<'a>, Read<'a, EventBus>, Read<'a, Time>, - ReadStorage<'a, Client>, + WriteStorage<'a, Client>, Read<'a, Settings>, ); @@ -36,45 +37,49 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - (entities, server_event_bus, time, clients, settings): Self::SystemData, + (entities, server_event_bus, time, mut clients, settings): Self::SystemData, ) { - let mut server_emitter = server_event_bus.emitter(); + (&entities, &mut clients).par_join().for_each_init( + || server_event_bus.emitter(), + |server_emitter, (entity, client)| { + // ignore network events + while let Some(Ok(Some(_))) = + client.participant.as_mut().map(|p| p.try_fetch_event()) + {} - for (entity, client) in (&entities, &clients).join() { - // ignore network events - while let Some(Ok(Some(_))) = client.participant.as_ref().map(|p| p.try_fetch_event()) { - } + let res = super::try_recv_all(client, 4, Self::handle_ping_msg); - let res = super::try_recv_all(client, 4, Self::handle_ping_msg); - - match res { - Err(e) => { - debug!(?entity, ?e, "network error with client, disconnecting"); - server_emitter.emit(ServerEvent::ClientDisconnect( - entity, - common::comp::DisconnectReason::NetworkError, - )); - }, - Ok(1_u64..=u64::MAX) => { - // Update client ping. - *client.last_ping.lock().unwrap() = time.0 - }, - Ok(0) => { - let last_ping: f64 = *client.last_ping.lock().unwrap(); - if time.0 - last_ping > settings.client_timeout.as_secs() as f64 - // Timeout - { - info!(?entity, "timeout error with client, disconnecting"); + match res { + Err(e) => { + debug!(?entity, ?e, "network error with client, disconnecting"); server_emitter.emit(ServerEvent::ClientDisconnect( entity, - common::comp::DisconnectReason::Timeout, + common::comp::DisconnectReason::NetworkError, )); - } else if time.0 - last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { - // Try pinging the client if the timeout is nearing. - client.send_fallible(PingMsg::Ping); - } - }, - } - } + }, + Ok(1_u64..=u64::MAX) => { + // Update client ping. + client.last_ping = time.0 + }, + Ok(0) => { + let last_ping: f64 = client.last_ping; + if time.0 - last_ping > settings.client_timeout.as_secs() as f64 + // Timeout + { + info!(?entity, "timeout error with client, disconnecting"); + server_emitter.emit(ServerEvent::ClientDisconnect( + entity, + common::comp::DisconnectReason::Timeout, + )); + } else if time.0 - last_ping + > settings.client_timeout.as_secs() as f64 * 0.5 + { + // Try pinging the client if the timeout is nearing. + client.send_fallible(PingMsg::Ping); + } + }, + } + }, + ); } } diff --git a/server/src/sys/msg/register.rs b/server/src/sys/msg/register.rs index 8d751b094f..c9ea7e7a46 100644 --- a/server/src/sys/msg/register.rs +++ b/server/src/sys/msg/register.rs @@ -14,13 +14,14 @@ use common_net::msg::{ CharacterInfo, ClientRegister, DisconnectReason, PlayerInfo, PlayerListUpdate, RegisterError, ServerGeneral, }; -use hashbrown::HashMap; +use hashbrown::{hash_map, HashMap}; use plugin_api::Health; +use rayon::prelude::*; use specs::{ - shred::ResourceId, storage::StorageEntry, Entities, Join, Read, ReadExpect, ReadStorage, - SystemData, World, WriteExpect, WriteStorage, + shred::ResourceId, Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, SystemData, World, + WriteStorage, }; -use tracing::trace; +use tracing::{debug, info, trace, warn}; #[cfg(feature = "plugins")] use {common_state::plugin::memory_manager::EcsWorld, common_state::plugin::PluginMgr}; @@ -35,8 +36,8 @@ pub struct ReadData<'a> { entities: Entities<'a>, stats: ReadStorage<'a, Stats>, uids: ReadStorage<'a, Uid>, - clients: ReadStorage<'a, Client>, server_event_bus: Read<'a, EventBus>, + login_provider: ReadExpect<'a, LoginProvider>, player_metrics: ReadExpect<'a, PlayerMetrics>, settings: ReadExpect<'a, Settings>, editable_settings: ReadExpect<'a, EditableSettings>, @@ -51,10 +52,10 @@ pub struct Sys; impl<'a> System<'a> for Sys { type SystemData = ( ReadData<'a>, + WriteStorage<'a, Client>, WriteStorage<'a, Player>, WriteStorage<'a, Admin>, WriteStorage<'a, PendingLogin>, - WriteExpect<'a, LoginProvider>, ); const NAME: &'static str = "msg::register"; @@ -63,189 +64,345 @@ impl<'a> System<'a> for Sys { fn run( _job: &mut Job, - ( - read_data, - mut players, - mut admins, - mut pending_logins, - mut login_provider, - ): Self::SystemData, + (read_data, mut clients, mut players, mut admins, mut pending_logins): Self::SystemData, ) { - let mut server_emitter = read_data.server_event_bus.emitter(); - // Player list to send new players. - let player_list = ( + // Player list to send new players, and lookup from UUID to entity (so we don't + // have to do a linear scan over all entities on each login to see if + // it's a duplicate). + // + // NOTE: For this to work as desired, we must maintain the invariant that there + // is just one player per UUID! + let (player_list, old_players_by_uuid): (HashMap<_, _>, HashMap<_, _>) = ( + &read_data.entities, &read_data.uids, &players, read_data.stats.maybe(), admins.maybe(), ) .join() - .map(|(uid, player, stats, admin)| { - (*uid, PlayerInfo { - is_online: true, - is_moderator: admin.is_some(), - player_alias: player.alias.clone(), - character: stats.map(|stats| CharacterInfo { - name: stats.name.clone(), + .map(|(entity, uid, player, stats, admin)| { + ( + (*uid, PlayerInfo { + is_online: true, + is_moderator: admin.is_some(), + player_alias: player.alias.clone(), + character: stats.map(|stats| CharacterInfo { + name: stats.name.clone(), + }), + uuid: player.uuid(), }), - uuid: player.uuid(), - }) + (player.uuid(), entity), + ) }) - .collect::>(); + .unzip(); + let max_players = read_data.settings.max_players; + let capacity = read_data.settings.max_players * 2; // List of new players to update player lists of all clients. - let mut new_players = Vec::new(); + // + // Big enough that we hopefully won't have to reallocate. + // + // Also includes a list of logins to retry, since we happen to update those + // around the same time that we update the new players list. + // + // NOTE: stdlib mutex is more than good enough on Linux and (probably) Windows, + // but not Mac. + let new_players = parking_lot::Mutex::new(( + HashMap::<_, (_, _, _, _)>::with_capacity(capacity), + Vec::with_capacity(capacity), + )); // defer auth lockup - for (entity, client) in (&read_data.entities, &read_data.clients).join() { + for (entity, client) in (&read_data.entities, &mut clients).join() { let _ = super::try_recv_all(client, 0, |_, msg: ClientRegister| { trace!(?msg.token_or_username, "defer auth lockup"); - let pending = login_provider.verify(&msg.token_or_username); + let pending = read_data.login_provider.verify(&msg.token_or_username); let _ = pending_logins.insert(entity, pending); Ok(()) }); } - let mut finished_pending = vec![]; - let mut retries = vec![]; - for (entity, client, pending) in - (&read_data.entities, &read_data.clients, &mut pending_logins).join() - { - if let Err(e) = || -> Result<(), crate::error::Error> { - #[cfg(feature = "plugins")] - let ecs_world = EcsWorld { - entities: &read_data.entities, - health: (&read_data._healths).into(), - uid: (&read_data.uids).into(), - player: (&players).into(), - uid_allocator: &read_data._uid_allocator, - }; + let old_player_count = player_list.len(); + #[cfg(feature = "plugins")] + let ecs_world = EcsWorld { + entities: &read_data.entities, + health: (&read_data._healths).into(), + uid: (&read_data.uids).into(), + // NOTE: Only the old player list is provided, to avoid scalability + // bottlenecks. + player: (&players).into(), + uid_allocator: &read_data._uid_allocator, + }; - let (username, uuid) = match login_provider.login( - pending, - #[cfg(feature = "plugins")] - &ecs_world, - #[cfg(feature = "plugins")] - &read_data._plugin_mgr, - &*read_data.editable_settings.admins, - &*read_data.editable_settings.whitelist, - &*read_data.editable_settings.banlist, - ) { - None => return Ok(()), - Some(r) => { - finished_pending.push(entity); - trace!(?r, "pending login returned"); - match r { - Err(e) => { + /// Trivially cloneable wrapper over any type, to make rayon happy. + struct OptionClone(Option); + + impl Clone for OptionClone { + fn clone(&self) -> Self { Self(None) } + } + + // NOTE: this is just default value. + // + // It will be overwritten in ServerExt::update_character_data. + let battle_mode = read_data.settings.gameplay.battle_mode.default_mode(); + + let finished_pending = ( + &read_data.entities, + &read_data.uids, + &clients, + !players.mask(), + &mut pending_logins, + ) + .par_join() + .fold_with( + // (Finished pending entity list, emitter) + (vec![], OptionClone(None)), + |(mut finished_pending, mut server_emitter_), (entity, uid, client, _, pending)| { + let server_emitter = server_emitter_ + .0 + .get_or_insert_with(|| read_data.server_event_bus.emitter()); + if let Err(e) = || -> Result<(), crate::error::Error> { + // Destructure new_players_guard last so it gets dropped before the other + // three. + let ( + (pending_login, player, admin, msg, old_player), + mut new_players_guard, + ) = match LoginProvider::login( + pending, + #[cfg(feature = "plugins")] + &ecs_world, + #[cfg(feature = "plugins")] + &read_data._plugin_mgr, + &*read_data.editable_settings.admins, + &*read_data.editable_settings.whitelist, + &*read_data.editable_settings.banlist, + |username, uuid| { + // We construct a few things outside the lock to reduce contention. + let pending_login = + PendingLogin::new_success(username.clone(), uuid); + let player = Player::new(username, battle_mode, uuid, None); + let admin = read_data.editable_settings.admins.get(&uuid); + let msg = player + .is_valid() + .then_some(PlayerInfo { + player_alias: player.alias.clone(), + is_online: true, + is_moderator: admin.is_some(), + character: None, // new players will be on character select. + uuid: player.uuid(), + }) + .map(|player_info| { + // Prepare the player list update to be sent to all clients. + client.prepare(ServerGeneral::PlayerListUpdate( + PlayerListUpdate::Add(*uid, player_info), + )) + }); + // Check if this player was already logged in before the system + // started. + let old_player = old_players_by_uuid + .get(&uuid) + .copied() + // We perform the get outside the lock here, since we can't 100% + // know that this player has a client entry. + .map(|entity| (entity, Some(clients.get(entity)))); + // We take the lock only when necessary, and for a short duration, + // to avoid contention with other + // threads. We need to hold the guard past the end of + // the login function because otherwise there's a race between when + // we read it and when we + // (potentially) write to it. + let guard = new_players.lock(); + // Guard comes first in the tuple so it's dropped before the other + // stuff if login returns an error. + ( + old_player_count + guard.0.len() >= max_players, + (guard, (pending_login, player, admin, msg, old_player)), + ) + }, + ) { + None => return Ok(()), + Some(r) => { + finished_pending.push(entity); + match r { + Err(e) => { + // NOTE: Done only on error to avoid doing extra work within + // the lock. + trace!(?e, "pending login returned error"); + server_emitter.emit(ServerEvent::ClientDisconnect( + entity, + common::comp::DisconnectReason::Kicked, + )); + client.send(Err(e))?; + return Ok(()); + }, + // Swap the order of the tuple, so when it's destructured guard + // is dropped first. + Ok((guard, res)) => (res, guard), + } + }, + }; + + let (new_players_by_uuid, retries) = &mut *new_players_guard; + // Check if the user logged in before us during this tick (this is why we + // need the lock held). + let uuid = player.uuid(); + let old_player = old_player.map_or_else( + move || match new_players_by_uuid.entry(uuid) { + // We don't actually extract the client yet, to avoid doing extra + // work with the lock held. + hash_map::Entry::Occupied(o) => Ok((o.get().0, None)), + hash_map::Entry::Vacant(v) => Err(v), + }, + Ok, + ); + let vacant_player = match old_player { + Ok((old_entity, old_client)) => { + if matches!(old_client, None | Some(Some(_))) { + // We can't login the new client right now as the + // removal of the old client and player occurs later in + // the tick, so we instead setup the new login to be + // processed in the next tick + // Create "fake" successful pending auth and mark it to + // be inserted into pending_logins at the end of this + // run. + retries.push((entity, pending_login)); + drop(new_players_guard); + let old_client = old_client + .flatten() + .or_else(|| clients.get(old_entity)) + .expect( + "All entries in the new player list were explicitly \ + joining on client", + ); + let _ = old_client.send(ServerGeneral::Disconnect( + DisconnectReason::Kicked(String::from( + "You have logged in from another location.", + )), + )); + } else { + drop(new_players_guard); + // A player without a client is strange, so we don't really want + // to retry. Warn about + // this case and hope that trying to perform the + // disconnect process removes the invalid player entry. + warn!( + "Player without client detected for entity {:?}", + old_entity + ); + } + // Remove old client server_emitter.emit(ServerEvent::ClientDisconnect( - entity, - common::comp::DisconnectReason::Kicked, + old_entity, + common::comp::DisconnectReason::NewerLogin, )); - client.send(Err(e))?; return Ok(()); }, - Ok((username, uuid)) => (username, uuid), - } - }, - }; + Err(v) => v, + }; - // Check if user is already logged-in - if let Some((old_entity, old_client, _)) = - (&read_data.entities, &read_data.clients, &players) - .join() - .find(|(_, _, old_player)| old_player.uuid() == uuid) - { - // Remove old client - server_emitter.emit(ServerEvent::ClientDisconnect( - old_entity, - common::comp::DisconnectReason::NewerLogin, - )); - let _ = old_client.send(ServerGeneral::Disconnect(DisconnectReason::Kicked( - String::from("You have logged in from another location."), - ))); - // We can't login the new client right now as the - // removal of the old client and player occurs later in - // the tick, so we instead setup the new login to be - // processed in the next tick - // Create "fake" successful pending auth and mark it to - // be inserted into pending_logins at the end of this - // run - retries.push((entity, PendingLogin::new_success(username, uuid))); - return Ok(()); - } + let Some(msg) = msg else { + drop(new_players_guard); + // Invalid player + client.send(Err(RegisterError::InvalidCharacter))?; + return Ok(()); + }; - // NOTE: this is just default value. - // - // It will be overwritten in ServerExt::update_character_data. - let battle_mode = read_data.settings.gameplay.battle_mode.default_mode(); - let player = Player::new(username, battle_mode, uuid, None); + // We know the player list didn't already contain this entity because we + // joined on !players, so we can assume from here + // that we'll definitely be adding a new player. - let admin = read_data.editable_settings.admins.get(&uuid); + // Add to list to notify all clients of the new player + vacant_player.insert((entity, player, admin, msg)); + drop(new_players_guard); + read_data.player_metrics.players_connected.inc(); - if !player.is_valid() { - // Invalid player - client.send(Err(RegisterError::InvalidCharacter))?; - return Ok(()); - } + // Tell the client its request was successful. + client.send(Ok(()))?; - if let Ok(StorageEntry::Vacant(v)) = players.entry(entity) { - // Add Player component to this client, if the entity exists. - v.insert(player); - read_data.player_metrics.players_connected.inc(); + // Send client all the tracked components currently attached to its entity + // as well as synced resources (currently only + // `TimeOfDay`) + debug!("Starting initial sync with client."); + client.send(ServerInit::GameSync { + // Send client their entity + entity_package: read_data + .trackers + .create_entity_package_with_uid(entity, *uid, None, None, None), + time_of_day: *read_data.time_of_day, + max_group_size: read_data.settings.max_player_group_size, + client_timeout: read_data.settings.client_timeout, + world_map: (&*read_data.map).clone(), + recipe_book: default_recipe_book().cloned(), + component_recipe_book: default_component_recipe_book().cloned(), + material_stats: (&*read_data.material_stats).clone(), + ability_map: (&*read_data.ability_map).clone(), + })?; + debug!("Done initial sync with client."); - // Give the Admin component to the player if their name exists in - // admin list - if let Some(admin) = admin { - admins - .insert(entity, Admin(admin.role.into())) - .expect("Inserting into players proves the entity exists."); + // Send initial player list + client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( + player_list.clone(), + )))?; + + Ok(()) + }() { + trace!(?e, "failed to process register"); } - - // Tell the client its request was successful. - client.send(Ok(()))?; - - // Send initial player list - client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( - player_list.clone(), - )))?; - - // Add to list to notify all clients of the new player - new_players.push(entity); - } - Ok(()) - }() { - trace!(?e, "failed to process register") - }; - } - for e in finished_pending { + (finished_pending, server_emitter_) + }, + ) + .map(|(finished_pending, _server_emitter)| finished_pending) + .collect::>(); + finished_pending.into_iter().flatten().for_each(|e| { + // Remove all entities in finished_pending from pending_logins. pending_logins.remove(e); - } + }); + let (new_players, retries) = new_players.into_inner(); + // Insert retry attempts back into pending_logins to be processed next tick for (entity, pending) in retries { let _ = pending_logins.insert(entity, pending); } // Handle new players. - // Tell all clients to add them to the player list. - let player_info = |entity| { - let player_info = read_data.uids.get(entity).zip(players.get(entity)); - player_info.map(|(u, p)| (entity, u, p)) - }; - for (entity, uid, player) in new_players.into_iter().filter_map(player_info) { - let mut lazy_msg = None; - for (_, client) in (&players, &read_data.clients).join() { - if lazy_msg.is_none() { - lazy_msg = Some(client.prepare(ServerGeneral::PlayerListUpdate( - PlayerListUpdate::Add(*uid, PlayerInfo { - player_alias: player.alias.clone(), - is_online: true, - is_moderator: admins.get(entity).is_some(), - character: None, // new players will be on character select. - uuid: player.uuid(), - }), - ))); + let msgs = new_players + .into_values() + .map(|(entity, player, admin, msg)| { + let username = &player.alias; + info!(?username, "New User"); + // Add Player component to this client. + // + // Note that since players has been write locked for the duration of this + // system, we know that nobody else added any players since we + // last checked its value, and we checked that everything in + // new_players was not already in players, so we know the insert + // succeeds and the old entry was vacant. Moreover, we know that all new + // players we added have different UUIDs both from each other, and from any old + // players, preserving the uniqueness invariant. + players + .insert(entity, player) + .expect("The entity was joined against in the same system, so it exists"); + + // Give the Admin component to the player if their name exists in + // admin list + if let Some(admin) = admin { + admins + .insert(entity, Admin(admin.role.into())) + .expect("Inserting into players proves the entity exists."); } - lazy_msg.as_ref().map(|msg| client.send_prepared(msg)); - } - } + msg + }) + .collect::>(); + + // Tell all clients to add the new players to the player list, in parallel. + (players.mask(), &clients) + .par_join() + .for_each(|(_, client)| { + // Send messages sequentially within each client; by the time we have enough + // players to make parallelizing useful, we will have way more + // players than cores. + msgs.iter().for_each(|msg| { + let _ = client.send_prepared(msg); + }); + }); } } diff --git a/server/src/sys/msg/terrain.rs b/server/src/sys/msg/terrain.rs index e941d31b76..ecdd6bdd9f 100644 --- a/server/src/sys/msg/terrain.rs +++ b/server/src/sys/msg/terrain.rs @@ -12,7 +12,7 @@ use common::{ use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_net::msg::{ClientGeneral, ServerGeneral}; use rayon::iter::ParallelIterator; -use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write}; +use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage}; use tracing::{debug, trace}; /// This system will handle new messages from clients @@ -29,7 +29,7 @@ impl<'a> System<'a> for Sys { Write<'a, Vec>, ReadStorage<'a, Pos>, ReadStorage<'a, Presence>, - ReadStorage<'a, Client>, + WriteStorage<'a, Client>, ); const NAME: &'static str = "msg::terrain"; @@ -48,17 +48,17 @@ impl<'a> System<'a> for Sys { mut chunk_requests, positions, presences, - clients, + mut clients, ): Self::SystemData, ) { job.cpu_stats.measure(ParMode::Rayon); - let mut new_chunk_requests = (&entities, &clients, (&presences).maybe()) + let mut new_chunk_requests = (&entities, &mut clients, (&presences).maybe()) .par_join() .map_init( || (chunk_send_bus.emitter(), server_event_bus.emitter()), |(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| { let mut chunk_requests = Vec::new(); - let _ = super::try_recv_all(client, 5, |_, msg| { + let _ = super::try_recv_all(client, 5, |client, msg| { let presence = match maybe_presence { Some(g) => g, None => { diff --git a/server/src/sys/sentinel.rs b/server/src/sys/sentinel.rs index 438ded6a42..b9537d86de 100644 --- a/server/src/sys/sentinel.rs +++ b/server/src/sys/sentinel.rs @@ -64,7 +64,23 @@ macro_rules! trackers { vel: Option, ori: Option, ) -> Option> { - let uid = self.uid.get(entity).copied()?.0; + let uid = self.uid.get(entity).copied()?; + Some(self.create_entity_package_with_uid(entity, uid, pos, vel, ori)) + } + + /// See [create_entity_package]. + /// + /// NOTE: Only if you're certain you know the UID for the entity, and it hasn't + /// changed! + pub fn create_entity_package_with_uid( + &self, + entity: EcsEntity, + uid: Uid, + pos: Option, + vel: Option, + ori: Option, + ) -> EntityPackage { + let uid = uid.0; let mut comps = Vec::new(); // NOTE: we could potentially include a bitmap indicating which components are present instead of tagging // components with the type in order to save bandwidth @@ -94,7 +110,7 @@ macro_rules! trackers { vel.map(|c| comps.push(c.into())); ori.map(|c| comps.push(c.into())); - Some(EntityPackage { uid, comps }) + EntityPackage { uid, comps } } /// Create sync package for switching a client to another entity specifically to