Merge branch 'xMAC94x/net-improve' into 'master'

network preparation for UDP, Metrics and Bandwidth stats

See merge request veloren/veloren!1994
This commit is contained in:
Marcel 2021-03-26 12:26:18 +00:00
commit 69b38f872a
16 changed files with 458 additions and 197 deletions

View File

@ -3,11 +3,11 @@ use std::{net::SocketAddr, sync::Arc};
use tokio::{runtime::Runtime, sync::Mutex};
use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream};
fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, &stream); }
fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); }
async fn stream_msg(s1_a: Arc<Mutex<Stream>>, s1_b: Arc<Mutex<Stream>>, data: &[u8], cnt: usize) {
let mut s1_b = s1_b.lock().await;
let m = Message::serialize(&data, &s1_b);
let m = Message::serialize(&data, s1_b.params());
std::thread::spawn(move || {
let mut s1_a = s1_a.try_lock().unwrap();
for _ in 0..cnt {

View File

@ -173,7 +173,7 @@ fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
id,
data: vec![0; 1000],
},
&s1,
s1.params(),
);
loop {
s1.send_raw(&raw_msg).unwrap();

View File

@ -117,7 +117,7 @@ pub trait SendProtocol {
&mut self,
bandwidth: Bandwidth,
dt: std::time::Duration,
) -> Result<(), ProtocolError>;
) -> Result<Bandwidth, ProtocolError>;
}
/// Generic Network Recv Protocol. See: [`SendProtocol`]

View File

@ -6,7 +6,7 @@ use crate::{
frame::InitFrame,
handshake::{ReliableDrain, ReliableSink},
metrics::ProtocolMetricCache,
types::Bandwidth,
types::{Bandwidth, Promises},
RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
};
use async_trait::async_trait;
@ -57,6 +57,16 @@ where
metrics,
}
}
/// returns all promises that this Protocol can take care of
/// If you open a Stream anyway, unsupported promises are ignored.
pub fn supported_promises() -> Promises {
Promises::ORDERED
| Promises::CONSISTENCY
| Promises::GUARANTEED_DELIVERY
| Promises::COMPRESSED
| Promises::ENCRYPTED /*assume a direct mpsc connection is secure*/
}
}
impl<S> MpscRecvProtocol<S>
@ -102,7 +112,9 @@ where
}
}
async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<(), ProtocolError> { Ok(()) }
async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<Bandwidth, ProtocolError> {
Ok(0)
}
}
#[async_trait]

View File

@ -6,7 +6,7 @@ use crate::{
message::{ITMessage, ALLOC_BLOCK},
metrics::{ProtocolMetricCache, RemoveReason},
prio::PrioManager,
types::{Bandwidth, Mid, Sid},
types::{Bandwidth, Mid, Promises, Sid},
RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
};
use async_trait::async_trait;
@ -70,6 +70,15 @@ where
metrics,
}
}
/// returns all promises that this Protocol can take care of
/// If you open a Stream anyway, unsupported promises are ignored.
pub fn supported_promises() -> Promises {
Promises::ORDERED
| Promises::CONSISTENCY
| Promises::GUARANTEED_DELIVERY
| Promises::COMPRESSED
}
}
impl<S> TcpRecvProtocol<S>
@ -158,7 +167,11 @@ where
Ok(())
}
async fn flush(&mut self, bandwidth: Bandwidth, dt: Duration) -> Result<(), ProtocolError> {
async fn flush(
&mut self,
bandwidth: Bandwidth,
dt: Duration,
) -> Result</* actual */ Bandwidth, ProtocolError> {
let (frames, total_bytes) = self.store.grab(bandwidth, dt);
self.buffer.reserve(total_bytes as usize);
let mut data_frames = 0;
@ -207,7 +220,7 @@ where
self.drain.send(self.buffer.split()).await?;
self.pending_shutdown = false;
}
Ok(())
Ok(data_bandwidth as u64)
}
}

View File

@ -65,7 +65,7 @@ pub struct Pid {
/// Unique ID per Stream, in one Channel.
/// one side will always start with 0, while the other start with u64::MAX / 2.
/// number increases for each created Stream.
#[derive(PartialEq, Eq, Hash, Clone, Copy)]
#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub struct Sid {
internal: u64,
}

View File

@ -22,7 +22,7 @@ use std::{
use tokio::{
io,
runtime::Runtime,
sync::{mpsc, oneshot, Mutex},
sync::{mpsc, oneshot, watch, Mutex},
};
use tracing::*;
@ -48,6 +48,7 @@ pub struct Participant {
remote_pid: Pid,
a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: A2sDisconnect,
}
@ -119,6 +120,12 @@ pub enum StreamError {
Deserialize(bincode::Error),
}
/// All Parameters of a Stream, can be used to generate RawMessages
#[derive(Debug, Clone)]
pub struct StreamParams {
pub(crate) promises: Promises,
}
/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
@ -487,6 +494,7 @@ impl Participant {
remote_pid: Pid,
a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
) -> Self {
Self {
@ -494,6 +502,7 @@ impl Participant {
remote_pid,
a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
b2a_bandwidth_stats_r,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
}
}
@ -715,6 +724,10 @@ impl Participant {
}
}
/// Returns the current approximation on the maximum bandwidth available.
/// This WILL fluctuate based on the amount/size of send messages.
pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }
/// Returns the remote [`Pid`](network_protocol::Pid)
pub fn remote_pid(&self) -> Pid { self.remote_pid }
}
@ -803,7 +816,7 @@ impl Stream {
/// [`Serialized`]: Serialize
#[inline]
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
self.send_raw(&Message::serialize(&msg, &self))
self.send_raw(&Message::serialize(&msg, self.params()))
}
/// This methods give the option to skip multiple calls of [`bincode`] and
@ -837,7 +850,7 @@ impl Stream {
/// let mut stream_b = participant_b.opened().await?;
///
/// //Prepare Message and decode it
/// let msg = Message::serialize("Hello World", &stream_a);
/// let msg = Message::serialize("Hello World", stream_a.params());
/// //Send same Message to multiple Streams
/// stream_a.send_raw(&msg);
/// stream_b.send_raw(&msg);
@ -858,7 +871,7 @@ impl Stream {
return Err(StreamError::StreamClosed);
}
#[cfg(debug_assertions)]
message.verify(&self);
message.verify(self.params());
self.a2b_msg_s.send((self.sid, message.data.clone()))?;
Ok(())
}
@ -999,7 +1012,7 @@ impl Stream {
Message {
data,
#[cfg(feature = "compression")]
compressed: self.promises().contains(Promises::COMPRESSED),
compressed: self.promises.contains(Promises::COMPRESSED),
}
.deserialize()?,
)),
@ -1013,7 +1026,11 @@ impl Stream {
}
}
pub fn promises(&self) -> Promises { self.promises }
pub fn params(&self) -> StreamParams {
StreamParams {
promises: self.promises,
}
}
}
impl core::cmp::PartialEq for Participant {

View File

@ -1,9 +1,9 @@
use async_trait::async_trait;
use bytes::BytesMut;
use network_protocol::{
Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, ProtocolError,
ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, TcpSendProtocol,
UnreliableDrain, UnreliableSink,
Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid,
ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol,
TcpSendProtocol, UnreliableDrain, UnreliableSink,
};
use std::{sync::Arc, time::Duration};
use tokio::{
@ -102,7 +102,11 @@ impl network_protocol::SendProtocol for SendProtocols {
}
}
async fn flush(&mut self, bandwidth: u64, dt: Duration) -> Result<(), ProtocolError> {
async fn flush(
&mut self,
bandwidth: Bandwidth,
dt: Duration,
) -> Result<Bandwidth, ProtocolError> {
match self {
SendProtocols::Tcp(s) => s.flush(bandwidth, dt).await,
SendProtocols::Mpsc(s) => s.flush(bandwidth, dt).await,

View File

@ -104,11 +104,11 @@ mod message;
mod metrics;
mod participant;
mod scheduler;
mod trace;
mod util;
pub use api::{
Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr,
Stream, StreamError,
Stream, StreamError, StreamParams,
};
pub use message::Message;
pub use network_protocol::{InitProtocolError, Pid, Promises};

View File

@ -1,4 +1,4 @@
use crate::api::{Stream, StreamError};
use crate::api::{StreamError, StreamParams};
use bytes::Bytes;
#[cfg(feature = "compression")]
use network_protocol::Promises;
@ -36,12 +36,12 @@ impl Message {
/// [`Message::serialize`]: crate::message::Message::serialize
///
/// [`Streams`]: crate::api::Stream
pub fn serialize<M: Serialize + ?Sized>(message: &M, stream: &Stream) -> Self {
pub fn serialize<M: Serialize + ?Sized>(message: &M, stream_params: StreamParams) -> Self {
//this will never fail: https://docs.rs/bincode/0.8.0/bincode/fn.serialize.html
let serialized_data = bincode::serialize(message).unwrap();
#[cfg(feature = "compression")]
let compressed = stream.promises().contains(Promises::COMPRESSED);
let compressed = stream_params.promises.contains(Promises::COMPRESSED);
#[cfg(feature = "compression")]
let data = if compressed {
let mut compressed_data = Vec::with_capacity(serialized_data.len() / 4 + 10);
@ -54,7 +54,7 @@ impl Message {
#[cfg(not(feature = "compression"))]
let data = serialized_data;
#[cfg(not(feature = "compression"))]
let _stream = stream;
let _stream_params = stream_params;
Self {
data: Bytes::from(data),
@ -127,13 +127,13 @@ impl Message {
}
#[cfg(debug_assertions)]
pub(crate) fn verify(&self, stream: &Stream) {
pub(crate) fn verify(&self, params: StreamParams) {
#[cfg(not(feature = "compression"))]
let _stream = stream;
#[cfg(feature = "compression")]
if self.compressed != stream.promises().contains(Promises::COMPRESSED) {
if self.compressed != params.promises.contains(Promises::COMPRESSED) {
warn!(
?stream,
?params,
"verify failed, msg is {} and it doesn't match with stream", self.compressed
);
}
@ -171,14 +171,9 @@ pub(crate) fn partial_eq_bincode(first: &bincode::ErrorKind, second: &bincode::E
#[cfg(test)]
mod tests {
use crate::{api::Stream, message::*};
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::mpsc;
fn stub_stream(compressed: bool) -> Stream {
use crate::api::*;
use network_protocol::*;
use crate::{api::StreamParams, message::*};
fn stub_stream(compressed: bool) -> StreamParams {
#[cfg(feature = "compression")]
let promises = if compressed {
Promises::COMPRESSED
@ -189,27 +184,12 @@ mod tests {
#[cfg(not(feature = "compression"))]
let promises = Promises::empty();
let (a2b_msg_s, _a2b_msg_r) = crossbeam_channel::unbounded();
let (_b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded();
let (a2b_close_stream_s, _a2b_close_stream_r) = mpsc::unbounded_channel();
Stream::new(
Pid::fake(0),
Pid::fake(1),
Sid::new(0),
0u8,
promises,
1_000_000,
Arc::new(AtomicBool::new(true)),
a2b_msg_s,
b2a_msg_recv_r,
a2b_close_stream_s,
)
StreamParams { promises }
}
#[test]
fn serialize_test() {
let msg = Message::serialize("abc", &stub_stream(false));
let msg = Message::serialize("abc", stub_stream(false));
assert_eq!(msg.data.len(), 11);
assert_eq!(msg.data[0], 3);
assert_eq!(msg.data[1..7], [0, 0, 0, 0, 0, 0]);
@ -221,7 +201,7 @@ mod tests {
#[cfg(feature = "compression")]
#[test]
fn serialize_compress_small() {
let msg = Message::serialize("abc", &stub_stream(true));
let msg = Message::serialize("abc", stub_stream(true));
assert_eq!(msg.data.len(), 12);
assert_eq!(msg.data[0], 176);
assert_eq!(msg.data[1], 3);
@ -245,7 +225,7 @@ mod tests {
0,
"assets/data/plants/flowers/greenrose.ron",
);
let msg = Message::serialize(&msg, &stub_stream(true));
let msg = Message::serialize(&msg, stub_stream(true));
assert_eq!(msg.data.len(), 79);
assert_eq!(msg.data[0], 34);
assert_eq!(msg.data[1], 5);
@ -275,7 +255,7 @@ mod tests {
_ => {},
}
}
let msg = Message::serialize(&msg, &stub_stream(true));
let msg = Message::serialize(&msg, stub_stream(true));
assert_eq!(msg.data.len(), 1331);
}
}

View File

@ -1,3 +1,4 @@
use crate::api::ProtocolAddr;
use network_protocol::{Cid, Pid};
#[cfg(feature = "metrics")]
use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
@ -12,6 +13,8 @@ pub struct NetworkMetrics {
pub participants_disconnected_total: IntCounter,
// channel id's, seperated by PARTICIPANT, max 5
pub participants_channel_ids: IntGaugeVec,
// upload to remote, averaged, seperated by PARTICIPANT
pub participants_bandwidth: IntGaugeVec,
// opened Channels, seperated by PARTICIPANT
pub channels_connected_total: IntCounterVec,
pub channels_disconnected_total: IntCounterVec,
@ -56,6 +59,13 @@ impl NetworkMetrics {
),
&["participant", "no"],
)?;
let participants_bandwidth = IntGaugeVec::new(
Opts::new(
"participants_bandwidth",
"max upload possible to Participant",
),
&["participant"],
)?;
let channels_connected_total = IntCounterVec::new(
Opts::new(
"channels_connected_total",
@ -103,6 +113,7 @@ impl NetworkMetrics {
participants_connected_total,
participants_disconnected_total,
participants_channel_ids,
participants_bandwidth,
channels_connected_total,
channels_disconnected_total,
streams_opened_total,
@ -117,6 +128,7 @@ impl NetworkMetrics {
registry.register(Box::new(self.participants_connected_total.clone()))?;
registry.register(Box::new(self.participants_disconnected_total.clone()))?;
registry.register(Box::new(self.participants_channel_ids.clone()))?;
registry.register(Box::new(self.participants_bandwidth.clone()))?;
registry.register(Box::new(self.channels_connected_total.clone()))?;
registry.register(Box::new(self.channels_disconnected_total.clone()))?;
registry.register(Box::new(self.streams_opened_total.clone()))?;
@ -140,6 +152,12 @@ impl NetworkMetrics {
.inc();
}
pub(crate) fn participant_bandwidth(&self, remote_p: &str, bandwidth: f32) {
self.participants_bandwidth
.with_label_values(&[remote_p])
.set(bandwidth as i64);
}
pub(crate) fn streams_opened(&self, remote_p: &str) {
self.streams_opened_total
.with_label_values(&[remote_p])
@ -151,6 +169,46 @@ impl NetworkMetrics {
.with_label_values(&[remote_p])
.inc();
}
pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) {
self.listen_requests_total
.with_label_values(&[protocol_name(protocol)])
.inc();
}
pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) {
self.connect_requests_total
.with_label_values(&[protocol_name(protocol)])
.inc();
}
pub(crate) fn cleanup_participant(&self, remote_p: &str) {
for no in 0..5 {
let _ = self
.participants_channel_ids
.remove_label_values(&[&remote_p, &no.to_string()]);
}
let _ = self
.channels_connected_total
.remove_label_values(&[&remote_p]);
let _ = self
.channels_disconnected_total
.remove_label_values(&[&remote_p]);
let _ = self
.participants_bandwidth
.remove_label_values(&[&remote_p]);
let _ = self.streams_opened_total.remove_label_values(&[&remote_p]);
let _ = self.streams_closed_total.remove_label_values(&[&remote_p]);
}
}
#[cfg(feature = "metrics")]
fn protocol_name(protocol: &ProtocolAddr) -> &str {
match protocol {
ProtocolAddr::Tcp(_) => "tcp",
ProtocolAddr::Udp(_) => "udp",
ProtocolAddr::Mpsc(_) => "mpsc",
}
}
#[cfg(not(feature = "metrics"))]
@ -161,9 +219,17 @@ impl NetworkMetrics {
pub(crate) fn channels_disconnected(&self, _remote_p: &str) {}
pub(crate) fn participant_bandwidth(&self, _remote_p: &str, _bandwidth: f32) {}
pub(crate) fn streams_opened(&self, _remote_p: &str) {}
pub(crate) fn streams_closed(&self, _remote_p: &str) {}
pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {}
pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {}
pub(crate) fn cleanup_participant(&self, _remote_p: &str) {}
}
impl std::fmt::Debug for NetworkMetrics {

View File

@ -2,7 +2,7 @@ use crate::{
api::{ParticipantError, Stream},
channel::{Protocols, RecvProtocols, SendProtocols},
metrics::NetworkMetrics,
trace::DeferredTracer,
util::{DeferredTracer, SortedVec},
};
use bytes::Bytes;
use futures_util::{FutureExt, StreamExt};
@ -19,7 +19,7 @@ use std::{
};
use tokio::{
select,
sync::{mpsc, oneshot, Mutex, RwLock},
sync::{mpsc, oneshot, watch, Mutex, RwLock},
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -49,6 +49,7 @@ struct ControlChannels {
a2b_open_stream_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
b2a_bandwidth_stats_s: watch::Sender<f32>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
}
@ -92,16 +93,19 @@ impl BParticipant {
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<S2bCreateChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
watch::Receiver<f32>,
) {
let (a2b_open_stream_s, a2b_open_stream_r) = mpsc::unbounded_channel::<A2bStreamOpen>();
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::<Stream>();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel();
let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::<f32>(0.0);
let run_channels = Some(ControlChannels {
a2b_open_stream_r,
b2a_stream_opened_s,
s2b_create_channel_r,
b2a_bandwidth_stats_s,
s2b_shutdown_bparticipant_r,
});
@ -124,6 +128,7 @@ impl BParticipant {
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
)
}
@ -136,8 +141,10 @@ impl BParticipant {
async_channel::unbounded::<Cid>();
let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
async_channel::unbounded::<Cid>();
let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) =
crossbeam_channel::unbounded::<ProtocolEvent>();
let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) =
crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>();
let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) =
crossbeam_channel::unbounded::<(Cid, Sid)>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>();
let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>();
@ -155,8 +162,10 @@ impl BParticipant {
a2b_msg_r,
b2b_add_send_protocol_r,
b2b_close_send_protocol_r,
b2b_notify_send_of_recv_r,
b2b_notify_send_of_recv_open_r,
b2b_notify_send_of_recv_close_r,
b2s_prio_statistic_s,
run_channels.b2a_bandwidth_stats_s,
)
.instrument(tracing::info_span!("send")),
self.recv_mgr(
@ -164,7 +173,8 @@ impl BParticipant {
b2b_add_recv_protocol_r,
b2b_force_close_recv_protocol_r,
b2b_close_send_protocol_s.clone(),
b2b_notify_send_of_recv_s,
b2b_notify_send_of_recv_open_s,
b2b_notify_send_of_recv_close_s,
)
.instrument(tracing::info_span!("recv")),
self.create_channel_mgr(
@ -180,6 +190,28 @@ impl BParticipant {
);
}
fn best_protocol(all: &SortedVec<Cid, SendProtocols>, promises: Promises) -> Option<Cid> {
// check for mpsc
for (cid, p) in all.data.iter() {
if matches!(p, SendProtocols::Mpsc(_)) {
return Some(*cid);
}
}
// check for tcp
if network_protocol::TcpSendProtocol::<crate::channel::TcpDrain>::supported_promises()
== promises
{
for (cid, p) in all.data.iter() {
if matches!(p, SendProtocols::Tcp(_)) {
return Some(*cid);
}
}
}
warn!("couldn't satisfy promises");
all.data.first().map(|(c, _)| *c)
}
//TODO: local stream_cid: HashMap<Sid, Cid> to know the respective protocol
#[allow(clippy::too_many_arguments)]
async fn send_mgr(
@ -189,18 +221,27 @@ impl BParticipant {
a2b_msg_r: crossbeam_channel::Receiver<(Sid, Bytes)>,
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>,
b2b_close_send_protocol_r: async_channel::Receiver<Cid>,
b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<ProtocolEvent>,
b2b_notify_send_of_recv_open_r: crossbeam_channel::Receiver<(
Cid,
Sid,
Prio,
Promises,
Bandwidth,
)>,
b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>,
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
b2a_bandwidth_stats_s: watch::Sender<f32>,
) {
let mut send_protocols: HashMap<Cid, SendProtocols> = HashMap::new();
let mut sorted_send_protocols = SortedVec::<Cid, SendProtocols>::default();
let mut sorted_stream_protocols = SortedVec::<Sid, Cid>::default();
let mut interval = tokio::time::interval(Self::TICK_TIME);
let mut last_instant = Instant::now();
let mut stream_ids = self.offset_sid;
let mut part_bandwidth = 0.0f32;
trace!("workaround, actively wait for first protocol");
b2b_add_protocol_r
.recv()
.await
.map(|(c, p)| send_protocols.insert(c, p));
if let Some((c, p)) = b2b_add_protocol_r.recv().await {
sorted_send_protocols.insert(c, p)
}
loop {
let (open, close, _, addp, remp) = select!(
Some(n) = a2b_open_stream_r.recv().fuse() => (Some(n), None, None, None, None),
@ -210,25 +251,29 @@ impl BParticipant {
Ok(n) = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, Some(n)),
);
addp.map(|(cid, p)| {
if let Some((cid, p)) = addp {
debug!(?cid, "add protocol");
send_protocols.insert(cid, p)
});
sorted_send_protocols.insert(cid, p);
}
let (cid, active) = match send_protocols.iter_mut().next() {
Some((cid, a)) => (*cid, a),
None => {
warn!("no channel");
tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover
continue;
},
};
//verify that we have at LEAST 1 channel before continuing
if sorted_send_protocols.data.is_empty() {
warn!("no channel");
tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover
continue;
}
//let (cid, active) = sorted_send_protocols.data.iter_mut().next().unwrap();
//used for error handling
let mut cid = u64::MAX;
let active_err = async {
if let Some((prio, promises, guaranteed_bandwidth, return_s)) = open {
let sid = stream_ids;
trace!(?sid, "open stream");
stream_ids += Sid::from(1);
cid = Self::best_protocol(&sorted_send_protocols, promises).unwrap();
trace!(?sid, ?cid, "open stream");
let stream = self
.create_stream(sid, prio, promises, guaranteed_bandwidth)
.await;
@ -240,49 +285,83 @@ impl BParticipant {
guaranteed_bandwidth,
};
sorted_stream_protocols.insert(sid, cid);
return_s.send(stream).unwrap();
active.send(event).await?;
sorted_send_protocols
.get_mut(&cid)
.unwrap()
.send(event)
.await?;
}
// process recv content first
let mut closeevents = b2b_notify_send_of_recv_r
.try_iter()
.map(|e| {
if matches!(e, ProtocolEvent::OpenStream { .. }) {
active.notify_from_recv(e);
None
} else {
Some(e)
}
})
.collect::<Vec<_>>();
for (cid, sid, prio, promises, guaranteed_bandwidth) in
b2b_notify_send_of_recv_open_r.try_iter()
{
match sorted_send_protocols.get_mut(&cid) {
Some(p) => {
sorted_stream_protocols.insert(sid, cid);
p.notify_from_recv(ProtocolEvent::OpenStream {
sid,
prio,
promises,
guaranteed_bandwidth,
});
},
None => warn!(?cid, "couldn't notify create protocol, doesn't exist"),
};
}
// get all messages and assign it to a channel
for (sid, buffer) in a2b_msg_r.try_iter() {
active
.send(ProtocolEvent::Message { data: buffer, sid })
.await?
cid = *sorted_stream_protocols.get(&sid).unwrap();
let event = ProtocolEvent::Message { data: buffer, sid };
sorted_send_protocols
.get_mut(&cid)
.unwrap()
.send(event)
.await?;
}
// process recv content afterwards
let _ = closeevents.drain(..).map(|e| {
if let Some(e) = e {
active.notify_from_recv(e);
}
});
for (cid, sid) in b2b_notify_send_of_recv_close_r.try_iter() {
match sorted_send_protocols.get_mut(&cid) {
Some(p) => {
let _ = sorted_stream_protocols.delete(&sid);
p.notify_from_recv(ProtocolEvent::CloseStream { sid });
},
None => warn!(?cid, "couldn't notify close protocol, doesn't exist"),
};
}
if let Some(sid) = close {
trace!(?stream_ids, "delete stream");
self.delete_stream(sid).await;
// Fire&Forget the protocol will take care to verify that this Frame is delayed
// till the last msg was received!
active.send(ProtocolEvent::CloseStream { sid }).await?;
if let Some(c) = sorted_stream_protocols.delete(&sid) {
cid = c;
let event = ProtocolEvent::CloseStream { sid };
sorted_send_protocols
.get_mut(&c)
.unwrap()
.send(event)
.await?;
}
}
let send_time = Instant::now();
let diff = send_time.duration_since(last_instant);
last_instant = send_time;
active.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it.
let mut cnt = 0;
for (_, p) in sorted_send_protocols.data.iter_mut() {
cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it.
}
let flush_time = send_time.elapsed().as_secs_f32();
part_bandwidth = 0.99 * part_bandwidth + 0.01 * (cnt as f32 / flush_time);
self.metrics
.participant_bandwidth(&self.remote_pid_string, part_bandwidth);
let _ = b2a_bandwidth_stats_s.send(part_bandwidth);
let r: Result<(), network_protocol::ProtocolError> = Ok(());
r
}
@ -292,16 +371,16 @@ impl BParticipant {
// remote recv will now fail, which will trigger remote send which will trigger
// recv
trace!("TODO: for now decide to FAIL this participant and not wait for a failover");
send_protocols.remove(&cid).unwrap();
sorted_send_protocols.delete(&cid).unwrap();
self.metrics.channels_disconnected(&self.remote_pid_string);
if send_protocols.is_empty() {
if sorted_send_protocols.data.is_empty() {
break;
}
}
if let Some(cid) = remp {
debug!(?cid, "remove protocol");
match send_protocols.remove(&cid) {
match sorted_send_protocols.delete(&cid) {
Some(mut prot) => {
self.metrics.channels_disconnected(&self.remote_pid_string);
trace!("blocking flush");
@ -311,7 +390,7 @@ impl BParticipant {
},
None => trace!("tried to remove protocol twice"),
};
if send_protocols.is_empty() {
if sorted_send_protocols.data.is_empty() {
break;
}
}
@ -330,7 +409,14 @@ impl BParticipant {
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>,
b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>,
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
b2b_notify_send_of_recv_s: crossbeam_channel::Sender<ProtocolEvent>,
b2b_notify_send_of_recv_open_r: crossbeam_channel::Sender<(
Cid,
Sid,
Prio,
Promises,
Bandwidth,
)>,
b2b_notify_send_of_recv_close_s: crossbeam_channel::Sender<(Cid, Sid)>,
) {
let mut recv_protocols: HashMap<Cid, JoinHandle<()>> = HashMap::new();
// we should be able to directly await futures imo
@ -390,7 +476,13 @@ impl BParticipant {
guaranteed_bandwidth,
}) => {
trace!(?sid, "open stream");
let _ = b2b_notify_send_of_recv_s.send(r.unwrap());
let _ = b2b_notify_send_of_recv_open_r.send((
cid,
sid,
prio,
promises,
guaranteed_bandwidth,
));
// waiting for receiving is not necessary, because the send_mgr will first
// process this before process messages!
let stream = self
@ -401,7 +493,7 @@ impl BParticipant {
},
Ok(ProtocolEvent::CloseStream { sid }) => {
trace!(?sid, "close stream");
let _ = b2b_notify_send_of_recv_s.send(r.unwrap());
let _ = b2b_notify_send_of_recv_close_s.send((cid, sid));
self.delete_stream(sid).await;
retrigger(cid, p, &mut recv_protocols);
},
@ -591,6 +683,7 @@ impl BParticipant {
#[cfg(feature = "metrics")]
self.metrics.participants_disconnected_total.inc();
self.metrics.cleanup_participant(&self.remote_pid_string);
trace!("Stop participant_shutdown_mgr");
}
@ -677,6 +770,7 @@ mod tests {
mpsc::UnboundedSender<S2bCreateChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
mpsc::UnboundedReceiver<B2sPrioStatistic>,
watch::Receiver<f32>,
JoinHandle<()>,
) {
let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
@ -691,6 +785,7 @@ mod tests {
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = runtime_clone.block_on(async move {
let local_pid = Pid::fake(0);
let remote_pid = Pid::fake(1);
@ -708,6 +803,7 @@ mod tests {
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
b2a_bandwidth_stats_r,
handle,
)
}
@ -738,6 +834,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -773,6 +870,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -809,6 +907,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -863,6 +962,7 @@ mod tests {
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
@ -881,7 +981,7 @@ mod tests {
.unwrap();
let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap();
assert_eq!(stream.promises(), Promises::ORDERED);
assert_eq!(stream.params().promises, Promises::ORDERED);
let (s, r) = oneshot::channel();
runtime.block_on(async {

View File

@ -177,15 +177,7 @@ impl Scheduler {
async move {
debug!(?address, "Got request to open a channel_creator");
#[cfg(feature = "metrics")]
self.metrics
.listen_requests_total
.with_label_values(&[match address {
ProtocolAddr::Tcp(_) => "tcp",
ProtocolAddr::Udp(_) => "udp",
ProtocolAddr::Mpsc(_) => "mpsc",
}])
.inc();
self.metrics.listen_request(&address);
let (end_sender, end_receiver) = oneshot::channel::<()>();
self.channel_listener
.lock()
@ -202,13 +194,11 @@ impl Scheduler {
async fn connect_mgr(&self, mut a2s_connect_r: mpsc::UnboundedReceiver<A2sConnect>) {
trace!("Start connect_mgr");
while let Some((addr, pid_sender)) = a2s_connect_r.recv().await {
let (protocol, cid, handshake) = match addr {
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
let metrics = Arc::clone(&self.protocol_metrics);
self.metrics.connect_request(&addr);
let (protocol, handshake) = match addr {
ProtocolAddr::Tcp(addr) => {
#[cfg(feature = "metrics")]
self.metrics
.connect_requests_total
.with_label_values(&["tcp"])
.inc();
let stream = match net::TcpStream::connect(addr).await {
Ok(stream) => stream,
Err(e) => {
@ -216,13 +206,8 @@ impl Scheduler {
continue;
},
};
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
(
Protocols::new_tcp(stream, cid, Arc::clone(&self.protocol_metrics)),
cid,
false,
)
(Protocols::new_tcp(stream, cid, metrics), false)
},
ProtocolAddr::Mpsc(addr) => {
let mpsc_s = match MPSC_POOL.lock().await.get(&addr) {
@ -244,17 +229,9 @@ impl Scheduler {
.send((remote_to_local_s, local_to_remote_oneshot_s))
.unwrap();
let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap();
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
info!(?addr, "Connecting Mpsc");
(
Protocols::new_mpsc(
local_to_remote_s,
remote_to_local_r,
cid,
Arc::clone(&self.protocol_metrics),
),
cid,
Protocols::new_mpsc(local_to_remote_s, remote_to_local_r, cid, metrics),
false,
)
},
@ -591,6 +568,7 @@ impl Scheduler {
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics));
let participant = Participant::new(
@ -598,6 +576,7 @@ impl Scheduler {
pid,
a2b_open_stream_s,
b2a_stream_opened_r,
b2a_bandwidth_stats_r,
participant_channels.a2s_disconnect_s,
);

View File

@ -1,46 +0,0 @@
use core::hash::Hash;
use std::{collections::HashMap, time::Instant};
use tracing::Level;
/// used to collect multiple traces and not spam the console
pub(crate) struct DeferredTracer<T: Eq + Hash> {
level: Level,
items: HashMap<T, u64>,
last: Instant,
last_cnt: u32,
}
impl<T: Eq + Hash> DeferredTracer<T> {
pub(crate) fn new(level: Level) -> Self {
Self {
level,
items: HashMap::new(),
last: Instant::now(),
last_cnt: 0,
}
}
pub(crate) fn log(&mut self, t: T) {
if tracing::level_enabled!(self.level) {
*self.items.entry(t).or_default() += 1;
self.last = Instant::now();
self.last_cnt += 1;
} else {
}
}
pub(crate) fn print(&mut self) -> Option<HashMap<T, u64>> {
const MAX_LOGS: u32 = 10_000;
const MAX_SECS: u64 = 1;
if tracing::level_enabled!(self.level)
&& (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS)
{
if self.last_cnt > MAX_LOGS {
tracing::debug!("this seems to be logged continuesly");
}
Some(std::mem::take(&mut self.items))
} else {
None
}
}
}

117
network/src/util.rs Normal file
View File

@ -0,0 +1,117 @@
use core::hash::Hash;
use std::{collections::HashMap, time::Instant};
use tracing::Level;
/// used to collect multiple traces and not spam the console
pub(crate) struct DeferredTracer<T: Eq + Hash> {
level: Level,
items: HashMap<T, u64>,
last: Instant,
last_cnt: u32,
}
impl<T: Eq + Hash> DeferredTracer<T> {
pub(crate) fn new(level: Level) -> Self {
Self {
level,
items: HashMap::new(),
last: Instant::now(),
last_cnt: 0,
}
}
pub(crate) fn log(&mut self, t: T) {
if tracing::level_enabled!(self.level) {
*self.items.entry(t).or_default() += 1;
self.last = Instant::now();
self.last_cnt += 1;
} else {
}
}
pub(crate) fn print(&mut self) -> Option<HashMap<T, u64>> {
const MAX_LOGS: u32 = 10_000;
const MAX_SECS: u64 = 1;
if tracing::level_enabled!(self.level)
&& (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS)
{
if self.last_cnt > MAX_LOGS {
tracing::debug!("this seems to be logged continuesly");
}
Some(std::mem::take(&mut self.items))
} else {
None
}
}
}
/// Used for storing Protocols in a Participant or Stream <-> Protocol
pub(crate) struct SortedVec<K, V> {
pub data: Vec<(K, V)>,
}
impl<K, V> Default for SortedVec<K, V> {
fn default() -> Self { Self { data: vec![] } }
}
impl<K, V> SortedVec<K, V>
where
K: Ord + Copy,
{
pub fn insert(&mut self, k: K, v: V) {
self.data.push((k, v));
self.data.sort_by_key(|&(k, _)| k);
}
pub fn delete(&mut self, k: &K) -> Option<V> {
if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) {
Some(self.data.remove(i).1)
} else {
None
}
}
pub fn get(&self, k: &K) -> Option<&V> {
if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) {
Some(&self.data[i].1)
} else {
None
}
}
pub fn get_mut(&mut self, k: &K) -> Option<&mut V> {
if let Ok(i) = self.data.binary_search_by_key(k, |&(k, _)| k) {
Some(&mut self.data[i].1)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sorted_vec() {
let mut vec = SortedVec::default();
vec.insert(10, "Hello");
println!("{:?}", vec.data);
vec.insert(30, "World");
println!("{:?}", vec.data);
vec.insert(20, " ");
println!("{:?}", vec.data);
assert_eq!(vec.data[0].1, "Hello");
assert_eq!(vec.data[1].1, " ");
assert_eq!(vec.data[2].1, "World");
assert_eq!(vec.get(&30), Some(&"World"));
assert_eq!(vec.get_mut(&20), Some(&mut " "));
assert_eq!(vec.get(&10), Some(&"Hello"));
assert_eq!(vec.delete(&40), None);
assert_eq!(vec.delete(&10), Some("Hello"));
assert_eq!(vec.delete(&10), None);
assert_eq!(vec.get(&30), Some(&"World"));
assert_eq!(vec.get_mut(&20), Some(&mut " "));
assert_eq!(vec.get(&10), None);
}
}

View File

@ -1,5 +1,5 @@
use common_net::msg::{ClientType, ServerGeneral, ServerMsg};
use network::{Message, Participant, Stream, StreamError};
use network::{Message, Participant, Stream, StreamError, StreamParams};
use serde::{de::DeserializeOwned, Serialize};
use specs::Component;
use specs_idvs::IdvStorage;
@ -26,6 +26,13 @@ pub struct Client {
character_screen_stream: Mutex<Stream>,
in_game_stream: Mutex<Stream>,
terrain_stream: Mutex<Stream>,
general_stream_params: StreamParams,
ping_stream_params: StreamParams,
register_stream_params: StreamParams,
character_screen_stream_params: StreamParams,
in_game_stream_params: StreamParams,
terrain_stream_params: StreamParams,
}
pub struct PreparedMsg {
@ -50,6 +57,12 @@ impl Client {
in_game_stream: Stream,
terrain_stream: Stream,
) -> Self {
let general_stream_params = general_stream.params();
let ping_stream_params = ping_stream.params();
let register_stream_params = register_stream.params();
let character_screen_stream_params = character_screen_stream.params();
let in_game_stream_params = in_game_stream.params();
let terrain_stream_params = terrain_stream.params();
Client {
client_type,
participant: Some(participant),
@ -62,6 +75,12 @@ impl Client {
character_screen_stream: Mutex::new(character_screen_stream),
in_game_stream: Mutex::new(in_game_stream),
terrain_stream: Mutex::new(terrain_stream),
general_stream_params,
ping_stream_params,
register_stream_params,
character_screen_stream_params,
in_game_stream_params,
terrain_stream_params,
}
}
@ -138,9 +157,9 @@ impl Client {
pub(crate) fn prepare<M: Into<ServerMsg>>(&self, msg: M) -> PreparedMsg {
match msg.into() {
ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream),
ServerMsg::Info(m) => PreparedMsg::new(0, &m, &self.register_stream_params),
ServerMsg::Init(m) => PreparedMsg::new(0, &m, &self.register_stream_params),
ServerMsg::RegisterAnswer(m) => PreparedMsg::new(0, &m, &self.register_stream_params),
ServerMsg::General(g) => {
match g {
//Character Screen related
@ -149,7 +168,7 @@ impl Client {
| ServerGeneral::CharacterActionError(_)
| ServerGeneral::CharacterCreated(_)
| ServerGeneral::CharacterSuccess => {
PreparedMsg::new(1, &g, &self.character_screen_stream)
PreparedMsg::new(1, &g, &self.character_screen_stream_params)
},
//Ingame related
ServerGeneral::GroupUpdate(_)
@ -164,12 +183,12 @@ impl Client {
| ServerGeneral::SiteEconomy(_)
| ServerGeneral::UpdatePendingTrade(_, _, _)
| ServerGeneral::FinishedTrade(_) => {
PreparedMsg::new(2, &g, &self.in_game_stream)
PreparedMsg::new(2, &g, &self.in_game_stream_params)
},
//Ingame related, terrain
ServerGeneral::TerrainChunkUpdate { .. }
| ServerGeneral::TerrainBlockUpdates(_) => {
PreparedMsg::new(5, &g, &self.terrain_stream)
PreparedMsg::new(5, &g, &self.terrain_stream_params)
},
// Always possible
ServerGeneral::PlayerListUpdate(_)
@ -183,11 +202,11 @@ impl Client {
| ServerGeneral::DeleteEntity(_)
| ServerGeneral::Disconnect(_)
| ServerGeneral::Notification(_) => {
PreparedMsg::new(3, &g, &self.general_stream)
PreparedMsg::new(3, &g, &self.general_stream_params)
},
}
},
ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream),
ServerMsg::Ping(m) => PreparedMsg::new(4, &m, &self.ping_stream_params),
}
}
@ -209,10 +228,10 @@ impl Client {
}
impl PreparedMsg {
fn new<M: Serialize + ?Sized>(id: u8, msg: &M, stream: &Mutex<Stream>) -> PreparedMsg {
fn new<M: Serialize + ?Sized>(id: u8, msg: &M, stream_params: &StreamParams) -> PreparedMsg {
Self {
stream_id: id,
message: Message::serialize(&msg, &stream.lock().unwrap()),
message: Message::serialize(&msg, stream_params.clone()),
}
}
}