Great improvements to the codebase:

- better logging in network
 - we now notify the send of what happened in recv in participant.
 - works with veloren master servers
 - works in singleplayer, using a actual mid.
 - add `mpsc` in whole stack incl tests
 - speed up internal read/write with `Bytes` crate
 - use `prometheus-hyper` for metrics
 - use a metrics cache
This commit is contained in:
Marcel Märtens 2021-02-10 11:37:42 +01:00
parent 9884019963
commit ea8ab1ce7a
29 changed files with 852 additions and 893 deletions

84
Cargo.lock generated
View File

@ -225,12 +225,6 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e"
[[package]]
name = "ascii"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbf56136a5198c7b01a49e3afcbef6cf84597273d298f54432926024107b0109"
[[package]]
name = "assets_manager"
version = "0.4.3"
@ -723,7 +717,7 @@ version = "3.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680"
dependencies = [
"ascii 0.9.3",
"ascii",
"byteorder",
"either",
"memchr",
@ -2341,6 +2335,16 @@ dependencies = [
"http",
]
[[package]]
name = "http-body"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994"
dependencies = [
"bytes 1.0.1",
"http",
]
[[package]]
name = "httparse"
version = "1.3.5"
@ -2371,7 +2375,7 @@ dependencies = [
"futures-util",
"h2",
"http",
"http-body",
"http-body 0.3.1",
"httparse",
"httpdate",
"itoa",
@ -2383,6 +2387,29 @@ dependencies = [
"want",
]
[[package]]
name = "hyper"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8e946c2b1349055e0b72ae281b238baf1a3ea7307c7e9f9d64673bdd9c26ac7"
dependencies = [
"bytes 1.0.1",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body 0.4.0",
"httparse",
"httpdate",
"itoa",
"pin-project 1.0.5",
"socket2",
"tokio 1.2.0",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.21.0"
@ -2391,7 +2418,7 @@ checksum = "37743cc83e8ee85eacfce90f2f4102030d9ff0a95244098d781e9bee4a90abb6"
dependencies = [
"bytes 0.5.6",
"futures-util",
"hyper",
"hyper 0.13.10",
"log",
"rustls 0.18.1",
"tokio 0.2.25",
@ -3952,6 +3979,18 @@ dependencies = [
"thiserror",
]
[[package]]
name = "prometheus-hyper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc47fa532a12d544229015dd3fae32394949af098b8fe9a327b8c1e4c911d1c8"
dependencies = [
"hyper 0.14.4",
"prometheus",
"tokio 1.2.0",
"tracing",
]
[[package]]
name = "publicsuffix"
version = "1.5.4"
@ -4230,8 +4269,8 @@ dependencies = [
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"http-body 0.3.1",
"hyper 0.13.10",
"hyper-rustls",
"ipnet",
"js-sys",
@ -5069,19 +5108,6 @@ dependencies = [
"syn 1.0.60",
]
[[package]]
name = "tiny_http"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded47106b8e52d8ed8119f0ea6e8c0f5881e69783e0297b5a8462958f334bc1"
dependencies = [
"ascii 1.0.0",
"chrono",
"chunked_transfer",
"log",
"url",
]
[[package]]
name = "tinytemplate"
version = "1.2.0"
@ -5137,8 +5163,11 @@ dependencies = [
"memchr",
"mio 0.7.7",
"num_cpus",
"once_cell",
"pin-project-lite 0.2.4",
"signal-hook-registry",
"tokio-macros",
"winapi 0.3.9",
]
[[package]]
@ -5682,6 +5711,7 @@ dependencies = [
"async-trait",
"bincode",
"bitflags",
"bytes 1.0.1",
"clap",
"crossbeam-channel 0.5.0",
"futures-core",
@ -5689,14 +5719,13 @@ dependencies = [
"lazy_static",
"lz-fear",
"prometheus",
"prometheus-hyper",
"rand 0.8.3",
"serde",
"shellexpand",
"tiny_http",
"tokio 1.2.0",
"tokio-stream",
"tracing",
"tracing-futures",
"tracing-subscriber",
"veloren-network-protocol",
]
@ -5708,6 +5737,7 @@ dependencies = [
"async-channel",
"async-trait",
"bitflags",
"bytes 1.0.1",
"criterion",
"prometheus",
"rand 0.8.3",
@ -5762,6 +5792,7 @@ dependencies = [
"libsqlite3-sys",
"portpicker",
"prometheus",
"prometheus-hyper",
"rand 0.8.3",
"rayon",
"ron",
@ -5771,7 +5802,6 @@ dependencies = [
"slab",
"specs",
"specs-idvs",
"tiny_http",
"tokio 1.2.0",
"tracing",
"uvth",

View File

@ -21,7 +21,7 @@ uvth = "3.1.1"
futures-util = "0.3.7"
futures-executor = "0.3"
futures-timer = "3.0"
tokio = { version = "1.0.1", default-features = false, features = ["rt"] }
tokio = { version = "1", default-features = false, features = ["rt"] }
image = { version = "0.23.12", default-features = false, features = ["png"] }
num = "0.3.1"
num_cpus = "1.10.1"

View File

@ -14,7 +14,7 @@ default = ["metrics","compression"]
[dependencies]
network-protocol = { package = "veloren-network-protocol", path = "protocol", default-features = false }
network-protocol = { package = "veloren-network-protocol", path = "protocol" }
#serialisation
bincode = "1.3.1"
@ -24,8 +24,7 @@ crossbeam-channel = "0.5"
tokio = { version = "1.2", default-features = false, features = ["io-util", "macros", "rt", "net", "time"] }
tokio-stream = { version = "0.1.2", default-features = false }
#tracing and metrics
tracing = { version = "0.1", default-features = false }
tracing-futures = "0.2"
tracing = { version = "0.1", default-features = false, features = ["attributes"]}
prometheus = { version = "0.11", default-features = false, optional = true }
#async
futures-core = { version = "0.3", default-features = false }
@ -39,12 +38,25 @@ bitflags = "1.2.1"
lz-fear = { version = "0.1.1", optional = true }
# async traits
async-trait = "0.1.42"
bytes = "^1"
[dev-dependencies]
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }
tokio = { version = "1.1.0", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] }
tokio = { version = "1.2", default-features = false, features = ["io-std", "fs", "rt-multi-thread"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
clap = { version = "2.33", default-features = false }
shellexpand = "2.0.0"
tiny_http = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
prometheus-hyper = "0.1.1"
[[example]]
name = "fileshare"
[[example]]
name = "network-speed"
[[example]]
name = "chat"
[[example]]
name = "tcp_loadtest"

View File

@ -3,11 +3,12 @@
/// (cd network/examples/network-speed && RUST_BACKTRACE=1 cargo run --profile=debuginfo -Z unstable-options -- --trace=error --protocol=tcp --mode=server)
/// (cd network/examples/network-speed && RUST_BACKTRACE=1 cargo run --profile=debuginfo -Z unstable-options -- --trace=error --protocol=tcp --mode=client)
/// ```
mod metrics;
use clap::{App, Arg};
use prometheus::Registry;
use prometheus_hyper::Server;
use serde::{Deserialize, Serialize};
use std::{
net::SocketAddr,
sync::Arc,
thread,
time::{Duration, Instant},
@ -121,9 +122,13 @@ fn main() {
}
fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
let mut metrics = metrics::SimpleMetrics::new();
let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry());
metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap();
let registry = Arc::new(Registry::new());
let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
runtime.spawn(Server::run(
Arc::clone(&registry),
SocketAddr::from(([0; 4], 59112)),
futures_util::future::pending(),
));
runtime.block_on(server.listen(address)).unwrap();
loop {
@ -148,9 +153,13 @@ fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
}
fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
let mut metrics = metrics::SimpleMetrics::new();
let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), metrics.registry());
metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap();
let registry = Arc::new(Registry::new());
let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
runtime.spawn(Server::run(
Arc::clone(&registry),
SocketAddr::from(([0; 4], 59111)),
futures_util::future::pending(),
));
let p1 = runtime.block_on(client.connect(address)).unwrap(); //remote representation of p1
let mut s1 = runtime

View File

@ -1,92 +0,0 @@
use prometheus::{Encoder, Registry, TextEncoder};
use std::{
error::Error,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
};
use tracing::*;
pub struct SimpleMetrics {
running: Arc<AtomicBool>,
handle: Option<thread::JoinHandle<()>>,
registry: Option<Registry>,
}
impl SimpleMetrics {
pub fn new() -> Self {
let running = Arc::new(AtomicBool::new(false));
let registry = Some(Registry::new());
Self {
running,
handle: None,
registry,
}
}
pub fn registry(&self) -> &Registry {
match self.registry {
Some(ref r) => r,
None => panic!("You cannot longer register new metrics after the server has started!"),
}
}
pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box<dyn Error>> {
self.running.store(true, Ordering::Relaxed);
let running2 = self.running.clone();
let registry = self
.registry
.take()
.expect("ServerMetrics must be already started");
//TODO: make this a job
self.handle = Some(thread::spawn(move || {
let server = tiny_http::Server::http(addr).unwrap();
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
debug!("starting tiny_http server to serve metrics");
while running2.load(Ordering::Relaxed) {
let request = match server.recv_timeout(TIMEOUT) {
Ok(Some(rq)) => rq,
Ok(None) => continue,
Err(e) => {
println!("Error: {}", e);
break;
},
};
let mf = registry.gather();
let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder
.encode(&mf, &mut buffer)
.expect("Failed to encoder metrics text.");
let response = tiny_http::Response::from_string(
String::from_utf8(buffer).expect("Failed to parse bytes as a string."),
);
if let Err(e) = request.respond(response) {
error!(
?e,
"The metrics HTTP server had encountered and error with answering"
)
}
}
debug!("Stopping tiny_http server to serve metrics");
}));
Ok(())
}
}
impl Drop for SimpleMetrics {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
let handle = self.handle.take();
handle
.expect("ServerMetrics worker handle does not exist.")
.join()
.expect("Error shutting down prometheus metric exporter");
}
}

View File

@ -9,6 +9,7 @@ edition = "2018"
[features]
metrics = ["prometheus"]
trace_pedantic = [] # use for debug only
default = ["metrics"]
@ -22,6 +23,7 @@ bitflags = "1.2.1"
rand = { version = "0.8" }
# async traits
async-trait = "0.1.42"
bytes = "^1"
[dev-dependencies]
async-channel = "1.5.1"

View File

@ -1,5 +1,6 @@
use async_channel::*;
use async_trait::async_trait;
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, Criterion};
use std::{sync::Arc, time::Duration};
use veloren_network_protocol::{
@ -8,7 +9,7 @@ use veloren_network_protocol::{
Sid, TcpRecvProtcol, TcpSendProtcol, UnreliableDrain, UnreliableSink, _internal::Frame,
};
fn frame_serialize(frame: Frame, buffer: &mut [u8]) -> usize { frame.to_bytes(buffer).0 }
fn frame_serialize(frame: Frame, buffer: &mut BytesMut) { frame.to_bytes(buffer); }
async fn mpsc_msg(buffer: Arc<MessageBuffer>) {
// Arrrg, need to include constructor here
@ -102,7 +103,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.to_async(rt()).iter(|| mpsc_handshake())
});
let mut buffer = [0u8; 1500];
let mut buffer = BytesMut::with_capacity(1500);
c.bench_function("frame_serialize_short", |b| {
let frame = Frame::Data {
@ -110,7 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) {
start: 89u64,
data: b"hello_world".to_vec(),
};
b.iter(move || frame_serialize(frame.clone(), &mut buffer))
b.iter(|| frame_serialize(frame.clone(), &mut buffer))
});
c.bench_function("tcp_short_msg", |b| {
@ -126,6 +127,11 @@ fn criterion_benchmark(c: &mut Criterion) {
b.to_async(rt())
.iter(|| tcp_msg(Arc::clone(&buffer), 10_000))
});
c.bench_function("tcp_1000000_tiny_msg", |b| {
let buffer = Arc::new(MessageBuffer { data: vec![3u8; 5] });
b.to_async(rt())
.iter(|| tcp_msg(Arc::clone(&buffer), 1_000_000))
});
}
criterion_group!(benches, criterion_benchmark);
@ -164,11 +170,11 @@ mod utils {
}
pub struct TcpDrain {
sender: Sender<Vec<u8>>,
sender: Sender<BytesMut>,
}
pub struct TcpSink {
receiver: Receiver<Vec<u8>>,
receiver: Receiver<BytesMut>,
}
/// emulate Tcp protocol on Channels
@ -219,7 +225,7 @@ mod utils {
#[async_trait]
impl UnreliableDrain for TcpDrain {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
self.sender
@ -231,7 +237,7 @@ mod utils {
#[async_trait]
impl UnreliableSink for TcpSink {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
self.receiver

View File

@ -1,5 +1,5 @@
use crate::types::{Mid, Pid, Prio, Promises, Sid};
use std::{collections::VecDeque, convert::TryFrom};
use bytes::{Buf, BufMut, BytesMut};
// const FRAME_RESERVED_1: u8 = 0;
const FRAME_HANDSHAKE: u8 = 1;
@ -62,37 +62,32 @@ impl InitFrame {
pub(crate) const RAW_CNS: usize = 2;
//provide an appropriate buffer size. > 1500
pub(crate) fn to_bytes(self, bytes: &mut [u8]) -> usize {
pub(crate) fn to_bytes(self, bytes: &mut BytesMut) {
match self {
InitFrame::Handshake {
magic_number,
version,
} => {
let x = FRAME_HANDSHAKE.to_be_bytes();
bytes[0] = x[0];
bytes[1..8].copy_from_slice(&magic_number);
bytes[8..12].copy_from_slice(&version[0].to_le_bytes());
bytes[12..16].copy_from_slice(&version[1].to_le_bytes());
bytes[16..Self::HANDSHAKE_CNS + 1].copy_from_slice(&version[2].to_le_bytes());
Self::HANDSHAKE_CNS + 1
bytes.put_u8(FRAME_HANDSHAKE);
bytes.put_slice(&magic_number);
bytes.put_u32_le(version[0]);
bytes.put_u32_le(version[1]);
bytes.put_u32_le(version[2]);
},
InitFrame::Init { pid, secret } => {
bytes[0] = FRAME_INIT.to_be_bytes()[0];
bytes[1..17].copy_from_slice(&pid.to_le_bytes());
bytes[17..Self::INIT_CNS + 1].copy_from_slice(&secret.to_le_bytes());
Self::INIT_CNS + 1
bytes.put_u8(FRAME_INIT);
pid.to_bytes(bytes);
bytes.put_u128_le(secret);
},
InitFrame::Raw(data) => {
bytes[0] = FRAME_RAW.to_be_bytes()[0];
bytes[1..3].copy_from_slice(&(data.len() as u16).to_le_bytes());
bytes[Self::RAW_CNS + 1..(data.len() + Self::RAW_CNS + 1)]
.clone_from_slice(&data[..]);
Self::RAW_CNS + 1 + data.len()
bytes.put_u8(FRAME_RAW);
bytes.put_u16_le(data.len() as u16);
bytes.put_slice(&data);
},
}
}
pub(crate) fn to_frame(bytes: Vec<u8>) -> Option<Self> {
pub(crate) fn to_frame(bytes: &mut BytesMut) -> Option<Self> {
let frame_no = match bytes.get(0) {
Some(&f) => f,
None => return None,
@ -102,61 +97,43 @@ impl InitFrame {
if bytes.len() < Self::HANDSHAKE_CNS + 1 {
return None;
}
InitFrame::gen_handshake(
*<&[u8; Self::HANDSHAKE_CNS]>::try_from(&bytes[1..Self::HANDSHAKE_CNS + 1])
.unwrap(),
)
bytes.advance(1);
let mut magic_number_bytes = bytes.copy_to_bytes(7);
let mut magic_number = [0u8; 7];
magic_number_bytes.copy_to_slice(&mut magic_number);
InitFrame::Handshake {
magic_number,
version: [bytes.get_u32_le(), bytes.get_u32_le(), bytes.get_u32_le()],
}
},
FRAME_INIT => {
if bytes.len() < Self::INIT_CNS + 1 {
return None;
}
InitFrame::gen_init(
*<&[u8; Self::INIT_CNS]>::try_from(&bytes[1..Self::INIT_CNS + 1]).unwrap(),
)
bytes.advance(1);
InitFrame::Init {
pid: Pid::from_bytes(bytes),
secret: bytes.get_u128_le(),
}
},
FRAME_RAW => {
if bytes.len() < Self::RAW_CNS + 1 {
return None;
}
let length = InitFrame::gen_raw(
*<&[u8; Self::RAW_CNS]>::try_from(&bytes[1..Self::RAW_CNS + 1]).unwrap(),
);
let mut data = vec![0; length as usize];
let slice = &bytes[Self::RAW_CNS + 1..];
if slice.len() != length as usize {
return None;
}
data.copy_from_slice(&bytes[Self::RAW_CNS + 1..]);
bytes.advance(1);
let length = bytes.get_u16_le() as usize;
// lower length is allowed
let max_length = length.min(bytes.len());
println!("dasdasd {:?}", length);
println!("aaaaa {:?}", max_length);
let mut data = vec![0; max_length];
data.copy_from_slice(&bytes[..max_length]);
InitFrame::Raw(data)
},
_ => InitFrame::Raw(bytes),
_ => InitFrame::Raw(bytes.to_vec()),
};
Some(frame)
}
fn gen_handshake(buf: [u8; Self::HANDSHAKE_CNS]) -> Self {
let magic_number = *<&[u8; 7]>::try_from(&buf[0..7]).unwrap();
InitFrame::Handshake {
magic_number,
version: [
u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[7..11]).unwrap()),
u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[11..15]).unwrap()),
u32::from_le_bytes(*<&[u8; 4]>::try_from(&buf[15..Self::HANDSHAKE_CNS]).unwrap()),
],
}
}
fn gen_init(buf: [u8; Self::INIT_CNS]) -> Self {
InitFrame::Init {
pid: Pid::from_le_bytes(*<&[u8; 16]>::try_from(&buf[0..16]).unwrap()),
secret: u128::from_le_bytes(*<&[u8; 16]>::try_from(&buf[16..Self::INIT_CNS]).unwrap()),
}
}
fn gen_raw(buf: [u8; Self::RAW_CNS]) -> u16 {
u16::from_le_bytes(*<&[u8; 2]>::try_from(&buf[0..Self::RAW_CNS]).unwrap())
}
}
impl Frame {
@ -164,82 +141,53 @@ impl Frame {
/// const part of the DATA frame, actual size is variable
pub(crate) const DATA_CNS: usize = 18;
pub(crate) const DATA_HEADER_CNS: usize = 24;
#[cfg(feature = "metrics")]
pub const FRAMES_LEN: u8 = 5;
pub(crate) const OPEN_STREAM_CNS: usize = 10;
// Size WITHOUT the 1rst indicating byte
pub(crate) const SHUTDOWN_CNS: usize = 0;
#[cfg(feature = "metrics")]
pub const fn int_to_string(i: u8) -> &'static str {
match i {
0 => "Shutdown",
1 => "OpenStream",
2 => "CloseStream",
3 => "DataHeader",
4 => "Data",
_ => "",
}
}
#[cfg(feature = "metrics")]
pub fn get_int(&self) -> u8 {
match self {
Frame::Shutdown => 0,
Frame::OpenStream { .. } => 1,
Frame::CloseStream { .. } => 2,
Frame::DataHeader { .. } => 3,
Frame::Data { .. } => 4,
}
}
#[cfg(feature = "metrics")]
pub fn get_string(&self) -> &str { Self::int_to_string(self.get_int()) }
//provide an appropriate buffer size. > 1500
pub fn to_bytes(self, bytes: &mut [u8]) -> (/* buf */ usize, /* actual data */ u64) {
pub fn to_bytes(self, bytes: &mut BytesMut) -> u64 {
match self {
Frame::Shutdown => {
bytes[Self::SHUTDOWN_CNS] = FRAME_SHUTDOWN.to_be_bytes()[0];
(Self::SHUTDOWN_CNS + 1, 0)
bytes.put_u8(FRAME_SHUTDOWN);
0
},
Frame::OpenStream {
sid,
prio,
promises,
} => {
bytes[0] = FRAME_OPEN_STREAM.to_be_bytes()[0];
bytes[1..9].copy_from_slice(&sid.to_le_bytes());
bytes[9] = prio.to_le_bytes()[0];
bytes[Self::OPEN_STREAM_CNS] = promises.to_le_bytes()[0];
(Self::OPEN_STREAM_CNS + 1, 0)
bytes.put_u8(FRAME_OPEN_STREAM);
bytes.put_slice(&sid.to_le_bytes());
bytes.put_u8(prio);
bytes.put_u8(promises.to_le_bytes()[0]);
0
},
Frame::CloseStream { sid } => {
bytes[0] = FRAME_CLOSE_STREAM.to_be_bytes()[0];
bytes[1..Self::CLOSE_STREAM_CNS + 1].copy_from_slice(&sid.to_le_bytes());
(Self::CLOSE_STREAM_CNS + 1, 0)
bytes.put_u8(FRAME_CLOSE_STREAM);
bytes.put_slice(&sid.to_le_bytes());
0
},
Frame::DataHeader { mid, sid, length } => {
bytes[0] = FRAME_DATA_HEADER.to_be_bytes()[0];
bytes[1..9].copy_from_slice(&mid.to_le_bytes());
bytes[9..17].copy_from_slice(&sid.to_le_bytes());
bytes[17..Self::DATA_HEADER_CNS + 1].copy_from_slice(&length.to_le_bytes());
(Self::DATA_HEADER_CNS + 1, 0)
bytes.put_u8(FRAME_DATA_HEADER);
bytes.put_u64_le(mid);
bytes.put_slice(&sid.to_le_bytes());
bytes.put_u64_le(length);
0
},
Frame::Data { mid, start, data } => {
bytes[0] = FRAME_DATA.to_be_bytes()[0];
bytes[1..9].copy_from_slice(&mid.to_le_bytes());
bytes[9..17].copy_from_slice(&start.to_le_bytes());
bytes[17..Self::DATA_CNS + 1].copy_from_slice(&(data.len() as u16).to_le_bytes());
bytes[Self::DATA_CNS + 1..(data.len() + Self::DATA_CNS + 1)]
.clone_from_slice(&data[..]);
(Self::DATA_CNS + 1 + data.len(), data.len() as u64)
bytes.put_u8(FRAME_DATA);
bytes.put_u64_le(mid);
bytes.put_u64_le(start);
bytes.put_u16_le(data.len() as u16);
bytes.put_slice(&data);
data.len() as u64
},
}
}
pub(crate) fn to_frame(bytes: &mut VecDeque<u8>) -> Option<Self> {
let frame_no = match bytes.get(0) {
pub(crate) fn to_frame(bytes: &mut BytesMut) -> Option<Self> {
let frame_no = match bytes.first() {
Some(&f) => f,
None => return None,
};
@ -249,6 +197,9 @@ impl Frame {
FRAME_CLOSE_STREAM => Self::CLOSE_STREAM_CNS,
FRAME_DATA_HEADER => Self::DATA_HEADER_CNS,
FRAME_DATA => {
if bytes.len() < 17 + 1 + 1 {
return None;
}
u16::from_le_bytes([bytes[16 + 1], bytes[17 + 1]]) as usize + Self::DATA_CNS
},
_ => return None,
@ -260,68 +211,49 @@ impl Frame {
let frame = match frame_no {
FRAME_SHUTDOWN => {
let _ = bytes.drain(..size + 1);
let _ = bytes.split_to(size + 1);
Frame::Shutdown
},
FRAME_OPEN_STREAM => {
let bytes = bytes.drain(..size + 1).skip(1).collect::<Vec<u8>>();
Frame::gen_open_stream(<[u8; 10]>::try_from(bytes).unwrap())
let mut bytes = bytes.split_to(size + 1);
bytes.advance(1);
Frame::OpenStream {
sid: Sid::new(bytes.get_u64_le()),
prio: bytes.get_u8(),
promises: Promises::from_bits_truncate(bytes.get_u8()),
}
},
FRAME_CLOSE_STREAM => {
let bytes = bytes.drain(..size + 1).skip(1).collect::<Vec<u8>>();
Frame::gen_close_stream(<[u8; 8]>::try_from(bytes).unwrap())
let mut bytes = bytes.split_to(size + 1);
bytes.advance(1);
Frame::CloseStream {
sid: Sid::new(bytes.get_u64_le()),
}
},
FRAME_DATA_HEADER => {
let bytes = bytes.drain(..size + 1).skip(1).collect::<Vec<u8>>();
Frame::gen_data_header(<[u8; 24]>::try_from(bytes).unwrap())
let mut bytes = bytes.split_to(size + 1);
bytes.advance(1);
Frame::DataHeader {
mid: bytes.get_u64_le(),
sid: Sid::new(bytes.get_u64_le()),
length: bytes.get_u64_le(),
}
},
FRAME_DATA => {
let info = bytes
.drain(..Self::DATA_CNS + 1)
.skip(1)
.collect::<Vec<u8>>();
let (mid, start, length) = Frame::gen_data(<[u8; 18]>::try_from(info).unwrap());
let mut info = bytes.split_to(Self::DATA_CNS + 1);
info.advance(1);
let mid = info.get_u64_le();
let start = info.get_u64_le();
let length = info.get_u16_le();
debug_assert_eq!(length as usize, size - Self::DATA_CNS);
let data = bytes.drain(..length as usize).collect::<Vec<u8>>();
let data = bytes.split_to(length as usize);
let data = data.to_vec();
Frame::Data { mid, start, data }
},
_ => unreachable!("Frame::to_frame should be handled before!"),
};
Some(frame)
}
fn gen_open_stream(buf: [u8; Self::OPEN_STREAM_CNS]) -> Self {
Frame::OpenStream {
sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()),
prio: buf[8],
promises: Promises::from_bits_truncate(buf[Self::OPEN_STREAM_CNS - 1]),
}
}
fn gen_close_stream(buf: [u8; Self::CLOSE_STREAM_CNS]) -> Self {
Frame::CloseStream {
sid: Sid::from_le_bytes(
*<&[u8; 8]>::try_from(&buf[0..Self::CLOSE_STREAM_CNS]).unwrap(),
),
}
}
fn gen_data_header(buf: [u8; Self::DATA_HEADER_CNS]) -> Self {
Frame::DataHeader {
mid: Mid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap()),
sid: Sid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[8..16]).unwrap()),
length: u64::from_le_bytes(
*<&[u8; 8]>::try_from(&buf[16..Self::DATA_HEADER_CNS]).unwrap(),
),
}
}
fn gen_data(buf: [u8; Self::DATA_CNS]) -> (Mid, u64, u16) {
let mid = Mid::from_le_bytes(*<&[u8; 8]>::try_from(&buf[0..8]).unwrap());
let start = u64::from_le_bytes(*<&[u8; 8]>::try_from(&buf[8..16]).unwrap());
let length = u16::from_le_bytes(*<&[u8; 2]>::try_from(&buf[16..Self::DATA_CNS]).unwrap());
(mid, start, length)
}
}
#[cfg(test)]
@ -375,10 +307,9 @@ mod tests {
#[test]
fn initframe_individual() {
let dupl = |frame: InitFrame| {
let mut buffer = vec![0u8; 1500];
let size = InitFrame::to_bytes(frame.clone(), &mut buffer);
buffer.truncate(size);
InitFrame::to_frame(buffer)
let mut buffer = BytesMut::with_capacity(1500);
InitFrame::to_bytes(frame.clone(), &mut buffer);
InitFrame::to_frame(&mut buffer)
};
for frame in get_initframes() {
@ -389,29 +320,18 @@ mod tests {
#[test]
fn initframe_multiple() {
let mut buffer = vec![0u8; 3000];
let mut buffer = BytesMut::with_capacity(3000);
let mut frames = get_initframes();
let mut last = 0;
// to string
let sizes = frames
.iter()
.map(|f| {
let s = InitFrame::to_bytes(f.clone(), &mut buffer[last..]);
last += s;
s
})
.collect::<Vec<_>>();
for f in &frames {
InitFrame::to_bytes(f.clone(), &mut buffer);
}
// from string
let mut last = 0;
let mut framesd = sizes
let mut framesd = frames
.iter()
.map(|&s| {
let f = InitFrame::to_frame(buffer[last..last + s].to_vec());
last += s;
f
})
.map(|&_| InitFrame::to_frame(&mut buffer))
.collect::<Vec<_>>();
// compare
@ -424,10 +344,9 @@ mod tests {
#[test]
fn frame_individual() {
let dupl = |frame: Frame| {
let mut buffer = vec![0u8; 1500];
let (size, _) = Frame::to_bytes(frame.clone(), &mut buffer);
let mut deque = buffer[..size].iter().map(|b| *b).collect();
Frame::to_frame(&mut deque)
let mut buffer = BytesMut::with_capacity(1500);
Frame::to_bytes(frame.clone(), &mut buffer);
Frame::to_frame(&mut buffer)
};
for frame in get_frames() {
@ -438,31 +357,16 @@ mod tests {
#[test]
fn frame_multiple() {
let mut buffer = vec![0u8; 3000];
let mut buffer = BytesMut::with_capacity(3000);
let mut frames = get_frames();
let mut last = 0;
// to string
let sizes = frames
.iter()
.map(|f| {
let s = Frame::to_bytes(f.clone(), &mut buffer[last..]).0;
last += s;
s
})
.collect::<Vec<_>>();
assert_eq!(sizes[0], 1 + Frame::OPEN_STREAM_CNS);
assert_eq!(sizes[1], 1 + Frame::DATA_HEADER_CNS);
assert_eq!(sizes[2], 1 + Frame::DATA_CNS + 20);
assert_eq!(sizes[3], 1 + Frame::DATA_CNS + 16);
assert_eq!(sizes[4], 1 + Frame::CLOSE_STREAM_CNS);
assert_eq!(sizes[5], 1 + Frame::SHUTDOWN_CNS);
let mut buffer = buffer.drain(..).collect::<VecDeque<_>>();
for f in &frames {
Frame::to_bytes(f.clone(), &mut buffer);
}
// from string
let mut framesd = sizes
let mut framesd = frames
.iter()
.map(|&_| Frame::to_frame(&mut buffer))
.collect::<Vec<_>>();
@ -476,32 +380,31 @@ mod tests {
#[test]
fn frame_exact_size() {
let mut buffer = vec![0u8; Frame::CLOSE_STREAM_CNS+1/*first byte*/];
const SIZE: usize = Frame::CLOSE_STREAM_CNS+1/*first byte*/;
let mut buffer = BytesMut::with_capacity(SIZE);
let frame1 = Frame::CloseStream {
sid: Sid::new(1337),
};
let _ = Frame::to_bytes(frame1.clone(), &mut buffer);
let frame1 = Frame::CloseStream { sid: Sid::new(2) };
Frame::to_bytes(frame1.clone(), &mut buffer);
assert_eq!(buffer.len(), SIZE);
let mut deque = buffer.iter().map(|b| *b).collect();
let frame2 = Frame::to_frame(&mut deque);
assert_eq!(Some(frame1), frame2);
}
#[test]
#[should_panic]
fn initframe_too_short_buffer() {
let mut buffer = vec![0u8; 10];
let mut buffer = BytesMut::with_capacity(10);
let frame1 = InitFrame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER,
version: VELOREN_NETWORK_VERSION,
};
let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer);
InitFrame::to_bytes(frame1.clone(), &mut buffer);
}
#[test]
fn initframe_too_less_data() {
let mut buffer = vec![0u8; 20];
let mut buffer = BytesMut::with_capacity(20);
let frame1 = InitFrame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER,
@ -509,79 +412,78 @@ mod tests {
};
let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer);
buffer.truncate(6); // simulate partial retrieve
let frame1d = InitFrame::to_frame(buffer[..6].to_vec());
let frame1d = InitFrame::to_frame(&mut buffer);
assert_eq!(frame1d, None);
}
#[test]
fn initframe_rubish() {
let buffer = b"dtrgwcser".to_vec();
let mut buffer = BytesMut::from(&b"dtrgwcser"[..]);
assert_eq!(
InitFrame::to_frame(buffer),
InitFrame::to_frame(&mut buffer),
Some(InitFrame::Raw(b"dtrgwcser".to_vec()))
);
}
#[test]
fn initframe_attack_too_much_length() {
let mut buffer = vec![0u8; 50];
let mut buffer = BytesMut::with_capacity(50);
let frame1 = InitFrame::Raw(b"foobar".to_vec());
let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer);
buffer[2] = 255;
let framed = InitFrame::to_frame(buffer);
assert_eq!(framed, None);
buffer[1] = 255;
let framed = InitFrame::to_frame(&mut buffer);
assert_eq!(framed, Some(frame1));
}
#[test]
fn initframe_attack_too_low_length() {
let mut buffer = vec![0u8; 50];
let mut buffer = BytesMut::with_capacity(50);
let frame1 = InitFrame::Raw(b"foobar".to_vec());
let _ = InitFrame::to_bytes(frame1.clone(), &mut buffer);
buffer[2] = 3;
let framed = InitFrame::to_frame(buffer);
assert_eq!(framed, None);
buffer[1] = 3;
let framed = InitFrame::to_frame(&mut buffer);
// we accept a different frame here, as it's RAW and debug only!
assert_eq!(framed, Some(InitFrame::Raw(b"foo".to_vec())));
}
#[test]
#[should_panic]
fn frame_too_short_buffer() {
let mut buffer = vec![0u8; 10];
let mut buffer = BytesMut::with_capacity(10);
let frame1 = Frame::OpenStream {
sid: Sid::new(88),
promises: Promises::ENCRYPTED,
prio: 88,
};
let _ = Frame::to_bytes(frame1.clone(), &mut buffer);
Frame::to_bytes(frame1.clone(), &mut buffer);
}
#[test]
fn frame_too_less_data() {
let mut buffer = vec![0u8; 20];
let mut buffer = BytesMut::with_capacity(20);
let frame1 = Frame::OpenStream {
sid: Sid::new(88),
promises: Promises::ENCRYPTED,
prio: 88,
};
let _ = Frame::to_bytes(frame1.clone(), &mut buffer);
Frame::to_bytes(frame1.clone(), &mut buffer);
buffer.truncate(6); // simulate partial retrieve
let mut buffer = buffer.drain(..6).collect::<VecDeque<_>>();
let frame1d = Frame::to_frame(&mut buffer);
assert_eq!(frame1d, None);
}
#[test]
fn frame_rubish() {
let mut buffer = b"dtrgwcser".iter().map(|u| *u).collect::<VecDeque<_>>();
let mut buffer = BytesMut::from(&b"dtrgwcser"[..]);
assert_eq!(Frame::to_frame(&mut buffer), None);
}
#[test]
fn frame_attack_too_much_length() {
let mut buffer = vec![0u8; 50];
let mut buffer = BytesMut::with_capacity(50);
let frame1 = Frame::Data {
mid: 7u64,
@ -589,16 +491,15 @@ mod tests {
data: b"foobar".to_vec(),
};
let _ = Frame::to_bytes(frame1.clone(), &mut buffer);
Frame::to_bytes(frame1.clone(), &mut buffer);
buffer[17] = 255;
let mut buffer = buffer.drain(..).collect::<VecDeque<_>>();
let framed = Frame::to_frame(&mut buffer);
assert_eq!(framed, None);
}
#[test]
fn frame_attack_too_low_length() {
let mut buffer = vec![0u8; 50];
let mut buffer = BytesMut::with_capacity(50);
let frame1 = Frame::Data {
mid: 7u64,
@ -606,9 +507,8 @@ mod tests {
data: b"foobar".to_vec(),
};
let _ = Frame::to_bytes(frame1.clone(), &mut buffer);
Frame::to_bytes(frame1.clone(), &mut buffer);
buffer[17] = 3;
let mut buffer = buffer.drain(..).collect::<VecDeque<_>>();
let framed = Frame::to_frame(&mut buffer);
assert_eq!(
framed,
@ -622,13 +522,4 @@ mod tests {
let framed = Frame::to_frame(&mut buffer);
assert_eq!(framed, None);
}
#[test]
fn frame_int2str() {
assert_eq!(Frame::int_to_string(0), "Shutdown");
assert_eq!(Frame::int_to_string(1), "OpenStream");
assert_eq!(Frame::int_to_string(2), "CloseStream");
assert_eq!(Frame::int_to_string(3), "DataHeader");
assert_eq!(Frame::int_to_string(4), "Data");
}
}

View File

@ -1,5 +1,6 @@
use crate::ProtocolError;
use async_trait::async_trait;
use bytes::BytesMut;
use std::collections::VecDeque;
///! I/O-Free (Sans-I/O) protocol https://sans-io.readthedocs.io/how-to-sans-io.html
@ -17,11 +18,11 @@ pub trait UnreliableSink: Send {
}
pub struct BaseDrain {
data: VecDeque<Vec<u8>>,
data: VecDeque<BytesMut>,
}
pub struct BaseSink {
data: VecDeque<Vec<u8>>,
data: VecDeque<BytesMut>,
}
impl BaseDrain {
@ -44,7 +45,7 @@ impl BaseSink {
#[async_trait]
impl UnreliableDrain for BaseDrain {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
self.data.push_back(data);
@ -54,7 +55,7 @@ impl UnreliableDrain for BaseDrain {
#[async_trait]
impl UnreliableSink for BaseSink {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
self.data.pop_front().ok_or(ProtocolError::Closed)

View File

@ -40,6 +40,9 @@ pub trait InitProtocol {
pub trait SendProtocol {
//a stream MUST be bound to a specific Protocol, there will be a failover
// feature comming for the case where a Protocol fails completly
/// use this to notify the sending side of streams that were created/remove
/// from remote
fn notify_from_recv(&mut self, event: ProtocolEvent);
async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError>;
async fn flush(
&mut self,

View File

@ -1,8 +1,11 @@
use crate::types::Sid;
#[cfg(feature = "metrics")]
use prometheus::{IntCounterVec, IntGaugeVec, Opts, Registry};
use prometheus::{
core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
IntCounterVec, IntGaugeVec, Opts, Registry,
};
#[cfg(feature = "metrics")]
use std::{error::Error, sync::Arc};
use std::{collections::HashMap, error::Error, sync::Arc};
#[allow(dead_code)]
pub enum RemoveReason {
@ -57,6 +60,12 @@ pub struct ProtocolMetrics {
pub struct ProtocolMetricCache {
cid: String,
m: Arc<ProtocolMetrics>,
cache: HashMap<Sid, CacheLine>,
sdata_frames_t: GenericCounter<AtomicU64>,
sdata_frames_b: GenericCounter<AtomicU64>,
rdata_frames_t: GenericCounter<AtomicU64>,
rdata_frames_b: GenericCounter<AtomicU64>,
ping: GenericGauge<AtomicI64>,
}
#[cfg(not(feature = "metrics"))]
@ -192,179 +201,134 @@ impl ProtocolMetrics {
}
}
#[cfg(feature = "metrics")]
#[derive(Debug, Clone)]
pub(crate) struct CacheLine {
smsg_it: GenericCounter<AtomicU64>,
smsg_ib: GenericCounter<AtomicU64>,
smsg_ot: [GenericCounter<AtomicU64>; 2],
smsg_ob: [GenericCounter<AtomicU64>; 2],
rmsg_it: GenericCounter<AtomicU64>,
rmsg_ib: GenericCounter<AtomicU64>,
rmsg_ot: [GenericCounter<AtomicU64>; 2],
rmsg_ob: [GenericCounter<AtomicU64>; 2],
}
#[cfg(feature = "metrics")]
impl ProtocolMetricCache {
pub fn new(channel_key: &str, metrics: Arc<ProtocolMetrics>) -> Self {
let cid = channel_key.to_string();
let sdata_frames_t = metrics.sdata_frames_t.with_label_values(&[&cid]);
let sdata_frames_b = metrics.sdata_frames_b.with_label_values(&[&cid]);
let rdata_frames_t = metrics.rdata_frames_t.with_label_values(&[&cid]);
let rdata_frames_b = metrics.rdata_frames_b.with_label_values(&[&cid]);
let ping = metrics.ping.with_label_values(&[&cid]);
Self {
cid: channel_key.to_string(),
cid,
m: metrics,
cache: HashMap::new(),
sdata_frames_t,
sdata_frames_b,
rdata_frames_t,
rdata_frames_b,
ping,
}
}
pub(crate) fn smsg_it(&self, sid: Sid) {
self.m
.smsg_it
.with_label_values(&[&self.cid, &sid.to_string()])
.inc();
pub(crate) fn init_sid(&mut self, sid: Sid) -> &CacheLine {
let cid = &self.cid;
let m = &self.m;
self.cache.entry(sid).or_insert_with_key(|sid| {
let s = sid.to_string();
let finished = RemoveReason::Finished.to_str();
let dropped = RemoveReason::Dropped.to_str();
CacheLine {
smsg_it: m.smsg_it.with_label_values(&[&cid, &s]),
smsg_ib: m.smsg_ib.with_label_values(&[&cid, &s]),
smsg_ot: [
m.smsg_ot.with_label_values(&[&cid, &s, &finished]),
m.smsg_ot.with_label_values(&[&cid, &s, &dropped]),
],
smsg_ob: [
m.smsg_ob.with_label_values(&[&cid, &s, &finished]),
m.smsg_ob.with_label_values(&[&cid, &s, &dropped]),
],
rmsg_it: m.rmsg_it.with_label_values(&[&cid, &s]),
rmsg_ib: m.rmsg_ib.with_label_values(&[&cid, &s]),
rmsg_ot: [
m.rmsg_ot.with_label_values(&[&cid, &s, &finished]),
m.rmsg_ot.with_label_values(&[&cid, &s, &dropped]),
],
rmsg_ob: [
m.rmsg_ob.with_label_values(&[&cid, &s, &finished]),
m.rmsg_ob.with_label_values(&[&cid, &s, &dropped]),
],
}
})
}
pub(crate) fn smsg_ib(&self, sid: Sid, bytes: u64) {
self.m
.smsg_ib
.with_label_values(&[&self.cid, &sid.to_string()])
.inc_by(bytes);
pub(crate) fn smsg_ib(&mut self, sid: Sid, bytes: u64) {
let line = self.init_sid(sid);
line.smsg_it.inc();
line.smsg_ib.inc_by(bytes);
}
pub(crate) fn smsg_ot(&self, sid: Sid, reason: RemoveReason) {
self.m
.smsg_ot
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.inc();
pub(crate) fn smsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) {
let line = self.init_sid(sid);
line.smsg_ot[reason.i()].inc();
line.smsg_ob[reason.i()].inc_by(bytes);
}
pub(crate) fn smsg_ob(&self, sid: Sid, reason: RemoveReason, bytes: u64) {
self.m
.smsg_ob
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.inc_by(bytes);
pub(crate) fn sdata_frames_b(&mut self, bytes: u64) {
self.sdata_frames_t.inc();
self.sdata_frames_b.inc_by(bytes);
}
pub(crate) fn sdata_frames_t(&self) {
self.m.sdata_frames_t.with_label_values(&[&self.cid]).inc();
pub(crate) fn rmsg_ib(&mut self, sid: Sid, bytes: u64) {
let line = self.init_sid(sid);
line.rmsg_it.inc();
line.rmsg_ib.inc_by(bytes);
}
pub(crate) fn sdata_frames_b(&self, bytes: u64) {
self.m
.sdata_frames_b
.with_label_values(&[&self.cid])
.inc_by(bytes);
pub(crate) fn rmsg_ob(&mut self, sid: Sid, reason: RemoveReason, bytes: u64) {
let line = self.init_sid(sid);
line.rmsg_ot[reason.i()].inc();
line.rmsg_ob[reason.i()].inc_by(bytes);
}
pub(crate) fn rmsg_it(&self, sid: Sid) {
self.m
.rmsg_it
.with_label_values(&[&self.cid, &sid.to_string()])
.inc();
}
pub(crate) fn rmsg_ib(&self, sid: Sid, bytes: u64) {
self.m
.rmsg_ib
.with_label_values(&[&self.cid, &sid.to_string()])
.inc_by(bytes);
}
pub(crate) fn rmsg_ot(&self, sid: Sid, reason: RemoveReason) {
self.m
.rmsg_ot
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.inc();
}
pub(crate) fn rmsg_ob(&self, sid: Sid, reason: RemoveReason, bytes: u64) {
self.m
.rmsg_ob
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.inc_by(bytes);
}
pub(crate) fn rdata_frames_t(&self) {
self.m.rdata_frames_t.with_label_values(&[&self.cid]).inc();
}
pub(crate) fn rdata_frames_b(&self, bytes: u64) {
self.m
.rdata_frames_b
.with_label_values(&[&self.cid])
.inc_by(bytes);
pub(crate) fn rdata_frames_b(&mut self, bytes: u64) {
self.rdata_frames_t.inc();
self.rdata_frames_b.inc_by(bytes);
}
#[cfg(test)]
pub(crate) fn assert_msg(&self, sid: Sid, cnt: u64, reason: RemoveReason) {
assert_eq!(
self.m
.smsg_it
.with_label_values(&[&self.cid, &sid.to_string()])
.get(),
cnt
);
assert_eq!(
self.m
.smsg_ot
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.get(),
cnt
);
assert_eq!(
self.m
.rmsg_it
.with_label_values(&[&self.cid, &sid.to_string()])
.get(),
cnt
);
assert_eq!(
self.m
.rmsg_ot
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.get(),
cnt
);
pub(crate) fn assert_msg(&mut self, sid: Sid, cnt: u64, reason: RemoveReason) {
let line = self.init_sid(sid);
assert_eq!(line.smsg_it.get(), cnt);
assert_eq!(line.smsg_ot[reason.i()].get(), cnt);
assert_eq!(line.rmsg_it.get(), cnt);
assert_eq!(line.rmsg_ot[reason.i()].get(), cnt);
}
#[cfg(test)]
pub(crate) fn assert_msg_bytes(&self, sid: Sid, bytes: u64, reason: RemoveReason) {
assert_eq!(
self.m
.smsg_ib
.with_label_values(&[&self.cid, &sid.to_string()])
.get(),
bytes
);
assert_eq!(
self.m
.smsg_ob
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.get(),
bytes
);
assert_eq!(
self.m
.rmsg_ib
.with_label_values(&[&self.cid, &sid.to_string()])
.get(),
bytes
);
assert_eq!(
self.m
.rmsg_ob
.with_label_values(&[&self.cid, &sid.to_string(), reason.to_str()])
.get(),
bytes
);
pub(crate) fn assert_msg_bytes(&mut self, sid: Sid, bytes: u64, reason: RemoveReason) {
let line = self.init_sid(sid);
assert_eq!(line.smsg_ib.get(), bytes);
assert_eq!(line.smsg_ob[reason.i()].get(), bytes);
assert_eq!(line.rmsg_ib.get(), bytes);
assert_eq!(line.rmsg_ob[reason.i()].get(), bytes);
}
#[cfg(test)]
pub(crate) fn assert_data_frames(&self, cnt: u64) {
assert_eq!(
self.m.sdata_frames_t.with_label_values(&[&self.cid]).get(),
cnt
);
assert_eq!(
self.m.rdata_frames_t.with_label_values(&[&self.cid]).get(),
cnt
);
pub(crate) fn assert_data_frames(&mut self, cnt: u64) {
assert_eq!(self.sdata_frames_t.get(), cnt);
assert_eq!(self.rdata_frames_t.get(), cnt);
}
#[cfg(test)]
pub(crate) fn assert_data_frames_bytes(&self, bytes: u64) {
assert_eq!(
self.m.sdata_frames_b.with_label_values(&[&self.cid]).get(),
bytes
);
assert_eq!(
self.m.rdata_frames_b.with_label_values(&[&self.cid]).get(),
bytes
);
pub(crate) fn assert_data_frames_bytes(&mut self, bytes: u64) {
assert_eq!(self.sdata_frames_b.get(), bytes);
assert_eq!(self.rdata_frames_b.get(), bytes);
}
}
@ -378,29 +342,29 @@ impl std::fmt::Debug for ProtocolMetrics {
#[cfg(not(feature = "metrics"))]
impl ProtocolMetricCache {
pub(crate) fn smsg_it(&self, _sid: Sid) {}
pub(crate) fn smsg_it(&mut self, _sid: Sid) {}
pub(crate) fn smsg_ib(&self, _sid: Sid, _b: u64) {}
pub(crate) fn smsg_ib(&mut self, _sid: Sid, _b: u64) {}
pub(crate) fn smsg_ot(&self, _sid: Sid, _reason: RemoveReason) {}
pub(crate) fn smsg_ot(&mut self, _sid: Sid, _reason: RemoveReason) {}
pub(crate) fn smsg_ob(&self, _sid: Sid, _reason: RemoveReason, _b: u64) {}
pub(crate) fn smsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {}
pub(crate) fn sdata_frames_t(&self) {}
pub(crate) fn sdata_frames_t(&mut self) {}
pub(crate) fn sdata_frames_b(&self, _b: u64) {}
pub(crate) fn sdata_frames_b(&mut self, _b: u64) {}
pub(crate) fn rmsg_it(&self, _sid: Sid) {}
pub(crate) fn rmsg_it(&mut self, _sid: Sid) {}
pub(crate) fn rmsg_ib(&self, _sid: Sid, _b: u64) {}
pub(crate) fn rmsg_ib(&mut self, _sid: Sid, _b: u64) {}
pub(crate) fn rmsg_ot(&self, _sid: Sid, _reason: RemoveReason) {}
pub(crate) fn rmsg_ot(&mut self, _sid: Sid, _reason: RemoveReason) {}
pub(crate) fn rmsg_ob(&self, _sid: Sid, _reason: RemoveReason, _b: u64) {}
pub(crate) fn rmsg_ob(&mut self, _sid: Sid, _reason: RemoveReason, _b: u64) {}
pub(crate) fn rdata_frames_t(&self) {}
pub(crate) fn rdata_frames_t(&mut self) {}
pub(crate) fn rdata_frames_b(&self, _b: u64) {}
pub(crate) fn rdata_frames_b(&mut self, _b: u64) {}
}
impl RemoveReason {
@ -411,4 +375,12 @@ impl RemoveReason {
RemoveReason::Finished => "Finished",
}
}
#[cfg(feature = "metrics")]
fn i(&self) -> usize {
match self {
RemoveReason::Dropped => 0,
RemoveReason::Finished => 1,
}
}
}

View File

@ -9,7 +9,10 @@ use crate::{
};
use async_trait::async_trait;
use std::time::{Duration, Instant};
#[cfg(feature = "trace_pedantic")]
use tracing::trace;
#[derive(Debug)]
pub /* should be private */ enum MpscMsg {
Event(ProtocolEvent),
InitFrame(InitFrame),
@ -59,7 +62,11 @@ impl<D> SendProtocol for MpscSendProtcol<D>
where
D: UnreliableDrain<DataFormat = MpscMsg>,
{
fn notify_from_recv(&mut self, _event: ProtocolEvent) {}
async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> {
#[cfg(feature = "trace_pedantic")]
trace!(?event, "send");
match &event {
ProtocolEvent::Message {
buffer,
@ -68,10 +75,8 @@ where
} => {
let sid = *sid;
let bytes = buffer.data.len() as u64;
self.metrics.smsg_it(sid);
self.metrics.smsg_ib(sid, bytes);
let r = self.drain.send(MpscMsg::Event(event)).await;
self.metrics.smsg_ot(sid, RemoveReason::Finished);
self.metrics.smsg_ob(sid, RemoveReason::Finished, bytes);
r
},
@ -88,7 +93,10 @@ where
S: UnreliableSink<DataFormat = MpscMsg>,
{
async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError> {
match self.sink.recv().await? {
let event = self.sink.recv().await?;
#[cfg(feature = "trace_pedantic")]
trace!(?event, "recv");
match event {
MpscMsg::Event(e) => {
if let ProtocolEvent::Message {
buffer,
@ -98,9 +106,7 @@ where
{
let sid = *sid;
let bytes = buffer.data.len() as u64;
self.metrics.rmsg_it(sid);
self.metrics.rmsg_ib(sid, bytes);
self.metrics.rmsg_ot(sid, RemoveReason::Finished);
self.metrics.rmsg_ob(sid, RemoveReason::Finished, bytes);
}
Ok(e)

View File

@ -4,14 +4,18 @@ use crate::{
metrics::{ProtocolMetricCache, RemoveReason},
types::{Bandwidth, Mid, Prio, Promises, Sid},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::Duration,
};
#[derive(Debug)]
struct StreamInfo {
pub(crate) guaranteed_bandwidth: Bandwidth,
pub(crate) prio: Prio,
pub(crate) promises: Promises,
pub(crate) messages: Vec<OutgoingMessage>,
pub(crate) messages: VecDeque<OutgoingMessage>,
}
/// Responsible for queueing messages.
@ -47,7 +51,7 @@ impl PrioManager {
guaranteed_bandwidth,
prio,
promises,
messages: vec![],
messages: VecDeque::new(),
});
}
@ -68,7 +72,7 @@ impl PrioManager {
.get_mut(&sid)
.unwrap()
.messages
.push(OutgoingMessage::new(buffer, mid, sid));
.push_back(OutgoingMessage::new(buffer, mid, sid));
}
/// bandwidth might be extended, as for technical reasons
@ -79,7 +83,7 @@ impl PrioManager {
let mut frames = vec![];
let mut prios = [0u64; (Self::HIGHEST_PRIO + 1) as usize];
let metrics = &self.metrics;
let metrics = &mut self.metrics;
let mut process_stream =
|stream: &mut StreamInfo, mut bandwidth: i64, cur_bytes: &mut u64| {
@ -103,9 +107,8 @@ impl PrioManager {
//cleanup
for i in finished.iter().rev() {
let msg = stream.messages.remove(*i);
let msg = stream.messages.remove(*i).unwrap();
let (sid, bytes) = msg.get_sid_len();
metrics.smsg_ot(sid, RemoveReason::Finished);
metrics.smsg_ob(sid, RemoveReason::Finished, bytes);
}
};

View File

@ -9,21 +9,25 @@ use crate::{
ProtocolError, RecvProtocol, SendProtocol,
};
use async_trait::async_trait;
use bytes::BytesMut;
use std::{
collections::{HashMap, VecDeque},
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tracing::info;
#[cfg(feature = "trace_pedantic")]
use tracing::trace;
#[derive(Debug)]
pub struct TcpSendProtcol<D>
where
D: UnreliableDrain<DataFormat = Vec<u8>>,
D: UnreliableDrain<DataFormat = BytesMut>,
{
buffer: Vec<u8>,
buffer: BytesMut,
store: PrioManager,
closing_streams: Vec<Sid>,
notify_closing_streams: Vec<Sid>,
pending_shutdown: bool,
drain: D,
last: Instant,
@ -33,9 +37,9 @@ where
#[derive(Debug)]
pub struct TcpRecvProtcol<S>
where
S: UnreliableSink<DataFormat = Vec<u8>>,
S: UnreliableSink<DataFormat = BytesMut>,
{
buffer: VecDeque<u8>,
buffer: BytesMut,
incoming: HashMap<Mid, IncomingMsg>,
sink: S,
metrics: ProtocolMetricCache,
@ -43,13 +47,14 @@ where
impl<D> TcpSendProtcol<D>
where
D: UnreliableDrain<DataFormat = Vec<u8>>,
D: UnreliableDrain<DataFormat = BytesMut>,
{
pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self {
Self {
buffer: vec![0u8; 1500],
buffer: BytesMut::new(),
store: PrioManager::new(metrics.clone()),
closing_streams: vec![],
notify_closing_streams: vec![],
pending_shutdown: false,
drain,
last: Instant::now(),
@ -60,11 +65,11 @@ where
impl<S> TcpRecvProtcol<S>
where
S: UnreliableSink<DataFormat = Vec<u8>>,
S: UnreliableSink<DataFormat = BytesMut>,
{
pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self {
Self {
buffer: VecDeque::new(),
buffer: BytesMut::new(),
incoming: HashMap::new(),
sink,
metrics,
@ -75,9 +80,9 @@ where
#[async_trait]
impl<D> SendProtocol for TcpSendProtcol<D>
where
D: UnreliableDrain<DataFormat = Vec<u8>>,
D: UnreliableDrain<DataFormat = BytesMut>,
{
async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> {
fn notify_from_recv(&mut self, event: ProtocolEvent) {
match event {
ProtocolEvent::OpenStream {
sid,
@ -87,31 +92,54 @@ where
} => {
self.store
.open_stream(sid, prio, promises, guaranteed_bandwidth);
let frame = event.to_frame();
let (s, _) = frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer[..s].to_vec()).await?;
},
ProtocolEvent::CloseStream { sid } => {
if !self.store.try_close_stream(sid) {
#[cfg(feature = "trace_pedantic")]
trace!(?sid, "hold back notify close stream");
self.notify_closing_streams.push(sid);
}
},
_ => {},
}
}
async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> {
#[cfg(feature = "trace_pedantic")]
trace!(?event, "send");
match event {
ProtocolEvent::OpenStream {
sid,
prio,
promises,
guaranteed_bandwidth,
} => {
self.store
.open_stream(sid, prio, promises, guaranteed_bandwidth);
event.to_frame().to_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
},
ProtocolEvent::CloseStream { sid } => {
if self.store.try_close_stream(sid) {
let frame = event.to_frame();
let (s, _) = frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer[..s].to_vec()).await?;
event.to_frame().to_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
} else {
#[cfg(feature = "trace_pedantic")]
trace!(?sid, "hold back close stream");
self.closing_streams.push(sid);
}
},
ProtocolEvent::Shutdown => {
if self.store.is_empty() {
tracing::error!(?event, "send frame");
let frame = event.to_frame();
let (s, _) = frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer[..s].to_vec()).await?;
event.to_frame().to_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
} else {
#[cfg(feature = "trace_pedantic")]
trace!("hold back shutdown");
self.pending_shutdown = true;
}
},
ProtocolEvent::Message { buffer, mid, sid } => {
self.metrics.smsg_it(sid);
self.metrics.smsg_ib(sid, buffer.data.len() as u64);
self.store.add(buffer, mid, sid);
},
@ -128,30 +156,43 @@ where
data,
} = &frame
{
self.metrics.sdata_frames_t();
self.metrics.sdata_frames_b(data.len() as u64);
}
let (s, _) = frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer[..s].to_vec()).await?;
tracing::warn!("send data frame, woop");
frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
}
let mut finished_streams = vec![];
for (i, sid) in self.closing_streams.iter().enumerate() {
if self.store.try_close_stream(*sid) {
let frame = ProtocolEvent::CloseStream { sid: *sid }.to_frame();
let (s, _) = frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer[..s].to_vec()).await?;
for (i, &sid) in self.closing_streams.iter().enumerate() {
if self.store.try_close_stream(sid) {
#[cfg(feature = "trace_pedantic")]
trace!(?sid, "close stream, as it's now empty");
Frame::CloseStream { sid }.to_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
finished_streams.push(i);
}
}
for i in finished_streams.iter().rev() {
self.closing_streams.remove(*i);
}
let mut finished_streams = vec![];
for (i, sid) in self.notify_closing_streams.iter().enumerate() {
if self.store.try_close_stream(*sid) {
#[cfg(feature = "trace_pedantic")]
trace!(?sid, "close stream, as it's now empty");
finished_streams.push(i);
}
}
for i in finished_streams.iter().rev() {
self.notify_closing_streams.remove(*i);
}
if self.pending_shutdown && self.store.is_empty() {
tracing::error!("send shutdown frame");
let frame = ProtocolEvent::Shutdown {}.to_frame();
let (s, _) = frame.to_bytes(&mut self.buffer);
self.drain.send(self.buffer[..s].to_vec()).await?;
#[cfg(feature = "trace_pedantic")]
trace!("shutdown, as it's now empty");
Frame::Shutdown {}.to_bytes(&mut self.buffer);
self.drain.send(self.buffer.split()).await?;
self.pending_shutdown = false;
}
Ok(())
@ -173,14 +214,13 @@ struct IncomingMsg {
#[async_trait]
impl<S> RecvProtocol for TcpRecvProtcol<S>
where
S: UnreliableSink<DataFormat = Vec<u8>>,
S: UnreliableSink<DataFormat = BytesMut>,
{
async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError> {
tracing::error!(?self.buffer, "enter loop");
'outer: loop {
tracing::error!(?self.buffer, "continue loop");
while let Some(frame) = Frame::to_frame(&mut self.buffer) {
tracing::error!(?frame, "recv frame");
#[cfg(feature = "trace_pedantic")]
trace!(?frame, "recv");
match frame {
Frame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown),
Frame::OpenStream {
@ -204,7 +244,6 @@ where
length,
data: MessageBuffer { data: vec![] },
};
self.metrics.rmsg_it(sid);
self.metrics.rmsg_ib(sid, length);
self.incoming.insert(mid, m);
},
@ -213,12 +252,14 @@ where
start: _,
mut data,
} => {
self.metrics.rdata_frames_t();
self.metrics.rdata_frames_b(data.len() as u64);
let m = match self.incoming.get_mut(&mid) {
Some(m) => m,
None => {
info!("protocol violation by remote side: send Data before Header");
info!(
?mid,
"protocol violation by remote side: send Data before Header"
);
break 'outer Err(ProtocolError::Closed);
},
};
@ -227,7 +268,6 @@ where
// finished, yay
drop(m);
let m = self.incoming.remove(&mid).unwrap();
self.metrics.rmsg_ot(m.sid, RemoveReason::Finished);
self.metrics.rmsg_ob(
m.sid,
RemoveReason::Finished,
@ -242,13 +282,8 @@ where
},
};
}
tracing::error!(?self.buffer, "receiving on tcp sink");
let chunk = self.sink.recv().await?;
self.buffer.reserve(chunk.len());
for b in chunk {
self.buffer.push_back(b);
}
tracing::error!(?self.buffer,"receiving on tcp sink done");
self.buffer.extend_from_slice(&chunk);
}
}
}
@ -256,12 +291,11 @@ where
#[async_trait]
impl<D> ReliableDrain for TcpSendProtcol<D>
where
D: UnreliableDrain<DataFormat = Vec<u8>>,
D: UnreliableDrain<DataFormat = BytesMut>,
{
async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError> {
let mut buffer = vec![0u8; 1500];
let s = frame.to_bytes(&mut buffer);
buffer.truncate(s);
let mut buffer = BytesMut::with_capacity(500);
frame.to_bytes(&mut buffer);
self.drain.send(buffer).await
}
}
@ -269,22 +303,13 @@ where
#[async_trait]
impl<S> ReliableSink for TcpRecvProtcol<S>
where
S: UnreliableSink<DataFormat = Vec<u8>>,
S: UnreliableSink<DataFormat = BytesMut>,
{
async fn recv(&mut self) -> Result<InitFrame, ProtocolError> {
while self.buffer.len() < 100 {
let chunk = self.sink.recv().await?;
self.buffer.reserve(chunk.len());
for b in chunk {
self.buffer.push_back(b);
}
let todo_use_bytes_instead = self.buffer.iter().map(|b| *b).collect();
if let Some(frame) = InitFrame::to_frame(todo_use_bytes_instead) {
match frame {
InitFrame::Handshake { .. } => self.buffer.drain(.. InitFrame::HANDSHAKE_CNS + 1),
InitFrame::Init { .. } => self.buffer.drain(.. InitFrame::INIT_CNS + 1),
InitFrame::Raw { .. } => self.buffer.drain(.. InitFrame::RAW_CNS + 1),
};
self.buffer.extend_from_slice(&chunk);
if let Some(frame) = InitFrame::to_frame(&mut self.buffer) {
return Ok(frame);
}
}
@ -303,11 +328,11 @@ mod test_utils {
use async_channel::*;
pub struct TcpDrain {
pub sender: Sender<Vec<u8>>,
pub sender: Sender<BytesMut>,
}
pub struct TcpSink {
pub receiver: Receiver<Vec<u8>>,
pub receiver: Receiver<BytesMut>,
}
/// emulate Tcp protocol on Channels
@ -334,7 +359,7 @@ mod test_utils {
#[async_trait]
impl UnreliableDrain for TcpDrain {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
self.sender
@ -346,7 +371,7 @@ mod test_utils {
#[async_trait]
impl UnreliableSink for TcpSink {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
self.receiver
@ -365,6 +390,7 @@ mod tests {
types::{Pid, Promises, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2},
InitProtocol, MessageBuffer, ProtocolEvent, RecvProtocol, SendProtocol,
};
use bytes::BytesMut;
use std::{sync::Arc, time::Duration};
#[tokio::test]
@ -431,7 +457,7 @@ mod tests {
#[tokio::test]
async fn send_long_msg() {
let metrics =
let mut metrics =
ProtocolMetricCache::new("long_tcp", Arc::new(ProtocolMetrics::new().unwrap()));
let sid = Sid::new(1);
let [p1, p2] = tcp_bound(10000, Some(metrics.clone()));
@ -538,39 +564,36 @@ mod tests {
const DATA1: &[u8; 69] =
b"We need to make sure that its okay to send OPEN_STREAM and DATA_HEAD ";
const DATA2: &[u8; 95] = b"in one chunk and (DATA and CLOSE_STREAM) in the second chunk. and then keep the connection open";
let mut buf = vec![0u8; 1500];
let event = ProtocolEvent::OpenStream {
let mut bytes = BytesMut::with_capacity(1500);
use crate::frame::Frame;
Frame::OpenStream {
sid,
prio: 5u8,
promises: Promises::COMPRESSED,
guaranteed_bandwidth: 0,
};
let (i, _) = event.to_frame().to_bytes(&mut buf);
let (i2, _) = crate::frame::Frame::DataHeader {
}
.to_bytes(&mut bytes);
Frame::DataHeader {
mid: 99,
sid,
length: (DATA1.len() + DATA2.len()) as u64,
}
.to_bytes(&mut buf[i..]);
buf.truncate(i + i2);
s.send(buf).await.unwrap();
.to_bytes(&mut bytes);
s.send(bytes.split()).await.unwrap();
let mut buf = vec![0u8; 1500];
let (i, _) = crate::frame::Frame::Data {
Frame::Data {
mid: 99,
start: 0,
data: DATA1.to_vec(),
}
.to_bytes(&mut buf);
let (i2, _) = crate::frame::Frame::Data {
.to_bytes(&mut bytes);
Frame::Data {
mid: 99,
start: DATA1.len() as u64,
data: DATA2.to_vec(),
}
.to_bytes(&mut buf[i..]);
let (i3, _) = crate::frame::Frame::CloseStream { sid }.to_bytes(&mut buf[i + i2..]);
buf.truncate(i + i2 + i3);
s.send(buf).await.unwrap();
.to_bytes(&mut bytes);
Frame::CloseStream { sid }.to_bytes(&mut bytes);
s.send(bytes.split()).await.unwrap();
let e = r.recv().await.unwrap();
assert!(matches!(e, ProtocolEvent::OpenStream { .. }));
@ -581,4 +604,58 @@ mod tests {
let e = r.recv().await.unwrap();
assert!(matches!(e, ProtocolEvent::CloseStream { .. }));
}
#[tokio::test]
#[should_panic]
async fn send_on_stream_from_remote_without_notify() {
//remote opens stream
//we send on it
let [mut p1, mut p2] = tcp_bound(10, None);
let event = ProtocolEvent::OpenStream {
sid: Sid::new(10),
prio: 3u8,
promises: Promises::ORDERED,
guaranteed_bandwidth: 1_000_000,
};
p1.0.send(event).await.unwrap();
let _ = p2.1.recv().await.unwrap();
let event = ProtocolEvent::Message {
sid: Sid::new(10),
mid: 0,
buffer: Arc::new(MessageBuffer {
data: vec![188u8; 600],
}),
};
p2.0.send(event.clone()).await.unwrap();
p2.0.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
let e = p1.1.recv().await.unwrap();
assert_eq!(event, e);
}
#[tokio::test]
async fn send_on_stream_from_remote() {
//remote opens stream
//we send on it
let [mut p1, mut p2] = tcp_bound(10, None);
let event = ProtocolEvent::OpenStream {
sid: Sid::new(10),
prio: 3u8,
promises: Promises::ORDERED,
guaranteed_bandwidth: 1_000_000,
};
p1.0.send(event).await.unwrap();
let e = p2.1.recv().await.unwrap();
p2.0.notify_from_recv(e);
let event = ProtocolEvent::Message {
sid: Sid::new(10),
mid: 0,
buffer: Arc::new(MessageBuffer {
data: vec![188u8; 600],
}),
};
p2.0.send(event.clone()).await.unwrap();
p2.0.flush(1_000_000, Duration::from_secs(1)).await.unwrap();
let e = p1.1.recv().await.unwrap();
assert_eq!(event, e);
}
}

View File

@ -1,4 +1,5 @@
use bitflags::bitflags;
use bytes::{Buf, BufMut, BytesMut};
use rand::Rng;
pub type Mid = u64;
@ -88,25 +89,19 @@ impl Pid {
}
}
pub(crate) fn to_le_bytes(&self) -> [u8; 16] { self.internal.to_le_bytes() }
pub(crate) fn from_le_bytes(bytes: [u8; 16]) -> Self {
pub(crate) fn from_bytes(bytes: &mut BytesMut) -> Self {
Self {
internal: u128::from_le_bytes(bytes),
internal: bytes.get_u128_le(),
}
}
pub(crate) fn to_bytes(&self, bytes: &mut BytesMut) { bytes.put_u128_le(self.internal) }
}
impl Sid {
pub const fn new(internal: u64) -> Self { Self { internal } }
pub(crate) fn to_le_bytes(&self) -> [u8; 8] { self.internal.to_le_bytes() }
pub(crate) fn from_le_bytes(bytes: [u8; 8]) -> Self {
Self {
internal: u64::from_le_bytes(bytes),
}
}
}
impl std::fmt::Debug for Pid {

View File

@ -9,7 +9,7 @@ use crate::{
};
#[cfg(feature = "compression")]
use lz_fear::raw::DecodeError;
use network_protocol::{Bandwidth, MessageBuffer, Mid, Pid, Prio, Promises, Sid};
use network_protocol::{Bandwidth, MessageBuffer, Pid, Prio, Promises, Sid};
#[cfg(feature = "metrics")]
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
@ -28,7 +28,6 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
};
use tracing::*;
use tracing_futures::Instrument;
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
@ -70,9 +69,9 @@ pub struct Participant {
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
pid: Pid,
local_pid: Pid,
remote_pid: Pid,
sid: Sid,
mid: Mid,
prio: Prio,
promises: Promises,
guaranteed_bandwidth: Bandwidth,
@ -239,7 +238,8 @@ impl Network {
#[cfg(feature = "metrics")] registry: Option<&Registry>,
) -> Self {
let p = participant_id;
debug!(?p, "Starting Network");
let span = tracing::info_span!("network", ?p);
span.in_scope(|| trace!("Starting Network"));
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
Scheduler::new(
participant_id,
@ -247,14 +247,14 @@ impl Network {
#[cfg(feature = "metrics")]
registry,
);
runtime.spawn(async move {
trace!(?p, "Starting scheduler in own thread");
scheduler
.run()
.instrument(tracing::info_span!("scheduler", ?p))
.await;
trace!(?p, "Stopping scheduler and his own thread");
});
runtime.spawn(
async move {
trace!("Starting scheduler in own thread");
scheduler.run().await;
trace!("Stopping scheduler and his own thread");
}
.instrument(tracing::info_span!("network", ?p)),
);
Self {
local_pid: participant_id,
runtime,
@ -295,6 +295,7 @@ impl Network {
/// ```
///
/// [`connected`]: Network::connected
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> {
let (s2a_result_s, s2a_result_r) = oneshot::channel::<tokio::io::Result<()>>();
debug!(?address, "listening on address");
@ -350,6 +351,7 @@ impl Network {
///
/// [`Streams`]: crate::api::Stream
/// [`ProtocolAddres`]: crate::api::ProtocolAddr
#[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
pub async fn connect(&self, address: ProtocolAddr) -> Result<Participant, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
debug!(?address, "Connect to address");
@ -361,15 +363,12 @@ impl Network {
Ok(p) => p,
Err(e) => return Err(NetworkError::ConnectFailed(e)),
};
let pid = participant.remote_pid;
debug!(
?pid,
"Received Participant id from remote and return to user"
);
let remote_pid = participant.remote_pid;
trace!(?remote_pid, "connected");
self.participant_disconnect_sender
.lock()
.await
.insert(pid, Arc::clone(&participant.a2s_disconnect_s));
.insert(remote_pid, Arc::clone(&participant.a2s_disconnect_s));
Ok(participant)
}
@ -406,6 +405,7 @@ impl Network {
///
/// [`Streams`]: crate::api::Stream
/// [`listen`]: crate::api::Network::listen
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
pub async fn connected(&self) -> Result<Participant, NetworkError> {
let participant = self.connected_receiver.lock().await.recv().await?;
self.participant_disconnect_sender.lock().await.insert(
@ -475,6 +475,7 @@ impl Participant {
/// ```
///
/// [`Streams`]: crate::api::Stream
#[instrument(name="network", skip(self, prio, promises), fields(p = %self.local_pid))]
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel::<Stream>();
if let Err(e) = self.a2b_open_stream_s.lock().await.send((
@ -489,11 +490,11 @@ impl Participant {
match p2a_return_stream_r.await {
Ok(stream) => {
let sid = stream.sid;
debug!(?sid, ?self.remote_pid, "opened stream");
trace!(?sid, "opened stream");
Ok(stream)
},
Err(_) => {
debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
debug!("p2a_return_stream_r failed, closing participant");
Err(ParticipantError::ParticipantDisconnected)
},
}
@ -532,15 +533,16 @@ impl Participant {
/// [`Streams`]: crate::api::Stream
/// [`connected`]: Network::connected
/// [`open`]: Participant::open
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
match self.b2a_stream_opened_r.lock().await.recv().await {
Some(stream) => {
let sid = stream.sid;
debug!(?sid, ?self.remote_pid, "Receive opened stream");
debug!(?sid, "Receive opened stream");
Ok(stream)
},
None => {
debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant");
debug!("stream_opened_receiver failed, closing participant");
Err(ParticipantError::ParticipantDisconnected)
},
}
@ -589,10 +591,10 @@ impl Participant {
/// ```
///
/// [`Streams`]: crate::api::Stream
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
pub async fn disconnect(self) -> Result<(), ParticipantError> {
// Remove, Close and try_unwrap error when unwrap fails!
let pid = self.remote_pid;
debug!(?pid, "Closing participant from network");
debug!("Closing participant from network");
//Streams will be closed by BParticipant
match self.a2s_disconnect_s.lock().await.take() {
@ -601,14 +603,14 @@ impl Participant {
// Participant is connecting to Scheduler here, not as usual
// Participant<->BParticipant
a2s_disconnect_s
.send((pid, (Duration::from_secs(120), finished_sender)))
.send((self.remote_pid, (Duration::from_secs(120), finished_sender)))
.expect("Something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(res) => {
match res {
Ok(()) => trace!(?pid, "Participant is now closed"),
Ok(()) => trace!("Participant is now closed"),
Err(ref e) => {
trace!(?pid, ?e, "Error occurred during shutdown of participant")
trace!(?e, "Error occurred during shutdown of participant")
},
};
res
@ -616,7 +618,6 @@ impl Participant {
Err(e) => {
//this is a bug. but as i am Participant i can't destroy the network
error!(
?pid,
?e,
"Failed to get a message back from the scheduler, seems like the \
network is already closed"
@ -642,7 +643,8 @@ impl Participant {
impl Stream {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
pid: Pid,
local_pid: Pid,
remote_pid: Pid,
sid: Sid,
prio: Prio,
promises: Promises,
@ -653,9 +655,9 @@ impl Stream {
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) -> Self {
Self {
pid,
local_pid,
remote_pid,
sid,
mid: 0,
prio,
promises,
guaranteed_bandwidth,
@ -779,7 +781,6 @@ impl Stream {
message.verify(&self);
self.a2b_msg_s
.send((self.sid, Arc::clone(&message.buffer)))?;
self.mid += 1;
Ok(())
}
@ -942,13 +943,10 @@ impl core::cmp::PartialEq for Participant {
}
impl Drop for Network {
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
fn drop(&mut self) {
let pid = self.local_pid;
debug!(?pid, "Shutting down Network");
trace!(
?pid,
"Shutting down Participants of Network, while we still have metrics"
);
debug!("Shutting down Network");
trace!("Shutting down Participants of Network, while we still have metrics");
let mut finished_receiver_list = vec![];
if tokio::runtime::Handle::try_current().is_ok() {
@ -991,25 +989,25 @@ impl Drop for Network {
}
});
});
trace!(?pid, "Participants have shut down!");
trace!(?pid, "Shutting down Scheduler");
trace!("Participants have shut down!");
trace!("Shutting down Scheduler");
self.shutdown_sender
.take()
.unwrap()
.send(())
.expect("Scheduler is closed, but nobody other should be able to close it");
debug!(?pid, "Network has shut down");
debug!("Network has shut down");
}
}
impl Drop for Participant {
#[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
fn drop(&mut self) {
use tokio::sync::oneshot::error::TryRecvError;
// ignore closed, as we need to send it even though we disconnected the
// participant from network
let pid = self.remote_pid;
debug!(?pid, "Shutting down Participant");
debug!("Shutting down Participant");
match self
.a2s_disconnect_s
@ -1017,25 +1015,27 @@ impl Drop for Participant {
.expect("Participant in use while beeing dropped")
.take()
{
None => trace!(
?pid,
"Participant has been shutdown cleanly, no further waiting is required!"
),
None => info!("Participant already has been shutdown gracefully"),
Some(a2s_disconnect_s) => {
debug!(?pid, "Disconnect from Scheduler");
debug!("Disconnect from Scheduler");
let (finished_sender, mut finished_receiver) = oneshot::channel();
a2s_disconnect_s
.send((self.remote_pid, (Duration::from_secs(120), finished_sender)))
.expect("Something is wrong in internal scheduler coding");
loop {
match finished_receiver.try_recv() {
Ok(Ok(())) => break,
Ok(Err(e)) => error!(
?pid,
?e,
"Error while dropping the participant, couldn't send all outgoing \
messages, dropping remaining"
),
Ok(Ok(())) => {
info!("Participant dropped gracefully");
break;
},
Ok(Err(e)) => {
error!(
?e,
"Error while dropping the participant, couldn't send all outgoing \
messages, dropping remaining"
);
break;
},
Err(TryRecvError::Closed) => {
panic!("Something is wrong in internal scheduler/participant coding")
},
@ -1047,17 +1047,17 @@ impl Drop for Participant {
}
},
}
debug!(?pid, "Participant dropped");
}
}
impl Drop for Stream {
#[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
fn drop(&mut self) {
// send if closed is unnecessary but doesn't hurt, we must not crash
if !self.send_closed.load(Ordering::Relaxed) {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "Shutting down Stream");
debug!(?sid, "Shutting down Stream");
if let Err(e) = self.a2b_close_stream_s.take().unwrap().send(self.sid) {
debug!(
?e,
@ -1066,8 +1066,7 @@ impl Drop for Stream {
}
} else {
let sid = self.sid;
let pid = self.pid;
trace!(?pid, ?sid, "Stream Drop not needed");
trace!(?sid, "Stream Drop not needed");
}
}
}

View File

@ -1,4 +1,5 @@
use async_trait::async_trait;
use bytes::BytesMut;
use network_protocol::{
InitProtocolError, MpscMsg, MpscRecvProtcol, MpscSendProtcol, Pid, ProtocolError,
ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtcol, TcpSendProtcol,
@ -42,7 +43,13 @@ impl Protocols {
let metrics = ProtocolMetricCache {};
let sp = TcpSendProtcol::new(TcpDrain { half: w }, metrics.clone());
let rp = TcpRecvProtcol::new(TcpSink { half: r }, metrics.clone());
let rp = TcpRecvProtcol::new(
TcpSink {
half: r,
buffer: BytesMut::new(),
},
metrics.clone(),
);
Protocols::Tcp((sp, rp))
}
@ -86,6 +93,13 @@ impl network_protocol::InitProtocol for Protocols {
#[async_trait]
impl network_protocol::SendProtocol for SendProtocols {
fn notify_from_recv(&mut self, event: ProtocolEvent) {
match self {
SendProtocols::Tcp(s) => s.notify_from_recv(event),
SendProtocols::Mpsc(s) => s.notify_from_recv(event),
}
}
async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> {
match self {
SendProtocols::Tcp(s) => s.send(event).await,
@ -121,14 +135,14 @@ pub struct TcpDrain {
#[derive(Debug)]
pub struct TcpSink {
half: OwnedReadHalf,
buffer: BytesMut,
}
#[async_trait]
impl UnreliableDrain for TcpDrain {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
//self.half.recv
match self.half.write_all(&data).await {
Ok(()) => Ok(()),
Err(_) => Err(ProtocolError::Closed),
@ -138,15 +152,12 @@ impl UnreliableDrain for TcpDrain {
#[async_trait]
impl UnreliableSink for TcpSink {
type DataFormat = Vec<u8>;
type DataFormat = BytesMut;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
let mut data = vec![0u8; 1500];
match self.half.read(&mut data).await {
Ok(n) => {
data.truncate(n);
Ok(data)
},
self.buffer.resize(1500, 0u8);
match self.half.read(&mut self.buffer).await {
Ok(n) => Ok(self.buffer.split_to(n)),
Err(_) => Err(ProtocolError::Closed),
}
}

View File

@ -200,6 +200,7 @@ mod tests {
Stream::new(
Pid::fake(0),
Pid::fake(1),
Sid::new(0),
0u8,
promises,

View File

@ -60,6 +60,7 @@ struct ShutdownInfo {
#[derive(Debug)]
pub struct BParticipant {
local_pid: Pid, //tracing
remote_pid: Pid,
remote_pid_string: String, //optimisation
offset_sid: Sid,
@ -82,6 +83,7 @@ impl BParticipant {
#[allow(clippy::type_complexity)]
pub(crate) fn new(
local_pid: Pid,
remote_pid: Pid,
offset_sid: Sid,
#[cfg(feature = "metrics")] metrics: Arc<NetworkMetrics>,
@ -106,6 +108,7 @@ impl BParticipant {
(
Self {
local_pid,
remote_pid,
remote_pid_string: remote_pid.to_string(),
offset_sid,
@ -135,6 +138,8 @@ impl BParticipant {
async_channel::unbounded::<Cid>();
let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
async_channel::unbounded::<Cid>();
let (b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_r) =
mpsc::unbounded_channel::<ProtocolEvent>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>();
const STREAM_BOUND: usize = 10_000;
@ -142,6 +147,7 @@ impl BParticipant {
crossbeam_channel::bounded::<(Sid, Arc<MessageBuffer>)>(STREAM_BOUND);
let run_channels = self.run_channels.take().unwrap();
trace!("start all managers");
tokio::join!(
self.send_mgr(
run_channels.a2b_open_stream_r,
@ -149,18 +155,22 @@ impl BParticipant {
a2b_msg_r,
b2b_add_send_protocol_r,
b2b_close_send_protocol_r,
b2b_notify_send_of_recv_r,
b2s_prio_statistic_s,
a2b_msg_s.clone(), //self
a2b_close_stream_s.clone(), //self
),
)
.instrument(tracing::info_span!("send")),
self.recv_mgr(
run_channels.b2a_stream_opened_s,
b2b_add_recv_protocol_r,
b2b_force_close_recv_protocol_r,
b2b_close_send_protocol_s.clone(),
b2b_notify_send_of_recv_s,
a2b_msg_s.clone(), //self
a2b_close_stream_s.clone(), //self
),
)
.instrument(tracing::info_span!("recv")),
self.create_channel_mgr(
run_channels.s2b_create_channel_r,
b2b_add_send_protocol_s,
@ -182,6 +192,7 @@ impl BParticipant {
a2b_msg_r: crossbeam_channel::Receiver<(Sid, Arc<MessageBuffer>)>,
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>,
b2b_close_send_protocol_r: async_channel::Receiver<Cid>,
mut b2b_notify_send_of_recv_r: mpsc::UnboundedReceiver<ProtocolEvent>,
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Arc<MessageBuffer>)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
@ -189,27 +200,29 @@ impl BParticipant {
let mut send_protocols: HashMap<Cid, SendProtocols> = HashMap::new();
let mut interval = tokio::time::interval(Self::TICK_TIME);
let mut stream_ids = self.offset_sid;
trace!("workaround, activly wait for first protocol");
let mut fake_mid = 0; //TODO: move MID to protocol, should be inc per stream ? or ?
trace!("workaround, actively wait for first protocol");
b2b_add_protocol_r
.recv()
.await
.map(|(c, p)| send_protocols.insert(c, p));
trace!("Start send_mgr");
loop {
let (open, close, _, addp, remp) = select!(
next = a2b_open_stream_r.recv().fuse() => (Some(next), None, None, None, None),
next = a2b_close_stream_r.recv().fuse() => (None, Some(next), None, None, None),
_ = interval.tick() => (None, None, Some(()), None, None),
next = b2b_add_protocol_r.recv().fuse() => (None, None, None, Some(next), None),
next = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, Some(next)),
let (open, close, r_event, _, addp, remp) = select!(
n = a2b_open_stream_r.recv().fuse() => (Some(n), None, None, None, None, None),
n = a2b_close_stream_r.recv().fuse() => (None, Some(n), None, None, None, None),
n = b2b_notify_send_of_recv_r.recv().fuse() => (None, None, Some(n), None, None, None),
_ = interval.tick() => (None, None, None, Some(()), None, None),
n = b2b_add_protocol_r.recv().fuse() => (None, None, None, None, Some(n), None),
n = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, None, Some(n)),
);
trace!(?open, ?close, ?addp, ?remp, "foobar");
addp.flatten().map(|(c, p)| send_protocols.insert(c, p));
addp.flatten().map(|(cid, p)| {
debug!(?cid, "add protocol");
send_protocols.insert(cid, p)
});
match remp {
Some(Ok(cid)) => {
trace!(?cid, "remove send protocol");
debug!(?cid, "remove protocol");
match send_protocols.remove(&cid) {
Some(mut prot) => {
trace!("blocking flush");
@ -230,15 +243,19 @@ impl BParticipant {
let active = match send_protocols.get_mut(&cid) {
Some(a) => a,
None => {
warn!("no channel arrg");
warn!("no channel");
continue;
},
};
let active_err = async {
if let Some(Some(event)) = r_event {
active.notify_from_recv(event);
}
if let Some(Some((prio, promises, guaranteed_bandwidth, return_s))) = open {
trace!(?stream_ids, "openuing some new stream");
let sid = stream_ids;
trace!(?sid, "open stream");
stream_ids += Sid::from(1);
let stream = self
.create_stream(
@ -264,25 +281,24 @@ impl BParticipant {
// get all messages and assign it to a channel
for (sid, buffer) in a2b_msg_r.try_iter() {
warn!(?sid, "sending!");
fake_mid += 1;
active
.send(ProtocolEvent::Message {
buffer,
mid: 0u64,
mid: fake_mid,
sid,
})
.await?
}
if let Some(Some(sid)) = close {
warn!(?sid, "delete_stream!");
trace!(?stream_ids, "delete stream");
self.delete_stream(sid).await;
// Fire&Forget the protocol will take care to verify that this Frame is delayed
// till the last msg was received!
active.send(ProtocolEvent::CloseStream { sid }).await?;
}
warn!("flush!");
active
.flush(1_000_000, Duration::from_secs(1) /* TODO */)
.await?; //this actually blocks, so we cant set streams whilte it.
@ -291,7 +307,7 @@ impl BParticipant {
}
.await;
if let Err(e) = active_err {
info!(?cid, ?e, "send protocol failed, shutting down channel");
info!(?cid, ?e, "protocol failed, shutting down channel");
// remote recv will now fail, which will trigger remote send which will trigger
// recv
send_protocols.remove(&cid).unwrap();
@ -308,6 +324,7 @@ impl BParticipant {
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>,
b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>,
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
b2b_notify_send_of_recv_s: mpsc::UnboundedSender<ProtocolEvent>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Arc<MessageBuffer>)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) {
@ -327,13 +344,15 @@ impl BParticipant {
let remove_c = |recv_protocols: &mut HashMap<Cid, JoinHandle<()>>, cid: &Cid| {
match recv_protocols.remove(&cid) {
Some(h) => h.abort(),
Some(h) => {
h.abort();
debug!(?cid, "remove protocol");
},
None => trace!("tried to remove protocol twice"),
};
recv_protocols.is_empty()
};
trace!("Start recv_mgr");
loop {
let (event, addp, remp) = select!(
next = hacky_recv_r.recv().fuse() => (Some(next), None, None),
@ -342,6 +361,7 @@ impl BParticipant {
);
addp.map(|(cid, p)| {
debug!(?cid, "add protocol");
retrigger(cid, p, &mut recv_protocols);
});
if let Some(Ok(cid)) = remp {
@ -351,7 +371,6 @@ impl BParticipant {
}
};
warn!(?event, "recv event!");
if let Some(Some((cid, r, p))) = event {
match r {
Ok(ProtocolEvent::OpenStream {
@ -361,6 +380,7 @@ impl BParticipant {
guaranteed_bandwidth,
}) => {
trace!(?sid, "open stream");
let _ = b2b_notify_send_of_recv_s.send(r.unwrap());
let stream = self
.create_stream(
sid,
@ -376,6 +396,7 @@ impl BParticipant {
},
Ok(ProtocolEvent::CloseStream { sid }) => {
trace!(?sid, "close stream");
let _ = b2b_notify_send_of_recv_s.send(r.unwrap());
self.delete_stream(sid).await;
retrigger(cid, p, &mut recv_protocols);
},
@ -410,7 +431,7 @@ impl BParticipant {
}
},
Err(e) => {
info!(?cid, ?e, "recv protocol failed, shutting down channel");
info!(?e, ?cid, "protocol failed, shutting down channel");
if let Err(e) = b2b_close_send_protocol_s.send(cid).await {
debug!(?e, ?cid, "send_mgr was already closed simultaneously");
}
@ -433,7 +454,6 @@ impl BParticipant {
b2b_add_send_protocol_s: mpsc::UnboundedSender<(Cid, SendProtocols)>,
b2b_add_recv_protocol_s: mpsc::UnboundedSender<(Cid, RecvProtocols)>,
) {
trace!("Start create_channel_mgr");
let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r);
s2b_create_channel_r
.for_each_concurrent(None, |(cid, _, protocol, b2s_create_channel_done_s)| {
@ -524,12 +544,8 @@ impl BParticipant {
}
}
};
trace!("Start participant_shutdown_mgr");
let (timeout_time, sender) = s2b_shutdown_bparticipant_r.await.unwrap();
debug!("participant_shutdown_mgr triggered");
debug!("Closing all streams for send");
debug!("participant_shutdown_mgr triggered. Closing all streams for send");
{
let lock = self.streams.read().await;
for si in lock.values() {
@ -632,6 +648,7 @@ impl BParticipant {
.with_label_values(&[&self.remote_pid_string])
.inc();
Stream::new(
self.local_pid,
self.remote_pid,
sid,
prio,
@ -676,11 +693,12 @@ mod tests {
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
) = runtime_clone.block_on(async move {
let pid = Pid::fake(1);
let local_pid = Pid::fake(0);
let remote_pid = Pid::fake(1);
let sid = Sid::new(1000);
let metrics = Arc::new(NetworkMetrics::new(&pid).unwrap());
let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap());
BParticipant::new(pid, sid, Arc::clone(&metrics))
BParticipant::new(local_pid, remote_pid, sid, Arc::clone(&metrics))
});
let handle = runtime_clone.spawn(bparticipant.run(b2s_prio_statistic_s));

View File

@ -6,7 +6,7 @@ use crate::{
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant},
};
use futures_util::{FutureExt, StreamExt};
use network_protocol::Pid;
use network_protocol::{MpscMsg, Pid};
#[cfg(feature = "metrics")]
use prometheus::Registry;
use rand::Rng;
@ -26,16 +26,21 @@ use tokio::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
use tracing_futures::Instrument;
/// Naming of Channels `x2x`
/// - a: api
/// - s: scheduler
/// - b: bparticipant
/// - p: prios
/// - r: protocol
/// - w: wire
/// - c: channel/handshake
// Naming of Channels `x2x`
// - a: api
// - s: scheduler
// - b: bparticipant
// - p: prios
// - r: protocol
// - w: wire
// - c: channel/handshake
lazy_static::lazy_static! {
static ref MPSC_POOL: Mutex<HashMap<u64, mpsc::UnboundedSender<(mpsc::Sender<MpscMsg>, oneshot::Sender<mpsc::Sender<MpscMsg>>)>>> = {
Mutex::new(HashMap::new())
};
}
#[derive(Debug)]
struct ParticipantInfo {
@ -80,6 +85,8 @@ pub struct Scheduler {
}
impl Scheduler {
const MPSC_CHANNEL_BOUND: usize = 1000;
pub fn new(
local_pid: Pid,
runtime: Arc<Runtime>,
@ -215,7 +222,35 @@ impl Scheduler {
};
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
(Protocols::new_tcp(stream), false)
}, /* */
},
ProtocolAddr::Mpsc(addr) => {
let mpsc_s = match MPSC_POOL.lock().await.get(&addr) {
Some(s) => s.clone(),
None => {
pid_sender
.send(Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"no mpsc listen on this addr",
)))
.unwrap();
continue;
},
};
let (remote_to_local_s, remote_to_local_r) =
mpsc::channel(Self::MPSC_CHANNEL_BOUND);
let (local_to_remote_oneshot_s, local_to_remote_oneshot_r) = oneshot::channel();
mpsc_s
.send((remote_to_local_s, local_to_remote_oneshot_s))
.unwrap();
let local_to_remote_s = local_to_remote_oneshot_r.await.unwrap();
info!(?addr, "Connecting Mpsc");
(
Protocols::new_mpsc(local_to_remote_s, remote_to_local_r),
false,
)
},
/* */
//ProtocolAddr::Udp(addr) => {
//#[cfg(feature = "metrics")]
//self.metrics
@ -367,7 +402,7 @@ impl Scheduler {
info!(
?addr,
?e,
"Listener couldn't be started due to error on tcp bind"
"Tcp bind error durin listener startup"
);
s2a_listen_result_s.send(Err(e)).unwrap();
return;
@ -390,6 +425,25 @@ impl Scheduler {
self.init_protocol(Protocols::new_tcp(stream), None, true)
.await;
}
},
ProtocolAddr::Mpsc(addr) => {
let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel();
MPSC_POOL.lock().await.insert(addr, mpsc_s);
s2a_listen_result_s.send(Ok(())).unwrap();
trace!(?addr, "Listener bound");
let mut end_receiver = s2s_stop_listening_r.fuse();
while let Some((local_to_remote_s, local_remote_to_local_s)) = select! {
next = mpsc_r.recv().fuse() => next,
_ = &mut end_receiver => None,
} {
let (remote_to_local_s, remote_to_local_r) = mpsc::channel(Self::MPSC_CHANNEL_BOUND);
local_remote_to_local_s.send(remote_to_local_s).unwrap();
info!(?addr, "Accepting Mpsc from");
self.init_protocol(Protocols::new_mpsc(local_to_remote_s, remote_to_local_r), None, true)
.await;
}
warn!("MpscStream Failed, stopping");
},/*
ProtocolAddr::Udp(addr) => {
let socket = match net::UdpSocket::bind(addr).await {
@ -522,6 +576,7 @@ impl Scheduler {
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
) = BParticipant::new(
local_pid,
pid,
sid,
#[cfg(feature = "metrics")]
@ -545,10 +600,11 @@ impl Scheduler {
});
drop(participants);
trace!("dropped participants lock");
let p = pid;
runtime.spawn(
bparticipant
.run(participant_channels.b2s_prio_statistic_s)
.instrument(tracing::info_span!("participant", ?pid)),
.instrument(tracing::info_span!("remote", ?p)),
);
//create a new channel within BParticipant and wait for it to run
let (b2s_create_channel_done_s, b2s_create_channel_done_r) =

View File

@ -2,7 +2,7 @@ use lazy_static::*;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU16, Ordering},
atomic::{AtomicU16, AtomicU64, Ordering},
Arc,
},
thread,
@ -92,3 +92,12 @@ pub fn udp() -> veloren_network::ProtocolAddr {
let port = PORTS.fetch_add(1, Ordering::Relaxed);
veloren_network::ProtocolAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port)))
}
#[allow(dead_code)]
pub fn mpsc() -> veloren_network::ProtocolAddr {
lazy_static! {
static ref PORTS: AtomicU64 = AtomicU64::new(5000);
}
let port = PORTS.fetch_add(1, Ordering::Relaxed);
veloren_network::ProtocolAddr::Mpsc(port)
}

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use tokio::runtime::Runtime;
use veloren_network::{NetworkError, StreamError};
mod helper;
use helper::{network_participant_stream, tcp, udp};
use helper::{mpsc, network_participant_stream, tcp, udp};
use std::io::ErrorKind;
use veloren_network::{Network, Pid, Promises, ProtocolAddr};
@ -50,6 +50,31 @@ fn stream_simple_3msg() {
}
#[test]
fn stream_simple_mpsc() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc());
s1_a.send("Hello World").unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
}
#[test]
fn stream_simple_mpsc_3msg() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc());
s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
s1_a.send("3rdMessage").unwrap();
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
}
#[test]
#[ignore]
fn stream_simple_udp() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
@ -60,6 +85,7 @@ fn stream_simple_udp() {
}
#[test]
#[ignore]
fn stream_simple_udp_3msg() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
@ -101,6 +127,7 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
}
#[test]
#[ignore]
fn failed_listen_on_used_ports() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap());

View File

@ -15,7 +15,7 @@ server = { package = "veloren-server", path = "../server", default-features = fa
common = { package = "veloren-common", path = "../common" }
common-net = { package = "veloren-common-net", path = "../common/net" }
tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }
ansi-parser = "0.7"
clap = "2.33"
crossterm = "0.18"

View File

@ -19,6 +19,7 @@ pub fn init(basic: bool) {
.add_directive("uvth=warn".parse().unwrap())
.add_directive("tiny_http=warn".parse().unwrap())
.add_directive("mio::sys::windows=debug".parse().unwrap())
.add_directive("veloren_network_protocol=info".parse().unwrap())
.add_directive(
"veloren_server::persistence::character=info"
.parse()

View File

@ -28,7 +28,8 @@ futures-util = "0.3.7"
futures-executor = "0.3"
futures-timer = "3.0"
futures-channel = "0.3"
tokio = { version = "1.0.1", default-features = false, features = ["rt"] }
tokio = { version = "1", default-features = false, features = ["rt"] }
prometheus-hyper = "0.1.1"
itertools = "0.9"
lazy_static = "1.4.0"
scan_fmt = { git = "https://github.com/Imberflur/scan_fmt" }
@ -41,7 +42,6 @@ hashbrown = { version = "0.9", features = ["rayon", "serde", "nightly"] }
rayon = "1.5"
crossbeam-channel = "0.5"
prometheus = { version = "0.11", default-features = false}
tiny_http = "0.8.0"
portpicker = { git = "https://github.com/xMAC94x/portpicker-rs", rev = "df6b37872f3586ac3b21d08b56c8ec7cd92fb172" }
authc = { git = "https://gitlab.com/veloren/auth.git", rev = "bffb5181a35c19ddfd33ee0b4aedba741aafb68d" }
libsqlite3-sys = { version = "0.18", features = ["bundled"] }

View File

@ -76,12 +76,14 @@ use common_net::{
use common_sys::plugin::PluginMgr;
use common_sys::state::State;
use futures_executor::block_on;
use metrics::{PhysicsMetrics, ServerMetrics, StateTickMetrics, TickMetrics};
use metrics::{PhysicsMetrics, StateTickMetrics, TickMetrics};
use network::{Network, Pid, ProtocolAddr};
use persistence::{
character_loader::{CharacterLoader, CharacterLoaderResponseKind},
character_updater::CharacterUpdater,
};
use prometheus::Registry;
use prometheus_hyper::Server as PrometheusServer;
use specs::{join::Join, Builder, Entity as EcsEntity, RunNow, SystemData, WorldExt};
use std::{
i32,
@ -91,7 +93,7 @@ use std::{
};
#[cfg(not(feature = "worldgen"))]
use test_world::{IndexOwned, World};
use tokio::runtime::Runtime;
use tokio::{runtime::Runtime, sync::Notify};
use tracing::{debug, error, info, trace};
use uvth::{ThreadPool, ThreadPoolBuilder};
use vek::*;
@ -124,7 +126,7 @@ pub struct Server {
_runtime: Arc<Runtime>,
thread_pool: ThreadPool,
metrics: ServerMetrics,
metrics_shutdown: Arc<Notify>,
tick_metrics: TickMetrics,
state_tick_metrics: StateTickMetrics,
physics_metrics: PhysicsMetrics,
@ -350,28 +352,35 @@ impl Server {
state.ecs_mut().insert(DeletedEntities::default());
let mut metrics = ServerMetrics::new();
// register all metrics submodules here
let (tick_metrics, registry_tick) = TickMetrics::new(metrics.tick_clone())
.expect("Failed to initialize server tick metrics submodule.");
let (tick_metrics, registry_tick) =
TickMetrics::new().expect("Failed to initialize server tick metrics submodule.");
let (state_tick_metrics, registry_state) = StateTickMetrics::new().unwrap();
let (physics_metrics, registry_physics) = PhysicsMetrics::new().unwrap();
registry_chunk(&metrics.registry()).expect("failed to register chunk gen metrics");
registry_network(&metrics.registry()).expect("failed to register network request metrics");
registry_player(&metrics.registry()).expect("failed to register player metrics");
registry_tick(&metrics.registry()).expect("failed to register tick metrics");
registry_state(&metrics.registry()).expect("failed to register state metrics");
registry_physics(&metrics.registry()).expect("failed to register state metrics");
let registry = Arc::new(Registry::new());
registry_chunk(&registry).expect("failed to register chunk gen metrics");
registry_network(&registry).expect("failed to register network request metrics");
registry_player(&registry).expect("failed to register player metrics");
registry_tick(&registry).expect("failed to register tick metrics");
registry_state(&registry).expect("failed to register state metrics");
registry_physics(&registry).expect("failed to register state metrics");
let thread_pool = ThreadPoolBuilder::new()
.name("veloren-worker".to_string())
.build();
let network =
Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &metrics.registry());
metrics
.run(settings.metrics_address)
.expect("Failed to initialize server metrics submodule.");
let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
let metrics_shutdown = Arc::new(Notify::new());
let metrics_shutdown_clone = Arc::clone(&metrics_shutdown);
let addr = settings.metrics_address;
runtime.spawn(async move {
PrometheusServer::run(
Arc::clone(&registry),
addr,
metrics_shutdown_clone.notified(),
)
.await
});
block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
let connection_handler = ConnectionHandler::new(network);
@ -392,7 +401,7 @@ impl Server {
_runtime: runtime,
thread_pool,
metrics,
metrics_shutdown,
tick_metrics,
state_tick_metrics,
physics_metrics,
@ -904,7 +913,7 @@ impl Server {
.tick_time
.with_label_values(&["metrics"])
.set(end_of_server_tick.elapsed().as_nanos() as i64);
self.metrics.tick();
self.tick_metrics.tick();
// 9) Finish the tick, pass control back to the frontend.
@ -1150,6 +1159,7 @@ impl Server {
impl Drop for Server {
fn drop(&mut self) {
self.metrics_shutdown.notify_one();
self.state
.notify_players(ServerGeneral::Disconnect(DisconnectReason::Shutdown));
}

View File

@ -1,19 +1,16 @@
use prometheus::{
Encoder, Gauge, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
Opts, Registry, TextEncoder,
Gauge, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
Registry,
};
use std::{
convert::TryInto,
error::Error,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::{debug, error};
type RegistryFn = Box<dyn FnOnce(&Registry) -> Result<(), prometheus::Error>>;
@ -60,13 +57,6 @@ pub struct TickMetrics {
tick: Arc<AtomicU64>,
}
pub struct ServerMetrics {
running: Arc<AtomicBool>,
handle: Option<thread::JoinHandle<()>>,
registry: Option<Registry>,
tick: Arc<AtomicU64>,
}
impl PhysicsMetrics {
pub fn new() -> Result<(Self, RegistryFn), prometheus::Error> {
let entity_entity_collision_checks_count = IntCounter::with_opts(Opts::new(
@ -265,7 +255,7 @@ impl ChunkGenMetrics {
}
impl TickMetrics {
pub fn new(tick: Arc<AtomicU64>) -> Result<(Self, RegistryFn), Box<dyn Error>> {
pub fn new() -> Result<(Self, RegistryFn), Box<dyn Error>> {
let chonks_count = IntGauge::with_opts(Opts::new(
"chonks_count",
"number of all chonks currently active on the server",
@ -315,6 +305,7 @@ impl TickMetrics {
let time_of_day_clone = time_of_day.clone();
let light_count_clone = light_count.clone();
let tick_time_clone = tick_time.clone();
let tick = Arc::new(AtomicU64::new(0));
let f = |registry: &Registry| {
registry.register(Box::new(chonks_count_clone))?;
@ -346,87 +337,7 @@ impl TickMetrics {
))
}
pub fn tick(&self) { self.tick.fetch_add(1, Ordering::Relaxed); }
pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
}
impl ServerMetrics {
#[allow(clippy::new_without_default)] // TODO: Pending review in #587
pub fn new() -> Self {
let running = Arc::new(AtomicBool::new(false));
let tick = Arc::new(AtomicU64::new(0));
let registry = Some(Registry::new());
Self {
running,
handle: None,
registry,
tick,
}
}
pub fn registry(&self) -> &Registry {
match self.registry {
Some(ref r) => r,
None => panic!("You cannot longer register new metrics after the server has started!"),
}
}
pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box<dyn Error>> {
self.running.store(true, Ordering::Relaxed);
let running2 = Arc::clone(&self.running);
let registry = self
.registry
.take()
.expect("ServerMetrics must be already started");
//TODO: make this a job
self.handle = Some(thread::spawn(move || {
let server = tiny_http::Server::http(addr).unwrap();
const TIMEOUT: Duration = Duration::from_secs(1);
debug!("starting tiny_http server to serve metrics");
while running2.load(Ordering::Relaxed) {
let request = match server.recv_timeout(TIMEOUT) {
Ok(Some(rq)) => rq,
Ok(None) => continue,
Err(e) => {
error!(?e, "metrics http server error");
break;
},
};
let mf = registry.gather();
let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder
.encode(&mf, &mut buffer)
.expect("Failed to encoder metrics text.");
let response = tiny_http::Response::from_string(
String::from_utf8(buffer).expect("Failed to parse bytes as a string."),
);
if let Err(e) = request.respond(response) {
error!(
?e,
"The metrics HTTP server had encountered and error with answering",
);
}
}
debug!("stopping tiny_http server to serve metrics");
}));
Ok(())
}
pub fn tick(&self) -> u64 { self.tick.fetch_add(1, Ordering::Relaxed) + 1 }
pub fn tick_clone(&self) -> Arc<AtomicU64> { Arc::clone(&self.tick) }
}
impl Drop for ServerMetrics {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
let handle = self.handle.take();
handle
.expect("ServerMetrics worker handle does not exist.")
.join()
.expect("Error shutting down prometheus metric exporter");
}
}

View File

@ -82,7 +82,7 @@ ron = {version = "0.6", default-features = false}
serde = {version = "1.0", features = [ "rc", "derive" ]}
treeculler = "0.1.0"
uvth = "3.1.1"
tokio = { version = "1.0.1", default-features = false, features = ["rt-multi-thread"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread"] }
num_cpus = "1.0"
# vec_map = { version = "0.8.2" }
inline_tweak = "1.0.2"

View File

@ -45,6 +45,7 @@ pub fn init(settings: &Settings) -> Vec<impl Drop> {
.add_directive("uvth=warn".parse().unwrap())
.add_directive("tiny_http=warn".parse().unwrap())
.add_directive("mio::sys::windows=debug".parse().unwrap())
.add_directive("veloren_network_protocol=info".parse().unwrap())
.add_directive(
"veloren_server::persistence::character=info"
.parse()