mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
Continue backend for networking and fill gaps, including:
- introduce tlid to allow - introduce channel trait - remove old experimental handshake - seperate mio_worker into multiple fn - implement stream in backend
This commit is contained in:
parent
52078f2251
commit
3d8ddcb4b3
19
Cargo.lock
generated
19
Cargo.lock
generated
@ -1251,19 +1251,20 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "enumset"
|
||||
version = "0.4.4"
|
||||
version = "0.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57b811aef4ff1cc938f13bbec348f0ecbfc2bb565b7ab90161c9f0b2805edc8a"
|
||||
checksum = "93182dcb6530c757e5879b22ebc5cfbd034861585b442819389614e223ac1c47"
|
||||
dependencies = [
|
||||
"enumset_derive",
|
||||
"num-traits 0.2.11",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enumset_derive"
|
||||
version = "0.4.3"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b184c2d0714bbeeb6440481a19c78530aa210654d99529f13d2f860a1b447598"
|
||||
checksum = "751a786cfcc7d5ceb9e0fe06f0e911da6ce3a3044633e029df4c370193c86a62"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"proc-macro2 1.0.9",
|
||||
@ -4647,6 +4648,13 @@ dependencies = [
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tlid"
|
||||
version = "0.2.2"
|
||||
dependencies = [
|
||||
"num-traits 0.2.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "0.1.22"
|
||||
@ -5156,9 +5164,10 @@ dependencies = [
|
||||
"byteorder 1.3.4",
|
||||
"enumset",
|
||||
"mio",
|
||||
"rand 0.7.3",
|
||||
"mio-extras",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"tlid",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid 0.8.1",
|
||||
|
@ -9,7 +9,7 @@ edition = "2018"
|
||||
[dependencies]
|
||||
|
||||
uvth = "3.1"
|
||||
enumset = "0.4"
|
||||
enumset = { version = "0.4", features = ["serde"] }
|
||||
bincode = "1.2"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
@ -17,5 +17,6 @@ mio = "0.6"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.2.0-alpha.4"
|
||||
byteorder = "1.3"
|
||||
rand = "0.7"
|
||||
mio-extras = "2.0"
|
||||
uuid = { version = "0.8", features = ["serde", "v4"] }
|
||||
tlid = { path = "../../tlid" }
|
@ -1,6 +1,7 @@
|
||||
use crate::{
|
||||
internal::Channel,
|
||||
message::{self, Message},
|
||||
mio_worker::{MioWorker, TokenObjects},
|
||||
mio_worker::{CtrlMsg, MioWorker, TokenObjects},
|
||||
tcp_channel::TcpChannel,
|
||||
};
|
||||
use enumset::*;
|
||||
@ -9,7 +10,9 @@ use mio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
PollOpt, Ready,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
use tlid;
|
||||
use tracing::*;
|
||||
use uuid::Uuid;
|
||||
use uvth::ThreadPool;
|
||||
@ -20,7 +23,8 @@ pub enum Address {
|
||||
Udp(std::net::SocketAddr),
|
||||
}
|
||||
|
||||
#[derive(EnumSetType, Debug)]
|
||||
#[derive(Serialize, Deserialize, EnumSetType, Debug)]
|
||||
#[enumset(serialize_repr = "u8")]
|
||||
pub enum Promise {
|
||||
InOrder,
|
||||
NoCorrupt,
|
||||
@ -52,6 +56,7 @@ pub trait Events {
|
||||
}
|
||||
|
||||
pub struct Network<E: Events> {
|
||||
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
mio_workers: Arc<Vec<MioWorker>>,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
participant_id: Uuid,
|
||||
@ -60,11 +65,15 @@ pub struct Network<E: Events> {
|
||||
|
||||
impl<E: Events> Network<E> {
|
||||
pub fn new(participant_id: Uuid, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
let mut token_pool = tlid::Pool::new_full();
|
||||
let mio_workers = Arc::new(vec![MioWorker::new(
|
||||
(participant_id.as_u128().rem_euclid(1024)) as u64,
|
||||
participant_id,
|
||||
thread_pool.clone(),
|
||||
token_pool.subpool(1000000).unwrap(),
|
||||
)]);
|
||||
Self {
|
||||
token_pool,
|
||||
mio_workers,
|
||||
thread_pool,
|
||||
participant_id,
|
||||
@ -79,21 +88,22 @@ impl<E: Events> Network<E> {
|
||||
}
|
||||
|
||||
pub fn listen(&self, addr: &Address) {
|
||||
let mio_workers = self.mio_workers.clone();
|
||||
let worker = Self::get_lowest_worker(&self.mio_workers);
|
||||
let pipe = worker.get_tx();
|
||||
let address = addr.clone();
|
||||
self.thread_pool.execute(move || {
|
||||
let mut span = span!(Level::INFO, "listen", ?address);
|
||||
let span = span!(Level::INFO, "listen", ?address);
|
||||
let _enter = span.enter();
|
||||
match address {
|
||||
Address::Tcp(a) => {
|
||||
info!("listening");
|
||||
let tcp_listener = TcpListener::bind(&a).unwrap();
|
||||
let worker = Self::get_lowest_worker(&mio_workers);
|
||||
worker.register(
|
||||
pipe.send(CtrlMsg::Register(
|
||||
TokenObjects::TcpListener(tcp_listener),
|
||||
Ready::readable(),
|
||||
PollOpt::edge(),
|
||||
);
|
||||
))
|
||||
.unwrap();
|
||||
},
|
||||
Address::Udp(_) => unimplemented!("lazy me"),
|
||||
}
|
||||
@ -101,8 +111,10 @@ impl<E: Events> Network<E> {
|
||||
}
|
||||
|
||||
pub fn connect(&self, addr: &Address) -> Participant {
|
||||
let mio_workers = self.mio_workers.clone();
|
||||
let worker = Self::get_lowest_worker(&self.mio_workers);
|
||||
let pipe = worker.get_tx();
|
||||
let address = addr.clone();
|
||||
let pid = self.participant_id;
|
||||
self.thread_pool.execute(move || {
|
||||
let mut span = span!(Level::INFO, "connect", ?address);
|
||||
let _enter = span.enter();
|
||||
@ -116,12 +128,15 @@ impl<E: Events> Network<E> {
|
||||
},
|
||||
Ok(s) => s,
|
||||
};
|
||||
let worker = Self::get_lowest_worker(&mio_workers);
|
||||
worker.register(
|
||||
TokenObjects::TcpChannel(TcpChannel::new(tcp_stream)),
|
||||
Ready::readable(),
|
||||
let mut channel = TcpChannel::new(tcp_stream);
|
||||
channel.handshake();
|
||||
channel.participant_id(pid);
|
||||
pipe.send(CtrlMsg::Register(
|
||||
TokenObjects::TcpChannel(channel),
|
||||
Ready::readable() | Ready::writable(),
|
||||
PollOpt::edge(),
|
||||
);
|
||||
))
|
||||
.unwrap();
|
||||
},
|
||||
Address::Udp(_) => unimplemented!("lazy me"),
|
||||
}
|
||||
@ -129,7 +144,16 @@ impl<E: Events> Network<E> {
|
||||
Participant { addr: addr.clone() }
|
||||
}
|
||||
|
||||
pub fn open(&self, part: Participant, prio: u8, prom: EnumSet<Promise>) -> Stream { Stream {} }
|
||||
pub fn open(&self, part: Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
|
||||
for worker in self.mio_workers.iter() {
|
||||
worker.get_tx().send(CtrlMsg::OpenStream {
|
||||
pid: uuid::Uuid::new_v4(),
|
||||
prio,
|
||||
promises,
|
||||
});
|
||||
}
|
||||
Stream {}
|
||||
}
|
||||
|
||||
pub fn close(&self, stream: Stream) {}
|
||||
}
|
||||
|
@ -1,12 +0,0 @@
|
||||
#[derive(Debug)]
|
||||
pub enum TcpFrame {
|
||||
Header {
|
||||
id: u64,
|
||||
length: u64,
|
||||
},
|
||||
Data {
|
||||
id: u64,
|
||||
frame_no: u64,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
}
|
@ -1,15 +1,47 @@
|
||||
use crate::api::Address;
|
||||
use crate::{
|
||||
api::{Address, Promise},
|
||||
message::{InCommingMessage, OutGoingMessage},
|
||||
};
|
||||
use enumset::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::VecDeque, time::Instant};
|
||||
|
||||
pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
|
||||
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0];
|
||||
|
||||
pub(crate) trait Channel {
|
||||
fn get_preferred_queue_size() -> usize;
|
||||
fn get_preferred_buffer_len() -> usize;
|
||||
fn queue(&self, msg: Vec<u8>);
|
||||
fn recv(&self) -> Option<Vec<u8>>;
|
||||
/*
|
||||
uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing
|
||||
aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls
|
||||
*/
|
||||
/// Execute when ready to read
|
||||
fn read(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant);
|
||||
/// Execute when ready to write
|
||||
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant);
|
||||
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32;
|
||||
fn close_stream(&mut self, sid: u32);
|
||||
fn handshake(&mut self);
|
||||
fn participant_id(&mut self, pid: uuid::Uuid);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum TcpFrame {
|
||||
Header {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub(crate) enum Frame {
|
||||
Handshake {
|
||||
magic_number: String,
|
||||
version: [u32; 3],
|
||||
},
|
||||
ParticipantId {
|
||||
pid: uuid::Uuid,
|
||||
},
|
||||
OpenStream {
|
||||
sid: u32,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
CloseStream {
|
||||
sid: u32,
|
||||
},
|
||||
DataHeader {
|
||||
id: u64,
|
||||
length: u64,
|
||||
},
|
||||
@ -20,6 +52,8 @@ pub(crate) enum TcpFrame {
|
||||
},
|
||||
}
|
||||
|
||||
pub(crate) type TcpFrame = Frame;
|
||||
|
||||
pub(crate) enum Protocol {
|
||||
Tcp,
|
||||
Udp,
|
||||
@ -33,3 +67,30 @@ impl Address {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Stream {
|
||||
sid: u32,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
to_send: VecDeque<OutGoingMessage>,
|
||||
to_receive: VecDeque<InCommingMessage>,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn new(sid: u32, prio: u8, promises: EnumSet<Promise>) -> Self {
|
||||
Stream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
to_send: VecDeque::new(),
|
||||
to_receive: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sid(&self) -> u32 { self.sid }
|
||||
|
||||
pub fn prio(&self) -> u8 { self.prio }
|
||||
|
||||
pub fn promises(&self) -> EnumSet<Promise> { self.promises }
|
||||
}
|
||||
|
@ -1,107 +0,0 @@
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
use std::io::{Read, Write};
|
||||
use tracing::*;
|
||||
|
||||
const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
|
||||
const VELOREN_NETWORK_VERSION_MAJOR: u16 = 0;
|
||||
const VELOREN_NETWORK_VERSION_MINOR: u8 = 0;
|
||||
const VELOREN_NETWORK_VERSION_PATCH: u8 = 1;
|
||||
|
||||
pub fn encode_handshake1<W: Write>(stream: &mut W, participant_id: u64) {
|
||||
stream.write_all(VELOREN_MAGIC_NUMBER.as_bytes()).unwrap();
|
||||
stream.write_u8('\n' as u8).unwrap();
|
||||
stream
|
||||
.write_u16::<BigEndian>(VELOREN_NETWORK_VERSION_MAJOR)
|
||||
.unwrap();
|
||||
stream.write_u8('.' as u8).unwrap();
|
||||
stream.write_u8(VELOREN_NETWORK_VERSION_MINOR).unwrap();
|
||||
stream.write_u8('.' as u8).unwrap();
|
||||
stream.write_u8(VELOREN_NETWORK_VERSION_PATCH).unwrap();
|
||||
stream.write_u8('\n' as u8).unwrap();
|
||||
stream.write_u64::<BigEndian>(participant_id).unwrap();
|
||||
stream.write_u8('\n' as u8).unwrap();
|
||||
}
|
||||
|
||||
pub fn decode_handshake1<R: Read>(stream: &mut R) -> Result<(u16, u8, u8, u64), ()> {
|
||||
let mut veloren_buf: [u8; 7] = [0; 7];
|
||||
let mut major;
|
||||
let mut minor;
|
||||
let mut patch;
|
||||
let mut participant_id;
|
||||
match stream.read_exact(&mut veloren_buf) {
|
||||
Ok(()) if (veloren_buf == VELOREN_MAGIC_NUMBER.as_bytes()) => {},
|
||||
_ => {
|
||||
error!(?veloren_buf, "incompatible magic number");
|
||||
return Err(());
|
||||
},
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('\n') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u16::<BigEndian>() {
|
||||
Ok(u) => major = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('.') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8() {
|
||||
Ok(u) => minor = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('.') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8() {
|
||||
Ok(u) => patch = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u8().map(|u| u as char) {
|
||||
Ok('\n') => {},
|
||||
_ => return Err(()),
|
||||
}
|
||||
match stream.read_u64::<BigEndian>() {
|
||||
Ok(u) => participant_id = u,
|
||||
_ => return Err(()),
|
||||
}
|
||||
Ok((major, minor, patch, participant_id))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{internal_messages::*, tests::test_tracing};
|
||||
|
||||
#[test]
|
||||
fn handshake() {
|
||||
let mut data = Vec::new();
|
||||
encode_handshake1(&mut data, 1337);
|
||||
let dh = decode_handshake1(&mut data.as_slice());
|
||||
assert!(dh.is_ok());
|
||||
let (ma, mi, pa, p) = dh.unwrap();
|
||||
assert_eq!(ma, VELOREN_NETWORK_VERSION_MAJOR);
|
||||
assert_eq!(mi, VELOREN_NETWORK_VERSION_MINOR);
|
||||
assert_eq!(pa, VELOREN_NETWORK_VERSION_PATCH);
|
||||
assert_eq!(p, 1337);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handshake_decodeerror_incorrect() {
|
||||
let mut data = Vec::new();
|
||||
encode_handshake1(&mut data, 1337);
|
||||
data[3] = 'F' as u8;
|
||||
let dh = decode_handshake1(&mut data.as_slice());
|
||||
assert!(dh.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handshake_decodeerror_toless() {
|
||||
let mut data = Vec::new();
|
||||
encode_handshake1(&mut data, 1337);
|
||||
data.drain(9..);
|
||||
let dh = decode_handshake1(&mut data.as_slice());
|
||||
assert!(dh.is_err());
|
||||
}
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
#![feature(trait_alias)]
|
||||
mod api;
|
||||
mod frame;
|
||||
mod internal;
|
||||
mod internal_messages;
|
||||
mod message;
|
||||
mod mio_worker;
|
||||
mod tcp_channel;
|
||||
@ -30,7 +28,6 @@ pub mod tests {
|
||||
|
||||
pub fn test_tracing() {
|
||||
use tracing::Level;
|
||||
use tracing_subscriber;
|
||||
|
||||
tracing_subscriber::FmtSubscriber::builder()
|
||||
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
|
||||
@ -62,12 +59,14 @@ pub mod tests {
|
||||
//let a2 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52001)));
|
||||
n1.listen(&a1); //await
|
||||
n2.listen(&a2); // only requiered here, but doesnt hurt on n1
|
||||
std::thread::sleep(std::time::Duration::from_millis(5));
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
|
||||
let p1 = n1.connect(&a2); //await
|
||||
//n2.OnRemoteConnectionOpen triggered
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
|
||||
let s1 = n1.open(p1, 16, Promise::InOrder | Promise::NoCorrupt);
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
//n2.OnRemoteStreamOpen triggered
|
||||
|
||||
n1.send("", &s1);
|
||||
|
@ -1,15 +1,13 @@
|
||||
use crate::tcp_channel::TcpChannel;
|
||||
use crate::{api::Promise, internal::Channel, message::OutGoingMessage, tcp_channel::TcpChannel};
|
||||
use enumset::EnumSet;
|
||||
use mio::{self, net::TcpListener, Poll, PollOpt, Ready, Token};
|
||||
use rand::{self, seq::IteratorRandom};
|
||||
use mio_extras::channel::{channel, Receiver, Sender};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::{Read, Write},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
sync::{mpsc::TryRecvError, Arc, RwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use tlid;
|
||||
use tracing::{debug, error, info, span, trace, warn, Level};
|
||||
use uvth::ThreadPool;
|
||||
|
||||
@ -20,23 +18,19 @@ pub(crate) enum TokenObjects {
|
||||
}
|
||||
|
||||
pub(crate) struct MioTokens {
|
||||
next_token_id: usize,
|
||||
pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
pub tokens: HashMap<Token, TokenObjects>, //TODO: move to Vec<Options> for faster lookup
|
||||
}
|
||||
|
||||
impl MioTokens {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(pool: tlid::Pool<tlid::Wrapping<usize>>) -> Self {
|
||||
MioTokens {
|
||||
next_token_id: 10,
|
||||
pool,
|
||||
tokens: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn construct(&mut self) -> Token {
|
||||
let tok = Token(self.next_token_id);
|
||||
self.next_token_id += 1;
|
||||
tok
|
||||
}
|
||||
pub fn construct(&mut self) -> Token { Token(self.pool.next()) }
|
||||
|
||||
pub fn insert(&mut self, tok: Token, obj: TokenObjects) {
|
||||
trace!(?tok, ?obj, "added new token");
|
||||
@ -51,46 +45,71 @@ pub struct MioStatistics {
|
||||
nano_busy: u128,
|
||||
}
|
||||
|
||||
pub(crate) enum CtrlMsg {
|
||||
Shutdown,
|
||||
Register(TokenObjects, Ready, PollOpt),
|
||||
OpenStream {
|
||||
pid: uuid::Uuid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
CloseStream {
|
||||
pid: uuid::Uuid,
|
||||
sid: u32,
|
||||
},
|
||||
Send(OutGoingMessage),
|
||||
}
|
||||
|
||||
/*
|
||||
The MioWorker runs in it's own thread,
|
||||
it has a given set of Channels to work with.
|
||||
It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers
|
||||
*/
|
||||
pub struct MioWorker {
|
||||
worker_tag: u64, /* only relevant for logs */
|
||||
tag: u64, /* only relevant for logs */
|
||||
pid: uuid::Uuid,
|
||||
poll: Arc<Poll>,
|
||||
mio_tokens: Arc<RwLock<MioTokens>>,
|
||||
mio_statistics: Arc<RwLock<MioStatistics>>,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
ctrl_tx: Sender<CtrlMsg>,
|
||||
}
|
||||
|
||||
impl MioWorker {
|
||||
const CTRL_TOK: Token = Token(0);
|
||||
pub const CTRL_TOK: Token = Token(0);
|
||||
|
||||
pub fn new(worker_tag: u64, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
pub fn new(
|
||||
tag: u64,
|
||||
pid: uuid::Uuid,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
mut token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
) -> Self {
|
||||
let poll = Arc::new(Poll::new().unwrap());
|
||||
let poll_clone = poll.clone();
|
||||
let mio_tokens = Arc::new(RwLock::new(MioTokens::new()));
|
||||
let mio_tokens_clone = mio_tokens.clone();
|
||||
let mio_statistics = Arc::new(RwLock::new(MioStatistics::default()));
|
||||
let mio_statistics_clone = mio_statistics.clone();
|
||||
let shutdown = Arc::new(AtomicBool::new(false));
|
||||
let shutdown_clone = shutdown.clone();
|
||||
|
||||
let (ctrl_tx, ctrl_rx) = channel();
|
||||
poll.register(&ctrl_rx, Self::CTRL_TOK, Ready::readable(), PollOpt::edge())
|
||||
.unwrap();
|
||||
// reserve 10 tokens in case they start with 0, //TODO: cleaner method
|
||||
for _ in 0..10 {
|
||||
token_pool.next();
|
||||
}
|
||||
|
||||
let mw = MioWorker {
|
||||
worker_tag,
|
||||
tag,
|
||||
pid,
|
||||
poll,
|
||||
mio_tokens,
|
||||
mio_statistics,
|
||||
shutdown,
|
||||
ctrl_tx,
|
||||
};
|
||||
thread_pool.execute(move || {
|
||||
mio_worker(
|
||||
worker_tag,
|
||||
tag,
|
||||
pid,
|
||||
poll_clone,
|
||||
mio_tokens_clone,
|
||||
mio_statistics_clone,
|
||||
shutdown_clone,
|
||||
token_pool,
|
||||
ctrl_rx,
|
||||
)
|
||||
});
|
||||
mw
|
||||
@ -102,86 +121,118 @@ impl MioWorker {
|
||||
}
|
||||
|
||||
//TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers
|
||||
pub fn split(&self, worker_id: u64, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
//fork off a second MioWorker and split load
|
||||
let second = MioWorker::new(worker_id, thread_pool);
|
||||
{
|
||||
let mut first_tokens = self.mio_tokens.write().unwrap();
|
||||
let mut second_tokens = second.mio_tokens.write().unwrap();
|
||||
let cnt = first_tokens.tokens.len() / 2;
|
||||
|
||||
for (key, val) in first_tokens
|
||||
.tokens
|
||||
.drain()
|
||||
.choose_multiple(&mut rand::thread_rng(), cnt / 2)
|
||||
{
|
||||
second_tokens.tokens.insert(key, val);
|
||||
}
|
||||
info!(
|
||||
"split MioWorker with {} tokens. New MioWorker has now {} tokens",
|
||||
cnt,
|
||||
second_tokens.tokens.len()
|
||||
);
|
||||
}
|
||||
second
|
||||
}
|
||||
|
||||
pub fn merge(&self, other: MioWorker) {
|
||||
//fork off a second MioWorker and split load
|
||||
let mut first_tokens = self.mio_tokens.write().unwrap();
|
||||
let mut second_tokens = other.mio_tokens.write().unwrap();
|
||||
let cnt = first_tokens.tokens.len();
|
||||
|
||||
for (key, val) in second_tokens.tokens.drain() {
|
||||
first_tokens.tokens.insert(key, val);
|
||||
}
|
||||
info!(
|
||||
"merge MioWorker with {} tokens. New MioWorker has now {} tokens",
|
||||
cnt,
|
||||
first_tokens.tokens.len()
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn register(&self, handle: TokenObjects, interest: Ready, opts: PollOpt) {
|
||||
let mut tokens = self.mio_tokens.write().unwrap();
|
||||
let tok = tokens.construct();
|
||||
match &handle {
|
||||
TokenObjects::TcpListener(h) => self.poll.register(h, tok, interest, opts).unwrap(),
|
||||
TokenObjects::TcpChannel(channel) => self
|
||||
.poll
|
||||
.register(&channel.stream, tok, interest, opts)
|
||||
.unwrap(),
|
||||
}
|
||||
trace!(?handle, ?tok, "registered");
|
||||
tokens.insert(tok, handle);
|
||||
}
|
||||
pub(crate) fn get_tx(&self) -> Sender<CtrlMsg> { self.ctrl_tx.clone() }
|
||||
}
|
||||
|
||||
impl Drop for MioWorker {
|
||||
fn drop(&mut self) { self.shutdown.store(true, Ordering::Relaxed); }
|
||||
fn drop(&mut self) { let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); }
|
||||
}
|
||||
|
||||
fn mio_worker(
|
||||
worker_tag: u64,
|
||||
tag: u64,
|
||||
pid: uuid::Uuid,
|
||||
poll: Arc<Poll>,
|
||||
mio_tokens: Arc<RwLock<MioTokens>>,
|
||||
mio_statistics: Arc<RwLock<MioStatistics>>,
|
||||
shutdown: Arc<AtomicBool>,
|
||||
mut token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
ctrl_rx: Receiver<CtrlMsg>,
|
||||
) {
|
||||
let mut mio_tokens = MioTokens::new(token_pool);
|
||||
let mut events = mio::Events::with_capacity(1024);
|
||||
let span = span!(Level::INFO, "mio worker", ?worker_tag);
|
||||
let mut buf: [u8; 65000] = [0; 65000];
|
||||
let span = span!(Level::INFO, "mio worker", ?tag);
|
||||
let _enter = span.enter();
|
||||
while !shutdown.load(Ordering::Relaxed) {
|
||||
loop {
|
||||
let time_before_poll = Instant::now();
|
||||
if let Err(err) = poll.poll(&mut events, Some(Duration::from_millis(1000))) {
|
||||
if let Err(err) = poll.poll(&mut events, None) {
|
||||
error!("network poll error: {}", err);
|
||||
return;
|
||||
}
|
||||
let time_after_poll = Instant::now();
|
||||
|
||||
if !events.is_empty() {
|
||||
let mut mio_tokens = mio_tokens.write().unwrap();
|
||||
for event in &events {
|
||||
match event.token() {
|
||||
MioWorker::CTRL_TOK => {
|
||||
if handle_ctl(&ctrl_rx, &mut mio_tokens, &poll, &mut buf, time_after_poll) {
|
||||
return;
|
||||
}
|
||||
},
|
||||
_ => handle_tok(
|
||||
pid,
|
||||
event,
|
||||
&mut mio_tokens,
|
||||
&poll,
|
||||
&mut buf,
|
||||
time_after_poll,
|
||||
),
|
||||
};
|
||||
}
|
||||
handle_statistics(&mio_statistics, time_before_poll, time_after_poll);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_ctl(
|
||||
ctrl_rx: &Receiver<CtrlMsg>,
|
||||
mio_tokens: &mut MioTokens,
|
||||
poll: &Arc<Poll>,
|
||||
buf: &mut [u8; 65000],
|
||||
time_after_poll: Instant,
|
||||
) -> bool {
|
||||
match ctrl_rx.try_recv() {
|
||||
Ok(CtrlMsg::Shutdown) => {
|
||||
debug!("Shutting Down");
|
||||
return true;
|
||||
},
|
||||
Ok(CtrlMsg::Register(handle, interest, opts)) => {
|
||||
let tok = mio_tokens.construct();
|
||||
match &handle {
|
||||
TokenObjects::TcpListener(h) => poll.register(h, tok, interest, opts).unwrap(),
|
||||
TokenObjects::TcpChannel(channel) => poll
|
||||
.register(&channel.tcpstream, tok, interest, opts)
|
||||
.unwrap(),
|
||||
}
|
||||
debug!(?handle, ?tok, "Registered new handle");
|
||||
mio_tokens.insert(tok, handle);
|
||||
},
|
||||
Ok(CtrlMsg::OpenStream {
|
||||
pid,
|
||||
prio,
|
||||
promises,
|
||||
}) => {
|
||||
for (tok, obj) in mio_tokens.tokens.iter_mut() {
|
||||
if let TokenObjects::TcpChannel(channel) = obj {
|
||||
channel.open_stream(prio, promises); //TODO: check participant
|
||||
channel.write(buf, time_after_poll);
|
||||
}
|
||||
}
|
||||
//TODO:
|
||||
},
|
||||
Ok(CtrlMsg::CloseStream { pid, sid }) => {
|
||||
//TODO:
|
||||
for to in mio_tokens.tokens.values_mut() {
|
||||
if let TokenObjects::TcpChannel(channel) = to {
|
||||
channel.close_stream(sid); //TODO: check participant
|
||||
channel.write(buf, time_after_poll);
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(_) => unimplemented!("dad"),
|
||||
Err(TryRecvError::Empty) => {},
|
||||
Err(err) => {
|
||||
//postbox_tx.send(Err(err.into()))?;
|
||||
return true;
|
||||
},
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn handle_tok(
|
||||
pid: uuid::Uuid,
|
||||
event: mio::Event,
|
||||
mio_tokens: &mut MioTokens,
|
||||
poll: &Arc<Poll>,
|
||||
buf: &mut [u8; 65000],
|
||||
time_after_poll: Instant,
|
||||
) {
|
||||
match mio_tokens.tokens.get_mut(&event.token()) {
|
||||
Some(e) => {
|
||||
trace!(?event, "event");
|
||||
@ -189,8 +240,6 @@ fn mio_worker(
|
||||
TokenObjects::TcpListener(listener) => match listener.accept() {
|
||||
Ok((mut remote_stream, _)) => {
|
||||
info!(?remote_stream, "remote connected");
|
||||
remote_stream.write_all("Hello Client".as_bytes()).unwrap();
|
||||
remote_stream.flush().unwrap();
|
||||
|
||||
let tok = mio_tokens.construct();
|
||||
poll.register(
|
||||
@ -201,10 +250,13 @@ fn mio_worker(
|
||||
)
|
||||
.unwrap();
|
||||
trace!(?remote_stream, ?tok, "registered");
|
||||
mio_tokens.tokens.insert(
|
||||
tok,
|
||||
TokenObjects::TcpChannel(TcpChannel::new(remote_stream)),
|
||||
);
|
||||
let mut channel = TcpChannel::new(remote_stream);
|
||||
channel.handshake();
|
||||
channel.participant_id(pid);
|
||||
|
||||
mio_tokens
|
||||
.tokens
|
||||
.insert(tok, TokenObjects::TcpChannel(channel));
|
||||
},
|
||||
Err(err) => {
|
||||
error!(?err, "error during remote connected");
|
||||
@ -212,57 +264,25 @@ fn mio_worker(
|
||||
},
|
||||
TokenObjects::TcpChannel(channel) => {
|
||||
if event.readiness().is_readable() {
|
||||
trace!(?channel.stream, "stream readable");
|
||||
//TODO: read values here and put to message assembly
|
||||
let mut buf: [u8; 1500] = [0; 1500];
|
||||
match channel.stream.read(&mut buf) {
|
||||
Ok(n) => {
|
||||
warn!("incomming message with len: {}", n);
|
||||
channel
|
||||
.to_receive
|
||||
.write()
|
||||
.unwrap()
|
||||
.push_back(buf.to_vec());
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
debug!("would block");
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
trace!(?channel.tcpstream, "stream readable");
|
||||
channel.read(buf, time_after_poll);
|
||||
}
|
||||
if event.readiness().is_writable() {
|
||||
debug!(?channel.stream, "stream writeable");
|
||||
let mut to_send = channel.to_send.write().unwrap();
|
||||
if let Some(mut data) = to_send.pop_front() {
|
||||
let total = data.len();
|
||||
match channel.stream.write(&data) {
|
||||
Ok(n) if n == total => {},
|
||||
Ok(n) => {
|
||||
debug!("could only send part");
|
||||
let data = data.drain(n..).collect(); //TODO: validate n.. is correct
|
||||
to_send.push_front(data);
|
||||
},
|
||||
Err(e)
|
||||
if e.kind() == std::io::ErrorKind::WouldBlock =>
|
||||
{
|
||||
debug!("would block");
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
};
|
||||
trace!(?channel.tcpstream, "stream writeable");
|
||||
channel.write(buf, time_after_poll);
|
||||
}
|
||||
},
|
||||
_ => unimplemented!("still lazy me"),
|
||||
}
|
||||
},
|
||||
None => panic!("Unexpected event token '{:?}'", &event.token()),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_statistics(
|
||||
mio_statistics: &Arc<RwLock<MioStatistics>>,
|
||||
time_before_poll: Instant,
|
||||
time_after_poll: Instant,
|
||||
) {
|
||||
let time_after_work = Instant::now();
|
||||
match mio_statistics.try_write() {
|
||||
Ok(mut mio_statistics) => {
|
||||
@ -272,11 +292,9 @@ fn mio_worker(
|
||||
// value this means, that we start to see load comming up after
|
||||
// 500ms, but not for small spikes - as reordering for smaller spikes would be
|
||||
// to slow
|
||||
mio_statistics.nano_wait = (mio_statistics.nano_wait as f64 * OLD_KEEP_FACTOR)
|
||||
as u128
|
||||
mio_statistics.nano_wait = (mio_statistics.nano_wait as f64 * OLD_KEEP_FACTOR) as u128
|
||||
+ time_after_poll.duration_since(time_before_poll).as_nanos();
|
||||
mio_statistics.nano_busy = (mio_statistics.nano_busy as f64 * OLD_KEEP_FACTOR)
|
||||
as u128
|
||||
mio_statistics.nano_busy = (mio_statistics.nano_busy as f64 * OLD_KEEP_FACTOR) as u128
|
||||
+ time_after_work.duration_since(time_after_poll).as_nanos();
|
||||
|
||||
trace!(
|
||||
@ -287,5 +305,4 @@ fn mio_worker(
|
||||
},
|
||||
Err(e) => warn!("statistics dropped because they are currently accecssed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,40 +1,190 @@
|
||||
use crate::internal::Channel;
|
||||
use crate::{
|
||||
api::Promise,
|
||||
internal::{Channel, Stream, TcpFrame, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
|
||||
};
|
||||
use bincode;
|
||||
use enumset::EnumSet;
|
||||
use mio::{self, net::TcpStream};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, RwLock},
|
||||
io::{Read, Write},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, error, info, span, trace, warn, Level};
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TcpChannel {
|
||||
pub stream: TcpStream,
|
||||
pub to_send: RwLock<VecDeque<Vec<u8>>>,
|
||||
pub to_receive: RwLock<VecDeque<Vec<u8>>>,
|
||||
stream_id_pool: tlid::Pool<tlid::Wrapping<u32>>, //TODO: stream_id unique per participant
|
||||
msg_id_pool: tlid::Pool<tlid::Wrapping<u64>>, //TODO: msg_id unique per participant
|
||||
participant_id: Option<uuid::Uuid>,
|
||||
pub tcpstream: TcpStream,
|
||||
pub streams: Vec<Stream>,
|
||||
pub send_queue: VecDeque<TcpFrame>,
|
||||
pub recv_queue: VecDeque<TcpFrame>,
|
||||
}
|
||||
|
||||
impl TcpChannel {
|
||||
pub fn new(stream: TcpStream) -> Self {
|
||||
pub fn new(tcpstream: TcpStream) -> Self {
|
||||
TcpChannel {
|
||||
stream,
|
||||
to_send: RwLock::new(VecDeque::new()),
|
||||
to_receive: RwLock::new(VecDeque::new()),
|
||||
stream_id_pool: tlid::Pool::new_full(),
|
||||
msg_id_pool: tlid::Pool::new_full(),
|
||||
participant_id: None,
|
||||
tcpstream,
|
||||
streams: Vec::new(),
|
||||
send_queue: VecDeque::new(),
|
||||
recv_queue: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_frame(&mut self, frame: TcpFrame) {
|
||||
match frame {
|
||||
TcpFrame::Handshake {
|
||||
magic_number,
|
||||
version,
|
||||
} => {
|
||||
if magic_number != VELOREN_MAGIC_NUMBER {
|
||||
error!("tcp connection with invalid handshake, closing connection");
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
debug!("sending client instructions before killing");
|
||||
let _ = self.tcpstream.write(
|
||||
"Handshake does not contain the magic number requiered by veloren \
|
||||
server.\nWe are not sure if you are a valid veloren client.\nClosing \
|
||||
the connection"
|
||||
.as_bytes(),
|
||||
);
|
||||
}
|
||||
}
|
||||
if version != VELOREN_NETWORK_VERSION {
|
||||
error!("tcp connection with wrong network version");
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
debug!("sending client instructions before killing");
|
||||
let _ = self.tcpstream.write(
|
||||
format!(
|
||||
"Handshake does not contain a correct magic number, but invalid \
|
||||
version.\nWe don't know how to communicate with you.\nOur \
|
||||
Version: {:?}\nYour Version: {:?}\nClosing the connection",
|
||||
VELOREN_NETWORK_VERSION, version,
|
||||
)
|
||||
.as_bytes(),
|
||||
);
|
||||
}
|
||||
}
|
||||
info!(?self, "handshake completed");
|
||||
},
|
||||
TcpFrame::ParticipantId { pid } => {
|
||||
self.participant_id = Some(pid);
|
||||
info!("Participant: {} send their ID", pid);
|
||||
},
|
||||
TcpFrame::OpenStream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
} => {
|
||||
if let Some(pid) = self.participant_id {
|
||||
let sid = self.stream_id_pool.next();
|
||||
let stream = Stream::new(sid, prio, promises.clone());
|
||||
self.streams.push(stream);
|
||||
info!("Participant: {} opened a stream", pid);
|
||||
}
|
||||
},
|
||||
TcpFrame::CloseStream { sid } => {
|
||||
if let Some(pid) = self.participant_id {
|
||||
self.streams.retain(|stream| stream.sid() != sid);
|
||||
info!("Participant: {} closed a stream", pid);
|
||||
}
|
||||
},
|
||||
TcpFrame::DataHeader { id, length } => {
|
||||
info!("Data Header {}", id);
|
||||
},
|
||||
TcpFrame::Data { id, frame_no, data } => {
|
||||
info!("Data Package {}", id);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Channel for TcpChannel {
|
||||
fn get_preferred_queue_size() -> usize {
|
||||
1400 /*TCP MTU is often 1500, minus some headers*/
|
||||
//TODO: get this from the underlying network interface
|
||||
fn read(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) {
|
||||
match self.tcpstream.read(uninitialized_dirty_speed_buffer) {
|
||||
Ok(n) => {
|
||||
trace!("incomming message with len: {}", n);
|
||||
let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]);
|
||||
while cur.position() < n as u64 {
|
||||
let r: Result<TcpFrame, _> = bincode::deserialize_from(&mut cur);
|
||||
match r {
|
||||
Ok(frame) => self.handle_frame(frame),
|
||||
Err(e) => {
|
||||
error!(
|
||||
?self,
|
||||
?e,
|
||||
"failure parsing a message with len: {}, starting with: {:?}",
|
||||
n,
|
||||
&uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)]
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
debug!("would block");
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn get_preferred_buffer_len() -> usize {
|
||||
5
|
||||
// = 1400*5 = 7000bytes => 0.0056s of buffer on 10Mbit/s network
|
||||
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) {
|
||||
while let Some(elem) = self.send_queue.pop_front() {
|
||||
if let Ok(mut data) = bincode::serialize(&elem) {
|
||||
let total = data.len();
|
||||
match self.tcpstream.write(&data) {
|
||||
Ok(n) if n == total => {},
|
||||
Ok(n) => {
|
||||
error!("could only send part");
|
||||
//let data = data.drain(n..).collect(); //TODO:
|
||||
// validate n.. is correct
|
||||
// to_send.push_front(data);
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
debug!("would block");
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn queue(&self, msg: Vec<u8>) {}
|
||||
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 {
|
||||
// validate promises
|
||||
let sid = self.stream_id_pool.next();
|
||||
let stream = Stream::new(sid, prio, promises.clone());
|
||||
self.streams.push(stream);
|
||||
self.send_queue.push_back(TcpFrame::OpenStream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
});
|
||||
sid
|
||||
}
|
||||
|
||||
fn recv(&self) -> Option<Vec<u8>> { None }
|
||||
fn close_stream(&mut self, sid: u32) {
|
||||
self.streams.retain(|stream| stream.sid() != sid);
|
||||
self.send_queue.push_back(TcpFrame::CloseStream { sid });
|
||||
}
|
||||
|
||||
fn handshake(&mut self) {
|
||||
self.send_queue.push_back(TcpFrame::Handshake {
|
||||
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
|
||||
version: VELOREN_NETWORK_VERSION,
|
||||
});
|
||||
}
|
||||
|
||||
fn participant_id(&mut self, pid: uuid::Uuid) {
|
||||
self.send_queue.push_back(TcpFrame::ParticipantId { pid });
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user