diff --git a/Cargo.lock b/Cargo.lock index 9f3e444f5e..39bb60fbc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -573,9 +573,13 @@ version = "2.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" dependencies = [ + "ansi_term", + "atty", "bitflags", + "strsim 0.8.0", "textwrap", "unicode-width", + "vec_map", ] [[package]] @@ -1049,7 +1053,7 @@ dependencies = [ "ident_case", "proc-macro2 1.0.9", "quote 1.0.3", - "strsim", + "strsim 0.9.3", "syn 1.0.16", ] @@ -2813,6 +2817,21 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "network-speed" +version = "0.1.0" +dependencies = [ + "bincode", + "clap", + "futures 0.3.4", + "serde", + "tracing", + "tracing-subscriber", + "uuid 0.8.1", + "uvth", + "veloren-network", +] + [[package]] name = "nix" version = "0.14.1" @@ -4454,6 +4473,12 @@ dependencies = [ "bytes 0.4.12", ] +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "strsim" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 860d8136b2..733b01935f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,9 @@ members = [ "server-cli", "voxygen", "world", - "network", + "network", "network/tools/tcp-loadtest", + "network/tools/network-speed", ] # default profile for devs, fast to compile, okay enough to run, no debug information diff --git a/network/Cargo.toml b/network/Cargo.toml index e2967e9021..65a111de1e 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -15,10 +15,12 @@ serde = "1.0" serde_derive = "1.0" mio = "0.6" tracing = "0.1" -tracing-subscriber = "0.2.0-alpha.4" byteorder = "1.3" mio-extras = "2.0" -futures = "0.3" prometheus = "0.7" uuid = { version = "0.8", features = ["serde", "v4"] } -tlid = { path = "../../tlid", features = ["serde"]} \ No newline at end of file +tlid = { path = "../../tlid", features = ["serde"]} + +[dev-dependencies] +futures = "0.3" +tracing-subscriber = "0.2.0-alpha.4" \ No newline at end of file diff --git a/network/src/api.rs b/network/src/api.rs index 38805d248e..e6142e181a 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -15,9 +15,8 @@ use mio::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ collections::HashMap, - marker::PhantomData, sync::{ - mpsc::{self, Receiver, Sender, TryRecvError}, + mpsc::{self, Receiver, TryRecvError}, Arc, RwLock, }, }; @@ -270,8 +269,8 @@ impl Stream { // updated by workes via a channel and needs to be intepreted on a send but it // should almost ever be empty except for new channel creations and stream // creations! - for worker in self.network_controller.iter() { - worker + for controller in self.network_controller.iter() { + controller .get_tx() .send(CtrlMsg::Send(OutGoingMessage { buffer: messagebuffer.clone(), @@ -284,6 +283,7 @@ impl Stream { Ok(()) } + //TODO: remove the Option, async should make it unnecesarry! pub fn recv<M: DeserializeOwned>(&self) -> Result<Option<M>, StreamError> { match self.msg_rx.try_recv() { Ok(msg) => { diff --git a/network/src/lib.rs b/network/src/lib.rs index 94b67aa6b4..0207c10e83 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -10,6 +10,10 @@ mod types; mod udp; mod worker; +pub use api::{ + Address, Network, NetworkError, Participant, ParticipantError, Promise, Stream, StreamError, +}; + #[cfg(test)] pub mod tests { use crate::api::*; @@ -63,7 +67,6 @@ pub mod tests { fn aaa() { test_tracing(); } #[test] - #[ignore] fn client_server() { let thread_pool = Arc::new( ThreadPoolBuilder::new() @@ -82,7 +85,7 @@ pub mod tests { let p1 = block_on(n1.connect(&a2)).unwrap(); //await let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - s1.send("Hello World"); + assert!(s1.send("Hello World").is_ok()); let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 @@ -90,7 +93,7 @@ pub mod tests { let s = block_on_recv(&s1_n2); assert_eq!(s, Ok("Hello World".to_string())); - p1.close(s1); + assert!(p1.close(s1).is_ok()); } #[test] @@ -118,14 +121,11 @@ pub mod tests { let s4 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); let s5 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - thread::sleep(Duration::from_millis(3)); - s3.send("Hello World3"); - thread::sleep(Duration::from_millis(3)); - s1.send("Hello World1"); - s5.send("Hello World5"); - s2.send("Hello World2"); - s4.send("Hello World4"); - thread::sleep(Duration::from_millis(3)); + assert!(s3.send("Hello World3").is_ok()); + assert!(s1.send("Hello World1").is_ok()); + assert!(s5.send("Hello World5").is_ok()); + assert!(s2.send("Hello World2").is_ok()); + assert!(s4.send("Hello World4").is_ok()); let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 @@ -138,20 +138,15 @@ pub mod tests { let s = block_on_recv(&s3_n2); assert_eq!(s, Ok("Hello World3".to_string())); - info!("1 read"); let s = block_on_recv(&s1_n2); assert_eq!(s, Ok("Hello World1".to_string())); - info!("2 read"); let s = block_on_recv(&s2_n2); assert_eq!(s, Ok("Hello World2".to_string())); - info!("3 read"); let s = block_on_recv(&s5_n2); assert_eq!(s, Ok("Hello World5".to_string())); - info!("4 read"); let s = block_on_recv(&s4_n2); assert_eq!(s, Ok("Hello World4".to_string())); - info!("5 read"); - p1.close(s1); + assert!(p1.close(s1).is_ok()); } } diff --git a/network/src/message.rs b/network/src/message.rs index 5e5882154f..1d4de83202 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -2,10 +2,10 @@ use bincode; use serde::{de::DeserializeOwned, Serialize}; //use std::collections::VecDeque; use crate::types::{Mid, Sid}; +use byteorder::{NetworkEndian, ReadBytesExt}; use std::sync::Arc; use tracing::*; -#[derive(Debug)] pub(crate) struct MessageBuffer { // use VecDeque for msg storage, because it allows to quickly remove data from front. //however VecDeque needs custom bincode code, but it's possible @@ -45,6 +45,31 @@ pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) -> M { decoded } +impl std::fmt::Debug for MessageBuffer { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + //TODO: small messages! + let len = self.data.len(); + if len > 20 { + let n1 = (&self.data[0..4]).read_u32::<NetworkEndian>().unwrap(); + let n2 = (&self.data[4..8]).read_u32::<NetworkEndian>().unwrap(); + let n3 = (&self.data[8..12]).read_u32::<NetworkEndian>().unwrap(); + write!( + f, + "MessageBuffer(len: {}, {}, {}, {}, {:?}..{:?})", + len, + n1, + n2, + n3, + &self.data[13..16], + &self.data[len - 8..len] + ) + } else { + write!(f, "MessageBuffer(len: {}, {:?})", len, &self.data[..]) + } + } +} + #[cfg(test)] mod tests { use crate::message::*; diff --git a/network/src/tcp.rs b/network/src/tcp.rs index 1e92f2f8f1..4d3a060aca 100644 --- a/network/src/tcp.rs +++ b/network/src/tcp.rs @@ -13,7 +13,7 @@ pub(crate) struct TcpChannel { impl TcpChannel { pub fn new(endpoint: TcpStream) -> Self { - let mut b = vec![0; 200]; + let mut b = vec![0; 1600000]; Self { endpoint, read_buffer: b.clone(), @@ -33,17 +33,38 @@ impl ChannelProtocol for TcpChannel { trace!("incomming message with len: {}", n); let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); while cur.position() < n as u64 { + let round_start = cur.position(); let r: Result<Frame, _> = bincode::deserialize_from(&mut cur); match r { Ok(frame) => result.push(frame), Err(e) => { - error!( + let newlen = self.read_buffer.len() * 2; + let debug_part = &self.read_buffer[(round_start as usize) + ..std::cmp::min(n as usize, (round_start + 10) as usize)]; + warn!( ?self, ?e, - "failure parsing a message with len: {}, starting with: {:?}", - n, - &self.read_buffer[0..std::cmp::min(n, 10)] + ?round_start, + "message cant be parsed, probably because buffer isn't large \ + enough, starting with: {:?}, increase to {}", + debug_part, + newlen ); + error!( + "please please please find a solution, either we need to keep the \ + buffer hight 1500 and hope for the other part to coorporate or \ + we need a way to keep some data in read_buffer till next call or \ + have a loop around it ... etc... which is error prone, so i dont \ + want to do it!" + ); + if newlen > 204800000 { + error!( + "something is seriossly broken with our messages, skipp the \ + resize" + ); + } else { + self.read_buffer.resize(newlen as usize, 0); + } break; }, } diff --git a/network/src/worker.rs b/network/src/worker.rs index 958b111211..70b05eab83 100644 --- a/network/src/worker.rs +++ b/network/src/worker.rs @@ -1,7 +1,6 @@ use crate::{ channel::{Channel, ChannelProtocol, ChannelProtocols}, controller::Controller, - message::InCommingMessage, metrics::NetworkMetrics, tcp::TcpChannel, types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, TokenObjects}, @@ -10,7 +9,7 @@ use mio::{self, Poll, PollOpt, Ready, Token}; use mio_extras::channel::{Receiver, Sender}; use std::{ collections::HashMap, - sync::{mpsc, mpsc::TryRecvError, Arc, RwLock}, + sync::{mpsc::TryRecvError, Arc, RwLock}, time::Instant, }; use tlid; @@ -101,108 +100,109 @@ impl Worker { } fn handle_ctl(&mut self) -> bool { - let msg = match self.ctrl_rx.try_recv() { - Ok(msg) => msg, - Err(TryRecvError::Empty) => { - return false; - }, - Err(err) => { - panic!("Unexpected error '{}'", err); - }, - }; + loop { + let msg = match self.ctrl_rx.try_recv() { + Ok(msg) => msg, + Err(TryRecvError::Empty) => { + return false; + }, + Err(err) => { + panic!("Unexpected error '{}'", err); + }, + }; - match msg { - CtrlMsg::Shutdown => { - debug!("Shutting Down"); - for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::Channel(channel) = obj { - channel.shutdown(); - channel.tick_send(); - } - } - return true; - }, - CtrlMsg::Register(handle, interest, opts) => { - let tok = self.mio_tokens.construct(); - match &handle { - TokenObjects::TcpListener(h) => { - self.poll.register(h, tok, interest, opts).unwrap() - }, - TokenObjects::Channel(channel) => { - match channel.get_protocol() { - ChannelProtocols::Tcp(c) => { - self.poll.register(c.get_handle(), tok, interest, opts) - }, - ChannelProtocols::Udp(c) => { - self.poll.register(c.get_handle(), tok, interest, opts) - }, - ChannelProtocols::Mpsc(c) => { - self.poll.register(c.get_handle(), tok, interest, opts) - }, - } - .unwrap(); - }, - } - debug!(?handle, ?tok, "Registered new handle"); - self.mio_tokens.insert(tok, handle); - }, - CtrlMsg::OpenStream { - pid, - prio, - promises, - msg_tx, - return_sid, - } => { - let mut handled = false; - for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::Channel(channel) = obj { - if Some(pid) == channel.remote_pid { - let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>(); - let sid = channel.open_stream(prio, promises, msg_tx); - return_sid.send(sid); + match msg { + CtrlMsg::Shutdown => { + debug!("Shutting Down"); + for (tok, obj) in self.mio_tokens.tokens.iter_mut() { + if let TokenObjects::Channel(channel) = obj { + channel.shutdown(); channel.tick_send(); - error!("handle msg_tx"); - handled = true; - break; } } - } - if !handled { - error!(?pid, "couldn't open Stream, didn't found pid"); - } - }, - CtrlMsg::CloseStream { pid, sid } => { - let mut handled = false; - for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::Channel(channel) = to { - if Some(pid) == channel.remote_pid { - channel.close_stream(sid); //TODO: check participant + return true; + }, + CtrlMsg::Register(handle, interest, opts) => { + let tok = self.mio_tokens.construct(); + match &handle { + TokenObjects::TcpListener(h) => { + self.poll.register(h, tok, interest, opts).unwrap() + }, + TokenObjects::Channel(channel) => { + match channel.get_protocol() { + ChannelProtocols::Tcp(c) => { + self.poll.register(c.get_handle(), tok, interest, opts) + }, + ChannelProtocols::Udp(c) => { + self.poll.register(c.get_handle(), tok, interest, opts) + }, + ChannelProtocols::Mpsc(c) => { + self.poll.register(c.get_handle(), tok, interest, opts) + }, + } + .unwrap(); + }, + } + debug!(?handle, ?tok, "Registered new handle"); + self.mio_tokens.insert(tok, handle); + }, + CtrlMsg::OpenStream { + pid, + prio, + promises, + msg_tx, + return_sid, + } => { + let mut handled = false; + for (tok, obj) in self.mio_tokens.tokens.iter_mut() { + if let TokenObjects::Channel(channel) = obj { + if Some(pid) == channel.remote_pid { + let sid = channel.open_stream(prio, promises, msg_tx); + return_sid.send(sid); + channel.tick_send(); + handled = true; + break; + } + } + } + if !handled { + error!(?pid, "couldn't open Stream, didn't found pid"); + } + }, + CtrlMsg::CloseStream { pid, sid } => { + let mut handled = false; + for to in self.mio_tokens.tokens.values_mut() { + if let TokenObjects::Channel(channel) = to { + if Some(pid) == channel.remote_pid { + channel.close_stream(sid); //TODO: check participant + channel.tick_send(); + handled = true; + break; + } + } + } + if !handled { + error!(?pid, "couldn't close Stream, didn't found pid"); + } + }, + CtrlMsg::Send(outgoing) => { + let mut handled = false; + for to in self.mio_tokens.tokens.values_mut() { + if let TokenObjects::Channel(channel) = to { + channel.send(outgoing); //TODO: check participant channel.tick_send(); handled = true; break; } } - } - if !handled { - error!(?pid, "couldn't close Stream, didn't found pid"); - } - }, - CtrlMsg::Send(outgoing) => { - let mut handled = false; - for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::Channel(channel) = to { - channel.send(outgoing); //TODO: check participant - channel.tick_send(); - handled = true; - break; + if !handled { + error!( + "help, we should check here for stream data, but its in channel ...." + ); } - } - if !handled { - error!("help, we should check here for stream data, but its in channel ...."); - } - }, - }; - false + }, + }; + } } fn handle_tok(&mut self, event: &mio::Event) { diff --git a/network/tools/network-speed/Cargo.toml b/network/tools/network-speed/Cargo.toml new file mode 100644 index 0000000000..55e9a1006d --- /dev/null +++ b/network/tools/network-speed/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "network-speed" +version = "0.1.0" +authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +uvth = "3.1" +network = { package = "veloren-network", path = "../../../network" } +clap = "2.33" +uuid = { version = "0.8", features = ["serde", "v4"] } +futures = "0.3" +tracing = "0.1" +tracing-subscriber = "0.2.0-alpha.4" +bincode = "1.2" +serde = "1.0" \ No newline at end of file diff --git a/network/tools/network-speed/src/main.rs b/network/tools/network-speed/src/main.rs new file mode 100644 index 0000000000..88d117e24b --- /dev/null +++ b/network/tools/network-speed/src/main.rs @@ -0,0 +1,140 @@ +use clap::{App, Arg, SubCommand}; +use futures::executor::block_on; +use network::{Address, Network, Participant, Promise, Stream}; +use serde::{Deserialize, Serialize}; +use std::{ + net::SocketAddr, + sync::Arc, + thread, + time::{Duration, Instant}, +}; +use tracing::*; +use tracing_subscriber::EnvFilter; +use uuid::Uuid; +use uvth::ThreadPoolBuilder; + +#[derive(Serialize, Deserialize, Debug)] +enum Msg { + Ping { id: u64, data: Vec<u8> }, + Pong { id: u64, data: Vec<u8> }, +} + +fn main() { + let matches = App::new("Veloren Speed Test Utility") + .version("0.1.0") + .author("Marcel Märtens <marcel.cochem@googlemail.com>") + .about("Runs speedtests regarding different parameter to benchmark veloren-network") + .subcommand( + SubCommand::with_name("listen") + .about("Runs the counter part that pongs all requests") + .arg( + Arg::with_name("port") + .short("p") + .long("port") + .takes_value(true) + .help("port to listen on"), + ), + ) + .subcommand( + SubCommand::with_name("run") + .arg( + Arg::with_name("port") + .short("p") + .long("port") + .takes_value(true) + .help("port to connect too"), + ) + .arg( + Arg::with_name("participants") + .long("participants") + .takes_value(true) + .help("number of participants to open"), + ) + .arg( + Arg::with_name("streams") + .long("streams") + .takes_value(true) + .help("number of streams to open per participant"), + ), + ) + .get_matches(); + + let filter = EnvFilter::from_default_env().add_directive("warn".parse().unwrap()); + //.add_directive("veloren_network::tests=trace".parse().unwrap()); + + tracing_subscriber::FmtSubscriber::builder() + // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) + // will be written to stdout. + .with_max_level(Level::TRACE) + .with_env_filter(filter) + // sets this to be the default, global subscriber for this application. + .init(); + + if let Some(matches) = matches.subcommand_matches("listen") { + server(); + }; + if let Some(matches) = matches.subcommand_matches("run") { + client(); + }; +} + +fn server() { + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-server".into()) + .build(), + ); + thread::sleep(Duration::from_millis(200)); + let server = Network::new(Uuid::new_v4(), thread_pool.clone()); + let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); + block_on(server.listen(&address)).unwrap(); //await + thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! + + loop { + let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 + let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 + loop { + let m: Result<Option<Msg>, _> = s1.recv(); + match m { + Ok(Some(Msg::Ping { id, data })) => { + //s1.send(Msg::Pong {id, data}); + }, + Err(e) => {}, + _ => {}, + } + } + } +} + +fn client() { + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-server".into()) + .build(), + ); + thread::sleep(Duration::from_millis(200)); + let client = Network::new(Uuid::new_v4(), thread_pool.clone()); + let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); + thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! + + loop { + let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 + let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 + let mut last = Instant::now(); + loop { + let mut id = 0u64; + s1.send(Msg::Ping { + id, + data: vec![0; 100], + }); + id += 1; + if id.rem_euclid(10000) == 0 { + let new = Instant::now(); + let diff = new.duration_since(last); + last = new; + println!("10.000 took {}", diff.as_millis()); + } + let _: Result<Option<Msg>, _> = s1.recv(); + } + } +}