mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
514d5db038
- now last digit version is compatible 0.6.0 will connect to 0.6.1 - the TCP DATA Frames no longer contain START field, as it's not needed - the TCP OPENSTREAM Frames will now contain the BANDWIDTH field - MID is not Protocol internal Update network - update API with Bandwidth Update veloren - introduce better runtime and `async` things that are IO bound. - Remove `uvth` and instead use `tokio::runtime::Runtime::spawn_blocking` - remove futures_execute from client and server use tokio::runtime::Runtime instead - give threads a Name
261 lines
7.6 KiB
Rust
261 lines
7.6 KiB
Rust
use async_channel::*;
|
|
use async_trait::async_trait;
|
|
use bytes::{Bytes, BytesMut};
|
|
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
|
use std::{sync::Arc, time::Duration};
|
|
use tokio::runtime::Runtime;
|
|
use veloren_network_protocol::{
|
|
InitProtocol, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, Promises, ProtocolError,
|
|
ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, RecvProtocol, SendProtocol, Sid,
|
|
TcpRecvProtocol, TcpSendProtocol, UnreliableDrain, UnreliableSink, _internal::OTFrame,
|
|
};
|
|
|
|
fn frame_serialize(frame: OTFrame, buffer: &mut BytesMut) { frame.write_bytes(buffer); }
|
|
|
|
async fn handshake<S, R>(p: [(S, R); 2])
|
|
where
|
|
S: SendProtocol,
|
|
R: RecvProtocol,
|
|
(S, R): InitProtocol,
|
|
{
|
|
let [mut p1, mut p2] = p;
|
|
tokio::join!(
|
|
async {
|
|
p1.initialize(true, Pid::fake(2), 1337).await.unwrap();
|
|
p1
|
|
},
|
|
async {
|
|
p2.initialize(false, Pid::fake(3), 42).await.unwrap();
|
|
p2
|
|
}
|
|
);
|
|
}
|
|
|
|
async fn send_msg<T: SendProtocol>(mut s: T, data: Bytes, cnt: usize) {
|
|
let bandwidth = data.len() as u64 + 100;
|
|
const SEC1: Duration = Duration::from_secs(1);
|
|
|
|
s.send(ProtocolEvent::OpenStream {
|
|
sid: Sid::new(12),
|
|
prio: 0,
|
|
promises: Promises::ORDERED,
|
|
guaranteed_bandwidth: 100_000,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
for i in 0..cnt {
|
|
s.send(ProtocolEvent::Message {
|
|
sid: Sid::new(12),
|
|
data: data.clone(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
if i.rem_euclid(50) == 0 {
|
|
s.flush(bandwidth * 50_u64, SEC1).await.unwrap();
|
|
}
|
|
}
|
|
s.flush(bandwidth * 1000_u64, SEC1).await.unwrap();
|
|
}
|
|
|
|
async fn recv_msg<T: RecvProtocol>(mut r: T, cnt: usize) {
|
|
r.recv().await.unwrap();
|
|
|
|
for _ in 0..cnt {
|
|
r.recv().await.unwrap();
|
|
}
|
|
}
|
|
|
|
async fn send_and_recv_msg<S: SendProtocol, R: RecvProtocol>(
|
|
p: [(S, R); 2],
|
|
data: Bytes,
|
|
cnt: usize,
|
|
) {
|
|
let [p1, p2] = p;
|
|
let (s, r) = (p1.0, p2.1);
|
|
|
|
tokio::join!(send_msg(s, data, cnt), recv_msg(r, cnt));
|
|
}
|
|
|
|
fn rt() -> Runtime {
|
|
tokio::runtime::Builder::new_current_thread()
|
|
.build()
|
|
.unwrap()
|
|
}
|
|
|
|
fn criterion_util(c: &mut Criterion) {
|
|
c.bench_function("mpsc_handshake", |b| {
|
|
b.to_async(rt())
|
|
.iter_with_setup(|| utils::ac_bound(10, None), handshake)
|
|
});
|
|
c.bench_function("frame_serialize_short", |b| {
|
|
let mut buffer = BytesMut::with_capacity(1500);
|
|
let frame = OTFrame::Data {
|
|
mid: 65,
|
|
data: Bytes::from(&b"hello_world"[..]),
|
|
};
|
|
b.iter_with_setup(
|
|
|| frame.clone(),
|
|
|frame| frame_serialize(frame, &mut buffer),
|
|
)
|
|
});
|
|
}
|
|
|
|
fn criterion_mpsc(c: &mut Criterion) {
|
|
let mut c = c.benchmark_group("mpsc");
|
|
c.significance_level(0.1).sample_size(10);
|
|
c.throughput(Throughput::Bytes(1000000000))
|
|
.bench_function("1GB_in_10000_msg", |b| {
|
|
let buffer = Bytes::from(&[155u8; 100_000][..]);
|
|
b.to_async(rt()).iter_with_setup(
|
|
|| (buffer.clone(), utils::ac_bound(10, None)),
|
|
|(b, p)| send_and_recv_msg(p, b, 10_000),
|
|
)
|
|
});
|
|
c.throughput(Throughput::Elements(1000000))
|
|
.bench_function("1000000_tiny_msg", |b| {
|
|
let buffer = Bytes::from(&[3u8; 5][..]);
|
|
b.to_async(rt()).iter_with_setup(
|
|
|| (buffer.clone(), utils::ac_bound(10, None)),
|
|
|(b, p)| send_and_recv_msg(p, b, 1_000_000),
|
|
)
|
|
});
|
|
c.finish();
|
|
}
|
|
|
|
fn criterion_tcp(c: &mut Criterion) {
|
|
let mut c = c.benchmark_group("tcp");
|
|
c.significance_level(0.1).sample_size(10);
|
|
c.throughput(Throughput::Bytes(1000000000))
|
|
.bench_function("1GB_in_10000_msg", |b| {
|
|
let buf = Bytes::from(&[155u8; 100_000][..]);
|
|
b.to_async(rt()).iter_with_setup(
|
|
|| (buf.clone(), utils::tcp_bound(10000, None)),
|
|
|(b, p)| send_and_recv_msg(p, b, 10_000),
|
|
)
|
|
});
|
|
c.throughput(Throughput::Elements(1000000))
|
|
.bench_function("1000000_tiny_msg", |b| {
|
|
let buf = Bytes::from(&[3u8; 5][..]);
|
|
b.to_async(rt()).iter_with_setup(
|
|
|| (buf.clone(), utils::tcp_bound(10000, None)),
|
|
|(b, p)| send_and_recv_msg(p, b, 1_000_000),
|
|
)
|
|
});
|
|
c.finish();
|
|
}
|
|
|
|
criterion_group!(benches, criterion_util, criterion_mpsc, criterion_tcp);
|
|
criterion_main!(benches);
|
|
|
|
mod utils {
|
|
use super::*;
|
|
|
|
pub struct ACDrain {
|
|
sender: Sender<MpscMsg>,
|
|
}
|
|
|
|
pub struct ACSink {
|
|
receiver: Receiver<MpscMsg>,
|
|
}
|
|
|
|
pub fn ac_bound(
|
|
cap: usize,
|
|
metrics: Option<ProtocolMetricCache>,
|
|
) -> [(MpscSendProtocol<ACDrain>, MpscRecvProtocol<ACSink>); 2] {
|
|
let (s1, r1) = async_channel::bounded(cap);
|
|
let (s2, r2) = async_channel::bounded(cap);
|
|
let m = metrics.unwrap_or_else(|| {
|
|
ProtocolMetricCache::new("mpsc", Arc::new(ProtocolMetrics::new().unwrap()))
|
|
});
|
|
[
|
|
(
|
|
MpscSendProtocol::new(ACDrain { sender: s1 }, m.clone()),
|
|
MpscRecvProtocol::new(ACSink { receiver: r2 }, m.clone()),
|
|
),
|
|
(
|
|
MpscSendProtocol::new(ACDrain { sender: s2 }, m.clone()),
|
|
MpscRecvProtocol::new(ACSink { receiver: r1 }, m),
|
|
),
|
|
]
|
|
}
|
|
|
|
pub struct TcpDrain {
|
|
sender: Sender<BytesMut>,
|
|
}
|
|
|
|
pub struct TcpSink {
|
|
receiver: Receiver<BytesMut>,
|
|
}
|
|
|
|
/// emulate Tcp protocol on Channels
|
|
pub fn tcp_bound(
|
|
cap: usize,
|
|
metrics: Option<ProtocolMetricCache>,
|
|
) -> [(TcpSendProtocol<TcpDrain>, TcpRecvProtocol<TcpSink>); 2] {
|
|
let (s1, r1) = async_channel::bounded(cap);
|
|
let (s2, r2) = async_channel::bounded(cap);
|
|
let m = metrics.unwrap_or_else(|| {
|
|
ProtocolMetricCache::new("tcp", Arc::new(ProtocolMetrics::new().unwrap()))
|
|
});
|
|
[
|
|
(
|
|
TcpSendProtocol::new(TcpDrain { sender: s1 }, m.clone()),
|
|
TcpRecvProtocol::new(TcpSink { receiver: r2 }, m.clone()),
|
|
),
|
|
(
|
|
TcpSendProtocol::new(TcpDrain { sender: s2 }, m.clone()),
|
|
TcpRecvProtocol::new(TcpSink { receiver: r1 }, m),
|
|
),
|
|
]
|
|
}
|
|
|
|
#[async_trait]
|
|
impl UnreliableDrain for ACDrain {
|
|
type DataFormat = MpscMsg;
|
|
|
|
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
|
|
self.sender
|
|
.send(data)
|
|
.await
|
|
.map_err(|_| ProtocolError::Closed)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl UnreliableSink for ACSink {
|
|
type DataFormat = MpscMsg;
|
|
|
|
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
|
|
self.receiver
|
|
.recv()
|
|
.await
|
|
.map_err(|_| ProtocolError::Closed)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl UnreliableDrain for TcpDrain {
|
|
type DataFormat = BytesMut;
|
|
|
|
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
|
|
self.sender
|
|
.send(data)
|
|
.await
|
|
.map_err(|_| ProtocolError::Closed)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl UnreliableSink for TcpSink {
|
|
type DataFormat = BytesMut;
|
|
|
|
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
|
|
self.receiver
|
|
.recv()
|
|
.await
|
|
.map_err(|_| ProtocolError::Closed)
|
|
}
|
|
}
|
|
}
|