implement lazy_msg which only serialize + compress AT MAX ONCE if the same msg is send to multiple participants

This commit is contained in:
Marcel Märtens 2020-10-19 01:53:19 +02:00
parent 2290efd219
commit 9ba19a1cd9
5 changed files with 87 additions and 26 deletions

View File

@ -390,6 +390,7 @@ impl StateExt for State {
/// Sends the message to all connected clients
fn notify_registered_clients(&self, msg: ServerGeneral) {
let mut lazy_msg = None;
for (general_stream, _) in (
&mut self.ecs().write_storage::<GeneralStream>(),
&self.ecs().read_storage::<Client>(),
@ -397,12 +398,18 @@ impl StateExt for State {
.join()
.filter(|(_, c)| c.registered)
{
general_stream.send_unchecked(msg.clone());
if lazy_msg.is_none() {
lazy_msg = Some(general_stream.prepare(&msg));
}
lazy_msg
.as_ref()
.map(|ref msg| general_stream.0.send_raw(&msg));
}
}
/// Sends the message to all clients playing in game
fn notify_in_game_clients(&self, msg: ServerGeneral) {
let mut lazy_msg = None;
for (general_stream, _) in (
&mut self.ecs().write_storage::<GeneralStream>(),
&self.ecs().read_storage::<Client>(),
@ -410,7 +417,12 @@ impl StateExt for State {
.join()
.filter(|(_, c)| c.in_game.is_some())
{
general_stream.send_unchecked(msg.clone());
if lazy_msg.is_none() {
lazy_msg = Some(general_stream.prepare(&msg));
}
lazy_msg
.as_ref()
.map(|ref msg| general_stream.0.send_raw(&msg));
}
}

View File

@ -1,6 +1,6 @@
use common::msg::{ClientGeneral, ClientRegister, PingMsg, ServerGeneral, ServerRegisterAnswer};
use network::{Stream, StreamError};
use network::{Message, Stream, StreamError};
use serde::{de::DeserializeOwned, Serialize};
use specs::Component;
@ -22,6 +22,14 @@ pub(crate) trait GetStream {
}
fn send_unchecked(&mut self, msg: Self::SendMsg) { let _ = self.send(msg); }
fn prepare(&mut self, msg: &Self::SendMsg) -> Message {
if Self::verify(&msg) {
Message::serialize(&msg, &self.get_mut())
} else {
unreachable!("sending this msg isn't allowed! got: {:?}", msg)
}
}
}
// Streams

View File

@ -197,13 +197,28 @@ impl<'a> System<'a> for Sys {
.take_deleted_in_region(key)
.unwrap_or_default(),
);
let entity_sync_msg = ServerGeneral::EntitySync(entity_sync_package);
let comp_sync_msg = ServerGeneral::CompSync(comp_sync_package);
let mut entity_sync_package = Some(entity_sync_package);
let mut comp_sync_package = Some(comp_sync_package);
let mut entity_sync_lazymsg = None;
let mut comp_sync_lazymsg = None;
subscribers
.iter_mut()
.for_each(move |(_, _, _, _, _, general_stream)| {
general_stream.send_unchecked(entity_sync_msg.clone());
general_stream.send_unchecked(comp_sync_msg.clone());
if entity_sync_lazymsg.is_none() {
entity_sync_lazymsg = Some(general_stream.prepare(
&ServerGeneral::EntitySync(entity_sync_package.take().unwrap()),
));
comp_sync_lazymsg =
Some(general_stream.prepare(&ServerGeneral::CompSync(
comp_sync_package.take().unwrap(),
)));
}
entity_sync_lazymsg
.as_ref()
.map(|msg| general_stream.0.send_raw(&msg));
comp_sync_lazymsg
.as_ref()
.map(|msg| general_stream.0.send_raw(&msg));
});
let mut send_general = |msg: ServerGeneral,
@ -381,9 +396,14 @@ impl<'a> System<'a> for Sys {
// Sync resources
// TODO: doesn't really belong in this system (rename system or create another
// system?)
let tof_msg = ServerGeneral::TimeOfDay(*time_of_day);
let mut tof_lazymsg = None;
for general_stream in (&mut general_streams).join() {
general_stream.send_unchecked(tof_msg.clone());
if tof_lazymsg.is_none() {
tof_lazymsg = Some(general_stream.prepare(&ServerGeneral::TimeOfDay(*time_of_day)));
}
tof_lazymsg
.as_ref()
.map(|msg| general_stream.0.send_raw(&msg));
}
timer.end();

View File

@ -181,18 +181,24 @@ impl<'a> System<'a> for Sys {
// Tell all clients to add them to the player list.
for entity in new_players {
if let (Some(uid), Some(player)) = (uids.get(entity), players.get(entity)) {
let msg =
ServerGeneral::PlayerListUpdate(PlayerListUpdate::Add(*uid, PlayerInfo {
player_alias: player.alias.clone(),
is_online: true,
is_admin: admins.get(entity).is_some(),
character: None, // new players will be on character select.
}));
let mut lazy_msg = None;
for (_, general_stream) in (&mut clients, &mut general_streams)
.join()
.filter(|(c, _)| c.registered)
{
let _ = general_stream.send(msg.clone());
if lazy_msg.is_none() {
lazy_msg = Some(general_stream.prepare(&ServerGeneral::PlayerListUpdate(
PlayerListUpdate::Add(*uid, PlayerInfo {
player_alias: player.alias.clone(),
is_online: true,
is_admin: admins.get(entity).is_some(),
character: None, // new players will be on character select.
}),
)));
}
lazy_msg
.as_ref()
.map(|ref msg| general_stream.0.send_raw(&msg));
}
}
}

View File

@ -32,6 +32,8 @@ impl<'a> System<'a> for Sys {
// Sync changed chunks
'chunk: for chunk_key in &terrain_changes.modified_chunks {
let mut lazy_msg = None;
for (player, pos, in_game_stream) in (&players, &positions, &mut in_game_streams).join()
{
if player
@ -39,23 +41,36 @@ impl<'a> System<'a> for Sys {
.map(|vd| super::terrain::chunk_in_vd(pos.0, *chunk_key, &terrain, vd))
.unwrap_or(false)
{
let _ = in_game_stream.send(ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
Some(chunk) => chunk.clone(),
None => break 'chunk,
})),
});
if lazy_msg.is_none() {
lazy_msg =
Some(in_game_stream.prepare(&ServerGeneral::TerrainChunkUpdate {
key: *chunk_key,
chunk: Ok(Box::new(match terrain.get_key(*chunk_key) {
Some(chunk) => chunk.clone(),
None => break 'chunk,
})),
}));
}
lazy_msg
.as_ref()
.map(|ref msg| in_game_stream.0.send_raw(&msg));
}
}
}
// TODO: Don't send all changed blocks to all clients
// Sync changed blocks
let msg = ServerGeneral::TerrainBlockUpdates(terrain_changes.modified_blocks.clone());
let mut lazy_msg = None;
for (player, in_game_stream) in (&players, &mut in_game_streams).join() {
if lazy_msg.is_none() {
lazy_msg = Some(in_game_stream.prepare(&ServerGeneral::TerrainBlockUpdates(
terrain_changes.modified_blocks.clone(),
)));
}
if player.view_distance.is_some() {
in_game_stream.send_unchecked(msg.clone());
lazy_msg
.as_ref()
.map(|ref msg| in_game_stream.0.send_raw(&msg));
}
}