network scheduler and rawmsg cleanup

This commit is contained in:
Marcel Märtens
2021-03-25 12:22:31 +01:00
parent 843850a4b8
commit 01c82b70ab
9 changed files with 97 additions and 85 deletions

View File

@ -3,11 +3,11 @@ use std::{net::SocketAddr, sync::Arc};
use tokio::{runtime::Runtime, sync::Mutex}; use tokio::{runtime::Runtime, sync::Mutex};
use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream}; use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream};
fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, &stream); } fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); }
async fn stream_msg(s1_a: Arc<Mutex<Stream>>, s1_b: Arc<Mutex<Stream>>, data: &[u8], cnt: usize) { async fn stream_msg(s1_a: Arc<Mutex<Stream>>, s1_b: Arc<Mutex<Stream>>, data: &[u8], cnt: usize) {
let mut s1_b = s1_b.lock().await; let mut s1_b = s1_b.lock().await;
let m = Message::serialize(&data, &s1_b); let m = Message::serialize(&data, s1_b.params());
std::thread::spawn(move || { std::thread::spawn(move || {
let mut s1_a = s1_a.try_lock().unwrap(); let mut s1_a = s1_a.try_lock().unwrap();
for _ in 0..cnt { for _ in 0..cnt {

View File

@ -173,7 +173,7 @@ fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
id, id,
data: vec![0; 1000], data: vec![0; 1000],
}, },
&s1, s1.params(),
); );
loop { loop {
s1.send_raw(&raw_msg).unwrap(); s1.send_raw(&raw_msg).unwrap();

View File

@ -119,6 +119,12 @@ pub enum StreamError {
Deserialize(bincode::Error), Deserialize(bincode::Error),
} }
/// All Parameters of a Stream, can be used to generate RawMessages
#[derive(Debug, Clone)]
pub struct StreamParams {
pub(crate) promises: Promises,
}
/// Use the `Network` to create connections to other [`Participants`] /// Use the `Network` to create connections to other [`Participants`]
/// ///
/// The `Network` is the single source that handles all connections in your /// The `Network` is the single source that handles all connections in your
@ -803,7 +809,7 @@ impl Stream {
/// [`Serialized`]: Serialize /// [`Serialized`]: Serialize
#[inline] #[inline]
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> { pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
self.send_raw(&Message::serialize(&msg, &self)) self.send_raw(&Message::serialize(&msg, self.params()))
} }
/// This methods give the option to skip multiple calls of [`bincode`] and /// This methods give the option to skip multiple calls of [`bincode`] and
@ -837,7 +843,7 @@ impl Stream {
/// let mut stream_b = participant_b.opened().await?; /// let mut stream_b = participant_b.opened().await?;
/// ///
/// //Prepare Message and decode it /// //Prepare Message and decode it
/// let msg = Message::serialize("Hello World", &stream_a); /// let msg = Message::serialize("Hello World", stream_a.params());
/// //Send same Message to multiple Streams /// //Send same Message to multiple Streams
/// stream_a.send_raw(&msg); /// stream_a.send_raw(&msg);
/// stream_b.send_raw(&msg); /// stream_b.send_raw(&msg);
@ -858,7 +864,7 @@ impl Stream {
return Err(StreamError::StreamClosed); return Err(StreamError::StreamClosed);
} }
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
message.verify(&self); message.verify(self.params());
self.a2b_msg_s.send((self.sid, message.data.clone()))?; self.a2b_msg_s.send((self.sid, message.data.clone()))?;
Ok(()) Ok(())
} }
@ -999,7 +1005,7 @@ impl Stream {
Message { Message {
data, data,
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
compressed: self.promises().contains(Promises::COMPRESSED), compressed: self.promises.contains(Promises::COMPRESSED),
} }
.deserialize()?, .deserialize()?,
)), )),
@ -1013,7 +1019,11 @@ impl Stream {
} }
} }
pub fn promises(&self) -> Promises { self.promises } pub fn params(&self) -> StreamParams {
StreamParams {
promises: self.promises,
}
}
} }
impl core::cmp::PartialEq for Participant { impl core::cmp::PartialEq for Participant {

View File

@ -108,7 +108,7 @@ mod trace;
pub use api::{ pub use api::{
Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr,
Stream, StreamError, Stream, StreamError, StreamParams,
}; };
pub use message::Message; pub use message::Message;
pub use network_protocol::{InitProtocolError, Pid, Promises}; pub use network_protocol::{InitProtocolError, Pid, Promises};

View File

@ -1,4 +1,4 @@
use crate::api::{Stream, StreamError}; use crate::api::{StreamError, StreamParams};
use bytes::Bytes; use bytes::Bytes;
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
use network_protocol::Promises; use network_protocol::Promises;
@ -36,12 +36,12 @@ impl Message {
/// [`Message::serialize`]: crate::message::Message::serialize /// [`Message::serialize`]: crate::message::Message::serialize
/// ///
/// [`Streams`]: crate::api::Stream /// [`Streams`]: crate::api::Stream
pub fn serialize<M: Serialize + ?Sized>(message: &M, stream: &Stream) -> Self { pub fn serialize<M: Serialize + ?Sized>(message: &M, stream_params: StreamParams) -> Self {
//this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html //this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html
let serialized_data = bincode::serialize(message).unwrap(); let serialized_data = bincode::serialize(message).unwrap();
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
let compressed = stream.promises().contains(Promises::COMPRESSED); let compressed = stream_params.promises.contains(Promises::COMPRESSED);
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
let data = if compressed { let data = if compressed {
let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10); let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10);
@ -54,7 +54,7 @@ impl Message {
#[cfg(not(feature = "compression"))] #[cfg(not(feature = "compression"))]
let data = serialized_data; let data = serialized_data;
#[cfg(not(feature = "compression"))] #[cfg(not(feature = "compression"))]
let _stream = stream; let _stream_params = stream_params;
Self { Self {
data: Bytes::from(data), data: Bytes::from(data),
@ -127,13 +127,13 @@ impl Message {
} }
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
pub(crate) fn verify(&self, stream: &Stream) { pub(crate) fn verify(&self, params: StreamParams) {
#[cfg(not(feature = "compression"))] #[cfg(not(feature = "compression"))]
let _stream = stream; let _stream = stream;
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
if self.compressed != stream.promises().contains(Promises::COMPRESSED) { if self.compressed != params.promises.contains(Promises::COMPRESSED) {
warn!( warn!(
?stream, ?params,
"verify failed, msg is {} and it doesn't match with stream", self.compressed "verify failed, msg is {} and it doesn't match with stream", self.compressed
); );
} }
@ -171,14 +171,9 @@ pub(crate) fn partial_eq_bincode(first: &bincode::ErrorKind, second: &bincode::E
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{api::Stream, message::*}; use crate::{api::StreamParams, message::*};
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::mpsc;
fn stub_stream(compressed: bool) -> Stream {
use crate::api::*;
use network_protocol::*;
fn stub_stream(compressed: bool) -> StreamParams {
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
let promises = if compressed { let promises = if compressed {
Promises::COMPRESSED Promises::COMPRESSED
@ -189,27 +184,12 @@ mod tests {
#[cfg(not(feature = "compression"))] #[cfg(not(feature = "compression"))]
let promises = Promises::empty(); let promises = Promises::empty();
let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded(); StreamParams { promises }
let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded();
let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel();
Stream::new(
Pid::fake(0),
Pid::fake(1),
Sid::new(0),
0u8,
promises,
1_000_000,
Arc::new(AtomicBool::new(true)),
a2b_msg_s,
b2a_msg_recv_r,
a2b_close_stream_s,
)
} }
#[test] #[test]
fn serialize_test() { fn serialize_test() {
let msg = Message::serialize("abc", &stub_stream(false)); let msg = Message::serialize("abc", stub_stream(false));
assert_eq!(msg.data.len(), 11); assert_eq!(msg.data.len(), 11);
assert_eq!(msg.data[0], 3); assert_eq!(msg.data[0], 3);
assert_eq!(msg.data[1..7], [0, 0, 0, 0, 0, 0]); assert_eq!(msg.data[1..7], [0, 0, 0, 0, 0, 0]);
@ -221,7 +201,7 @@ mod tests {
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
#[test] #[test]
fn serialize_compress_small() { fn serialize_compress_small() {
let msg = Message::serialize("abc", &stub_stream(true)); let msg = Message::serialize("abc", stub_stream(true));
assert_eq!(msg.data.len(), 12); assert_eq!(msg.data.len(), 12);
assert_eq!(msg.data[0], 176); assert_eq!(msg.data[0], 176);
assert_eq!(msg.data[1], 3); assert_eq!(msg.data[1], 3);
@ -245,7 +225,7 @@ mod tests {
0, 0,
"assets/data/plants/flowers/greenrose.ron", "assets/data/plants/flowers/greenrose.ron",
); );
let msg = Message::serialize(&msg, &stub_stream(true)); let msg = Message::serialize(&msg, stub_stream(true));
assert_eq!(msg.data.len(), 79); assert_eq!(msg.data.len(), 79);
assert_eq!(msg.data[0], 34); assert_eq!(msg.data[0], 34);
assert_eq!(msg.data[1], 5); assert_eq!(msg.data[1], 5);
@ -275,7 +255,7 @@ mod tests {
_ => {}, _ => {},
} }
} }
let msg = Message::serialize(&msg, &stub_stream(true)); let msg = Message::serialize(&msg, stub_stream(true));
assert_eq!(msg.data.len(), 1331); assert_eq!(msg.data.len(), 1331);
} }
} }

View File

@ -1,3 +1,4 @@
use crate::api::ProtocolAddr;
use network_protocol::{Cid, Pid}; use network_protocol::{Cid, Pid};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
@ -151,6 +152,27 @@ impl NetworkMetrics {
.with_label_values(&[remote_p]) .with_label_values(&[remote_p])
.inc(); .inc();
} }
pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) {
self.listen_requests_total
.with_label_values(&[protocol_name(protocol)])
.inc();
}
pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) {
self.connect_requests_total
.with_label_values(&[protocol_name(protocol)])
.inc();
}
}
#[cfg(feature = "metrics")]
fn protocol_name(protocol: &ProtocolAddr) -> &str {
match protocol {
ProtocolAddr::Tcp(_) => "tcp",
ProtocolAddr::Udp(_) => "udp",
ProtocolAddr::Mpsc(_) => "mpsc",
}
} }
#[cfg(not(feature = "metrics"))] #[cfg(not(feature = "metrics"))]
@ -164,6 +186,10 @@ impl NetworkMetrics {
pub(crate) fn streams_opened(&self, _remote_p: &str) {} pub(crate) fn streams_opened(&self, _remote_p: &str) {}
pub(crate) fn streams_closed(&self, _remote_p: &str) {} pub(crate) fn streams_closed(&self, _remote_p: &str) {}
pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {}
pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {}
} }
impl std::fmt::Debug for NetworkMetrics { impl std::fmt::Debug for NetworkMetrics {

View File

@ -881,7 +881,7 @@ mod tests {
.unwrap(); .unwrap();
let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap(); let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap();
assert_eq!(stream.promises(), Promises::ORDERED); assert_eq!(stream.params().promises, Promises::ORDERED);
let (s, r) = oneshot::channel(); let (s, r) = oneshot::channel();
runtime.block_on(async { runtime.block_on(async {

View File

@ -177,15 +177,7 @@ impl Scheduler {
async move { async move {
debug!(?address, "Got request to open a channel_creator"); debug!(?address, "Got request to open a channel_creator");
#[cfg(feature = "metrics")] self.metrics.listen_request(&address);
self.metrics
.listen_requests_total
.with_label_values(&[match address {
ProtocolAddr::Tcp(_) => "tcp",
ProtocolAddr::Udp(_) => "udp",
ProtocolAddr::Mpsc(_) => "mpsc",
}])
.inc();
let (end_sender, end_receiver) = oneshot::channel::<()>(); let (end_sender, end_receiver) = oneshot::channel::<()>();
self.channel_listener self.channel_listener
.lock() .lock()
@ -202,13 +194,11 @@ impl Scheduler {
async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver<A2sConnect>) { async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver<A2sConnect>) {
trace!("Start connect_mgr"); trace!("Start connect_mgr");
while let Some((addr, pid_sender)) = a2s_connect_r.recv().await { while let Some((addr, pid_sender)) = a2s_connect_r.recv().await {
let (protocol, cid, handshake) = match addr { let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
let metrics = Arc::clone(&self.protocol_metrics);
self.metrics.connect_request(&addr);
let (protocol, handshake) = match addr {
ProtocolAddr::Tcp(addr) => { ProtocolAddr::Tcp(addr) => {
#[cfg(feature = "metrics")]
self.metrics
.connect_requests_total
.with_label_values(&["tcp"])
.inc();
let stream = match net::TcpStream::connect(addr).await { let stream = match net::TcpStream::connect(addr).await {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
@ -216,13 +206,8 @@ impl Scheduler {
continue; continue;
}, },
}; };
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
( (Protocols::new_tcp(stream, cid, metrics), false)
Protocols::new_tcp(stream, cid, Arc::clone(&self.protocol_metrics)),
cid,
false,
)
}, },
ProtocolAddr::Mpsc(addr) => { ProtocolAddr::Mpsc(addr) => {
let mpsc_s = match MPSC_POOL.lock().await.get(&addr) { let mpsc_s = match MPSC_POOL.lock().await.get(&addr) {
@ -244,17 +229,9 @@ impl Scheduler {
.send((remote_to_local_s, local_to_remote_oneshot_s)) .send((remote_to_local_s, local_to_remote_oneshot_s))
.unwrap(); .unwrap();
let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap(); let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap();
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
info!(?addr, "Connecting Mpsc"); info!(?addr, "Connecting Mpsc");
( (
Protocols::new_mpsc( Protocols::new_mpsc(local_to_remote_s, remote_to_local_r, cid, metrics),
local_to_remote_s,
remote_to_local_r,
cid,
Arc::clone(&self.protocol_metrics),
),
cid,
false, false,
) )
}, },

View File

@ -1,5 +1,5 @@
use common_net::msg::{ClientType, ServerGeneral, ServerMsg}; use common_net::msg::{ClientType, ServerGeneral, ServerMsg};
use network::{Message, Participant, Stream, StreamError}; use network::{Message, Participant, Stream, StreamError, StreamParams};
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use specs::Component; use specs::Component;
use specs_idvs::IdvStorage; use specs_idvs::IdvStorage;
@ -26,6 +26,13 @@ pub struct Client {
character_screen_stream: Mutex<Stream>, character_screen_stream: Mutex<Stream>,
in_game_stream: Mutex<Stream>, in_game_stream: Mutex<Stream>,
terrain_stream: Mutex<Stream>, terrain_stream: Mutex<Stream>,
general_stream_params: StreamParams,
ping_stream_params: StreamParams,
register_stream_params: StreamParams,
character_screen_stream_params: StreamParams,
in_game_stream_params: StreamParams,
terrain_stream_params: StreamParams,
} }
pub struct PreparedMsg { pub struct PreparedMsg {
@ -50,6 +57,12 @@ impl Client {
in_game_stream: Stream, in_game_stream: Stream,
terrain_stream: Stream, terrain_stream: Stream,
) -> Self { ) -> Self {
let general_stream_params = general_stream.params();
let ping_stream_params = ping_stream.params();
let register_stream_params = register_stream.params();
let character_screen_stream_params = character_screen_stream.params();
let in_game_stream_params = in_game_stream.params();
let terrain_stream_params = terrain_stream.params();
Client { Client {
client_type, client_type,
participant: Some(participant), participant: Some(participant),
@ -62,6 +75,12 @@ impl Client {
character_screen_stream: Mutex::new(character_screen_stream), character_screen_stream: Mutex::new(character_screen_stream),
in_game_stream: Mutex::new(in_game_stream), in_game_stream: Mutex::new(in_game_stream),
terrain_stream: Mutex::new(terrain_stream), terrain_stream: Mutex::new(terrain_stream),
general_stream_params,
ping_stream_params,
register_stream_params,
character_screen_stream_params,
in_game_stream_params,
terrain_stream_params,
} }
} }
@ -138,9 +157,9 @@ impl Client {
pub(crate) fn prepare<M: Into<ServerMsg>>(&self, msg: M) -> PreparedMsg { pub(crate) fn prepare<M: Into<ServerMsg>>(&self, msg: M) -> PreparedMsg {
match msg.into() { match msg.into() {
ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream), ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream_params),
ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream), ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream_params),
ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream), ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream_params),
ServerMsg::General(g) => { ServerMsg::General(g) => {
match g { match g {
//Character Screen related //Character Screen related
@ -149,7 +168,7 @@ impl Client {
| ServerGeneral::CharacterActionError(_) | ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterCreated(_) | ServerGeneral::CharacterCreated(_)
| ServerGeneral::CharacterSuccess => { | ServerGeneral::CharacterSuccess => {
PreparedMsg::new(1, &g, &self.character_screen_stream) PreparedMsg::new(1, &g, &self.character_screen_stream_params)
}, },
//Ingame related //Ingame related
ServerGeneral::GroupUpdate(_) ServerGeneral::GroupUpdate(_)
@ -164,12 +183,12 @@ impl Client {
| ServerGeneral::SiteEconomy(_) | ServerGeneral::SiteEconomy(_)
| ServerGeneral::UpdatePendingTrade(_, _, _) | ServerGeneral::UpdatePendingTrade(_, _, _)
| ServerGeneral::FinishedTrade(_) => { | ServerGeneral::FinishedTrade(_) => {
PreparedMsg::new(2, &g, &self.in_game_stream) PreparedMsg::new(2, &g, &self.in_game_stream_params)
}, },
//Ingame related, terrain //Ingame related, terrain
ServerGeneral::TerrainChunkUpdate { .. } ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_) => { | ServerGeneral::TerrainBlockUpdates(_) => {
PreparedMsg::new(5, &g, &self.terrain_stream) PreparedMsg::new(5, &g, &self.terrain_stream_params)
}, },
// Always possible // Always possible
ServerGeneral::PlayerListUpdate(_) ServerGeneral::PlayerListUpdate(_)
@ -183,11 +202,11 @@ impl Client {
| ServerGeneral::DeleteEntity(_) | ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_) | ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => { | ServerGeneral::Notification(_) => {
PreparedMsg::new(3, &g, &self.general_stream) PreparedMsg::new(3, &g, &self.general_stream_params)
}, },
} }
}, },
ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream), ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream_params),
} }
} }
@ -209,10 +228,10 @@ impl Client {
} }
impl PreparedMsg { impl PreparedMsg {
fn new<M: Serialize + ?Sized>(id: u8, msg: &M, stream: &Mutex<Stream>) -> PreparedMsg { fn new<M: Serialize + ?Sized>(id: u8, msg: &M, stream_params: &StreamParams) -> PreparedMsg {
Self { Self {
stream_id: id, stream_id: id,
message: Message::serialize(&msg, &stream.lock().unwrap()), message: Message::serialize(&msg, stream_params.clone()),
} }
} }
} }