mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
first implementation of connect and tcp using a mio worker protocol and:
- introduce a loadtest, for tcp messages - cleanup api - added a unittest - prepared a handshake message, which will in next commits get removed again - experimental mio worker merges - using uuid for participant id
This commit is contained in:
parent
a01afd0c86
commit
52078f2251
115
Cargo.lock
generated
115
Cargo.lock
generated
@ -63,6 +63,15 @@ version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "000444226fcff248f2bc4c7625be32c63caccfecc2723a2b9f78a7487a49c407"
|
||||
|
||||
[[package]]
|
||||
name = "ansi_term"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
|
||||
dependencies = [
|
||||
"winapi 0.3.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.27"
|
||||
@ -2584,6 +2593,15 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
|
||||
dependencies = [
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matches"
|
||||
version = "0.1.8"
|
||||
@ -3764,6 +3782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
|
||||
dependencies = [
|
||||
"byteorder 1.3.4",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -4183,6 +4202,15 @@ version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
|
||||
|
||||
[[package]]
|
||||
name = "sharded-slab"
|
||||
version = "0.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shared_library"
|
||||
version = "0.1.9"
|
||||
@ -4488,6 +4516,13 @@ dependencies = [
|
||||
"unicode-xid 0.2.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tcp-loadtest"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"rand 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tempdir"
|
||||
version = "0.3.7"
|
||||
@ -4802,6 +4837,77 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1721cc8cf7d770cc4257872507180f35a4797272f5962f24c806af9e7faf52ab"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"tracing-attributes",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7fbad39da2f9af1cae3016339ad7f2c7a9e870f12e8fd04c4fd7ef35b30c0d2b"
|
||||
dependencies = [
|
||||
"quote 1.0.3",
|
||||
"syn 1.0.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"log 0.4.8",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-serde"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dedebcf5813b02261d6bab3a12c6a8ae702580c0405a2e8ec16c3713caf14c20"
|
||||
dependencies = [
|
||||
"ansi_term",
|
||||
"chrono",
|
||||
"lazy_static",
|
||||
"matchers",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sharded-slab",
|
||||
"smallvec 1.2.0",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
"tracing-serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "treeculler"
|
||||
version = "0.1.0"
|
||||
@ -4928,6 +5034,10 @@ name = "uuid"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
|
||||
dependencies = [
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uvth"
|
||||
@ -5043,10 +5153,15 @@ name = "veloren-network"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"byteorder 1.3.4",
|
||||
"enumset",
|
||||
"mio",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid 0.8.1",
|
||||
"uvth",
|
||||
]
|
||||
|
||||
|
@ -10,6 +10,7 @@ members = [
|
||||
"voxygen",
|
||||
"world",
|
||||
"network",
|
||||
"network/tools/tcp-loadtest",
|
||||
]
|
||||
|
||||
# default profile for devs, fast to compile, okay enough to run, no debug information
|
||||
|
@ -13,4 +13,9 @@ enumset = "0.4"
|
||||
bincode = "1.2"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
mio = "0.6"
|
||||
mio = "0.6"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.2.0-alpha.4"
|
||||
byteorder = "1.3"
|
||||
rand = "0.7"
|
||||
uuid = { version = "0.8", features = ["serde", "v4"] }
|
@ -1,19 +1,20 @@
|
||||
use crate::{message::Message, protocol::Protocol};
|
||||
use crate::{
|
||||
message::{self, Message},
|
||||
mio_worker::{MioWorker, TokenObjects},
|
||||
tcp_channel::TcpChannel,
|
||||
};
|
||||
use enumset::*;
|
||||
use mio::{
|
||||
self,
|
||||
net::{TcpListener, TcpStream},
|
||||
Poll, PollOpt, Ready, Token,
|
||||
PollOpt, Ready,
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
marker::PhantomData,
|
||||
sync::{Arc, RwLock},
|
||||
time::Duration,
|
||||
};
|
||||
use uvth::{ThreadPool, ThreadPoolBuilder};
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
use tracing::*;
|
||||
use uuid::Uuid;
|
||||
use uvth::ThreadPool;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Address {
|
||||
Tcp(std::net::SocketAddr),
|
||||
Udp(std::net::SocketAddr),
|
||||
@ -36,130 +37,99 @@ pub struct Connection {}
|
||||
pub struct Stream {}
|
||||
|
||||
pub trait Events {
|
||||
fn OnRemoteConnectionOpen(net: &Network<Self>, con: &Connection)
|
||||
fn on_remote_connection_open(net: &Network<Self>, con: &Connection)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
fn OnRemoteConnectionClose(net: &Network<Self>, con: &Connection)
|
||||
fn on_remote_connection_close(net: &Network<Self>, con: &Connection)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
fn OnRemoteStreamOpen(net: &Network<Self>, st: &Stream)
|
||||
fn on_remote_stream_open(net: &Network<Self>, st: &Stream)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
fn OnRemoteStreamClose(net: &Network<Self>, st: &Stream)
|
||||
fn on_remote_stream_close(net: &Network<Self>, st: &Stream)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
}
|
||||
|
||||
pub enum TokenObjects {
|
||||
TCP_LISTENER(TcpListener),
|
||||
}
|
||||
|
||||
pub struct NetworkData {
|
||||
next_token_id: usize,
|
||||
tokens: HashMap<Token, TokenObjects>, //TODO: move to Vec<Options> for faster lookup
|
||||
poll: Poll,
|
||||
}
|
||||
|
||||
pub struct Network<E: Events> {
|
||||
internal_sync: Arc<RwLock<NetworkData>>,
|
||||
thread_pool: ThreadPool,
|
||||
participant_id: u64,
|
||||
mio_workers: Arc<Vec<MioWorker>>,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
participant_id: Uuid,
|
||||
_pe: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl NetworkData {
|
||||
pub fn new() -> Self {
|
||||
NetworkData {
|
||||
next_token_id: 0,
|
||||
tokens: HashMap::new(),
|
||||
poll: Poll::new().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Events> Network<E> {
|
||||
const TCP_LISTEN_TOK: Token = Token(0);
|
||||
|
||||
pub fn new() -> Self {
|
||||
let thread_pool = ThreadPoolBuilder::new()
|
||||
.name("veloren-network".into())
|
||||
.build();
|
||||
let internal_sync = Arc::new(RwLock::new(NetworkData::new()));
|
||||
let internal_sync_clone = internal_sync.clone();
|
||||
thread_pool.execute(|| master_poll_worker(internal_sync_clone));
|
||||
pub fn new(participant_id: Uuid, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
let mio_workers = Arc::new(vec![MioWorker::new(
|
||||
(participant_id.as_u128().rem_euclid(1024)) as u64,
|
||||
thread_pool.clone(),
|
||||
)]);
|
||||
Self {
|
||||
internal_sync,
|
||||
mio_workers,
|
||||
thread_pool,
|
||||
participant_id: 42,
|
||||
participant_id,
|
||||
_pe: PhantomData::<E> {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send<'a, M: Message<'a>>(&self, msg: M, stream: &Stream) {}
|
||||
fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc<Vec<MioWorker>>) -> &'a MioWorker { &list[0] }
|
||||
|
||||
pub fn send<'a, M: Message<'a>>(&self, msg: M, stream: &Stream) {
|
||||
let messagebuffer = message::serialize(&msg);
|
||||
}
|
||||
|
||||
pub fn listen(&self, addr: &Address) {
|
||||
let addr = addr.clone();
|
||||
let internal_sync = self.internal_sync.clone();
|
||||
self.thread_pool.execute(move || match addr {
|
||||
Address::Tcp(a) => {
|
||||
let tcp_listener = TcpListener::bind(&a).unwrap();
|
||||
let mut internal_sync = internal_sync.write().unwrap();
|
||||
let tok = Token(internal_sync.next_token_id);
|
||||
internal_sync.next_token_id += 1;
|
||||
internal_sync
|
||||
.poll
|
||||
.register(&tcp_listener, tok, Ready::readable(), PollOpt::edge())
|
||||
.unwrap();
|
||||
internal_sync
|
||||
.tokens
|
||||
.insert(tok, TokenObjects::TCP_LISTENER(tcp_listener));
|
||||
},
|
||||
Address::Udp(_) => unimplemented!("lazy me"),
|
||||
let mio_workers = self.mio_workers.clone();
|
||||
let address = addr.clone();
|
||||
self.thread_pool.execute(move || {
|
||||
let mut span = span!(Level::INFO, "listen", ?address);
|
||||
let _enter = span.enter();
|
||||
match address {
|
||||
Address::Tcp(a) => {
|
||||
info!("listening");
|
||||
let tcp_listener = TcpListener::bind(&a).unwrap();
|
||||
let worker = Self::get_lowest_worker(&mio_workers);
|
||||
worker.register(
|
||||
TokenObjects::TcpListener(tcp_listener),
|
||||
Ready::readable(),
|
||||
PollOpt::edge(),
|
||||
);
|
||||
},
|
||||
Address::Udp(_) => unimplemented!("lazy me"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn connect(&self, addr: &Address) -> Participant { Participant { addr: addr.clone() } }
|
||||
pub fn connect(&self, addr: &Address) -> Participant {
|
||||
let mio_workers = self.mio_workers.clone();
|
||||
let address = addr.clone();
|
||||
self.thread_pool.execute(move || {
|
||||
let mut span = span!(Level::INFO, "connect", ?address);
|
||||
let _enter = span.enter();
|
||||
match address {
|
||||
Address::Tcp(a) => {
|
||||
info!("connecting");
|
||||
let tcp_stream = match TcpStream::connect(&a) {
|
||||
Err(err) => {
|
||||
error!("could not open connection: {}", err);
|
||||
return;
|
||||
},
|
||||
Ok(s) => s,
|
||||
};
|
||||
let worker = Self::get_lowest_worker(&mio_workers);
|
||||
worker.register(
|
||||
TokenObjects::TcpChannel(TcpChannel::new(tcp_stream)),
|
||||
Ready::readable(),
|
||||
PollOpt::edge(),
|
||||
);
|
||||
},
|
||||
Address::Udp(_) => unimplemented!("lazy me"),
|
||||
}
|
||||
});
|
||||
Participant { addr: addr.clone() }
|
||||
}
|
||||
|
||||
pub fn open(&self, part: Participant, prio: u8, prom: EnumSet<Promise>) -> Stream { Stream {} }
|
||||
|
||||
pub fn close(&self, stream: Stream) {}
|
||||
}
|
||||
|
||||
fn master_poll_worker(internal_sync: Arc<RwLock<NetworkData>>) {
|
||||
let mut events = mio::Events::with_capacity(1024);
|
||||
loop {
|
||||
let internal_sync = internal_sync.write().unwrap();
|
||||
if let Err(err) = internal_sync
|
||||
.poll
|
||||
.poll(&mut events, Some(Duration::from_millis(1)))
|
||||
{
|
||||
//postbox_tx.send(Err(err.into()))?;
|
||||
return;
|
||||
}
|
||||
|
||||
for event in &events {
|
||||
match internal_sync.tokens.get(&event.token()) {
|
||||
Some(e) => {
|
||||
match e {
|
||||
TokenObjects::TCP_LISTENER(listener) => {
|
||||
match listener.accept() {
|
||||
Ok((stream, _)) => {}, /* PostBox::from_tcpstream(stream) */
|
||||
Err(err) => {}, /* Err(err.into()) */
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
None => panic!("Unexpected event token '{:?}'", &event.token()),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Address {
|
||||
pub fn getProtocol(&self) -> Protocol {
|
||||
match self {
|
||||
Address::Tcp(_) => Protocol::Tcp,
|
||||
Address::Udp(_) => Protocol::Udp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
12
network/src/frame.rs
Normal file
12
network/src/frame.rs
Normal file
@ -0,0 +1,12 @@
|
||||
#[derive(Debug)]
|
||||
pub enum TcpFrame {
|
||||
Header {
|
||||
id: u64,
|
||||
length: u64,
|
||||
},
|
||||
Data {
|
||||
id: u64,
|
||||
frame_no: u64,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
}
|
35
network/src/internal.rs
Normal file
35
network/src/internal.rs
Normal file
@ -0,0 +1,35 @@
|
||||
use crate::api::Address;
|
||||
|
||||
pub(crate) trait Channel {
|
||||
fn get_preferred_queue_size() -> usize;
|
||||
fn get_preferred_buffer_len() -> usize;
|
||||
fn queue(&self, msg: Vec<u8>);
|
||||
fn recv(&self) -> Option<Vec<u8>>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum TcpFrame {
|
||||
Header {
|
||||
id: u64,
|
||||
length: u64,
|
||||
},
|
||||
Data {
|
||||
id: u64,
|
||||
frame_no: u64,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
pub(crate) enum Protocol {
|
||||
Tcp,
|
||||
Udp,
|
||||
}
|
||||
|
||||
impl Address {
|
||||
pub(crate) fn get_protocol(&self) -> Protocol {
|
||||
match self {
|
||||
Address::Tcp(_) => Protocol::Tcp,
|
||||
Address::Udp(_) => Protocol::Udp,
|
||||
}
|
||||
}
|
||||
}
|
107
network/src/internal_messages.rs
Normal file
107
network/src/internal_messages.rs
Normal file
@ -0,0 +1,107 @@
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
use std::io::{Read, Write};
|
||||
use tracing::*;
|
||||
|
||||
const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
|
||||
const VELOREN_NETWORK_VERSION_MAJOR: u16 = 0;
|
||||
const VELOREN_NETWORK_VERSION_MINOR: u8 = 0;
|
||||
const VELOREN_NETWORK_VERSION_PATCH: u8 = 1;
|
||||
|
||||
pub fn encode_handshake1<W: Write>(stream: &mut W, participant_id: u64) {
|
||||
stream.write_all(VELOREN_MAGIC_NUMBER.as_bytes()).unwrap();
|
||||
stream.write_u8('\n' as u8).unwrap();
|
||||
stream
|
||||
.write_u16::<BigEndian>(VELOREN_NETWORK_VERSION_MAJOR)
|
||||
.unwrap();
|
||||
stream.write_u8('.' as u8).unwrap();
|
||||
stream.write_u8(VELOREN_NETWORK_VERSION_MINOR).unwrap();
|
||||
stream.write_u8('.' as u8).unwrap();
|
||||
stream.write_u8(VELOREN_NETWORK_VERSION_PATCH).unwrap();
|
||||
stream.write_u8('\n' as u8).unwrap();
|
||||
stream.write_u64::<BigEndian>(participant_id).unwrap();
|
||||
stream.write_u8('\n' as u8).unwrap();
|
||||
}
|
||||
|
||||
pub fn decode_handshake1<R: Read>(stream: &mut R) -> Result<(u16, u8, u8, u64), ()> {
|
||||
let mut veloren_buf: [u8; 7] = [0; 7];
|
||||
let mut major;
|
||||
let mut minor;
|
||||
let mut patch;
|
||||
let mut participant_id;
|
||||
match stream.read_exact(&mut veloren_buf) {
|
||||
Ok(()) if (veloren_buf == VELOREN_MAGIC_NUMBER.as_bytes()) => {},
|
||||
_ => {
|
||||
error!(?veloren_buf, "incompatible magic number");
|
||||
return Err(());
|
||||
},
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('\n') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u16::<BigEndian>() {
|
||||
Ok(u) => major = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('.') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8() {
|
||||
Ok(u) => minor = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('.') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8() {
|
||||
Ok(u) => patch = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('\n') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u64::<BigEndian>() {
|
||||
Ok(u) => participant_id = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
Ok((major, minor, patch, participant_id))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{internal_messages::*, tests::test_tracing};
|
||||
|
||||
#[test]
|
||||
fn handshake() {
|
||||
let mut data = Vec::new();
|
||||
encode_handshake1(&mut data, 1337);
|
||||
let dh = decode_handshake1(&mut data.as_slice());
|
||||
assert!(dh.is_ok());
|
||||
let (ma, mi, pa, p) = dh.unwrap();
|
||||
assert_eq!(ma, VELOREN_NETWORK_VERSION_MAJOR);
|
||||
assert_eq!(mi, VELOREN_NETWORK_VERSION_MINOR);
|
||||
assert_eq!(pa, VELOREN_NETWORK_VERSION_PATCH);
|
||||
assert_eq!(p, 1337);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handshake_decodeerror_incorrect() {
|
||||
let mut data = Vec::new();
|
||||
encode_handshake1(&mut data, 1337);
|
||||
data[3] = 'F' as u8;
|
||||
let dh = decode_handshake1(&mut data.as_slice());
|
||||
assert!(dh.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handshake_decodeerror_toless() {
|
||||
let mut data = Vec::new();
|
||||
encode_handshake1(&mut data, 1337);
|
||||
data.drain(9..);
|
||||
let dh = decode_handshake1(&mut data.as_slice());
|
||||
assert!(dh.is_err());
|
||||
}
|
||||
}
|
@ -1,24 +1,44 @@
|
||||
#![feature(trait_alias)]
|
||||
mod api;
|
||||
mod frame;
|
||||
mod internal;
|
||||
mod internal_messages;
|
||||
mod message;
|
||||
mod protocol;
|
||||
mod mio_worker;
|
||||
mod tcp_channel;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
pub mod tests {
|
||||
use crate::api::*;
|
||||
use std::net::SocketAddr;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use uuid::Uuid;
|
||||
use uvth::ThreadPoolBuilder;
|
||||
|
||||
struct N {
|
||||
id: u8,
|
||||
}
|
||||
|
||||
impl Events for N {
|
||||
fn OnRemoteConnectionOpen(net: &Network<N>, con: &Connection) {}
|
||||
fn on_remote_connection_open(_net: &Network<N>, _con: &Connection) {}
|
||||
|
||||
fn OnRemoteConnectionClose(net: &Network<N>, con: &Connection) {}
|
||||
fn on_remote_connection_close(_net: &Network<N>, _con: &Connection) {}
|
||||
|
||||
fn OnRemoteStreamOpen(net: &Network<N>, st: &Stream) {}
|
||||
fn on_remote_stream_open(_net: &Network<N>, _st: &Stream) {}
|
||||
|
||||
fn OnRemoteStreamClose(net: &Network<N>, st: &Stream) {}
|
||||
fn on_remote_stream_close(_net: &Network<N>, _st: &Stream) {}
|
||||
}
|
||||
|
||||
pub fn test_tracing() {
|
||||
use tracing::Level;
|
||||
use tracing_subscriber;
|
||||
|
||||
tracing_subscriber::FmtSubscriber::builder()
|
||||
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
|
||||
// will be written to stdout.
|
||||
.with_max_level(Level::TRACE)
|
||||
//.with_env_filter("veloren_network::api=info,my_crate::my_mod=debug,[my_span]=trace")
|
||||
// sets this to be the default, global subscriber for this application.
|
||||
.init();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -28,14 +48,21 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn client_server() {
|
||||
let n1 = Network::<N>::new();
|
||||
let n2 = Network::<N>::new();
|
||||
let a1s = Address::Tcp(SocketAddr::from(([0, 0, 0, 0], 52000u16)));
|
||||
let a1 = Address::Tcp(SocketAddr::from(([1, 0, 0, 127], 52000u16)));
|
||||
let a2s = Address::Tcp(SocketAddr::from(([0, 0, 0, 0], 52001u16)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([1, 0, 0, 127], 52001u16)));
|
||||
n1.listen(&a1s); //await
|
||||
n2.listen(&a2s); // only requiered here, but doesnt hurt on n1
|
||||
let thread_pool = Arc::new(
|
||||
ThreadPoolBuilder::new()
|
||||
.name("veloren-network-test".into())
|
||||
.build(),
|
||||
);
|
||||
test_tracing();
|
||||
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let a1 = Address::Tcp(SocketAddr::from(([10, 52, 0, 101], 52000)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([10, 52, 0, 101], 52001)));
|
||||
//let a1 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52000)));
|
||||
//let a2 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52001)));
|
||||
n1.listen(&a1); //await
|
||||
n2.listen(&a2); // only requiered here, but doesnt hurt on n1
|
||||
std::thread::sleep(std::time::Duration::from_millis(5));
|
||||
|
||||
let p1 = n1.connect(&a2); //await
|
||||
//n2.OnRemoteConnectionOpen triggered
|
||||
@ -48,5 +75,7 @@ mod tests {
|
||||
|
||||
n1.close(s1);
|
||||
//n2.OnRemoteStreamClose triggered
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(20000));
|
||||
}
|
||||
}
|
||||
|
@ -1,20 +1,29 @@
|
||||
use bincode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
//use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
pub trait Message<'a> = Serialize + Deserialize<'a>;
|
||||
|
||||
struct MessageBuffer {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct MessageBuffer {
|
||||
// use VecDeque for msg storage, because it allows to quickly remove data from front.
|
||||
//however VecDeque needs custom bincode code, but it's possible
|
||||
data: Vec<u8>,
|
||||
}
|
||||
|
||||
struct OutGoingMessage {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct OutGoingMessage {
|
||||
buffer: Arc<MessageBuffer>,
|
||||
cursor: u64,
|
||||
}
|
||||
|
||||
fn serialize<'a, M: Message<'a>>(message: &M) -> MessageBuffer {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct InCommingMessage {
|
||||
buffer: MessageBuffer,
|
||||
cursor: u64,
|
||||
}
|
||||
|
||||
pub(crate) fn serialize<'a, M: Message<'a>>(message: &M) -> MessageBuffer {
|
||||
let mut writer = {
|
||||
let actual_size = bincode::serialized_size(message).unwrap();
|
||||
Vec::<u8>::with_capacity(actual_size as usize)
|
||||
|
291
network/src/mio_worker.rs
Normal file
291
network/src/mio_worker.rs
Normal file
@ -0,0 +1,291 @@
|
||||
use crate::tcp_channel::TcpChannel;
|
||||
use mio::{self, net::TcpListener, Poll, PollOpt, Ready, Token};
|
||||
use rand::{self, seq::IteratorRandom};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::{Read, Write},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::{debug, error, info, span, trace, warn, Level};
|
||||
use uvth::ThreadPool;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum TokenObjects {
|
||||
TcpListener(TcpListener),
|
||||
TcpChannel(TcpChannel),
|
||||
}
|
||||
|
||||
pub(crate) struct MioTokens {
|
||||
next_token_id: usize,
|
||||
pub tokens: HashMap<Token, TokenObjects>, //TODO: move to Vec<Options> for faster lookup
|
||||
}
|
||||
|
||||
impl MioTokens {
|
||||
pub fn new() -> Self {
|
||||
MioTokens {
|
||||
next_token_id: 10,
|
||||
tokens: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn construct(&mut self) -> Token {
|
||||
let tok = Token(self.next_token_id);
|
||||
self.next_token_id += 1;
|
||||
tok
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, tok: Token, obj: TokenObjects) {
|
||||
trace!(?tok, ?obj, "added new token");
|
||||
self.tokens.insert(tok, obj);
|
||||
}
|
||||
}
|
||||
|
||||
// MioStatistics should be copied in order to not hold locks for long
|
||||
#[derive(Clone, Default)]
|
||||
pub struct MioStatistics {
|
||||
nano_wait: u128,
|
||||
nano_busy: u128,
|
||||
}
|
||||
|
||||
/*
|
||||
The MioWorker runs in it's own thread,
|
||||
it has a given set of Channels to work with.
|
||||
It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers
|
||||
*/
|
||||
pub struct MioWorker {
|
||||
worker_tag: u64, /* only relevant for logs */
|
||||
poll: Arc<Poll>,
|
||||
mio_tokens: Arc<RwLock<MioTokens>>,
|
||||
mio_statistics: Arc<RwLock<MioStatistics>>,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl MioWorker {
|
||||
const CTRL_TOK: Token = Token(0);
|
||||
|
||||
pub fn new(worker_tag: u64, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
let poll = Arc::new(Poll::new().unwrap());
|
||||
let poll_clone = poll.clone();
|
||||
let mio_tokens = Arc::new(RwLock::new(MioTokens::new()));
|
||||
let mio_tokens_clone = mio_tokens.clone();
|
||||
let mio_statistics = Arc::new(RwLock::new(MioStatistics::default()));
|
||||
let mio_statistics_clone = mio_statistics.clone();
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let shutdown_clone = shutdown.clone();
|
||||
|
||||
let mw = MioWorker {
|
||||
worker_tag,
|
||||
poll,
|
||||
mio_tokens,
|
||||
mio_statistics,
|
||||
shutdown,
|
||||
};
|
||||
thread_pool.execute(move || {
|
||||
mio_worker(
|
||||
worker_tag,
|
||||
poll_clone,
|
||||
mio_tokens_clone,
|
||||
mio_statistics_clone,
|
||||
shutdown_clone,
|
||||
)
|
||||
});
|
||||
mw
|
||||
}
|
||||
|
||||
pub fn get_load_ratio(&self) -> f32 {
|
||||
let statistics = self.mio_statistics.read().unwrap();
|
||||
statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32
|
||||
}
|
||||
|
||||
//TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers
|
||||
pub fn split(&self, worker_id: u64, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
//fork off a second MioWorker and split load
|
||||
let second = MioWorker::new(worker_id, thread_pool);
|
||||
{
|
||||
let mut first_tokens = self.mio_tokens.write().unwrap();
|
||||
let mut second_tokens = second.mio_tokens.write().unwrap();
|
||||
let cnt = first_tokens.tokens.len() / 2;
|
||||
|
||||
for (key, val) in first_tokens
|
||||
.tokens
|
||||
.drain()
|
||||
.choose_multiple(&mut rand::thread_rng(), cnt / 2)
|
||||
{
|
||||
second_tokens.tokens.insert(key, val);
|
||||
}
|
||||
info!(
|
||||
"split MioWorker with {} tokens. New MioWorker has now {} tokens",
|
||||
cnt,
|
||||
second_tokens.tokens.len()
|
||||
);
|
||||
}
|
||||
second
|
||||
}
|
||||
|
||||
pub fn merge(&self, other: MioWorker) {
|
||||
//fork off a second MioWorker and split load
|
||||
let mut first_tokens = self.mio_tokens.write().unwrap();
|
||||
let mut second_tokens = other.mio_tokens.write().unwrap();
|
||||
let cnt = first_tokens.tokens.len();
|
||||
|
||||
for (key, val) in second_tokens.tokens.drain() {
|
||||
first_tokens.tokens.insert(key, val);
|
||||
}
|
||||
info!(
|
||||
"merge MioWorker with {} tokens. New MioWorker has now {} tokens",
|
||||
cnt,
|
||||
first_tokens.tokens.len()
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn register(&self, handle: TokenObjects, interest: Ready, opts: PollOpt) {
|
||||
let mut tokens = self.mio_tokens.write().unwrap();
|
||||
let tok = tokens.construct();
|
||||
match &handle {
|
||||
TokenObjects::TcpListener(h) => self.poll.register(h, tok, interest, opts).unwrap(),
|
||||
TokenObjects::TcpChannel(channel) => self
|
||||
.poll
|
||||
.register(&channel.stream, tok, interest, opts)
|
||||
.unwrap(),
|
||||
}
|
||||
trace!(?handle, ?tok, "registered");
|
||||
tokens.insert(tok, handle);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MioWorker {
|
||||
fn drop(&mut self) { self.shutdown.store(true, Ordering::Relaxed); }
|
||||
}
|
||||
|
||||
fn mio_worker(
|
||||
worker_tag: u64,
|
||||
poll: Arc<Poll>,
|
||||
mio_tokens: Arc<RwLock<MioTokens>>,
|
||||
mio_statistics: Arc<RwLock<MioStatistics>>,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
) {
|
||||
let mut events = mio::Events::with_capacity(1024);
|
||||
let span = span!(Level::INFO, "mio worker", ?worker_tag);
|
||||
let _enter = span.enter();
|
||||
while !shutdown.load(Ordering::Relaxed) {
|
||||
let time_before_poll = Instant::now();
|
||||
if let Err(err) = poll.poll(&mut events, Some(Duration::from_millis(1000))) {
|
||||
error!("network poll error: {}", err);
|
||||
return;
|
||||
}
|
||||
let time_after_poll = Instant::now();
|
||||
|
||||
if !events.is_empty() {
|
||||
let mut mio_tokens = mio_tokens.write().unwrap();
|
||||
for event in &events {
|
||||
match mio_tokens.tokens.get_mut(&event.token()) {
|
||||
Some(e) => {
|
||||
trace!(?event, "event");
|
||||
match e {
|
||||
TokenObjects::TcpListener(listener) => match listener.accept() {
|
||||
Ok((mut remote_stream, _)) => {
|
||||
info!(?remote_stream, "remote connected");
|
||||
remote_stream.write_all("Hello Client".as_bytes()).unwrap();
|
||||
remote_stream.flush().unwrap();
|
||||
|
||||
let tok = mio_tokens.construct();
|
||||
poll.register(
|
||||
&remote_stream,
|
||||
tok,
|
||||
Ready::readable() | Ready::writable(),
|
||||
PollOpt::edge(),
|
||||
)
|
||||
.unwrap();
|
||||
trace!(?remote_stream, ?tok, "registered");
|
||||
mio_tokens.tokens.insert(
|
||||
tok,
|
||||
TokenObjects::TcpChannel(TcpChannel::new(remote_stream)),
|
||||
);
|
||||
},
|
||||
Err(err) => {
|
||||
error!(?err, "error during remote connected");
|
||||
},
|
||||
},
|
||||
TokenObjects::TcpChannel(channel) => {
|
||||
if event.readiness().is_readable() {
|
||||
trace!(?channel.stream, "stream readable");
|
||||
//TODO: read values here and put to message assembly
|
||||
let mut buf: [u8; 1500] = [0; 1500];
|
||||
match channel.stream.read(&mut buf) {
|
||||
Ok(n) => {
|
||||
warn!("incomming message with len: {}", n);
|
||||
channel
|
||||
.to_receive
|
||||
.write()
|
||||
.unwrap()
|
||||
.push_back(buf.to_vec());
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
debug!("would block");
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
}
|
||||
if event.readiness().is_writable() {
|
||||
debug!(?channel.stream, "stream writeable");
|
||||
let mut to_send = channel.to_send.write().unwrap();
|
||||
if let Some(mut data) = to_send.pop_front() {
|
||||
let total = data.len();
|
||||
match channel.stream.write(&data) {
|
||||
Ok(n) if n == total => {},
|
||||
Ok(n) => {
|
||||
debug!("could only send part");
|
||||
let data = data.drain(n..).collect(); //TODO: validate n.. is correct
|
||||
to_send.push_front(data);
|
||||
},
|
||||
Err(e)
|
||||
if e.kind() == std::io::ErrorKind::WouldBlock =>
|
||||
{
|
||||
debug!("would block");
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
},
|
||||
_ => unimplemented!("still lazy me"),
|
||||
}
|
||||
},
|
||||
None => panic!("Unexpected event token '{:?}'", &event.token()),
|
||||
};
|
||||
}
|
||||
}
|
||||
let time_after_work = Instant::now();
|
||||
match mio_statistics.try_write() {
|
||||
Ok(mut mio_statistics) => {
|
||||
const OLD_KEEP_FACTOR: f64 = 0.995;
|
||||
//in order to weight new data stronger than older we fade them out with a
|
||||
// factor < 1. for 0.995 under full load (500 ticks a 1ms) we keep 8% of the old
|
||||
// value this means, that we start to see load comming up after
|
||||
// 500ms, but not for small spikes - as reordering for smaller spikes would be
|
||||
// to slow
|
||||
mio_statistics.nano_wait = (mio_statistics.nano_wait as f64 * OLD_KEEP_FACTOR)
|
||||
as u128
|
||||
+ time_after_poll.duration_since(time_before_poll).as_nanos();
|
||||
mio_statistics.nano_busy = (mio_statistics.nano_busy as f64 * OLD_KEEP_FACTOR)
|
||||
as u128
|
||||
+ time_after_work.duration_since(time_after_poll).as_nanos();
|
||||
|
||||
trace!(
|
||||
"current Load {}",
|
||||
mio_statistics.nano_busy as f32
|
||||
/ (mio_statistics.nano_busy + mio_statistics.nano_wait + 1) as f32
|
||||
);
|
||||
},
|
||||
Err(e) => warn!("statistics dropped because they are currently accecssed"),
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +0,0 @@
|
||||
pub enum Protocol {
|
||||
Tcp,
|
||||
Udp,
|
||||
}
|
40
network/src/tcp_channel.rs
Normal file
40
network/src/tcp_channel.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use crate::internal::Channel;
|
||||
use mio::{self, net::TcpStream};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use tracing::{debug, error, info, span, trace, warn, Level};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TcpChannel {
|
||||
pub stream: TcpStream,
|
||||
pub to_send: RwLock<VecDeque<Vec<u8>>>,
|
||||
pub to_receive: RwLock<VecDeque<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl TcpChannel {
|
||||
pub fn new(stream: TcpStream) -> Self {
|
||||
TcpChannel {
|
||||
stream,
|
||||
to_send: RwLock::new(VecDeque::new()),
|
||||
to_receive: RwLock::new(VecDeque::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Channel for TcpChannel {
|
||||
fn get_preferred_queue_size() -> usize {
|
||||
1400 /*TCP MTU is often 1500, minus some headers*/
|
||||
//TODO: get this from the underlying network interface
|
||||
}
|
||||
|
||||
fn get_preferred_buffer_len() -> usize {
|
||||
5
|
||||
// = 1400*5 = 7000bytes => 0.0056s of buffer on 10Mbit/s network
|
||||
}
|
||||
|
||||
fn queue(&self, msg: Vec<u8>) {}
|
||||
|
||||
fn recv(&self) -> Option<Vec<u8>> { None }
|
||||
}
|
11
network/tools/tcp-loadtest/Cargo.toml
Normal file
11
network/tools/tcp-loadtest/Cargo.toml
Normal file
@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "tcp-loadtest"
|
||||
version = "0.1.0"
|
||||
authors = ["Marcel Märtens <marcel.cochem@googlemail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
|
||||
rand = "0.7"
|
104
network/tools/tcp-loadtest/src/main.rs
Normal file
104
network/tools/tcp-loadtest/src/main.rs
Normal file
@ -0,0 +1,104 @@
|
||||
use std::{
|
||||
env,
|
||||
io::Write,
|
||||
net::{SocketAddr, TcpStream},
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
extern crate rand;
|
||||
|
||||
fn setup() -> Result<SocketAddr, u32> {
|
||||
let args: Vec<String> = env::args().collect();
|
||||
if args.len() < 3 {
|
||||
println!("usage: tcp-loadtest <ip> <port>");
|
||||
println!("example: tcp-loadtest 127.0.0.1 52000");
|
||||
return Err(1);
|
||||
}
|
||||
let a: SocketAddr = format!("{}:{}", args[1], args[2]).parse().unwrap();
|
||||
return Ok(a);
|
||||
}
|
||||
|
||||
fn main() -> Result<(), u32> {
|
||||
let addr = Arc::new(setup()?);
|
||||
let data: Arc<String> = Arc::new(
|
||||
(0..1000000)
|
||||
.map(|_| (0x20u8 + (rand::random::<f32>() * 96.0) as u8) as char)
|
||||
.collect(),
|
||||
);
|
||||
|
||||
let total_bytes_send = Arc::new(AtomicU64::new(0));
|
||||
let total_send_count = Arc::new(AtomicU64::new(0));
|
||||
let total_finished_threads = Arc::new(AtomicU64::new(0));
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut threads = Vec::new();
|
||||
let thread_count = 4;
|
||||
for i in 0..thread_count {
|
||||
let addr = addr.clone();
|
||||
let total_bytes_send = total_bytes_send.clone();
|
||||
let total_send_count = total_send_count.clone();
|
||||
let total_finished_threads = total_finished_threads.clone();
|
||||
let data = data.clone();
|
||||
threads.push(thread::spawn(move || {
|
||||
let mut stream = match TcpStream::connect(addr.as_ref()) {
|
||||
Err(err) => {
|
||||
total_finished_threads.fetch_add(1, Ordering::Relaxed);
|
||||
panic!("could not open connection: {}", err);
|
||||
},
|
||||
Ok(s) => s,
|
||||
};
|
||||
let mut thread_bytes_send: u64 = 0;
|
||||
let mut thread_last_sync = Instant::now();
|
||||
|
||||
loop {
|
||||
let tosend: u64 = rand::random::<u16>() as u64 * 10 + 1000;
|
||||
thread_bytes_send += tosend;
|
||||
|
||||
let cur = Instant::now();
|
||||
if cur.duration_since(thread_last_sync) >= Duration::from_secs(1) {
|
||||
thread_last_sync = cur;
|
||||
println!("[{}]send: {}MiB/s", i, thread_bytes_send / (1024 * 1024));
|
||||
total_bytes_send.fetch_add(thread_bytes_send, Ordering::Relaxed);
|
||||
thread_bytes_send = 0;
|
||||
}
|
||||
|
||||
total_send_count.fetch_add(1, Ordering::Relaxed);
|
||||
let ret = stream.write_all(data[0..(tosend as usize)].as_bytes());
|
||||
if ret.is_err() {
|
||||
println!("[{}] error: {}", i, ret.err().unwrap());
|
||||
total_finished_threads.fetch_add(1, Ordering::Relaxed);
|
||||
return;
|
||||
}
|
||||
//stream.flush();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
while total_finished_threads.load(Ordering::Relaxed) < thread_count {
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
|
||||
let cur = Instant::now();
|
||||
let dur = cur.duration_since(start_time);
|
||||
println!("================");
|
||||
println!("test endet");
|
||||
println!(
|
||||
"total send: {}MiB",
|
||||
total_bytes_send.load(Ordering::Relaxed) / (1024 * 1024)
|
||||
);
|
||||
println!("total time: {}s", dur.as_secs());
|
||||
println!(
|
||||
"average: {}KiB/s",
|
||||
total_bytes_send.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64 / 1024
|
||||
);
|
||||
println!(
|
||||
"send count: {}/s",
|
||||
total_send_count.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue
Block a user