veloren/network/benches/speed.rs
Marcel Märtens 9028578bc8 Change the way Network is dropped.
Instead of keeping Runtime and manually spawn a task on `drop` this task is spawned at start and will wait to be triggered.
The `drop` methods then wait for completion, UNLESS they are in a async context, then they MUST NOT BLOCK (deadlock potential), so they defer it to the Runtime and HOPE for the runtime to exist long enough.
This get rid of the weird `block_in_place` which is only accessable with `rt-multi-threaded` and has some disadvantages.
We also wont requiere the runtime to be active all the time. Though its needed for a clean shutdown
2021-03-03 11:28:40 +01:00

144 lines
4.7 KiB
Rust

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use std::{net::SocketAddr, sync::Arc};
use tokio::{runtime::Runtime, sync::Mutex};
use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream};
fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, &stream); }
async fn stream_msg(s1_a: Arc<Mutex<Stream>>, s1_b: Arc<Mutex<Stream>>, data: &[u8], cnt: usize) {
let mut s1_b = s1_b.lock().await;
let m = Message::serialize(&data, &s1_b);
std::thread::spawn(move || {
let mut s1_a = s1_a.try_lock().unwrap();
for _ in 0..cnt {
s1_a.send_raw(&m).unwrap();
}
});
for _ in 0..cnt {
s1_b.recv_raw().await.unwrap();
}
}
fn rt() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
}
fn criterion_util(c: &mut Criterion) {
let mut c = c.benchmark_group("net_util");
c.significance_level(0.1).sample_size(100);
let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) =
network_participant_stream(ProtocolAddr::Mpsc(5000));
let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED, 0)).unwrap();
c.throughput(Throughput::Bytes(1000))
.bench_function("message_serialize", |b| {
let data = vec![0u8; 1000];
b.iter(|| serialize(&data, &s1_a))
});
c.throughput(Throughput::Bytes(1000))
.bench_function("message_serialize_compress", |b| {
let data = vec![0u8; 1000];
b.iter(|| serialize(&data, &s2_a))
});
}
fn criterion_mpsc(c: &mut Criterion) {
let mut c = c.benchmark_group("net_mpsc");
c.significance_level(0.1).sample_size(10);
let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
network_participant_stream(ProtocolAddr::Mpsc(5000));
let s1_a = Arc::new(Mutex::new(s1_a));
let s1_b = Arc::new(Mutex::new(s1_b));
c.throughput(Throughput::Bytes(100000000)).bench_function(
BenchmarkId::new("100MB_in_10000_msg", ""),
|b| {
let data = vec![155u8; 100_000];
b.to_async(rt()).iter_with_setup(
|| (Arc::clone(&s1_a), Arc::clone(&s1_b)),
|(s1_a, s1_b)| stream_msg(s1_a, s1_b, &data, 1_000),
)
},
);
c.throughput(Throughput::Elements(100000)).bench_function(
BenchmarkId::new("100000_tiny_msg", ""),
|b| {
let data = vec![3u8; 5];
b.to_async(rt()).iter_with_setup(
|| (Arc::clone(&s1_a), Arc::clone(&s1_b)),
|(s1_a, s1_b)| stream_msg(s1_a, s1_b, &data, 100_000),
)
},
);
c.finish();
drop((_n_a, _p_a, _n_b, _p_b));
}
fn criterion_tcp(c: &mut Criterion) {
let mut c = c.benchmark_group("net_tcp");
c.significance_level(0.1).sample_size(10);
let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
network_participant_stream(ProtocolAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], 5000))));
let s1_a = Arc::new(Mutex::new(s1_a));
let s1_b = Arc::new(Mutex::new(s1_b));
c.throughput(Throughput::Bytes(100000000)).bench_function(
BenchmarkId::new("100MB_in_1000_msg", ""),
|b| {
let data = vec![155u8; 100_000];
b.to_async(rt()).iter_with_setup(
|| (Arc::clone(&s1_a), Arc::clone(&s1_b)),
|(s1_a, s1_b)| stream_msg(s1_a, s1_b, &data, 1_000),
)
},
);
c.throughput(Throughput::Elements(100000)).bench_function(
BenchmarkId::new("100000_tiny_msg", ""),
|b| {
let data = vec![3u8; 5];
b.to_async(rt()).iter_with_setup(
|| (Arc::clone(&s1_a), Arc::clone(&s1_b)),
|(s1_a, s1_b)| stream_msg(s1_a, s1_b, &data, 100_000),
)
},
);
c.finish();
drop((_n_a, _p_a, _n_b, _p_b));
}
criterion_group!(benches, criterion_util, criterion_mpsc, criterion_tcp);
criterion_main!(benches);
pub fn network_participant_stream(
addr: ProtocolAddr,
) -> (
Runtime,
Network,
Participant,
Stream,
Network,
Participant,
Stream,
) {
let runtime = Runtime::new().unwrap();
let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async {
let n_a = Network::new(Pid::fake(0), &runtime);
let n_b = Network::new(Pid::fake(1), &runtime);
n_a.listen(addr.clone()).await.unwrap();
let p1_b = n_b.connect(addr).await.unwrap();
let p1_a = n_a.connected().await.unwrap();
let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap();
let s1_b = p1_b.opened().await.unwrap();
(n_a, p1_a, s1_a, n_b, p1_b, s1_b)
});
(runtime, n_a, p1_a, s1_a, n_b, p1_b, s1_b)
}