From 74143e13d35f4d4f025c6d3f5128a16c4638be8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 4 Mar 2020 01:37:36 +0100 Subject: [PATCH] Implement a async recv test --- Cargo.lock | 16 +++ Cargo.toml | 1 + network/src/api.rs | 77 +++++----- network/src/channel.rs | 7 +- network/tools/async_recv/Cargo.toml | 19 +++ network/tools/async_recv/src/main.rs | 178 ++++++++++++++++++++++++ network/tools/network-speed/src/main.rs | 53 +++---- 7 files changed, 282 insertions(+), 69 deletions(-) create mode 100644 network/tools/async_recv/Cargo.toml create mode 100644 network/tools/async_recv/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 39bb60fbc2..cf565cf111 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,6 +147,22 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97be891acc47ca214468e09425d02cef3af2c94d0d82081cd02061f996802f14" +[[package]] +name = "async-recv" +version = "0.1.0" +dependencies = [ + "bincode", + "chrono", + "clap", + "futures 0.3.4", + "serde", + "tracing", + "tracing-subscriber", + "uuid 0.8.1", + "uvth", + "veloren-network", +] + [[package]] name = "async-std" version = "1.5.0" diff --git a/Cargo.toml b/Cargo.toml index 733b01935f..ba5085b3bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "network", "network/tools/tcp-loadtest", "network/tools/network-speed", + "network/tools/async_recv", ] # default profile for devs, fast to compile, okay enough to run, no debug information diff --git a/network/src/api.rs b/network/src/api.rs index e6142e181a..923c4ec1e9 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -51,7 +51,7 @@ pub struct Connection {} pub struct Stream { sid: Sid, msg_rx: Receiver, - network_controller: Arc>, + ctr_tx: mio_extras::channel::Sender, } pub struct Network { @@ -202,28 +202,28 @@ impl Participant { let (ctrl_tx, ctrl_rx) = mpsc::channel::(); let (msg_tx, msg_rx) = mpsc::channel::(); for controller in self.network_controller.iter() { - controller - .get_tx() - .send(CtrlMsg::OpenStream { - pid: self.remote_pid, - prio, - promises, - return_sid: ctrl_tx, - msg_tx, - }) - .unwrap(); - break; + let tx = controller.get_tx(); + tx.send(CtrlMsg::OpenStream { + pid: self.remote_pid, + prio, + promises, + return_sid: ctrl_tx, + msg_tx, + }) + .unwrap(); + + // I dont like the fact that i need to wait on the worker thread for getting my + // sid back :/ we could avoid this by introducing a Thread Local Network + // which owns some sids we can take without waiting + let sid = ctrl_rx.recv().unwrap(); + info!(?sid, " sucessfully opened stream"); + return Ok(Stream { + sid, + msg_rx, + ctr_tx: tx, + }); } - // I dont like the fact that i need to wait on the worker thread for getting my - // sid back :/ we could avoid this by introducing a Thread Local Network - // which owns some sids we can take without waiting - let sid = ctrl_rx.recv().unwrap(); - info!(?sid, " sucessfully opened stream"); - Ok(Stream { - sid, - msg_rx, - network_controller: self.network_controller.clone(), - }) + Err(ParticipantError::ParticipantDisconected) } pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> { Ok(()) } @@ -245,7 +245,7 @@ impl Participant { return Ok(Stream { sid, msg_rx, - network_controller: self.network_controller.clone(), + ctr_tx: worker.get_tx(), }); } }; @@ -269,31 +269,24 @@ impl Stream { // updated by workes via a channel and needs to be intepreted on a send but it // should almost ever be empty except for new channel creations and stream // creations! - for controller in self.network_controller.iter() { - controller - .get_tx() - .send(CtrlMsg::Send(OutGoingMessage { - buffer: messagebuffer.clone(), - cursor: 0, - mid: None, - sid: self.sid, - })) - .unwrap(); - } + self.ctr_tx + .send(CtrlMsg::Send(OutGoingMessage { + buffer: messagebuffer.clone(), + cursor: 0, + mid: None, + sid: self.sid, + })) + .unwrap(); Ok(()) } - //TODO: remove the Option, async should make it unnecesarry! - pub fn recv(&self) -> Result, StreamError> { - match self.msg_rx.try_recv() { + pub async fn recv(&self) -> Result { + match self.msg_rx.recv() { Ok(msg) => { info!(?msg, "delivering a message"); - Ok(Some(message::deserialize(msg.buffer))) - }, - Err(TryRecvError::Empty) => Ok(None), - Err(err) => { - panic!("Unexpected error '{}'", err); + Ok(message::deserialize(msg.buffer)) }, + Err(err) => panic!("Unexpected error '{}'", err), } } } diff --git a/network/src/channel.rs b/network/src/channel.rs index d07826c7e1..350215c962 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -331,7 +331,12 @@ impl Channel { let tx = s.msg_tx(); for m in s.to_receive.drain(pos..pos + 1) { info!(?sid, ? m.mid, "received message"); - tx.send(m).unwrap(); + tx.send(m).map_err(|err| { + error!( + ?err, + "Couldn't deliver message, as stream no longer exists!" + ) + }); } } } diff --git a/network/tools/async_recv/Cargo.toml b/network/tools/async_recv/Cargo.toml new file mode 100644 index 0000000000..961a932669 --- /dev/null +++ b/network/tools/async_recv/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "async-recv" +version = "0.1.0" +authors = ["Marcel Märtens "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +uvth = "3.1" +network = { package = "veloren-network", path = "../../../network" } +clap = "2.33" +uuid = { version = "0.8", features = ["serde", "v4"] } +futures = "0.3" +tracing = "0.1" +chrono = "0.4" +tracing-subscriber = "0.2.0-alpha.4" +bincode = "1.2" +serde = "1.0" \ No newline at end of file diff --git a/network/tools/async_recv/src/main.rs b/network/tools/async_recv/src/main.rs new file mode 100644 index 0000000000..f7e3866381 --- /dev/null +++ b/network/tools/async_recv/src/main.rs @@ -0,0 +1,178 @@ +use chrono::prelude::*; +use clap::{App, Arg, SubCommand}; +use futures::executor::block_on; +use network::{Address, Network, Participant, Promise, Stream}; +use serde::{Deserialize, Serialize}; +use std::{ + net::SocketAddr, + sync::Arc, + thread, + time::{Duration, Instant}, +}; +use tracing::*; +use tracing_subscriber::EnvFilter; +use uuid::Uuid; +use uvth::ThreadPoolBuilder; + +#[derive(Serialize, Deserialize, Debug)] +enum Msg { + Ping(u64), + Pong(u64), +} + +fn main() { + let matches = App::new("Veloren Speed Test Utility") + .version("0.1.0") + .author("Marcel Märtens ") + .about("Runs speedtests regarding different parameter to benchmark veloren-network") + .subcommand( + SubCommand::with_name("listen") + .about("Runs the counter part that pongs all requests") + .arg( + Arg::with_name("port") + .short("p") + .long("port") + .takes_value(true) + .help("port to listen on"), + ), + ) + .subcommand( + SubCommand::with_name("run").arg( + Arg::with_name("port") + .short("p") + .long("port") + .takes_value(true) + .help("port to connect too"), + ), + ) + .get_matches(); + + let filter = EnvFilter::from_default_env().add_directive("error".parse().unwrap()); + //.add_directive("veloren_network::tests=trace".parse().unwrap()); + + tracing_subscriber::FmtSubscriber::builder() + // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) + // will be written to stdout. + .with_max_level(Level::TRACE) + .with_env_filter(filter) + // sets this to be the default, global subscriber for this application. + .init(); + + if let Some(matches) = matches.subcommand_matches("listen") { + let port = matches + .value_of("port") + .map_or(52000, |v| v.parse::().unwrap_or(52000)); + server(port); + }; + if let Some(matches) = matches.subcommand_matches("run") { + let port = matches + .value_of("port") + .map_or(52000, |v| v.parse::().unwrap_or(52000)); + client(port); + }; +} + +fn server(port: u16) { + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-server".into()) + .build(), + ); + thread::sleep(Duration::from_millis(200)); + let server = Network::new(Uuid::new_v4(), thread_pool.clone()); + let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); + block_on(server.listen(&address)).unwrap(); //await + thread::sleep(Duration::from_millis(10)); //TODO: listeing still doesnt block correctly! + println!("waiting for client"); + + let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 + let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 + let s2 = block_on(p1.opened()).unwrap(); //remote representation of s2 + let t1 = thread::spawn(move || { + if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { + thread::sleep(Duration::from_millis(3000)); + s1.send(Msg::Pong(id)); + println!("[{}], send s1_1", Utc::now().time()); + } + if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { + thread::sleep(Duration::from_millis(3000)); + s1.send(Msg::Pong(id)); + println!("[{}], send s1_2", Utc::now().time()); + } + }); + let t2 = thread::spawn(move || { + if let Ok(Msg::Ping(id)) = block_on(s2.recv()) { + thread::sleep(Duration::from_millis(1000)); + s2.send(Msg::Pong(id)); + println!("[{}], send s2_1", Utc::now().time()); + } + if let Ok(Msg::Ping(id)) = block_on(s2.recv()) { + thread::sleep(Duration::from_millis(1000)); + s2.send(Msg::Pong(id)); + println!("[{}], send s2_2", Utc::now().time()); + } + }); + t1.join(); + t2.join(); + thread::sleep(Duration::from_millis(50)); +} + +async fn async_task1(s: Stream) -> u64 { + s.send(Msg::Ping(100)); + println!("[{}], s1_1...", Utc::now().time()); + let m1: Result = s.recv().await; + println!("[{}], s1_1: {:?}", Utc::now().time(), m1); + thread::sleep(Duration::from_millis(1000)); + s.send(Msg::Ping(101)); + println!("[{}], s1_2...", Utc::now().time()); + let m2: Result = s.recv().await; + println!("[{}], s1_2: {:?}", Utc::now().time(), m2); + match m2.unwrap() { + Msg::Pong(id) => id, + _ => panic!("wrong answer"), + } +} + +async fn async_task2(s: Stream) -> u64 { + s.send(Msg::Ping(200)); + println!("[{}], s2_1...", Utc::now().time()); + let m1: Result = s.recv().await; + println!("[{}], s2_1: {:?}", Utc::now().time(), m1); + thread::sleep(Duration::from_millis(5000)); + s.send(Msg::Ping(201)); + println!("[{}], s2_2...", Utc::now().time()); + let m2: Result = s.recv().await; + println!("[{}], s2_2: {:?}", Utc::now().time(), m2); + match m2.unwrap() { + Msg::Pong(id) => id, + _ => panic!("wrong answer"), + } +} + +fn client(port: u16) { + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-server".into()) + .build(), + ); + thread::sleep(Duration::from_millis(200)); + let client = Network::new(Uuid::new_v4(), thread_pool.clone()); + let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); + thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! + + let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 + let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 + let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2 + let before = Instant::now(); + block_on(async { + let f1 = async_task1(s1); + let f2 = async_task2(s2); + let x = futures::join!(f1, f2); + }); + if before.elapsed() < Duration::from_secs(13) { + println!("IT WORKS!"); + } else { + println!("doesn't seem to work :/") + } + thread::sleep(Duration::from_millis(50)); +} diff --git a/network/tools/network-speed/src/main.rs b/network/tools/network-speed/src/main.rs index ac6346bc02..0246b1978b 100644 --- a/network/tools/network-speed/src/main.rs +++ b/network/tools/network-speed/src/main.rs @@ -36,26 +36,21 @@ fn main() { ), ) .subcommand( - SubCommand::with_name("run") - .arg( - Arg::with_name("port") - .short("p") - .long("port") - .takes_value(true) - .help("port to connect too"), - ) - .arg( - Arg::with_name("participants") - .long("participants") - .takes_value(true) - .help("number of participants to open"), - ) - .arg( - Arg::with_name("streams") - .long("streams") - .takes_value(true) - .help("number of streams to open per participant"), - ), + SubCommand::with_name("run").arg( + Arg::with_name("port") + .short("p") + .long("port") + .takes_value(true) + .help("port to connect too"), + ), /* + .arg(Arg::with_name("participants") + .long("participants") + .takes_value(true) + .help("number of participants to open")) + .arg(Arg::with_name("streams") + .long("streams") + .takes_value(true) + .help("number of streams to open per participant"))*/ ) .get_matches(); @@ -71,14 +66,20 @@ fn main() { .init(); if let Some(matches) = matches.subcommand_matches("listen") { - server(); + let port = matches + .value_of("port") + .map_or(52000, |v| v.parse::().unwrap_or(52000)); + server(port); }; if let Some(matches) = matches.subcommand_matches("run") { - client(); + let port = matches + .value_of("port") + .map_or(52000, |v| v.parse::().unwrap_or(52000)); + client(port); }; } -fn server() { +fn server(port: u16) { let thread_pool = Arc::new( ThreadPoolBuilder::new() .name("veloren-network-server".into()) @@ -86,7 +87,7 @@ fn server() { ); thread::sleep(Duration::from_millis(200)); let server = Network::new(Uuid::new_v4(), thread_pool.clone()); - let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); + let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); block_on(server.listen(&address)).unwrap(); //await thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! @@ -106,7 +107,7 @@ fn server() { } } -fn client() { +fn client(port: u16) { let thread_pool = Arc::new( ThreadPoolBuilder::new() .name("veloren-network-server".into()) @@ -114,7 +115,7 @@ fn client() { ); thread::sleep(Duration::from_millis(200)); let client = Network::new(Uuid::new_v4(), thread_pool.clone()); - let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); + let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! loop {