diff --git a/Cargo.lock b/Cargo.lock index bd0dcb62ec..0642e1e5a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,12 +225,6 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" -[[package]] -name = "ascii" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbf56136a5198c7b01a49e3afcbef6cf84597273d298f54432926024107b0109" - [[package]] name = "assets_manager" version = "0.4.3" @@ -723,7 +717,7 @@ version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" dependencies = [ - "ascii 0.9.3", + "ascii", "byteorder", "either", "memchr", @@ -2341,6 +2335,16 @@ dependencies = [ "http", ] +[[package]] +name = "http-body" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994" +dependencies = [ + "bytes 1.0.1", + "http", +] + [[package]] name = "httparse" version = "1.3.5" @@ -2371,7 +2375,7 @@ dependencies = [ "futures-util", "h2", "http", - "http-body", + "http-body 0.3.1", "httparse", "httpdate", "itoa", @@ -2383,6 +2387,29 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8e946c2b1349055e0b72ae281b238baf1a3ea7307c7e9f9d64673bdd9c26ac7" +dependencies = [ + "bytes 1.0.1", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body 0.4.0", + "httparse", + "httpdate", + "itoa", + "pin-project 1.0.5", + "socket2", + "tokio 1.2.0", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper-rustls" version = "0.21.0" @@ -2391,7 +2418,7 @@ checksum = "37743cc83e8ee85eacfce90f2f4102030d9ff0a95244098d781e9bee4a90abb6" dependencies = [ "bytes 0.5.6", "futures-util", - "hyper", + 
"hyper 0.13.10", "log", "rustls 0.18.1", "tokio 0.2.25", @@ -3952,6 +3979,18 @@ dependencies = [ "thiserror", ] +[[package]] +name = "prometheus-hyper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc47fa532a12d544229015dd3fae32394949af098b8fe9a327b8c1e4c911d1c8" +dependencies = [ + "hyper 0.14.4", + "prometheus", + "tokio 1.2.0", + "tracing", +] + [[package]] name = "publicsuffix" version = "1.5.4" @@ -4230,8 +4269,8 @@ dependencies = [ "futures-core", "futures-util", "http", - "http-body", - "hyper", + "http-body 0.3.1", + "hyper 0.13.10", "hyper-rustls", "ipnet", "js-sys", @@ -5069,19 +5108,6 @@ dependencies = [ "syn 1.0.60", ] -[[package]] -name = "tiny_http" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eded47106b8e52d8ed8119f0ea6e8c0f5881e69783e0297b5a8462958f334bc1" -dependencies = [ - "ascii 1.0.0", - "chrono", - "chunked_transfer", - "log", - "url", -] - [[package]] name = "tinytemplate" version = "1.2.0" @@ -5137,8 +5163,11 @@ dependencies = [ "memchr", "mio 0.7.7", "num_cpus", + "once_cell", "pin-project-lite 0.2.4", + "signal-hook-registry", "tokio-macros", + "winapi 0.3.9", ] [[package]] @@ -5682,6 +5711,7 @@ dependencies = [ "async-trait", "bincode", "bitflags", + "bytes 1.0.1", "clap", "crossbeam-channel 0.5.0", "futures-core", @@ -5689,14 +5719,13 @@ dependencies = [ "lazy_static", "lz-fear", "prometheus", + "prometheus-hyper", "rand 0.8.3", "serde", "shellexpand", - "tiny_http", "tokio 1.2.0", "tokio-stream", "tracing", - "tracing-futures", "tracing-subscriber", "veloren-network-protocol", ] @@ -5708,6 +5737,7 @@ dependencies = [ "async-channel", "async-trait", "bitflags", + "bytes 1.0.1", "criterion", "prometheus", "rand 0.8.3", @@ -5762,6 +5792,7 @@ dependencies = [ "libsqlite3-sys", "portpicker", "prometheus", + "prometheus-hyper", "rand 0.8.3", "rayon", "ron", @@ -5771,7 +5802,6 @@ dependencies = [ "slab", "specs", "specs-idvs", - 
"tiny_http", "tokio 1.2.0", "tracing", "uvth", diff --git a/client/Cargo.toml b/client/Cargo.toml index fcda01cb65..d8f6cb969a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -21,7 +21,7 @@ uvth = "3.1.1" futures-util = "0.3.7" futures-executor = "0.3" futures-timer = "3.0" -tokio = { version = "1.0.1", default-features = false, features = ["rt"] } +tokio = { version = "1", default-features = false, features = ["rt"] } image = { version = "0.23.12", default-features = false, features = ["png"] } num = "0.3.1" num_cpus = "1.10.1" diff --git a/network/Cargo.toml b/network/Cargo.toml index f548278896..de6e028176 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -14,7 +14,7 @@ default = ["metrics","compression"] [dependencies] -network-protocol = { package = "veloren-network-protocol", path = "protocol", default-features = false } +network-protocol = { package = "veloren-network-protocol", path = "protocol" } #serialisation bincode = "1.3.1" @@ -24,8 +24,7 @@ crossbeam-channel = "0.5" tokio = { version = "1.2", default-features = false, features = ["io-util", "macros", "rt", "net", "time"] } tokio-stream = { version = "0.1.2", default-features = false } #tracing and metrics -tracing = { version = "0.1", default-features = false } -tracing-futures = "0.2" +tracing = { version = "0.1", default-features = false, features = ["attributes"]} prometheus = { version = "0.11", default-features = false, optional = true } #async futures-core = { version = "0.3", default-features = false } @@ -39,12 +38,25 @@ bitflags = "1.2.1" lz-fear = { version = "0.1.1", optional = true } # async traits async-trait = "0.1.42" +bytes = "^1" [dev-dependencies] tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } -tokio = { version = "1.1.0", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] } +tokio = { version = "1.2", default-features = false, features = ["io-std", "fs", 
"rt-multi-thread"] } futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } clap = { version = "2.33", default-features = false } shellexpand = "2.0.0" -tiny_http = "0.8.0" serde = { version = "1.0", features = ["derive"] } +prometheus-hyper = "0.1.1" + +[[example]] +name = "fileshare" + +[[example]] +name = "network-speed" + +[[example]] +name = "chat" + +[[example]] +name = "tcp_loadtest" diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index 9814cec998..37d076b5bd 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -3,11 +3,12 @@ /// (cd network/examples/network-speed && RUST_BACKTRACE=1 cargo run --profile=debuginfo -Z unstable-options -- --trace=error --protocol=tcp --mode=server) /// (cd network/examples/network-speed && RUST_BACKTRACE=1 cargo run --profile=debuginfo -Z unstable-options -- --trace=error --protocol=tcp --mode=client) /// ``` -mod metrics; - use clap::{App, Arg}; +use prometheus::Registry; +use prometheus_hyper::Server; use serde::{Deserialize, Serialize}; use std::{ + net::SocketAddr, sync::Arc, thread, time::{Duration, Instant}, @@ -121,9 +122,13 @@ fn main() { } fn server(address: ProtocolAddr, runtime: Arc) { - let mut metrics = metrics::SimpleMetrics::new(); - let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry()); - metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap(); + let registry = Arc::new(Registry::new()); + let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry); + runtime.spawn(Server::run( + Arc::clone(&registry), + SocketAddr::from(([0; 4], 59112)), + futures_util::future::pending(), + )); runtime.block_on(server.listen(address)).unwrap(); loop { @@ -148,9 +153,13 @@ fn server(address: ProtocolAddr, runtime: Arc) { } fn client(address: ProtocolAddr, runtime: Arc) { - let mut metrics = metrics::SimpleMetrics::new(); - let client = 
Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry()); - metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap(); + let registry = Arc::new(Registry::new()); + let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry); + runtime.spawn(Server::run( + Arc::clone(&registry), + SocketAddr::from(([0; 4], 59111)), + futures_util::future::pending(), + )); let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1 let mut s1 = runtime diff --git a/network/examples/network-speed/metrics.rs b/network/examples/network-speed/metrics.rs deleted file mode 100644 index 657a00abf4..0000000000 --- a/network/examples/network-speed/metrics.rs +++ /dev/null @@ -1,92 +0,0 @@ -use prometheus::{Encoder, Registry, TextEncoder}; -use std::{ - error::Error, - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, -}; -use tracing::*; - -pub struct SimpleMetrics { - running: Arc, - handle: Option>, - registry: Option, -} - -impl SimpleMetrics { - pub fn new() -> Self { - let running = Arc::new(AtomicBool::new(false)); - let registry = Some(Registry::new()); - - Self { - running, - handle: None, - registry, - } - } - - pub fn registry(&self) -> &Registry { - match self.registry { - Some(ref r) => r, - None => panic!("You cannot longer register new metrics after the server has started!"), - } - } - - pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box> { - self.running.store(true, Ordering::Relaxed); - let running2 = self.running.clone(); - - let registry = self - .registry - .take() - .expect("ServerMetrics must be already started"); - - //TODO: make this a job - self.handle = Some(thread::spawn(move || { - let server = tiny_http::Server::http(addr).unwrap(); - const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1); - debug!("starting tiny_http server to serve metrics"); - while running2.load(Ordering::Relaxed) { - let request = match server.recv_timeout(TIMEOUT) { 
- Ok(Some(rq)) => rq, - Ok(None) => continue, - Err(e) => { - println!("Error: {}", e); - break; - }, - }; - let mf = registry.gather(); - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - encoder - .encode(&mf, &mut buffer) - .expect("Failed to encoder metrics text."); - let response = tiny_http::Response::from_string( - String::from_utf8(buffer).expect("Failed to parse bytes as a string."), - ); - if let Err(e) = request.respond(response) { - error!( - ?e, - "The metrics HTTP server had encountered and error with answering" - ) - } - } - debug!("Stopping tiny_http server to serve metrics"); - })); - Ok(()) - } -} - -impl Drop for SimpleMetrics { - fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); - let handle = self.handle.take(); - handle - .expect("ServerMetrics worker handle does not exist.") - .join() - .expect("Error shutting down prometheus metric exporter"); - } -} diff --git a/network/protocol/Cargo.toml b/network/protocol/Cargo.toml index e097314b6b..a9bd701940 100644 --- a/network/protocol/Cargo.toml +++ b/network/protocol/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [features] metrics = ["prometheus"] +trace_pedantic = [] # use for debug only default = ["metrics"] @@ -22,6 +23,7 @@ bitflags = "1.2.1" rand = { version = "0.8" } # async traits async-trait = "0.1.42" +bytes = "^1" [dev-dependencies] async-channel = "1.5.1" diff --git a/network/protocol/benches/protocols.rs b/network/protocol/benches/protocols.rs index 5151083b98..f9ad557682 100644 --- a/network/protocol/benches/protocols.rs +++ b/network/protocol/benches/protocols.rs @@ -1,5 +1,6 @@ use async_channel::*; use async_trait::async_trait; +use bytes::BytesMut; use criterion::{criterion_group, criterion_main, Criterion}; use std::{sync::Arc, time::Duration}; use veloren_network_protocol::{ @@ -8,7 +9,7 @@ use veloren_network_protocol::{ Sid, TcpRecvProtcol, TcpSendProtcol, UnreliableDrain, UnreliableSink, _internal::Frame, }; -fn frame_serialize(frame: Frame, 
buffer: &mut [u8]) -> usize { frame.to_bytes(buffer).0 } +fn frame_serialize(frame: Frame, buffer: &mut BytesMut) { frame.to_bytes(buffer); } async fn mpsc_msg(buffer: Arc) { // Arrrg, need to include constructor here @@ -102,7 +103,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.to_async(rt()).iter(|| mpsc_handshake()) }); - let mut buffer = [0u8; 1500]; + let mut buffer = BytesMut::with_capacity(1500); c.bench_function("frame_serialize_short", |b| { let frame = Frame::Data { @@ -110,7 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) { start: 89u64, data: b"hello_world".to_vec(), }; - b.iter(move || frame_serialize(frame.clone(), &mut buffer)) + b.iter(|| frame_serialize(frame.clone(), &mut buffer)) }); c.bench_function("tcp_short_msg", |b| { @@ -126,6 +127,11 @@ fn criterion_benchmark(c: &mut Criterion) { b.to_async(rt()) .iter(|| tcp_msg(Arc::clone(&buffer), 10_000)) }); + c.bench_function("tcp_1000000_tiny_msg", |b| { + let buffer = Arc::new(MessageBuffer { data: vec![3u8; 5] }); + b.to_async(rt()) + .iter(|| tcp_msg(Arc::clone(&buffer), 1_000_000)) + }); } criterion_group!(benches, criterion_benchmark); @@ -164,11 +170,11 @@ mod utils { } pub struct TcpDrain { - sender: Sender>, + sender: Sender, } pub struct TcpSink { - receiver: Receiver>, + receiver: Receiver, } /// emulate Tcp protocol on Channels @@ -219,7 +225,7 @@ mod utils { #[async_trait] impl UnreliableDrain for TcpDrain { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> { self.sender @@ -231,7 +237,7 @@ mod utils { #[async_trait] impl UnreliableSink for TcpSink { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn recv(&mut self) -> Result { self.receiver diff --git a/network/protocol/src/frame.rs b/network/protocol/src/frame.rs index 4824498940..2f1bb3eb4c 100644 --- a/network/protocol/src/frame.rs +++ b/network/protocol/src/frame.rs @@ -1,5 +1,5 @@ use crate::types::{Mid, Pid, Prio, Promises, 
Sid}; -use std::{collections::VecDeque, convert::TryFrom}; +use bytes::{Buf, BufMut, BytesMut}; // const FRAME_RESERVED_1: u8 = 0; const FRAME_HANDSHAKE: u8 = 1; @@ -62,37 +62,32 @@ impl InitFrame { pub(crate) const RAW_CNS: usize = 2; //provide an appropriate buffer size. > 1500 - pub(crate) fn to_bytes(self, bytes: &mut [u8]) -> usize { + pub(crate) fn to_bytes(self, bytes: &mut BytesMut) { match self { InitFrame::Handshake { magic_number, version, } => { - let x = FRAME_HANDSHAKE.to_be_bytes(); - bytes[0] = x[0]; - bytes[1..8].copy_from_slice(&magic_number); - bytes[8..12].copy_from_slice(&version[0].to_le_bytes()); - bytes[12..16].copy_from_slice(&version[1].to_le_bytes()); - bytes[16..Self::HANDSHAKE_CNS + 1].copy_from_slice(&version[2].to_le_bytes()); - Self::HANDSHAKE_CNS + 1 + bytes.put_u8(FRAME_HANDSHAKE); + bytes.put_slice(&magic_number); + bytes.put_u32_le(version[0]); + bytes.put_u32_le(version[1]); + bytes.put_u32_le(version[2]); }, InitFrame::Init { pid, secret } => { - bytes[0] = FRAME_INIT.to_be_bytes()[0]; - bytes[1..17].copy_from_slice(&pid.to_le_bytes()); - bytes[17..Self::INIT_CNS + 1].copy_from_slice(&secret.to_le_bytes()); - Self::INIT_CNS + 1 + bytes.put_u8(FRAME_INIT); + pid.to_bytes(bytes); + bytes.put_u128_le(secret); }, InitFrame::Raw(data) => { - bytes[0] = FRAME_RAW.to_be_bytes()[0]; - bytes[1..3].copy_from_slice(&(data.len() as u16).to_le_bytes()); - bytes[Self::RAW_CNS + 1..(data.len() + Self::RAW_CNS + 1)] - .clone_from_slice(&data[..]); - Self::RAW_CNS + 1 + data.len() + bytes.put_u8(FRAME_RAW); + bytes.put_u16_le(data.len() as u16); + bytes.put_slice(&data); }, } } - pub(crate) fn to_frame(bytes: Vec) -> Option { + pub(crate) fn to_frame(bytes: &mut BytesMut) -> Option { let frame_no = match bytes.get(0) { Some(&f) => f, None => return None, @@ -102,61 +97,43 @@ impl InitFrame { if bytes.len() < Self::HANDSHAKE_CNS + 1 { return None; } - InitFrame::gen_handshake( - *<&[u8; 
Self::HANDSHAKE_CNS]>::try_from(&bytes[1..Self::HANDSHAKE_CNS + 1]) - .unwrap(), - ) + bytes.advance(1); + let mut magic_number_bytes = bytes.copy_to_bytes(7); + let mut magic_number = [0u8; 7]; + magic_number_bytes.copy_to_slice(&mut magic_number); + InitFrame::Handshake { + magic_number, + version: [bytes.get_u32_le(), bytes.get_u32_le(), bytes.get_u32_le()], + } }, FRAME_INIT => { if bytes.len() < Self::INIT_CNS + 1 { return None; } - InitFrame::gen_init( - *<&[u8; Self::INIT_CNS]>::try_from(&bytes[1..Self::INIT_CNS + 1]).unwrap(), - ) + bytes.advance(1); + InitFrame::Init { + pid: Pid::from_bytes(bytes), + secret: bytes.get_u128_le(), + } }, FRAME_RAW => { if bytes.len() < Self::RAW_CNS + 1 { return None; } - let length = InitFrame::gen_raw( - *<&[u8; Self::RAW_CNS]>::try_from(&bytes[1..Self::RAW_CNS + 1]).unwrap(), - ); - let mut data = vec![0; length as usize]; - let slice = &bytes[Self::RAW_CNS + 1..]; - if slice.len() != length as usize { - return None; - } - data.copy_from_slice(&bytes[Self::RAW_CNS + 1..]); + bytes.advance(1); + let length = bytes.get_u16_le() as usize; + // lower length is allowed + let max_length = length.min(bytes.len()); + println!("dasdasd {:?}", length); + println!("aaaaa {:?}", max_length); + let mut data = vec![0; max_length]; + data.copy_from_slice(&bytes[..max_length]); InitFrame::Raw(data) }, - _ => InitFrame::Raw(bytes), + _ => InitFrame::Raw(bytes.to_vec()), }; Some(frame) } - - fn gen_handshake(buf: [u8; Self::HANDSHAKE_CNS]) -> Self { - let magic_number = *<&[u8; 7]>::try_from(&buf[0..7]).unwrap(); - InitFrame::Handshake { - magic_number, - version: [ - u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[7..11]).unwrap()), - u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[11..15]).unwrap()), - u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[15..Self::HANDSHAKE_CNS]).unwrap()), - ], - } - } - - fn gen_init(buf: [u8; Self::INIT_CNS]) -> Self { - InitFrame::Init { - pid: Pid::from_le_bytes(*<&[u8; 16]>::try_from(&buf[0..16]).unwrap()), 
- secret: u128::from_le_bytes(*<&[u8; 16]>::try_from(&buf[16..Self::INIT_CNS]).unwrap()), - } - } - - fn gen_raw(buf: [u8; Self::RAW_CNS]) -> u16 { - u16::from_le_bytes(*<&[u8; 2]>::try_from(&buf[0..Self::RAW_CNS]).unwrap()) - } } impl Frame { @@ -164,82 +141,53 @@ impl Frame { /// const part of the DATA frame, actual size is variable pub(crate) const DATA_CNS: usize = 18; pub(crate) const DATA_HEADER_CNS: usize = 24; - #[cfg(feature = "metrics")] - pub const FRAMES_LEN: u8 = 5; pub(crate) const OPEN_STREAM_CNS: usize = 10; // Size WITHOUT the 1rst indicating byte pub(crate) const SHUTDOWN_CNS: usize = 0; - #[cfg(feature = "metrics")] - pub const fn int_to_string(i: u8) -> &'static str { - match i { - 0 => "Shutdown", - 1 => "OpenStream", - 2 => "CloseStream", - 3 => "DataHeader", - 4 => "Data", - _ => "", - } - } - - #[cfg(feature = "metrics")] - pub fn get_int(&self) -> u8 { - match self { - Frame::Shutdown => 0, - Frame::OpenStream { .. } => 1, - Frame::CloseStream { .. } => 2, - Frame::DataHeader { .. } => 3, - Frame::Data { .. } => 4, - } - } - - #[cfg(feature = "metrics")] - pub fn get_string(&self) -> &str { Self::int_to_string(self.get_int()) } - //provide an appropriate buffer size. 
> 1500 - pub fn to_bytes(self, bytes: &mut [u8]) -> (/* buf */ usize, /* actual data */ u64) { + pub fn to_bytes(self, bytes: &mut BytesMut) -> u64 { match self { Frame::Shutdown => { - bytes[Self::SHUTDOWN_CNS] = FRAME_SHUTDOWN.to_be_bytes()[0]; - (Self::SHUTDOWN_CNS + 1, 0) + bytes.put_u8(FRAME_SHUTDOWN); + 0 }, Frame::OpenStream { sid, prio, promises, } => { - bytes[0] = FRAME_OPEN_STREAM.to_be_bytes()[0]; - bytes[1..9].copy_from_slice(&sid.to_le_bytes()); - bytes[9] = prio.to_le_bytes()[0]; - bytes[Self::OPEN_STREAM_CNS] = promises.to_le_bytes()[0]; - (Self::OPEN_STREAM_CNS + 1, 0) + bytes.put_u8(FRAME_OPEN_STREAM); + bytes.put_slice(&sid.to_le_bytes()); + bytes.put_u8(prio); + bytes.put_u8(promises.to_le_bytes()[0]); + 0 }, Frame::CloseStream { sid } => { - bytes[0] = FRAME_CLOSE_STREAM.to_be_bytes()[0]; - bytes[1..Self::CLOSE_STREAM_CNS + 1].copy_from_slice(&sid.to_le_bytes()); - (Self::CLOSE_STREAM_CNS + 1, 0) + bytes.put_u8(FRAME_CLOSE_STREAM); + bytes.put_slice(&sid.to_le_bytes()); + 0 }, Frame::DataHeader { mid, sid, length } => { - bytes[0] = FRAME_DATA_HEADER.to_be_bytes()[0]; - bytes[1..9].copy_from_slice(&mid.to_le_bytes()); - bytes[9..17].copy_from_slice(&sid.to_le_bytes()); - bytes[17..Self::DATA_HEADER_CNS + 1].copy_from_slice(&length.to_le_bytes()); - (Self::DATA_HEADER_CNS + 1, 0) + bytes.put_u8(FRAME_DATA_HEADER); + bytes.put_u64_le(mid); + bytes.put_slice(&sid.to_le_bytes()); + bytes.put_u64_le(length); + 0 }, Frame::Data { mid, start, data } => { - bytes[0] = FRAME_DATA.to_be_bytes()[0]; - bytes[1..9].copy_from_slice(&mid.to_le_bytes()); - bytes[9..17].copy_from_slice(&start.to_le_bytes()); - bytes[17..Self::DATA_CNS + 1].copy_from_slice(&(data.len() as u16).to_le_bytes()); - bytes[Self::DATA_CNS + 1..(data.len() + Self::DATA_CNS + 1)] - .clone_from_slice(&data[..]); - (Self::DATA_CNS + 1 + data.len(), data.len() as u64) + bytes.put_u8(FRAME_DATA); + bytes.put_u64_le(mid); + bytes.put_u64_le(start); + bytes.put_u16_le(data.len() as u16); + 
bytes.put_slice(&data); + data.len() as u64 }, } } - pub(crate) fn to_frame(bytes: &mut VecDeque) -> Option { - let frame_no = match bytes.get(0) { + pub(crate) fn to_frame(bytes: &mut BytesMut) -> Option { + let frame_no = match bytes.first() { Some(&f) => f, None => return None, }; @@ -249,6 +197,9 @@ impl Frame { FRAME_CLOSE_STREAM => Self::CLOSE_STREAM_CNS, FRAME_DATA_HEADER => Self::DATA_HEADER_CNS, FRAME_DATA => { + if bytes.len() < 17 + 1 + 1 { + return None; + } u16::from_le_bytes([bytes[16 + 1], bytes[17 + 1]]) as usize + Self::DATA_CNS }, _ => return None, @@ -260,68 +211,49 @@ impl Frame { let frame = match frame_no { FRAME_SHUTDOWN => { - let _ = bytes.drain(..size + 1); + let _ = bytes.split_to(size + 1); Frame::Shutdown }, FRAME_OPEN_STREAM => { - let bytes = bytes.drain(..size + 1).skip(1).collect::>(); - Frame::gen_open_stream(<[u8; 10]>::try_from(bytes).unwrap()) + let mut bytes = bytes.split_to(size + 1); + bytes.advance(1); + Frame::OpenStream { + sid: Sid::new(bytes.get_u64_le()), + prio: bytes.get_u8(), + promises: Promises::from_bits_truncate(bytes.get_u8()), + } }, FRAME_CLOSE_STREAM => { - let bytes = bytes.drain(..size + 1).skip(1).collect::>(); - Frame::gen_close_stream(<[u8; 8]>::try_from(bytes).unwrap()) + let mut bytes = bytes.split_to(size + 1); + bytes.advance(1); + Frame::CloseStream { + sid: Sid::new(bytes.get_u64_le()), + } }, FRAME_DATA_HEADER => { - let bytes = bytes.drain(..size + 1).skip(1).collect::>(); - Frame::gen_data_header(<[u8; 24]>::try_from(bytes).unwrap()) + let mut bytes = bytes.split_to(size + 1); + bytes.advance(1); + Frame::DataHeader { + mid: bytes.get_u64_le(), + sid: Sid::new(bytes.get_u64_le()), + length: bytes.get_u64_le(), + } }, FRAME_DATA => { - let info = bytes - .drain(..Self::DATA_CNS + 1) - .skip(1) - .collect::>(); - let (mid, start, length) = Frame::gen_data(<[u8; 18]>::try_from(info).unwrap()); + let mut info = bytes.split_to(Self::DATA_CNS + 1); + info.advance(1); + let mid = info.get_u64_le(); + 
let start = info.get_u64_le(); + let length = info.get_u16_le(); debug_assert_eq!(length as usize, size - Self::DATA_CNS); - let data = bytes.drain(..length as usize).collect::>(); + let data = bytes.split_to(length as usize); + let data = data.to_vec(); Frame::Data { mid, start, data } }, _ => unreachable!("Frame::to_frame should be handled before!"), }; Some(frame) } - - fn gen_open_stream(buf: [u8; Self::OPEN_STREAM_CNS]) -> Self { - Frame::OpenStream { - sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()), - prio: buf[8], - promises: Promises::from_bits_truncate(buf[Self::OPEN_STREAM_CNS - 1]), - } - } - - fn gen_close_stream(buf: [u8; Self::CLOSE_STREAM_CNS]) -> Self { - Frame::CloseStream { - sid: Sid::from_le_bytes( - *<&[u8; 8]>::try_from(&buf[0..Self::CLOSE_STREAM_CNS]).unwrap(), - ), - } - } - - fn gen_data_header(buf: [u8; Self::DATA_HEADER_CNS]) -> Self { - Frame::DataHeader { - mid: Mid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()), - sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[8..16]).unwrap()), - length: u64::from_le_bytes( - *<&[u8; 8]>::try_from(&buf[16..Self::DATA_HEADER_CNS]).unwrap(), - ), - } - } - - fn gen_data(buf: [u8; Self::DATA_CNS]) -> (Mid, u64, u16) { - let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()); - let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&buf[8..16]).unwrap()); - let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&buf[16..Self::DATA_CNS]).unwrap()); - (mid, start, length) - } } #[cfg(test)] @@ -375,10 +307,9 @@ mod tests { #[test] fn initframe_individual() { let dupl = |frame: InitFrame| { - let mut buffer = vec![0u8; 1500]; - let size = InitFrame::to_bytes(frame.clone(), &mut buffer); - buffer.truncate(size); - InitFrame::to_frame(buffer) + let mut buffer = BytesMut::with_capacity(1500); + InitFrame::to_bytes(frame.clone(), &mut buffer); + InitFrame::to_frame(&mut buffer) }; for frame in get_initframes() { @@ -389,29 +320,18 @@ mod tests { #[test] fn 
initframe_multiple() { - let mut buffer = vec![0u8; 3000]; + let mut buffer = BytesMut::with_capacity(3000); let mut frames = get_initframes(); - let mut last = 0; // to string - let sizes = frames - .iter() - .map(|f| { - let s = InitFrame::to_bytes(f.clone(), &mut buffer[last..]); - last += s; - s - }) - .collect::>(); + for f in &frames { + InitFrame::to_bytes(f.clone(), &mut buffer); + } // from string - let mut last = 0; - let mut framesd = sizes + let mut framesd = frames .iter() - .map(|&s| { - let f = InitFrame::to_frame(buffer[last..last + s].to_vec()); - last += s; - f - }) + .map(|&_| InitFrame::to_frame(&mut buffer)) .collect::>(); // compare @@ -424,10 +344,9 @@ mod tests { #[test] fn frame_individual() { let dupl = |frame: Frame| { - let mut buffer = vec![0u8; 1500]; - let (size, _) = Frame::to_bytes(frame.clone(), &mut buffer); - let mut deque = buffer[..size].iter().map(|b| *b).collect(); - Frame::to_frame(&mut deque) + let mut buffer = BytesMut::with_capacity(1500); + Frame::to_bytes(frame.clone(), &mut buffer); + Frame::to_frame(&mut buffer) }; for frame in get_frames() { @@ -438,31 +357,16 @@ mod tests { #[test] fn frame_multiple() { - let mut buffer = vec![0u8; 3000]; + let mut buffer = BytesMut::with_capacity(3000); let mut frames = get_frames(); - let mut last = 0; // to string - let sizes = frames - .iter() - .map(|f| { - let s = Frame::to_bytes(f.clone(), &mut buffer[last..]).0; - last += s; - s - }) - .collect::>(); - - assert_eq!(sizes[0], 1 + Frame::OPEN_STREAM_CNS); - assert_eq!(sizes[1], 1 + Frame::DATA_HEADER_CNS); - assert_eq!(sizes[2], 1 + Frame::DATA_CNS + 20); - assert_eq!(sizes[3], 1 + Frame::DATA_CNS + 16); - assert_eq!(sizes[4], 1 + Frame::CLOSE_STREAM_CNS); - assert_eq!(sizes[5], 1 + Frame::SHUTDOWN_CNS); - - let mut buffer = buffer.drain(..).collect::>(); + for f in &frames { + Frame::to_bytes(f.clone(), &mut buffer); + } // from string - let mut framesd = sizes + let mut framesd = frames .iter() .map(|&_| Frame::to_frame(&mut 
buffer)) .collect::>(); @@ -476,32 +380,31 @@ mod tests { #[test] fn frame_exact_size() { - let mut buffer = vec![0u8; Frame::CLOSE_STREAM_CNS+1/*first byte*/]; + const SIZE: usize = Frame::CLOSE_STREAM_CNS+1/*first byte*/; + let mut buffer = BytesMut::with_capacity(SIZE); - let frame1 = Frame::CloseStream { - sid: Sid::new(1337), - }; - let _ = Frame::to_bytes(frame1.clone(), &mut buffer); + let frame1 = Frame::CloseStream { sid: Sid::new(2) }; + Frame::to_bytes(frame1.clone(), &mut buffer); + assert_eq!(buffer.len(), SIZE); let mut deque = buffer.iter().map(|b| *b).collect(); let frame2 = Frame::to_frame(&mut deque); assert_eq!(Some(frame1), frame2); } #[test] - #[should_panic] fn initframe_too_short_buffer() { - let mut buffer = vec![0u8; 10]; + let mut buffer = BytesMut::with_capacity(10); let frame1 = InitFrame::Handshake { magic_number: VELOREN_MAGIC_NUMBER, version: VELOREN_NETWORK_VERSION, }; - let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer); + InitFrame::to_bytes(frame1.clone(), &mut buffer); } #[test] fn initframe_too_less_data() { - let mut buffer = vec![0u8; 20]; + let mut buffer = BytesMut::with_capacity(20); let frame1 = InitFrame::Handshake { magic_number: VELOREN_MAGIC_NUMBER, @@ -509,79 +412,78 @@ mod tests { }; let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer); buffer.truncate(6); // simulate partial retrieve - let frame1d = InitFrame::to_frame(buffer[..6].to_vec()); + let frame1d = InitFrame::to_frame(&mut buffer); assert_eq!(frame1d, None); } #[test] fn initframe_rubish() { - let buffer = b"dtrgwcser".to_vec(); + let mut buffer = BytesMut::from(&b"dtrgwcser"[..]); assert_eq!( - InitFrame::to_frame(buffer), + InitFrame::to_frame(&mut buffer), Some(InitFrame::Raw(b"dtrgwcser".to_vec())) ); } #[test] fn initframe_attack_too_much_length() { - let mut buffer = vec![0u8; 50]; + let mut buffer = BytesMut::with_capacity(50); let frame1 = InitFrame::Raw(b"foobar".to_vec()); let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer); - 
buffer[2] = 255; - let framed = InitFrame::to_frame(buffer); - assert_eq!(framed, None); + buffer[1] = 255; + let framed = InitFrame::to_frame(&mut buffer); + assert_eq!(framed, Some(frame1)); } #[test] fn initframe_attack_too_low_length() { - let mut buffer = vec![0u8; 50]; + let mut buffer = BytesMut::with_capacity(50); let frame1 = InitFrame::Raw(b"foobar".to_vec()); let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer); - buffer[2] = 3; - let framed = InitFrame::to_frame(buffer); - assert_eq!(framed, None); + buffer[1] = 3; + let framed = InitFrame::to_frame(&mut buffer); + // we accept a different frame here, as it's RAW and debug only! + assert_eq!(framed, Some(InitFrame::Raw(b"foo".to_vec()))); } #[test] - #[should_panic] fn frame_too_short_buffer() { - let mut buffer = vec![0u8; 10]; + let mut buffer = BytesMut::with_capacity(10); let frame1 = Frame::OpenStream { sid: Sid::new(88), promises: Promises::ENCRYPTED, prio: 88, }; - let _ = Frame::to_bytes(frame1.clone(), &mut buffer); + Frame::to_bytes(frame1.clone(), &mut buffer); } #[test] fn frame_too_less_data() { - let mut buffer = vec![0u8; 20]; + let mut buffer = BytesMut::with_capacity(20); let frame1 = Frame::OpenStream { sid: Sid::new(88), promises: Promises::ENCRYPTED, prio: 88, }; - let _ = Frame::to_bytes(frame1.clone(), &mut buffer); + Frame::to_bytes(frame1.clone(), &mut buffer); buffer.truncate(6); // simulate partial retrieve - let mut buffer = buffer.drain(..6).collect::>(); let frame1d = Frame::to_frame(&mut buffer); assert_eq!(frame1d, None); } #[test] fn frame_rubish() { - let mut buffer = b"dtrgwcser".iter().map(|u| *u).collect::>(); + let mut buffer = BytesMut::from(&b"dtrgwcser"[..]); assert_eq!(Frame::to_frame(&mut buffer), None); } #[test] fn frame_attack_too_much_length() { - let mut buffer = vec![0u8; 50]; + let mut buffer = BytesMut::with_capacity(50); let frame1 = Frame::Data { mid: 7u64, @@ -589,16 +491,15 @@ mod tests { data: b"foobar".to_vec(), }; - let _ = 
Frame::to_bytes(frame1.clone(), &mut buffer); + Frame::to_bytes(frame1.clone(), &mut buffer); buffer[17] = 255; - let mut buffer = buffer.drain(..).collect::>(); let framed = Frame::to_frame(&mut buffer); assert_eq!(framed, None); } #[test] fn frame_attack_too_low_length() { - let mut buffer = vec![0u8; 50]; + let mut buffer = BytesMut::with_capacity(50); let frame1 = Frame::Data { mid: 7u64, @@ -606,9 +507,8 @@ mod tests { data: b"foobar".to_vec(), }; - let _ = Frame::to_bytes(frame1.clone(), &mut buffer); + Frame::to_bytes(frame1.clone(), &mut buffer); buffer[17] = 3; - let mut buffer = buffer.drain(..).collect::>(); let framed = Frame::to_frame(&mut buffer); assert_eq!( framed, @@ -622,13 +522,4 @@ mod tests { let framed = Frame::to_frame(&mut buffer); assert_eq!(framed, None); } - - #[test] - fn frame_int2str() { - assert_eq!(Frame::int_to_string(0), "Shutdown"); - assert_eq!(Frame::int_to_string(1), "OpenStream"); - assert_eq!(Frame::int_to_string(2), "CloseStream"); - assert_eq!(Frame::int_to_string(3), "DataHeader"); - assert_eq!(Frame::int_to_string(4), "Data"); - } } diff --git a/network/protocol/src/io.rs b/network/protocol/src/io.rs index c4e3eba43e..6ccf40e7d8 100644 --- a/network/protocol/src/io.rs +++ b/network/protocol/src/io.rs @@ -1,5 +1,6 @@ use crate::ProtocolError; use async_trait::async_trait; +use bytes::BytesMut; use std::collections::VecDeque; ///! 
I/O-Free (Sans-I/O) protocol https://sans-io.readthedocs.io/how-to-sans-io.html @@ -17,11 +18,11 @@ pub trait UnreliableSink: Send { } pub struct BaseDrain { - data: VecDeque>, + data: VecDeque, } pub struct BaseSink { - data: VecDeque>, + data: VecDeque, } impl BaseDrain { @@ -44,7 +45,7 @@ impl BaseSink { #[async_trait] impl UnreliableDrain for BaseDrain { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> { self.data.push_back(data); @@ -54,7 +55,7 @@ impl UnreliableDrain for BaseDrain { #[async_trait] impl UnreliableSink for BaseSink { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn recv(&mut self) -> Result { self.data.pop_front().ok_or(ProtocolError::Closed) diff --git a/network/protocol/src/lib.rs b/network/protocol/src/lib.rs index 8d49ed58c9..295d292881 100644 --- a/network/protocol/src/lib.rs +++ b/network/protocol/src/lib.rs @@ -40,6 +40,9 @@ pub trait InitProtocol { pub trait SendProtocol { //a stream MUST be bound to a specific Protocol, there will be a failover // feature comming for the case where a Protocol fails completly + /// use this to notify the sending side of streams that were created/remove + /// from remote + fn notify_from_recv(&mut self, event: ProtocolEvent); async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError>; async fn flush( &mut self, diff --git a/network/protocol/src/metrics.rs b/network/protocol/src/metrics.rs index 715a06fc9d..0b5c66872a 100644 --- a/network/protocol/src/metrics.rs +++ b/network/protocol/src/metrics.rs @@ -1,8 +1,11 @@ use crate::types::Sid; #[cfg(feature = "metrics")] -use prometheus::{IntCounterVec, IntGaugeVec, Opts, Registry}; +use prometheus::{ + core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge}, + IntCounterVec, IntGaugeVec, Opts, Registry, +}; #[cfg(feature = "metrics")] -use std::{error::Error, sync::Arc}; +use std::{collections::HashMap, error::Error, sync::Arc}; 
#[allow(dead_code)] pub enum RemoveReason { @@ -57,6 +60,12 @@ pub struct ProtocolMetrics { pub struct ProtocolMetricCache { cid: String, m: Arc, + cache: HashMap, + sdata_frames_t: GenericCounter, + sdata_frames_b: GenericCounter, + rdata_frames_t: GenericCounter, + rdata_frames_b: GenericCounter, + ping: GenericGauge, } #[cfg(not(feature = "metrics"))] @@ -192,179 +201,134 @@ impl ProtocolMetrics { } } +#[cfg(feature = "metrics")] +#[derive(Debug, Clone)] +pub(crate) struct CacheLine { + smsg_it: GenericCounter, + smsg_ib: GenericCounter, + smsg_ot: [GenericCounter; 2], + smsg_ob: [GenericCounter; 2], + rmsg_it: GenericCounter, + rmsg_ib: GenericCounter, + rmsg_ot: [GenericCounter; 2], + rmsg_ob: [GenericCounter; 2], +} + #[cfg(feature = "metrics")] impl ProtocolMetricCache { pub fn new(channel_key: &str, metrics: Arc) -> Self { + let cid = channel_key.to_string(); + let sdata_frames_t = metrics.sdata_frames_t.with_label_values(&[&cid]); + let sdata_frames_b = metrics.sdata_frames_b.with_label_values(&[&cid]); + let rdata_frames_t = metrics.rdata_frames_t.with_label_values(&[&cid]); + let rdata_frames_b = metrics.rdata_frames_b.with_label_values(&[&cid]); + let ping = metrics.ping.with_label_values(&[&cid]); Self { - cid: channel_key.to_string(), + cid, m: metrics, + cache: HashMap::new(), + sdata_frames_t, + sdata_frames_b, + rdata_frames_t, + rdata_frames_b, + ping, } } - pub(crate) fn smsg_it(&self, sid: Sid) { - self.m - .smsg_it - .with_label_values(&[&self.cid, &sid.to_string()]) - .inc(); + pub(crate) fn init_sid(&mut self, sid: Sid) -> &CacheLine { + let cid = &self.cid; + let m = &self.m; + self.cache.entry(sid).or_insert_with_key(|sid| { + let s = sid.to_string(); + let finished = RemoveReason::Finished.to_str(); + let dropped = RemoveReason::Dropped.to_str(); + CacheLine { + smsg_it: m.smsg_it.with_label_values(&[&cid, &s]), + smsg_ib: m.smsg_ib.with_label_values(&[&cid, &s]), + smsg_ot: [ + m.smsg_ot.with_label_values(&[&cid, &s, &finished]), + 
m.smsg_ot.with_label_values(&[&cid, &s, &dropped]), + ], + smsg_ob: [ + m.smsg_ob.with_label_values(&[&cid, &s, &finished]), + m.smsg_ob.with_label_values(&[&cid, &s, &dropped]), + ], + rmsg_it: m.rmsg_it.with_label_values(&[&cid, &s]), + rmsg_ib: m.rmsg_ib.with_label_values(&[&cid, &s]), + rmsg_ot: [ + m.rmsg_ot.with_label_values(&[&cid, &s, &finished]), + m.rmsg_ot.with_label_values(&[&cid, &s, &dropped]), + ], + rmsg_ob: [ + m.rmsg_ob.with_label_values(&[&cid, &s, &finished]), + m.rmsg_ob.with_label_values(&[&cid, &s, &dropped]), + ], + } + }) } - pub(crate) fn smsg_ib(&self, sid: Sid, bytes: u64) { - self.m - .smsg_ib - .with_label_values(&[&self.cid, &sid.to_string()]) - .inc_by(bytes); + pub(crate) fn smsg_ib(&mut self, sid: Sid, bytes: u64) { + let line = self.init_sid(sid); + line.smsg_it.inc(); + line.smsg_ib.inc_by(bytes); } - pub(crate) fn smsg_ot(&self, sid: Sid, reason: RemoveReason) { - self.m - .smsg_ot - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .inc(); + pub(crate) fn smsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) { + let line = self.init_sid(sid); + line.smsg_ot[reason.i()].inc(); + line.smsg_ob[reason.i()].inc_by(bytes); } - pub(crate) fn smsg_ob(&self, sid: Sid, reason: RemoveReason, bytes: u64) { - self.m - .smsg_ob - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .inc_by(bytes); + pub(crate) fn sdata_frames_b(&mut self, bytes: u64) { + self.sdata_frames_t.inc(); + self.sdata_frames_b.inc_by(bytes); } - pub(crate) fn sdata_frames_t(&self) { - self.m.sdata_frames_t.with_label_values(&[&self.cid]).inc(); + pub(crate) fn rmsg_ib(&mut self, sid: Sid, bytes: u64) { + let line = self.init_sid(sid); + line.rmsg_it.inc(); + line.rmsg_ib.inc_by(bytes); } - pub(crate) fn sdata_frames_b(&self, bytes: u64) { - self.m - .sdata_frames_b - .with_label_values(&[&self.cid]) - .inc_by(bytes); + pub(crate) fn rmsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) { + let line = 
self.init_sid(sid); + line.rmsg_ot[reason.i()].inc(); + line.rmsg_ob[reason.i()].inc_by(bytes); } - pub(crate) fn rmsg_it(&self, sid: Sid) { - self.m - .rmsg_it - .with_label_values(&[&self.cid, &sid.to_string()]) - .inc(); - } - - pub(crate) fn rmsg_ib(&self, sid: Sid, bytes: u64) { - self.m - .rmsg_ib - .with_label_values(&[&self.cid, &sid.to_string()]) - .inc_by(bytes); - } - - pub(crate) fn rmsg_ot(&self, sid: Sid, reason: RemoveReason) { - self.m - .rmsg_ot - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .inc(); - } - - pub(crate) fn rmsg_ob(&self, sid: Sid, reason: RemoveReason, bytes: u64) { - self.m - .rmsg_ob - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .inc_by(bytes); - } - - pub(crate) fn rdata_frames_t(&self) { - self.m.rdata_frames_t.with_label_values(&[&self.cid]).inc(); - } - - pub(crate) fn rdata_frames_b(&self, bytes: u64) { - self.m - .rdata_frames_b - .with_label_values(&[&self.cid]) - .inc_by(bytes); + pub(crate) fn rdata_frames_b(&mut self, bytes: u64) { + self.rdata_frames_t.inc(); + self.rdata_frames_b.inc_by(bytes); } #[cfg(test)] - pub(crate) fn assert_msg(&self, sid: Sid, cnt: u64, reason: RemoveReason) { - assert_eq!( - self.m - .smsg_it - .with_label_values(&[&self.cid, &sid.to_string()]) - .get(), - cnt - ); - assert_eq!( - self.m - .smsg_ot - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .get(), - cnt - ); - assert_eq!( - self.m - .rmsg_it - .with_label_values(&[&self.cid, &sid.to_string()]) - .get(), - cnt - ); - assert_eq!( - self.m - .rmsg_ot - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .get(), - cnt - ); + pub(crate) fn assert_msg(&mut self, sid: Sid, cnt: u64, reason: RemoveReason) { + let line = self.init_sid(sid); + assert_eq!(line.smsg_it.get(), cnt); + assert_eq!(line.smsg_ot[reason.i()].get(), cnt); + assert_eq!(line.rmsg_it.get(), cnt); + assert_eq!(line.rmsg_ot[reason.i()].get(), cnt); } #[cfg(test)] - pub(crate) fn 
assert_msg_bytes(&self, sid: Sid, bytes: u64, reason: RemoveReason) { - assert_eq!( - self.m - .smsg_ib - .with_label_values(&[&self.cid, &sid.to_string()]) - .get(), - bytes - ); - assert_eq!( - self.m - .smsg_ob - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .get(), - bytes - ); - assert_eq!( - self.m - .rmsg_ib - .with_label_values(&[&self.cid, &sid.to_string()]) - .get(), - bytes - ); - assert_eq!( - self.m - .rmsg_ob - .with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()]) - .get(), - bytes - ); + pub(crate) fn assert_msg_bytes(&mut self, sid: Sid, bytes: u64, reason: RemoveReason) { + let line = self.init_sid(sid); + assert_eq!(line.smsg_ib.get(), bytes); + assert_eq!(line.smsg_ob[reason.i()].get(), bytes); + assert_eq!(line.rmsg_ib.get(), bytes); + assert_eq!(line.rmsg_ob[reason.i()].get(), bytes); } #[cfg(test)] - pub(crate) fn assert_data_frames(&self, cnt: u64) { - assert_eq!( - self.m.sdata_frames_t.with_label_values(&[&self.cid]).get(), - cnt - ); - assert_eq!( - self.m.rdata_frames_t.with_label_values(&[&self.cid]).get(), - cnt - ); + pub(crate) fn assert_data_frames(&mut self, cnt: u64) { + assert_eq!(self.sdata_frames_t.get(), cnt); + assert_eq!(self.rdata_frames_t.get(), cnt); } #[cfg(test)] - pub(crate) fn assert_data_frames_bytes(&self, bytes: u64) { - assert_eq!( - self.m.sdata_frames_b.with_label_values(&[&self.cid]).get(), - bytes - ); - assert_eq!( - self.m.rdata_frames_b.with_label_values(&[&self.cid]).get(), - bytes - ); + pub(crate) fn assert_data_frames_bytes(&mut self, bytes: u64) { + assert_eq!(self.sdata_frames_b.get(), bytes); + assert_eq!(self.rdata_frames_b.get(), bytes); } } @@ -378,29 +342,29 @@ impl std::fmt::Debug for ProtocolMetrics { #[cfg(not(feature = "metrics"))] impl ProtocolMetricCache { - pub(crate) fn smsg_it(&self, _sid: Sid) {} + pub(crate) fn smsg_it(&mut self, _sid: Sid) {} - pub(crate) fn smsg_ib(&self, _sid: Sid, _b: u64) {} + pub(crate) fn smsg_ib(&mut self, _sid: Sid, _b: 
u64) {} - pub(crate) fn smsg_ot(&self, _sid: Sid, _reason: RemoveReason) {} + pub(crate) fn smsg_ot(&mut self, _sid: Sid, _reason: RemoveReason) {} - pub(crate) fn smsg_ob(&self, _sid: Sid, _reason: RemoveReason, _b: u64) {} + pub(crate) fn smsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {} - pub(crate) fn sdata_frames_t(&self) {} + pub(crate) fn sdata_frames_t(&mut self) {} - pub(crate) fn sdata_frames_b(&self, _b: u64) {} + pub(crate) fn sdata_frames_b(&mut self, _b: u64) {} - pub(crate) fn rmsg_it(&self, _sid: Sid) {} + pub(crate) fn rmsg_it(&mut self, _sid: Sid) {} - pub(crate) fn rmsg_ib(&self, _sid: Sid, _b: u64) {} + pub(crate) fn rmsg_ib(&mut self, _sid: Sid, _b: u64) {} - pub(crate) fn rmsg_ot(&self, _sid: Sid, _reason: RemoveReason) {} + pub(crate) fn rmsg_ot(&mut self, _sid: Sid, _reason: RemoveReason) {} - pub(crate) fn rmsg_ob(&self, _sid: Sid, _reason: RemoveReason, _b: u64) {} + pub(crate) fn rmsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {} - pub(crate) fn rdata_frames_t(&self) {} + pub(crate) fn rdata_frames_t(&mut self) {} - pub(crate) fn rdata_frames_b(&self, _b: u64) {} + pub(crate) fn rdata_frames_b(&mut self, _b: u64) {} } impl RemoveReason { @@ -411,4 +375,12 @@ impl RemoveReason { RemoveReason::Finished => "Finished", } } + + #[cfg(feature = "metrics")] + fn i(&self) -> usize { + match self { + RemoveReason::Dropped => 0, + RemoveReason::Finished => 1, + } + } } diff --git a/network/protocol/src/mpsc.rs b/network/protocol/src/mpsc.rs index 3e9e5d55fe..0fbbee6300 100644 --- a/network/protocol/src/mpsc.rs +++ b/network/protocol/src/mpsc.rs @@ -9,7 +9,10 @@ use crate::{ }; use async_trait::async_trait; use std::time::{Duration, Instant}; +#[cfg(feature = "trace_pedantic")] +use tracing::trace; +#[derive(Debug)] pub /* should be private */ enum MpscMsg { Event(ProtocolEvent), InitFrame(InitFrame), @@ -59,7 +62,11 @@ impl SendProtocol for MpscSendProtcol where D: UnreliableDrain, { + fn notify_from_recv(&mut self, 
_event: ProtocolEvent) {} + async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> { + #[cfg(feature = "trace_pedantic")] + trace!(?event, "send"); match &event { ProtocolEvent::Message { buffer, @@ -68,10 +75,8 @@ where } => { let sid = *sid; let bytes = buffer.data.len() as u64; - self.metrics.smsg_it(sid); self.metrics.smsg_ib(sid, bytes); let r = self.drain.send(MpscMsg::Event(event)).await; - self.metrics.smsg_ot(sid, RemoveReason::Finished); self.metrics.smsg_ob(sid, RemoveReason::Finished, bytes); r }, @@ -88,7 +93,10 @@ where S: UnreliableSink, { async fn recv(&mut self) -> Result { - match self.sink.recv().await? { + let event = self.sink.recv().await?; + #[cfg(feature = "trace_pedantic")] + trace!(?event, "recv"); + match event { MpscMsg::Event(e) => { if let ProtocolEvent::Message { buffer, @@ -98,9 +106,7 @@ where { let sid = *sid; let bytes = buffer.data.len() as u64; - self.metrics.rmsg_it(sid); self.metrics.rmsg_ib(sid, bytes); - self.metrics.rmsg_ot(sid, RemoveReason::Finished); self.metrics.rmsg_ob(sid, RemoveReason::Finished, bytes); } Ok(e) diff --git a/network/protocol/src/prio.rs b/network/protocol/src/prio.rs index 35b7067352..2348028d15 100644 --- a/network/protocol/src/prio.rs +++ b/network/protocol/src/prio.rs @@ -4,14 +4,18 @@ use crate::{ metrics::{ProtocolMetricCache, RemoveReason}, types::{Bandwidth, Mid, Prio, Promises, Sid}, }; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, + time::Duration, +}; #[derive(Debug)] struct StreamInfo { pub(crate) guaranteed_bandwidth: Bandwidth, pub(crate) prio: Prio, pub(crate) promises: Promises, - pub(crate) messages: Vec, + pub(crate) messages: VecDeque, } /// Responsible for queueing messages. 
@@ -47,7 +51,7 @@ impl PrioManager { guaranteed_bandwidth, prio, promises, - messages: vec![], + messages: VecDeque::new(), }); } @@ -68,7 +72,7 @@ impl PrioManager { .get_mut(&sid) .unwrap() .messages - .push(OutgoingMessage::new(buffer, mid, sid)); + .push_back(OutgoingMessage::new(buffer, mid, sid)); } /// bandwidth might be extended, as for technical reasons @@ -79,7 +83,7 @@ impl PrioManager { let mut frames = vec![]; let mut prios = [0u64; (Self::HIGHEST_PRIO + 1) as usize]; - let metrics = &self.metrics; + let metrics = &mut self.metrics; let mut process_stream = |stream: &mut StreamInfo, mut bandwidth: i64, cur_bytes: &mut u64| { @@ -103,9 +107,8 @@ impl PrioManager { //cleanup for i in finished.iter().rev() { - let msg = stream.messages.remove(*i); + let msg = stream.messages.remove(*i).unwrap(); let (sid, bytes) = msg.get_sid_len(); - metrics.smsg_ot(sid, RemoveReason::Finished); metrics.smsg_ob(sid, RemoveReason::Finished, bytes); } }; diff --git a/network/protocol/src/tcp.rs b/network/protocol/src/tcp.rs index e1c8e10e84..dd84d5b013 100644 --- a/network/protocol/src/tcp.rs +++ b/network/protocol/src/tcp.rs @@ -9,21 +9,25 @@ use crate::{ ProtocolError, RecvProtocol, SendProtocol, }; use async_trait::async_trait; +use bytes::BytesMut; use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, sync::Arc, time::{Duration, Instant}, }; use tracing::info; +#[cfg(feature = "trace_pedantic")] +use tracing::trace; #[derive(Debug)] pub struct TcpSendProtcol where - D: UnreliableDrain>, + D: UnreliableDrain, { - buffer: Vec, + buffer: BytesMut, store: PrioManager, closing_streams: Vec, + notify_closing_streams: Vec, pending_shutdown: bool, drain: D, last: Instant, @@ -33,9 +37,9 @@ where #[derive(Debug)] pub struct TcpRecvProtcol where - S: UnreliableSink>, + S: UnreliableSink, { - buffer: VecDeque, + buffer: BytesMut, incoming: HashMap, sink: S, metrics: ProtocolMetricCache, @@ -43,13 +47,14 @@ where impl TcpSendProtcol where - D: UnreliableDrain>, + 
D: UnreliableDrain, { pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self { Self { - buffer: vec![0u8; 1500], + buffer: BytesMut::new(), store: PrioManager::new(metrics.clone()), closing_streams: vec![], + notify_closing_streams: vec![], pending_shutdown: false, drain, last: Instant::now(), @@ -60,11 +65,11 @@ where impl TcpRecvProtcol where - S: UnreliableSink>, + S: UnreliableSink, { pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self { Self { - buffer: VecDeque::new(), + buffer: BytesMut::new(), incoming: HashMap::new(), sink, metrics, @@ -75,9 +80,9 @@ where #[async_trait] impl SendProtocol for TcpSendProtcol where - D: UnreliableDrain>, + D: UnreliableDrain, { - async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> { + fn notify_from_recv(&mut self, event: ProtocolEvent) { match event { ProtocolEvent::OpenStream { sid, @@ -87,31 +92,54 @@ where } => { self.store .open_stream(sid, prio, promises, guaranteed_bandwidth); - let frame = event.to_frame(); - let (s, _) = frame.to_bytes(&mut self.buffer); - self.drain.send(self.buffer[..s].to_vec()).await?; + }, + ProtocolEvent::CloseStream { sid } => { + if !self.store.try_close_stream(sid) { + #[cfg(feature = "trace_pedantic")] + trace!(?sid, "hold back notify close stream"); + self.notify_closing_streams.push(sid); + } + }, + _ => {}, + } + } + + async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> { + #[cfg(feature = "trace_pedantic")] + trace!(?event, "send"); + match event { + ProtocolEvent::OpenStream { + sid, + prio, + promises, + guaranteed_bandwidth, + } => { + self.store + .open_stream(sid, prio, promises, guaranteed_bandwidth); + event.to_frame().to_bytes(&mut self.buffer); + self.drain.send(self.buffer.split()).await?; }, ProtocolEvent::CloseStream { sid } => { if self.store.try_close_stream(sid) { - let frame = event.to_frame(); - let (s, _) = frame.to_bytes(&mut self.buffer); - self.drain.send(self.buffer[..s].to_vec()).await?; + 
event.to_frame().to_bytes(&mut self.buffer); + self.drain.send(self.buffer.split()).await?; } else { + #[cfg(feature = "trace_pedantic")] + trace!(?sid, "hold back close stream"); self.closing_streams.push(sid); } }, ProtocolEvent::Shutdown => { if self.store.is_empty() { - tracing::error!(?event, "send frame"); - let frame = event.to_frame(); - let (s, _) = frame.to_bytes(&mut self.buffer); - self.drain.send(self.buffer[..s].to_vec()).await?; + event.to_frame().to_bytes(&mut self.buffer); + self.drain.send(self.buffer.split()).await?; } else { + #[cfg(feature = "trace_pedantic")] + trace!("hold back shutdown"); self.pending_shutdown = true; } }, ProtocolEvent::Message { buffer, mid, sid } => { - self.metrics.smsg_it(sid); self.metrics.smsg_ib(sid, buffer.data.len() as u64); self.store.add(buffer, mid, sid); }, @@ -128,30 +156,43 @@ where data, } = &frame { - self.metrics.sdata_frames_t(); self.metrics.sdata_frames_b(data.len() as u64); } - let (s, _) = frame.to_bytes(&mut self.buffer); - self.drain.send(self.buffer[..s].to_vec()).await?; - tracing::warn!("send data frame, woop"); + frame.to_bytes(&mut self.buffer); + self.drain.send(self.buffer.split()).await?; } + let mut finished_streams = vec![]; - for (i, sid) in self.closing_streams.iter().enumerate() { - if self.store.try_close_stream(*sid) { - let frame = ProtocolEvent::CloseStream { sid: *sid }.to_frame(); - let (s, _) = frame.to_bytes(&mut self.buffer); - self.drain.send(self.buffer[..s].to_vec()).await?; + for (i, &sid) in self.closing_streams.iter().enumerate() { + if self.store.try_close_stream(sid) { + #[cfg(feature = "trace_pedantic")] + trace!(?sid, "close stream, as it's now empty"); + Frame::CloseStream { sid }.to_bytes(&mut self.buffer); + self.drain.send(self.buffer.split()).await?; finished_streams.push(i); } } for i in finished_streams.iter().rev() { self.closing_streams.remove(*i); } + + let mut finished_streams = vec![]; + for (i, sid) in self.notify_closing_streams.iter().enumerate() { + if 
self.store.try_close_stream(*sid) { + #[cfg(feature = "trace_pedantic")] + trace!(?sid, "close stream, as it's now empty"); + finished_streams.push(i); + } + } + for i in finished_streams.iter().rev() { + self.notify_closing_streams.remove(*i); + } + if self.pending_shutdown && self.store.is_empty() { - tracing::error!("send shutdown frame"); - let frame = ProtocolEvent::Shutdown {}.to_frame(); - let (s, _) = frame.to_bytes(&mut self.buffer); - self.drain.send(self.buffer[..s].to_vec()).await?; + #[cfg(feature = "trace_pedantic")] + trace!("shutdown, as it's now empty"); + Frame::Shutdown {}.to_bytes(&mut self.buffer); + self.drain.send(self.buffer.split()).await?; self.pending_shutdown = false; } Ok(()) @@ -173,14 +214,13 @@ struct IncomingMsg { #[async_trait] impl RecvProtocol for TcpRecvProtcol where - S: UnreliableSink>, + S: UnreliableSink, { async fn recv(&mut self) -> Result { - tracing::error!(?self.buffer, "enter loop"); 'outer: loop { - tracing::error!(?self.buffer, "continue loop"); while let Some(frame) = Frame::to_frame(&mut self.buffer) { - tracing::error!(?frame, "recv frame"); + #[cfg(feature = "trace_pedantic")] + trace!(?frame, "recv"); match frame { Frame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown), Frame::OpenStream { @@ -204,7 +244,6 @@ where length, data: MessageBuffer { data: vec![] }, }; - self.metrics.rmsg_it(sid); self.metrics.rmsg_ib(sid, length); self.incoming.insert(mid, m); }, @@ -213,12 +252,14 @@ where start: _, mut data, } => { - self.metrics.rdata_frames_t(); self.metrics.rdata_frames_b(data.len() as u64); let m = match self.incoming.get_mut(&mid) { Some(m) => m, None => { - info!("protocol violation by remote side: send Data before Header"); + info!( + ?mid, + "protocol violation by remote side: send Data before Header" + ); break 'outer Err(ProtocolError::Closed); }, }; @@ -227,7 +268,6 @@ where // finished, yay drop(m); let m = self.incoming.remove(&mid).unwrap(); - self.metrics.rmsg_ot(m.sid, RemoveReason::Finished); 
self.metrics.rmsg_ob( m.sid, RemoveReason::Finished, @@ -242,13 +282,8 @@ where }, }; } - tracing::error!(?self.buffer, "receiving on tcp sink"); let chunk = self.sink.recv().await?; - self.buffer.reserve(chunk.len()); - for b in chunk { - self.buffer.push_back(b); - } - tracing::error!(?self.buffer,"receiving on tcp sink done"); + self.buffer.extend_from_slice(&chunk); } } } @@ -256,12 +291,11 @@ where #[async_trait] impl ReliableDrain for TcpSendProtcol where - D: UnreliableDrain>, + D: UnreliableDrain, { async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError> { - let mut buffer = vec![0u8; 1500]; - let s = frame.to_bytes(&mut buffer); - buffer.truncate(s); + let mut buffer = BytesMut::with_capacity(500); + frame.to_bytes(&mut buffer); self.drain.send(buffer).await } } @@ -269,22 +303,13 @@ where #[async_trait] impl ReliableSink for TcpRecvProtcol where - S: UnreliableSink>, + S: UnreliableSink, { async fn recv(&mut self) -> Result { while self.buffer.len() < 100 { let chunk = self.sink.recv().await?; - self.buffer.reserve(chunk.len()); - for b in chunk { - self.buffer.push_back(b); - } - let todo_use_bytes_instead = self.buffer.iter().map(|b| *b).collect(); - if let Some(frame) = InitFrame::to_frame(todo_use_bytes_instead) { - match frame { - InitFrame::Handshake { .. } => self.buffer.drain(.. InitFrame::HANDSHAKE_CNS + 1), - InitFrame::Init { .. } => self.buffer.drain(.. InitFrame::INIT_CNS + 1), - InitFrame::Raw { .. } => self.buffer.drain(.. 
InitFrame::RAW_CNS + 1), - }; + self.buffer.extend_from_slice(&chunk); + if let Some(frame) = InitFrame::to_frame(&mut self.buffer) { return Ok(frame); } } @@ -303,11 +328,11 @@ mod test_utils { use async_channel::*; pub struct TcpDrain { - pub sender: Sender>, + pub sender: Sender, } pub struct TcpSink { - pub receiver: Receiver>, + pub receiver: Receiver, } /// emulate Tcp protocol on Channels @@ -334,7 +359,7 @@ mod test_utils { #[async_trait] impl UnreliableDrain for TcpDrain { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> { self.sender @@ -346,7 +371,7 @@ mod test_utils { #[async_trait] impl UnreliableSink for TcpSink { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn recv(&mut self) -> Result { self.receiver @@ -365,6 +390,7 @@ mod tests { types::{Pid, Promises, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2}, InitProtocol, MessageBuffer, ProtocolEvent, RecvProtocol, SendProtocol, }; + use bytes::BytesMut; use std::{sync::Arc, time::Duration}; #[tokio::test] @@ -431,7 +457,7 @@ mod tests { #[tokio::test] async fn send_long_msg() { - let metrics = + let mut metrics = ProtocolMetricCache::new("long_tcp", Arc::new(ProtocolMetrics::new().unwrap())); let sid = Sid::new(1); let [p1, p2] = tcp_bound(10000, Some(metrics.clone())); @@ -538,39 +564,36 @@ mod tests { const DATA1: &[u8; 69] = b"We need to make sure that its okay to send OPEN_STREAM and DATA_HEAD "; const DATA2: &[u8; 95] = b"in one chunk and (DATA and CLOSE_STREAM) in the second chunk. 
and then keep the connection open"; - let mut buf = vec![0u8; 1500]; - let event = ProtocolEvent::OpenStream { + let mut bytes = BytesMut::with_capacity(1500); + use crate::frame::Frame; + Frame::OpenStream { sid, prio: 5u8, promises: Promises::COMPRESSED, - guaranteed_bandwidth: 0, - }; - let (i, _) = event.to_frame().to_bytes(&mut buf); - let (i2, _) = crate::frame::Frame::DataHeader { + } + .to_bytes(&mut bytes); + Frame::DataHeader { mid: 99, sid, length: (DATA1.len() + DATA2.len()) as u64, } - .to_bytes(&mut buf[i..]); - buf.truncate(i + i2); - s.send(buf).await.unwrap(); + .to_bytes(&mut bytes); + s.send(bytes.split()).await.unwrap(); - let mut buf = vec![0u8; 1500]; - let (i, _) = crate::frame::Frame::Data { + Frame::Data { mid: 99, start: 0, data: DATA1.to_vec(), } - .to_bytes(&mut buf); - let (i2, _) = crate::frame::Frame::Data { + .to_bytes(&mut bytes); + Frame::Data { mid: 99, start: DATA1.len() as u64, data: DATA2.to_vec(), } - .to_bytes(&mut buf[i..]); - let (i3, _) = crate::frame::Frame::CloseStream { sid }.to_bytes(&mut buf[i + i2..]); - buf.truncate(i + i2 + i3); - s.send(buf).await.unwrap(); + .to_bytes(&mut bytes); + Frame::CloseStream { sid }.to_bytes(&mut bytes); + s.send(bytes.split()).await.unwrap(); let e = r.recv().await.unwrap(); assert!(matches!(e, ProtocolEvent::OpenStream { .. })); @@ -581,4 +604,58 @@ mod tests { let e = r.recv().await.unwrap(); assert!(matches!(e, ProtocolEvent::CloseStream { .. 
})); } + + #[tokio::test] + #[should_panic] + async fn send_on_stream_from_remote_without_notify() { + //remote opens stream + //we send on it + let [mut p1, mut p2] = tcp_bound(10, None); + let event = ProtocolEvent::OpenStream { + sid: Sid::new(10), + prio: 3u8, + promises: Promises::ORDERED, + guaranteed_bandwidth: 1_000_000, + }; + p1.0.send(event).await.unwrap(); + let _ = p2.1.recv().await.unwrap(); + let event = ProtocolEvent::Message { + sid: Sid::new(10), + mid: 0, + buffer: Arc::new(MessageBuffer { + data: vec![188u8; 600], + }), + }; + p2.0.send(event.clone()).await.unwrap(); + p2.0.flush(1_000_000, Duration::from_secs(1)).await.unwrap(); + let e = p1.1.recv().await.unwrap(); + assert_eq!(event, e); + } + + #[tokio::test] + async fn send_on_stream_from_remote() { + //remote opens stream + //we send on it + let [mut p1, mut p2] = tcp_bound(10, None); + let event = ProtocolEvent::OpenStream { + sid: Sid::new(10), + prio: 3u8, + promises: Promises::ORDERED, + guaranteed_bandwidth: 1_000_000, + }; + p1.0.send(event).await.unwrap(); + let e = p2.1.recv().await.unwrap(); + p2.0.notify_from_recv(e); + let event = ProtocolEvent::Message { + sid: Sid::new(10), + mid: 0, + buffer: Arc::new(MessageBuffer { + data: vec![188u8; 600], + }), + }; + p2.0.send(event.clone()).await.unwrap(); + p2.0.flush(1_000_000, Duration::from_secs(1)).await.unwrap(); + let e = p1.1.recv().await.unwrap(); + assert_eq!(event, e); + } } diff --git a/network/protocol/src/types.rs b/network/protocol/src/types.rs index b6f63ca208..ba9348a16d 100644 --- a/network/protocol/src/types.rs +++ b/network/protocol/src/types.rs @@ -1,4 +1,5 @@ use bitflags::bitflags; +use bytes::{Buf, BufMut, BytesMut}; use rand::Rng; pub type Mid = u64; @@ -88,25 +89,19 @@ impl Pid { } } - pub(crate) fn to_le_bytes(&self) -> [u8; 16] { self.internal.to_le_bytes() } - - pub(crate) fn from_le_bytes(bytes: [u8; 16]) -> Self { + pub(crate) fn from_bytes(bytes: &mut BytesMut) -> Self { Self { - internal: 
u128::from_le_bytes(bytes), + internal: bytes.get_u128_le(), } } + + pub(crate) fn to_bytes(&self, bytes: &mut BytesMut) { bytes.put_u128_le(self.internal) } } impl Sid { pub const fn new(internal: u64) -> Self { Self { internal } } pub(crate) fn to_le_bytes(&self) -> [u8; 8] { self.internal.to_le_bytes() } - - pub(crate) fn from_le_bytes(bytes: [u8; 8]) -> Self { - Self { - internal: u64::from_le_bytes(bytes), - } - } } impl std::fmt::Debug for Pid { diff --git a/network/src/api.rs b/network/src/api.rs index 08274c90be..bd06f1e472 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -9,7 +9,7 @@ use crate::{ }; #[cfg(feature = "compression")] use lz_fear::raw::DecodeError; -use network_protocol::{Bandwidth, MessageBuffer, Mid, Pid, Prio, Promises, Sid}; +use network_protocol::{Bandwidth, MessageBuffer, Pid, Prio, Promises, Sid}; #[cfg(feature = "metrics")] use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; @@ -28,7 +28,6 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, }; use tracing::*; -use tracing_futures::Instrument; type A2sDisconnect = Arc>>>; @@ -70,9 +69,9 @@ pub struct Participant { /// [`opened`]: Participant::opened #[derive(Debug)] pub struct Stream { - pid: Pid, + local_pid: Pid, + remote_pid: Pid, sid: Sid, - mid: Mid, prio: Prio, promises: Promises, guaranteed_bandwidth: Bandwidth, @@ -239,7 +238,8 @@ impl Network { #[cfg(feature = "metrics")] registry: Option<&Registry>, ) -> Self { let p = participant_id; - debug!(?p, "Starting Network"); + let span = tracing::info_span!("network", ?p); + span.in_scope(|| trace!("Starting Network")); let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = Scheduler::new( participant_id, @@ -247,14 +247,14 @@ impl Network { #[cfg(feature = "metrics")] registry, ); - runtime.spawn(async move { - trace!(?p, "Starting scheduler in own thread"); - scheduler - .run() - .instrument(tracing::info_span!("scheduler", ?p)) - .await; - trace!(?p, "Stopping scheduler and 
his own thread"); - }); + runtime.spawn( + async move { + trace!("Starting scheduler in own thread"); + scheduler.run().await; + trace!("Stopping scheduler and his own thread"); + } + .instrument(tracing::info_span!("network", ?p)), + ); Self { local_pid: participant_id, runtime, @@ -295,6 +295,7 @@ impl Network { /// ``` /// /// [`connected`]: Network::connected + #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> { let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); debug!(?address, "listening on address"); @@ -350,6 +351,7 @@ impl Network { /// /// [`Streams`]: crate::api::Stream /// [`ProtocolAddres`]: crate::api::ProtocolAddr + #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] pub async fn connect(&self, address: ProtocolAddr) -> Result { let (pid_sender, pid_receiver) = oneshot::channel::>(); debug!(?address, "Connect to address"); @@ -361,15 +363,12 @@ impl Network { Ok(p) => p, Err(e) => return Err(NetworkError::ConnectFailed(e)), }; - let pid = participant.remote_pid; - debug!( - ?pid, - "Received Participant id from remote and return to user" - ); + let remote_pid = participant.remote_pid; + trace!(?remote_pid, "connected"); self.participant_disconnect_sender .lock() .await - .insert(pid, Arc::clone(&participant.a2s_disconnect_s)); + .insert(remote_pid, Arc::clone(&participant.a2s_disconnect_s)); Ok(participant) } @@ -406,6 +405,7 @@ impl Network { /// /// [`Streams`]: crate::api::Stream /// [`listen`]: crate::api::Network::listen + #[instrument(name="network", skip(self), fields(p = %self.local_pid))] pub async fn connected(&self) -> Result { let participant = self.connected_receiver.lock().await.recv().await?; self.participant_disconnect_sender.lock().await.insert( @@ -475,6 +475,7 @@ impl Participant { /// ``` /// /// [`Streams`]: crate::api::Stream + #[instrument(name="network", skip(self, prio, promises), 
fields(p = %self.local_pid))] pub async fn open(&self, prio: u8, promises: Promises) -> Result { let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::(); if let Err(e) = self.a2b_open_stream_s.lock().await.send(( @@ -489,11 +490,11 @@ impl Participant { match p2a_return_stream_r.await { Ok(stream) => { let sid = stream.sid; - debug!(?sid, ?self.remote_pid, "opened stream"); + trace!(?sid, "opened stream"); Ok(stream) }, Err(_) => { - debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant"); + debug!("p2a_return_stream_r failed, closing participant"); Err(ParticipantError::ParticipantDisconnected) }, } @@ -532,15 +533,16 @@ impl Participant { /// [`Streams`]: crate::api::Stream /// [`connected`]: Network::connected /// [`open`]: Participant::open + #[instrument(name="network", skip(self), fields(p = %self.local_pid))] pub async fn opened(&self) -> Result { match self.b2a_stream_opened_r.lock().await.recv().await { Some(stream) => { let sid = stream.sid; - debug!(?sid, ?self.remote_pid, "Receive opened stream"); + debug!(?sid, "Receive opened stream"); Ok(stream) }, None => { - debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant"); + debug!("stream_opened_receiver failed, closing participant"); Err(ParticipantError::ParticipantDisconnected) }, } @@ -589,10 +591,10 @@ impl Participant { /// ``` /// /// [`Streams`]: crate::api::Stream + #[instrument(name="network", skip(self), fields(p = %self.local_pid))] pub async fn disconnect(self) -> Result<(), ParticipantError> { // Remove, Close and try_unwrap error when unwrap fails! 
- let pid = self.remote_pid; - debug!(?pid, "Closing participant from network"); + debug!("Closing participant from network"); //Streams will be closed by BParticipant match self.a2s_disconnect_s.lock().await.take() { @@ -601,14 +603,14 @@ impl Participant { // Participant is connecting to Scheduler here, not as usual // Participant<->BParticipant a2s_disconnect_s - .send((pid, (Duration::from_secs(120), finished_sender))) + .send((self.remote_pid, (Duration::from_secs(120), finished_sender))) .expect("Something is wrong in internal scheduler coding"); match finished_receiver.await { Ok(res) => { match res { - Ok(()) => trace!(?pid, "Participant is now closed"), + Ok(()) => trace!("Participant is now closed"), Err(ref e) => { - trace!(?pid, ?e, "Error occurred during shutdown of participant") + trace!(?e, "Error occurred during shutdown of participant") }, }; res @@ -616,7 +618,6 @@ impl Participant { Err(e) => { //this is a bug. but as i am Participant i can't destroy the network error!( - ?pid, ?e, "Failed to get a message back from the scheduler, seems like the \ network is already closed" @@ -642,7 +643,8 @@ impl Participant { impl Stream { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - pid: Pid, + local_pid: Pid, + remote_pid: Pid, sid: Sid, prio: Prio, promises: Promises, @@ -653,9 +655,9 @@ impl Stream { a2b_close_stream_s: mpsc::UnboundedSender, ) -> Self { Self { - pid, + local_pid, + remote_pid, sid, - mid: 0, prio, promises, guaranteed_bandwidth, @@ -779,7 +781,6 @@ impl Stream { message.verify(&self); self.a2b_msg_s .send((self.sid, Arc::clone(&message.buffer)))?; - self.mid += 1; Ok(()) } @@ -942,13 +943,10 @@ impl core::cmp::PartialEq for Participant { } impl Drop for Network { + #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { - let pid = self.local_pid; - debug!(?pid, "Shutting down Network"); - trace!( - ?pid, - "Shutting down Participants of Network, while we still have metrics" - ); + 
debug!("Shutting down Network"); + trace!("Shutting down Participants of Network, while we still have metrics"); let mut finished_receiver_list = vec![]; if tokio::runtime::Handle::try_current().is_ok() { @@ -991,25 +989,25 @@ impl Drop for Network { } }); }); - trace!(?pid, "Participants have shut down!"); - trace!(?pid, "Shutting down Scheduler"); + trace!("Participants have shut down!"); + trace!("Shutting down Scheduler"); self.shutdown_sender .take() .unwrap() .send(()) .expect("Scheduler is closed, but nobody other should be able to close it"); - debug!(?pid, "Network has shut down"); + debug!("Network has shut down"); } } impl Drop for Participant { + #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))] + #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { use tokio::sync::oneshot::error::TryRecvError; - // ignore closed, as we need to send it even though we disconnected the // participant from network - let pid = self.remote_pid; - debug!(?pid, "Shutting down Participant"); + debug!("Shutting down Participant"); match self .a2s_disconnect_s @@ -1017,25 +1015,27 @@ impl Drop for Participant { .expect("Participant in use while beeing dropped") .take() { - None => trace!( - ?pid, - "Participant has been shutdown cleanly, no further waiting is required!" 
- ), + None => info!("Participant already has been shutdown gracefully"), Some(a2s_disconnect_s) => { - debug!(?pid, "Disconnect from Scheduler"); + debug!("Disconnect from Scheduler"); let (finished_sender, mut finished_receiver) = oneshot::channel(); a2s_disconnect_s .send((self.remote_pid, (Duration::from_secs(120), finished_sender))) .expect("Something is wrong in internal scheduler coding"); loop { match finished_receiver.try_recv() { - Ok(Ok(())) => break, - Ok(Err(e)) => error!( - ?pid, - ?e, - "Error while dropping the participant, couldn't send all outgoing \ - messages, dropping remaining" - ), + Ok(Ok(())) => { + info!("Participant dropped gracefully"); + break; + }, + Ok(Err(e)) => { + error!( + ?e, + "Error while dropping the participant, couldn't send all outgoing \ + messages, dropping remaining" + ); + break; + }, Err(TryRecvError::Closed) => { panic!("Something is wrong in internal scheduler/participant coding") }, @@ -1047,17 +1047,17 @@ impl Drop for Participant { } }, } - debug!(?pid, "Participant dropped"); } } impl Drop for Stream { + #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))] + #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { // send if closed is unnecessary but doesn't hurt, we must not crash if !self.send_closed.load(Ordering::Relaxed) { let sid = self.sid; - let pid = self.pid; - debug!(?pid, ?sid, "Shutting down Stream"); + debug!(?sid, "Shutting down Stream"); if let Err(e) = self.a2b_close_stream_s.take().unwrap().send(self.sid) { debug!( ?e, @@ -1066,8 +1066,7 @@ impl Drop for Stream { } } else { let sid = self.sid; - let pid = self.pid; - trace!(?pid, ?sid, "Stream Drop not needed"); + trace!(?sid, "Stream Drop not needed"); } } } diff --git a/network/src/channel.rs b/network/src/channel.rs index 654175fb1d..9b6472268e 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use bytes::BytesMut; use 
network_protocol::{ InitProtocolError, MpscMsg, MpscRecvProtcol, MpscSendProtcol, Pid, ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtcol, TcpSendProtcol, @@ -42,7 +43,13 @@ impl Protocols { let metrics = ProtocolMetricCache {}; let sp = TcpSendProtcol::new(TcpDrain { half: w }, metrics.clone()); - let rp = TcpRecvProtcol::new(TcpSink { half: r }, metrics.clone()); + let rp = TcpRecvProtcol::new( + TcpSink { + half: r, + buffer: BytesMut::new(), + }, + metrics.clone(), + ); Protocols::Tcp((sp, rp)) } @@ -86,6 +93,13 @@ impl network_protocol::InitProtocol for Protocols { #[async_trait] impl network_protocol::SendProtocol for SendProtocols { + fn notify_from_recv(&mut self, event: ProtocolEvent) { + match self { + SendProtocols::Tcp(s) => s.notify_from_recv(event), + SendProtocols::Mpsc(s) => s.notify_from_recv(event), + } + } + async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> { match self { SendProtocols::Tcp(s) => s.send(event).await, @@ -121,14 +135,14 @@ pub struct TcpDrain { #[derive(Debug)] pub struct TcpSink { half: OwnedReadHalf, + buffer: BytesMut, } #[async_trait] impl UnreliableDrain for TcpDrain { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> { - //self.half.recv match self.half.write_all(&data).await { Ok(()) => Ok(()), Err(_) => Err(ProtocolError::Closed), @@ -138,15 +152,12 @@ impl UnreliableDrain for TcpDrain { #[async_trait] impl UnreliableSink for TcpSink { - type DataFormat = Vec; + type DataFormat = BytesMut; async fn recv(&mut self) -> Result { - let mut data = vec![0u8; 1500]; - match self.half.read(&mut data).await { - Ok(n) => { - data.truncate(n); - Ok(data) - }, + self.buffer.resize(1500, 0u8); + match self.half.read(&mut self.buffer).await { + Ok(n) => Ok(self.buffer.split_to(n)), Err(_) => Err(ProtocolError::Closed), } } diff --git a/network/src/message.rs b/network/src/message.rs index 
0ad24c63ad..1969854a7a 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -200,6 +200,7 @@ mod tests { Stream::new( Pid::fake(0), + Pid::fake(1), Sid::new(0), 0u8, promises, diff --git a/network/src/participant.rs b/network/src/participant.rs index a942632f7b..7d07a5c5ac 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -60,6 +60,7 @@ struct ShutdownInfo { #[derive(Debug)] pub struct BParticipant { + local_pid: Pid, //tracing remote_pid: Pid, remote_pid_string: String, //optimisation offset_sid: Sid, @@ -82,6 +83,7 @@ impl BParticipant { #[allow(clippy::type_complexity)] pub(crate) fn new( + local_pid: Pid, remote_pid: Pid, offset_sid: Sid, #[cfg(feature = "metrics")] metrics: Arc, @@ -106,6 +108,7 @@ impl BParticipant { ( Self { + local_pid, remote_pid, remote_pid_string: remote_pid.to_string(), offset_sid, @@ -135,6 +138,8 @@ impl BParticipant { async_channel::unbounded::(); let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); + let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) = + mpsc::unbounded_channel::(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::(); const STREAM_BOUND: usize = 10_000; @@ -142,6 +147,7 @@ impl BParticipant { crossbeam_channel::bounded::<(Sid, Arc)>(STREAM_BOUND); let run_channels = self.run_channels.take().unwrap(); + trace!("start all managers"); tokio::join!( self.send_mgr( run_channels.a2b_open_stream_r, @@ -149,18 +155,22 @@ impl BParticipant { a2b_msg_r, b2b_add_send_protocol_r, b2b_close_send_protocol_r, + b2b_notify_send_of_recv_r, b2s_prio_statistic_s, a2b_msg_s.clone(), //self a2b_close_stream_s.clone(), //self - ), + ) + .instrument(tracing::info_span!("send")), self.recv_mgr( run_channels.b2a_stream_opened_s, b2b_add_recv_protocol_r, b2b_force_close_recv_protocol_r, b2b_close_send_protocol_s.clone(), + b2b_notify_send_of_recv_s, a2b_msg_s.clone(), //self a2b_close_stream_s.clone(), //self - ), + ) + 
.instrument(tracing::info_span!("recv")), self.create_channel_mgr( run_channels.s2b_create_channel_r, b2b_add_send_protocol_s, @@ -182,6 +192,7 @@ impl BParticipant { a2b_msg_r: crossbeam_channel::Receiver<(Sid, Arc)>, mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>, b2b_close_send_protocol_r: async_channel::Receiver, + mut b2b_notify_send_of_recv_r: mpsc::UnboundedReceiver, _b2s_prio_statistic_s: mpsc::UnboundedSender, a2b_msg_s: crossbeam_channel::Sender<(Sid, Arc)>, a2b_close_stream_s: mpsc::UnboundedSender, @@ -189,27 +200,29 @@ impl BParticipant { let mut send_protocols: HashMap = HashMap::new(); let mut interval = tokio::time::interval(Self::TICK_TIME); let mut stream_ids = self.offset_sid; - trace!("workaround, activly wait for first protocol"); + let mut fake_mid = 0; //TODO: move MID to protocol, should be inc per stream ? or ? + trace!("workaround, actively wait for first protocol"); b2b_add_protocol_r .recv() .await .map(|(c, p)| send_protocols.insert(c, p)); - trace!("Start send_mgr"); loop { - let (open, close, _, addp, remp) = select!( - next = a2b_open_stream_r.recv().fuse() => (Some(next), None, None, None, None), - next = a2b_close_stream_r.recv().fuse() => (None, Some(next), None, None, None), - _ = interval.tick() => (None, None, Some(()), None, None), - next = b2b_add_protocol_r.recv().fuse() => (None, None, None, Some(next), None), - next = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, Some(next)), + let (open, close, r_event, _, addp, remp) = select!( + n = a2b_open_stream_r.recv().fuse() => (Some(n), None, None, None, None, None), + n = a2b_close_stream_r.recv().fuse() => (None, Some(n), None, None, None, None), + n = b2b_notify_send_of_recv_r.recv().fuse() => (None, None, Some(n), None, None, None), + _ = interval.tick() => (None, None, None, Some(()), None, None), + n = b2b_add_protocol_r.recv().fuse() => (None, None, None, None, Some(n), None), + n = b2b_close_send_protocol_r.recv().fuse() => 
(None, None, None, None, None, Some(n)), ); - trace!(?open, ?close, ?addp, ?remp, "foobar"); - - addp.flatten().map(|(c, p)| send_protocols.insert(c, p)); + addp.flatten().map(|(cid, p)| { + debug!(?cid, "add protocol"); + send_protocols.insert(cid, p) + }); match remp { Some(Ok(cid)) => { - trace!(?cid, "remove send protocol"); + debug!(?cid, "remove protocol"); match send_protocols.remove(&cid) { Some(mut prot) => { trace!("blocking flush"); @@ -230,15 +243,19 @@ impl BParticipant { let active = match send_protocols.get_mut(&cid) { Some(a) => a, None => { - warn!("no channel arrg"); + warn!("no channel"); continue; }, }; let active_err = async { + if let Some(Some(event)) = r_event { + active.notify_from_recv(event); + } + if let Some(Some((prio, promises, guaranteed_bandwidth, return_s))) = open { - trace!(?stream_ids, "openuing some new stream"); let sid = stream_ids; + trace!(?sid, "open stream"); stream_ids += Sid::from(1); let stream = self .create_stream( @@ -264,25 +281,24 @@ impl BParticipant { // get all messages and assign it to a channel for (sid, buffer) in a2b_msg_r.try_iter() { - warn!(?sid, "sending!"); + fake_mid += 1; active .send(ProtocolEvent::Message { buffer, - mid: 0u64, + mid: fake_mid, sid, }) .await? } if let Some(Some(sid)) = close { - warn!(?sid, "delete_stream!"); + trace!(?stream_ids, "delete stream"); self.delete_stream(sid).await; // Fire&Forget the protocol will take care to verify that this Frame is delayed // till the last msg was received! active.send(ProtocolEvent::CloseStream { sid }).await?; } - warn!("flush!"); active .flush(1_000_000, Duration::from_secs(1) /* TODO */) .await?; //this actually blocks, so we cant set streams whilte it. 
@@ -291,7 +307,7 @@ impl BParticipant { } .await; if let Err(e) = active_err { - info!(?cid, ?e, "send protocol failed, shutting down channel"); + info!(?cid, ?e, "protocol failed, shutting down channel"); // remote recv will now fail, which will trigger remote send which will trigger // recv send_protocols.remove(&cid).unwrap(); @@ -308,6 +324,7 @@ impl BParticipant { mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>, b2b_force_close_recv_protocol_r: async_channel::Receiver, b2b_close_send_protocol_s: async_channel::Sender, + b2b_notify_send_of_recv_s: mpsc::UnboundedSender, a2b_msg_s: crossbeam_channel::Sender<(Sid, Arc)>, a2b_close_stream_s: mpsc::UnboundedSender, ) { @@ -327,13 +344,15 @@ impl BParticipant { let remove_c = |recv_protocols: &mut HashMap>, cid: &Cid| { match recv_protocols.remove(&cid) { - Some(h) => h.abort(), + Some(h) => { + h.abort(); + debug!(?cid, "remove protocol"); + }, None => trace!("tried to remove protocol twice"), }; recv_protocols.is_empty() }; - trace!("Start recv_mgr"); loop { let (event, addp, remp) = select!( next = hacky_recv_r.recv().fuse() => (Some(next), None, None), @@ -342,6 +361,7 @@ impl BParticipant { ); addp.map(|(cid, p)| { + debug!(?cid, "add protocol"); retrigger(cid, p, &mut recv_protocols); }); if let Some(Ok(cid)) = remp { @@ -351,7 +371,6 @@ impl BParticipant { } }; - warn!(?event, "recv event!"); if let Some(Some((cid, r, p))) = event { match r { Ok(ProtocolEvent::OpenStream { @@ -361,6 +380,7 @@ impl BParticipant { guaranteed_bandwidth, }) => { trace!(?sid, "open stream"); + let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); let stream = self .create_stream( sid, @@ -376,6 +396,7 @@ impl BParticipant { }, Ok(ProtocolEvent::CloseStream { sid }) => { trace!(?sid, "close stream"); + let _ = b2b_notify_send_of_recv_s.send(r.unwrap()); self.delete_stream(sid).await; retrigger(cid, p, &mut recv_protocols); }, @@ -410,7 +431,7 @@ impl BParticipant { } }, Err(e) => { - info!(?cid, ?e, "recv 
protocol failed, shutting down channel"); + info!(?e, ?cid, "protocol failed, shutting down channel"); if let Err(e) = b2b_close_send_protocol_s.send(cid).await { debug!(?e, ?cid, "send_mgr was already closed simultaneously"); } @@ -433,7 +454,6 @@ impl BParticipant { b2b_add_send_protocol_s: mpsc::UnboundedSender<(Cid, SendProtocols)>, b2b_add_recv_protocol_s: mpsc::UnboundedSender<(Cid, RecvProtocols)>, ) { - trace!("Start create_channel_mgr"); let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r); s2b_create_channel_r .for_each_concurrent(None, |(cid, _, protocol, b2s_create_channel_done_s)| { @@ -524,12 +544,8 @@ impl BParticipant { } } }; - - trace!("Start participant_shutdown_mgr"); let (timeout_time, sender) = s2b_shutdown_bparticipant_r.await.unwrap(); - debug!("participant_shutdown_mgr triggered"); - - debug!("Closing all streams for send"); + debug!("participant_shutdown_mgr triggered. Closing all streams for send"); { let lock = self.streams.read().await; for si in lock.values() { @@ -632,6 +648,7 @@ impl BParticipant { .with_label_values(&[&self.remote_pid_string]) .inc(); Stream::new( + self.local_pid, self.remote_pid, sid, prio, @@ -676,11 +693,12 @@ mod tests { s2b_create_channel_s, s2b_shutdown_bparticipant_s, ) = runtime_clone.block_on(async move { - let pid = Pid::fake(1); + let local_pid = Pid::fake(0); + let remote_pid = Pid::fake(1); let sid = Sid::new(1000); - let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap()); + let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap()); - BParticipant::new(pid, sid, Arc::clone(&metrics)) + BParticipant::new(local_pid, remote_pid, sid, Arc::clone(&metrics)) }); let handle = runtime_clone.spawn(bparticipant.run(b2s_prio_statistic_s)); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index eb6d21bd7e..a1f31fc384 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -6,7 +6,7 @@ use crate::{ participant::{B2sPrioStatistic, BParticipant, 
S2bCreateChannel, S2bShutdownBparticipant}, }; use futures_util::{FutureExt, StreamExt}; -use network_protocol::Pid; +use network_protocol::{MpscMsg, Pid}; #[cfg(feature = "metrics")] use prometheus::Registry; use rand::Rng; @@ -26,16 +26,21 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; -use tracing_futures::Instrument; -/// Naming of Channels `x2x` -/// - a: api -/// - s: scheduler -/// - b: bparticipant -/// - p: prios -/// - r: protocol -/// - w: wire -/// - c: channel/handshake +// Naming of Channels `x2x` +// - a: api +// - s: scheduler +// - b: bparticipant +// - p: prios +// - r: protocol +// - w: wire +// - c: channel/handshake + +lazy_static::lazy_static! { + static ref MPSC_POOL: Mutex, oneshot::Sender>)>>> = { + Mutex::new(HashMap::new()) + }; +} #[derive(Debug)] struct ParticipantInfo { @@ -80,6 +85,8 @@ pub struct Scheduler { } impl Scheduler { + const MPSC_CHANNEL_BOUND: usize = 1000; + pub fn new( local_pid: Pid, runtime: Arc, @@ -215,7 +222,35 @@ impl Scheduler { }; info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); (Protocols::new_tcp(stream), false) - }, /* */ + }, + ProtocolAddr::Mpsc(addr) => { + let mpsc_s = match MPSC_POOL.lock().await.get(&addr) { + Some(s) => s.clone(), + None => { + pid_sender + .send(Err(std::io::Error::new( + std::io::ErrorKind::NotConnected, + "no mpsc listen on this addr", + ))) + .unwrap(); + continue; + }, + }; + let (remote_to_local_s, remote_to_local_r) = + mpsc::channel(Self::MPSC_CHANNEL_BOUND); + let (local_to_remote_oneshot_s, local_to_remote_oneshot_r) = oneshot::channel(); + mpsc_s + .send((remote_to_local_s, local_to_remote_oneshot_s)) + .unwrap(); + let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap(); + + info!(?addr, "Connecting Mpsc"); + ( + Protocols::new_mpsc(local_to_remote_s, remote_to_local_r), + false, + ) + }, + /* */ //ProtocolAddr::Udp(addr) => { //#[cfg(feature = "metrics")] //self.metrics @@ -367,7 +402,7 @@ impl Scheduler { info!( 
?addr, ?e, - "Listener couldn't be started due to error on tcp bind" + "Tcp bind error durin listener startup" ); s2a_listen_result_s.send(Err(e)).unwrap(); return; @@ -390,6 +425,25 @@ impl Scheduler { self.init_protocol(Protocols::new_tcp(stream), None, true) .await; } + }, + ProtocolAddr::Mpsc(addr) => { + let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel(); + MPSC_POOL.lock().await.insert(addr, mpsc_s); + s2a_listen_result_s.send(Ok(())).unwrap(); + trace!(?addr, "Listener bound"); + + let mut end_receiver = s2s_stop_listening_r.fuse(); + while let Some((local_to_remote_s, local_remote_to_local_s)) = select! { + next = mpsc_r.recv().fuse() => next, + _ = &mut end_receiver => None, + } { + let (remote_to_local_s, remote_to_local_r) = mpsc::channel(Self::MPSC_CHANNEL_BOUND); + local_remote_to_local_s.send(remote_to_local_s).unwrap(); + info!(?addr, "Accepting Mpsc from"); + self.init_protocol(Protocols::new_mpsc(local_to_remote_s, remote_to_local_r), None, true) + .await; + } + warn!("MpscStream Failed, stopping"); },/* ProtocolAddr::Udp(addr) => { let socket = match net::UdpSocket::bind(addr).await { @@ -522,6 +576,7 @@ impl Scheduler { s2b_create_channel_s, s2b_shutdown_bparticipant_s, ) = BParticipant::new( + local_pid, pid, sid, #[cfg(feature = "metrics")] @@ -545,10 +600,11 @@ impl Scheduler { }); drop(participants); trace!("dropped participants lock"); + let p = pid; runtime.spawn( bparticipant .run(participant_channels.b2s_prio_statistic_s) - .instrument(tracing::info_span!("participant", ?pid)), + .instrument(tracing::info_span!("remote", ?p)), ); //create a new channel within BParticipant and wait for it to run let (b2s_create_channel_done_s, b2s_create_channel_done_r) = diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 64c65b0e91..a06b59578c 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -2,7 +2,7 @@ use lazy_static::*; use std::{ net::SocketAddr, sync::{ - atomic::{AtomicU16, Ordering}, + atomic::{AtomicU16, 
AtomicU64, Ordering}, Arc, }, thread, @@ -92,3 +92,12 @@ pub fn udp() -> veloren_network::ProtocolAddr { let port = PORTS.fetch_add(1, Ordering::Relaxed); veloren_network::ProtocolAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port))) } + +#[allow(dead_code)] +pub fn mpsc() -> veloren_network::ProtocolAddr { + lazy_static! { + static ref PORTS: AtomicU64 = AtomicU64::new(5000); + } + let port = PORTS.fetch_add(1, Ordering::Relaxed); + veloren_network::ProtocolAddr::Mpsc(port) +} diff --git a/network/tests/integration.rs b/network/tests/integration.rs index b78619d65d..fd33dab8e3 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use tokio::runtime::Runtime; use veloren_network::{NetworkError, StreamError}; mod helper; -use helper::{network_participant_stream, tcp, udp}; +use helper::{mpsc, network_participant_stream, tcp, udp}; use std::io::ErrorKind; use veloren_network::{Network, Pid, Promises, ProtocolAddr}; @@ -50,6 +50,31 @@ fn stream_simple_3msg() { } #[test] +fn stream_simple_mpsc() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); + + s1_a.send("Hello World").unwrap(); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); + drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown +} + +#[test] +fn stream_simple_mpsc_3msg() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc()); + + s1_a.send("Hello World").unwrap(); + s1_a.send(1337).unwrap(); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok(1337)); + s1_a.send("3rdMessage").unwrap(); + assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string())); + drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown +} + +#[test] +#[ignore] fn stream_simple_udp() { let (_, _) = helper::setup(false, 0); let (r, _n_a, _p_a, mut s1_a, 
_n_b, _p_b, mut s1_b) = network_participant_stream(udp()); @@ -60,6 +85,7 @@ fn stream_simple_udp() { } #[test] +#[ignore] fn stream_simple_udp_3msg() { let (_, _) = helper::setup(false, 0); let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp()); @@ -101,6 +127,7 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); diff --git a/server-cli/Cargo.toml b/server-cli/Cargo.toml index 4d8a3cf866..d75e1987ec 100644 --- a/server-cli/Cargo.toml +++ b/server-cli/Cargo.toml @@ -15,7 +15,7 @@ server = { package = "veloren-server", path = "../server", default-features = fa common = { package = "veloren-common", path = "../common" } common-net = { package = "veloren-common-net", path = "../common/net" } -tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] } +tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } ansi-parser = "0.7" clap = "2.33" crossterm = "0.18" diff --git a/server-cli/src/logging.rs b/server-cli/src/logging.rs index 6a738ed2c5..18c0952780 100644 --- a/server-cli/src/logging.rs +++ b/server-cli/src/logging.rs @@ -19,6 +19,7 @@ pub fn init(basic: bool) { .add_directive("uvth=warn".parse().unwrap()) .add_directive("tiny_http=warn".parse().unwrap()) .add_directive("mio::sys::windows=debug".parse().unwrap()) + .add_directive("veloren_network_protocol=info".parse().unwrap()) .add_directive( "veloren_server::persistence::character=info" .parse() diff --git a/server/Cargo.toml b/server/Cargo.toml index 8726a037d8..f73ed1f02f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,7 +28,8 @@ futures-util = "0.3.7" futures-executor = "0.3" futures-timer = "3.0" futures-channel = "0.3" -tokio = { version = "1.0.1", default-features = false, features = ["rt"] } +tokio = { version = "1", default-features = false, features = ["rt"] } +prometheus-hyper 
= "0.1.1" itertools = "0.9" lazy_static = "1.4.0" scan_fmt = { git = "https://github.com/Imberflur/scan_fmt" } @@ -41,7 +42,6 @@ hashbrown = { version = "0.9", features = ["rayon", "serde", "nightly"] } rayon = "1.5" crossbeam-channel = "0.5" prometheus = { version = "0.11", default-features = false} -tiny_http = "0.8.0" portpicker = { git = "https://github.com/xMAC94x/portpicker-rs", rev = "df6b37872f3586ac3b21d08b56c8ec7cd92fb172" } authc = { git = "https://gitlab.com/veloren/auth.git", rev = "bffb5181a35c19ddfd33ee0b4aedba741aafb68d" } libsqlite3-sys = { version = "0.18", features = ["bundled"] } diff --git a/server/src/lib.rs b/server/src/lib.rs index a8ac9487fe..40aa2b0804 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -76,12 +76,14 @@ use common_net::{ use common_sys::plugin::PluginMgr; use common_sys::state::State; use futures_executor::block_on; -use metrics::{PhysicsMetrics, ServerMetrics, StateTickMetrics, TickMetrics}; +use metrics::{PhysicsMetrics, StateTickMetrics, TickMetrics}; use network::{Network, Pid, ProtocolAddr}; use persistence::{ character_loader::{CharacterLoader, CharacterLoaderResponseKind}, character_updater::CharacterUpdater, }; +use prometheus::Registry; +use prometheus_hyper::Server as PrometheusServer; use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt}; use std::{ i32, @@ -91,7 +93,7 @@ use std::{ }; #[cfg(not(feature = "worldgen"))] use test_world::{IndexOwned, World}; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::Notify}; use tracing::{debug, error, info, trace}; use uvth::{ThreadPool, ThreadPoolBuilder}; use vek::*; @@ -124,7 +126,7 @@ pub struct Server { _runtime: Arc, thread_pool: ThreadPool, - metrics: ServerMetrics, + metrics_shutdown: Arc, tick_metrics: TickMetrics, state_tick_metrics: StateTickMetrics, physics_metrics: PhysicsMetrics, @@ -350,28 +352,35 @@ impl Server { state.ecs_mut().insert(DeletedEntities::default()); - let mut metrics = ServerMetrics::new(); 
// register all metrics submodules here - let (tick_metrics, registry_tick) = TickMetrics::new(metrics.tick_clone()) - .expect("Failed to initialize server tick metrics submodule."); + let (tick_metrics, registry_tick) = + TickMetrics::new().expect("Failed to initialize server tick metrics submodule."); let (state_tick_metrics, registry_state) = StateTickMetrics::new().unwrap(); let (physics_metrics, registry_physics) = PhysicsMetrics::new().unwrap(); - registry_chunk(&metrics.registry()).expect("failed to register chunk gen metrics"); - registry_network(&metrics.registry()).expect("failed to register network request metrics"); - registry_player(&metrics.registry()).expect("failed to register player metrics"); - registry_tick(&metrics.registry()).expect("failed to register tick metrics"); - registry_state(&metrics.registry()).expect("failed to register state metrics"); - registry_physics(&metrics.registry()).expect("failed to register state metrics"); + let registry = Arc::new(Registry::new()); + registry_chunk(&registry).expect("failed to register chunk gen metrics"); + registry_network(&registry).expect("failed to register network request metrics"); + registry_player(&registry).expect("failed to register player metrics"); + registry_tick(&registry).expect("failed to register tick metrics"); + registry_state(&registry).expect("failed to register state metrics"); + registry_physics(&registry).expect("failed to register state metrics"); let thread_pool = ThreadPoolBuilder::new() .name("veloren-worker".to_string()) .build(); - let network = - Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry()); - metrics - .run(settings.metrics_address) - .expect("Failed to initialize server metrics submodule."); + let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry); + let metrics_shutdown = Arc::new(Notify::new()); + let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); + let addr = settings.metrics_address; + runtime.spawn(async move
{ + PrometheusServer::run( + Arc::clone(&registry), + addr, + metrics_shutdown_clone.notified(), + ) + .await + }); block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; let connection_handler = ConnectionHandler::new(network); @@ -392,7 +401,7 @@ impl Server { _runtime: runtime, thread_pool, - metrics, + metrics_shutdown, tick_metrics, state_tick_metrics, physics_metrics, @@ -904,7 +913,7 @@ impl Server { .tick_time .with_label_values(&["metrics"]) .set(end_of_server_tick.elapsed().as_nanos() as i64); - self.metrics.tick(); + self.tick_metrics.tick(); // 9) Finish the tick, pass control back to the frontend. @@ -1150,6 +1159,7 @@ impl Server { impl Drop for Server { fn drop(&mut self) { + self.metrics_shutdown.notify_one(); self.state .notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown)); } diff --git a/server/src/metrics.rs b/server/src/metrics.rs index d52ae33538..ac57121437 100644 --- a/server/src/metrics.rs +++ b/server/src/metrics.rs @@ -1,19 +1,16 @@ use prometheus::{ - Encoder, Gauge, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, - Opts, Registry, TextEncoder, + Gauge, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, + Registry, }; use std::{ convert::TryInto, error::Error, - net::SocketAddr, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, }, - thread, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tracing::{debug, error}; type RegistryFn = Box Result<(), prometheus::Error>>; @@ -60,13 +57,6 @@ pub struct TickMetrics { tick: Arc, } -pub struct ServerMetrics { - running: Arc, - handle: Option>, - registry: Option, - tick: Arc, -} - impl PhysicsMetrics { pub fn new() -> Result<(Self, RegistryFn), prometheus::Error> { let entity_entity_collision_checks_count = IntCounter::with_opts(Opts::new( @@ -265,7 +255,7 @@ impl ChunkGenMetrics { } impl TickMetrics { - pub fn new(tick: Arc) -> Result<(Self, RegistryFn), Box>
{ + pub fn new() -> Result<(Self, RegistryFn), Box> { let chonks_count = IntGauge::with_opts(Opts::new( "chonks_count", "number of all chonks currently active on the server", @@ -315,6 +305,7 @@ impl TickMetrics { let time_of_day_clone = time_of_day.clone(); let light_count_clone = light_count.clone(); let tick_time_clone = tick_time.clone(); + let tick = Arc::new(AtomicU64::new(0)); let f = |registry: &Registry| { registry.register(Box::new(chonks_count_clone))?; @@ -346,87 +337,7 @@ impl TickMetrics { )) } + pub fn tick(&self) { self.tick.fetch_add(1, Ordering::Relaxed); } + pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } } - -impl ServerMetrics { - #[allow(clippy::new_without_default)] // TODO: Pending review in #587 - pub fn new() -> Self { - let running = Arc::new(AtomicBool::new(false)); - let tick = Arc::new(AtomicU64::new(0)); - let registry = Some(Registry::new()); - - Self { - running, - handle: None, - registry, - tick, - } - } - - pub fn registry(&self) -> &Registry { - match self.registry { - Some(ref r) => r, - None => panic!("You cannot longer register new metrics after the server has started!"), - } - } - - pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box> { - self.running.store(true, Ordering::Relaxed); - let running2 = Arc::clone(&self.running); - - let registry = self - .registry - .take() - .expect("ServerMetrics must be already started"); - - //TODO: make this a job - self.handle = Some(thread::spawn(move || { - let server = tiny_http::Server::http(addr).unwrap(); - const TIMEOUT: Duration = Duration::from_secs(1); - debug!("starting tiny_http server to serve metrics"); - while running2.load(Ordering::Relaxed) { - let request = match server.recv_timeout(TIMEOUT) { - Ok(Some(rq)) => rq, - Ok(None) => continue, - Err(e) => { - error!(?e, "metrics http server error"); - break; - }, - }; - let mf = registry.gather(); - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - encoder - 
.encode(&mf, &mut buffer) - .expect("Failed to encoder metrics text."); - let response = tiny_http::Response::from_string( - String::from_utf8(buffer).expect("Failed to parse bytes as a string."), - ); - if let Err(e) = request.respond(response) { - error!( - ?e, - "The metrics HTTP server had encountered and error with answering", - ); - } - } - debug!("stopping tiny_http server to serve metrics"); - })); - Ok(()) - } - - pub fn tick(&self) -> u64 { self.tick.fetch_add(1, Ordering::Relaxed) + 1 } - - pub fn tick_clone(&self) -> Arc { Arc::clone(&self.tick) } -} - -impl Drop for ServerMetrics { - fn drop(&mut self) { - self.running.store(false, Ordering::Relaxed); - let handle = self.handle.take(); - handle - .expect("ServerMetrics worker handle does not exist.") - .join() - .expect("Error shutting down prometheus metric exporter"); - } -} diff --git a/voxygen/Cargo.toml b/voxygen/Cargo.toml index aeebeb0a30..08a7a72c46 100644 --- a/voxygen/Cargo.toml +++ b/voxygen/Cargo.toml @@ -82,7 +82,7 @@ ron = {version = "0.6", default-features = false} serde = {version = "1.0", features = [ "rc", "derive" ]} treeculler = "0.1.0" uvth = "3.1.1" -tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] } +tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] } num_cpus = "1.0" # vec_map = { version = "0.8.2" } inline_tweak = "1.0.2" diff --git a/voxygen/src/logging.rs b/voxygen/src/logging.rs index e0b4a962df..d0c3bd98d5 100644 --- a/voxygen/src/logging.rs +++ b/voxygen/src/logging.rs @@ -45,6 +45,7 @@ pub fn init(settings: &Settings) -> Vec { .add_directive("uvth=warn".parse().unwrap()) .add_directive("tiny_http=warn".parse().unwrap()) .add_directive("mio::sys::windows=debug".parse().unwrap()) + .add_directive("veloren_network_protocol=info".parse().unwrap()) .add_directive( "veloren_server::persistence::character=info" .parse()