Add a speedtest program to benchmark networking

This commit is contained in:
Marcel Märtens 2020-02-25 19:30:50 +01:00
parent 35233d07f9
commit a6f1e3f176
10 changed files with 353 additions and 126 deletions

27
Cargo.lock generated
View File

@ -573,9 +573,13 @@ version = "2.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim 0.8.0",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
@ -1049,7 +1053,7 @@ dependencies = [
"ident_case",
"proc-macro2 1.0.9",
"quote 1.0.3",
"strsim",
"strsim 0.9.3",
"syn 1.0.16",
]
@ -2813,6 +2817,21 @@ dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "network-speed"
version = "0.1.0"
dependencies = [
"bincode",
"clap",
"futures 0.3.4",
"serde",
"tracing",
"tracing-subscriber",
"uuid 0.8.1",
"uvth",
"veloren-network",
]
[[package]]
name = "nix"
version = "0.14.1"
@ -4454,6 +4473,12 @@ dependencies = [
"bytes 0.4.12",
]
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.9.3"

View File

@ -9,8 +9,9 @@ members = [
"server-cli",
"voxygen",
"world",
"network",
"network",
"network/tools/tcp-loadtest",
"network/tools/network-speed",
]
# default profile for devs, fast to compile, okay enough to run, no debug information

View File

@ -15,10 +15,12 @@ serde = "1.0"
serde_derive = "1.0"
mio = "0.6"
tracing = "0.1"
tracing-subscriber = "0.2.0-alpha.4"
byteorder = "1.3"
mio-extras = "2.0"
futures = "0.3"
prometheus = "0.7"
uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]}
tlid = { path = "../../tlid", features = ["serde"]}
[dev-dependencies]
futures = "0.3"
tracing-subscriber = "0.2.0-alpha.4"

View File

@ -15,9 +15,8 @@ use mio::{
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::HashMap,
marker::PhantomData,
sync::{
mpsc::{self, Receiver, Sender, TryRecvError},
mpsc::{self, Receiver, TryRecvError},
Arc, RwLock,
},
};
@ -270,8 +269,8 @@ impl Stream {
// updated by workes via a channel and needs to be intepreted on a send but it
// should almost ever be empty except for new channel creations and stream
// creations!
for worker in self.network_controller.iter() {
worker
for controller in self.network_controller.iter() {
controller
.get_tx()
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
@ -284,6 +283,7 @@ impl Stream {
Ok(())
}
//TODO: remove the Option, async should make it unnecesarry!
pub fn recv<M: DeserializeOwned>(&self) -> Result<Option<M>, StreamError> {
match self.msg_rx.try_recv() {
Ok(msg) => {

View File

@ -10,6 +10,10 @@ mod types;
mod udp;
mod worker;
pub use api::{
Address, Network, NetworkError, Participant, ParticipantError, Promise, Stream, StreamError,
};
#[cfg(test)]
pub mod tests {
use crate::api::*;
@ -63,7 +67,6 @@ pub mod tests {
fn aaa() { test_tracing(); }
#[test]
#[ignore]
fn client_server() {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
@ -82,7 +85,7 @@ pub mod tests {
let p1 = block_on(n1.connect(&a2)).unwrap(); //await
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
s1.send("Hello World");
assert!(s1.send("Hello World").is_ok());
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
@ -90,7 +93,7 @@ pub mod tests {
let s = block_on_recv(&s1_n2);
assert_eq!(s, Ok("Hello World".to_string()));
p1.close(s1);
assert!(p1.close(s1).is_ok());
}
#[test]
@ -118,14 +121,11 @@ pub mod tests {
let s4 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
let s5 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
thread::sleep(Duration::from_millis(3));
s3.send("Hello World3");
thread::sleep(Duration::from_millis(3));
s1.send("Hello World1");
s5.send("Hello World5");
s2.send("Hello World2");
s4.send("Hello World4");
thread::sleep(Duration::from_millis(3));
assert!(s3.send("Hello World3").is_ok());
assert!(s1.send("Hello World1").is_ok());
assert!(s5.send("Hello World5").is_ok());
assert!(s2.send("Hello World2").is_ok());
assert!(s4.send("Hello World4").is_ok());
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
@ -138,20 +138,15 @@ pub mod tests {
let s = block_on_recv(&s3_n2);
assert_eq!(s, Ok("Hello World3".to_string()));
info!("1 read");
let s = block_on_recv(&s1_n2);
assert_eq!(s, Ok("Hello World1".to_string()));
info!("2 read");
let s = block_on_recv(&s2_n2);
assert_eq!(s, Ok("Hello World2".to_string()));
info!("3 read");
let s = block_on_recv(&s5_n2);
assert_eq!(s, Ok("Hello World5".to_string()));
info!("4 read");
let s = block_on_recv(&s4_n2);
assert_eq!(s, Ok("Hello World4".to_string()));
info!("5 read");
p1.close(s1);
assert!(p1.close(s1).is_ok());
}
}

View File

@ -2,10 +2,10 @@ use bincode;
use serde::{de::DeserializeOwned, Serialize};
//use std::collections::VecDeque;
use crate::types::{Mid, Sid};
use byteorder::{NetworkEndian, ReadBytesExt};
use std::sync::Arc;
use tracing::*;
#[derive(Debug)]
pub(crate) struct MessageBuffer {
// use VecDeque for msg storage, because it allows to quickly remove data from front.
//however VecDeque needs custom bincode code, but it's possible
@ -45,6 +45,31 @@ pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) -> M {
decoded
}
impl std::fmt::Debug for MessageBuffer {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
//TODO: small messages!
let len = self.data.len();
if len > 20 {
let n1 = (&self.data[0..4]).read_u32::<NetworkEndian>().unwrap();
let n2 = (&self.data[4..8]).read_u32::<NetworkEndian>().unwrap();
let n3 = (&self.data[8..12]).read_u32::<NetworkEndian>().unwrap();
write!(
f,
"MessageBuffer(len: {}, {}, {}, {}, {:?}..{:?})",
len,
n1,
n2,
n3,
&self.data[13..16],
&self.data[len - 8..len]
)
} else {
write!(f, "MessageBuffer(len: {}, {:?})", len, &self.data[..])
}
}
}
#[cfg(test)]
mod tests {
use crate::message::*;

View File

@ -13,7 +13,7 @@ pub(crate) struct TcpChannel {
impl TcpChannel {
pub fn new(endpoint: TcpStream) -> Self {
let mut b = vec![0; 200];
let mut b = vec![0; 1600000];
Self {
endpoint,
read_buffer: b.clone(),
@ -33,17 +33,38 @@ impl ChannelProtocol for TcpChannel {
trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 {
let round_start = cur.position();
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => result.push(frame),
Err(e) => {
error!(
let newlen = self.read_buffer.len() * 2;
let debug_part = &self.read_buffer[(round_start as usize)
..std::cmp::min(n as usize, (round_start + 10) as usize)];
warn!(
?self,
?e,
"failure parsing a message with len: {}, starting with: {:?}",
n,
&self.read_buffer[0..std::cmp::min(n, 10)]
?round_start,
"message cant be parsed, probably because buffer isn't large \
enough, starting with: {:?}, increase to {}",
debug_part,
newlen
);
error!(
"please please please find a solution, either we need to keep the \
buffer hight 1500 and hope for the other part to coorporate or \
we need a way to keep some data in read_buffer till next call or \
have a loop around it ... etc... which is error prone, so i dont \
want to do it!"
);
if newlen > 204800000 {
error!(
"something is seriossly broken with our messages, skipp the \
resize"
);
} else {
self.read_buffer.resize(newlen as usize, 0);
}
break;
},
}

View File

@ -1,7 +1,6 @@
use crate::{
channel::{Channel, ChannelProtocol, ChannelProtocols},
controller::Controller,
message::InCommingMessage,
metrics::NetworkMetrics,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, TokenObjects},
@ -10,7 +9,7 @@ use mio::{self, Poll, PollOpt, Ready, Token};
use mio_extras::channel::{Receiver, Sender};
use std::{
collections::HashMap,
sync::{mpsc, mpsc::TryRecvError, Arc, RwLock},
sync::{mpsc::TryRecvError, Arc, RwLock},
time::Instant,
};
use tlid;
@ -101,108 +100,109 @@ impl Worker {
}
fn handle_ctl(&mut self) -> bool {
let msg = match self.ctrl_rx.try_recv() {
Ok(msg) => msg,
Err(TryRecvError::Empty) => {
return false;
},
Err(err) => {
panic!("Unexpected error '{}'", err);
},
};
loop {
let msg = match self.ctrl_rx.try_recv() {
Ok(msg) => msg,
Err(TryRecvError::Empty) => {
return false;
},
Err(err) => {
panic!("Unexpected error '{}'", err);
},
};
match msg {
CtrlMsg::Shutdown => {
debug!("Shutting Down");
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj {
channel.shutdown();
channel.tick_send();
}
}
return true;
},
CtrlMsg::Register(handle, interest, opts) => {
let tok = self.mio_tokens.construct();
match &handle {
TokenObjects::TcpListener(h) => {
self.poll.register(h, tok, interest, opts).unwrap()
},
TokenObjects::Channel(channel) => {
match channel.get_protocol() {
ChannelProtocols::Tcp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Udp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Mpsc(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
}
.unwrap();
},
}
debug!(?handle, ?tok, "Registered new handle");
self.mio_tokens.insert(tok, handle);
},
CtrlMsg::OpenStream {
pid,
prio,
promises,
msg_tx,
return_sid,
} => {
let mut handled = false;
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj {
if Some(pid) == channel.remote_pid {
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
let sid = channel.open_stream(prio, promises, msg_tx);
return_sid.send(sid);
match msg {
CtrlMsg::Shutdown => {
debug!("Shutting Down");
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj {
channel.shutdown();
channel.tick_send();
error!("handle msg_tx");
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't open Stream, didn't found pid");
}
},
CtrlMsg::CloseStream { pid, sid } => {
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::Channel(channel) = to {
if Some(pid) == channel.remote_pid {
channel.close_stream(sid); //TODO: check participant
return true;
},
CtrlMsg::Register(handle, interest, opts) => {
let tok = self.mio_tokens.construct();
match &handle {
TokenObjects::TcpListener(h) => {
self.poll.register(h, tok, interest, opts).unwrap()
},
TokenObjects::Channel(channel) => {
match channel.get_protocol() {
ChannelProtocols::Tcp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Udp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Mpsc(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
}
.unwrap();
},
}
debug!(?handle, ?tok, "Registered new handle");
self.mio_tokens.insert(tok, handle);
},
CtrlMsg::OpenStream {
pid,
prio,
promises,
msg_tx,
return_sid,
} => {
let mut handled = false;
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj {
if Some(pid) == channel.remote_pid {
let sid = channel.open_stream(prio, promises, msg_tx);
return_sid.send(sid);
channel.tick_send();
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't open Stream, didn't found pid");
}
},
CtrlMsg::CloseStream { pid, sid } => {
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::Channel(channel) = to {
if Some(pid) == channel.remote_pid {
channel.close_stream(sid); //TODO: check participant
channel.tick_send();
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't close Stream, didn't found pid");
}
},
CtrlMsg::Send(outgoing) => {
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::Channel(channel) = to {
channel.send(outgoing); //TODO: check participant
channel.tick_send();
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't close Stream, didn't found pid");
}
},
CtrlMsg::Send(outgoing) => {
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::Channel(channel) = to {
channel.send(outgoing); //TODO: check participant
channel.tick_send();
handled = true;
break;
if !handled {
error!(
"help, we should check here for stream data, but its in channel ...."
);
}
}
if !handled {
error!("help, we should check here for stream data, but its in channel ....");
}
},
};
false
},
};
}
}
fn handle_tok(&mut self, event: &mio::Event) {

View File

@ -0,0 +1,18 @@
[package]
name = "network-speed"
version = "0.1.0"
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
uvth = "3.1"
network = { package = "veloren-network", path = "../../../network" }
clap = "2.33"
uuid = { version = "0.8", features = ["serde", "v4"] }
futures = "0.3"
tracing = "0.1"
tracing-subscriber = "0.2.0-alpha.4"
bincode = "1.2"
serde = "1.0"

View File

@ -0,0 +1,140 @@
use clap::{App, Arg, SubCommand};
use futures::executor::block_on;
use network::{Address, Network, Participant, Promise, Stream};
use serde::{Deserialize, Serialize};
use std::{
net::SocketAddr,
sync::Arc,
thread,
time::{Duration, Instant},
};
use tracing::*;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
use uvth::ThreadPoolBuilder;
#[derive(Serialize, Deserialize, Debug)]
enum Msg {
Ping { id: u64, data: Vec<u8> },
Pong { id: u64, data: Vec<u8> },
}
fn main() {
let matches = App::new("Veloren Speed Test Utility")
.version("0.1.0")
.author("Marcel Märtens <marcel.cochem@googlemail.com>")
.about("Runs speedtests regarding different parameter to benchmark veloren-network")
.subcommand(
SubCommand::with_name("listen")
.about("Runs the counter part that pongs all requests")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.help("port to listen on"),
),
)
.subcommand(
SubCommand::with_name("run")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.help("port to connect too"),
)
.arg(
Arg::with_name("participants")
.long("participants")
.takes_value(true)
.help("number of participants to open"),
)
.arg(
Arg::with_name("streams")
.long("streams")
.takes_value(true)
.help("number of streams to open per participant"),
),
)
.get_matches();
let filter = EnvFilter::from_default_env().add_directive("warn".parse().unwrap());
//.add_directive("veloren_network::tests=trace".parse().unwrap());
tracing_subscriber::FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
// will be written to stdout.
.with_max_level(Level::TRACE)
.with_env_filter(filter)
// sets this to be the default, global subscriber for this application.
.init();
if let Some(matches) = matches.subcommand_matches("listen") {
server();
};
if let Some(matches) = matches.subcommand_matches("run") {
client();
};
}
fn server() {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-server".into())
.build(),
);
thread::sleep(Duration::from_millis(200));
let server = Network::new(Uuid::new_v4(), thread_pool.clone());
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
block_on(server.listen(&address)).unwrap(); //await
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
loop {
let p1 = block_on(server.connected()).unwrap(); //remote representation of p1
let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
loop {
let m: Result<Option<Msg>, _> = s1.recv();
match m {
Ok(Some(Msg::Ping { id, data })) => {
//s1.send(Msg::Pong {id, data});
},
Err(e) => {},
_ => {},
}
}
}
}
fn client() {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-server".into())
.build(),
);
thread::sleep(Duration::from_millis(200));
let client = Network::new(Uuid::new_v4(), thread_pool.clone());
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
loop {
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let mut last = Instant::now();
loop {
let mut id = 0u64;
s1.send(Msg::Ping {
id,
data: vec![0; 100],
});
id += 1;
if id.rem_euclid(10000) == 0 {
let new = Instant::now();
let diff = new.duration_since(last);
last = new;
println!("10.000 took {}", diff.as_millis());
}
let _: Result<Option<Msg>, _> = s1.recv();
}
}
}