Implement a async recv test

This commit is contained in:
Marcel Märtens 2020-03-04 01:37:36 +01:00
parent 1e948389cc
commit 74143e13d3
7 changed files with 282 additions and 69 deletions

16
Cargo.lock generated
View File

@ -147,6 +147,22 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97be891acc47ca214468e09425d02cef3af2c94d0d82081cd02061f996802f14"
[[package]]
name = "async-recv"
version = "0.1.0"
dependencies = [
"bincode",
"chrono",
"clap",
"futures 0.3.4",
"serde",
"tracing",
"tracing-subscriber",
"uuid 0.8.1",
"uvth",
"veloren-network",
]
[[package]]
name = "async-std"
version = "1.5.0"

View File

@ -12,6 +12,7 @@ members = [
"network",
"network/tools/tcp-loadtest",
"network/tools/network-speed",
"network/tools/async_recv",
]
# default profile for devs, fast to compile, okay enough to run, no debug information

View File

@ -51,7 +51,7 @@ pub struct Connection {}
pub struct Stream {
sid: Sid,
msg_rx: Receiver<InCommingMessage>,
network_controller: Arc<Vec<Controller>>,
ctr_tx: mio_extras::channel::Sender<CtrlMsg>,
}
pub struct Network {
@ -202,28 +202,28 @@ impl Participant {
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Sid>();
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
for controller in self.network_controller.iter() {
controller
.get_tx()
.send(CtrlMsg::OpenStream {
pid: self.remote_pid,
prio,
promises,
return_sid: ctrl_tx,
msg_tx,
})
.unwrap();
break;
let tx = controller.get_tx();
tx.send(CtrlMsg::OpenStream {
pid: self.remote_pid,
prio,
promises,
return_sid: ctrl_tx,
msg_tx,
})
.unwrap();
// I dont like the fact that i need to wait on the worker thread for getting my
// sid back :/ we could avoid this by introducing a Thread Local Network
// which owns some sids we can take without waiting
let sid = ctrl_rx.recv().unwrap();
info!(?sid, " sucessfully opened stream");
return Ok(Stream {
sid,
msg_rx,
ctr_tx: tx,
});
}
// I dont like the fact that i need to wait on the worker thread for getting my
// sid back :/ we could avoid this by introducing a Thread Local Network
// which owns some sids we can take without waiting
let sid = ctrl_rx.recv().unwrap();
info!(?sid, " sucessfully opened stream");
Ok(Stream {
sid,
msg_rx,
network_controller: self.network_controller.clone(),
})
Err(ParticipantError::ParticipantDisconected)
}
pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> { Ok(()) }
@ -245,7 +245,7 @@ impl Participant {
return Ok(Stream {
sid,
msg_rx,
network_controller: self.network_controller.clone(),
ctr_tx: worker.get_tx(),
});
}
};
@ -269,31 +269,24 @@ impl Stream {
// updated by workes via a channel and needs to be intepreted on a send but it
// should almost ever be empty except for new channel creations and stream
// creations!
for controller in self.network_controller.iter() {
controller
.get_tx()
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
cursor: 0,
mid: None,
sid: self.sid,
}))
.unwrap();
}
self.ctr_tx
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
cursor: 0,
mid: None,
sid: self.sid,
}))
.unwrap();
Ok(())
}
//TODO: remove the Option, async should make it unnecesarry!
pub fn recv<M: DeserializeOwned>(&self) -> Result<Option<M>, StreamError> {
match self.msg_rx.try_recv() {
pub async fn recv<M: DeserializeOwned>(&self) -> Result<M, StreamError> {
match self.msg_rx.recv() {
Ok(msg) => {
info!(?msg, "delivering a message");
Ok(Some(message::deserialize(msg.buffer)))
},
Err(TryRecvError::Empty) => Ok(None),
Err(err) => {
panic!("Unexpected error '{}'", err);
Ok(message::deserialize(msg.buffer))
},
Err(err) => panic!("Unexpected error '{}'", err),
}
}
}

View File

@ -331,7 +331,12 @@ impl Channel {
let tx = s.msg_tx();
for m in s.to_receive.drain(pos..pos + 1) {
info!(?sid, ? m.mid, "received message");
tx.send(m).unwrap();
tx.send(m).map_err(|err| {
error!(
?err,
"Couldn't deliver message, as stream no longer exists!"
)
});
}
}
}

View File

@ -0,0 +1,19 @@
[package]
name = "async-recv"
version = "0.1.0"
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
uvth = "3.1"
network = { package = "veloren-network", path = "../../../network" }
clap = "2.33"
uuid = { version = "0.8", features = ["serde", "v4"] }
futures = "0.3"
tracing = "0.1"
chrono = "0.4"
tracing-subscriber = "0.2.0-alpha.4"
bincode = "1.2"
serde = "1.0"

View File

@ -0,0 +1,178 @@
use chrono::prelude::*;
use clap::{App, Arg, SubCommand};
use futures::executor::block_on;
use network::{Address, Network, Participant, Promise, Stream};
use serde::{Deserialize, Serialize};
use std::{
net::SocketAddr,
sync::Arc,
thread,
time::{Duration, Instant},
};
use tracing::*;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
use uvth::ThreadPoolBuilder;
#[derive(Serialize, Deserialize, Debug)]
enum Msg {
Ping(u64),
Pong(u64),
}
fn main() {
let matches = App::new("Veloren Speed Test Utility")
.version("0.1.0")
.author("Marcel Märtens <marcel.cochem@googlemail.com>")
.about("Runs speedtests regarding different parameter to benchmark veloren-network")
.subcommand(
SubCommand::with_name("listen")
.about("Runs the counter part that pongs all requests")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.help("port to listen on"),
),
)
.subcommand(
SubCommand::with_name("run").arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.help("port to connect too"),
),
)
.get_matches();
let filter = EnvFilter::from_default_env().add_directive("error".parse().unwrap());
//.add_directive("veloren_network::tests=trace".parse().unwrap());
tracing_subscriber::FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
// will be written to stdout.
.with_max_level(Level::TRACE)
.with_env_filter(filter)
// sets this to be the default, global subscriber for this application.
.init();
if let Some(matches) = matches.subcommand_matches("listen") {
let port = matches
.value_of("port")
.map_or(52000, |v| v.parse::<u16>().unwrap_or(52000));
server(port);
};
if let Some(matches) = matches.subcommand_matches("run") {
let port = matches
.value_of("port")
.map_or(52000, |v| v.parse::<u16>().unwrap_or(52000));
client(port);
};
}
fn server(port: u16) {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-server".into())
.build(),
);
thread::sleep(Duration::from_millis(200));
let server = Network::new(Uuid::new_v4(), thread_pool.clone());
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port)));
block_on(server.listen(&address)).unwrap(); //await
thread::sleep(Duration::from_millis(10)); //TODO: listeing still doesnt block correctly!
println!("waiting for client");
let p1 = block_on(server.connected()).unwrap(); //remote representation of p1
let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
let s2 = block_on(p1.opened()).unwrap(); //remote representation of s2
let t1 = thread::spawn(move || {
if let Ok(Msg::Ping(id)) = block_on(s1.recv()) {
thread::sleep(Duration::from_millis(3000));
s1.send(Msg::Pong(id));
println!("[{}], send s1_1", Utc::now().time());
}
if let Ok(Msg::Ping(id)) = block_on(s1.recv()) {
thread::sleep(Duration::from_millis(3000));
s1.send(Msg::Pong(id));
println!("[{}], send s1_2", Utc::now().time());
}
});
let t2 = thread::spawn(move || {
if let Ok(Msg::Ping(id)) = block_on(s2.recv()) {
thread::sleep(Duration::from_millis(1000));
s2.send(Msg::Pong(id));
println!("[{}], send s2_1", Utc::now().time());
}
if let Ok(Msg::Ping(id)) = block_on(s2.recv()) {
thread::sleep(Duration::from_millis(1000));
s2.send(Msg::Pong(id));
println!("[{}], send s2_2", Utc::now().time());
}
});
t1.join();
t2.join();
thread::sleep(Duration::from_millis(50));
}
async fn async_task1(s: Stream) -> u64 {
s.send(Msg::Ping(100));
println!("[{}], s1_1...", Utc::now().time());
let m1: Result<Msg, _> = s.recv().await;
println!("[{}], s1_1: {:?}", Utc::now().time(), m1);
thread::sleep(Duration::from_millis(1000));
s.send(Msg::Ping(101));
println!("[{}], s1_2...", Utc::now().time());
let m2: Result<Msg, _> = s.recv().await;
println!("[{}], s1_2: {:?}", Utc::now().time(), m2);
match m2.unwrap() {
Msg::Pong(id) => id,
_ => panic!("wrong answer"),
}
}
async fn async_task2(s: Stream) -> u64 {
s.send(Msg::Ping(200));
println!("[{}], s2_1...", Utc::now().time());
let m1: Result<Msg, _> = s.recv().await;
println!("[{}], s2_1: {:?}", Utc::now().time(), m1);
thread::sleep(Duration::from_millis(5000));
s.send(Msg::Ping(201));
println!("[{}], s2_2...", Utc::now().time());
let m2: Result<Msg, _> = s.recv().await;
println!("[{}], s2_2: {:?}", Utc::now().time(), m2);
match m2.unwrap() {
Msg::Pong(id) => id,
_ => panic!("wrong answer"),
}
}
fn client(port: u16) {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-server".into())
.build(),
);
thread::sleep(Duration::from_millis(200));
let client = Network::new(Uuid::new_v4(), thread_pool.clone());
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port)));
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2
let before = Instant::now();
block_on(async {
let f1 = async_task1(s1);
let f2 = async_task2(s2);
let x = futures::join!(f1, f2);
});
if before.elapsed() < Duration::from_secs(13) {
println!("IT WORKS!");
} else {
println!("doesn't seem to work :/")
}
thread::sleep(Duration::from_millis(50));
}

View File

@ -36,26 +36,21 @@ fn main() {
),
)
.subcommand(
SubCommand::with_name("run")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.help("port to connect too"),
)
.arg(
Arg::with_name("participants")
.long("participants")
.takes_value(true)
.help("number of participants to open"),
)
.arg(
Arg::with_name("streams")
.long("streams")
.takes_value(true)
.help("number of streams to open per participant"),
),
SubCommand::with_name("run").arg(
Arg::with_name("port")
.short("p")
.long("port")
.takes_value(true)
.help("port to connect too"),
), /*
.arg(Arg::with_name("participants")
.long("participants")
.takes_value(true)
.help("number of participants to open"))
.arg(Arg::with_name("streams")
.long("streams")
.takes_value(true)
.help("number of streams to open per participant"))*/
)
.get_matches();
@ -71,14 +66,20 @@ fn main() {
.init();
if let Some(matches) = matches.subcommand_matches("listen") {
server();
let port = matches
.value_of("port")
.map_or(52000, |v| v.parse::<u16>().unwrap_or(52000));
server(port);
};
if let Some(matches) = matches.subcommand_matches("run") {
client();
let port = matches
.value_of("port")
.map_or(52000, |v| v.parse::<u16>().unwrap_or(52000));
client(port);
};
}
fn server() {
fn server(port: u16) {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-server".into())
@ -86,7 +87,7 @@ fn server() {
);
thread::sleep(Duration::from_millis(200));
let server = Network::new(Uuid::new_v4(), thread_pool.clone());
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port)));
block_on(server.listen(&address)).unwrap(); //await
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
@ -106,7 +107,7 @@ fn server() {
}
}
fn client() {
fn client(port: u16) {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-server".into())
@ -114,7 +115,7 @@ fn client() {
);
thread::sleep(Duration::from_millis(200));
let client = Network::new(Uuid::new_v4(), thread_pool.clone());
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port)));
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
loop {