2020-01-13 16:53:28 +00:00
|
|
|
use crate::{
|
2020-02-21 13:08:34 +00:00
|
|
|
channel::{Channel, ChannelProtocols},
|
|
|
|
controller::Controller,
|
2020-02-21 15:10:55 +00:00
|
|
|
message::{self, InCommingMessage, OutGoingMessage},
|
2020-02-21 13:08:34 +00:00
|
|
|
metrics::NetworkMetrics,
|
|
|
|
tcp::TcpChannel,
|
|
|
|
types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects},
|
2020-01-13 16:53:28 +00:00
|
|
|
};
|
2019-12-20 13:56:01 +00:00
|
|
|
use enumset::*;
|
2020-03-04 15:52:30 +00:00
|
|
|
use futures::stream::StreamExt;
|
2019-12-20 13:56:01 +00:00
|
|
|
use mio::{
|
|
|
|
self,
|
|
|
|
net::{TcpListener, TcpStream},
|
2020-01-13 16:53:28 +00:00
|
|
|
PollOpt, Ready,
|
2019-12-20 13:56:01 +00:00
|
|
|
};
|
2020-02-04 15:42:04 +00:00
|
|
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|
|
|
use std::{
|
|
|
|
collections::HashMap,
|
2020-03-04 15:52:30 +00:00
|
|
|
sync::{mpsc, Arc, RwLock},
|
2020-02-04 15:42:04 +00:00
|
|
|
};
|
2020-01-22 16:44:32 +00:00
|
|
|
use tlid;
|
2020-01-13 16:53:28 +00:00
|
|
|
use tracing::*;
|
|
|
|
use uuid::Uuid;
|
|
|
|
use uvth::ThreadPool;
|
2019-12-20 13:56:01 +00:00
|
|
|
|
2020-01-13 16:53:28 +00:00
|
|
|
#[derive(Clone, Debug)]
|
2019-12-20 13:56:01 +00:00
|
|
|
pub enum Address {
|
|
|
|
Tcp(std::net::SocketAddr),
|
|
|
|
Udp(std::net::SocketAddr),
|
|
|
|
}
|
|
|
|
|
2020-01-22 16:44:32 +00:00
|
|
|
#[derive(Serialize, Deserialize, EnumSetType, Debug)]
|
|
|
|
#[enumset(serialize_repr = "u8")]
|
2019-12-20 13:56:01 +00:00
|
|
|
pub enum Promise {
|
|
|
|
InOrder,
|
|
|
|
NoCorrupt,
|
|
|
|
GuaranteedDelivery,
|
|
|
|
Encrypted,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Participant {
|
|
|
|
addr: Address,
|
2020-02-10 17:25:47 +00:00
|
|
|
remote_pid: Pid,
|
2020-02-21 15:10:55 +00:00
|
|
|
network_controller: Arc<Vec<Controller>>,
|
2019-12-20 13:56:01 +00:00
|
|
|
}
|
|
|
|
|
2020-02-04 15:42:04 +00:00
|
|
|
pub struct Stream {
|
|
|
|
sid: Sid,
|
2020-03-04 10:59:19 +00:00
|
|
|
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
|
2020-03-04 00:37:36 +00:00
|
|
|
ctr_tx: mio_extras::channel::Sender<CtrlMsg>,
|
2020-02-04 15:42:04 +00:00
|
|
|
}
|
2019-12-20 13:56:01 +00:00
|
|
|
|
2020-02-21 15:10:55 +00:00
|
|
|
pub struct Network {
|
2020-01-22 16:44:32 +00:00
|
|
|
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
2020-02-04 15:42:04 +00:00
|
|
|
worker_pool: tlid::Pool<tlid::Wrapping<u64>>,
|
|
|
|
controller: Arc<Vec<Controller>>,
|
2020-01-13 16:53:28 +00:00
|
|
|
thread_pool: Arc<ThreadPool>,
|
2020-02-04 15:42:04 +00:00
|
|
|
participant_id: Pid,
|
|
|
|
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
2020-02-19 17:08:57 +00:00
|
|
|
metrics: Arc<Option<NetworkMetrics>>,
|
2019-12-20 13:56:01 +00:00
|
|
|
}
|
|
|
|
|
2020-02-21 15:10:55 +00:00
|
|
|
impl Network {
|
2020-01-13 16:53:28 +00:00
|
|
|
pub fn new(participant_id: Uuid, thread_pool: Arc<ThreadPool>) -> Self {
|
2020-01-22 16:44:32 +00:00
|
|
|
let mut token_pool = tlid::Pool::new_full();
|
2020-02-04 15:42:04 +00:00
|
|
|
let mut worker_pool = tlid::Pool::new_full();
|
|
|
|
let remotes = Arc::new(RwLock::new(HashMap::new()));
|
|
|
|
for _ in 0..participant_id.as_u128().rem_euclid(64) {
|
|
|
|
worker_pool.next();
|
|
|
|
//random offset from 0 for tests where multiple networks are
|
|
|
|
// created and we do not want to polute the traces with
|
|
|
|
// network pid everytime
|
|
|
|
}
|
2020-02-19 17:08:57 +00:00
|
|
|
let metrics = Arc::new(None);
|
2020-02-04 15:42:04 +00:00
|
|
|
let controller = Arc::new(vec![Controller::new(
|
|
|
|
worker_pool.next(),
|
2020-01-22 16:44:32 +00:00
|
|
|
participant_id,
|
2020-01-13 16:53:28 +00:00
|
|
|
thread_pool.clone(),
|
2020-01-22 16:44:32 +00:00
|
|
|
token_pool.subpool(1000000).unwrap(),
|
2020-02-19 17:08:57 +00:00
|
|
|
metrics.clone(),
|
2020-02-04 15:42:04 +00:00
|
|
|
remotes.clone(),
|
2020-01-13 16:53:28 +00:00
|
|
|
)]);
|
2019-12-20 13:56:01 +00:00
|
|
|
Self {
|
2020-01-22 16:44:32 +00:00
|
|
|
token_pool,
|
2020-02-04 15:42:04 +00:00
|
|
|
worker_pool,
|
|
|
|
controller,
|
2019-12-20 13:56:01 +00:00
|
|
|
thread_pool,
|
2020-01-13 16:53:28 +00:00
|
|
|
participant_id,
|
2020-02-04 15:42:04 +00:00
|
|
|
remotes,
|
2020-02-19 17:08:57 +00:00
|
|
|
metrics,
|
2019-12-20 13:56:01 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-02-04 15:42:04 +00:00
|
|
|
fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc<Vec<Controller>>) -> &'a Controller { &list[0] }
|
|
|
|
|
2020-02-10 17:25:47 +00:00
|
|
|
pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> {
|
|
|
|
let span = span!(Level::TRACE, "listen", ?address);
|
2020-02-04 15:42:04 +00:00
|
|
|
let worker = Self::get_lowest_worker(&self.controller);
|
2020-02-10 17:25:47 +00:00
|
|
|
let _enter = span.enter();
|
|
|
|
match address {
|
|
|
|
Address::Tcp(a) => {
|
|
|
|
let tcp_listener = TcpListener::bind(&a)?;
|
|
|
|
info!("listening");
|
|
|
|
worker.get_tx().send(CtrlMsg::Register(
|
|
|
|
TokenObjects::TcpListener(tcp_listener),
|
|
|
|
Ready::readable(),
|
|
|
|
PollOpt::edge(),
|
|
|
|
))?;
|
|
|
|
},
|
|
|
|
Address::Udp(_) => unimplemented!("lazy me"),
|
|
|
|
};
|
|
|
|
Ok(())
|
2019-12-20 13:56:01 +00:00
|
|
|
}
|
|
|
|
|
2020-02-10 17:25:47 +00:00
|
|
|
pub async fn connect(&self, address: &Address) -> Result<Participant, NetworkError> {
|
2020-02-04 15:42:04 +00:00
|
|
|
let worker = Self::get_lowest_worker(&self.controller);
|
2020-01-22 16:44:32 +00:00
|
|
|
let pid = self.participant_id;
|
2020-02-04 15:42:04 +00:00
|
|
|
let remotes = self.remotes.clone();
|
2020-03-04 15:52:30 +00:00
|
|
|
let span = span!(Level::INFO, "connect", ?address);
|
2020-02-10 17:25:47 +00:00
|
|
|
let _enter = span.enter();
|
|
|
|
match address {
|
|
|
|
Address::Tcp(a) => {
|
|
|
|
info!("connecting");
|
|
|
|
let tcp_stream = TcpStream::connect(&a)?;
|
|
|
|
let tcp_channel = TcpChannel::new(tcp_stream);
|
2020-02-21 15:10:55 +00:00
|
|
|
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Pid>();
|
2020-03-04 15:52:30 +00:00
|
|
|
let channel = Channel::new(
|
2020-02-20 16:04:58 +00:00
|
|
|
pid,
|
|
|
|
ChannelProtocols::Tcp(tcp_channel),
|
|
|
|
remotes,
|
|
|
|
Some(ctrl_tx),
|
|
|
|
);
|
2020-02-10 17:25:47 +00:00
|
|
|
worker.get_tx().send(CtrlMsg::Register(
|
2020-02-20 16:04:58 +00:00
|
|
|
TokenObjects::Channel(channel),
|
2020-02-10 17:25:47 +00:00
|
|
|
Ready::readable() | Ready::writable(),
|
|
|
|
PollOpt::edge(),
|
|
|
|
))?;
|
2020-02-20 16:04:58 +00:00
|
|
|
let remote_pid = ctrl_rx.recv().unwrap();
|
|
|
|
info!(?remote_pid, " sucessfully connected to");
|
|
|
|
return Ok(Participant {
|
|
|
|
addr: address.clone(),
|
|
|
|
remote_pid,
|
2020-02-21 15:10:55 +00:00
|
|
|
network_controller: self.controller.clone(),
|
2020-02-20 16:04:58 +00:00
|
|
|
});
|
2020-02-10 17:25:47 +00:00
|
|
|
},
|
|
|
|
Address::Udp(_) => unimplemented!("lazy me"),
|
|
|
|
}
|
2019-12-20 13:56:01 +00:00
|
|
|
}
|
|
|
|
|
2020-02-10 17:25:47 +00:00
|
|
|
//TODO: evaluate if move to Participant
|
|
|
|
pub async fn _disconnect(&self, participant: Participant) -> Result<(), NetworkError> {
|
|
|
|
panic!("sda");
|
2020-01-22 16:44:32 +00:00
|
|
|
}
|
2020-01-13 16:53:28 +00:00
|
|
|
|
2020-02-10 17:25:47 +00:00
|
|
|
pub fn participants(&self) -> Vec<Participant> {
|
|
|
|
panic!("sda");
|
|
|
|
}
|
|
|
|
|
2020-02-21 15:10:55 +00:00
|
|
|
pub async fn connected(&self) -> Result<Participant, NetworkError> {
|
2020-02-10 17:25:47 +00:00
|
|
|
// returns if a Participant connected and is ready
|
2020-02-21 15:10:55 +00:00
|
|
|
loop {
|
|
|
|
//ARRGGG
|
|
|
|
for worker in self.controller.iter() {
|
|
|
|
//TODO harden!
|
|
|
|
if let Ok(msg) = worker.get_rx().try_recv() {
|
|
|
|
if let RtrnMsg::ConnectedParticipant { pid } = msg {
|
|
|
|
return Ok(Participant {
|
|
|
|
addr: Address::Tcp(std::net::SocketAddr::from(([1, 3, 3, 7], 1337))), /* TODO: FIXME */
|
|
|
|
remote_pid: pid,
|
|
|
|
network_controller: self.controller.clone(),
|
|
|
|
});
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn _disconnected(&self) -> Result<Participant, NetworkError> {
|
|
|
|
// returns if a Participant connected and is ready
|
|
|
|
panic!("sda");
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn multisend<M: Serialize>(
|
|
|
|
&self,
|
|
|
|
streams: Vec<Stream>,
|
|
|
|
msg: M,
|
|
|
|
) -> Result<(), NetworkError> {
|
2020-03-04 15:52:30 +00:00
|
|
|
let messagebuffer = Arc::new(message::serialize(&msg));
|
|
|
|
//TODO: why do we need a look here, i want my own local directory which is
|
|
|
|
// updated by workes via a channel and needs to be intepreted on a send but it
|
|
|
|
// should almost ever be empty except for new channel creations and stream
|
|
|
|
// creations!
|
|
|
|
for stream in streams {
|
|
|
|
stream
|
|
|
|
.ctr_tx
|
|
|
|
.send(CtrlMsg::Send(OutGoingMessage {
|
|
|
|
buffer: messagebuffer.clone(),
|
|
|
|
cursor: 0,
|
|
|
|
mid: None,
|
|
|
|
sid: stream.sid,
|
|
|
|
}))
|
|
|
|
.unwrap();
|
|
|
|
}
|
|
|
|
Ok(())
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Participant {
|
2020-02-21 15:10:55 +00:00
|
|
|
pub async fn open(
|
2020-02-10 17:25:47 +00:00
|
|
|
&self,
|
|
|
|
prio: u8,
|
|
|
|
promises: EnumSet<Promise>,
|
|
|
|
) -> Result<Stream, ParticipantError> {
|
2020-02-21 15:10:55 +00:00
|
|
|
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Sid>();
|
2020-03-04 10:59:19 +00:00
|
|
|
let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::<InCommingMessage>();
|
2020-02-21 15:10:55 +00:00
|
|
|
for controller in self.network_controller.iter() {
|
2020-03-04 00:37:36 +00:00
|
|
|
let tx = controller.get_tx();
|
|
|
|
tx.send(CtrlMsg::OpenStream {
|
|
|
|
pid: self.remote_pid,
|
|
|
|
prio,
|
|
|
|
promises,
|
|
|
|
return_sid: ctrl_tx,
|
|
|
|
msg_tx,
|
|
|
|
})
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
// I dont like the fact that i need to wait on the worker thread for getting my
|
|
|
|
// sid back :/ we could avoid this by introducing a Thread Local Network
|
|
|
|
// which owns some sids we can take without waiting
|
|
|
|
let sid = ctrl_rx.recv().unwrap();
|
|
|
|
info!(?sid, " sucessfully opened stream");
|
|
|
|
return Ok(Stream {
|
|
|
|
sid,
|
|
|
|
msg_rx,
|
|
|
|
ctr_tx: tx,
|
|
|
|
});
|
2020-02-21 15:10:55 +00:00
|
|
|
}
|
2020-03-04 00:37:36 +00:00
|
|
|
Err(ParticipantError::ParticipantDisconected)
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
|
2020-03-04 15:52:30 +00:00
|
|
|
pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> {
|
|
|
|
for controller in self.network_controller.iter() {
|
|
|
|
let tx = controller.get_tx();
|
|
|
|
tx.send(CtrlMsg::CloseStream {
|
|
|
|
pid: self.remote_pid,
|
|
|
|
sid: stream.sid,
|
|
|
|
})
|
|
|
|
.unwrap();
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
Err(ParticipantError::ParticipantDisconected)
|
|
|
|
}
|
2020-02-21 15:10:55 +00:00
|
|
|
|
|
|
|
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
|
|
|
|
loop {
|
|
|
|
//ARRGGG
|
|
|
|
for worker in self.network_controller.iter() {
|
|
|
|
//TODO harden!
|
|
|
|
if let Ok(msg) = worker.get_rx().try_recv() {
|
|
|
|
if let RtrnMsg::OpendStream {
|
|
|
|
pid,
|
|
|
|
sid,
|
|
|
|
prio,
|
|
|
|
msg_rx,
|
|
|
|
promises,
|
|
|
|
} = msg
|
|
|
|
{
|
|
|
|
return Ok(Stream {
|
|
|
|
sid,
|
|
|
|
msg_rx,
|
2020-03-04 00:37:36 +00:00
|
|
|
ctr_tx: worker.get_tx(),
|
2020-02-21 15:10:55 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn _closed(&self) -> Result<Stream, ParticipantError> {
|
2020-03-04 15:52:30 +00:00
|
|
|
panic!("aaa");
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Stream {
|
|
|
|
//TODO: What about SEND instead of Serializeable if it goes via PIPE ?
|
|
|
|
//TODO: timeout per message or per stream ? stream or ?
|
|
|
|
|
2020-02-21 15:10:55 +00:00
|
|
|
pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
|
|
|
|
let messagebuffer = Arc::new(message::serialize(&msg));
|
|
|
|
//transfer message to right worker to right channel to correct stream
|
|
|
|
//TODO: why do we need a look here, i want my own local directory which is
|
|
|
|
// updated by workes via a channel and needs to be intepreted on a send but it
|
|
|
|
// should almost ever be empty except for new channel creations and stream
|
|
|
|
// creations!
|
2020-03-04 00:37:36 +00:00
|
|
|
self.ctr_tx
|
|
|
|
.send(CtrlMsg::Send(OutGoingMessage {
|
2020-03-04 15:52:30 +00:00
|
|
|
buffer: messagebuffer,
|
2020-03-04 00:37:36 +00:00
|
|
|
cursor: 0,
|
|
|
|
mid: None,
|
|
|
|
sid: self.sid,
|
|
|
|
}))
|
|
|
|
.unwrap();
|
2020-02-21 15:10:55 +00:00
|
|
|
Ok(())
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
|
2020-03-04 10:59:19 +00:00
|
|
|
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
|
|
|
|
match self.msg_rx.next().await {
|
|
|
|
Some(msg) => {
|
2020-02-21 15:10:55 +00:00
|
|
|
info!(?msg, "delivering a message");
|
2020-03-04 00:37:36 +00:00
|
|
|
Ok(message::deserialize(msg.buffer))
|
2020-02-21 15:10:55 +00:00
|
|
|
},
|
2020-03-04 10:59:19 +00:00
|
|
|
None => panic!(
|
|
|
|
"Unexpected error, probably stream was destroyed... maybe i dont know yet, no \
|
|
|
|
idea of async stuff"
|
|
|
|
),
|
2020-02-21 15:10:55 +00:00
|
|
|
}
|
2020-02-10 17:25:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum NetworkError {
|
|
|
|
NetworkDestroyed,
|
|
|
|
WorkerDestroyed,
|
|
|
|
IoError(std::io::Error),
|
|
|
|
}
|
|
|
|
|
2020-02-21 15:10:55 +00:00
|
|
|
#[derive(Debug, PartialEq)]
|
2020-02-10 17:25:47 +00:00
|
|
|
pub enum ParticipantError {
|
|
|
|
ParticipantDisconected,
|
|
|
|
}
|
|
|
|
|
2020-02-21 15:10:55 +00:00
|
|
|
#[derive(Debug, PartialEq)]
|
2020-02-10 17:25:47 +00:00
|
|
|
pub enum StreamError {
|
|
|
|
StreamClosed,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<std::io::Error> for NetworkError {
|
|
|
|
fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) }
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> From<mio_extras::channel::SendError<T>> for NetworkError {
|
2020-03-04 15:52:30 +00:00
|
|
|
fn from(_err: mio_extras::channel::SendError<T>) -> Self { NetworkError::WorkerDestroyed }
|
2019-12-20 13:56:01 +00:00
|
|
|
}
|