mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
Bring networking tests to green
- Seperate worker into own directory - implement correct handshakes - implement correct receiving
This commit is contained in:
parent
3d8ddcb4b3
commit
5c5b33bd2a
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -4653,6 +4653,7 @@ name = "tlid"
|
||||
version = "0.2.2"
|
||||
dependencies = [
|
||||
"num-traits 0.2.11",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -19,4 +19,4 @@ tracing-subscriber = "0.2.0-alpha.4"
|
||||
byteorder = "1.3"
|
||||
mio-extras = "2.0"
|
||||
uuid = { version = "0.8", features = ["serde", "v4"] }
|
||||
tlid = { path = "../../tlid" }
|
||||
tlid = { path = "../../tlid", features = ["serde"]}
|
@ -1,8 +1,12 @@
|
||||
use crate::{
|
||||
internal::Channel,
|
||||
message::{self, Message},
|
||||
mio_worker::{CtrlMsg, MioWorker, TokenObjects},
|
||||
tcp_channel::TcpChannel,
|
||||
internal::RemoteParticipant,
|
||||
message::{self, OutGoingMessage},
|
||||
worker::{
|
||||
channel::Channel,
|
||||
tcp::TcpChannel,
|
||||
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
|
||||
Controller,
|
||||
},
|
||||
};
|
||||
use enumset::*;
|
||||
use mio::{
|
||||
@ -10,8 +14,12 @@ use mio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
PollOpt, Ready,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
marker::PhantomData,
|
||||
sync::{mpsc::TryRecvError, Arc, RwLock},
|
||||
};
|
||||
use tlid;
|
||||
use tracing::*;
|
||||
use uuid::Uuid;
|
||||
@ -38,7 +46,9 @@ pub struct Participant {
|
||||
|
||||
pub struct Connection {}
|
||||
|
||||
pub struct Stream {}
|
||||
pub struct Stream {
|
||||
sid: Sid,
|
||||
}
|
||||
|
||||
pub trait Events {
|
||||
fn on_remote_connection_open(net: &Network<Self>, con: &Connection)
|
||||
@ -57,38 +67,87 @@ pub trait Events {
|
||||
|
||||
pub struct Network<E: Events> {
|
||||
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
mio_workers: Arc<Vec<MioWorker>>,
|
||||
worker_pool: tlid::Pool<tlid::Wrapping<u64>>,
|
||||
controller: Arc<Vec<Controller>>,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
participant_id: Uuid,
|
||||
participant_id: Pid,
|
||||
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
_pe: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<E: Events> Network<E> {
|
||||
pub fn new(participant_id: Uuid, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
let mut token_pool = tlid::Pool::new_full();
|
||||
let mio_workers = Arc::new(vec![MioWorker::new(
|
||||
(participant_id.as_u128().rem_euclid(1024)) as u64,
|
||||
let mut worker_pool = tlid::Pool::new_full();
|
||||
let remotes = Arc::new(RwLock::new(HashMap::new()));
|
||||
for _ in 0..participant_id.as_u128().rem_euclid(64) {
|
||||
worker_pool.next();
|
||||
//random offset from 0 for tests where multiple networks are
|
||||
// created and we do not want to polute the traces with
|
||||
// network pid everytime
|
||||
}
|
||||
let controller = Arc::new(vec![Controller::new(
|
||||
worker_pool.next(),
|
||||
participant_id,
|
||||
thread_pool.clone(),
|
||||
token_pool.subpool(1000000).unwrap(),
|
||||
remotes.clone(),
|
||||
)]);
|
||||
Self {
|
||||
token_pool,
|
||||
mio_workers,
|
||||
worker_pool,
|
||||
controller,
|
||||
thread_pool,
|
||||
participant_id,
|
||||
remotes,
|
||||
_pe: PhantomData::<E> {},
|
||||
}
|
||||
}
|
||||
|
||||
fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc<Vec<MioWorker>>) -> &'a MioWorker { &list[0] }
|
||||
fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc<Vec<Controller>>) -> &'a Controller { &list[0] }
|
||||
|
||||
pub fn send<'a, M: Message<'a>>(&self, msg: M, stream: &Stream) {
|
||||
let messagebuffer = message::serialize(&msg);
|
||||
pub fn send<M: Serialize>(&self, msg: M, stream: &Stream) {
|
||||
let messagebuffer = Arc::new(message::serialize(&msg));
|
||||
//transfer message to right worker to right channel to correct stream
|
||||
//TODO: why do we need a look here, i want my own local directory which is
|
||||
// updated by workes via a channel and needs to be intepreted on a send but it
|
||||
// should almost ever be empty except for new channel creations and stream
|
||||
// creations!
|
||||
for worker in self.controller.iter() {
|
||||
worker.get_tx().send(CtrlMsg::Send(OutGoingMessage {
|
||||
buffer: messagebuffer.clone(),
|
||||
cursor: 0,
|
||||
mid: None,
|
||||
sid: stream.sid,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv<M: DeserializeOwned>(&self, stream: &Stream) -> Option<M> {
|
||||
for worker in self.controller.iter() {
|
||||
let msg = match worker.get_rx().try_recv() {
|
||||
Ok(msg) => msg,
|
||||
Err(TryRecvError::Empty) => {
|
||||
return None;
|
||||
},
|
||||
Err(err) => {
|
||||
panic!("Unexpected error '{}'", err);
|
||||
},
|
||||
};
|
||||
|
||||
match msg {
|
||||
RtrnMsg::Receive(m) => {
|
||||
info!("delivering a message");
|
||||
return Some(message::deserialize(m.buffer));
|
||||
},
|
||||
_ => unimplemented!("woopsie"),
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn listen(&self, addr: &Address) {
|
||||
let worker = Self::get_lowest_worker(&self.mio_workers);
|
||||
let worker = Self::get_lowest_worker(&self.controller);
|
||||
let pipe = worker.get_tx();
|
||||
let address = addr.clone();
|
||||
self.thread_pool.execute(move || {
|
||||
@ -111,10 +170,11 @@ impl<E: Events> Network<E> {
|
||||
}
|
||||
|
||||
pub fn connect(&self, addr: &Address) -> Participant {
|
||||
let worker = Self::get_lowest_worker(&self.mio_workers);
|
||||
let worker = Self::get_lowest_worker(&self.controller);
|
||||
let pipe = worker.get_tx();
|
||||
let address = addr.clone();
|
||||
let pid = self.participant_id;
|
||||
let remotes = self.remotes.clone();
|
||||
self.thread_pool.execute(move || {
|
||||
let mut span = span!(Level::INFO, "connect", ?address);
|
||||
let _enter = span.enter();
|
||||
@ -128,9 +188,7 @@ impl<E: Events> Network<E> {
|
||||
},
|
||||
Ok(s) => s,
|
||||
};
|
||||
let mut channel = TcpChannel::new(tcp_stream);
|
||||
channel.handshake();
|
||||
channel.participant_id(pid);
|
||||
let mut channel = TcpChannel::new(tcp_stream, pid, remotes);
|
||||
pipe.send(CtrlMsg::Register(
|
||||
TokenObjects::TcpChannel(channel),
|
||||
Ready::readable() | Ready::writable(),
|
||||
@ -145,14 +203,14 @@ impl<E: Events> Network<E> {
|
||||
}
|
||||
|
||||
pub fn open(&self, part: Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
|
||||
for worker in self.mio_workers.iter() {
|
||||
for worker in self.controller.iter() {
|
||||
worker.get_tx().send(CtrlMsg::OpenStream {
|
||||
pid: uuid::Uuid::new_v4(),
|
||||
prio,
|
||||
promises,
|
||||
});
|
||||
}
|
||||
Stream {}
|
||||
Stream { sid: 0 }
|
||||
}
|
||||
|
||||
pub fn close(&self, stream: Stream) {}
|
||||
|
@ -1,59 +1,11 @@
|
||||
use crate::{
|
||||
api::{Address, Promise},
|
||||
message::{InCommingMessage, OutGoingMessage},
|
||||
api::Address,
|
||||
worker::types::{Mid, Sid},
|
||||
};
|
||||
use enumset::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::VecDeque, time::Instant};
|
||||
|
||||
pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
|
||||
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0];
|
||||
|
||||
pub(crate) trait Channel {
|
||||
/*
|
||||
uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing
|
||||
aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls
|
||||
*/
|
||||
/// Execute when ready to read
|
||||
fn read(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant);
|
||||
/// Execute when ready to write
|
||||
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant);
|
||||
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32;
|
||||
fn close_stream(&mut self, sid: u32);
|
||||
fn handshake(&mut self);
|
||||
fn participant_id(&mut self, pid: uuid::Uuid);
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub(crate) enum Frame {
|
||||
Handshake {
|
||||
magic_number: String,
|
||||
version: [u32; 3],
|
||||
},
|
||||
ParticipantId {
|
||||
pid: uuid::Uuid,
|
||||
},
|
||||
OpenStream {
|
||||
sid: u32,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
CloseStream {
|
||||
sid: u32,
|
||||
},
|
||||
DataHeader {
|
||||
id: u64,
|
||||
length: u64,
|
||||
},
|
||||
Data {
|
||||
id: u64,
|
||||
frame_no: u64,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
pub(crate) type TcpFrame = Frame;
|
||||
|
||||
pub(crate) enum Protocol {
|
||||
Tcp,
|
||||
Udp,
|
||||
@ -69,28 +21,16 @@ impl Address {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Stream {
|
||||
sid: u32,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
to_send: VecDeque<OutGoingMessage>,
|
||||
to_receive: VecDeque<InCommingMessage>,
|
||||
pub struct RemoteParticipant {
|
||||
pub stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,
|
||||
pub msg_id_pool: tlid::Pool<tlid::Wrapping<Mid>>,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn new(sid: u32, prio: u8, promises: EnumSet<Promise>) -> Self {
|
||||
Stream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
to_send: VecDeque::new(),
|
||||
to_receive: VecDeque::new(),
|
||||
impl RemoteParticipant {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
stream_id_pool: tlid::Pool::new_full(),
|
||||
msg_id_pool: tlid::Pool::new_full(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sid(&self) -> u32 { self.sid }
|
||||
|
||||
pub fn prio(&self) -> u8 { self.prio }
|
||||
|
||||
pub fn promises(&self) -> EnumSet<Promise> { self.promises }
|
||||
}
|
||||
|
@ -2,18 +2,18 @@
|
||||
mod api;
|
||||
mod internal;
|
||||
mod message;
|
||||
mod mio_worker;
|
||||
mod tcp_channel;
|
||||
mod worker;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use crate::api::*;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use tracing::*;
|
||||
use uuid::Uuid;
|
||||
use uvth::ThreadPoolBuilder;
|
||||
|
||||
struct N {
|
||||
id: u8,
|
||||
_id: u8,
|
||||
}
|
||||
|
||||
impl Events for N {
|
||||
@ -53,10 +53,8 @@ pub mod tests {
|
||||
test_tracing();
|
||||
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let a1 = Address::Tcp(SocketAddr::from(([10, 52, 0, 101], 52000)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([10, 52, 0, 101], 52001)));
|
||||
//let a1 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52000)));
|
||||
//let a2 = Address::Tcp(SocketAddr::from(([10, 42, 2, 2], 52001)));
|
||||
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
|
||||
n1.listen(&a1); //await
|
||||
n2.listen(&a2); // only requiered here, but doesnt hurt on n1
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
@ -69,9 +67,16 @@ pub mod tests {
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
//n2.OnRemoteStreamOpen triggered
|
||||
|
||||
n1.send("", &s1);
|
||||
n1.send("Hello World", &s1);
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
// receive on n2 now
|
||||
|
||||
let s: Option<String> = n2.recv(&s1);
|
||||
for _ in 1..4 {
|
||||
error!("{:?}", s);
|
||||
}
|
||||
assert_eq!(s, Some("Hello World".to_string()));
|
||||
|
||||
n1.close(s1);
|
||||
//n2.OnRemoteStreamClose triggered
|
||||
|
||||
|
@ -1,39 +1,50 @@
|
||||
use bincode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
//use std::collections::VecDeque;
|
||||
use crate::worker::types::{Mid, Sid};
|
||||
use std::sync::Arc;
|
||||
pub trait Message<'a> = Serialize + Deserialize<'a>;
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct MessageBuffer {
|
||||
// use VecDeque for msg storage, because it allows to quickly remove data from front.
|
||||
//however VecDeque needs custom bincode code, but it's possible
|
||||
data: Vec<u8>,
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct OutGoingMessage {
|
||||
buffer: Arc<MessageBuffer>,
|
||||
cursor: u64,
|
||||
pub buffer: Arc<MessageBuffer>,
|
||||
pub cursor: u64,
|
||||
pub mid: Option<Mid>,
|
||||
pub sid: Sid,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct InCommingMessage {
|
||||
buffer: MessageBuffer,
|
||||
cursor: u64,
|
||||
pub buffer: MessageBuffer,
|
||||
pub length: u64,
|
||||
pub mid: Mid,
|
||||
pub sid: Sid,
|
||||
}
|
||||
|
||||
pub(crate) fn serialize<'a, M: Message<'a>>(message: &M) -> MessageBuffer {
|
||||
pub(crate) fn serialize<M: Serialize>(message: &M) -> MessageBuffer {
|
||||
let mut writer = {
|
||||
let actual_size = bincode::serialized_size(message).unwrap();
|
||||
Vec::<u8>::with_capacity(actual_size as usize)
|
||||
};
|
||||
if let Err(e) = bincode::serialize_into(&mut writer, message) {
|
||||
println!("Oh nooo {}", e);
|
||||
error!("Oh nooo {}", e);
|
||||
};
|
||||
MessageBuffer { data: writer }
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize<M: DeserializeOwned>(buffer: MessageBuffer) -> M {
|
||||
let span = buffer.data;
|
||||
let decoded: M = bincode::deserialize(span.as_slice()).unwrap();
|
||||
decoded
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::message::*;
|
||||
|
330
network/src/worker/channel.rs
Normal file
330
network/src/worker/channel.rs
Normal file
@ -0,0 +1,330 @@
|
||||
use crate::{
|
||||
api::Promise,
|
||||
internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
|
||||
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
|
||||
worker::types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream},
|
||||
};
|
||||
use enumset::EnumSet;
|
||||
use mio_extras::channel::Sender;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::{Arc, RwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
pub(crate) trait Channel {
|
||||
/*
|
||||
uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing
|
||||
aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls
|
||||
*/
|
||||
/// Execute when ready to read
|
||||
fn read(
|
||||
&mut self,
|
||||
uninitialized_dirty_speed_buffer: &mut [u8; 65000],
|
||||
aprox_time: Instant,
|
||||
rtrn_tx: &Sender<RtrnMsg>,
|
||||
);
|
||||
/// Execute when ready to write
|
||||
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant);
|
||||
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32;
|
||||
fn close_stream(&mut self, sid: u32);
|
||||
fn handshake(&mut self);
|
||||
fn shutdown(&mut self);
|
||||
fn send(&mut self, outgoing: OutGoingMessage);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ChannelState {
|
||||
pub stream_id_pool: Option<tlid::Pool<tlid::Wrapping<Sid>>>, /* TODO: stream_id unique per
|
||||
* participant */
|
||||
pub msg_id_pool: Option<tlid::Pool<tlid::Wrapping<Mid>>>, //TODO: msg_id unique per
|
||||
// participant
|
||||
pub local_pid: Pid,
|
||||
pub remote_pid: Option<Pid>,
|
||||
pub remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
pub streams: Vec<Stream>,
|
||||
pub send_queue: VecDeque<Frame>,
|
||||
pub recv_queue: VecDeque<InCommingMessage>,
|
||||
pub send_handshake: bool,
|
||||
pub send_pid: bool,
|
||||
pub send_config: bool,
|
||||
pub send_shutdown: bool,
|
||||
pub recv_handshake: bool,
|
||||
pub recv_pid: bool,
|
||||
pub recv_config: bool,
|
||||
pub recv_shutdown: bool,
|
||||
}
|
||||
|
||||
/*
|
||||
Participant A
|
||||
Participant B
|
||||
A sends Handshake
|
||||
B receives Handshake and answers with Handshake
|
||||
A receives Handshake and answers with ParticipantId
|
||||
B receives ParticipantId and answeres with ParticipantId
|
||||
A receives ParticipantId and answers with Configuration for Streams and Messages
|
||||
---
|
||||
A and B can now concurrently open Streams and send messages
|
||||
---
|
||||
Shutdown phase
|
||||
*/
|
||||
|
||||
impl ChannelState {
|
||||
const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
|
||||
veloren server.\nWe are not sure if you are a valid \
|
||||
veloren client.\nClosing the connection"
|
||||
.as_bytes();
|
||||
const WRONG_VERSION: &'static str = "Handshake does not contain a correct magic number, but \
|
||||
invalid version.\nWe don't know how to communicate with \
|
||||
you.\n";
|
||||
|
||||
pub fn new(local_pid: Pid, remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>) -> Self {
|
||||
ChannelState {
|
||||
stream_id_pool: None,
|
||||
msg_id_pool: None,
|
||||
local_pid,
|
||||
remote_pid: None,
|
||||
remotes,
|
||||
streams: Vec::new(),
|
||||
send_queue: VecDeque::new(),
|
||||
recv_queue: VecDeque::new(),
|
||||
send_handshake: false,
|
||||
send_pid: false,
|
||||
send_config: false,
|
||||
send_shutdown: false,
|
||||
recv_handshake: false,
|
||||
recv_pid: false,
|
||||
recv_config: false,
|
||||
recv_shutdown: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn can_send(&self) -> bool {
|
||||
self.remote_pid.is_some()
|
||||
&& self.recv_handshake
|
||||
&& self.send_pid
|
||||
&& self.recv_pid
|
||||
&& (self.send_config || self.recv_config)
|
||||
&& !self.send_shutdown
|
||||
&& !self.recv_shutdown
|
||||
}
|
||||
|
||||
pub fn handle(&mut self, frame: Frame, rtrn_tx: &Sender<RtrnMsg>) {
|
||||
match frame {
|
||||
Frame::Handshake {
|
||||
magic_number,
|
||||
version,
|
||||
} => {
|
||||
if magic_number != VELOREN_MAGIC_NUMBER {
|
||||
error!("tcp connection with invalid handshake, closing connection");
|
||||
self.wrong_shutdown(Self::WRONG_NUMBER);
|
||||
}
|
||||
if version != VELOREN_NETWORK_VERSION {
|
||||
error!("tcp connection with wrong network version");
|
||||
self.wrong_shutdown(
|
||||
format!(
|
||||
"{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection",
|
||||
Self::WRONG_VERSION,
|
||||
VELOREN_NETWORK_VERSION,
|
||||
version,
|
||||
)
|
||||
.as_bytes(),
|
||||
);
|
||||
}
|
||||
debug!("handshake completed");
|
||||
self.recv_handshake = true;
|
||||
if self.send_handshake {
|
||||
self.send_queue.push_back(Frame::ParticipantId {
|
||||
pid: self.local_pid,
|
||||
});
|
||||
self.send_pid = true;
|
||||
} else {
|
||||
self.send_queue.push_back(Frame::Handshake {
|
||||
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
|
||||
version: VELOREN_NETWORK_VERSION,
|
||||
});
|
||||
self.send_handshake = true;
|
||||
}
|
||||
},
|
||||
Frame::Configure {
|
||||
stream_id_pool,
|
||||
msg_id_pool,
|
||||
} => {
|
||||
self.recv_config = true;
|
||||
//TODO remove range from rp! as this could probably cause duplicate ID !!!
|
||||
let mut remotes = self.remotes.write().unwrap();
|
||||
if let Some(pid) = self.remote_pid {
|
||||
if !remotes.contains_key(&pid) {
|
||||
remotes.insert(pid, RemoteParticipant::new());
|
||||
}
|
||||
if let Some(rp) = remotes.get_mut(&pid) {
|
||||
self.stream_id_pool = Some(stream_id_pool);
|
||||
self.msg_id_pool = Some(msg_id_pool);
|
||||
}
|
||||
}
|
||||
info!("recv config. This channel is now configured!");
|
||||
},
|
||||
Frame::ParticipantId { pid } => {
|
||||
if self.remote_pid.is_some() {
|
||||
error!(?pid, "invalid message, cant change participantId");
|
||||
return;
|
||||
}
|
||||
self.remote_pid = Some(pid);
|
||||
debug!(?pid, "Participant send their ID");
|
||||
self.recv_pid = true;
|
||||
if self.send_pid {
|
||||
let mut remotes = self.remotes.write().unwrap();
|
||||
if !remotes.contains_key(&pid) {
|
||||
remotes.insert(pid, RemoteParticipant::new());
|
||||
}
|
||||
if let Some(rp) = remotes.get_mut(&pid) {
|
||||
self.stream_id_pool = Some(rp.stream_id_pool.subpool(1000000).unwrap());
|
||||
self.msg_id_pool = Some(rp.msg_id_pool.subpool(1000000).unwrap());
|
||||
self.send_queue.push_back(Frame::Configure {
|
||||
stream_id_pool: rp.stream_id_pool.subpool(1000000).unwrap(),
|
||||
msg_id_pool: rp.msg_id_pool.subpool(1000000).unwrap(),
|
||||
});
|
||||
self.send_config = true;
|
||||
info!(?pid, "this channel is now configured!");
|
||||
}
|
||||
} else {
|
||||
self.send_queue.push_back(Frame::ParticipantId {
|
||||
pid: self.local_pid,
|
||||
});
|
||||
self.send_pid = true;
|
||||
}
|
||||
},
|
||||
Frame::Shutdown {} => {
|
||||
self.recv_shutdown = true;
|
||||
info!("shutting down channel");
|
||||
},
|
||||
Frame::OpenStream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
} => {
|
||||
if let Some(pid) = self.remote_pid {
|
||||
let stream = Stream::new(sid, prio, promises.clone());
|
||||
self.streams.push(stream);
|
||||
info!("opened a stream");
|
||||
} else {
|
||||
error!("called OpenStream before PartcipantID!");
|
||||
}
|
||||
},
|
||||
Frame::CloseStream { sid } => {
|
||||
if let Some(pid) = self.remote_pid {
|
||||
self.streams.retain(|stream| stream.sid() != sid);
|
||||
info!("closed a stream");
|
||||
}
|
||||
},
|
||||
Frame::DataHeader { mid, sid, length } => {
|
||||
debug!("Data Header {}", sid);
|
||||
let imsg = InCommingMessage {
|
||||
buffer: MessageBuffer { data: Vec::new() },
|
||||
length,
|
||||
mid,
|
||||
sid,
|
||||
};
|
||||
let mut found = false;
|
||||
for s in &mut self.streams {
|
||||
if s.sid() == sid {
|
||||
//TODO: move to Hashmap, so slow
|
||||
s.to_receive.push_back(imsg);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
error!("couldn't find stream with sid: {}", sid);
|
||||
}
|
||||
},
|
||||
Frame::Data {
|
||||
id,
|
||||
start,
|
||||
mut data,
|
||||
} => {
|
||||
debug!("Data Package {}, len: {}", id, data.len());
|
||||
let mut found = false;
|
||||
for s in &mut self.streams {
|
||||
let mut pos = None;
|
||||
for i in 0..s.to_receive.len() {
|
||||
let m = &mut s.to_receive[i];
|
||||
if m.mid == id {
|
||||
found = true;
|
||||
m.buffer.data.append(&mut data);
|
||||
if m.buffer.data.len() as u64 == m.length {
|
||||
pos = Some(i);
|
||||
break;
|
||||
};
|
||||
};
|
||||
}
|
||||
if let Some(pos) = pos {
|
||||
for m in s.to_receive.drain(pos..pos + 1) {
|
||||
info!("receied message: {}", m.mid);
|
||||
//self.recv_queue.push_back(m);
|
||||
rtrn_tx.send(RtrnMsg::Receive(m));
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
error!("couldn't find stream with mid: {}", id);
|
||||
}
|
||||
},
|
||||
Frame::Raw(data) => {
|
||||
info!("Got a Raw Package {:?}", data);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// This function will tick all streams according to priority and add them to the
|
||||
// send queue
|
||||
pub(crate) fn tick_streams(&mut self) {
|
||||
//ignoring prio for now
|
||||
//TODO: fix prio
|
||||
if let Some(msg_id_pool) = &mut self.msg_id_pool {
|
||||
for s in &mut self.streams {
|
||||
let mut remove = false;
|
||||
let sid = s.sid();
|
||||
if let Some(m) = s.to_send.front_mut() {
|
||||
let to_send = std::cmp::min(m.buffer.data.len() as u64 - m.cursor, 1400);
|
||||
if to_send > 0 {
|
||||
if m.cursor == 0 {
|
||||
let mid = msg_id_pool.next();
|
||||
m.mid = Some(mid);
|
||||
self.send_queue.push_back(Frame::DataHeader {
|
||||
mid,
|
||||
sid,
|
||||
length: m.buffer.data.len() as u64,
|
||||
});
|
||||
}
|
||||
self.send_queue.push_back(Frame::Data {
|
||||
id: m.mid.unwrap(),
|
||||
start: m.cursor,
|
||||
data: m.buffer.data[m.cursor as usize..(m.cursor + to_send) as usize]
|
||||
.to_vec(),
|
||||
});
|
||||
};
|
||||
m.cursor += to_send;
|
||||
if m.cursor == m.buffer.data.len() as u64 {
|
||||
remove = true;
|
||||
debug!(?m.mid, "finish message")
|
||||
}
|
||||
}
|
||||
if remove {
|
||||
s.to_send.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn wrong_shutdown(&mut self, raw: &[u8]) {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
debug!("sending client instructions before killing");
|
||||
self.send_queue.push_back(Frame::Raw(raw.to_vec()));
|
||||
self.send_queue.push_back(Frame::Shutdown {});
|
||||
self.send_shutdown = true;
|
||||
}
|
||||
}
|
||||
}
|
101
network/src/worker/mod.rs
Normal file
101
network/src/worker/mod.rs
Normal file
@ -0,0 +1,101 @@
|
||||
/*
|
||||
Most of the internals take place in it's own worker-thread.
|
||||
This folder contains all this outsourced calculation.
|
||||
This mod.rs contains the interface to communicate with the thread,
|
||||
communication is done via channels.
|
||||
*/
|
||||
pub mod channel;
|
||||
pub mod tcp;
|
||||
pub mod types;
|
||||
pub mod worker;
|
||||
|
||||
use crate::{
|
||||
internal::RemoteParticipant,
|
||||
worker::{
|
||||
types::{CtrlMsg, Pid, RtrnMsg, Statistics},
|
||||
worker::Worker,
|
||||
},
|
||||
};
|
||||
use mio::{self, Poll, PollOpt, Ready, Token};
|
||||
use mio_extras::channel::{channel, Receiver, Sender};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use tlid;
|
||||
use tracing::*;
|
||||
use uvth::ThreadPool;
|
||||
|
||||
/*
|
||||
The MioWorker runs in it's own thread,
|
||||
it has a given set of Channels to work with.
|
||||
It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers
|
||||
*/
|
||||
pub struct Controller {
|
||||
poll: Arc<Poll>,
|
||||
statistics: Arc<RwLock<Statistics>>,
|
||||
ctrl_tx: Sender<CtrlMsg>,
|
||||
rtrn_rx: Receiver<RtrnMsg>,
|
||||
}
|
||||
|
||||
impl Controller {
|
||||
pub const CTRL_TOK: Token = Token(0);
|
||||
|
||||
pub fn new(
|
||||
wid: u64,
|
||||
pid: uuid::Uuid,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
mut token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
) -> Self {
|
||||
let poll = Arc::new(Poll::new().unwrap());
|
||||
let poll_clone = poll.clone();
|
||||
let statistics = Arc::new(RwLock::new(Statistics::default()));
|
||||
let statistics_clone = statistics.clone();
|
||||
|
||||
let (ctrl_tx, ctrl_rx) = channel();
|
||||
let (rtrn_tx, rtrn_rx) = channel();
|
||||
poll.register(&ctrl_rx, Self::CTRL_TOK, Ready::readable(), PollOpt::edge())
|
||||
.unwrap();
|
||||
// reserve 10 tokens in case they start with 0, //TODO: cleaner method
|
||||
for _ in 0..10 {
|
||||
token_pool.next();
|
||||
}
|
||||
|
||||
thread_pool.execute(move || {
|
||||
let w = wid;
|
||||
let span = span!(Level::INFO, "worker", ?w);
|
||||
let _enter = span.enter();
|
||||
let mut worker = Worker::new(
|
||||
pid,
|
||||
poll_clone,
|
||||
statistics_clone,
|
||||
remotes,
|
||||
token_pool,
|
||||
ctrl_rx,
|
||||
rtrn_tx,
|
||||
);
|
||||
worker.run();
|
||||
});
|
||||
Controller {
|
||||
poll,
|
||||
statistics,
|
||||
ctrl_tx,
|
||||
rtrn_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_load_ratio(&self) -> f32 {
|
||||
let statistics = self.statistics.read().unwrap();
|
||||
statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32
|
||||
}
|
||||
|
||||
//TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers
|
||||
|
||||
pub(crate) fn get_tx(&self) -> Sender<CtrlMsg> { self.ctrl_tx.clone() }
|
||||
|
||||
pub(crate) fn get_rx(&self) -> &Receiver<RtrnMsg> { &self.rtrn_rx }
|
||||
}
|
||||
impl Drop for Controller {
|
||||
fn drop(&mut self) { let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); }
|
||||
}
|
159
network/src/worker/tcp.rs
Normal file
159
network/src/worker/tcp.rs
Normal file
@ -0,0 +1,159 @@
|
||||
use crate::{
|
||||
api::Promise,
|
||||
internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
|
||||
message::OutGoingMessage,
|
||||
worker::{
|
||||
channel::{Channel, ChannelState},
|
||||
types::{Pid, RtrnMsg, Stream, TcpFrame},
|
||||
},
|
||||
};
|
||||
use bincode;
|
||||
use enumset::EnumSet;
|
||||
use mio::{self, net::TcpStream};
|
||||
use mio_extras::channel::Sender;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io::{Read, Write},
|
||||
sync::{Arc, RwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TcpChannel {
|
||||
state: ChannelState,
|
||||
pub tcpstream: TcpStream,
|
||||
}
|
||||
|
||||
impl TcpChannel {
|
||||
pub fn new(
|
||||
tcpstream: TcpStream,
|
||||
local_pid: Pid,
|
||||
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
) -> Self {
|
||||
TcpChannel {
|
||||
state: ChannelState::new(local_pid, remotes),
|
||||
tcpstream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Channel for TcpChannel {
|
||||
fn read(
|
||||
&mut self,
|
||||
uninitialized_dirty_speed_buffer: &mut [u8; 65000],
|
||||
aprox_time: Instant,
|
||||
rtrn_tx: &Sender<RtrnMsg>,
|
||||
) {
|
||||
let pid = self.state.remote_pid;
|
||||
let span = span!(Level::INFO, "channel", ?pid);
|
||||
let _enter = span.enter();
|
||||
match self.tcpstream.read(uninitialized_dirty_speed_buffer) {
|
||||
Ok(n) => {
|
||||
trace!("incomming message with len: {}", n);
|
||||
let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]);
|
||||
while cur.position() < n as u64 {
|
||||
let r: Result<TcpFrame, _> = bincode::deserialize_from(&mut cur);
|
||||
match r {
|
||||
Ok(frame) => self.state.handle(frame, rtrn_tx),
|
||||
Err(e) => {
|
||||
error!(
|
||||
?self,
|
||||
?e,
|
||||
"failure parsing a message with len: {}, starting with: {:?}",
|
||||
n,
|
||||
&uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)]
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
debug!("would block");
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) {
|
||||
let pid = self.state.remote_pid;
|
||||
let span = span!(Level::INFO, "channel", ?pid);
|
||||
let _enter = span.enter();
|
||||
loop {
|
||||
while let Some(elem) = self.state.send_queue.pop_front() {
|
||||
if let Ok(mut data) = bincode::serialize(&elem) {
|
||||
let total = data.len();
|
||||
match self.tcpstream.write(&data) {
|
||||
Ok(n) if n == total => {},
|
||||
Ok(n) => {
|
||||
error!("could only send part");
|
||||
//let data = data.drain(n..).collect(); //TODO:
|
||||
// validate n.. is correct
|
||||
// to_send.push_front(data);
|
||||
},
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
debug!("would block");
|
||||
return;
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("{}", e);
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
// run streams
|
||||
self.state.tick_streams();
|
||||
if self.state.send_queue.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 {
|
||||
// validate promises
|
||||
if let Some(stream_id_pool) = &mut self.state.stream_id_pool {
|
||||
let sid = stream_id_pool.next();
|
||||
let stream = Stream::new(sid, prio, promises.clone());
|
||||
self.state.streams.push(stream);
|
||||
self.state.send_queue.push_back(TcpFrame::OpenStream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
});
|
||||
return sid;
|
||||
}
|
||||
error!("fix me");
|
||||
return 0;
|
||||
//TODO: fix me
|
||||
}
|
||||
|
||||
fn close_stream(&mut self, sid: u32) {
|
||||
self.state.streams.retain(|stream| stream.sid() != sid);
|
||||
self.state
|
||||
.send_queue
|
||||
.push_back(TcpFrame::CloseStream { sid });
|
||||
}
|
||||
|
||||
fn handshake(&mut self) {
|
||||
self.state.send_queue.push_back(TcpFrame::Handshake {
|
||||
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
|
||||
version: VELOREN_NETWORK_VERSION,
|
||||
});
|
||||
self.state.send_handshake = true;
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) {
|
||||
self.state.send_queue.push_back(TcpFrame::Shutdown {});
|
||||
self.state.send_shutdown = true;
|
||||
}
|
||||
|
||||
fn send(&mut self, outgoing: OutGoingMessage) {
|
||||
//TODO: fix me
|
||||
for s in self.state.streams.iter_mut() {
|
||||
s.to_send.push_back(outgoing);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
126
network/src/worker/types.rs
Normal file
126
network/src/worker/types.rs
Normal file
@ -0,0 +1,126 @@
|
||||
use crate::{
|
||||
api::Promise,
|
||||
message::{InCommingMessage, OutGoingMessage},
|
||||
worker::tcp::TcpChannel,
|
||||
};
|
||||
use enumset::EnumSet;
|
||||
use mio::{self, net::TcpListener, PollOpt, Ready};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub type Pid = Uuid;
|
||||
pub type Sid = u32;
|
||||
pub type Mid = u64;
|
||||
|
||||
// Used for Communication between Controller <--> Worker
|
||||
pub(crate) enum CtrlMsg {
|
||||
Shutdown,
|
||||
Register(TokenObjects, Ready, PollOpt),
|
||||
OpenStream {
|
||||
pid: Pid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
CloseStream {
|
||||
pid: Pid,
|
||||
sid: Sid,
|
||||
},
|
||||
Send(OutGoingMessage),
|
||||
}
|
||||
|
||||
pub(crate) enum RtrnMsg {
|
||||
Shutdown,
|
||||
OpendStream {
|
||||
pid: Pid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
ClosedStream {
|
||||
pid: Pid,
|
||||
sid: Sid,
|
||||
},
|
||||
Receive(InCommingMessage),
|
||||
}
|
||||
|
||||
// MioStatistics should be copied in order to not hold locks for long
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Statistics {
|
||||
pub nano_wait: u128,
|
||||
pub nano_busy: u128,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum TokenObjects {
|
||||
TcpListener(TcpListener),
|
||||
TcpChannel(TcpChannel),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Stream {
|
||||
sid: Sid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
pub to_send: VecDeque<OutGoingMessage>,
|
||||
pub to_receive: VecDeque<InCommingMessage>,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn new(sid: Sid, prio: u8, promises: EnumSet<Promise>) -> Self {
|
||||
Stream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
to_send: VecDeque::new(),
|
||||
to_receive: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sid(&self) -> Sid { self.sid }
|
||||
|
||||
pub fn prio(&self) -> u8 { self.prio }
|
||||
|
||||
pub fn promises(&self) -> EnumSet<Promise> { self.promises }
|
||||
}
|
||||
|
||||
// Used for Communication between Channel <----(TCP/UDP)----> Channel
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub(crate) enum Frame {
|
||||
Handshake {
|
||||
magic_number: String,
|
||||
version: [u32; 3],
|
||||
},
|
||||
Configure {
|
||||
//only one Participant will send this package and give the other a range to use
|
||||
stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,
|
||||
msg_id_pool: tlid::Pool<tlid::Wrapping<Mid>>,
|
||||
},
|
||||
ParticipantId {
|
||||
pid: Pid,
|
||||
},
|
||||
Shutdown {/* Shutsdown this channel gracefully, if all channels are shut down, Participant is deleted */},
|
||||
OpenStream {
|
||||
sid: Sid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
CloseStream {
|
||||
sid: Sid,
|
||||
},
|
||||
DataHeader {
|
||||
mid: Mid,
|
||||
sid: Sid,
|
||||
length: u64,
|
||||
},
|
||||
Data {
|
||||
id: Mid,
|
||||
start: u64,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
/* WARNING: Sending RAW is only used for debug purposes in case someone write a new API
|
||||
* against veloren Server! */
|
||||
Raw(Vec<u8>),
|
||||
}
|
||||
|
||||
pub(crate) type TcpFrame = Frame;
|
||||
pub(crate) type UdpFrame = Frame;
|
255
network/src/worker/worker.rs
Normal file
255
network/src/worker/worker.rs
Normal file
@ -0,0 +1,255 @@
|
||||
use crate::{
|
||||
internal::RemoteParticipant,
|
||||
worker::{
|
||||
channel::Channel,
|
||||
tcp::TcpChannel,
|
||||
types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects},
|
||||
Controller,
|
||||
},
|
||||
};
|
||||
use mio::{self, Poll, PollOpt, Ready, Token};
|
||||
use mio_extras::channel::{Receiver, Sender};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{mpsc::TryRecvError, Arc, RwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use tlid;
|
||||
use tracing::*;
|
||||
/*
|
||||
The worker lives in a own thread and only communcates with the outside via a Channel
|
||||
*/
|
||||
|
||||
pub(crate) struct MioTokens {
|
||||
pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
pub tokens: HashMap<Token, TokenObjects>, //TODO: move to Vec<Options> for faster lookup
|
||||
}
|
||||
|
||||
impl MioTokens {
|
||||
pub fn new(pool: tlid::Pool<tlid::Wrapping<usize>>) -> Self {
|
||||
MioTokens {
|
||||
pool,
|
||||
tokens: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn construct(&mut self) -> Token { Token(self.pool.next()) }
|
||||
|
||||
pub fn insert(&mut self, tok: Token, obj: TokenObjects) {
|
||||
trace!(?tok, ?obj, "added new token");
|
||||
self.tokens.insert(tok, obj);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Worker {
|
||||
pid: Pid,
|
||||
poll: Arc<Poll>,
|
||||
statistics: Arc<RwLock<Statistics>>,
|
||||
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
ctrl_rx: Receiver<CtrlMsg>,
|
||||
rtrn_tx: Sender<RtrnMsg>,
|
||||
mio_tokens: MioTokens,
|
||||
buf: [u8; 65000],
|
||||
time_before_poll: Instant,
|
||||
time_after_poll: Instant,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn new(
|
||||
pid: Pid,
|
||||
poll: Arc<Poll>,
|
||||
statistics: Arc<RwLock<Statistics>>,
|
||||
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
ctrl_rx: Receiver<CtrlMsg>,
|
||||
rtrn_tx: Sender<RtrnMsg>,
|
||||
) -> Self {
|
||||
let mio_tokens = MioTokens::new(token_pool);
|
||||
Worker {
|
||||
pid,
|
||||
poll,
|
||||
statistics,
|
||||
remotes,
|
||||
ctrl_rx,
|
||||
rtrn_tx,
|
||||
mio_tokens,
|
||||
buf: [0; 65000],
|
||||
time_before_poll: Instant::now(),
|
||||
time_after_poll: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&mut self) {
|
||||
let mut events = mio::Events::with_capacity(1024);
|
||||
loop {
|
||||
self.time_before_poll = Instant::now();
|
||||
if let Err(err) = self.poll.poll(&mut events, None) {
|
||||
error!("network poll error: {}", err);
|
||||
return;
|
||||
}
|
||||
self.time_after_poll = Instant::now();
|
||||
for event in &events {
|
||||
trace!(?event, "event");
|
||||
match event.token() {
|
||||
Controller::CTRL_TOK => {
|
||||
if self.handle_ctl() {
|
||||
return;
|
||||
}
|
||||
},
|
||||
_ => self.handle_tok(&event),
|
||||
};
|
||||
}
|
||||
self.handle_statistics();
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_ctl(&mut self) -> bool {
|
||||
let msg = match self.ctrl_rx.try_recv() {
|
||||
Ok(msg) => msg,
|
||||
Err(TryRecvError::Empty) => {
|
||||
return false;
|
||||
},
|
||||
Err(err) => {
|
||||
panic!("Unexpected error '{}'", err);
|
||||
},
|
||||
};
|
||||
|
||||
match msg {
|
||||
CtrlMsg::Shutdown => {
|
||||
debug!("Shutting Down");
|
||||
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
|
||||
if let TokenObjects::TcpChannel(channel) = obj {
|
||||
channel.shutdown();
|
||||
channel.write(&mut self.buf, self.time_after_poll);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
},
|
||||
CtrlMsg::Register(handle, interest, opts) => {
|
||||
let tok = self.mio_tokens.construct();
|
||||
match &handle {
|
||||
TokenObjects::TcpListener(h) => {
|
||||
self.poll.register(h, tok, interest, opts).unwrap()
|
||||
},
|
||||
TokenObjects::TcpChannel(channel) => self
|
||||
.poll
|
||||
.register(&channel.tcpstream, tok, interest, opts)
|
||||
.unwrap(),
|
||||
}
|
||||
debug!(?handle, ?tok, "Registered new handle");
|
||||
self.mio_tokens.insert(tok, handle);
|
||||
},
|
||||
CtrlMsg::OpenStream {
|
||||
pid,
|
||||
prio,
|
||||
promises,
|
||||
} => {
|
||||
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
|
||||
if let TokenObjects::TcpChannel(channel) = obj {
|
||||
channel.open_stream(prio, promises); //TODO: check participant
|
||||
channel.write(&mut self.buf, self.time_after_poll);
|
||||
}
|
||||
}
|
||||
//TODO:
|
||||
},
|
||||
CtrlMsg::CloseStream { pid, sid } => {
|
||||
//TODO:
|
||||
for to in self.mio_tokens.tokens.values_mut() {
|
||||
if let TokenObjects::TcpChannel(channel) = to {
|
||||
channel.close_stream(sid); //TODO: check participant
|
||||
channel.write(&mut self.buf, self.time_after_poll);
|
||||
}
|
||||
}
|
||||
},
|
||||
CtrlMsg::Send(outgoing) => {
|
||||
//TODO:
|
||||
for to in self.mio_tokens.tokens.values_mut() {
|
||||
if let TokenObjects::TcpChannel(channel) = to {
|
||||
channel.send(outgoing); //TODO: check participant
|
||||
channel.write(&mut self.buf, self.time_after_poll);
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
false
|
||||
}
|
||||
|
||||
fn handle_tok(&mut self, event: &mio::Event) {
|
||||
let obj = match self.mio_tokens.tokens.get_mut(&event.token()) {
|
||||
Some(obj) => obj,
|
||||
None => panic!("Unexpected event token '{:?}'", &event.token()),
|
||||
};
|
||||
|
||||
match obj {
|
||||
TokenObjects::TcpListener(listener) => match listener.accept() {
|
||||
Ok((remote_stream, _)) => {
|
||||
info!(?remote_stream, "remote connected");
|
||||
|
||||
let tok = self.mio_tokens.construct();
|
||||
self.poll
|
||||
.register(
|
||||
&remote_stream,
|
||||
tok,
|
||||
Ready::readable() | Ready::writable(),
|
||||
PollOpt::edge(),
|
||||
)
|
||||
.unwrap();
|
||||
trace!(?remote_stream, ?tok, "registered");
|
||||
let mut channel =
|
||||
TcpChannel::new(remote_stream, self.pid, self.remotes.clone());
|
||||
channel.handshake();
|
||||
|
||||
self.mio_tokens
|
||||
.tokens
|
||||
.insert(tok, TokenObjects::TcpChannel(channel));
|
||||
},
|
||||
Err(err) => {
|
||||
error!(?err, "error during remote connected");
|
||||
},
|
||||
},
|
||||
TokenObjects::TcpChannel(channel) => {
|
||||
if event.readiness().is_readable() {
|
||||
trace!(?channel.tcpstream, "stream readable");
|
||||
channel.read(&mut self.buf, self.time_after_poll, &self.rtrn_tx);
|
||||
}
|
||||
if event.readiness().is_writable() {
|
||||
trace!(?channel.tcpstream, "stream writeable");
|
||||
channel.write(&mut self.buf, self.time_after_poll);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn handle_statistics(&mut self) {
|
||||
let time_after_work = Instant::now();
|
||||
let mut statistics = match self.statistics.try_write() {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
?e,
|
||||
"statistics dropped because they are currently accecssed"
|
||||
);
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
const KEEP_FACTOR: f64 = 0.995;
|
||||
//in order to weight new data stronger than older we fade them out with a
|
||||
// factor < 1. for 0.995 under full load (500 ticks a 1ms) we keep 8% of the old
|
||||
// value this means, that we start to see load comming up after
|
||||
// 500ms, but not for small spikes - as reordering for smaller spikes would be
|
||||
// to slow
|
||||
let first = self.time_after_poll.duration_since(self.time_before_poll);
|
||||
let second = time_after_work.duration_since(self.time_after_poll);
|
||||
statistics.nano_wait =
|
||||
(statistics.nano_wait as f64 * KEEP_FACTOR) as u128 + first.as_nanos();
|
||||
statistics.nano_busy =
|
||||
(statistics.nano_busy as f64 * KEEP_FACTOR) as u128 + second.as_nanos();
|
||||
|
||||
trace!(
|
||||
"current Load {}",
|
||||
statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user