COMPLETE REWRITE

- use async_std and implement a async serialisaition
- new participant, stream and drop on the participant
- sending and receiving on streams
This commit is contained in:
Marcel Märtens 2020-03-22 14:47:21 +01:00
parent 499a895922
commit 595f1502b3
20 changed files with 2002 additions and 2412 deletions

40
Cargo.lock generated
View File

@ -1269,29 +1269,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "enumset"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93182dcb6530c757e5879b22ebc5cfbd034861585b442819389614e223ac1c47"
dependencies = [
"enumset_derive",
"num-traits 0.2.11",
"serde",
]
[[package]]
name = "enumset_derive"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "751a786cfcc7d5ceb9e0fe06f0e911da6ce3a3044633e029df4c370193c86a62"
dependencies = [
"darling",
"proc-macro2 1.0.17",
"quote 1.0.6",
"syn 1.0.27",
]
[[package]]
name = "env_logger"
version = "0.6.2"
@ -1520,6 +1497,7 @@ dependencies = [
"futures-core",
"futures-task",
"futures-util",
"num_cpus",
]
[[package]]
@ -4938,6 +4916,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "tracing-futures"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.1"
@ -5362,20 +5350,18 @@ dependencies = [
name = "veloren_network"
version = "0.1.0"
dependencies = [
"async-std",
"bincode",
"byteorder 1.3.4",
"enumset",
"futures 0.3.5",
"lazy_static",
"mio",
"mio-extras",
"prometheus",
"rand 0.7.3",
"serde",
"tlid",
"tracing",
"tracing-futures",
"tracing-subscriber",
"uuid 0.8.1",
"uvth",
]

View File

@ -8,8 +8,6 @@ edition = "2018"
[dependencies]
enumset = { version = "0.4", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]}
#threadpool
uvth = "3.1"
@ -18,13 +16,13 @@ bincode = "1.2"
serde = "1.0"
byteorder = "1.3"
#sending
mio = "0.6"
mio-extras = "2.0"
async-std = { version = "1.5", features = ["std", "unstable"] }
#tracing and metrics
tracing = "0.1"
tracing-futures = "0.2"
prometheus = "0.7"
#async
futures = "0.3"
futures = { version = "0.3", features = ["thread-pool"] }
#mpsc channel registry
lazy_static = "1.4"
rand = "0.7"

View File

@ -1,377 +1,251 @@
use crate::{
channel::{Channel, ChannelProtocols},
controller::Controller,
message::{self, InCommingMessage, OutGoingMessage},
metrics::NetworkMetrics,
mpsc::MpscChannel,
tcp::TcpChannel,
types::{CtrlMsg, Pid, Sid, TokenObjects},
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Sid},
};
use enumset::*;
use futures::stream::StreamExt;
use mio::{
self,
net::{TcpListener, TcpStream},
PollOpt, Ready,
use async_std::{sync::RwLock, task};
use futures::{
channel::{mpsc, oneshot},
sink::SinkExt,
stream::StreamExt,
};
use mio_extras;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
sync::{atomic::AtomicBool, mpsc, Arc, Mutex, RwLock},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tlid;
use tracing::*;
use uuid::Uuid;
use tracing_futures::Instrument;
use uvth::ThreadPool;
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum Address {
Tcp(std::net::SocketAddr),
Udp(std::net::SocketAddr),
Mpsc(u64),
}
#[derive(Serialize, Deserialize, EnumSetType, Debug)]
#[enumset(serialize_repr = "u8")]
pub enum Promise {
InOrder,
NoCorrupt,
GuaranteedDelivery,
Encrypted,
}
#[derive(Clone)]
#[derive(Debug)]
pub struct Participant {
local_pid: Pid,
remote_pid: Pid,
network_controller: Arc<Vec<Controller>>,
stream_open_sender: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
stream_opened_receiver: RwLock<mpsc::UnboundedReceiver<Stream>>,
shutdown_receiver: RwLock<oneshot::Receiver<()>>,
closed: AtomicBool,
disconnect_sender: Option<mpsc::UnboundedSender<Pid>>,
}
#[derive(Debug)]
pub struct Stream {
pid: Pid,
sid: Sid,
remote_pid: Pid,
mid: Mid,
prio: Prio,
promises: Promises,
msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
msg_recv_receiver: mpsc::UnboundedReceiver<InCommingMessage>,
shutdown_receiver: oneshot::Receiver<()>,
closed: AtomicBool,
closed_rx: mpsc::Receiver<()>,
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
ctr_tx: mio_extras::channel::Sender<CtrlMsg>,
shutdown_sender: Option<mpsc::UnboundedSender<Sid>>,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct NetworkError {}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct ParticipantError {}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct StreamError {}
pub struct Network {
_token_pool: tlid::Pool<tlid::Wrapping<usize>>,
_worker_pool: tlid::Pool<tlid::Wrapping<u64>>,
controller: Arc<Vec<Controller>>,
_thread_pool: Arc<ThreadPool>,
participant_id: Pid,
sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
participants: RwLock<Vec<Participant>>,
_metrics: Arc<Option<NetworkMetrics>>,
local_pid: Pid,
participants: RwLock<HashMap<Pid, Arc<Participant>>>,
listen_sender: RwLock<mpsc::UnboundedSender<Address>>,
connect_sender: RwLock<mpsc::UnboundedSender<(Address, oneshot::Sender<Participant>)>>,
connected_receiver: RwLock<mpsc::UnboundedReceiver<Participant>>,
shutdown_sender: Option<oneshot::Sender<()>>,
}
impl Network {
pub fn new(participant_id: Uuid, thread_pool: Arc<ThreadPool>) -> Self {
let mut token_pool = tlid::Pool::new_full();
let mut worker_pool = tlid::Pool::new_full();
let sid_backup_per_participant = Arc::new(RwLock::new(HashMap::new()));
for _ in 0..participant_id.as_u128().rem_euclid(64) {
worker_pool.next();
//random offset from 0 for tests where multiple networks are
// created and we do not want to polute the traces with
// network pid everywhere
}
let metrics = Arc::new(None);
let controller = Arc::new(vec![Controller::new(
worker_pool.next(),
participant_id,
thread_pool.clone(),
token_pool.subpool(1000000).unwrap(),
metrics.clone(),
sid_backup_per_participant.clone(),
)]);
let participants = RwLock::new(vec![]);
pub fn new(participant_id: Pid, thread_pool: &ThreadPool) -> Self {
//let participants = RwLock::new(vec![]);
let p = participant_id;
debug!(?p, "starting Network");
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
Scheduler::new(participant_id);
thread_pool.execute(move || {
let _handle = task::block_on(
scheduler
.run()
.instrument(tracing::info_span!("scheduler", ?p)),
);
});
Self {
_token_pool: token_pool,
_worker_pool: worker_pool,
controller,
_thread_pool: thread_pool,
participant_id,
sid_backup_per_participant,
participants,
_metrics: metrics,
local_pid: participant_id,
participants: RwLock::new(HashMap::new()),
listen_sender: RwLock::new(listen_sender),
connect_sender: RwLock::new(connect_sender),
connected_receiver: RwLock::new(connected_receiver),
shutdown_sender: Some(shutdown_sender),
}
}
fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc<Vec<Controller>>) -> &'a Controller { &list[0] }
pub fn listen(&self, address: &Address) -> Result<(), NetworkError> {
let span = span!(Level::TRACE, "listen", ?address);
let worker = Self::get_lowest_worker(&self.controller);
let _enter = span.enter();
match address {
Address::Tcp(a) => {
let tcp_listener = TcpListener::bind(&a)?;
info!("listening");
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::TcpListener(tcp_listener),
Ready::readable(),
PollOpt::edge(),
))?;
},
Address::Udp(_) => unimplemented!(
"UDP is currently not supportet problem is in internal worker - channel view. I \
except to have every Channel it#s own socket, but UDP shares a Socket with \
everyone on it. So there needs to be a instance that detects new connections \
inside worker and then creates a new channel for them, while handling needs to \
be done in UDP layer... however i am to lazy to build it yet."
),
Address::Mpsc(a) => {
let (listen_tx, listen_rx) = mio_extras::channel::channel();
let (connect_tx, conntect_rx) = mio_extras::channel::channel();
let mut registry = (*crate::mpsc::MPSC_REGISTRY).write().unwrap();
registry.insert(*a, Mutex::new((listen_tx, conntect_rx)));
info!("listening");
let mpsc_channel = MpscChannel::new(connect_tx, listen_rx);
let mut channel = Channel::new(
self.participant_id,
ChannelProtocols::Mpsc(mpsc_channel),
self.sid_backup_per_participant.clone(),
None,
);
channel.handshake();
channel.tick_send();
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::Channel(channel),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
))?;
},
};
pub fn listen(&self, address: Address) -> Result<(), NetworkError> {
task::block_on(async { self.listen_sender.write().await.send(address).await }).unwrap();
Ok(())
}
pub async fn connect(&self, address: &Address) -> Result<Participant, NetworkError> {
let worker = Self::get_lowest_worker(&self.controller);
let sid_backup_per_participant = self.sid_backup_per_participant.clone();
let span = span!(Level::INFO, "connect", ?address);
let _enter = span.enter();
match address {
Address::Tcp(a) => {
info!("connecting");
let tcp_stream = TcpStream::connect(&a)?;
let tcp_channel = TcpChannel::new(tcp_stream);
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Pid>();
let channel = Channel::new(
self.participant_id,
ChannelProtocols::Tcp(tcp_channel),
sid_backup_per_participant,
Some(ctrl_tx),
);
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::Channel(channel),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
))?;
let remote_pid = ctrl_rx.recv().unwrap();
info!(?remote_pid, " sucessfully connected to");
let part = Participant {
remote_pid,
network_controller: self.controller.clone(),
};
self.participants.write().unwrap().push(part.clone());
return Ok(part);
pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<Participant>();
self.connect_sender
.write()
.await
.send((address, pid_sender))
.await
.unwrap();
match pid_receiver.await {
Ok(participant) => {
let pid = participant.remote_pid;
debug!(?pid, "received Participant from remote");
let participant = Arc::new(participant);
self.participants
.write()
.await
.insert(participant.remote_pid, participant.clone());
Ok(participant)
},
Address::Udp(_) => unimplemented!("lazy me"),
Address::Mpsc(a) => {
let mut registry = (*crate::mpsc::MPSC_REGISTRY).write().unwrap();
let (listen_tx, conntect_rx) = match registry.remove(a) {
Some(x) => x.into_inner().unwrap(),
None => {
error!("could not connect to mpsc");
return Err(NetworkError::NetworkDestroyed);
},
};
info!("connect to mpsc");
let mpsc_channel = MpscChannel::new(listen_tx, conntect_rx);
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Pid>();
let channel = Channel::new(
self.participant_id,
ChannelProtocols::Mpsc(mpsc_channel),
self.sid_backup_per_participant.clone(),
Some(ctrl_tx),
);
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::Channel(channel),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
))?;
Err(_) => Err(NetworkError {}),
}
}
let remote_pid = ctrl_rx.recv().unwrap();
info!(?remote_pid, " sucessfully connected to");
let part = Participant {
remote_pid,
network_controller: self.controller.clone(),
};
self.participants.write().unwrap().push(part.clone());
return Ok(part);
pub async fn connected(&self) -> Result<Arc<Participant>, NetworkError> {
match self.connected_receiver.write().await.next().await {
Some(participant) => {
let participant = Arc::new(participant);
self.participants
.write()
.await
.insert(participant.remote_pid, participant.clone());
Ok(participant)
},
None => Err(NetworkError {}),
}
}
pub fn disconnect(&self, _participant: Participant) -> Result<(), NetworkError> {
//todo: close all channels to a participant!
unimplemented!("sda");
}
pub async fn disconnect(&self, participant: Arc<Participant>) -> Result<(), NetworkError> {
// Remove, Close and try_unwrap error when unwrap fails!
let participant = self
.participants
.write()
.await
.remove(&participant.remote_pid)
.unwrap();
participant.closed.store(true, Ordering::Relaxed);
pub fn participants(&self) -> std::sync::RwLockReadGuard<Vec<Participant>> {
self.participants.read().unwrap()
}
pub async fn connected(&self) -> Result<Participant, NetworkError> {
// returns if a Participant connected and is ready
loop {
//ARRGGG
for worker in self.controller.iter() {
//TODO harden!
worker.tick();
if let Ok(remote_pid) = worker.get_participant_connect_rx().try_recv() {
let part = Participant {
remote_pid,
network_controller: self.controller.clone(),
};
self.participants.write().unwrap().push(part.clone());
return Ok(part);
};
}
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
pub fn multisend<M: Serialize>(
&self,
streams: Vec<Stream>,
msg: M,
) -> Result<(), NetworkError> {
let messagebuffer = Arc::new(message::serialize(&msg));
//TODO: why do we need a look here, i want my own local directory which is
// updated by workes via a channel and needs to be intepreted on a send but it
// should almost ever be empty except for new channel creations and stream
// creations!
for stream in streams {
stream
.ctr_tx
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
cursor: 0,
mid: None,
sid: stream.sid,
}))
.unwrap();
}
if Arc::try_unwrap(participant).is_err() {
warn!(
"you are disconnecting and still keeping a reference to this participant, this is \
a bad idea. Participant will only be dropped when you drop your last reference"
);
};
Ok(())
}
}
//TODO: HANDLE SHUTDOWN_RECEIVER
impl Participant {
pub fn open(&self, prio: u8, promises: EnumSet<Promise>) -> Result<Stream, ParticipantError> {
let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::<InCommingMessage>();
for controller in self.network_controller.iter() {
//trigger tick:
controller.tick();
let parts = controller.participants();
let (stream_close_tx, stream_close_rx) = mpsc::channel();
let sid = match parts.get(&self.remote_pid) {
Some(p) => {
let sid = p.sid_pool.write().unwrap().next();
//prepare the closing of the new stream already
p.stream_close_txs
.write()
.unwrap()
.insert(sid, stream_close_tx);
sid
},
None => return Err(ParticipantError::ParticipantDisconected), /* TODO: participant was never connected in the first case maybe... */
};
let tx = controller.get_tx();
tx.send(CtrlMsg::OpenStream {
pid: self.remote_pid,
sid,
prio,
promises,
msg_tx,
})
.unwrap();
info!(?sid, " sucessfully opened stream");
return Ok(Stream::new(
sid,
self.remote_pid,
stream_close_rx,
msg_rx,
tx,
));
pub(crate) fn new(
local_pid: Pid,
remote_pid: Pid,
stream_open_sender: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
stream_opened_receiver: mpsc::UnboundedReceiver<Stream>,
shutdown_receiver: oneshot::Receiver<()>,
disconnect_sender: mpsc::UnboundedSender<Pid>,
) -> Self {
Self {
local_pid,
remote_pid,
stream_open_sender: RwLock::new(stream_open_sender),
stream_opened_receiver: RwLock::new(stream_opened_receiver),
shutdown_receiver: RwLock::new(shutdown_receiver),
closed: AtomicBool::new(false),
disconnect_sender: Some(disconnect_sender),
}
}
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
let (sid_sender, sid_receiver) = oneshot::channel();
self.stream_open_sender
.write()
.await
.send((prio, promises, sid_sender))
.await
.unwrap();
match sid_receiver.await {
Ok(stream) => {
let sid = stream.sid;
debug!(?sid, "opened stream");
Ok(stream)
},
Err(_) => Err(ParticipantError {}),
}
Err(ParticipantError::ParticipantDisconected)
}
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
//TODO: make this async native!
loop {
// Going to all workers in a network, but only receive on specific channels!
for worker in self.network_controller.iter() {
worker.tick();
let parts = worker.participants();
if let Some(p) = parts.get(&self.remote_pid) {
if let Ok(stream) = p.stream_open_rx.try_recv() {
//need a try, as i depend on the tick, it's the same thread...
debug!("delivering a stream");
return Ok(stream);
};
}
}
match self.stream_opened_receiver.write().await.next().await {
Some(stream) => Ok(stream),
None => Err(ParticipantError {}),
}
}
}
impl Stream {
//TODO: What about SEND instead of Serializeable if it goes via PIPE ?
//TODO: timeout per message or per stream ? stream or ? like for Position Data,
// if not transmitted within 1 second, throw away...
pub(crate) fn new(
pid: Pid,
sid: Sid,
remote_pid: Pid,
closed_rx: mpsc::Receiver<()>,
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
ctr_tx: mio_extras::channel::Sender<CtrlMsg>,
prio: Prio,
promises: Promises,
msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
msg_recv_receiver: mpsc::UnboundedReceiver<InCommingMessage>,
shutdown_receiver: oneshot::Receiver<()>,
shutdown_sender: mpsc::UnboundedSender<Sid>,
) -> Self {
Self {
pid,
sid,
remote_pid,
mid: 0,
prio,
promises,
msg_send_sender,
msg_recv_receiver,
shutdown_receiver,
closed: AtomicBool::new(false),
closed_rx,
msg_rx,
ctr_tx,
shutdown_sender: Some(shutdown_sender),
}
}
pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
if self.is_closed() {
return Err(StreamError::StreamClosed);
}
pub async fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
let messagebuffer = Arc::new(message::serialize(&msg));
self.ctr_tx
.send(CtrlMsg::Send(OutGoingMessage {
self.msg_send_sender
.send((self.prio, self.pid, self.sid, OutGoingMessage {
buffer: messagebuffer,
cursor: 0,
mid: None,
mid: self.mid,
sid: self.sid,
}))
.unwrap();
self.mid += 1;
Ok(())
}
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
if self.is_closed() {
return Err(StreamError::StreamClosed);
}
match self.msg_rx.next().await {
match self.msg_recv_receiver.next().await {
Some(msg) => {
info!(?msg, "delivering a message");
Ok(message::deserialize(msg.buffer))
@ -382,68 +256,47 @@ impl Stream {
),
}
}
//Todo: ERROR: TODO: implement me and the disconnecting!
}
pub fn close(mut self) -> Result<(), StreamError> { self.intclose() }
fn is_closed(&self) -> bool {
use core::sync::atomic::Ordering;
if self.closed.load(Ordering::Relaxed) {
true
} else {
if let Ok(()) = self.closed_rx.try_recv() {
self.closed.store(true, Ordering::SeqCst); //TODO: Is this the right Ordering?
true
} else {
false
}
}
impl Drop for Network {
fn drop(&mut self) {
let p = self.local_pid;
debug!(?p, "shutting down Network");
self.shutdown_sender.take().unwrap().send(()).unwrap();
}
}
fn intclose(&mut self) -> Result<(), StreamError> {
use core::sync::atomic::Ordering;
if self.is_closed() {
return Err(StreamError::StreamClosed);
impl Drop for Participant {
fn drop(&mut self) {
if !self.closed.load(Ordering::Relaxed) {
let p = self.remote_pid;
debug!(?p, "shutting down Participant");
task::block_on(async {
self.disconnect_sender
.take()
.unwrap()
.send(self.remote_pid)
.await
.unwrap()
});
}
self.ctr_tx
.send(CtrlMsg::CloseStream {
pid: self.remote_pid,
sid: self.sid,
})
.unwrap();
self.closed.store(true, Ordering::SeqCst); //TODO: Is this the right Ordering?
Ok(())
}
}
impl Drop for Stream {
fn drop(&mut self) {
let _ = self.intclose().map_err(
|e| error!(?self.sid, ?e, "could not properly shutdown stream, which got out of scope"),
);
if !self.closed.load(Ordering::Relaxed) {
let s = self.sid;
debug!(?s, "shutting down Stream");
task::block_on(async {
self.shutdown_sender
.take()
.unwrap()
.send(self.sid)
.await
.unwrap()
});
}
}
}
#[derive(Debug)]
pub enum NetworkError {
NetworkDestroyed,
WorkerDestroyed,
IoError(std::io::Error),
}
#[derive(Debug, PartialEq)]
pub enum ParticipantError {
ParticipantDisconected,
}
#[derive(Debug, PartialEq)]
pub enum StreamError {
StreamClosed,
}
impl From<std::io::Error> for NetworkError {
fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) }
}
impl<T> From<mio_extras::channel::SendError<T>> for NetworkError {
fn from(_err: mio_extras::channel::SendError<T>) -> Self { NetworkError::WorkerDestroyed }
}

178
network/src/async_serde.rs Normal file
View File

@ -0,0 +1,178 @@
/*
use ::uvth::ThreadPool;
use bincode;
use serde::{de::DeserializeOwned, Serialize};
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
};
pub struct SerializeFuture {
shared_state: Arc<Mutex<SerializeSharedState>>,
}
struct SerializeSharedState {
result: Option<Vec<u8>>,
waker: Option<Waker>,
}
pub struct DeserializeFuture<M: 'static + Send + DeserializeOwned> {
shared_state: Arc<Mutex<DeserializeSharedState<M>>>,
}
struct DeserializeSharedState<M: 'static + Send + DeserializeOwned> {
result: Option<M>,
waker: Option<Waker>,
}
impl Future for SerializeFuture {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.result.is_some() {
Poll::Ready(shared_state.result.take().unwrap())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl SerializeFuture {
pub fn new<M: 'static + Send + Serialize>(message: M, pool: &ThreadPool) -> Self {
let shared_state = Arc::new(Mutex::new(SerializeSharedState {
result: None,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
pool.execute(move || {
let mut writer = {
let actual_size = bincode::serialized_size(&message).unwrap();
Vec::<u8>::with_capacity(actual_size as usize)
};
if let Err(e) = bincode::serialize_into(&mut writer, &message) {
panic!(
"bincode serialize error, probably undefined behavior somewhere else, check \
the possible error types of `bincode::serialize_into`: {}",
e
);
};
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.result = Some(writer);
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
Self { shared_state }
}
}
impl<M: 'static + Send + DeserializeOwned> Future for DeserializeFuture<M> {
type Output = M;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.result.is_some() {
Poll::Ready(shared_state.result.take().unwrap())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl<M: 'static + Send + DeserializeOwned> DeserializeFuture<M> {
pub fn new(data: Vec<u8>, pool: &ThreadPool) -> Self {
let shared_state = Arc::new(Mutex::new(DeserializeSharedState {
result: None,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
pool.execute(move || {
let decoded: M = bincode::deserialize(data.as_slice()).unwrap();
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.result = Some(decoded);
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
Self { shared_state }
}
}
*/
/*
#[cfg(test)]
mod tests {
use crate::{
async_serde::*,
message::{MessageBuffer, OutGoingMessage},
types::{Frame, Sid},
};
use std::{collections::VecDeque, sync::Arc};
use uvth::ThreadPoolBuilder;
use async_std::{
io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
};
#[macro_use] use futures;
async fn tick_tock(msg: String, pool: &ThreadPool) {
let serialized = SerializeFuture::new(msg.clone(), pool).await;
let deserialized = DeserializeFuture::<String>::new(serialized, pool).await;
assert_eq!(msg, deserialized)
}
#[test]
fn multiple_serialize() {
let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string();
let pool = ThreadPoolBuilder::new().build();
let (r1, r2, r3) = task::block_on(async {
let s1 = SerializeFuture::new(msg.clone(), &pool);
let s2 = SerializeFuture::new(msg.clone(), &pool);
let s3 = SerializeFuture::new(msg.clone(), &pool);
futures::join!(s1, s2, s3)
});
assert_eq!(r1.len(), 108);
assert_eq!(r2.len(), 108);
assert_eq!(r3.len(), 108);
}
#[test]
fn await_serialize() {
let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string();
let pool = ThreadPoolBuilder::new().build();
task::block_on(async {
let r1 = SerializeFuture::new(msg.clone(), &pool).await;
let r2 = SerializeFuture::new(msg.clone(), &pool).await;
let r3 = SerializeFuture::new(msg.clone(), &pool).await;
assert_eq!(r1.len(), 108);
assert_eq!(r2.len(), 108);
assert_eq!(r3.len(), 108);
});
}
#[test]
fn multiple_serialize_deserialize() {
let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string();
let pool = ThreadPoolBuilder::new().build();
task::block_on(async {
let s1 = tick_tock(msg.clone(), &pool);
let s2 = tick_tock(msg.clone(), &pool);
let s3 = tick_tock(msg.clone(), &pool);
futures::join!(s1, s2, s3)
});
}
}
*/

View File

@ -1,560 +1,306 @@
use crate::{
api::Promise,
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
mpsc::MpscChannel,
tcp::TcpChannel,
frames::Frame,
types::{
Frame, IntStream, Pid, RtrnMsg, Sid, DEFAULT_SID_SIZE, VELOREN_MAGIC_NUMBER,
Cid, NetworkBuffer, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
VELOREN_NETWORK_VERSION,
},
udp::UdpChannel,
};
use enumset::EnumSet;
use futures::{executor::block_on, sink::SinkExt};
use rand::{thread_rng, Rng};
use std::{
collections::{HashMap, VecDeque},
sync::{mpsc, Arc, RwLock},
};
use tlid;
use async_std::{net::TcpStream, prelude::*, sync::RwLock};
use futures::{channel::mpsc, future::FutureExt, select, sink::SinkExt, stream::StreamExt};
use tracing::*;
//use futures::prelude::*;
pub(crate) trait ChannelProtocol {
type Handle: ?Sized + mio::Evented;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame>;
/// Execute when ready to write
fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I);
/// used for mio
fn get_handle(&self) -> &Self::Handle;
}
#[derive(Debug)]
pub(crate) enum ChannelProtocols {
Tcp(TcpChannel),
Udp(UdpChannel),
Mpsc(MpscChannel),
}
#[derive(Debug)]
pub(crate) struct Channel {
pub stream_id_pool: Option<tlid::Pool<tlid::Wrapping<Sid>>>, /* TODO: stream_id unique per
* participant */
// participantd
pub randomno: u64,
pub local_pid: Pid,
pub remote_pid: Option<Pid>,
pub sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
pub streams: Vec<IntStream>,
pub send_queue: VecDeque<Frame>,
pub protocol: ChannelProtocols,
pub return_pid_to: Option<std::sync::mpsc::Sender<Pid>>, //use for network::connect()
pub send_handshake: bool,
pub send_pid: bool,
pub send_config: bool,
pub send_shutdown: bool,
pub recv_handshake: bool,
pub recv_pid: bool,
pub recv_config: bool,
pub recv_shutdown: bool,
cid: Cid,
local_pid: Pid,
remote_pid: RwLock<Option<Pid>>,
send_state: RwLock<ChannelState>,
recv_state: RwLock<ChannelState>,
}
/*
Participant A
Participant B
A sends Handshake
B receives Handshake and answers with Handshake
A receives Handshake and answers with ParticipantId
B receives ParticipantId and answeres with ParticipantId
A receives ParticipantId and answers with Configuration for Streams and Messages
---
A and B can now concurrently open Streams and send messages
---
Shutdown phase
*/
#[derive(Debug, PartialEq)]
enum ChannelState {
None,
Handshake,
Pid,
Shutdown,
}
impl Channel {
const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
veloren server.\nWe are not sure if you are a valid \
veloren client.\nClosing the connection"
.as_bytes();
const WRONG_VERSION: &'static str = "Handshake does not contain a correct magic number, but \
const WRONG_VERSION: &'static str = "Handshake does contain a correct magic number, but \
invalid version.\nWe don't know how to communicate with \
you.\n";
you.\nClosing the connection";
pub fn new(
local_pid: Pid,
protocol: ChannelProtocols,
sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
return_pid_to: Option<std::sync::mpsc::Sender<Pid>>,
) -> Self {
let randomno = thread_rng().gen();
warn!(?randomno, "new channel,yay ");
pub fn new(cid: u64, local_pid: Pid) -> Self {
Self {
randomno,
stream_id_pool: None,
cid,
local_pid,
remote_pid: None,
sid_backup_per_participant,
streams: Vec::new(),
send_queue: VecDeque::new(),
protocol,
return_pid_to,
send_handshake: false,
send_pid: false,
send_config: false,
send_shutdown: false,
recv_handshake: false,
recv_pid: false,
recv_config: false,
recv_shutdown: false,
remote_pid: RwLock::new(None),
send_state: RwLock::new(ChannelState::None),
recv_state: RwLock::new(ChannelState::None),
}
}
pub fn can_send(&self) -> bool {
self.remote_pid.is_some()
&& self.recv_handshake
&& self.send_pid
&& self.recv_pid
&& (self.send_config || self.recv_config)
&& !self.send_shutdown
&& !self.recv_shutdown
}
pub fn tick_recv(
&mut self,
worker_participants: &mut HashMap<Pid, tlid::Pool<tlid::Wrapping<Sid>>>,
rtrn_tx: &mpsc::Sender<RtrnMsg>,
/// (prot|part)_(in|out)_(sender|receiver)
/// prot: TO/FROM PROTOCOL = TCP
/// part: TO/FROM PARTICIPANT
/// in: FROM
/// out: TO
/// sender: mpsc::Sender
/// receiver: mpsc::Receiver
pub async fn run(
self,
protocol: TcpStream,
part_in_receiver: mpsc::UnboundedReceiver<Frame>,
part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>,
) {
match &mut self.protocol {
ChannelProtocols::Tcp(c) => {
for frame in c.read() {
self.handle(frame, worker_participants, rtrn_tx);
}
},
ChannelProtocols::Udp(c) => {
for frame in c.read() {
self.handle(frame, worker_participants, rtrn_tx);
}
},
ChannelProtocols::Mpsc(c) => {
for frame in c.read() {
self.handle(frame, worker_participants, rtrn_tx);
}
},
}
let (prot_in_sender, prot_in_receiver) = mpsc::unbounded::<Frame>();
let (prot_out_sender, prot_out_receiver) = mpsc::unbounded::<Frame>();
futures::join!(
self.read(protocol.clone(), prot_in_sender),
self.write(protocol, prot_out_receiver, part_in_receiver),
self.frame_handler(
prot_in_receiver,
prot_out_sender,
part_out_sender,
configured_sender
)
);
//return part_out_receiver;
}
pub fn tick_send(&mut self) {
self.tick_streams();
match &mut self.protocol {
ChannelProtocols::Tcp(c) => {
c.write(&mut self.send_queue.drain(..));
},
ChannelProtocols::Udp(c) => {
c.write(&mut self.send_queue.drain(..));
},
ChannelProtocols::Mpsc(c) => {
c.write(&mut self.send_queue.drain(..));
},
}
}
fn handle(
&mut self,
frame: Frame,
worker_participants: &mut HashMap<Pid, tlid::Pool<tlid::Wrapping<Sid>>>,
rtrn_tx: &mpsc::Sender<RtrnMsg>,
pub async fn frame_handler(
&self,
mut frames: mpsc::UnboundedReceiver<Frame>,
mut frame_sender: mpsc::UnboundedSender<Frame>,
mut external_frame_sender: mpsc::UnboundedSender<(Cid, Frame)>,
mut configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>,
) {
match frame {
Frame::Handshake {
magic_number,
version,
} => {
if magic_number != VELOREN_MAGIC_NUMBER {
error!(
?magic_number,
"connection with invalid magic_number, closing connection"
);
self.wrong_shutdown(Self::WRONG_NUMBER);
}
if version != VELOREN_NETWORK_VERSION {
error!(?version, "tcp connection with wrong network version");
self.wrong_shutdown(
const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \
something went wrong on network layer and connection will be closed";
while let Some(frame) = frames.next().await {
trace!(?frame, "recv frame");
match frame {
Frame::Handshake {
magic_number,
version,
} => {
if self
.verify_handshake(magic_number, version, &mut frame_sender)
.await
.is_ok()
{
debug!("handshake completed");
*self.recv_state.write().await = ChannelState::Handshake;
if *self.send_state.read().await == ChannelState::Handshake {
self.send_pid(&mut frame_sender).await;
} else {
self.send_handshake(&mut frame_sender).await;
}
};
},
Frame::ParticipantId { pid } => {
if self.remote_pid.read().await.is_some() {
error!(?pid, "invalid message, cant change participantId");
return;
}
*self.remote_pid.write().await = Some(pid);
*self.recv_state.write().await = ChannelState::Pid;
debug!(?pid, "Participant send their ID");
let stream_id_offset = if *self.send_state.read().await != ChannelState::Pid {
self.send_pid(&mut frame_sender).await;
STREAM_ID_OFFSET2
} else {
STREAM_ID_OFFSET1
};
info!(?pid, "this channel is now configured!");
configured_sender
.send((self.cid, pid, stream_id_offset))
.await
.unwrap();
},
Frame::Shutdown => {
info!("shutdown signal received");
*self.recv_state.write().await = ChannelState::Shutdown;
},
/* Sending RAW is only used for debug purposes in case someone write a
* new API against veloren Server! */
Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
},
_ => {
trace!("forward frame");
external_frame_sender.send((self.cid, frame)).await.unwrap();
},
}
}
}
pub async fn read(
&self,
mut protocol: TcpStream,
mut frame_handler: mpsc::UnboundedSender<Frame>,
) {
let mut buffer = NetworkBuffer::new();
loop {
match protocol.read(buffer.get_write_slice(2048)).await {
Ok(0) => {
debug!(?buffer, "shutdown of tcp channel detected");
frame_handler.send(Frame::Shutdown).await.unwrap();
break;
},
Ok(n) => {
buffer.actually_written(n);
trace!("incomming message with len: {}", n);
let slice = buffer.get_read_slice();
let mut cur = std::io::Cursor::new(slice);
let mut read_ok = 0;
while cur.position() < n as u64 {
let round_start = cur.position() as usize;
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => {
frame_handler.send(frame).await.unwrap();
read_ok = cur.position() as usize;
},
Err(e) => {
// Probably we have to wait for moare data!
let first_bytes_of_msg =
&slice[round_start..std::cmp::min(n, round_start + 16)];
debug!(
?buffer,
?e,
?n,
?round_start,
?first_bytes_of_msg,
"message cant be parsed, probably because we need to wait for \
more data"
);
break;
},
}
}
buffer.actually_read(read_ok);
},
Err(e) => panic!("{}", e),
}
}
}
pub async fn write(
&self,
mut protocol: TcpStream,
mut internal_frame_receiver: mpsc::UnboundedReceiver<Frame>,
mut external_frame_receiver: mpsc::UnboundedReceiver<Frame>,
) {
while let Some(frame) = select! {
next = internal_frame_receiver.next().fuse() => next,
next = external_frame_receiver.next().fuse() => next,
} {
//dezerialize here as this is executed in a seperate thread PER channel.
// Limites Throughput per single Receiver but stays in same thread (maybe as its
// in a threadpool)
trace!(?frame, "going to send frame via tcp");
let data = bincode::serialize(&frame).unwrap();
protocol.write_all(data.as_slice()).await.unwrap();
}
}
async fn verify_handshake(
&self,
magic_number: String,
version: [u32; 3],
frame_sender: &mut mpsc::UnboundedSender<Frame>,
) -> Result<(), ()> {
if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "connection with invalid magic_number");
#[cfg(debug_assertions)]
{
debug!("sending client instructions before killing");
frame_sender
.send(Frame::Raw(Self::WRONG_NUMBER.to_vec()))
.await
.unwrap();
frame_sender.send(Frame::Shutdown).await.unwrap();
*self.send_state.write().await = ChannelState::Shutdown;
}
return Err(());
}
if version != VELOREN_NETWORK_VERSION {
error!(?version, "connection with wrong network version");
#[cfg(debug_assertions)]
{
debug!("sending client instructions before killing");
frame_sender
.send(Frame::Raw(
format!(
"{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection",
Self::WRONG_VERSION,
VELOREN_NETWORK_VERSION,
version,
)
.as_bytes(),
);
}
debug!("handshake completed");
self.recv_handshake = true;
if self.send_handshake {
self.send_queue.push_back(Frame::ParticipantId {
pid: self.local_pid,
});
self.send_pid = true;
} else {
self.send_queue.push_back(Frame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
version: VELOREN_NETWORK_VERSION,
});
self.send_handshake = true;
}
},
Frame::ParticipantId { pid } => {
if self.remote_pid.is_some() {
error!(?pid, "invalid message, cant change participantId");
return;
}
self.remote_pid = Some(pid);
debug!(?pid, "Participant send their ID");
self.recv_pid = true;
if self.send_pid {
//If participant is unknown to worker, assign some range from global pool
if !worker_participants.contains_key(&pid) {
let mut global_participants =
self.sid_backup_per_participant.write().unwrap();
//if this is the first time a participant connects to this Controller
if !global_participants.contains_key(&pid) {
// I dont no participant, so i can safely assume that they don't know
// me. so HERE we gonna fill local network pool
global_participants.insert(pid, tlid::Pool::new_full());
}
//grab a range for controller
let global_part_pool = global_participants.get_mut(&pid).unwrap();
let mut local_controller_sids =
tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap();
let remote_controller_sids =
tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap();
let mut local_worker_sids =
tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap();
let remote_worker_sids =
tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap();
let local_controller_range =
tlid::RemoveAllocation::new(&mut local_controller_sids);
let local_worker_range =
tlid::RemoveAllocation::new(&mut local_worker_sids);
worker_participants.insert(pid.clone(), local_worker_sids);
self.send_queue.push_back(Frame::Configure {
sender_controller_sids: local_controller_range,
sender_worker_sids: local_worker_range,
receiver_controller_sids: remote_controller_sids,
receiver_worker_sids: remote_worker_sids,
});
self.send_config = true;
info!(?pid, "this channel is now configured!");
if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant {
controller_sids: local_controller_sids,
pid,
}) {
error!(?err, "couldn't notify, is network already closed ?");
}
} else {
warn!(
"a known participant opened an additional channel, UNCHECKED BECAUSE \
NO TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!"
);
}
} else {
self.send_queue.push_back(Frame::ParticipantId {
pid: self.local_pid,
});
self.send_pid = true;
}
},
Frame::Configure {
sender_controller_sids,
sender_worker_sids,
mut receiver_controller_sids,
mut receiver_worker_sids,
} => {
let pid = match self.remote_pid {
Some(pid) => pid,
None => {
error!("Cant configure a Channel without a PID first!");
return;
},
};
self.recv_config = true;
//Check if worker already knows about this participant
if !worker_participants.contains_key(&pid) {
let mut global_participants = self.sid_backup_per_participant.write().unwrap();
if !global_participants.contains_key(&pid) {
// I dont no participant, so i can safely assume that they don't know me. so
// HERE we gonna fill local network pool
global_participants.insert(pid, tlid::Pool::new_full());
}
//grab a range for controller
let global_part_pool = global_participants.get_mut(&pid).unwrap();
sender_controller_sids
.remove_from(global_part_pool)
.unwrap();
sender_worker_sids.remove_from(global_part_pool).unwrap();
tlid::RemoveAllocation::new(&mut receiver_controller_sids)
.remove_from(global_part_pool)
.unwrap();
tlid::RemoveAllocation::new(&mut receiver_worker_sids)
.remove_from(global_part_pool)
.unwrap();
worker_participants.insert(pid.clone(), receiver_worker_sids);
if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant {
pid,
controller_sids: receiver_controller_sids,
}) {
error!(?err, "couldn't notify, is network already closed ?");
}
if let Some(send) = &self.return_pid_to {
if let Err(err) = send.send(pid) {
error!(
?err,
"couldn't notify of connected participant, is network already \
closed ?"
);
}
};
self.return_pid_to = None;
} else {
warn!(
"a known participant opened an additional channel, UNCHECKED BECAUSE NO \
TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!"
);
}
info!("recv config. This channel is now configured!");
},
Frame::Shutdown => {
self.recv_shutdown = true;
info!("shutting down channel");
if let Err(err) = rtrn_tx.send(RtrnMsg::Shutdown) {
error!(?err, "couldn't notify of shutdown");
}
},
Frame::OpenStream {
sid,
prio,
promises,
} => {
if let Some(pid) = self.remote_pid {
let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::<InCommingMessage>();
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
trace!(?self.streams, "-OPEN STREAM- going to modify streams");
self.streams.push(stream);
trace!(?self.streams, "-OPEN STREAM- did to modify streams");
info!("opened a stream");
if let Err(err) = rtrn_tx.send(RtrnMsg::OpendStream {
pid,
sid,
prio,
msg_rx,
promises,
}) {
error!(?err, "couldn't notify of opened stream");
}
} else {
error!("called OpenStream before PartcipantID!");
}
},
Frame::CloseStream { sid } => {
if let Some(pid) = self.remote_pid {
trace!(?self.streams, "-CLOSE STREAM- going to modify streams");
self.streams.retain(|stream| stream.sid() != sid);
trace!(?self.streams, "-CLOSE STREAM- did to modify streams");
info!("closed a stream");
if let Err(err) = rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }) {
error!(?err, "couldn't notify of closed stream");
}
}
},
Frame::DataHeader { mid, sid, length } => {
debug!("Data Header {}", sid);
let imsg = InCommingMessage {
buffer: MessageBuffer { data: Vec::new() },
length,
mid,
sid,
};
let mut found = false;
for s in &mut self.streams {
if s.sid() == sid {
//TODO: move to Hashmap, so slow
s.to_receive.push_back(imsg);
found = true;
break;
}
}
if !found {
error!("couldn't find stream with sid: {}", sid);
}
},
Frame::Data {
id,
start: _, //TODO: use start to verify!
mut data,
} => {
debug!("Data Package {}, len: {}", id, data.len());
let mut found = false;
for s in &mut self.streams {
let mut pos = None;
for i in 0..s.to_receive.len() {
let m = &mut s.to_receive[i];
if m.mid == id {
found = true;
m.buffer.data.append(&mut data);
if m.buffer.data.len() as u64 == m.length {
pos = Some(i);
break;
};
};
}
if let Some(pos) = pos {
let sid = s.sid();
let mut tx = s.msg_tx();
for m in s.to_receive.drain(pos..pos + 1) {
info!(?sid, ? m.mid, "received message");
//TODO: I dislike that block_on here!
block_on(async {
if let Err(err) = tx.send(m).await {
error!(
?err,
"cannot notify that message was received, probably stream \
is already closed"
);
};
});
}
}
}
if !found {
error!("couldn't find stream with mid: {}", id);
}
},
Frame::Raw(data) => {
info!("Got a Raw Package {:?}", data);
},
.as_bytes()
.to_vec(),
))
.await
.unwrap();
frame_sender.send(Frame::Shutdown {}).await.unwrap();
*self.send_state.write().await = ChannelState::Shutdown;
}
return Err(());
}
Ok(())
}
// This function will tick all streams according to priority and add them to the
// send queue
fn tick_streams(&mut self) {
//ignoring prio for now
//TODO: fix prio
for s in &mut self.streams {
let mut remove = false;
let sid = s.sid();
if let Some(m) = s.to_send.front_mut() {
let to_send = std::cmp::min(m.buffer.data.len() as u64 - m.cursor, 1400);
if to_send > 0 {
if m.cursor == 0 {
let mid = s.mid_pool.next();
m.mid = Some(mid);
self.send_queue.push_back(Frame::DataHeader {
mid,
sid,
length: m.buffer.data.len() as u64,
});
}
self.send_queue.push_back(Frame::Data {
id: m.mid.unwrap(),
start: m.cursor,
data: m.buffer.data[m.cursor as usize..(m.cursor + to_send) as usize]
.to_vec(),
});
};
m.cursor += to_send;
if m.cursor == m.buffer.data.len() as u64 {
remove = true;
debug!(?m.mid, "finish message")
}
}
if remove {
s.to_send.pop_front();
pub(crate) async fn send_handshake(&self, part_in_sender: &mut mpsc::UnboundedSender<Frame>) {
part_in_sender
.send(Frame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
version: VELOREN_NETWORK_VERSION,
})
.await
.unwrap();
*self.send_state.write().await = ChannelState::Handshake;
}
pub(crate) async fn send_pid(&self, part_in_sender: &mut mpsc::UnboundedSender<Frame>) {
part_in_sender
.send(Frame::ParticipantId {
pid: self.local_pid,
})
.await
.unwrap();
*self.send_state.write().await = ChannelState::Pid;
}
/*
pub async fn run(&mut self) {
//let (incomming_sender, incomming_receiver) = mpsc::unbounded();
futures::join!(self.listen_manager(), self.send_outgoing());
}
pub async fn listen_manager(&self) {
let (mut listen_sender, mut listen_receiver) = mpsc::unbounded::<Address>();
while self.closed.load(Ordering::Relaxed) {
while let Some(address) = listen_receiver.next().await {
let (end_sender, end_receiver) = oneshot::channel::<()>();
task::spawn(channel_creator(address, end_receiver));
}
}
}
fn wrong_shutdown(&mut self, raw: &[u8]) {
#[cfg(debug_assertions)]
{
debug!("sending client instructions before killing");
self.send_queue.push_back(Frame::Raw(raw.to_vec()));
self.send_queue.push_back(Frame::Shutdown {});
self.send_shutdown = true;
pub async fn send_outgoing(&self) {
//let prios = prios::PrioManager;
while self.closed.load(Ordering::Relaxed) {
}
}
pub(crate) fn open_stream(
&mut self,
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
) {
// validate promises
trace!(?sid, "going to open a new stream");
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
trace!(?sid, "1");
self.streams.push(stream);
trace!(?sid, "2");
trace!(?self.streams, ?self.randomno, "2b");
if self.streams.len() >= 0 {
// breakpoint here
let a = self.streams.len();
if a > 1000 {
//this will never happen but is a blackbox to catch a
panic!("dasd");
}
}
self.send_queue.push_back(Frame::OpenStream {
sid,
prio,
promises,
});
}
pub(crate) fn close_stream(&mut self, sid: Sid) {
trace!(?self.streams, "--CLOSE STREAM-- going to modify streams");
self.streams.retain(|stream| stream.sid() != sid);
trace!(?self.streams, "--CLOSE STREAM-- did to modify streams");
self.send_queue.push_back(Frame::CloseStream { sid });
}
pub(crate) fn handshake(&mut self) {
self.send_queue.push_back(Frame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
version: VELOREN_NETWORK_VERSION,
});
self.send_handshake = true;
}
pub(crate) fn shutdown(&mut self) {
self.send_queue.push_back(Frame::Shutdown {});
self.send_shutdown = true;
}
pub(crate) fn send(&mut self, outgoing: OutGoingMessage) {
trace!(?outgoing.sid, "3");
trace!(?self.streams, ?self.randomno, "3b");
for s in self.streams.iter_mut() {
if s.sid() == outgoing.sid {
s.to_send.push_back(outgoing);
return;
}
}
trace!(?outgoing.sid, "4");
let sid = &outgoing.sid;
error!(?sid, "couldn't send message, didn't found sid")
}
pub(crate) fn get_protocol(&self) -> &ChannelProtocols { &self.protocol }
}*/
}

View File

@ -1,178 +0,0 @@
/*
Most of the internals take place in it's own worker-thread.
This folder contains all this outsourced calculation.
This controller contains the interface to communicate with the thread,
communication is done via channels.
*/
use crate::{
api::Stream,
metrics::NetworkMetrics,
types::{CtrlMsg, Pid, RtrnMsg, Sid},
worker::Worker,
};
use mio::{self, Poll, PollOpt, Ready, Token};
use mio_extras::channel;
use std::{
collections::HashMap,
sync::{mpsc, Arc, RwLock, RwLockReadGuard},
};
use tlid;
use tracing::*;
use uvth::ThreadPool;
pub struct ControllerParticipant {
pub sid_pool: RwLock<tlid::Pool<tlid::Wrapping<Sid>>>,
//TODO: move this in a future aware variant! via futures Channels
stream_open_tx: mpsc::Sender<Stream>,
pub stream_open_rx: mpsc::Receiver<Stream>,
pub stream_close_txs: RwLock<HashMap<Sid, mpsc::Sender<()>>>,
}
/*
The MioWorker runs in it's own thread,
it has a given set of Channels to work with.
It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers
*/
pub struct Controller {
ctrl_tx: channel::Sender<CtrlMsg>,
rtrn_rx: mpsc::Receiver<RtrnMsg>,
participant_connect_tx: mpsc::Sender<Pid>,
participant_connect_rx: mpsc::Receiver<Pid>,
participants: RwLock<HashMap<Pid, ControllerParticipant>>,
}
impl Controller {
pub const CTRL_TOK: Token = Token(0);
pub fn new(
wid: u64,
pid: uuid::Uuid,
thread_pool: Arc<ThreadPool>,
mut token_pool: tlid::Pool<tlid::Wrapping<usize>>,
metrics: Arc<Option<NetworkMetrics>>,
sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
) -> Self {
let poll = Arc::new(Poll::new().unwrap());
let (ctrl_tx, ctrl_rx) = channel::channel();
let (rtrn_tx, rtrn_rx) = mpsc::channel();
let (participant_connect_tx, participant_connect_rx) = mpsc::channel();
poll.register(&ctrl_rx, Self::CTRL_TOK, Ready::readable(), PollOpt::edge())
.unwrap();
// reserve 10 tokens in case they start with 0, //TODO: cleaner method
for _ in 0..10 {
token_pool.next();
}
thread_pool.execute(move || {
let w = wid;
let span = span!(Level::INFO, "worker", ?w);
let _enter = span.enter();
let mut worker = Worker::new(
pid,
poll,
metrics,
sid_backup_per_participant,
token_pool,
ctrl_rx,
rtrn_tx,
);
worker.run();
});
let participants = RwLock::new(HashMap::new());
Controller {
ctrl_tx,
rtrn_rx,
participant_connect_rx,
participant_connect_tx,
participants,
}
}
//TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers
pub(crate) fn get_tx(&self) -> channel::Sender<CtrlMsg> { self.ctrl_tx.clone() }
pub(crate) fn get_participant_connect_rx(&self) -> &mpsc::Receiver<Pid> {
&self.participant_connect_rx
}
pub(crate) fn tick(&self) {
for msg in self.rtrn_rx.try_iter() {
match msg {
/*TODO: WAIT, THIS ASSUMES CONNECTED PARTICIPANT IS ONLY EVER TRIGGERED ONCE PER CONTROLLER
that means, that it can happen multiple time for the same participant on multiple controller,
and even multiple channel on one worker shouldn't trigger it*/
RtrnMsg::ConnectedParticipant {
pid,
controller_sids,
} => {
let mut parts = self.participants.write().unwrap();
debug!(
?pid,
"A new participant connected to this channel, we assign it the sid pool"
);
let (stream_open_tx, stream_open_rx) = mpsc::channel();
let part = ControllerParticipant {
sid_pool: RwLock::new(controller_sids),
stream_open_tx,
stream_open_rx,
stream_close_txs: RwLock::new(HashMap::new()),
};
parts.insert(pid.clone(), part);
self.participant_connect_tx.send(pid).unwrap();
},
RtrnMsg::OpendStream {
pid,
sid,
prio: _,
msg_rx,
promises: _,
} => {
trace!(
?pid,
?sid,
"A new stream was opened on this channel, we assign it the participant"
);
let parts = self.participants.read().unwrap();
if let Some(p) = parts.get(&pid) {
let (stream_close_tx, stream_close_rx) = mpsc::channel();
p.stream_close_txs
.write()
.unwrap()
.insert(sid, stream_close_tx);
p.stream_open_tx
.send(Stream::new(
sid,
pid,
stream_close_rx,
msg_rx,
self.ctrl_tx.clone(),
))
.unwrap();
}
},
RtrnMsg::ClosedStream { pid, sid } => {
trace!(?pid, ?sid, "Stream got closeed, will route message");
let parts = self.participants.read().unwrap();
if let Some(p) = parts.get(&pid) {
if let Some(tx) = p.stream_close_txs.read().unwrap().get(&sid) {
tx.send(()).unwrap();
trace!(?pid, ?sid, "routed message");
}
}
},
_ => {},
}
}
}
pub(crate) fn participants(&self) -> RwLockReadGuard<HashMap<Pid, ControllerParticipant>> {
self.participants.read().unwrap()
}
}
impl Drop for Controller {
fn drop(&mut self) { let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); }
}

37
network/src/frames.rs Normal file
View File

@ -0,0 +1,37 @@
use crate::types::{Mid, Pid, Prio, Promises, Sid};
use serde::{Deserialize, Serialize};
// Used for Communication between Channel <----(TCP/UDP)----> Channel
#[derive(Serialize, Deserialize, Debug)]
pub enum Frame {
Handshake {
magic_number: String,
version: [u32; 3],
},
ParticipantId {
pid: Pid,
},
Shutdown, /* Shutsdown this channel gracefully, if all channels are shut down, Participant
* is deleted */
OpenStream {
sid: Sid,
prio: Prio,
promises: Promises,
},
CloseStream {
sid: Sid,
},
DataHeader {
mid: Mid,
sid: Sid,
length: u64,
},
Data {
id: Mid,
start: u64,
data: Vec<u8>,
},
/* WARNING: Sending RAW is only used for debug purposes in case someone write a new API
* against veloren Server! */
Raw(Vec<u8>),
}

View File

@ -1,16 +1,27 @@
#![feature(trait_alias)]
mod api;
mod async_serde;
mod channel;
mod controller;
mod frames;
mod message;
mod metrics;
mod mpsc;
mod participant;
mod prios;
mod scheduler;
mod tcp;
mod types;
mod udp;
mod worker;
pub use api::{Address, Network};
pub use scheduler::Scheduler;
pub use types::{
Pid, Promises, PROMISES_COMPRESSED, PROMISES_CONSISTENCY, PROMISES_ENCRYPTED,
PROMISES_GUARANTEED_DELIVERY, PROMISES_NONE, PROMISES_ORDERED,
};
/*
pub use api::{
Address, Network, NetworkError, Participant, ParticipantError, Promise, Stream, StreamError,
};
*/

View File

@ -16,7 +16,7 @@ pub(crate) struct MessageBuffer {
pub(crate) struct OutGoingMessage {
pub buffer: Arc<MessageBuffer>,
pub cursor: u64,
pub mid: Option<Mid>,
pub mid: Mid,
pub sid: Sid,
}

View File

@ -1,144 +1 @@
use prometheus::{IntGauge, IntGaugeVec, Opts, Registry};
use std::{
error::Error,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
// 1 NetworkMetrics per Network
pub struct NetworkMetrics {
pub participants_connected: IntGauge,
pub channels_connected: IntGauge,
pub streams_open: IntGauge,
pub worker_count: IntGauge,
pub network_info: IntGauge,
// Frames, seperated by CHANNEL (add PART and PROTOCOL) AND FRAME TYPE,
pub frames_count: IntGaugeVec,
// send Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL),
pub message_count: IntGaugeVec,
// send Messages bytes, seperated by STREAM (add PART and PROTOCOL, CHANNEL),
pub bytes_send: IntGaugeVec,
// queued Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL),
pub queue_count: IntGaugeVec,
// worker seperated by CHANNEL (add PART and PROTOCOL),
pub worker_work_time: IntGaugeVec,
// worker seperated by CHANNEL (add PART and PROTOCOL),
pub worker_idle_time: IntGaugeVec,
// ping calculated based on last msg
pub participants_ping: IntGaugeVec,
tick: Arc<AtomicU64>,
}
impl NetworkMetrics {
pub fn new(registry: &Registry, tick: Arc<AtomicU64>) -> Result<Self, Box<dyn Error>> {
let participants_connected = IntGauge::with_opts(Opts::new(
"participants_connected",
"shows the number of participants connected to the network",
))?;
let channels_connected = IntGauge::with_opts(Opts::new(
"channels_connected",
"number of all channels currently connected on the network",
))?;
let streams_open = IntGauge::with_opts(Opts::new(
"streams_open",
"number of all streams currently open on the network",
))?;
let worker_count = IntGauge::with_opts(Opts::new(
"worker_count",
"number of workers currently running",
))?;
let opts = Opts::new("network_info", "Static Network information").const_label(
"version",
&format!(
"{}.{}.{}",
&crate::types::VELOREN_NETWORK_VERSION[0],
&crate::types::VELOREN_NETWORK_VERSION[1],
&crate::types::VELOREN_NETWORK_VERSION[2]
),
);
let network_info = IntGauge::with_opts(opts)?;
let frames_count = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"frames_count",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let message_count = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"message_count",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let bytes_send = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"bytes_send",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let queue_count = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"queue_count",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let worker_work_time = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"worker_work_time",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let worker_idle_time = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"worker_idle_time",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
let participants_ping = IntGaugeVec::from(IntGaugeVec::new(
Opts::new(
"participants_ping",
"time in ns requiered for a tick of the server",
),
&["channel"],
)?);
registry.register(Box::new(participants_connected.clone()))?;
registry.register(Box::new(channels_connected.clone()))?;
registry.register(Box::new(streams_open.clone()))?;
registry.register(Box::new(worker_count.clone()))?;
registry.register(Box::new(network_info.clone()))?;
registry.register(Box::new(frames_count.clone()))?;
registry.register(Box::new(message_count.clone()))?;
registry.register(Box::new(bytes_send.clone()))?;
registry.register(Box::new(queue_count.clone()))?;
registry.register(Box::new(worker_work_time.clone()))?;
registry.register(Box::new(worker_idle_time.clone()))?;
registry.register(Box::new(participants_ping.clone()))?;
Ok(Self {
participants_connected,
channels_connected,
streams_open,
worker_count,
network_info,
frames_count,
message_count,
bytes_send,
queue_count,
worker_work_time,
worker_idle_time,
participants_ping,
tick,
})
}
pub fn _is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
}

View File

@ -1,84 +1 @@
use crate::{channel::ChannelProtocol, types::Frame};
use lazy_static::lazy_static; // 1.4.0
use mio_extras::channel::{Receiver, Sender};
use std::{
collections::HashMap,
sync::{Mutex, RwLock},
};
use tracing::*;
lazy_static! {
pub(crate) static ref MPSC_REGISTRY: RwLock<HashMap<u64, Mutex<(Sender<Frame>, Receiver<Frame>)>>> =
RwLock::new(HashMap::new());
}
pub(crate) struct MpscChannel {
endpoint_sender: Sender<Frame>,
endpoint_receiver: Receiver<Frame>,
}
impl MpscChannel {
pub fn new(endpoint_sender: Sender<Frame>, endpoint_receiver: Receiver<Frame>) -> Self {
Self {
endpoint_sender,
endpoint_receiver,
}
}
}
impl ChannelProtocol for MpscChannel {
type Handle = Receiver<Frame>;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
loop {
match self.endpoint_receiver.try_recv() {
Ok(frame) => {
trace!("incomming message");
result.push(frame);
},
Err(std::sync::mpsc::TryRecvError::Empty) => {
debug!("read would block");
break;
},
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
trace!(?self, "shutdown of mpsc channel detected");
result.push(Frame::Shutdown);
break;
},
};
}
result
}
fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
for frame in frames {
match self.endpoint_sender.send(frame) {
Ok(()) => {
trace!("sended");
},
Err(mio_extras::channel::SendError::Io(e))
if e.kind() == std::io::ErrorKind::WouldBlock =>
{
debug!("write would block");
return;
}
Err(mio_extras::channel::SendError::Disconnected(frame)) => {
trace!(?frame, ?self, "shutdown of mpsc channel detected");
return;
},
Err(e) => {
panic!("{}", e);
},
};
}
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver }
}
impl std::fmt::Debug for MpscChannel {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", "MPSC") }
}

294
network/src/participant.rs Normal file
View File

@ -0,0 +1,294 @@
use crate::{
api::Stream,
frames::Frame,
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
types::{Cid, Pid, Prio, Promises, Sid},
};
use async_std::sync::RwLock;
use futures::{
channel::{mpsc, oneshot},
sink::SinkExt,
stream::StreamExt,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tracing::*;
#[derive(Debug)]
struct ControlChannels {
stream_open_receiver: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
stream_opened_sender: mpsc::UnboundedSender<Stream>,
transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender<Frame>)>,
frame_recv_receiver: mpsc::UnboundedReceiver<Frame>,
shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api
frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler
}
#[derive(Debug)]
pub struct BParticipant {
remote_pid: Pid,
offset_sid: Sid,
channels: RwLock<Vec<(Cid, mpsc::UnboundedSender<Frame>)>>,
streams: RwLock<
HashMap<
Sid,
(
Prio,
Promises,
mpsc::UnboundedSender<InCommingMessage>,
oneshot::Sender<()>,
),
>,
>,
run_channels: Option<ControlChannels>,
}
impl BParticipant {
pub(crate) fn new(
remote_pid: Pid,
offset_sid: Sid,
send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
) -> (
Self,
mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender<Frame>)>,
mpsc::UnboundedSender<Frame>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
) {
let (stream_open_sender, stream_open_receiver) =
mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
let (stream_opened_sender, stream_opened_receiver) = mpsc::unbounded::<Stream>();
let (transfer_channel_sender, transfer_channel_receiver) =
mpsc::unbounded::<(Cid, mpsc::UnboundedSender<Frame>)>();
let (frame_recv_sender, frame_recv_receiver) = mpsc::unbounded::<Frame>();
//let (shutdown1_sender, shutdown1_receiver) = oneshot::channel();
let (shutdown_api_sender, shutdown_api_receiver) = mpsc::unbounded();
let (frame_send_sender, frame_send_receiver) = mpsc::unbounded::<(Pid, Sid, Frame)>();
let run_channels = Some(ControlChannels {
stream_open_receiver,
stream_opened_sender,
transfer_channel_receiver,
frame_recv_receiver,
//shutdown_sender: shutdown1_sender,
shutdown_api_receiver,
shutdown_api_sender,
send_outgoing: Arc::new(Mutex::new(send_outgoing)),
frame_send_receiver,
});
(
Self {
remote_pid,
offset_sid,
channels: RwLock::new(vec![]),
streams: RwLock::new(HashMap::new()),
run_channels,
},
stream_open_sender,
stream_opened_receiver,
transfer_channel_sender,
frame_recv_sender,
frame_send_sender,
//shutdown1_receiver,
)
}
pub async fn run(mut self) {
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.transfer_channel_manager(run_channels.transfer_channel_receiver),
self.open_manager(
run_channels.stream_open_receiver,
run_channels.shutdown_api_sender.clone(),
run_channels.send_outgoing.clone(),
),
self.handle_frames(
run_channels.frame_recv_receiver,
run_channels.stream_opened_sender,
run_channels.shutdown_api_sender,
run_channels.send_outgoing.clone(),
),
self.send_manager(run_channels.frame_send_receiver),
self.shutdown_manager(run_channels.shutdown_api_receiver,),
);
}
async fn send_frame(&self, frame: Frame) {
// find out ideal channel
//TODO: just take first
if let Some((_cid, channel)) = self.channels.write().await.get_mut(0) {
channel.send(frame).await.unwrap();
} else {
error!("participant has no channel to communicate on");
}
}
async fn handle_frames(
&self,
mut frame_recv_receiver: mpsc::UnboundedReceiver<Frame>,
mut stream_opened_sender: mpsc::UnboundedSender<Stream>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
) {
trace!("start handle_frames");
let send_outgoing = { send_outgoing.lock().unwrap().clone() };
let mut messages = HashMap::new();
while let Some(frame) = frame_recv_receiver.next().await {
debug!("handling frame");
match frame {
Frame::OpenStream {
sid,
prio,
promises,
} => {
let send_outgoing = send_outgoing.clone();
let stream = self
.create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
.await;
stream_opened_sender.send(stream).await.unwrap();
trace!("opened frame from remote");
},
Frame::CloseStream { sid } => {
if let Some((_, _, _, sender)) = self.streams.write().await.remove(&sid) {
sender.send(()).unwrap();
} else {
error!("unreachable, coudln't send close stream event!");
}
trace!("closed frame from remote");
},
Frame::DataHeader { mid, sid, length } => {
let imsg = InCommingMessage {
buffer: MessageBuffer { data: Vec::new() },
length,
mid,
sid,
};
messages.insert(mid, imsg);
},
Frame::Data {
id,
start: _,
mut data,
} => {
let finished = if let Some(imsg) = messages.get_mut(&id) {
imsg.buffer.data.append(&mut data);
imsg.buffer.data.len() as u64 == imsg.length
} else {
false
};
if finished {
debug!(?id, "finished receiving message");
let imsg = messages.remove(&id).unwrap();
if let Some((_, _, sender, _)) =
self.streams.write().await.get_mut(&imsg.sid)
{
sender.send(imsg).await.unwrap();
}
}
},
_ => unreachable!("never reaches frame!"),
}
}
trace!("stop handle_frames");
}
async fn send_manager(
&self,
mut frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>,
) {
trace!("start send_manager");
while let Some((_, _, frame)) = frame_send_receiver.next().await {
self.send_frame(frame).await;
}
trace!("stop send_manager");
}
async fn transfer_channel_manager(
&self,
mut transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender<Frame>)>,
) {
trace!("start transfer_channel_manager");
while let Some((cid, sender)) = transfer_channel_receiver.next().await {
debug!(?cid, "got a new channel to listen on");
self.channels.write().await.push((cid, sender));
}
trace!("stop transfer_channel_manager");
}
async fn open_manager(
&self,
mut stream_open_receiver: mpsc::UnboundedReceiver<(
Prio,
Promises,
oneshot::Sender<Stream>,
)>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
) {
trace!("start open_manager");
let send_outgoing = {
//fighting the borrow checker ;)
send_outgoing.lock().unwrap().clone()
};
let mut stream_ids = self.offset_sid;
while let Some((prio, promises, sender)) = stream_open_receiver.next().await {
debug!(?prio, ?promises, "got request to open a new steam");
let send_outgoing = send_outgoing.clone();
let sid = stream_ids;
let stream = self
.create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
.await;
self.send_frame(Frame::OpenStream {
sid,
prio,
promises,
})
.await;
sender.send(stream).unwrap();
stream_ids += 1;
}
trace!("stop open_manager");
}
async fn shutdown_manager(&self, mut shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>) {
trace!("start shutdown_manager");
while let Some(sid) = shutdown_api_receiver.next().await {
trace!(?sid, "got request to close steam");
self.streams.write().await.remove(&sid);
self.send_frame(Frame::CloseStream { sid }).await;
}
trace!("stop shutdown_manager");
}
async fn create_stream(
&self,
sid: Sid,
prio: Prio,
promises: Promises,
send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
shutdown_api_sender: &mpsc::UnboundedSender<Sid>,
) -> Stream {
let (msg_recv_sender, msg_recv_receiver) = mpsc::unbounded::<InCommingMessage>();
let (shutdown1_sender, shutdown1_receiver) = oneshot::channel();
self.streams
.write()
.await
.insert(sid, (prio, promises, msg_recv_sender, shutdown1_sender));
Stream::new(
self.remote_pid,
sid,
prio,
promises,
send_outgoing,
msg_recv_receiver,
shutdown1_receiver,
shutdown_api_sender.clone(),
)
}
}

View File

@ -1,65 +1,29 @@
/*
This will become a single class,
it contains a list of all open Channels and all Participants and all streams.
Important, we need to change stream ids to be unique per participant
and msg ids need to be unique per participant too. The other way would be to always send sid with Data Frame but this is to much overhead.
We need a external (like timer like) Future that opens a thread in threadpool, and is Ready once serialized
We should focus in this implementation on the routing side, Prio and choosing the correct Protocol.
A Message should be delivered over 2 Channels, e.g. Create Info via TCP and data via UDP. keep in mind that UDP might be read before TCP is read...
maybe even a future that builds together a message from incremental steps.
Or a future that sends a message, however on each seend prio needs to be considered, maybe overkill.
it should be quite easy as all is in one thread now, but i am still not sure if its in the same as the network, or if we still have a sperate one,
probably start with a seperate thread for now.
Focus on the routing for now, and ignoring protocols and details...
*/
/*
Priorities are handled the following way.
Prios from 0-63 are allowed.
all 5 numbers the throughput i halved.
E.g. in the same time 100 prio0 messages are send, only 50 prio5, 25 prio10, 12 prio15 or 6 prio20 messages are send.
Node: TODO: prio0 will be send immeadiatly when found!
Note: TODO: prio0 will be send immeadiatly when found!
*/
/*
algo:
let past = [u64, 100] = [0,0,0,0..]
send_prio0()
past[0] += 100;
#check_next_prio
if past[0] - past[1] > prio_numbers[1] {
sendprio1();
past[1] += 100;
if past[0] - past[2] > prio_numbers[2] {
sendprio2();
past[2] += 100;
}
}
*/
use crate::{message::OutGoingMessage, types::Frame};
use crate::{
frames::Frame,
message::OutGoingMessage,
types::{Pid, Prio, Sid},
};
use std::{
collections::{HashSet, VecDeque},
sync::mpsc::{channel, Receiver, Sender},
};
use tracing::*;
const PRIO_MAX: usize = 64;
struct PrioManager {
pub(crate) struct PrioManager {
points: [u32; PRIO_MAX],
messages: [VecDeque<OutGoingMessage>; PRIO_MAX],
messages_tx: Sender<(u8, OutGoingMessage)>,
messages_rx: Receiver<(u8, OutGoingMessage)>,
messages: [VecDeque<(Pid, Sid, OutGoingMessage)>; PRIO_MAX],
messages_rx: Receiver<(Prio, Pid, Sid, OutGoingMessage)>,
queued: HashSet<u8>,
}
@ -73,89 +37,91 @@ impl PrioManager {
310419, 356578, 409600, 470507, 540470, 620838,
];
pub fn new() -> Self {
pub fn new() -> (Self, Sender<(Prio, Pid, Sid, OutGoingMessage)>) {
let (messages_tx, messages_rx) = channel();
Self {
points: [0; PRIO_MAX],
messages: [
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
],
(
Self {
points: [0; PRIO_MAX],
messages: [
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
VecDeque::new(),
],
messages_rx,
queued: HashSet::new(), //TODO: optimize with u64 and 64 bits
},
messages_tx,
messages_rx,
queued: HashSet::new(), //TODO: optimize with u64 and 64 bits
}
)
}
fn tick(&mut self) {
// Check Range
for (prio, msg) in self.messages_rx.try_iter() {
for (prio, pid, sid, msg) in self.messages_rx.try_iter() {
debug_assert!(prio as usize <= PRIO_MAX);
println!("tick {}", prio);
trace!(?prio, ?sid, ?pid, "tick");
self.queued.insert(prio);
self.messages[prio as usize].push_back(msg);
self.messages[prio as usize].push_back((pid, sid, msg));
}
}
@ -178,30 +144,30 @@ impl PrioManager {
}
/// returns if msg is empty
fn tick_msg<E: Extend<Frame>>(msg: &mut OutGoingMessage, frames: &mut E) -> bool {
fn tick_msg<E: Extend<(Pid, Sid, Frame)>>(
msg: &mut OutGoingMessage,
msg_pid: Pid,
msg_sid: Sid,
frames: &mut E,
) -> bool {
let to_send = std::cmp::min(
msg.buffer.data.len() as u64 - msg.cursor,
Self::FRAME_DATA_SIZE,
);
if to_send > 0 {
if msg.cursor == 0 {
//TODO: OutGoingMessage MUST HAVE A MID AT THIS POINT ALREADY! AS I HAVE NO
// IDEA OF STREAMS HERE!
debug_assert!(msg.mid.is_some());
frames.extend(std::iter::once(Frame::DataHeader {
mid: msg
.mid
.expect("read comment 3 lines above this error message 41231255661"),
frames.extend(std::iter::once((msg_pid, msg_sid, Frame::DataHeader {
mid: msg.mid,
sid: msg.sid,
length: msg.buffer.data.len() as u64,
}));
})));
}
frames.extend(std::iter::once(Frame::Data {
id: msg.mid.unwrap(),
frames.extend(std::iter::once((msg_pid, msg_sid, Frame::Data {
id: msg.mid,
start: msg.cursor,
data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize]
.to_vec(),
}));
})));
};
msg.cursor += to_send;
msg.cursor >= msg.buffer.data.len() as u64
@ -216,26 +182,30 @@ impl PrioManager {
/// high prio messages!
/// - if no_of_frames is too low you wont saturate your Socket fully, thus
/// have a lower bandwidth as possible
pub fn fill_frames<E: Extend<Frame>>(&mut self, no_of_frames: usize, frames: &mut E) {
pub fn fill_frames<E: Extend<(Pid, Sid, Frame)>>(
&mut self,
no_of_frames: usize,
frames: &mut E,
) {
self.tick();
for _ in 0..no_of_frames {
match self.calc_next_prio() {
Some(prio) => {
println!("dasd {}", prio);
trace!(?prio, "handle next prio");
self.points[prio as usize] += Self::PRIOS[prio as usize];
//pop message from front of VecDeque, handle it and push it back, so that all
// => messages with same prio get a fair chance :)
//TODO: evalaute not poping every time
match self.messages[prio as usize].pop_front() {
Some(mut msg) => {
if Self::tick_msg(&mut msg, frames) {
Some((pid, sid, mut msg)) => {
if Self::tick_msg(&mut msg, pid, sid, frames) {
//debug!(?m.mid, "finish message");
//check if prio is empty
if self.messages[prio as usize].is_empty() {
self.queued.remove(&prio);
}
} else {
self.messages[prio as usize].push_back(msg);
self.messages[prio as usize].push_back((pid, sid, msg));
//trace!(?m.mid, "repush message");
}
},
@ -251,47 +221,60 @@ impl PrioManager {
}
}
}
}
pub fn get_tx(&self) -> &Sender<(u8, OutGoingMessage)> { &self.messages_tx }
impl std::fmt::Debug for PrioManager {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut cnt = 0;
for m in self.messages.iter() {
cnt += m.len();
}
write!(f, "PrioManager(len: {}, queued: {:?})", cnt, &self.queued,)
}
}
#[cfg(test)]
mod tests {
use crate::{
frames::Frame,
message::{MessageBuffer, OutGoingMessage},
prios::*,
types::{Frame, Mid, Sid},
types::{Pid, Prio, Sid},
};
use std::{collections::VecDeque, sync::Arc};
fn mock_out(prio: u8, sid: Sid) -> (u8, OutGoingMessage) {
(prio, OutGoingMessage {
const SIZE: u64 = PrioManager::FRAME_DATA_SIZE;
const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize;
fn mock_out(prio: Prio, sid: Sid) -> (Prio, Pid, Sid, OutGoingMessage) {
(prio, Pid::fake(0), sid, OutGoingMessage {
buffer: Arc::new(MessageBuffer {
data: vec![48, 49, 50],
}),
cursor: 0,
mid: Some(1),
mid: 1,
sid,
})
}
fn mock_out_large(prio: u8, sid: Sid) -> (u8, OutGoingMessage) {
const MSG_SIZE: usize = PrioManager::FRAME_DATA_SIZE as usize;
let mut data = vec![48; MSG_SIZE];
data.append(&mut vec![49; MSG_SIZE]);
fn mock_out_large(prio: Prio, sid: Sid) -> (Prio, Pid, Sid, OutGoingMessage) {
let mut data = vec![48; USIZE];
data.append(&mut vec![49; USIZE]);
data.append(&mut vec![50; 20]);
(prio, OutGoingMessage {
(prio, Pid::fake(0), sid, OutGoingMessage {
buffer: Arc::new(MessageBuffer { data }),
cursor: 0,
mid: Some(1),
mid: 1,
sid,
})
}
fn assert_header(frames: &mut VecDeque<Frame>, f_sid: Sid, f_length: u64) {
fn assert_header(frames: &mut VecDeque<(Pid, Sid, Frame)>, f_sid: Sid, f_length: u64) {
let frame = frames
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!");
.expect("frames vecdeque doesn't contain enough frames!")
.2;
if let Frame::DataHeader { mid, sid, length } = frame {
assert_eq!(mid, 1);
assert_eq!(sid, f_sid);
@ -301,10 +284,11 @@ mod tests {
}
}
fn assert_data(frames: &mut VecDeque<Frame>, f_start: u64, f_data: Vec<u8>) {
fn assert_data(frames: &mut VecDeque<(Pid, Sid, Frame)>, f_start: u64, f_data: Vec<u8>) {
let frame = frames
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!");
.expect("frames vecdeque doesn't contain enough frames!")
.2;
if let Frame::Data { id, start, data } = frame {
assert_eq!(id, 1);
assert_eq!(start, f_start);
@ -316,8 +300,8 @@ mod tests {
#[test]
fn single_p16() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out(16, 1337)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(16, 1337)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
@ -328,9 +312,9 @@ mod tests {
#[test]
fn single_p16_p20() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out(16, 1337)).unwrap();
mgr.get_tx().send(mock_out(20, 42)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(16, 1337)).unwrap();
tx.send(mock_out(20, 42)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
@ -343,9 +327,9 @@ mod tests {
#[test]
fn single_p20_p16() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out(20, 42)).unwrap();
mgr.get_tx().send(mock_out(16, 1337)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(20, 42)).unwrap();
tx.send(mock_out(16, 1337)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
@ -358,20 +342,20 @@ mod tests {
#[test]
fn multiple_p16_p20() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out(20, 2)).unwrap();
mgr.get_tx().send(mock_out(16, 1)).unwrap();
mgr.get_tx().send(mock_out(16, 3)).unwrap();
mgr.get_tx().send(mock_out(16, 5)).unwrap();
mgr.get_tx().send(mock_out(20, 4)).unwrap();
mgr.get_tx().send(mock_out(20, 7)).unwrap();
mgr.get_tx().send(mock_out(16, 6)).unwrap();
mgr.get_tx().send(mock_out(20, 10)).unwrap();
mgr.get_tx().send(mock_out(16, 8)).unwrap();
mgr.get_tx().send(mock_out(20, 12)).unwrap();
mgr.get_tx().send(mock_out(16, 9)).unwrap();
mgr.get_tx().send(mock_out(16, 11)).unwrap();
mgr.get_tx().send(mock_out(20, 13)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(20, 2)).unwrap();
tx.send(mock_out(16, 1)).unwrap();
tx.send(mock_out(16, 3)).unwrap();
tx.send(mock_out(16, 5)).unwrap();
tx.send(mock_out(20, 4)).unwrap();
tx.send(mock_out(20, 7)).unwrap();
tx.send(mock_out(16, 6)).unwrap();
tx.send(mock_out(20, 10)).unwrap();
tx.send(mock_out(16, 8)).unwrap();
tx.send(mock_out(20, 12)).unwrap();
tx.send(mock_out(16, 9)).unwrap();
tx.send(mock_out(16, 11)).unwrap();
tx.send(mock_out(20, 13)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
@ -384,20 +368,20 @@ mod tests {
#[test]
fn multiple_fill_frames_p16_p20() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out(20, 2)).unwrap();
mgr.get_tx().send(mock_out(16, 1)).unwrap();
mgr.get_tx().send(mock_out(16, 3)).unwrap();
mgr.get_tx().send(mock_out(16, 5)).unwrap();
mgr.get_tx().send(mock_out(20, 4)).unwrap();
mgr.get_tx().send(mock_out(20, 7)).unwrap();
mgr.get_tx().send(mock_out(16, 6)).unwrap();
mgr.get_tx().send(mock_out(20, 10)).unwrap();
mgr.get_tx().send(mock_out(16, 8)).unwrap();
mgr.get_tx().send(mock_out(20, 12)).unwrap();
mgr.get_tx().send(mock_out(16, 9)).unwrap();
mgr.get_tx().send(mock_out(16, 11)).unwrap();
mgr.get_tx().send(mock_out(20, 13)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(20, 2)).unwrap();
tx.send(mock_out(16, 1)).unwrap();
tx.send(mock_out(16, 3)).unwrap();
tx.send(mock_out(16, 5)).unwrap();
tx.send(mock_out(20, 4)).unwrap();
tx.send(mock_out(20, 7)).unwrap();
tx.send(mock_out(16, 6)).unwrap();
tx.send(mock_out(20, 10)).unwrap();
tx.send(mock_out(16, 8)).unwrap();
tx.send(mock_out(20, 12)).unwrap();
tx.send(mock_out(16, 9)).unwrap();
tx.send(mock_out(16, 11)).unwrap();
tx.send(mock_out(20, 13)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(3, &mut frames);
for i in 1..4 {
@ -415,107 +399,72 @@ mod tests {
#[test]
fn single_large_p16() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out_large(16, 1)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out_large(16, 1)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![
48;
PrioManager::FRAME_DATA_SIZE as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![
49;
PrioManager::FRAME_DATA_SIZE
as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]);
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert!(frames.is_empty());
}
#[test]
fn multiple_large_p16() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out_large(16, 1)).unwrap();
mgr.get_tx().send(mock_out_large(16, 2)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out_large(16, 1)).unwrap();
tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![
48;
PrioManager::FRAME_DATA_SIZE as usize
]);
assert_header(&mut frames, 2, PrioManager::FRAME_DATA_SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![
48;
PrioManager::FRAME_DATA_SIZE as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![
49;
PrioManager::FRAME_DATA_SIZE
as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![
49;
PrioManager::FRAME_DATA_SIZE
as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]);
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_header(&mut frames, 2, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert!(frames.is_empty());
}
#[test]
fn multiple_large_p16_sudden_p0() {
let mut mgr = PrioManager::new();
mgr.get_tx().send(mock_out_large(16, 1)).unwrap();
mgr.get_tx().send(mock_out_large(16, 2)).unwrap();
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out_large(16, 1)).unwrap();
tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(3, &mut frames);
assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![
48;
PrioManager::FRAME_DATA_SIZE as usize
]);
assert_header(&mut frames, 2, PrioManager::FRAME_DATA_SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![
48;
PrioManager::FRAME_DATA_SIZE as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![
49;
PrioManager::FRAME_DATA_SIZE
as usize
]);
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_header(&mut frames, 2, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
mgr.get_tx().send(mock_out(0, 3)).unwrap();
tx.send(mock_out(0, 3)).unwrap();
mgr.fill_frames(100, &mut frames);
assert_header(&mut frames, 3, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![
49;
PrioManager::FRAME_DATA_SIZE
as usize
]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]);
assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert_data(&mut frames, SIZE * 2, vec![50; 20]);
assert!(frames.is_empty());
}
#[test]
fn single_p20_thousand_p16_at_once() {
let mut mgr = PrioManager::new();
let (mut mgr, tx) = PrioManager::new();
for _ in 0..998 {
mgr.get_tx().send(mock_out(16, 2)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
}
mgr.get_tx().send(mock_out(20, 1)).unwrap();
mgr.get_tx().send(mock_out(16, 2)).unwrap();
mgr.get_tx().send(mock_out(16, 2)).unwrap();
tx.send(mock_out(20, 1)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(2000, &mut frames);
@ -531,16 +480,16 @@ mod tests {
#[test]
fn single_p20_thousand_p16_later() {
let mut mgr = PrioManager::new();
let (mut mgr, tx) = PrioManager::new();
for _ in 0..998 {
mgr.get_tx().send(mock_out(16, 2)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
}
let mut frames = VecDeque::new();
mgr.fill_frames(2000, &mut frames);
//^unimportant frames, gonna be dropped
mgr.get_tx().send(mock_out(20, 1)).unwrap();
mgr.get_tx().send(mock_out(16, 2)).unwrap();
mgr.get_tx().send(mock_out(16, 2)).unwrap();
tx.send(mock_out(20, 1)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(2000, &mut frames);

649
network/src/scheduler.rs Normal file
View File

@ -0,0 +1,649 @@
use crate::{
api::{Address, Participant},
channel::Channel,
frames::Frame,
message::OutGoingMessage,
participant::BParticipant,
prios::PrioManager,
types::{Cid, Pid, Prio, Sid},
};
use async_std::sync::RwLock;
use futures::{
channel::{mpsc, oneshot},
executor::ThreadPool,
future::FutureExt,
select,
sink::SinkExt,
stream::StreamExt,
};
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use tracing::*;
use tracing_futures::Instrument;
//use futures::prelude::*;
#[derive(Debug)]
struct ControlChannels {
listen_receiver: mpsc::UnboundedReceiver<Address>,
connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<Participant>)>,
connected_sender: mpsc::UnboundedSender<Participant>,
shutdown_receiver: oneshot::Receiver<()>,
prios: PrioManager,
prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
}
#[derive(Debug)]
pub struct Scheduler {
local_pid: Pid,
closed: AtomicBool,
pool: Arc<ThreadPool>,
run_channels: Option<ControlChannels>,
participants: Arc<
RwLock<
HashMap<
Pid,
(
mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender<Frame>)>,
mpsc::UnboundedSender<Frame>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
),
>,
>,
>,
participant_from_channel: Arc<RwLock<HashMap<Cid, Pid>>>,
channel_ids: Arc<AtomicU64>,
channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
unknown_channels: Arc<
RwLock<
HashMap<
Cid,
(
mpsc::UnboundedSender<Frame>,
Option<oneshot::Sender<Participant>>,
),
>,
>,
>,
}
impl Scheduler {
pub fn new(
local_pid: Pid,
) -> (
Self,
mpsc::UnboundedSender<Address>,
mpsc::UnboundedSender<(Address, oneshot::Sender<Participant>)>,
mpsc::UnboundedReceiver<Participant>,
oneshot::Sender<()>,
) {
let (listen_sender, listen_receiver) = mpsc::unbounded::<Address>();
let (connect_sender, connect_receiver) =
mpsc::unbounded::<(Address, oneshot::Sender<Participant>)>();
let (connected_sender, connected_receiver) = mpsc::unbounded::<Participant>();
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let (prios, prios_sender) = PrioManager::new();
let run_channels = Some(ControlChannels {
listen_receiver,
connect_receiver,
connected_sender,
shutdown_receiver,
prios,
prios_sender,
});
(
Self {
local_pid,
closed: AtomicBool::new(false),
pool: Arc::new(ThreadPool::new().unwrap()),
run_channels,
participants: Arc::new(RwLock::new(HashMap::new())),
participant_from_channel: Arc::new(RwLock::new(HashMap::new())),
channel_ids: Arc::new(AtomicU64::new(0)),
channel_listener: RwLock::new(HashMap::new()),
unknown_channels: Arc::new(RwLock::new(HashMap::new())),
},
listen_sender,
connect_sender,
connected_receiver,
shutdown_sender,
)
}
pub async fn run(mut self) {
let (part_out_sender, part_out_receiver) = mpsc::unbounded::<(Cid, Frame)>();
let (configured_sender, configured_receiver) = mpsc::unbounded::<(Cid, Pid, Sid)>();
let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::<Pid>();
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.listen_manager(
run_channels.listen_receiver,
part_out_sender.clone(),
configured_sender.clone(),
),
self.connect_manager(
run_channels.connect_receiver,
part_out_sender,
configured_sender,
),
self.disconnect_manager(disconnect_receiver,),
self.send_outgoing(run_channels.prios),
self.shutdown_manager(run_channels.shutdown_receiver),
self.handle_frames(part_out_receiver),
self.channel_configurer(
run_channels.connected_sender,
configured_receiver,
disconnect_sender,
run_channels.prios_sender.clone(),
),
);
}
async fn listen_manager(
&self,
mut listen_receiver: mpsc::UnboundedReceiver<Address>,
part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>,
) {
trace!("start listen_manager");
while let Some(address) = listen_receiver.next().await {
debug!(?address, "got request to open a channel_creator");
let (end_sender, end_receiver) = oneshot::channel::<()>();
self.channel_listener
.write()
.await
.insert(address.clone(), end_sender);
self.pool.spawn_ok(Self::channel_creator(
self.channel_ids.clone(),
self.local_pid,
address.clone(),
end_receiver,
self.pool.clone(),
part_out_sender.clone(),
configured_sender.clone(),
self.unknown_channels.clone(),
));
}
trace!("stop listen_manager");
}
async fn connect_manager(
&self,
mut connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<Participant>)>,
part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>,
) {
trace!("start connect_manager");
while let Some((addr, pid_sender)) = connect_receiver.next().await {
match addr {
Address::Tcp(addr) => {
let stream = async_std::net::TcpStream::connect(addr).await.unwrap();
info!("Connectiong TCP to: {}", stream.peer_addr().unwrap());
let (part_in_sender, part_in_receiver) = mpsc::unbounded::<Frame>();
//channels are unknown till PID is known!
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
self.unknown_channels
.write()
.await
.insert(cid, (part_in_sender, Some(pid_sender)));
self.pool.spawn_ok(
Channel::new(cid, self.local_pid)
.run(
stream,
part_in_receiver,
part_out_sender.clone(),
configured_sender.clone(),
)
.instrument(tracing::info_span!("channel", ?addr)),
);
},
_ => unimplemented!(),
}
}
trace!("stop connect_manager");
}
async fn disconnect_manager(&self, mut disconnect_receiver: mpsc::UnboundedReceiver<Pid>) {
trace!("start disconnect_manager");
while let Some(pid) = disconnect_receiver.next().await {
error!(?pid, "I need to disconnect the pid");
}
trace!("stop disconnect_manager");
}
async fn send_outgoing(&self, mut prios: PrioManager) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
// make it configureable or switch to await E.g. Prio 0 = await, prio 50
// wait for more messages
const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(10);
trace!("start send_outgoing");
while !self.closed.load(Ordering::Relaxed) {
let mut frames = VecDeque::new();
prios.fill_frames(3, &mut frames);
for (pid, sid, frame) in frames {
if let Some((_, _, sender)) = self.participants.write().await.get_mut(&pid) {
sender.send((pid, sid, frame)).await.unwrap();
}
}
async_std::task::sleep(TICK_TIME).await;
}
trace!("stop send_outgoing");
}
async fn handle_frames(&self, mut part_out_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>) {
trace!("start handle_frames");
while let Some((cid, frame)) = part_out_receiver.next().await {
trace!("handling frame");
if let Some(pid) = self.participant_from_channel.read().await.get(&cid) {
if let Some((_, sender, _)) = self.participants.write().await.get_mut(&pid) {
sender.send(frame).await.unwrap();
}
} else {
error!("dropping frame, unreachable, got a frame from a non existing channel");
}
}
trace!("stop handle_frames");
}
//
async fn channel_configurer(
&self,
mut connected_sender: mpsc::UnboundedSender<Participant>,
mut receiver: mpsc::UnboundedReceiver<(Cid, Pid, Sid)>,
disconnect_sender: mpsc::UnboundedSender<Pid>,
prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
) {
trace!("start channel_activator");
while let Some((cid, pid, offset_sid)) = receiver.next().await {
if let Some((frame_sender, pid_oneshot)) =
self.unknown_channels.write().await.remove(&cid)
{
trace!(
?cid,
?pid,
"detected that my channel is ready!, activating it :)"
);
let mut participants = self.participants.write().await;
if !participants.contains_key(&pid) {
debug!(?cid, "new participant connected via a channel");
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (
bparticipant,
stream_open_sender,
stream_opened_receiver,
mut transfer_channel_receiver,
frame_recv_sender,
frame_send_sender,
) = BParticipant::new(pid, offset_sid, prios_sender.clone());
let participant = Participant::new(
self.local_pid,
pid,
stream_open_sender,
stream_opened_receiver,
shutdown_receiver,
disconnect_sender.clone(),
);
if let Some(pid_oneshot) = pid_oneshot {
// someone is waiting with connect, so give them their PID
pid_oneshot.send(participant).unwrap();
} else {
// noone is waiting on this Participant, return in to Network
connected_sender.send(participant).await.unwrap();
}
transfer_channel_receiver
.send((cid, frame_sender))
.await
.unwrap();
participants.insert(
pid,
(
transfer_channel_receiver,
frame_recv_sender,
frame_send_sender,
),
);
self.participant_from_channel.write().await.insert(cid, pid);
self.pool.spawn_ok(
bparticipant
.run()
.instrument(tracing::info_span!("participant", ?pid)),
);
} else {
error!(
"2ND channel of participants opens, but we cannot verify that this is not \
a attack to "
)
}
}
}
trace!("stop channel_activator");
}
pub async fn shutdown_manager(&self, receiver: oneshot::Receiver<()>) {
trace!("start shutdown_manager");
receiver.await.unwrap();
self.closed.store(true, Ordering::Relaxed);
trace!("stop shutdown_manager");
}
pub async fn channel_creator(
channel_ids: Arc<AtomicU64>,
local_pid: Pid,
addr: Address,
end_receiver: oneshot::Receiver<()>,
pool: Arc<ThreadPool>,
part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>,
configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>,
unknown_channels: Arc<
RwLock<
HashMap<
Cid,
(
mpsc::UnboundedSender<Frame>,
Option<oneshot::Sender<Participant>>,
),
>,
>,
>,
) {
info!(?addr, "start up channel creator");
match addr {
Address::Tcp(addr) => {
let listener = async_std::net::TcpListener::bind(addr).await.unwrap();
let mut incoming = listener.incoming();
let mut end_receiver = end_receiver.fuse();
while let Some(stream) = select! {
next = incoming.next().fuse() => next,
_ = end_receiver => None,
} {
let stream = stream.unwrap();
info!("Accepting TCP from: {}", stream.peer_addr().unwrap());
let (mut part_in_sender, part_in_receiver) = mpsc::unbounded::<Frame>();
//channels are unknown till PID is known!
/* When A connects to a NETWORK, we, the listener answers with a Handshake.
Pro: - Its easier to debug, as someone who opens a port gets a magic number back!
Contra: - DOS posibility because we answer fist
- Speed, because otherwise the message can be send with the creation
*/
let cid = channel_ids.fetch_add(1, Ordering::Relaxed);
let channel = Channel::new(cid, local_pid);
channel.send_handshake(&mut part_in_sender).await;
pool.spawn_ok(
channel
.run(
stream,
part_in_receiver,
part_out_sender.clone(),
configured_sender.clone(),
)
.instrument(tracing::info_span!("channel", ?addr)),
);
unknown_channels
.write()
.await
.insert(cid, (part_in_sender, None));
}
},
_ => unimplemented!(),
}
info!(?addr, "ending channel creator");
}
}
/*
use crate::{
async_serde,
channel::{Channel, ChannelProtocol, ChannelProtocols},
controller::Controller,
metrics::NetworkMetrics,
prios::PrioManager,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
};
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicBool, Ordering},
mpsc,
mpsc::TryRecvError,
Arc,
},
time::Instant,
};
use tlid;
use tracing::*;
use crate::types::Protocols;
use crate::frames::{ChannelFrame, ParticipantFrame, StreamFrame, Frame};
/*
The worker lives in a own thread and only communcates with the outside via a Channel
Prios are done per participant, but their throughput is split equalli,
That allows indepentend calculation of prios (no global hotspot) while no Participant is starved as the total throughput is measured and aproximated :)
streams are per participant, and channels are per participants, streams dont have a specific channel!
*/
use async_std::sync::RwLock;
use async_std::io::prelude::*;
use crate::async_serde::{SerializeFuture, DeserializeFuture};
use uvth::ThreadPoolBuilder;
use async_std::stream::Stream;
use async_std::sync::{self, Sender, Receiver};
use crate::types::{VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION,};
use crate::message::InCommingMessage;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::{select, FutureExt};
#[derive(Debug)]
struct BStream {
sid: Sid,
prio: u8,
promises: u8,
}
struct BChannel {
remote_pid: Option<Pid>,
stream: RwLock<async_std::net::TcpStream>,
send_stream: Sender<Frame>,
recv_stream: Receiver<Frame>,
send_participant: Sender<Frame>,
recv_participant: Receiver<Frame>,
send_handshake: bool,
send_pid: bool,
send_shutdown: bool,
recv_handshake: bool,
recv_pid: bool,
recv_shutdown: bool,
}
struct BAcceptor {
listener: RwLock<async_std::net::TcpListener>,
}
struct BParticipant {
remote_pid: Pid,
channels: HashMap<Protocols, Vec<BChannel>>,
streams: Vec<BStream>,
sid_pool: tlid::Pool<tlid::Wrapping<Sid>>,
prios: RwLock<PrioManager>,
closed: AtomicBool,
}
pub(crate) struct Scheduler {
local_pid: Pid,
metrics: Arc<Option<NetworkMetrics>>,
participants: HashMap<Pid, BParticipant>,
pending_channels: HashMap<Protocols, Vec<BChannel>>,
/* ctrl_rx: Receiver<CtrlMsg>,
* rtrn_tx: mpsc::Sender<RtrnMsg>, */
}
impl BStream {
}
impl BChannel {
/*
/// Execute when ready to read
pub async fn recv(&self) -> Vec<Frame> {
let mut buffer: [u8; 2000] = [0; 2000];
let read = self.stream.write().await.read(&mut buffer).await;
match read {
Ok(n) => {
let x = DeserializeFuture::new(buffer[0..n].to_vec(), &ThreadPoolBuilder::new().build()).await;
return vec!(x);
},
Err(e) => {
panic!("woops {}", e);
}
}
}
/// Execute when ready to write
pub async fn send<I: std::iter::Iterator<Item = Frame>>(&self, frames: &mut I) {
for frame in frames {
let x = SerializeFuture::new(frame, &ThreadPoolBuilder::new().build()).await;
self.stream.write().await.write_all(&x).await;
}
}
*/
pub fn get_tx(&self) -> &Sender<Frame> {
&self.send_stream
}
pub fn get_rx(&self) -> &Receiver<Frame> {
&self.recv_stream
}
pub fn get_participant_tx(&self) -> &Sender<Frame> {
&self.send_participant
}
pub fn get_participant_rx(&self) -> &Receiver<Frame> {
&self.recv_participant
}
}
impl BParticipant {
pub async fn read(&self) {
while self.closed.load(Ordering::Relaxed) {
for channels in self.channels.values() {
for channel in channels.iter() {
//let frames = channel.recv().await;
let frame = channel.get_rx().recv().await.unwrap();
match frame {
Frame::Channel(cf) => channel.handle(cf).await,
Frame::Participant(pf) => self.handle(pf).await,
Frame::Stream(sf) => {},
}
}
}
async_std::task::sleep(std::time::Duration::from_millis(100)).await;
}
}
pub async fn write(&self) {
let mut frames = VecDeque::<(u8, StreamFrame)>::new();
while self.closed.load(Ordering::Relaxed) {
let todo_synced_amount_and_reasonable_choosen_throughput_based_on_feedback = 100;
self.prios.write().await.fill_frames(
todo_synced_amount_and_reasonable_choosen_throughput_based_on_feedback,
&mut frames,
);
for (promises, frame) in frames.drain(..) {
let channel = self.chose_channel(promises);
channel.get_tx().send(Frame::Stream(frame)).await;
}
}
}
pub async fn handle(&self, frame: ParticipantFrame) {
info!("got a frame to handle");
/*
match frame {
ParticipantFrame::OpenStream {
sid,
prio,
promises,
} => {
if let Some(pid) = self.remote_pid {
let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::<InCommingMessage>();
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
trace!(?self.streams, "-OPEN STREAM- going to modify streams");
self.streams.push(stream);
trace!(?self.streams, "-OPEN STREAM- did to modify streams");
info!("opened a stream");
if let Err(err) = rtrn_tx.send(RtrnMsg::OpendStream {
pid,
sid,
prio,
msg_rx,
promises,
}) {
error!(?err, "couldn't notify of opened stream");
}
} else {
error!("called OpenStream before PartcipantID!");
}
},
ParticipantFrame::CloseStream { sid } => {
if let Some(pid) = self.remote_pid {
trace!(?self.streams, "-CLOSE STREAM- going to modify streams");
self.streams.retain(|stream| stream.sid() != sid);
trace!(?self.streams, "-CLOSE STREAM- did to modify streams");
info!("closed a stream");
if let Err(err) = rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }) {
error!(?err, "couldn't notify of closed stream");
}
}
},
}*/
}
/// Endless task that will cover sending for Participant
pub async fn run(&mut self) {
let (incomming_sender, incomming_receiver) = mpsc::unbounded();
futures::join!(self.read(), self.write());
}
pub fn chose_channel(&self,
promises: u8, /* */
) -> &BChannel {
for v in self.channels.values() {
for c in v {
return c;
}
}
panic!("No Channel!");
}
}
impl Scheduler {
pub fn new(
pid: Pid,
metrics: Arc<Option<NetworkMetrics>>,
sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
) -> Self {
panic!("asd");
}
pub fn run(&mut self) { loop {} }
}
*/

View File

@ -1,145 +1 @@
use crate::{
channel::ChannelProtocol,
types::{Frame, NetworkBuffer},
};
use bincode;
use mio::net::TcpStream;
use std::io::{Read, Write};
use tracing::*;
pub(crate) struct TcpChannel {
endpoint: TcpStream,
read_buffer: NetworkBuffer,
write_buffer: NetworkBuffer,
}
impl TcpChannel {
pub fn new(endpoint: TcpStream) -> Self {
Self {
endpoint,
read_buffer: NetworkBuffer::new(),
write_buffer: NetworkBuffer::new(),
}
}
}
impl ChannelProtocol for TcpChannel {
type Handle = TcpStream;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
loop {
match self.endpoint.read(self.read_buffer.get_write_slice(2048)) {
Ok(0) => {
//Shutdown
trace!(?self, "shutdown of tcp channel detected");
result.push(Frame::Shutdown);
break;
},
Ok(n) => {
self.read_buffer.actually_written(n);
trace!("incomming message with len: {}", n);
let slice = self.read_buffer.get_read_slice();
let mut cur = std::io::Cursor::new(slice);
let mut read_ok = 0;
while cur.position() < n as u64 {
let round_start = cur.position() as usize;
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => {
result.push(frame);
read_ok = cur.position() as usize;
},
Err(e) => {
// Probably we have to wait for moare data!
let first_bytes_of_msg =
&slice[round_start..std::cmp::min(n, round_start + 16)];
debug!(
?self,
?e,
?n,
?round_start,
?first_bytes_of_msg,
"message cant be parsed, probably because we need to wait for \
more data"
);
break;
},
}
}
self.read_buffer.actually_read(read_ok);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
break;
},
Err(e) => panic!("{}", e),
};
}
result
}
/// Execute when ready to write
fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
loop {
//serialize when len < MTU 1500, then write
if self.write_buffer.get_read_slice().len() < 1500 {
match frames.next() {
Some(frame) => {
if let Ok(size) = bincode::serialized_size(&frame) {
let slice = self.write_buffer.get_write_slice(size as usize);
if let Err(err) = bincode::serialize_into(slice, &frame) {
error!(
?err,
"serialising frame was unsuccessful, this should never \
happen! dropping frame!"
)
}
self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent!
} else {
error!(
"getting size of frame was unsuccessful, this should never \
happen! dropping frame!"
)
};
},
None => break,
}
}
match self.endpoint.write(self.write_buffer.get_read_slice()) {
Ok(n) => {
self.write_buffer.actually_read(n);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("can't send tcp yet, would block");
return;
},
Err(e) => panic!("{}", e),
}
}
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}
impl std::fmt::Debug for TcpChannel {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.endpoint)
}
}
impl std::fmt::Debug for NetworkBuffer {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"NetworkBuffer(len: {}, read: {}, write: {})",
self.data.len(),
self.read_idx,
self.write_idx
)
}
}

View File

@ -1,156 +1,24 @@
use crate::{
api::Promise,
channel::Channel,
message::{InCommingMessage, OutGoingMessage},
};
use enumset::EnumSet;
use futures;
use mio::{self, net::TcpListener, PollOpt, Ready};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use tracing::*;
use uuid::Uuid;
//Participant Ids are randomly chosen
pub type Pid = Uuid;
//Stream Ids are unique per Participant* and are split in 2 ranges, one for
// every Network involved Every Channel gets a subrange during their handshake
// protocol from one of the 2 ranges
//*otherwise extra synchronization would be needed
pub type Sid = u64;
//Message Ids are unique per Stream* and are split in 2 ranges, one for every
// Channel involved
//*otherwise extra synchronization would be needed
pub type Mid = u64;
pub type Cid = u64;
pub type Prio = u8;
pub type Promises = u8;
pub const PROMISES_NONE: Promises = 0;
pub const PROMISES_ORDERED: Promises = 1;
pub const PROMISES_CONSISTENCY: Promises = 2;
pub const PROMISES_GUARANTEED_DELIVERY: Promises = 4;
pub const PROMISES_COMPRESSED: Promises = 8;
pub const PROMISES_ENCRYPTED: Promises = 16;
pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 2, 0];
pub const DEFAULT_SID_SIZE: u64 = 1 << 48;
// Used for Communication between Controller <--> Worker
pub(crate) enum CtrlMsg {
Shutdown,
Register(TokenObjects, Ready, PollOpt),
OpenStream {
pid: Pid,
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
},
CloseStream {
pid: Pid,
sid: Sid,
},
Send(OutGoingMessage),
}
pub(crate) enum RtrnMsg {
Shutdown,
ConnectedParticipant {
pid: Pid,
controller_sids: tlid::Pool<tlid::Wrapping<Sid>>,
},
OpendStream {
pid: Pid,
sid: Sid,
prio: u8,
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
promises: EnumSet<Promise>,
},
ClosedStream {
pid: Pid,
sid: Sid,
},
}
#[derive(Debug)]
pub(crate) enum TokenObjects {
TcpListener(TcpListener),
Channel(Channel),
}
#[derive(Debug)]
pub(crate) struct IntStream {
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
pub mid_pool: tlid::Pool<tlid::Wrapping<Mid>>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
pub to_send: VecDeque<OutGoingMessage>,
pub to_receive: VecDeque<InCommingMessage>,
}
impl IntStream {
pub fn new(
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
) -> Self {
IntStream {
sid,
prio,
promises,
mid_pool: tlid::Pool::new_full(),
msg_tx,
to_send: VecDeque::new(),
to_receive: VecDeque::new(),
}
}
pub fn sid(&self) -> Sid { self.sid }
pub fn prio(&self) -> u8 { self.prio }
pub fn msg_tx(&self) -> futures::channel::mpsc::UnboundedSender<InCommingMessage> {
self.msg_tx.clone()
}
pub fn promises(&self) -> EnumSet<Promise> { self.promises }
}
// Used for Communication between Channel <----(TCP/UDP)----> Channel
#[derive(Serialize, Deserialize, Debug)]
pub(crate) enum Frame {
Handshake {
magic_number: String,
version: [u32; 3],
},
Configure {
//only one Participant will send this package and give the other a range to use
sender_controller_sids: tlid::RemoveAllocation<Sid>,
sender_worker_sids: tlid::RemoveAllocation<Sid>,
receiver_controller_sids: tlid::Pool<tlid::Wrapping<Sid>>,
receiver_worker_sids: tlid::Pool<tlid::Wrapping<Sid>>,
},
ParticipantId {
pid: Pid,
},
Shutdown, /* Shutsdown this channel gracefully, if all channels are shut down, Participant
* is deleted */
OpenStream {
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
},
CloseStream {
sid: Sid,
},
DataHeader {
mid: Mid,
sid: Sid,
length: u64,
},
Data {
id: Mid,
start: u64,
data: Vec<u8>,
},
/* WARNING: Sending RAW is only used for debug purposes in case someone write a new API
* against veloren Server! */
Raw(Vec<u8>),
}
pub(crate) const STREAM_ID_OFFSET1: Sid = 0;
pub(crate) const STREAM_ID_OFFSET2: Sid = u64::MAX / 2;
pub(crate) struct NetworkBuffer {
pub(crate) data: Vec<u8>,
@ -158,6 +26,29 @@ pub(crate) struct NetworkBuffer {
pub(crate) write_idx: usize,
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
pub struct Pid {
internal: u128,
}
impl Pid {
pub fn new() -> Self {
Self {
internal: rand::thread_rng().gen(),
}
}
/// don't use fake! just for testing!
/// This will panic if pid i greater than 7, as i do not want you to use
/// this in production!
pub fn fake(pid: u8) -> Self {
assert!(pid < 8);
Self {
internal: pid as u128,
}
}
}
/// NetworkBuffer to use for streamed access
/// valid data is between read_idx and write_idx!
/// everything before read_idx is already processed and no longer important
@ -224,15 +115,22 @@ impl NetworkBuffer {
}
}
fn chose_protocol(
available_protocols: u8, /* 1 = TCP, 2= UDP, 4 = MPSC */
promises: u8, /* */
) -> u8 /*1,2 or 4*/ {
if available_protocols & (1 << 3) != 0 {
4
} else if available_protocols & (1 << 1) != 0 {
1
} else {
2
impl std::fmt::Debug for NetworkBuffer {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"NetworkBuffer(len: {}, read: {}, write: {})",
self.data.len(),
self.read_idx,
self.write_idx
)
}
}
impl std::fmt::Debug for Pid {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.internal)
}
}

View File

@ -1,131 +1 @@
use crate::{
channel::ChannelProtocol,
types::{Frame, NetworkBuffer},
};
use bincode;
use mio::net::UdpSocket;
use tracing::*;
pub(crate) struct UdpChannel {
endpoint: UdpSocket,
read_buffer: NetworkBuffer,
write_buffer: NetworkBuffer,
}
impl UdpChannel {
pub fn _new(endpoint: UdpSocket) -> Self {
Self {
endpoint,
read_buffer: NetworkBuffer::new(),
write_buffer: NetworkBuffer::new(),
}
}
}
impl ChannelProtocol for UdpChannel {
type Handle = UdpSocket;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
loop {
match self.endpoint.recv(self.read_buffer.get_write_slice(2048)) {
Ok(0) => {
//Shutdown
trace!(?self, "shutdown of tcp channel detected");
result.push(Frame::Shutdown);
break;
},
Ok(n) => {
self.read_buffer.actually_written(n);
trace!("incomming message with len: {}", n);
let slice = self.read_buffer.get_read_slice();
let mut cur = std::io::Cursor::new(slice);
let mut read_ok = 0;
while cur.position() < n as u64 {
let round_start = cur.position() as usize;
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => {
result.push(frame);
read_ok = cur.position() as usize;
},
Err(e) => {
// Probably we have to wait for moare data!
let first_bytes_of_msg =
&slice[round_start..std::cmp::min(n, round_start + 16)];
debug!(
?self,
?e,
?n,
?round_start,
?first_bytes_of_msg,
"message cant be parsed, probably because we need to wait for \
more data"
);
break;
},
}
}
self.read_buffer.actually_read(read_ok);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
break;
},
Err(e) => panic!("{}", e),
};
}
result
}
/// Execute when ready to write
fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
loop {
//serialize when len < MTU 1500, then write
if self.write_buffer.get_read_slice().len() < 1500 {
match frames.next() {
Some(frame) => {
if let Ok(size) = bincode::serialized_size(&frame) {
let slice = self.write_buffer.get_write_slice(size as usize);
if let Err(err) = bincode::serialize_into(slice, &frame) {
error!(
?err,
"serialising frame was unsuccessful, this should never \
happen! dropping frame!"
)
}
self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent!
} else {
error!(
"getting size of frame was unsuccessful, this should never \
happen! dropping frame!"
)
};
},
None => break,
}
}
match self.endpoint.send(self.write_buffer.get_read_slice()) {
Ok(n) => {
self.write_buffer.actually_read(n);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("can't send tcp yet, would block");
return;
},
Err(e) => panic!("{}", e),
}
}
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}
impl std::fmt::Debug for UdpChannel {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.endpoint)
}
}

View File

@ -1,301 +0,0 @@
use crate::{
channel::{Channel, ChannelProtocol, ChannelProtocols},
controller::Controller,
metrics::NetworkMetrics,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
};
use mio::{self, Poll, PollOpt, Ready, Token};
use mio_extras::channel::Receiver;
use std::{
collections::HashMap,
sync::{mpsc, mpsc::TryRecvError, Arc, RwLock},
time::Instant,
};
use tlid;
use tracing::*;
/*
The worker lives in a own thread and only communcates with the outside via a Channel
*/
pub(crate) struct MioTokens {
pool: tlid::Pool<tlid::Wrapping<usize>>,
pub tokens: HashMap<Token, TokenObjects>, //TODO: move to Vec<Options> for faster lookup
}
impl MioTokens {
pub fn new(pool: tlid::Pool<tlid::Wrapping<usize>>) -> Self {
MioTokens {
pool,
tokens: HashMap::new(),
}
}
pub fn construct(&mut self) -> Token { Token(self.pool.next()) }
pub fn insert(&mut self, tok: Token, obj: TokenObjects) {
trace!(?tok, ?obj, "added new token");
self.tokens.insert(tok, obj);
}
}
pub(crate) struct Worker {
pid: Pid,
poll: Arc<Poll>,
metrics: Arc<Option<NetworkMetrics>>,
sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
participants: HashMap<Pid, tlid::Pool<tlid::Wrapping<Sid>>>,
ctrl_rx: Receiver<CtrlMsg>,
rtrn_tx: mpsc::Sender<RtrnMsg>,
mio_tokens: MioTokens,
time_before_poll: Instant,
time_after_poll: Instant,
}
impl Worker {
pub fn new(
pid: Pid,
poll: Arc<Poll>,
metrics: Arc<Option<NetworkMetrics>>,
sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Checked<Sid>>>>>,
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
ctrl_rx: Receiver<CtrlMsg>,
rtrn_tx: mpsc::Sender<RtrnMsg>,
) -> Self {
let mio_tokens = MioTokens::new(token_pool);
Worker {
pid,
poll,
metrics,
sid_backup_per_participant,
participants: HashMap::new(),
ctrl_rx,
rtrn_tx,
mio_tokens,
time_before_poll: Instant::now(),
time_after_poll: Instant::now(),
}
}
pub fn run(&mut self) {
let mut events = mio::Events::with_capacity(1024);
loop {
self.time_before_poll = Instant::now();
if let Err(err) = self.poll.poll(&mut events, None) {
error!("network poll error: {}", err);
return;
}
self.time_after_poll = Instant::now();
for event in &events {
trace!(?event, "event");
match event.token() {
Controller::CTRL_TOK => {
if self.handle_ctl() {
return;
}
},
_ => self.handle_tok(&event),
};
}
self.handle_statistics();
}
}
fn handle_ctl(&mut self) -> bool {
info!("start in handle_ctl");
loop {
info!("recv in handle_ctl");
let msg = match self.ctrl_rx.try_recv() {
Ok(msg) => msg,
Err(TryRecvError::Empty) => {
return false;
},
Err(err) => {
panic!("Unexpected error '{}'", err);
},
};
info!("Loop in handle_ctl");
match msg {
CtrlMsg::Shutdown => {
debug!("Shutting Down");
for (_, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj {
channel.shutdown();
channel.tick_send();
}
}
return true;
},
CtrlMsg::Register(handle, interest, opts) => {
let tok = self.mio_tokens.construct();
match &handle {
TokenObjects::TcpListener(h) => {
self.poll.register(h, tok, interest, opts).unwrap()
},
TokenObjects::Channel(channel) => {
match channel.get_protocol() {
ChannelProtocols::Tcp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Udp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Mpsc(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
}
.unwrap();
},
}
debug!(?handle, ?tok, "Registered new handle");
self.mio_tokens.insert(tok, handle);
},
CtrlMsg::OpenStream {
pid,
sid,
prio,
promises,
msg_tx,
} => {
let mut handled = false;
for (_, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj {
if Some(pid) == channel.remote_pid {
info!(?channel.streams, "-CTR- going to open stream");
channel.open_stream(sid, prio, promises, msg_tx);
info!(?channel.streams, "-CTR- going to tick");
channel.tick_send();
info!(?channel.streams, "-CTR- did to open stream");
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't open Stream, didn't found pid");
}
},
CtrlMsg::CloseStream { pid, sid } => {
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::Channel(channel) = to {
if Some(pid) == channel.remote_pid {
info!(?channel.streams, "-CTR- going to close stream");
channel.close_stream(sid); //TODO: check participant
info!(?channel.streams, "-CTR- going to tick");
channel.tick_send();
info!(?channel.streams, "-CTR- did to close stream");
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't close Stream, didn't found pid");
}
},
CtrlMsg::Send(outgoing) => {
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::Channel(channel) = to {
info!(?channel.streams, "-CTR- going to send msg");
channel.send(outgoing); //TODO: check participant
info!(?channel.streams, "-CTR- going to tick");
channel.tick_send();
info!(?channel.streams, "-CTR- did to send msg");
handled = true;
break;
}
}
if !handled {
error!(
"help, we should check here for stream data, but its in channel ...."
);
}
},
};
}
}
fn handle_tok(&mut self, event: &mio::Event) {
let obj = match self.mio_tokens.tokens.get_mut(&event.token()) {
Some(obj) => obj,
None => panic!("Unexpected event token '{:?}'", &event.token()),
};
match obj {
TokenObjects::TcpListener(listener) => match listener.accept() {
Ok((remote_stream, _)) => {
info!(?remote_stream, "remote connected");
let tok = self.mio_tokens.construct();
self.poll
.register(
&remote_stream,
tok,
Ready::readable() | Ready::writable(),
PollOpt::edge(),
)
.unwrap();
trace!(?remote_stream, ?tok, "registered");
let tcp_channel = TcpChannel::new(remote_stream);
let mut channel = Channel::new(
self.pid,
ChannelProtocols::Tcp(tcp_channel),
self.sid_backup_per_participant.clone(),
None,
);
channel.handshake();
channel.tick_send();
self.mio_tokens
.tokens
.insert(tok, TokenObjects::Channel(channel));
},
Err(err) => {
error!(?err, "error during remote connected");
},
},
TokenObjects::Channel(channel) => {
if event.readiness().is_readable() {
let protocol = channel.get_protocol();
trace!(?protocol, "channel readable");
channel.tick_recv(&mut self.participants, &self.rtrn_tx);
} else {
trace!("channel not readable");
}
if event.readiness().is_writable() {
let protocol = channel.get_protocol();
trace!(?protocol, "channel writeable");
channel.tick_send();
} else {
trace!("channel not writeable");
let protocol = channel.get_protocol();
if let ChannelProtocols::Mpsc(_) = &protocol {
channel.tick_send(); //workaround for MPSC!!! ONLY for MPSC
}
}
},
};
}
fn handle_statistics(&mut self) {
let time_after_work = Instant::now();
let idle = self.time_after_poll.duration_since(self.time_before_poll);
let work = time_after_work.duration_since(self.time_after_poll);
if let Some(metric) = &*self.metrics {
metric
.worker_idle_time
.with_label_values(&["message"])
.add(idle.as_millis() as i64); //todo convert correctly !
metric
.worker_work_time
.with_label_values(&["message"])
.add(work.as_millis() as i64);
}
}
}

View File

@ -1,19 +1,14 @@
use lazy_static::*;
use std::{sync::Arc, thread, time::Duration};
use std::{
net::SocketAddr,
sync::atomic::{AtomicU16, Ordering},
thread,
time::Duration,
};
use tracing::*;
use tracing_subscriber::EnvFilter;
use uvth::{ThreadPool, ThreadPoolBuilder};
pub fn setup(tracing: bool, mut sleep: u64) -> (Arc<ThreadPool>, u64) {
lazy_static! {
static ref THREAD_POOL: Arc<ThreadPool> = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-test".into())
.num_threads(2)
.build(),
);
}
pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
if tracing {
sleep += 1000
}
@ -49,5 +44,13 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (Arc<ThreadPool>, u64) {
None
};
(THREAD_POOL.clone(), 0)
(0, 0)
}
pub fn tcp() -> veloren_network::Address {
lazy_static! {
static ref PORTS: AtomicU16 = AtomicU16::new(5000);
}
let port = PORTS.fetch_add(1, Ordering::Relaxed);
veloren_network::Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port)))
}

View File

@ -1,110 +1,77 @@
use futures::executor::block_on;
use std::{net::SocketAddr, thread, time::Duration};
use uuid::Uuid;
use veloren_network::{Address, Network, Promise};
use async_std::{sync::RwLock, task};
use futures::{
channel::{mpsc, oneshot},
executor::ThreadPool,
sink::SinkExt,
};
use std::sync::{atomic::AtomicU64, Arc};
use veloren_network::{Network, Pid, Scheduler};
mod helper;
use std::collections::HashMap;
use tracing::*;
use uvth::ThreadPoolBuilder;
/*
#[test]
fn tcp_simple() {
let (thread_pool, _) = helper::setup(true, 100);
let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
n1.listen(&a1).unwrap(); //await
n2.listen(&a2).unwrap(); // only requiered here, but doesnt hurt on n1
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
fn network() {
let (_, _) = helper::setup(true, 100);
{
let addr1 = helper::tcp();
let pool = ThreadPoolBuilder::new().num_threads(2).build();
let n1 = Network::new(Pid::fake(1), &pool);
let n2 = Network::new(Pid::fake(2), &pool);
let p1 = block_on(n1.connect(&a2)).unwrap(); //await
let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
n1.listen(addr1.clone()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
assert!(s1.send("Hello World").is_ok());
let pid1 = task::block_on(n2.connect(addr1)).unwrap();
warn!("yay connected");
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
let pid2 = task::block_on(n1.connected()).unwrap();
warn!("yay connected");
let s: Result<String, _> = block_on(s1_n2.recv());
assert_eq!(s, Ok("Hello World".to_string()));
let mut sid1_p1 = task::block_on(pid1.open(10, 0)).unwrap();
let mut sid1_p2 = task::block_on(pid2.opened()).unwrap();
assert!(s1.close().is_ok());
task::block_on(sid1_p1.send("Hello World")).unwrap();
let m1: Result<String, _> = task::block_on(sid1_p2.recv());
assert_eq!(m1, Ok("Hello World".to_string()));
//assert_eq!(pid, Pid::fake(1));
std::thread::sleep(std::time::Duration::from_secs(10));
}
std::thread::sleep(std::time::Duration::from_secs(2));
}
*/
/*
#[test]
fn tcp_5streams() {
let (thread_pool, _) = helper::setup(false, 200);
let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010)));
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011)));
n1.listen(&a1).unwrap(); //await
n2.listen(&a2).unwrap(); // only requiered here, but doesnt hurt on n1
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
let p1 = block_on(n1.connect(&a2)).unwrap(); //await
let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
let s2 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
let s3 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
let s4 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
let s5 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
assert!(s3.send("Hello World3").is_ok());
assert!(s1.send("Hello World1").is_ok());
assert!(s5.send("Hello World5").is_ok());
assert!(s2.send("Hello World2").is_ok());
assert!(s4.send("Hello World4").is_ok());
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
let mut s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2
let mut s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3
let mut s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4
let mut s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5
info!("all streams opened");
let s: Result<String, _> = block_on(s3_n2.recv());
assert_eq!(s, Ok("Hello World3".to_string()));
let s: Result<String, _> = block_on(s1_n2.recv());
assert_eq!(s, Ok("Hello World1".to_string()));
let s: Result<String, _> = block_on(s2_n2.recv());
assert_eq!(s, Ok("Hello World2".to_string()));
let s: Result<String, _> = block_on(s5_n2.recv());
assert_eq!(s, Ok("Hello World5".to_string()));
let s: Result<String, _> = block_on(s4_n2.recv());
assert_eq!(s, Ok("Hello World4".to_string()));
assert!(s1.close().is_ok());
#[ignore]
fn scheduler() {
let (_, _) = helper::setup(true, 100);
let addr = helper::tcp();
let (scheduler, mut listen_tx, _, _, _) = Scheduler::new(Pid::new());
task::block_on(listen_tx.send(addr)).unwrap();
task::block_on(scheduler.run());
}
*/
#[test]
fn mpsc_simple() {
let (thread_pool, _) = helper::setup(true, 2300);
let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
let a1 = Address::Mpsc(42);
let a2 = Address::Mpsc(1337);
//n1.listen(&a1).unwrap(); //await //TODO: evaluate if this should be allowed
// or is forbidden behavior...
n2.listen(&a2).unwrap(); // only requiered here, but doesnt hurt on n1
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
let p1 = block_on(n1.connect(&a2)).unwrap(); //await
let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
assert!(s1.send("Hello World").is_ok());
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
let s: Result<String, _> = block_on(s1_n2.recv());
assert_eq!(s, Ok("Hello World".to_string()));
assert!(s1.close().is_ok());
#[ignore]
fn channel_creator_test() {
let (_, _) = helper::setup(true, 100);
let (_end_sender, end_receiver) = oneshot::channel::<()>();
let (part_out_sender, _part_out_receiver) = mpsc::unbounded();
let (configured_sender, _configured_receiver) = mpsc::unbounded::<(u64, Pid, u64)>();
let addr = helper::tcp();
task::block_on(async {
Scheduler::channel_creator(
Arc::new(AtomicU64::new(0)),
Pid::new(),
addr,
end_receiver,
Arc::new(ThreadPool::new().unwrap()),
part_out_sender,
configured_sender,
Arc::new(RwLock::new(HashMap::new())),
)
.await;
});
}