diff --git a/Cargo.lock b/Cargo.lock index e83a3c5fde..5b63cf0bef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "000444226fcff248f2bc4c7625be32c63caccfecc2723a2b9f78a7487a49c407" +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "anyhow" version = "1.0.27" @@ -2584,6 +2593,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.8" @@ -3764,6 +3782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ "byteorder 1.3.4", + "regex-syntax", ] [[package]] @@ -4183,6 +4202,15 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" +[[package]] +name = "sharded-slab" +version = "0.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared_library" version = "0.1.9" @@ -4488,6 +4516,13 @@ dependencies = [ "unicode-xid 0.2.0", ] +[[package]] +name = "tcp-loadtest" +version = "0.1.0" +dependencies = [ + "rand 0.7.3", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -4802,6 +4837,77 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" +[[package]] +name = "tracing" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1721cc8cf7d770cc4257872507180f35a4797272f5962f24c806af9e7faf52ab" +dependencies = [ + "cfg-if", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fbad39da2f9af1cae3016339ad7f2c7a9e870f12e8fd04c4fd7ef35b30c0d2b" +dependencies = [ + "quote 1.0.3", + "syn 1.0.16", +] + +[[package]] +name = "tracing-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-log" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" +dependencies = [ + "lazy_static", + "log 0.4.8", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dedebcf5813b02261d6bab3a12c6a8ae702580c0405a2e8ec16c3713caf14c20" +dependencies = [ + "ansi_term", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec 1.2.0", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "treeculler" version = "0.1.0" @@ -4928,6 +5034,10 @@ name = "uuid" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" +dependencies = [ + "rand 0.7.3", + "serde", +] [[package]] name = "uvth" @@ -5043,10 +5153,15 @@ name = "veloren-network" version = "0.1.0" dependencies = [ "bincode", + "byteorder 1.3.4", "enumset", "mio", + "rand 0.7.3", "serde", "serde_derive", + "tracing", + "tracing-subscriber", + "uuid 0.8.1", "uvth", ] diff --git a/Cargo.toml b/Cargo.toml index 901fb4ef5b..860d8136b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "voxygen", "world", "network", + "network/tools/tcp-loadtest", ] # default profile for devs, fast to compile, okay enough to run, no debug information diff --git a/network/Cargo.toml b/network/Cargo.toml index 413a2a9e14..f946aa859e 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -13,4 +13,9 @@ enumset = "0.4" bincode = "1.2" serde = "1.0" serde_derive = "1.0" -mio = "0.6" \ No newline at end of file +mio = "0.6" +tracing = "0.1" +tracing-subscriber = "0.2.0-alpha.4" +byteorder = "1.3" +rand = "0.7" +uuid = { version = "0.8", features = ["serde", "v4"] } \ No newline at end of file diff --git a/network/src/api.rs b/network/src/api.rs index ff8067ac29..ab89007e54 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -1,19 +1,20 @@ -use crate::{message::Message, protocol::Protocol}; +use crate::{ + message::{self, Message}, + mio_worker::{MioWorker, TokenObjects}, + tcp_channel::TcpChannel, +}; use enumset::*; use mio::{ self, net::{TcpListener, TcpStream}, - Poll, PollOpt, Ready, Token, + PollOpt, Ready, }; -use std::{ - collections::HashMap, - marker::PhantomData, - sync::{Arc, RwLock}, - time::Duration, -}; -use uvth::{ThreadPool, ThreadPoolBuilder}; +use std::{marker::PhantomData, sync::Arc}; +use tracing::*; +use uuid::Uuid; +use uvth::ThreadPool; -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum Address { Tcp(std::net::SocketAddr), Udp(std::net::SocketAddr), @@ -36,130 +37,99 @@ pub struct Connection {} pub struct Stream {} pub trait Events { - fn OnRemoteConnectionOpen(net: &Network, con: &Connection) + fn on_remote_connection_open(net: &Network, con: &Connection) where Self: std::marker::Sized; - fn OnRemoteConnectionClose(net: &Network, con: &Connection) + fn on_remote_connection_close(net: &Network, con: &Connection) where Self: std::marker::Sized; - fn OnRemoteStreamOpen(net: &Network, st: &Stream) + fn on_remote_stream_open(net: &Network, st: &Stream) where Self: std::marker::Sized; - fn OnRemoteStreamClose(net: &Network, st: &Stream) + fn on_remote_stream_close(net: &Network, st: &Stream) where Self: std::marker::Sized; } -pub enum TokenObjects { - TCP_LISTENER(TcpListener), -} - -pub struct NetworkData { - next_token_id: usize, - tokens: HashMap, //TODO: move to Vec for faster lookup - poll: Poll, -} - pub struct Network { - internal_sync: Arc>, - thread_pool: ThreadPool, - participant_id: u64, + mio_workers: Arc>, + thread_pool: Arc, + participant_id: Uuid, _pe: PhantomData, } -impl NetworkData { - pub fn new() -> Self { - NetworkData { - next_token_id: 0, - tokens: HashMap::new(), - poll: Poll::new().unwrap(), - } - } -} - impl Network { - const TCP_LISTEN_TOK: Token = Token(0); - - pub fn new() -> Self { - let thread_pool = ThreadPoolBuilder::new() - .name("veloren-network".into()) - .build(); - let internal_sync = Arc::new(RwLock::new(NetworkData::new())); - let internal_sync_clone = internal_sync.clone(); - thread_pool.execute(|| master_poll_worker(internal_sync_clone)); + pub fn new(participant_id: Uuid, thread_pool: Arc) -> Self { + let mio_workers = Arc::new(vec![MioWorker::new( + (participant_id.as_u128().rem_euclid(1024)) as u64, + thread_pool.clone(), + )]); Self { - internal_sync, + mio_workers, thread_pool, - participant_id: 42, + participant_id, _pe: PhantomData:: {}, } } - pub fn send<'a, M: Message<'a>>(&self, msg: M, stream: &Stream) {} + fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc>) -> &'a MioWorker { &list[0] } + + pub fn send<'a, M: Message<'a>>(&self, msg: M, stream: &Stream) { + let messagebuffer = message::serialize(&msg); + } pub fn listen(&self, addr: &Address) { - let addr = addr.clone(); - let internal_sync = self.internal_sync.clone(); - self.thread_pool.execute(move || match addr { - Address::Tcp(a) => { - let tcp_listener = TcpListener::bind(&a).unwrap(); - let mut internal_sync = internal_sync.write().unwrap(); - let tok = Token(internal_sync.next_token_id); - internal_sync.next_token_id += 1; - internal_sync - .poll - .register(&tcp_listener, tok, Ready::readable(), PollOpt::edge()) - .unwrap(); - internal_sync - .tokens - .insert(tok, TokenObjects::TCP_LISTENER(tcp_listener)); - }, - Address::Udp(_) => unimplemented!("lazy me"), + let mio_workers = self.mio_workers.clone(); + let address = addr.clone(); + self.thread_pool.execute(move || { + let mut span = span!(Level::INFO, "listen", ?address); + let _enter = span.enter(); + match address { + Address::Tcp(a) => { + info!("listening"); + let tcp_listener = TcpListener::bind(&a).unwrap(); + let worker = Self::get_lowest_worker(&mio_workers); + worker.register( + TokenObjects::TcpListener(tcp_listener), + Ready::readable(), + PollOpt::edge(), + ); + }, + Address::Udp(_) => unimplemented!("lazy me"), + } }); } - pub fn connect(&self, addr: &Address) -> Participant { Participant { addr: addr.clone() } } + pub fn connect(&self, addr: &Address) -> Participant { + let mio_workers = self.mio_workers.clone(); + let address = addr.clone(); + self.thread_pool.execute(move || { + let mut span = span!(Level::INFO, "connect", ?address); + let _enter = span.enter(); + match address { + Address::Tcp(a) => { + info!("connecting"); + let tcp_stream = match TcpStream::connect(&a) { + Err(err) => { + error!("could not open connection: {}", err); + return; + }, + Ok(s) => s, + }; + let worker = Self::get_lowest_worker(&mio_workers); + worker.register( + TokenObjects::TcpChannel(TcpChannel::new(tcp_stream)), + Ready::readable(), + PollOpt::edge(), + ); + }, + Address::Udp(_) => unimplemented!("lazy me"), + } + }); + Participant { addr: addr.clone() } + } pub fn open(&self, part: Participant, prio: u8, prom: EnumSet) -> Stream { Stream {} } pub fn close(&self, stream: Stream) {} } - -fn master_poll_worker(internal_sync: Arc>) { - let mut events = mio::Events::with_capacity(1024); - loop { - let internal_sync = internal_sync.write().unwrap(); - if let Err(err) = internal_sync - .poll - .poll(&mut events, Some(Duration::from_millis(1))) - { - //postbox_tx.send(Err(err.into()))?; - return; - } - - for event in &events { - match internal_sync.tokens.get(&event.token()) { - Some(e) => { - match e { - TokenObjects::TCP_LISTENER(listener) => { - match listener.accept() { - Ok((stream, _)) => {}, /* PostBox::from_tcpstream(stream) */ - Err(err) => {}, /* Err(err.into()) */ - } - }, - } - }, - None => panic!("Unexpected event token '{:?}'", &event.token()), - }; - } - } -} - -impl Address { - pub fn getProtocol(&self) -> Protocol { - match self { - Address::Tcp(_) => Protocol::Tcp, - Address::Udp(_) => Protocol::Udp, - } - } -} diff --git a/network/src/frame.rs b/network/src/frame.rs new file mode 100644 index 0000000000..4933fcc51c --- /dev/null +++ b/network/src/frame.rs @@ -0,0 +1,12 @@ +#[derive(Debug)] +pub enum TcpFrame { + Header { + id: u64, + length: u64, + }, + Data { + id: u64, + frame_no: u64, + data: Vec, + }, +} diff --git a/network/src/internal.rs b/network/src/internal.rs new file mode 100644 index 0000000000..5a63f7c28a --- /dev/null +++ b/network/src/internal.rs @@ -0,0 +1,35 @@ +use crate::api::Address; + +pub(crate) trait Channel { + fn get_preferred_queue_size() -> usize; + fn get_preferred_buffer_len() -> usize; + fn queue(&self, msg: Vec); + fn recv(&self) -> Option>; +} + +#[derive(Debug)] +pub(crate) enum TcpFrame { + Header { + id: u64, + length: u64, + }, + Data { + id: u64, + frame_no: u64, + data: Vec, + }, +} + +pub(crate) enum Protocol { + Tcp, + Udp, +} + +impl Address { + pub(crate) fn get_protocol(&self) -> Protocol { + match self { + Address::Tcp(_) => Protocol::Tcp, + Address::Udp(_) => Protocol::Udp, + } + } +} diff --git a/network/src/internal_messages.rs b/network/src/internal_messages.rs new file mode 100644 index 0000000000..71f3660341 --- /dev/null +++ b/network/src/internal_messages.rs @@ -0,0 +1,107 @@ +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::io::{Read, Write}; +use tracing::*; + +const VELOREN_MAGIC_NUMBER: &str = "VELOREN"; +const VELOREN_NETWORK_VERSION_MAJOR: u16 = 0; +const VELOREN_NETWORK_VERSION_MINOR: u8 = 0; +const VELOREN_NETWORK_VERSION_PATCH: u8 = 1; + +pub fn encode_handshake1(stream: &mut W, participant_id: u64) { + stream.write_all(VELOREN_MAGIC_NUMBER.as_bytes()).unwrap(); + stream.write_u8('\n' as u8).unwrap(); + stream + .write_u16::(VELOREN_NETWORK_VERSION_MAJOR) + .unwrap(); + stream.write_u8('.' as u8).unwrap(); + stream.write_u8(VELOREN_NETWORK_VERSION_MINOR).unwrap(); + stream.write_u8('.' as u8).unwrap(); + stream.write_u8(VELOREN_NETWORK_VERSION_PATCH).unwrap(); + stream.write_u8('\n' as u8).unwrap(); + stream.write_u64::(participant_id).unwrap(); + stream.write_u8('\n' as u8).unwrap(); +} + +pub fn decode_handshake1(stream: &mut R) -> Result<(u16, u8, u8, u64), ()> { + let mut veloren_buf: [u8; 7] = [0; 7]; + let mut major; + let mut minor; + let mut patch; + let mut participant_id; + match stream.read_exact(&mut veloren_buf) { + Ok(()) if (veloren_buf == VELOREN_MAGIC_NUMBER.as_bytes()) => {}, + _ => { + error!(?veloren_buf, "incompatible magic number"); + return Err(()); + }, + } + match stream.read_u8().map(|u| u as char) { + Ok('\n') => {}, + _ => return Err(()), + } + match stream.read_u16::() { + Ok(u) => major = u, + _ => return Err(()), + } + match stream.read_u8().map(|u| u as char) { + Ok('.') => {}, + _ => return Err(()), + } + match stream.read_u8() { + Ok(u) => minor = u, + _ => return Err(()), + } + match stream.read_u8().map(|u| u as char) { + Ok('.') => {}, + _ => return Err(()), + } + match stream.read_u8() { + Ok(u) => patch = u, + _ => return Err(()), + } + match stream.read_u8().map(|u| u as char) { + Ok('\n') => {}, + _ => return Err(()), + } + match stream.read_u64::() { + Ok(u) => participant_id = u, + _ => return Err(()), + } + Ok((major, minor, patch, participant_id)) +} + +#[cfg(test)] +mod tests { + use crate::{internal_messages::*, tests::test_tracing}; + + #[test] + fn handshake() { + let mut data = Vec::new(); + encode_handshake1(&mut data, 1337); + let dh = decode_handshake1(&mut data.as_slice()); + assert!(dh.is_ok()); + let (ma, mi, pa, p) = dh.unwrap(); + assert_eq!(ma, VELOREN_NETWORK_VERSION_MAJOR); + assert_eq!(mi, VELOREN_NETWORK_VERSION_MINOR); + assert_eq!(pa, VELOREN_NETWORK_VERSION_PATCH); + assert_eq!(p, 1337); + } + + #[test] + fn handshake_decodeerror_incorrect() { + let mut data = Vec::new(); + encode_handshake1(&mut data, 1337); + data[3] = 'F' as u8; + let dh = decode_handshake1(&mut data.as_slice()); + assert!(dh.is_err()); + } + + #[test] + fn handshake_decodeerror_toless() { + let mut data = Vec::new(); + encode_handshake1(&mut data, 1337); + data.drain(9..); + let dh = decode_handshake1(&mut data.as_slice()); + assert!(dh.is_err()); + } +} diff --git a/network/src/lib.rs b/network/src/lib.rs index 45ca1c46ab..e5b5b8a15e 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,24 +1,44 @@ #![feature(trait_alias)] mod api; +mod frame; +mod internal; +mod internal_messages; mod message; -mod protocol; +mod mio_worker; +mod tcp_channel; #[cfg(test)] -mod tests { +pub mod tests { use crate::api::*; - use std::net::SocketAddr; + use std::{net::SocketAddr, sync::Arc}; + use uuid::Uuid; + use uvth::ThreadPoolBuilder; struct N { id: u8, } + impl Events for N { - fn OnRemoteConnectionOpen(net: &Network, con: &Connection) {} + fn on_remote_connection_open(_net: &Network, _con: &Connection) {} - fn OnRemoteConnectionClose(net: &Network, con: &Connection) {} + fn on_remote_connection_close(_net: &Network, _con: &Connection) {} - fn OnRemoteStreamOpen(net: &Network, st: &Stream) {} + fn on_remote_stream_open(_net: &Network, _st: &Stream) {} - fn OnRemoteStreamClose(net: &Network, st: &Stream) {} + fn on_remote_stream_close(_net: &Network, _st: &Stream) {} + } + + pub fn test_tracing() { + use tracing::Level; + use tracing_subscriber; + + tracing_subscriber::FmtSubscriber::builder() + // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) + // will be written to stdout. + .with_max_level(Level::TRACE) + //.with_env_filter("veloren_network::api=info,my_crate::my_mod=debug,[my_span]=trace") + // sets this to be the default, global subscriber for this application. + .init(); } #[test] @@ -28,14 +48,21 @@ mod tests { #[test] fn client_server() { - let n1 = Network::::new(); - let n2 = Network::::new(); - let a1s = Address::Tcp(SocketAddr::from(([0, 0, 0, 0], 52000u16))); - let a1 = Address::Tcp(SocketAddr::from(([1, 0, 0, 127], 52000u16))); - let a2s = Address::Tcp(SocketAddr::from(([0, 0, 0, 0], 52001u16))); - let a2 = Address::Tcp(SocketAddr::from(([1, 0, 0, 127], 52001u16))); - n1.listen(&a1s); //await - n2.listen(&a2s); // only requiered here, but doesnt hurt on n1 + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-test".into()) + .build(), + ); + test_tracing(); + let n1 = Network::::new(Uuid::new_v4(), thread_pool.clone()); + let n2 = Network::::new(Uuid::new_v4(), thread_pool.clone()); + let a1 = Address::Tcp(SocketAddr::from(([10, 52, 0, 101], 52000))); + let a2 = Address::Tcp(SocketAddr::from(([10, 52, 0, 101], 52001))); + //let a1 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52000))); + //let a2 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52001))); + n1.listen(&a1); //await + n2.listen(&a2); // only requiered here, but doesnt hurt on n1 + std::thread::sleep(std::time::Duration::from_millis(5)); let p1 = n1.connect(&a2); //await //n2.OnRemoteConnectionOpen triggered @@ -48,5 +75,7 @@ mod tests { n1.close(s1); //n2.OnRemoteStreamClose triggered + + std::thread::sleep(std::time::Duration::from_millis(20000)); } } diff --git a/network/src/message.rs b/network/src/message.rs index a7b5fffc5d..75cdb706ad 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,20 +1,29 @@ use bincode; use serde::{Deserialize, Serialize}; -use std::{collections::VecDeque, sync::Arc}; +//use std::collections::VecDeque; +use std::sync::Arc; pub trait Message<'a> = Serialize + Deserialize<'a>; -struct MessageBuffer { +#[derive(Debug)] +pub(crate) struct MessageBuffer { // use VecDeque for msg storage, because it allows to quickly remove data from front. //however VecDeque needs custom bincode code, but it's possible data: Vec, } -struct OutGoingMessage { +#[derive(Debug)] +pub(crate) struct OutGoingMessage { buffer: Arc, cursor: u64, } -fn serialize<'a, M: Message<'a>>(message: &M) -> MessageBuffer { +#[derive(Debug)] +pub(crate) struct InCommingMessage { + buffer: MessageBuffer, + cursor: u64, +} + +pub(crate) fn serialize<'a, M: Message<'a>>(message: &M) -> MessageBuffer { let mut writer = { let actual_size = bincode::serialized_size(message).unwrap(); Vec::::with_capacity(actual_size as usize) diff --git a/network/src/mio_worker.rs b/network/src/mio_worker.rs new file mode 100644 index 0000000000..4aa7d591b6 --- /dev/null +++ b/network/src/mio_worker.rs @@ -0,0 +1,291 @@ +use crate::tcp_channel::TcpChannel; +use mio::{self, net::TcpListener, Poll, PollOpt, Ready, Token}; +use rand::{self, seq::IteratorRandom}; +use std::{ + collections::HashMap, + io::{Read, Write}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + time::{Duration, Instant}, +}; +use tracing::{debug, error, info, span, trace, warn, Level}; +use uvth::ThreadPool; + +#[derive(Debug)] +pub(crate) enum TokenObjects { + TcpListener(TcpListener), + TcpChannel(TcpChannel), +} + +pub(crate) struct MioTokens { + next_token_id: usize, + pub tokens: HashMap, //TODO: move to Vec for faster lookup +} + +impl MioTokens { + pub fn new() -> Self { + MioTokens { + next_token_id: 10, + tokens: HashMap::new(), + } + } + + pub fn construct(&mut self) -> Token { + let tok = Token(self.next_token_id); + self.next_token_id += 1; + tok + } + + pub fn insert(&mut self, tok: Token, obj: TokenObjects) { + trace!(?tok, ?obj, "added new token"); + self.tokens.insert(tok, obj); + } +} + +// MioStatistics should be copied in order to not hold locks for long +#[derive(Clone, Default)] +pub struct MioStatistics { + nano_wait: u128, + nano_busy: u128, +} + +/* + The MioWorker runs in it's own thread, + it has a given set of Channels to work with. + It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers +*/ +pub struct MioWorker { + worker_tag: u64, /* only relevant for logs */ + poll: Arc, + mio_tokens: Arc>, + mio_statistics: Arc>, + shutdown: Arc, +} + +impl MioWorker { + const CTRL_TOK: Token = Token(0); + + pub fn new(worker_tag: u64, thread_pool: Arc) -> Self { + let poll = Arc::new(Poll::new().unwrap()); + let poll_clone = poll.clone(); + let mio_tokens = Arc::new(RwLock::new(MioTokens::new())); + let mio_tokens_clone = mio_tokens.clone(); + let mio_statistics = Arc::new(RwLock::new(MioStatistics::default())); + let mio_statistics_clone = mio_statistics.clone(); + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = shutdown.clone(); + + let mw = MioWorker { + worker_tag, + poll, + mio_tokens, + mio_statistics, + shutdown, + }; + thread_pool.execute(move || { + mio_worker( + worker_tag, + poll_clone, + mio_tokens_clone, + mio_statistics_clone, + shutdown_clone, + ) + }); + mw + } + + pub fn get_load_ratio(&self) -> f32 { + let statistics = self.mio_statistics.read().unwrap(); + statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32 + } + + //TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers + pub fn split(&self, worker_id: u64, thread_pool: Arc) -> Self { + //fork off a second MioWorker and split load + let second = MioWorker::new(worker_id, thread_pool); + { + let mut first_tokens = self.mio_tokens.write().unwrap(); + let mut second_tokens = second.mio_tokens.write().unwrap(); + let cnt = first_tokens.tokens.len() / 2; + + for (key, val) in first_tokens + .tokens + .drain() + .choose_multiple(&mut rand::thread_rng(), cnt / 2) + { + second_tokens.tokens.insert(key, val); + } + info!( + "split MioWorker with {} tokens. New MioWorker has now {} tokens", + cnt, + second_tokens.tokens.len() + ); + } + second + } + + pub fn merge(&self, other: MioWorker) { + //fork off a second MioWorker and split load + let mut first_tokens = self.mio_tokens.write().unwrap(); + let mut second_tokens = other.mio_tokens.write().unwrap(); + let cnt = first_tokens.tokens.len(); + + for (key, val) in second_tokens.tokens.drain() { + first_tokens.tokens.insert(key, val); + } + info!( + "merge MioWorker with {} tokens. New MioWorker has now {} tokens", + cnt, + first_tokens.tokens.len() + ); + } + + pub(crate) fn register(&self, handle: TokenObjects, interest: Ready, opts: PollOpt) { + let mut tokens = self.mio_tokens.write().unwrap(); + let tok = tokens.construct(); + match &handle { + TokenObjects::TcpListener(h) => self.poll.register(h, tok, interest, opts).unwrap(), + TokenObjects::TcpChannel(channel) => self + .poll + .register(&channel.stream, tok, interest, opts) + .unwrap(), + } + trace!(?handle, ?tok, "registered"); + tokens.insert(tok, handle); + } +} + +impl Drop for MioWorker { + fn drop(&mut self) { self.shutdown.store(true, Ordering::Relaxed); } +} + +fn mio_worker( + worker_tag: u64, + poll: Arc, + mio_tokens: Arc>, + mio_statistics: Arc>, + shutdown: Arc, +) { + let mut events = mio::Events::with_capacity(1024); + let span = span!(Level::INFO, "mio worker", ?worker_tag); + let _enter = span.enter(); + while !shutdown.load(Ordering::Relaxed) { + let time_before_poll = Instant::now(); + if let Err(err) = poll.poll(&mut events, Some(Duration::from_millis(1000))) { + error!("network poll error: {}", err); + return; + } + let time_after_poll = Instant::now(); + + if !events.is_empty() { + let mut mio_tokens = mio_tokens.write().unwrap(); + for event in &events { + match mio_tokens.tokens.get_mut(&event.token()) { + Some(e) => { + trace!(?event, "event"); + match e { + TokenObjects::TcpListener(listener) => match listener.accept() { + Ok((mut remote_stream, _)) => { + info!(?remote_stream, "remote connected"); + remote_stream.write_all("Hello Client".as_bytes()).unwrap(); + remote_stream.flush().unwrap(); + + let tok = mio_tokens.construct(); + poll.register( + &remote_stream, + tok, + Ready::readable() | Ready::writable(), + PollOpt::edge(), + ) + .unwrap(); + trace!(?remote_stream, ?tok, "registered"); + mio_tokens.tokens.insert( + tok, + TokenObjects::TcpChannel(TcpChannel::new(remote_stream)), + ); + }, + Err(err) => { + error!(?err, "error during remote connected"); + }, + }, + TokenObjects::TcpChannel(channel) => { + if event.readiness().is_readable() { + trace!(?channel.stream, "stream readable"); + //TODO: read values here and put to message assembly + let mut buf: [u8; 1500] = [0; 1500]; + match channel.stream.read(&mut buf) { + Ok(n) => { + warn!("incomming message with len: {}", n); + channel + .to_receive + .write() + .unwrap() + .push_back(buf.to_vec()); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + }, + Err(e) => { + panic!("{}", e); + }, + }; + } + if event.readiness().is_writable() { + debug!(?channel.stream, "stream writeable"); + let mut to_send = channel.to_send.write().unwrap(); + if let Some(mut data) = to_send.pop_front() { + let total = data.len(); + match channel.stream.write(&data) { + Ok(n) if n == total => {}, + Ok(n) => { + debug!("could only send part"); + let data = data.drain(n..).collect(); //TODO: validate n.. is correct + to_send.push_front(data); + }, + Err(e) + if e.kind() == std::io::ErrorKind::WouldBlock => + { + debug!("would block"); + } + Err(e) => { + panic!("{}", e); + }, + }; + }; + } + }, + _ => unimplemented!("still lazy me"), + } + }, + None => panic!("Unexpected event token '{:?}'", &event.token()), + }; + } + } + let time_after_work = Instant::now(); + match mio_statistics.try_write() { + Ok(mut mio_statistics) => { + const OLD_KEEP_FACTOR: f64 = 0.995; + //in order to weight new data stronger than older we fade them out with a + // factor < 1. for 0.995 under full load (500 ticks a 1ms) we keep 8% of the old + // value this means, that we start to see load comming up after + // 500ms, but not for small spikes - as reordering for smaller spikes would be + // to slow + mio_statistics.nano_wait = (mio_statistics.nano_wait as f64 * OLD_KEEP_FACTOR) + as u128 + + time_after_poll.duration_since(time_before_poll).as_nanos(); + mio_statistics.nano_busy = (mio_statistics.nano_busy as f64 * OLD_KEEP_FACTOR) + as u128 + + time_after_work.duration_since(time_after_poll).as_nanos(); + + trace!( + "current Load {}", + mio_statistics.nano_busy as f32 + / (mio_statistics.nano_busy + mio_statistics.nano_wait + 1) as f32 + ); + }, + Err(e) => warn!("statistics dropped because they are currently accecssed"), + } + } +} diff --git a/network/src/protocol.rs b/network/src/protocol.rs deleted file mode 100644 index e46e33610d..0000000000 --- a/network/src/protocol.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub enum Protocol { - Tcp, - Udp, -} diff --git a/network/src/tcp_channel.rs b/network/src/tcp_channel.rs new file mode 100644 index 0000000000..3ce048a9a6 --- /dev/null +++ b/network/src/tcp_channel.rs @@ -0,0 +1,40 @@ +use crate::internal::Channel; +use mio::{self, net::TcpStream}; +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, +}; +use tracing::{debug, error, info, span, trace, warn, Level}; + +#[derive(Debug)] +pub(crate) struct TcpChannel { + pub stream: TcpStream, + pub to_send: RwLock>>, + pub to_receive: RwLock>>, +} + +impl TcpChannel { + pub fn new(stream: TcpStream) -> Self { + TcpChannel { + stream, + to_send: RwLock::new(VecDeque::new()), + to_receive: RwLock::new(VecDeque::new()), + } + } +} + +impl Channel for TcpChannel { + fn get_preferred_queue_size() -> usize { + 1400 /*TCP MTU is often 1500, minus some headers*/ + //TODO: get this from the underlying network interface + } + + fn get_preferred_buffer_len() -> usize { + 5 + // = 1400*5 = 7000bytes => 0.0056s of buffer on 10Mbit/s network + } + + fn queue(&self, msg: Vec) {} + + fn recv(&self) -> Option> { None } +} diff --git a/network/tools/tcp-loadtest/Cargo.toml b/network/tools/tcp-loadtest/Cargo.toml new file mode 100644 index 0000000000..493712de9a --- /dev/null +++ b/network/tools/tcp-loadtest/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "tcp-loadtest" +version = "0.1.0" +authors = ["Marcel Märtens "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +rand = "0.7" \ No newline at end of file diff --git a/network/tools/tcp-loadtest/src/main.rs b/network/tools/tcp-loadtest/src/main.rs new file mode 100644 index 0000000000..ccde9ad9b7 --- /dev/null +++ b/network/tools/tcp-loadtest/src/main.rs @@ -0,0 +1,104 @@ +use std::{ + env, + io::Write, + net::{SocketAddr, TcpStream}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + thread, + time::{Duration, Instant}, +}; +extern crate rand; + +fn setup() -> Result { + let args: Vec = env::args().collect(); + if args.len() < 3 { + println!("usage: tcp-loadtest "); + println!("example: tcp-loadtest 127.0.0.1 52000"); + return Err(1); + } + let a: SocketAddr = format!("{}:{}", args[1], args[2]).parse().unwrap(); + return Ok(a); +} + +fn main() -> Result<(), u32> { + let addr = Arc::new(setup()?); + let data: Arc = Arc::new( + (0..1000000) + .map(|_| (0x20u8 + (rand::random::() * 96.0) as u8) as char) + .collect(), + ); + + let total_bytes_send = Arc::new(AtomicU64::new(0)); + let total_send_count = Arc::new(AtomicU64::new(0)); + let total_finished_threads = Arc::new(AtomicU64::new(0)); + let start_time = Instant::now(); + + let mut threads = Vec::new(); + let thread_count = 4; + for i in 0..thread_count { + let addr = addr.clone(); + let total_bytes_send = total_bytes_send.clone(); + let total_send_count = total_send_count.clone(); + let total_finished_threads = total_finished_threads.clone(); + let data = data.clone(); + threads.push(thread::spawn(move || { + let mut stream = match TcpStream::connect(addr.as_ref()) { + Err(err) => { + total_finished_threads.fetch_add(1, Ordering::Relaxed); + panic!("could not open connection: {}", err); + }, + Ok(s) => s, + }; + let mut thread_bytes_send: u64 = 0; + let mut thread_last_sync = Instant::now(); + + loop { + let tosend: u64 = rand::random::() as u64 * 10 + 1000; + thread_bytes_send += tosend; + + let cur = Instant::now(); + if cur.duration_since(thread_last_sync) >= Duration::from_secs(1) { + thread_last_sync = cur; + println!("[{}]send: {}MiB/s", i, thread_bytes_send / (1024 * 1024)); + total_bytes_send.fetch_add(thread_bytes_send, Ordering::Relaxed); + thread_bytes_send = 0; + } + + total_send_count.fetch_add(1, Ordering::Relaxed); + let ret = stream.write_all(data[0..(tosend as usize)].as_bytes()); + if ret.is_err() { + println!("[{}] error: {}", i, ret.err().unwrap()); + total_finished_threads.fetch_add(1, Ordering::Relaxed); + return; + } + //stream.flush(); + } + })); + } + + while total_finished_threads.load(Ordering::Relaxed) < thread_count { + thread::sleep(Duration::from_millis(10)); + } + + let cur = Instant::now(); + let dur = cur.duration_since(start_time); + println!("================"); + println!("test endet"); + println!( + "total send: {}MiB", + total_bytes_send.load(Ordering::Relaxed) / (1024 * 1024) + ); + println!("total time: {}s", dur.as_secs()); + println!( + "average: {}KiB/s", + total_bytes_send.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64 / 1024 + ); + println!( + "send count: {}/s", + total_send_count.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64 + ); + + Ok(()) +}