Reduce overhead of messaging systems.

This commit is contained in:
Joshua Yanovski 2022-09-09 08:29:43 -07:00
parent 39225cdbd5
commit c78f496fca
21 changed files with 602 additions and 420 deletions

1
Cargo.lock generated
View File

@ -6844,6 +6844,7 @@ dependencies = [
"lazy_static", "lazy_static",
"noise", "noise",
"num_cpus", "num_cpus",
"parking_lot 0.12.0",
"portpicker", "portpicker",
"prometheus", "prometheus",
"prometheus-hyper", "prometheus-hyper",

View File

@ -293,7 +293,7 @@ impl Client {
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let network = Network::new(Pid::new(), &runtime); let network = Network::new(Pid::new(), &runtime);
let participant = match addr { let mut participant = match addr {
ConnectionArgs::Tcp { ConnectionArgs::Tcp {
hostname, hostname,
prefer_ipv6, prefer_ipv6,
@ -316,7 +316,7 @@ impl Client {
}; };
let stream = participant.opened().await?; let stream = participant.opened().await?;
let mut ping_stream = participant.opened().await?; let ping_stream = participant.opened().await?;
let mut register_stream = participant.opened().await?; let mut register_stream = participant.opened().await?;
let character_screen_stream = participant.opened().await?; let character_screen_stream = participant.opened().await?;
let in_game_stream = participant.opened().await?; let in_game_stream = participant.opened().await?;
@ -2514,7 +2514,7 @@ impl Client {
} }
// ignore network events // ignore network events
while let Some(Ok(Some(event))) = self.participant.as_ref().map(|p| p.try_fetch_event()) { while let Some(Ok(Some(event))) = self.participant.as_mut().map(|p| p.try_fetch_event()) {
trace!(?event, "received network event"); trace!(?event, "received network event");
} }

View File

@ -11,7 +11,7 @@ async fn stream_msg(s1_a: Arc<Mutex<Stream>>, s1_b: Arc<Mutex<Stream>>, data: &[
let mut s1_b = s1_b.lock().await; let mut s1_b = s1_b.lock().await;
let m = Message::serialize(&data, s1_b.params()); let m = Message::serialize(&data, s1_b.params());
std::thread::spawn(move || { std::thread::spawn(move || {
let mut s1_a = s1_a.try_lock().unwrap(); let s1_a = s1_a.try_lock().unwrap();
for _ in 0..cnt { for _ in 0..cnt {
s1_a.send_raw(&m).unwrap(); s1_a.send_raw(&m).unwrap();
} }
@ -130,11 +130,11 @@ pub fn network_participant_stream(
) { ) {
let runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async {
let n_a = Network::new(Pid::fake(0), &runtime); let mut n_a = Network::new(Pid::fake(0), &runtime);
let n_b = Network::new(Pid::fake(1), &runtime); let n_b = Network::new(Pid::fake(1), &runtime);
n_a.listen(addr.0).await.unwrap(); n_a.listen(addr.0).await.unwrap();
let p1_b = n_b.connect(addr.1).await.unwrap(); let mut p1_b = n_b.connect(addr.1).await.unwrap();
let p1_a = n_a.connected().await.unwrap(); let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap(); let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();

View File

@ -8,7 +8,7 @@ use std::{sync::Arc, thread, time::Duration};
use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock}; use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock};
use tracing::*; use tracing::*;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises}; use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises, Stream};
///This example contains a simple chatserver, that allows to send messages ///This example contains a simple chatserver, that allows to send messages
/// between participants, it's neither pretty nor perfect, but it should show /// between participants, it's neither pretty nor perfect, but it should show
@ -106,26 +106,20 @@ fn main() {
fn server(address: ListenAddr) { fn server(address: ListenAddr) {
let r = Arc::new(Runtime::new().unwrap()); let r = Arc::new(Runtime::new().unwrap());
let server = Network::new(Pid::new(), &r); let mut server = Network::new(Pid::new(), &r);
let server = Arc::new(server);
let participants = Arc::new(RwLock::new(Vec::new())); let participants = Arc::new(RwLock::new(Vec::new()));
r.block_on(async { r.block_on(async {
server.listen(address).await.unwrap(); server.listen(address).await.unwrap();
loop { loop {
let p1 = Arc::new(server.connected().await.unwrap()); let mut p1 = server.connected().await.unwrap();
let server1 = server.clone(); let s1 = p1.opened().await.unwrap();
participants.write().await.push(p1.clone()); participants.write().await.push(p1);
tokio::spawn(client_connection(server1, p1, participants.clone())); tokio::spawn(client_connection(s1, participants.clone()));
} }
}); });
} }
async fn client_connection( async fn client_connection(mut s1: Stream, participants: Arc<RwLock<Vec<Participant>>>) {
_network: Arc<Network>,
participant: Arc<Participant>,
participants: Arc<RwLock<Vec<Arc<Participant>>>>,
) {
let mut s1 = participant.opened().await.unwrap();
let username = s1.recv::<String>().await.unwrap(); let username = s1.recv::<String>().await.unwrap();
println!("[{}] connected", username); println!("[{}] connected", username);
loop { loop {
@ -141,7 +135,7 @@ async fn client_connection(
.await .await
{ {
Err(_) => info!("error talking to client, //TODO drop it"), Err(_) => info!("error talking to client, //TODO drop it"),
Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(), Ok(s) => s.send((username.clone(), msg.clone())).unwrap(),
}; };
} }
}, },
@ -156,7 +150,7 @@ fn client(address: ConnectAddr) {
r.block_on(async { r.block_on(async {
let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1 let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1
let mut s1 = p1 let s1 = p1
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await .await
.unwrap(); //remote representation of s1 .unwrap(); //remote representation of s1
@ -188,7 +182,7 @@ fn client(address: ConnectAddr) {
// receiving i open and close a stream per message. this can be done easier but // receiving i open and close a stream per message. this can be done easier but
// this allows me to be quite lazy on the server side and just get a list of // this allows me to be quite lazy on the server side and just get a list of
// all participants and send to them... // all participants and send to them...
async fn read_messages(participant: Participant) { async fn read_messages(mut participant: Participant) {
while let Ok(mut s) = participant.opened().await { while let Ok(mut s) = participant.opened().await {
let (username, message) = s.recv::<(String, String)>().await.unwrap(); let (username, message) = s.recv::<(String, String)>().await.unwrap();
println!("[{}]: {}", username, message); println!("[{}]: {}", username, message);

View File

@ -1,5 +1,5 @@
use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo}; use crate::commands::{Command, FileInfo, LocalCommand, RemoteInfo};
use futures_util::{FutureExt, StreamExt}; use futures_util::StreamExt;
use std::{collections::HashMap, path::PathBuf, sync::Arc}; use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{ use tokio::{
fs, join, fs, join,
@ -15,49 +15,65 @@ struct ControlChannels {
command_receiver: mpsc::UnboundedReceiver<LocalCommand>, command_receiver: mpsc::UnboundedReceiver<LocalCommand>,
} }
pub struct Server { struct Shared {
run_channels: Option<ControlChannels>,
network: Network,
served: RwLock<Vec<FileInfo>>, served: RwLock<Vec<FileInfo>>,
remotes: RwLock<HashMap<Pid, Arc<Mutex<RemoteInfo>>>>, remotes: RwLock<HashMap<Pid, Arc<Mutex<RemoteInfo>>>>,
receiving_files: Mutex<HashMap<u32, Option<String>>>, receiving_files: Mutex<HashMap<u32, Option<String>>>,
} }
pub struct Server {
run_channels: ControlChannels,
server: Network,
client: Network,
shared: Shared,
}
impl Server { impl Server {
pub fn new(runtime: Arc<Runtime>) -> (Self, mpsc::UnboundedSender<LocalCommand>) { pub fn new(runtime: Arc<Runtime>) -> (Self, mpsc::UnboundedSender<LocalCommand>) {
let (command_sender, command_receiver) = mpsc::unbounded_channel(); let (command_sender, command_receiver) = mpsc::unbounded_channel();
let network = Network::new(Pid::new(), &runtime); let server = Network::new(Pid::new(), &runtime);
let client = Network::new(Pid::new(), &runtime);
let run_channels = Some(ControlChannels { command_receiver }); let run_channels = ControlChannels { command_receiver };
( (
Server { Server {
run_channels, run_channels,
network, server,
served: RwLock::new(vec![]), client,
remotes: RwLock::new(HashMap::new()), shared: Shared {
receiving_files: Mutex::new(HashMap::new()), served: RwLock::new(vec![]),
remotes: RwLock::new(HashMap::new()),
receiving_files: Mutex::new(HashMap::new()),
},
}, },
command_sender, command_sender,
) )
} }
pub async fn run(mut self, address: ListenAddr) { pub async fn run(self, address: ListenAddr) {
let run_channels = self.run_channels.take().unwrap(); let run_channels = self.run_channels;
self.network.listen(address).await.unwrap(); self.server.listen(address).await.unwrap();
join!( join!(
self.command_manager(run_channels.command_receiver,), self.shared
self.connect_manager(), .command_manager(self.client, run_channels.command_receiver),
self.shared.connect_manager(self.server),
); );
} }
}
async fn command_manager(&self, command_receiver: mpsc::UnboundedReceiver<LocalCommand>) { impl Shared {
async fn command_manager(
&self,
client: Network,
command_receiver: mpsc::UnboundedReceiver<LocalCommand>,
) {
trace!("Start command_manager"); trace!("Start command_manager");
let command_receiver = UnboundedReceiverStream::new(command_receiver); let command_receiver = UnboundedReceiverStream::new(command_receiver);
command_receiver command_receiver
.for_each_concurrent(None, async move |cmd| { .for_each_concurrent(None, |cmd| async {
match cmd { match cmd {
LocalCommand::Shutdown => println!("Shutting down service"), LocalCommand::Shutdown => println!("Shutting down service"),
LocalCommand::Disconnect => { LocalCommand::Disconnect => {
@ -66,7 +82,7 @@ impl Server {
}, },
LocalCommand::Connect(addr) => { LocalCommand::Connect(addr) => {
println!("Trying to connect to: {:?}", &addr); println!("Trying to connect to: {:?}", &addr);
match self.network.connect(addr.clone()).await { match client.connect(addr.clone()).await {
Ok(p) => self.loop_participant(p).await, Ok(p) => self.loop_participant(p).await,
Err(e) => println!("Failed to connect to {:?}, err: {:?}", &addr, e), Err(e) => println!("Failed to connect to {:?}, err: {:?}", &addr, e),
} }
@ -89,7 +105,7 @@ impl Server {
LocalCommand::Get(id, path) => { LocalCommand::Get(id, path) => {
// i dont know the owner, just broadcast, i am laaaazyyy // i dont know the owner, just broadcast, i am laaaazyyy
for ri in self.remotes.read().await.values() { for ri in self.remotes.read().await.values() {
let mut ri = ri.lock().await; let ri = ri.lock().await;
if ri.get_info(id).is_some() { if ri.get_info(id).is_some() {
//found provider, send request. //found provider, send request.
self.receiving_files.lock().await.insert(id, path.clone()); self.receiving_files.lock().await.insert(id, path.clone());
@ -105,20 +121,20 @@ impl Server {
trace!("Stop command_manager"); trace!("Stop command_manager");
} }
async fn connect_manager(&self) { async fn connect_manager(&self, network: Network) {
trace!("Start connect_manager"); trace!("Start connect_manager");
let iter = futures_util::stream::unfold((), |_| { let iter = futures_util::stream::unfold(network, async move |mut network| {
self.network.connected().map(|r| r.ok().map(|v| (v, ()))) network.connected().await.ok().map(|v| (v, network))
}); });
iter.for_each_concurrent(/* limit */ None, async move |participant| { iter.for_each_concurrent(/* limit */ None, |participant| async {
self.loop_participant(participant).await; self.loop_participant(participant).await;
}) })
.await; .await;
trace!("Stop connect_manager"); trace!("Stop connect_manager");
} }
async fn loop_participant(&self, p: Participant) { async fn loop_participant(&self, mut p: Participant) {
if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = ( if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = (
p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0) p.open(3, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await, .await,

View File

@ -129,7 +129,7 @@ fn main() {
fn server(address: ListenAddr, runtime: Arc<Runtime>) { fn server(address: ListenAddr, runtime: Arc<Runtime>) {
let registry = Arc::new(Registry::new()); let registry = Arc::new(Registry::new());
let server = Network::new_with_registry(Pid::new(), &runtime, &registry); let mut server = Network::new_with_registry(Pid::new(), &runtime, &registry);
runtime.spawn(Server::run( runtime.spawn(Server::run(
Arc::clone(&registry), Arc::clone(&registry),
SocketAddr::from(([0; 4], 59112)), SocketAddr::from(([0; 4], 59112)),
@ -140,7 +140,7 @@ fn server(address: ListenAddr, runtime: Arc<Runtime>) {
loop { loop {
info!("----"); info!("----");
info!("Waiting for participant to connect"); info!("Waiting for participant to connect");
let p1 = runtime.block_on(server.connected()).unwrap(); //remote representation of p1 let mut p1 = runtime.block_on(server.connected()).unwrap(); //remote representation of p1
let mut s1 = runtime.block_on(p1.opened()).unwrap(); //remote representation of s1 let mut s1 = runtime.block_on(p1.opened()).unwrap(); //remote representation of s1
runtime.block_on(async { runtime.block_on(async {
let mut last = Instant::now(); let mut last = Instant::now();
@ -169,7 +169,7 @@ fn client(address: ConnectAddr, runtime: Arc<Runtime>) {
)); ));
let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1 let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1
let mut s1 = runtime let s1 = runtime
.block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)) .block_on(p1.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0))
.unwrap(); //remote representation of s1 .unwrap(); //remote representation of s1
let mut last = Instant::now(); let mut last = Instant::now();

View File

@ -67,9 +67,9 @@ pub enum ParticipantEvent {
pub struct Participant { pub struct Participant {
local_pid: Pid, local_pid: Pid,
remote_pid: Pid, remote_pid: Pid,
a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>, a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>, b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
b2a_event_r: Mutex<mpsc::UnboundedReceiver<ParticipantEvent>>, b2a_event_r: mpsc::UnboundedReceiver<ParticipantEvent>,
b2a_bandwidth_stats_r: watch::Receiver<f32>, b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: A2sDisconnect, a2s_disconnect_s: A2sDisconnect,
} }
@ -195,9 +195,9 @@ pub struct StreamParams {
pub struct Network { pub struct Network {
local_pid: Pid, local_pid: Pid,
participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>, participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
listen_sender: Mutex<mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>>, listen_sender: mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>,
connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>, connect_sender: mpsc::UnboundedSender<A2sConnect>,
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>, connected_receiver: mpsc::UnboundedReceiver<Participant>,
shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>, shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
} }
@ -300,9 +300,9 @@ impl Network {
Self { Self {
local_pid: participant_id, local_pid: participant_id,
participant_disconnect_sender, participant_disconnect_sender,
listen_sender: Mutex::new(listen_sender), listen_sender,
connect_sender: Mutex::new(connect_sender), connect_sender,
connected_receiver: Mutex::new(connected_receiver), connected_receiver,
shutdown_network_s: Some(shutdown_network_s), shutdown_network_s: Some(shutdown_network_s),
} }
} }
@ -342,10 +342,7 @@ impl Network {
pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> { pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> {
let (s2a_result_s, s2a_result_r) = oneshot::channel::<io::Result<()>>(); let (s2a_result_s, s2a_result_r) = oneshot::channel::<io::Result<()>>();
debug!(?address, "listening on address"); debug!(?address, "listening on address");
self.listen_sender self.listen_sender.send((address, s2a_result_s))?;
.lock()
.await
.send((address, s2a_result_s))?;
match s2a_result_r.await? { match s2a_result_r.await? {
//waiting guarantees that we either listened successfully or get an error like port in //waiting guarantees that we either listened successfully or get an error like port in
// use // use
@ -401,10 +398,7 @@ impl Network {
let (pid_sender, pid_receiver) = let (pid_sender, pid_receiver) =
oneshot::channel::<Result<Participant, NetworkConnectError>>(); oneshot::channel::<Result<Participant, NetworkConnectError>>();
debug!(?address, "Connect to address"); debug!(?address, "Connect to address");
self.connect_sender self.connect_sender.send((address, pid_sender))?;
.lock()
.await
.send((address, pid_sender))?;
let participant = match pid_receiver.await? { let participant = match pid_receiver.await? {
Ok(p) => p, Ok(p) => p,
Err(e) => return Err(NetworkError::ConnectFailed(e)), Err(e) => return Err(NetworkError::ConnectFailed(e)),
@ -454,11 +448,9 @@ impl Network {
/// [`listen`]: crate::api::Network::listen /// [`listen`]: crate::api::Network::listen
/// [`ListenAddr`]: crate::api::ListenAddr /// [`ListenAddr`]: crate::api::ListenAddr
#[instrument(name="network", skip(self), fields(p = %self.local_pid))] #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
pub async fn connected(&self) -> Result<Participant, NetworkError> { pub async fn connected(&mut self) -> Result<Participant, NetworkError> {
let participant = self let participant = self
.connected_receiver .connected_receiver
.lock()
.await
.recv() .recv()
.await .await
.ok_or(NetworkError::NetworkClosed)?; .ok_or(NetworkError::NetworkClosed)?;
@ -536,9 +528,9 @@ impl Participant {
Self { Self {
local_pid, local_pid,
remote_pid, remote_pid,
a2b_open_stream_s: Mutex::new(a2b_open_stream_s), a2b_open_stream_s,
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), b2a_stream_opened_r,
b2a_event_r: Mutex::new(b2a_event_r), b2a_event_r,
b2a_bandwidth_stats_r, b2a_bandwidth_stats_r,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
} }
@ -600,12 +592,10 @@ impl Participant {
) -> Result<Stream, ParticipantError> { ) -> Result<Stream, ParticipantError> {
debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio"); debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>(); let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
if let Err(e) = self.a2b_open_stream_s.lock().await.send(( if let Err(e) =
prio, self.a2b_open_stream_s
promises, .send((prio, promises, bandwidth, p2a_return_stream_s))
bandwidth, {
p2a_return_stream_s,
)) {
debug!(?e, "bParticipant is already closed, notifying"); debug!(?e, "bParticipant is already closed, notifying");
return Err(ParticipantError::ParticipantDisconnected); return Err(ParticipantError::ParticipantDisconnected);
} }
@ -657,8 +647,8 @@ impl Participant {
/// [`connected`]: Network::connected /// [`connected`]: Network::connected
/// [`open`]: Participant::open /// [`open`]: Participant::open
#[instrument(name="network", skip(self), fields(p = %self.local_pid))] #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
pub async fn opened(&self) -> Result<Stream, ParticipantError> { pub async fn opened(&mut self) -> Result<Stream, ParticipantError> {
match self.b2a_stream_opened_r.lock().await.recv().await { match self.b2a_stream_opened_r.recv().await {
Some(stream) => { Some(stream) => {
let sid = stream.sid; let sid = stream.sid;
debug!(?sid, "Receive opened stream"); debug!(?sid, "Receive opened stream");
@ -794,8 +784,8 @@ impl Participant {
/// ``` /// ```
/// ///
/// [`ParticipantEvent`]: crate::api::ParticipantEvent /// [`ParticipantEvent`]: crate::api::ParticipantEvent
pub async fn fetch_event(&self) -> Result<ParticipantEvent, ParticipantError> { pub async fn fetch_event(&mut self) -> Result<ParticipantEvent, ParticipantError> {
match self.b2a_event_r.lock().await.recv().await { match self.b2a_event_r.recv().await {
Some(event) => Ok(event), Some(event) => Ok(event),
None => { None => {
debug!("event_receiver failed, closing participant"); debug!("event_receiver failed, closing participant");
@ -811,16 +801,13 @@ impl Participant {
/// ///
/// [`ParticipantEvent`]: crate::api::ParticipantEvent /// [`ParticipantEvent`]: crate::api::ParticipantEvent
/// [`fetch_event`]: Participant::fetch_event /// [`fetch_event`]: Participant::fetch_event
pub fn try_fetch_event(&self) -> Result<Option<ParticipantEvent>, ParticipantError> { pub fn try_fetch_event(&mut self) -> Result<Option<ParticipantEvent>, ParticipantError> {
match &mut self.b2a_event_r.try_lock() { match self.b2a_event_r.try_recv() {
Ok(b2a_event_r) => match b2a_event_r.try_recv() { Ok(event) => Ok(Some(event)),
Ok(event) => Ok(Some(event)), Err(mpsc::error::TryRecvError::Empty) => Ok(None),
Err(mpsc::error::TryRecvError::Empty) => Ok(None), Err(mpsc::error::TryRecvError::Disconnected) => {
Err(mpsc::error::TryRecvError::Disconnected) => { Err(ParticipantError::ParticipantDisconnected)
Err(ParticipantError::ParticipantDisconnected)
},
}, },
Err(_) => Ok(None),
} }
} }
@ -914,7 +901,7 @@ impl Stream {
/// [`recv`]: Stream::recv /// [`recv`]: Stream::recv
/// [`Serialized`]: Serialize /// [`Serialized`]: Serialize
#[inline] #[inline]
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> { pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
self.send_raw_move(Message::serialize(&msg, self.params())) self.send_raw_move(Message::serialize(&msg, self.params()))
} }
@ -966,7 +953,7 @@ impl Stream {
/// [`compress`]: lz_fear::raw::compress2 /// [`compress`]: lz_fear::raw::compress2
/// [`Message::serialize`]: crate::message::Message::serialize /// [`Message::serialize`]: crate::message::Message::serialize
#[inline] #[inline]
pub fn send_raw(&mut self, message: &Message) -> Result<(), StreamError> { pub fn send_raw(&self, message: &Message) -> Result<(), StreamError> {
self.send_raw_move(Message { self.send_raw_move(Message {
data: message.data.clone(), data: message.data.clone(),
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
@ -974,7 +961,7 @@ impl Stream {
}) })
} }
fn send_raw_move(&mut self, message: Message) -> Result<(), StreamError> { fn send_raw_move(&self, message: Message) -> Result<(), StreamError> {
if self.send_closed.load(Ordering::Relaxed) { if self.send_closed.load(Ordering::Relaxed) {
return Err(StreamError::StreamClosed); return Err(StreamError::StreamClosed);
} }

View File

@ -28,7 +28,7 @@ use helper::{network_participant_stream, tcp, SLEEP_EXTERNAL, SLEEP_INTERNAL};
#[test] #[test]
fn close_network() { fn close_network() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _, _p1_a, mut s1_a, _, _p1_b, mut s1_b) = network_participant_stream(tcp()); let (r, _, _p1_a, s1_a, _, _p1_b, mut s1_b) = network_participant_stream(tcp());
std::thread::sleep(SLEEP_INTERNAL); std::thread::sleep(SLEEP_INTERNAL);
@ -40,7 +40,7 @@ fn close_network() {
#[test] #[test]
fn close_participant() { fn close_participant() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, p1_a, s1_a, _n_b, p1_b, mut s1_b) = network_participant_stream(tcp());
r.block_on(p1_a.disconnect()).unwrap(); r.block_on(p1_a.disconnect()).unwrap();
r.block_on(p1_b.disconnect()).unwrap(); r.block_on(p1_b.disconnect()).unwrap();
@ -75,7 +75,7 @@ fn close_streams_in_block_on() {
let (r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = network_participant_stream(tcp());
r.block_on(async { r.block_on(async {
//make it locally so that they are dropped later //make it locally so that they are dropped later
let mut s1_a = s1_a; let s1_a = s1_a;
let mut s1_b = s1_b; let mut s1_b = s1_b;
s1_a.send("ping").unwrap(); s1_a.send("ping").unwrap();
assert_eq!(s1_b.recv().await, Ok("ping".to_string())); assert_eq!(s1_b.recv().await, Ok("ping".to_string()));
@ -87,7 +87,7 @@ fn close_streams_in_block_on() {
#[test] #[test]
fn stream_simple_3msg_then_close() { fn stream_simple_3msg_then_close() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(1u8).unwrap(); s1_a.send(1u8).unwrap();
s1_a.send(42).unwrap(); s1_a.send(42).unwrap();
@ -104,7 +104,7 @@ fn stream_simple_3msg_then_close() {
fn stream_send_first_then_receive() { fn stream_send_first_then_receive() {
// recv should still be possible even if stream got closed if they are in queue // recv should still be possible even if stream got closed if they are in queue
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(1u8).unwrap(); s1_a.send(1u8).unwrap();
s1_a.send(42).unwrap(); s1_a.send(42).unwrap();
@ -120,7 +120,7 @@ fn stream_send_first_then_receive() {
#[test] #[test]
fn stream_send_1_then_close_stream() { fn stream_send_1_then_close_stream() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("this message must be received, even if stream is closed already!") s1_a.send("this message must be received, even if stream is closed already!")
.unwrap(); .unwrap();
drop(s1_a); drop(s1_a);
@ -133,7 +133,7 @@ fn stream_send_1_then_close_stream() {
#[test] #[test]
fn stream_send_100000_then_close_stream() { fn stream_send_100000_then_close_stream() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
for _ in 0..100000 { for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -151,7 +151,7 @@ fn stream_send_100000_then_close_stream() {
#[test] #[test]
fn stream_send_100000_then_close_stream_remote() { fn stream_send_100000_then_close_stream_remote() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 { for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -164,7 +164,7 @@ fn stream_send_100000_then_close_stream_remote() {
#[test] #[test]
fn stream_send_100000_then_close_stream_remote2() { fn stream_send_100000_then_close_stream_remote2() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 { for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -178,7 +178,7 @@ fn stream_send_100000_then_close_stream_remote2() {
#[test] #[test]
fn stream_send_100000_then_close_stream_remote3() { fn stream_send_100000_then_close_stream_remote3() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 { for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -192,7 +192,7 @@ fn stream_send_100000_then_close_stream_remote3() {
#[test] #[test]
fn close_part_then_network() { fn close_part_then_network() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (_r, n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 { for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -205,7 +205,7 @@ fn close_part_then_network() {
#[test] #[test]
fn close_network_then_part() { fn close_network_then_part() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (_r, n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 { for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -218,7 +218,7 @@ fn close_network_then_part() {
#[test] #[test]
fn close_network_then_disconnect_part() { fn close_network_then_disconnect_part() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (r, n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 { for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -231,7 +231,7 @@ fn close_network_then_disconnect_part() {
#[test] #[test]
fn close_runtime_then_network() { fn close_runtime_then_network() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 { for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -244,7 +244,7 @@ fn close_runtime_then_network() {
#[test] #[test]
fn close_runtime_then_part() { fn close_runtime_then_part() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 { for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -258,7 +258,7 @@ fn close_runtime_then_part() {
#[test] #[test]
fn close_network_from_async() { fn close_network_from_async() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 { for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -271,7 +271,7 @@ fn close_network_from_async() {
#[test] #[test]
fn close_part_from_async() { fn close_part_from_async() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 { for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap(); s1_a.send("woop_PARTY_HARD_woop").unwrap();
} }
@ -285,8 +285,8 @@ fn close_part_from_async() {
#[test] #[test]
fn opened_stream_before_remote_part_is_closed() { fn opened_stream_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); let s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap();
s2_a.send("HelloWorld").unwrap(); s2_a.send("HelloWorld").unwrap();
let mut s2_b = r.block_on(p_b.opened()).unwrap(); let mut s2_b = r.block_on(p_b.opened()).unwrap();
drop(p_a); drop(p_a);
@ -298,8 +298,8 @@ fn opened_stream_before_remote_part_is_closed() {
#[test] #[test]
fn opened_stream_after_remote_part_is_closed() { fn opened_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(3, Promises::empty(), 0)).unwrap(); let s2_a = r.block_on(p_a.open(3, Promises::empty(), 0)).unwrap();
s2_a.send("HelloWorld").unwrap(); s2_a.send("HelloWorld").unwrap();
drop(p_a); drop(p_a);
std::thread::sleep(SLEEP_EXTERNAL); std::thread::sleep(SLEEP_EXTERNAL);
@ -315,8 +315,8 @@ fn opened_stream_after_remote_part_is_closed() {
#[test] #[test]
fn open_stream_after_remote_part_is_closed() { fn open_stream_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp());
let mut s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap(); let s2_a = r.block_on(p_a.open(4, Promises::empty(), 0)).unwrap();
s2_a.send("HelloWorld").unwrap(); s2_a.send("HelloWorld").unwrap();
drop(p_a); drop(p_a);
std::thread::sleep(SLEEP_EXTERNAL); std::thread::sleep(SLEEP_EXTERNAL);
@ -332,7 +332,7 @@ fn open_stream_after_remote_part_is_closed() {
#[test] #[test]
fn failed_stream_open_after_remote_part_is_closed() { fn failed_stream_open_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp()); let (r, _n_a, p_a, _, _n_b, mut p_b, _) = network_participant_stream(tcp());
drop(p_a); drop(p_a);
assert_eq!( assert_eq!(
r.block_on(p_b.opened()).unwrap_err(), r.block_on(p_b.opened()).unwrap_err(),
@ -345,14 +345,14 @@ fn failed_stream_open_after_remote_part_is_closed() {
fn open_participant_before_remote_part_is_closed() { fn open_participant_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap()); let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), &r); let mut n_a = Network::new(Pid::fake(0), &r);
let n_b = Network::new(Pid::fake(1), &r); let n_b = Network::new(Pid::fake(1), &r);
let addr = tcp(); let addr = tcp();
r.block_on(n_a.listen(addr.0)).unwrap(); r.block_on(n_a.listen(addr.0)).unwrap();
let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); let p_b = r.block_on(n_b.connect(addr.1)).unwrap();
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); let s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
s1_b.send("HelloWorld").unwrap(); s1_b.send("HelloWorld").unwrap();
let p_a = r.block_on(n_a.connected()).unwrap(); let mut p_a = r.block_on(n_a.connected()).unwrap();
drop(s1_b); drop(s1_b);
drop(p_b); drop(p_b);
drop(n_b); drop(n_b);
@ -365,18 +365,18 @@ fn open_participant_before_remote_part_is_closed() {
fn open_participant_after_remote_part_is_closed() { fn open_participant_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap()); let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), &r); let mut n_a = Network::new(Pid::fake(0), &r);
let n_b = Network::new(Pid::fake(1), &r); let n_b = Network::new(Pid::fake(1), &r);
let addr = tcp(); let addr = tcp();
r.block_on(n_a.listen(addr.0)).unwrap(); r.block_on(n_a.listen(addr.0)).unwrap();
let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); let p_b = r.block_on(n_b.connect(addr.1)).unwrap();
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); let s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
s1_b.send("HelloWorld").unwrap(); s1_b.send("HelloWorld").unwrap();
drop(s1_b); drop(s1_b);
drop(p_b); drop(p_b);
drop(n_b); drop(n_b);
std::thread::sleep(SLEEP_EXTERNAL); std::thread::sleep(SLEEP_EXTERNAL);
let p_a = r.block_on(n_a.connected()).unwrap(); let mut p_a = r.block_on(n_a.connected()).unwrap();
let mut s1_a = r.block_on(p_a.opened()).unwrap(); let mut s1_a = r.block_on(p_a.opened()).unwrap();
assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string())); assert_eq!(r.block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
} }
@ -385,19 +385,19 @@ fn open_participant_after_remote_part_is_closed() {
fn close_network_scheduler_completely() { fn close_network_scheduler_completely() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap()); let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), &r); let mut n_a = Network::new(Pid::fake(0), &r);
let n_b = Network::new(Pid::fake(1), &r); let n_b = Network::new(Pid::fake(1), &r);
let addr = tcp(); let addr = tcp();
r.block_on(n_a.listen(addr.0)).unwrap(); r.block_on(n_a.listen(addr.0)).unwrap();
let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); let mut p_b = r.block_on(n_b.connect(addr.1)).unwrap();
assert_matches!( assert_matches!(
r.block_on(p_b.fetch_event()), r.block_on(p_b.fetch_event()),
Ok(ParticipantEvent::ChannelCreated(_)) Ok(ParticipantEvent::ChannelCreated(_))
); );
let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); let s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap();
s1_b.send("HelloWorld").unwrap(); s1_b.send("HelloWorld").unwrap();
let p_a = r.block_on(n_a.connected()).unwrap(); let mut p_a = r.block_on(n_a.connected()).unwrap();
assert_matches!( assert_matches!(
r.block_on(p_a.fetch_event()), r.block_on(p_a.fetch_event()),
Ok(ParticipantEvent::ChannelCreated(_)) Ok(ParticipantEvent::ChannelCreated(_))
@ -429,7 +429,7 @@ fn close_network_scheduler_completely() {
#[test] #[test]
fn dont_panic_on_multiply_recv_after_close() { fn dont_panic_on_multiply_recv_after_close() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap(); s1_a.send(11u32).unwrap();
drop(s1_a); drop(s1_a);
@ -444,7 +444,7 @@ fn dont_panic_on_multiply_recv_after_close() {
#[test] #[test]
fn dont_panic_on_recv_send_after_close() { fn dont_panic_on_recv_send_after_close() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap(); s1_a.send(11u32).unwrap();
drop(s1_a); drop(s1_a);
@ -457,7 +457,7 @@ fn dont_panic_on_recv_send_after_close() {
#[test] #[test]
fn dont_panic_on_multiple_send_after_close() { fn dont_panic_on_multiple_send_after_close() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap(); s1_a.send(11u32).unwrap();
drop(s1_a); drop(s1_a);

View File

@ -67,11 +67,11 @@ pub fn network_participant_stream(
) { ) {
let runtime = Arc::new(Runtime::new().unwrap()); let runtime = Arc::new(Runtime::new().unwrap());
let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async {
let n_a = Network::new(Pid::fake(0), &runtime); let mut n_a = Network::new(Pid::fake(0), &runtime);
let n_b = Network::new(Pid::fake(1), &runtime); let n_b = Network::new(Pid::fake(1), &runtime);
n_a.listen(addr.0).await.unwrap(); n_a.listen(addr.0).await.unwrap();
let p1_b = n_b.connect(addr.1).await.unwrap(); let mut p1_b = n_b.connect(addr.1).await.unwrap();
let p1_a = n_a.connected().await.unwrap(); let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(4, Promises::ORDERED, 0).await.unwrap(); let s1_a = p1_a.open(4, Promises::ORDERED, 0).await.unwrap();

View File

@ -10,7 +10,7 @@ use veloren_network::{ConnectAddr, ListenAddr, Network, ParticipantEvent, Pid, P
#[test] #[test]
fn stream_simple() { fn stream_simple() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
@ -20,7 +20,7 @@ fn stream_simple() {
#[test] #[test]
fn stream_try_recv() { fn stream_try_recv() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(4242u32).unwrap(); s1_a.send(4242u32).unwrap();
std::thread::sleep(SLEEP_EXTERNAL); std::thread::sleep(SLEEP_EXTERNAL);
@ -31,7 +31,7 @@ fn stream_try_recv() {
#[test] #[test]
fn stream_simple_3msg() { fn stream_simple_3msg() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap(); s1_a.send(1337).unwrap();
@ -45,7 +45,7 @@ fn stream_simple_3msg() {
#[test] #[test]
fn stream_simple_mpsc() { fn stream_simple_mpsc() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
@ -55,7 +55,7 @@ fn stream_simple_mpsc() {
#[test] #[test]
fn stream_simple_mpsc_3msg() { fn stream_simple_mpsc_3msg() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap(); s1_a.send(1337).unwrap();
@ -69,7 +69,7 @@ fn stream_simple_mpsc_3msg() {
#[test] #[test]
fn stream_simple_quic() { fn stream_simple_quic() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
@ -79,7 +79,7 @@ fn stream_simple_quic() {
#[test] #[test]
fn stream_simple_quic_3msg() { fn stream_simple_quic_3msg() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap(); s1_a.send(1337).unwrap();
@ -94,7 +94,7 @@ fn stream_simple_quic_3msg() {
#[ignore] #[ignore]
fn stream_simple_udp() { fn stream_simple_udp() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
@ -105,7 +105,7 @@ fn stream_simple_udp() {
#[ignore] #[ignore]
fn stream_simple_udp_3msg() { fn stream_simple_udp_3msg() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
s1_a.send("Hello World").unwrap(); s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap(); s1_a.send(1337).unwrap();
@ -184,7 +184,7 @@ fn api_stream_send_main() -> Result<(), Box<dyn std::error::Error>> {
let network = Network::new(Pid::new(), &r); let network = Network::new(Pid::new(), &r);
let remote = Network::new(Pid::new(), &r); let remote = Network::new(Pid::new(), &r);
r.block_on(async { r.block_on(async {
let network = network; let mut network = network;
let remote = remote; let remote = remote;
network network
.listen(ListenAddr::Tcp("127.0.0.1:1200".parse().unwrap())) .listen(ListenAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
@ -196,8 +196,8 @@ fn api_stream_send_main() -> Result<(), Box<dyn std::error::Error>> {
let _stream_p = remote_p let _stream_p = remote_p
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await?; .await?;
let participant_a = network.connected().await?; let mut participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?; let stream_a = participant_a.opened().await?;
//Send Message //Send Message
stream_a.send("Hello World")?; stream_a.send("Hello World")?;
Ok(()) Ok(())
@ -213,7 +213,7 @@ fn api_stream_recv_main() -> Result<(), Box<dyn std::error::Error>> {
let network = Network::new(Pid::new(), &r); let network = Network::new(Pid::new(), &r);
let remote = Network::new(Pid::new(), &r); let remote = Network::new(Pid::new(), &r);
r.block_on(async { r.block_on(async {
let network = network; let mut network = network;
let remote = remote; let remote = remote;
network network
.listen(ListenAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .listen(ListenAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
@ -221,11 +221,11 @@ fn api_stream_recv_main() -> Result<(), Box<dyn std::error::Error>> {
let remote_p = remote let remote_p = remote
.connect(ConnectAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .connect(ConnectAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
.await?; .await?;
let mut stream_p = remote_p let stream_p = remote_p
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
.await?; .await?;
stream_p.send("Hello World")?; stream_p.send("Hello World")?;
let participant_a = network.connected().await?; let mut participant_a = network.connected().await?;
let mut stream_a = participant_a.opened().await?; let mut stream_a = participant_a.opened().await?;
//Send Message //Send Message
assert_eq!("Hello World".to_string(), stream_a.recv::<String>().await?); assert_eq!("Hello World".to_string(), stream_a.recv::<String>().await?);
@ -236,7 +236,7 @@ fn api_stream_recv_main() -> Result<(), Box<dyn std::error::Error>> {
#[test] #[test]
fn wrong_parse() { fn wrong_parse() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(1337).unwrap(); s1_a.send(1337).unwrap();
match r.block_on(s1_b.recv::<String>()) { match r.block_on(s1_b.recv::<String>()) {
@ -249,7 +249,7 @@ fn wrong_parse() {
#[test] #[test]
fn multiple_try_recv() { fn multiple_try_recv() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("asd").unwrap(); s1_a.send("asd").unwrap();
s1_a.send(11u32).unwrap(); s1_a.send(11u32).unwrap();
@ -295,9 +295,9 @@ fn listen_on_ipv6_doesnt_block_ipv4() {
))), ))),
); );
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcpv6); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcpv6);
std::thread::sleep(SLEEP_EXTERNAL); std::thread::sleep(SLEEP_EXTERNAL);
let (_r2, _n_a2, _p_a2, mut s1_a2, _n_b2, _p_b2, mut s1_b2) = network_participant_stream(tcpv4); let (_r2, _n_a2, _p_a2, s1_a2, _n_b2, _p_b2, mut s1_b2) = network_participant_stream(tcpv4);
s1_a.send(42u32).unwrap(); s1_a.send(42u32).unwrap();
s1_a2.send(1337u32).unwrap(); s1_a2.send(1337u32).unwrap();
@ -313,7 +313,7 @@ fn listen_on_ipv6_doesnt_block_ipv4() {
fn check_correct_channel_events() { fn check_correct_channel_events() {
let (_, _) = helper::setup(false, 0); let (_, _) = helper::setup(false, 0);
let con_addr = tcp(); let con_addr = tcp();
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(con_addr.clone()); let (r, _n_a, mut p_a, _, _n_b, mut p_b, _) = network_participant_stream(con_addr.clone());
let event_a = r.block_on(p_a.fetch_event()).unwrap(); let event_a = r.block_on(p_a.fetch_event()).unwrap();
let event_b = r.block_on(p_b.fetch_event()).unwrap(); let event_b = r.block_on(p_b.fetch_event()).unwrap();

View File

@ -50,6 +50,7 @@ serde = { version = "1.0.110", features = ["derive"] }
serde_json = "1.0.50" serde_json = "1.0.50"
rand = { version = "0.8", features = ["small_rng"] } rand = { version = "0.8", features = ["small_rng"] }
hashbrown = { version = "0.12", features = ["rayon", "serde", "nightly"] } hashbrown = { version = "0.12", features = ["rayon", "serde", "nightly"] }
parking_lot = { version = "0.12" }
rayon = "1.5" rayon = "1.5"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
prometheus = { version = "0.13", default-features = false} prometheus = { version = "0.13", default-features = false}

View File

@ -3,7 +3,7 @@ use network::{Message, Participant, Stream, StreamError, StreamParams};
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use specs::Component; use specs::Component;
use specs_idvs::IdvStorage; use specs_idvs::IdvStorage;
use std::sync::{atomic::AtomicBool, Mutex}; use std::sync::atomic::AtomicBool;
/// Client handles ALL network related information of everything that connects /// Client handles ALL network related information of everything that connects
/// to the server Client DOES NOT handle game states /// to the server Client DOES NOT handle game states
@ -14,17 +14,18 @@ use std::sync::{atomic::AtomicBool, Mutex};
pub struct Client { pub struct Client {
pub client_type: ClientType, pub client_type: ClientType,
pub participant: Option<Participant>, pub participant: Option<Participant>,
pub last_ping: Mutex<f64>, pub last_ping: f64,
pub login_msg_sent: AtomicBool, pub login_msg_sent: AtomicBool,
//TODO: improve network crate so that `send` is no longer `&mut self` and we can get rid of //TODO: Consider splitting each of these out into their own components so all the message
// this Mutex. This Mutex is just to please the compiler as we do not get into contention //processing systems can run in parallel with each other (though it may turn out not to
general_stream: Mutex<Stream>, //matter that much).
ping_stream: Mutex<Stream>, general_stream: Stream,
register_stream: Mutex<Stream>, ping_stream: Stream,
character_screen_stream: Mutex<Stream>, register_stream: Stream,
in_game_stream: Mutex<Stream>, character_screen_stream: Stream,
terrain_stream: Mutex<Stream>, in_game_stream: Stream,
terrain_stream: Stream,
general_stream_params: StreamParams, general_stream_params: StreamParams,
ping_stream_params: StreamParams, ping_stream_params: StreamParams,
@ -64,14 +65,14 @@ impl Client {
Client { Client {
client_type, client_type,
participant: Some(participant), participant: Some(participant),
last_ping: Mutex::new(last_ping), last_ping,
login_msg_sent: AtomicBool::new(false), login_msg_sent: AtomicBool::new(false),
general_stream: Mutex::new(general_stream), general_stream,
ping_stream: Mutex::new(ping_stream), ping_stream,
register_stream: Mutex::new(register_stream), register_stream,
character_screen_stream: Mutex::new(character_screen_stream), character_screen_stream,
in_game_stream: Mutex::new(in_game_stream), in_game_stream,
terrain_stream: Mutex::new(terrain_stream), terrain_stream,
general_stream_params, general_stream_params,
ping_stream_params, ping_stream_params,
register_stream_params, register_stream_params,
@ -145,16 +146,12 @@ impl Client {
pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> { pub(crate) fn send_prepared(&self, msg: &PreparedMsg) -> Result<(), StreamError> {
match msg.stream_id { match msg.stream_id {
0 => self.register_stream.lock().unwrap().send_raw(&msg.message), 0 => self.register_stream.send_raw(&msg.message),
1 => self 1 => self.character_screen_stream.send_raw(&msg.message),
.character_screen_stream 2 => self.in_game_stream.send_raw(&msg.message),
.lock() 3 => self.general_stream.send_raw(&msg.message),
.unwrap() 4 => self.ping_stream.send_raw(&msg.message),
.send_raw(&msg.message), 5 => self.terrain_stream.send_raw(&msg.message),
2 => self.in_game_stream.lock().unwrap().send_raw(&msg.message),
3 => self.general_stream.lock().unwrap().send_raw(&msg.message),
4 => self.ping_stream.lock().unwrap().send_raw(&msg.message),
5 => self.terrain_stream.lock().unwrap().send_raw(&msg.message),
_ => unreachable!("invalid stream id"), _ => unreachable!("invalid stream id"),
} }
} }
@ -236,17 +233,17 @@ impl Client {
} }
pub(crate) fn recv<M: DeserializeOwned>( pub(crate) fn recv<M: DeserializeOwned>(
&self, &mut self,
stream_id: u8, stream_id: u8,
) -> Result<Option<M>, StreamError> { ) -> Result<Option<M>, StreamError> {
// TODO: are two systems using the same stream?? why is there contention here? // TODO: are two systems using the same stream?? why is there contention here?
match stream_id { match stream_id {
0 => self.register_stream.lock().unwrap().try_recv(), 0 => self.register_stream.try_recv(),
1 => self.character_screen_stream.lock().unwrap().try_recv(), 1 => self.character_screen_stream.try_recv(),
2 => self.in_game_stream.lock().unwrap().try_recv(), 2 => self.in_game_stream.try_recv(),
3 => self.general_stream.lock().unwrap().try_recv(), 3 => self.general_stream.try_recv(),
4 => self.ping_stream.lock().unwrap().try_recv(), 4 => self.ping_stream.try_recv(),
5 => self.terrain_stream.lock().unwrap().try_recv(), 5 => self.terrain_stream.try_recv(),
_ => unreachable!("invalid stream id"), _ => unreachable!("invalid stream id"),
} }
} }

View File

@ -2,7 +2,7 @@ use crate::{Client, ClientType, ServerInfo};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use futures_util::future::FutureExt; use futures_util::future::FutureExt;
use network::{Network, Participant, Promises}; use network::{Network, Participant, Promises};
use std::{sync::Arc, time::Duration}; use std::time::Duration;
use tokio::{runtime::Runtime, select, sync::oneshot}; use tokio::{runtime::Runtime, select, sync::oneshot};
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@ -14,7 +14,6 @@ pub(crate) struct ServerInfoPacket {
pub(crate) type IncomingClient = Client; pub(crate) type IncomingClient = Client;
pub(crate) struct ConnectionHandler { pub(crate) struct ConnectionHandler {
_network: Arc<Network>,
thread_handle: Option<tokio::task::JoinHandle<()>>, thread_handle: Option<tokio::task::JoinHandle<()>>,
pub client_receiver: Receiver<IncomingClient>, pub client_receiver: Receiver<IncomingClient>,
pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>, pub info_requester_receiver: Receiver<Sender<ServerInfoPacket>>,
@ -27,8 +26,6 @@ pub(crate) struct ConnectionHandler {
/// and time /// and time
impl ConnectionHandler { impl ConnectionHandler {
pub fn new(network: Network, runtime: &Runtime) -> Self { pub fn new(network: Network, runtime: &Runtime) -> Self {
let network = Arc::new(network);
let network_clone = Arc::clone(&network);
let (stop_sender, stop_receiver) = oneshot::channel(); let (stop_sender, stop_receiver) = oneshot::channel();
let (client_sender, client_receiver) = unbounded::<IncomingClient>(); let (client_sender, client_receiver) = unbounded::<IncomingClient>();
@ -36,14 +33,13 @@ impl ConnectionHandler {
bounded::<Sender<ServerInfoPacket>>(1); bounded::<Sender<ServerInfoPacket>>(1);
let thread_handle = Some(runtime.spawn(Self::work( let thread_handle = Some(runtime.spawn(Self::work(
network_clone, network,
client_sender, client_sender,
info_requester_sender, info_requester_sender,
stop_receiver, stop_receiver,
))); )));
Self { Self {
_network: network,
thread_handle, thread_handle,
client_receiver, client_receiver,
info_requester_receiver, info_requester_receiver,
@ -52,7 +48,7 @@ impl ConnectionHandler {
} }
async fn work( async fn work(
network: Arc<Network>, mut network: Network,
client_sender: Sender<IncomingClient>, client_sender: Sender<IncomingClient>,
info_requester_sender: Sender<Sender<ServerInfoPacket>>, info_requester_sender: Sender<Sender<ServerInfoPacket>>,
stop_receiver: oneshot::Receiver<()>, stop_receiver: oneshot::Receiver<()>,

View File

@ -98,15 +98,15 @@ impl LoginProvider {
PendingLogin { pending_r } PendingLogin { pending_r }
} }
pub fn login( pub(crate) fn login<R>(
&mut self,
pending: &mut PendingLogin, pending: &mut PendingLogin,
#[cfg(feature = "plugins")] world: &EcsWorld, #[cfg(feature = "plugins")] world: &EcsWorld,
#[cfg(feature = "plugins")] plugin_manager: &PluginMgr, #[cfg(feature = "plugins")] plugin_manager: &PluginMgr,
admins: &HashMap<Uuid, AdminRecord>, admins: &HashMap<Uuid, AdminRecord>,
whitelist: &HashMap<Uuid, WhitelistRecord>, whitelist: &HashMap<Uuid, WhitelistRecord>,
banlist: &HashMap<Uuid, BanEntry>, banlist: &HashMap<Uuid, BanEntry>,
) -> Option<Result<(String, Uuid), RegisterError>> { player_count_exceeded: impl FnOnce(String, Uuid) -> (bool, R),
) -> Option<Result<R, RegisterError>> {
match pending.pending_r.try_recv() { match pending.pending_r.try_recv() {
Ok(Err(e)) => Some(Err(e)), Ok(Err(e)) => Some(Err(e)),
Ok(Ok((username, uuid))) => { Ok(Ok((username, uuid))) => {
@ -142,6 +142,9 @@ impl LoginProvider {
{ {
// Plugin player join hooks execute for all players, but are only allowed to // Plugin player join hooks execute for all players, but are only allowed to
// filter non-admins. // filter non-admins.
//
// We also run it before checking player count, to avoid lock contention in the
// plugin.
match plugin_manager.execute_event(world, &PlayerJoinEvent { match plugin_manager.execute_event(world, &PlayerJoinEvent {
player_name: username.clone(), player_name: username.clone(),
player_id: *uuid.as_bytes(), player_id: *uuid.as_bytes(),
@ -161,8 +164,13 @@ impl LoginProvider {
}; };
} }
info!(?username, "New User"); // non-admins can only join if the player count has not been exceeded.
Some(Ok((username, uuid))) let (player_count_exceeded, res) = player_count_exceeded(username, uuid);
if admin.is_none() && player_count_exceeded {
return Some(Err(RegisterError::TooManyPlayers));
}
Some(Ok(res))
}, },
Err(oneshot::error::TryRecvError::Closed) => { Err(oneshot::error::TryRecvError::Closed) => {
error!("channel got closed to early, this shouldn't happen"); error!("channel got closed to early, this shouldn't happen");

View File

@ -13,7 +13,7 @@ use common::{
}; };
use common_ecs::{Job, Origin, Phase, System}; use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral}; use common_net::msg::{ClientGeneral, ServerGeneral};
use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteExpect}; use specs::{Entities, Join, Read, ReadExpect, ReadStorage, WriteExpect, WriteStorage};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use tracing::{debug, warn}; use tracing::{debug, warn};
@ -197,7 +197,7 @@ impl<'a> System<'a> for Sys {
ReadExpect<'a, CharacterLoader>, ReadExpect<'a, CharacterLoader>,
WriteExpect<'a, CharacterUpdater>, WriteExpect<'a, CharacterUpdater>,
ReadStorage<'a, Uid>, ReadStorage<'a, Uid>,
ReadStorage<'a, Client>, WriteStorage<'a, Client>,
ReadStorage<'a, Player>, ReadStorage<'a, Player>,
ReadStorage<'a, Presence>, ReadStorage<'a, Presence>,
ReadExpect<'a, EditableSettings>, ReadExpect<'a, EditableSettings>,
@ -216,7 +216,7 @@ impl<'a> System<'a> for Sys {
character_loader, character_loader,
mut character_updater, mut character_updater,
uids, uids,
clients, mut clients,
players, players,
presences, presences,
editable_settings, editable_settings,
@ -225,7 +225,7 @@ impl<'a> System<'a> for Sys {
) { ) {
let mut server_emitter = server_event_bus.emitter(); let mut server_emitter = server_event_bus.emitter();
for (entity, client) in (&entities, &clients).join() { for (entity, client) in (&entities, &mut clients).join() {
let _ = super::try_recv_all(client, 1, |client, msg| { let _ = super::try_recv_all(client, 1, |client, msg| {
Self::handle_client_character_screen_msg( Self::handle_client_character_screen_msg(
&mut server_emitter, &mut server_emitter,

View File

@ -9,7 +9,8 @@ use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::{ use common_net::msg::{
validate_chat_msg, ChatMsgValidationError, ClientGeneral, MAX_BYTES_CHAT_MSG, validate_chat_msg, ChatMsgValidationError, ClientGeneral, MAX_BYTES_CHAT_MSG,
}; };
use specs::{Entities, Join, Read, ReadStorage}; use rayon::prelude::*;
use specs::{Entities, Join, ParJoin, Read, ReadStorage, WriteStorage};
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
impl Sys { impl Sys {
@ -80,7 +81,7 @@ impl<'a> System<'a> for Sys {
ReadStorage<'a, Uid>, ReadStorage<'a, Uid>,
ReadStorage<'a, ChatMode>, ReadStorage<'a, ChatMode>,
ReadStorage<'a, Player>, ReadStorage<'a, Player>,
ReadStorage<'a, Client>, WriteStorage<'a, Client>,
); );
const NAME: &'static str = "msg::general"; const NAME: &'static str = "msg::general";
@ -89,27 +90,30 @@ impl<'a> System<'a> for Sys {
fn run( fn run(
_job: &mut Job<Self>, _job: &mut Job<Self>,
(entities, server_event_bus, time, uids, chat_modes, players, clients): Self::SystemData, (entities, server_event_bus, time, uids, chat_modes, players, mut clients): Self::SystemData,
) { ) {
let mut server_emitter = server_event_bus.emitter(); (&entities, &mut clients, players.maybe())
.par_join()
.for_each_init(
|| server_event_bus.emitter(),
|server_emitter, (entity, client, player)| {
let res = super::try_recv_all(client, 3, |client, msg| {
Self::handle_general_msg(
server_emitter,
entity,
client,
player,
&uids,
&chat_modes,
msg,
)
});
for (entity, client, player) in (&entities, &clients, (&players).maybe()).join() { if let Ok(1_u64..=u64::MAX) = res {
let res = super::try_recv_all(client, 3, |client, msg| { // Update client ping.
Self::handle_general_msg( client.last_ping = time.0
&mut server_emitter, }
entity, },
client, );
player,
&uids,
&chat_modes,
msg,
)
});
if let Ok(1_u64..=u64::MAX) = res {
// Update client ping.
*client.last_ping.lock().unwrap() = time.0
}
}
} }
} }

View File

@ -35,7 +35,7 @@ pub fn add_server_systems(dispatch_builder: &mut DispatcherBuilder) {
/// handles all send msg and calls a handle fn /// handles all send msg and calls a handle fn
/// Aborts when a error occurred returns cnt of successful msg otherwise /// Aborts when a error occurred returns cnt of successful msg otherwise
pub(crate) fn try_recv_all<M, F>( pub(crate) fn try_recv_all<M, F>(
client: &Client, client: &mut Client,
stream_id: u8, stream_id: u8,
mut f: F, mut f: F,
) -> Result<u64, crate::error::Error> ) -> Result<u64, crate::error::Error>

View File

@ -5,7 +5,8 @@ use common::{
}; };
use common_ecs::{Job, Origin, Phase, System}; use common_ecs::{Job, Origin, Phase, System};
use common_net::msg::PingMsg; use common_net::msg::PingMsg;
use specs::{Entities, Join, Read, ReadStorage}; use rayon::prelude::*;
use specs::{Entities, ParJoin, Read, WriteStorage};
use tracing::{debug, info}; use tracing::{debug, info};
impl Sys { impl Sys {
@ -26,7 +27,7 @@ impl<'a> System<'a> for Sys {
Entities<'a>, Entities<'a>,
Read<'a, EventBus<ServerEvent>>, Read<'a, EventBus<ServerEvent>>,
Read<'a, Time>, Read<'a, Time>,
ReadStorage<'a, Client>, WriteStorage<'a, Client>,
Read<'a, Settings>, Read<'a, Settings>,
); );
@ -36,45 +37,49 @@ impl<'a> System<'a> for Sys {
fn run( fn run(
_job: &mut Job<Self>, _job: &mut Job<Self>,
(entities, server_event_bus, time, clients, settings): Self::SystemData, (entities, server_event_bus, time, mut clients, settings): Self::SystemData,
) { ) {
let mut server_emitter = server_event_bus.emitter(); (&entities, &mut clients).par_join().for_each_init(
|| server_event_bus.emitter(),
|server_emitter, (entity, client)| {
// ignore network events
while let Some(Ok(Some(_))) =
client.participant.as_mut().map(|p| p.try_fetch_event())
{}
for (entity, client) in (&entities, &clients).join() { let res = super::try_recv_all(client, 4, Self::handle_ping_msg);
// ignore network events
while let Some(Ok(Some(_))) = client.participant.as_ref().map(|p| p.try_fetch_event()) {
}
let res = super::try_recv_all(client, 4, Self::handle_ping_msg); match res {
Err(e) => {
match res { debug!(?entity, ?e, "network error with client, disconnecting");
Err(e) => {
debug!(?entity, ?e, "network error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect(
entity,
common::comp::DisconnectReason::NetworkError,
));
},
Ok(1_u64..=u64::MAX) => {
// Update client ping.
*client.last_ping.lock().unwrap() = time.0
},
Ok(0) => {
let last_ping: f64 = *client.last_ping.lock().unwrap();
if time.0 - last_ping > settings.client_timeout.as_secs() as f64
// Timeout
{
info!(?entity, "timeout error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect( server_emitter.emit(ServerEvent::ClientDisconnect(
entity, entity,
common::comp::DisconnectReason::Timeout, common::comp::DisconnectReason::NetworkError,
)); ));
} else if time.0 - last_ping > settings.client_timeout.as_secs() as f64 * 0.5 { },
// Try pinging the client if the timeout is nearing. Ok(1_u64..=u64::MAX) => {
client.send_fallible(PingMsg::Ping); // Update client ping.
} client.last_ping = time.0
}, },
} Ok(0) => {
} let last_ping: f64 = client.last_ping;
if time.0 - last_ping > settings.client_timeout.as_secs() as f64
// Timeout
{
info!(?entity, "timeout error with client, disconnecting");
server_emitter.emit(ServerEvent::ClientDisconnect(
entity,
common::comp::DisconnectReason::Timeout,
));
} else if time.0 - last_ping
> settings.client_timeout.as_secs() as f64 * 0.5
{
// Try pinging the client if the timeout is nearing.
client.send_fallible(PingMsg::Ping);
}
},
}
},
);
} }
} }

View File

@ -14,13 +14,14 @@ use common_net::msg::{
CharacterInfo, ClientRegister, DisconnectReason, PlayerInfo, PlayerListUpdate, RegisterError, CharacterInfo, ClientRegister, DisconnectReason, PlayerInfo, PlayerListUpdate, RegisterError,
ServerGeneral, ServerGeneral,
}; };
use hashbrown::HashMap; use hashbrown::{hash_map, HashMap};
use plugin_api::Health; use plugin_api::Health;
use rayon::prelude::*;
use specs::{ use specs::{
shred::ResourceId, storage::StorageEntry, Entities, Join, Read, ReadExpect, ReadStorage, shred::ResourceId, Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, SystemData, World,
SystemData, World, WriteExpect, WriteStorage, WriteStorage,
}; };
use tracing::trace; use tracing::{debug, info, trace, warn};
#[cfg(feature = "plugins")] #[cfg(feature = "plugins")]
use {common_state::plugin::memory_manager::EcsWorld, common_state::plugin::PluginMgr}; use {common_state::plugin::memory_manager::EcsWorld, common_state::plugin::PluginMgr};
@ -35,8 +36,8 @@ pub struct ReadData<'a> {
entities: Entities<'a>, entities: Entities<'a>,
stats: ReadStorage<'a, Stats>, stats: ReadStorage<'a, Stats>,
uids: ReadStorage<'a, Uid>, uids: ReadStorage<'a, Uid>,
clients: ReadStorage<'a, Client>,
server_event_bus: Read<'a, EventBus<ServerEvent>>, server_event_bus: Read<'a, EventBus<ServerEvent>>,
login_provider: ReadExpect<'a, LoginProvider>,
player_metrics: ReadExpect<'a, PlayerMetrics>, player_metrics: ReadExpect<'a, PlayerMetrics>,
settings: ReadExpect<'a, Settings>, settings: ReadExpect<'a, Settings>,
editable_settings: ReadExpect<'a, EditableSettings>, editable_settings: ReadExpect<'a, EditableSettings>,
@ -51,10 +52,10 @@ pub struct Sys;
impl<'a> System<'a> for Sys { impl<'a> System<'a> for Sys {
type SystemData = ( type SystemData = (
ReadData<'a>, ReadData<'a>,
WriteStorage<'a, Client>,
WriteStorage<'a, Player>, WriteStorage<'a, Player>,
WriteStorage<'a, Admin>, WriteStorage<'a, Admin>,
WriteStorage<'a, PendingLogin>, WriteStorage<'a, PendingLogin>,
WriteExpect<'a, LoginProvider>,
); );
const NAME: &'static str = "msg::register"; const NAME: &'static str = "msg::register";
@ -63,189 +64,345 @@ impl<'a> System<'a> for Sys {
fn run( fn run(
_job: &mut Job<Self>, _job: &mut Job<Self>,
( (read_data, mut clients, mut players, mut admins, mut pending_logins): Self::SystemData,
read_data,
mut players,
mut admins,
mut pending_logins,
mut login_provider,
): Self::SystemData,
) { ) {
let mut server_emitter = read_data.server_event_bus.emitter(); // Player list to send new players, and lookup from UUID to entity (so we don't
// Player list to send new players. // have to do a linear scan over all entities on each login to see if
let player_list = ( // it's a duplicate).
//
// NOTE: For this to work as desired, we must maintain the invariant that there
// is just one player per UUID!
let (player_list, old_players_by_uuid): (HashMap<_, _>, HashMap<_, _>) = (
&read_data.entities,
&read_data.uids, &read_data.uids,
&players, &players,
read_data.stats.maybe(), read_data.stats.maybe(),
admins.maybe(), admins.maybe(),
) )
.join() .join()
.map(|(uid, player, stats, admin)| { .map(|(entity, uid, player, stats, admin)| {
(*uid, PlayerInfo { (
is_online: true, (*uid, PlayerInfo {
is_moderator: admin.is_some(), is_online: true,
player_alias: player.alias.clone(), is_moderator: admin.is_some(),
character: stats.map(|stats| CharacterInfo { player_alias: player.alias.clone(),
name: stats.name.clone(), character: stats.map(|stats| CharacterInfo {
name: stats.name.clone(),
}),
uuid: player.uuid(),
}), }),
uuid: player.uuid(), (player.uuid(), entity),
}) )
}) })
.collect::<HashMap<_, _>>(); .unzip();
let max_players = read_data.settings.max_players;
let capacity = read_data.settings.max_players * 2;
// List of new players to update player lists of all clients. // List of new players to update player lists of all clients.
let mut new_players = Vec::new(); //
// Big enough that we hopefully won't have to reallocate.
//
// Also includes a list of logins to retry, since we happen to update those
// around the same time that we update the new players list.
//
// NOTE: stdlib mutex is more than good enough on Linux and (probably) Windows,
// but not Mac.
let new_players = parking_lot::Mutex::new((
HashMap::<_, (_, _, _, _)>::with_capacity(capacity),
Vec::with_capacity(capacity),
));
// defer auth lockup // defer auth lockup
for (entity, client) in (&read_data.entities, &read_data.clients).join() { for (entity, client) in (&read_data.entities, &mut clients).join() {
let _ = super::try_recv_all(client, 0, |_, msg: ClientRegister| { let _ = super::try_recv_all(client, 0, |_, msg: ClientRegister| {
trace!(?msg.token_or_username, "defer auth lockup"); trace!(?msg.token_or_username, "defer auth lockup");
let pending = login_provider.verify(&msg.token_or_username); let pending = read_data.login_provider.verify(&msg.token_or_username);
let _ = pending_logins.insert(entity, pending); let _ = pending_logins.insert(entity, pending);
Ok(()) Ok(())
}); });
} }
let mut finished_pending = vec![]; let old_player_count = player_list.len();
let mut retries = vec![]; #[cfg(feature = "plugins")]
for (entity, client, pending) in let ecs_world = EcsWorld {
(&read_data.entities, &read_data.clients, &mut pending_logins).join() entities: &read_data.entities,
{ health: (&read_data._healths).into(),
if let Err(e) = || -> Result<(), crate::error::Error> { uid: (&read_data.uids).into(),
#[cfg(feature = "plugins")] // NOTE: Only the old player list is provided, to avoid scalability
let ecs_world = EcsWorld { // bottlenecks.
entities: &read_data.entities, player: (&players).into(),
health: (&read_data._healths).into(), uid_allocator: &read_data._uid_allocator,
uid: (&read_data.uids).into(), };
player: (&players).into(),
uid_allocator: &read_data._uid_allocator,
};
let (username, uuid) = match login_provider.login( /// Trivially cloneable wrapper over any type, to make rayon happy.
pending, struct OptionClone<T>(Option<T>);
#[cfg(feature = "plugins")]
&ecs_world, impl<T> Clone for OptionClone<T> {
#[cfg(feature = "plugins")] fn clone(&self) -> Self { Self(None) }
&read_data._plugin_mgr, }
&*read_data.editable_settings.admins,
&*read_data.editable_settings.whitelist, // NOTE: this is just default value.
&*read_data.editable_settings.banlist, //
) { // It will be overwritten in ServerExt::update_character_data.
None => return Ok(()), let battle_mode = read_data.settings.gameplay.battle_mode.default_mode();
Some(r) => {
finished_pending.push(entity); let finished_pending = (
trace!(?r, "pending login returned"); &read_data.entities,
match r { &read_data.uids,
Err(e) => { &clients,
!players.mask(),
&mut pending_logins,
)
.par_join()
.fold_with(
// (Finished pending entity list, emitter)
(vec![], OptionClone(None)),
|(mut finished_pending, mut server_emitter_), (entity, uid, client, _, pending)| {
let server_emitter = server_emitter_
.0
.get_or_insert_with(|| read_data.server_event_bus.emitter());
if let Err(e) = || -> Result<(), crate::error::Error> {
// Destructure new_players_guard last so it gets dropped before the other
// three.
let (
(pending_login, player, admin, msg, old_player),
mut new_players_guard,
) = match LoginProvider::login(
pending,
#[cfg(feature = "plugins")]
&ecs_world,
#[cfg(feature = "plugins")]
&read_data._plugin_mgr,
&*read_data.editable_settings.admins,
&*read_data.editable_settings.whitelist,
&*read_data.editable_settings.banlist,
|username, uuid| {
// We construct a few things outside the lock to reduce contention.
let pending_login =
PendingLogin::new_success(username.clone(), uuid);
let player = Player::new(username, battle_mode, uuid, None);
let admin = read_data.editable_settings.admins.get(&uuid);
let msg = player
.is_valid()
.then_some(PlayerInfo {
player_alias: player.alias.clone(),
is_online: true,
is_moderator: admin.is_some(),
character: None, // new players will be on character select.
uuid: player.uuid(),
})
.map(|player_info| {
// Prepare the player list update to be sent to all clients.
client.prepare(ServerGeneral::PlayerListUpdate(
PlayerListUpdate::Add(*uid, player_info),
))
});
// Check if this player was already logged in before the system
// started.
let old_player = old_players_by_uuid
.get(&uuid)
.copied()
// We perform the get outside the lock here, since we can't 100%
// know that this player has a client entry.
.map(|entity| (entity, Some(clients.get(entity))));
// We take the lock only when necessary, and for a short duration,
// to avoid contention with other
// threads. We need to hold the guard past the end of
// the login function because otherwise there's a race between when
// we read it and when we
// (potentially) write to it.
let guard = new_players.lock();
// Guard comes first in the tuple so it's dropped before the other
// stuff if login returns an error.
(
old_player_count + guard.0.len() >= max_players,
(guard, (pending_login, player, admin, msg, old_player)),
)
},
) {
None => return Ok(()),
Some(r) => {
finished_pending.push(entity);
match r {
Err(e) => {
// NOTE: Done only on error to avoid doing extra work within
// the lock.
trace!(?e, "pending login returned error");
server_emitter.emit(ServerEvent::ClientDisconnect(
entity,
common::comp::DisconnectReason::Kicked,
));
client.send(Err(e))?;
return Ok(());
},
// Swap the order of the tuple, so when it's destructured guard
// is dropped first.
Ok((guard, res)) => (res, guard),
}
},
};
let (new_players_by_uuid, retries) = &mut *new_players_guard;
// Check if the user logged in before us during this tick (this is why we
// need the lock held).
let uuid = player.uuid();
let old_player = old_player.map_or_else(
move || match new_players_by_uuid.entry(uuid) {
// We don't actually extract the client yet, to avoid doing extra
// work with the lock held.
hash_map::Entry::Occupied(o) => Ok((o.get().0, None)),
hash_map::Entry::Vacant(v) => Err(v),
},
Ok,
);
let vacant_player = match old_player {
Ok((old_entity, old_client)) => {
if matches!(old_client, None | Some(Some(_))) {
// We can't login the new client right now as the
// removal of the old client and player occurs later in
// the tick, so we instead setup the new login to be
// processed in the next tick
// Create "fake" successful pending auth and mark it to
// be inserted into pending_logins at the end of this
// run.
retries.push((entity, pending_login));
drop(new_players_guard);
let old_client = old_client
.flatten()
.or_else(|| clients.get(old_entity))
.expect(
"All entries in the new player list were explicitly \
joining on client",
);
let _ = old_client.send(ServerGeneral::Disconnect(
DisconnectReason::Kicked(String::from(
"You have logged in from another location.",
)),
));
} else {
drop(new_players_guard);
// A player without a client is strange, so we don't really want
// to retry. Warn about
// this case and hope that trying to perform the
// disconnect process removes the invalid player entry.
warn!(
"Player without client detected for entity {:?}",
old_entity
);
}
// Remove old client
server_emitter.emit(ServerEvent::ClientDisconnect( server_emitter.emit(ServerEvent::ClientDisconnect(
entity, old_entity,
common::comp::DisconnectReason::Kicked, common::comp::DisconnectReason::NewerLogin,
)); ));
client.send(Err(e))?;
return Ok(()); return Ok(());
}, },
Ok((username, uuid)) => (username, uuid), Err(v) => v,
} };
},
};
// Check if user is already logged-in let Some(msg) = msg else {
if let Some((old_entity, old_client, _)) = drop(new_players_guard);
(&read_data.entities, &read_data.clients, &players) // Invalid player
.join() client.send(Err(RegisterError::InvalidCharacter))?;
.find(|(_, _, old_player)| old_player.uuid() == uuid) return Ok(());
{ };
// Remove old client
server_emitter.emit(ServerEvent::ClientDisconnect(
old_entity,
common::comp::DisconnectReason::NewerLogin,
));
let _ = old_client.send(ServerGeneral::Disconnect(DisconnectReason::Kicked(
String::from("You have logged in from another location."),
)));
// We can't login the new client right now as the
// removal of the old client and player occurs later in
// the tick, so we instead setup the new login to be
// processed in the next tick
// Create "fake" successful pending auth and mark it to
// be inserted into pending_logins at the end of this
// run
retries.push((entity, PendingLogin::new_success(username, uuid)));
return Ok(());
}
// NOTE: this is just default value. // We know the player list didn't already contain this entity because we
// // joined on !players, so we can assume from here
// It will be overwritten in ServerExt::update_character_data. // that we'll definitely be adding a new player.
let battle_mode = read_data.settings.gameplay.battle_mode.default_mode();
let player = Player::new(username, battle_mode, uuid, None);
let admin = read_data.editable_settings.admins.get(&uuid); // Add to list to notify all clients of the new player
vacant_player.insert((entity, player, admin, msg));
drop(new_players_guard);
read_data.player_metrics.players_connected.inc();
if !player.is_valid() { // Tell the client its request was successful.
// Invalid player client.send(Ok(()))?;
client.send(Err(RegisterError::InvalidCharacter))?;
return Ok(());
}
if let Ok(StorageEntry::Vacant(v)) = players.entry(entity) { // Send client all the tracked components currently attached to its entity
// Add Player component to this client, if the entity exists. // as well as synced resources (currently only
v.insert(player); // `TimeOfDay`)
read_data.player_metrics.players_connected.inc(); debug!("Starting initial sync with client.");
client.send(ServerInit::GameSync {
// Send client their entity
entity_package: read_data
.trackers
.create_entity_package_with_uid(entity, *uid, None, None, None),
time_of_day: *read_data.time_of_day,
max_group_size: read_data.settings.max_player_group_size,
client_timeout: read_data.settings.client_timeout,
world_map: (&*read_data.map).clone(),
recipe_book: default_recipe_book().cloned(),
component_recipe_book: default_component_recipe_book().cloned(),
material_stats: (&*read_data.material_stats).clone(),
ability_map: (&*read_data.ability_map).clone(),
})?;
debug!("Done initial sync with client.");
// Give the Admin component to the player if their name exists in // Send initial player list
// admin list client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init(
if let Some(admin) = admin { player_list.clone(),
admins )))?;
.insert(entity, Admin(admin.role.into()))
.expect("Inserting into players proves the entity exists."); Ok(())
}() {
trace!(?e, "failed to process register");
} }
(finished_pending, server_emitter_)
// Tell the client its request was successful. },
client.send(Ok(()))?; )
.map(|(finished_pending, _server_emitter)| finished_pending)
// Send initial player list .collect::<Vec<_>>();
client.send(ServerGeneral::PlayerListUpdate(PlayerListUpdate::Init( finished_pending.into_iter().flatten().for_each(|e| {
player_list.clone(), // Remove all entities in finished_pending from pending_logins.
)))?;
// Add to list to notify all clients of the new player
new_players.push(entity);
}
Ok(())
}() {
trace!(?e, "failed to process register")
};
}
for e in finished_pending {
pending_logins.remove(e); pending_logins.remove(e);
} });
let (new_players, retries) = new_players.into_inner();
// Insert retry attempts back into pending_logins to be processed next tick // Insert retry attempts back into pending_logins to be processed next tick
for (entity, pending) in retries { for (entity, pending) in retries {
let _ = pending_logins.insert(entity, pending); let _ = pending_logins.insert(entity, pending);
} }
// Handle new players. // Handle new players.
// Tell all clients to add them to the player list. let msgs = new_players
let player_info = |entity| { .into_values()
let player_info = read_data.uids.get(entity).zip(players.get(entity)); .map(|(entity, player, admin, msg)| {
player_info.map(|(u, p)| (entity, u, p)) let username = &player.alias;
}; info!(?username, "New User");
for (entity, uid, player) in new_players.into_iter().filter_map(player_info) { // Add Player component to this client.
let mut lazy_msg = None; //
for (_, client) in (&players, &read_data.clients).join() { // Note that since players has been write locked for the duration of this
if lazy_msg.is_none() { // system, we know that nobody else added any players since we
lazy_msg = Some(client.prepare(ServerGeneral::PlayerListUpdate( // last checked its value, and we checked that everything in
PlayerListUpdate::Add(*uid, PlayerInfo { // new_players was not already in players, so we know the insert
player_alias: player.alias.clone(), // succeeds and the old entry was vacant. Moreover, we know that all new
is_online: true, // players we added have different UUIDs both from each other, and from any old
is_moderator: admins.get(entity).is_some(), // players, preserving the uniqueness invariant.
character: None, // new players will be on character select. players
uuid: player.uuid(), .insert(entity, player)
}), .expect("The entity was joined against in the same system, so it exists");
)));
// Give the Admin component to the player if their name exists in
// admin list
if let Some(admin) = admin {
admins
.insert(entity, Admin(admin.role.into()))
.expect("Inserting into players proves the entity exists.");
} }
lazy_msg.as_ref().map(|msg| client.send_prepared(msg)); msg
} })
} .collect::<Vec<_>>();
// Tell all clients to add the new players to the player list, in parallel.
(players.mask(), &clients)
.par_join()
.for_each(|(_, client)| {
// Send messages sequentially within each client; by the time we have enough
// players to make parallelizing useful, we will have way more
// players than cores.
msgs.iter().for_each(|msg| {
let _ = client.send_prepared(msg);
});
});
} }
} }

View File

@ -12,7 +12,7 @@ use common::{
use common_ecs::{Job, Origin, ParMode, Phase, System}; use common_ecs::{Job, Origin, ParMode, Phase, System};
use common_net::msg::{ClientGeneral, ServerGeneral}; use common_net::msg::{ClientGeneral, ServerGeneral};
use rayon::iter::ParallelIterator; use rayon::iter::ParallelIterator;
use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write}; use specs::{Entities, Join, ParJoin, Read, ReadExpect, ReadStorage, Write, WriteStorage};
use tracing::{debug, trace}; use tracing::{debug, trace};
/// This system will handle new messages from clients /// This system will handle new messages from clients
@ -29,7 +29,7 @@ impl<'a> System<'a> for Sys {
Write<'a, Vec<ChunkRequest>>, Write<'a, Vec<ChunkRequest>>,
ReadStorage<'a, Pos>, ReadStorage<'a, Pos>,
ReadStorage<'a, Presence>, ReadStorage<'a, Presence>,
ReadStorage<'a, Client>, WriteStorage<'a, Client>,
); );
const NAME: &'static str = "msg::terrain"; const NAME: &'static str = "msg::terrain";
@ -48,17 +48,17 @@ impl<'a> System<'a> for Sys {
mut chunk_requests, mut chunk_requests,
positions, positions,
presences, presences,
clients, mut clients,
): Self::SystemData, ): Self::SystemData,
) { ) {
job.cpu_stats.measure(ParMode::Rayon); job.cpu_stats.measure(ParMode::Rayon);
let mut new_chunk_requests = (&entities, &clients, (&presences).maybe()) let mut new_chunk_requests = (&entities, &mut clients, (&presences).maybe())
.par_join() .par_join()
.map_init( .map_init(
|| (chunk_send_bus.emitter(), server_event_bus.emitter()), || (chunk_send_bus.emitter(), server_event_bus.emitter()),
|(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| { |(chunk_send_emitter, server_emitter), (entity, client, maybe_presence)| {
let mut chunk_requests = Vec::new(); let mut chunk_requests = Vec::new();
let _ = super::try_recv_all(client, 5, |_, msg| { let _ = super::try_recv_all(client, 5, |client, msg| {
let presence = match maybe_presence { let presence = match maybe_presence {
Some(g) => g, Some(g) => g,
None => { None => {

View File

@ -64,7 +64,23 @@ macro_rules! trackers {
vel: Option<Vel>, vel: Option<Vel>,
ori: Option<Ori>, ori: Option<Ori>,
) -> Option<EntityPackage<EcsCompPacket>> { ) -> Option<EntityPackage<EcsCompPacket>> {
let uid = self.uid.get(entity).copied()?.0; let uid = self.uid.get(entity).copied()?;
Some(self.create_entity_package_with_uid(entity, uid, pos, vel, ori))
}
/// See [create_entity_package].
///
/// NOTE: Only if you're certain you know the UID for the entity, and it hasn't
/// changed!
pub fn create_entity_package_with_uid(
&self,
entity: EcsEntity,
uid: Uid,
pos: Option<Pos>,
vel: Option<Vel>,
ori: Option<Ori>,
) -> EntityPackage<EcsCompPacket> {
let uid = uid.0;
let mut comps = Vec::new(); let mut comps = Vec::new();
// NOTE: we could potentially include a bitmap indicating which components are present instead of tagging // NOTE: we could potentially include a bitmap indicating which components are present instead of tagging
// components with the type in order to save bandwidth // components with the type in order to save bandwidth
@ -94,7 +110,7 @@ macro_rules! trackers {
vel.map(|c| comps.push(c.into())); vel.map(|c| comps.push(c.into()));
ori.map(|c| comps.push(c.into())); ori.map(|c| comps.push(c.into()));
Some(EntityPackage { uid, comps }) EntityPackage { uid, comps }
} }
/// Create sync package for switching a client to another entity specifically to /// Create sync package for switching a client to another entity specifically to