From bf93011c501b06c764d317c73984c9d2e962438f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sun, 22 Mar 2020 14:47:21 +0100 Subject: [PATCH] COMPLETE REWRITE - use async_std and implement a async serialisaition - new participant, stream and drop on the participant - sending and receiving on streams --- Cargo.lock | 40 +- network/Cargo.toml | 8 +- network/src/api.rs | 557 +++++++++---------------- network/src/async_serde.rs | 178 ++++++++ network/src/channel.rs | 782 ++++++++++++----------------------- network/src/controller.rs | 178 -------- network/src/frames.rs | 37 ++ network/src/lib.rs | 15 +- network/src/message.rs | 2 +- network/src/metrics.rs | 143 ------- network/src/mpsc.rs | 83 ---- network/src/participant.rs | 294 +++++++++++++ network/src/prios.rs | 477 ++++++++++----------- network/src/scheduler.rs | 649 +++++++++++++++++++++++++++++ network/src/tcp.rs | 144 ------- network/src/types.rs | 208 +++------- network/src/udp.rs | 130 ------ network/src/worker.rs | 301 -------------- network/tests/helper.rs | 29 +- network/tests/integration.rs | 159 +++---- 20 files changed, 2002 insertions(+), 2412 deletions(-) create mode 100644 network/src/async_serde.rs delete mode 100644 network/src/controller.rs create mode 100644 network/src/frames.rs create mode 100644 network/src/participant.rs create mode 100644 network/src/scheduler.rs delete mode 100644 network/src/worker.rs diff --git a/Cargo.lock b/Cargo.lock index 734cd258f8..24f9ea4de0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1269,29 +1269,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "enumset" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93182dcb6530c757e5879b22ebc5cfbd034861585b442819389614e223ac1c47" -dependencies = [ - "enumset_derive", - "num-traits 0.2.11", - "serde", -] - -[[package]] -name = "enumset_derive" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751a786cfcc7d5ceb9e0fe06f0e911da6ce3a3044633e029df4c370193c86a62" -dependencies = [ - "darling", - "proc-macro2 1.0.17", - "quote 1.0.6", - "syn 1.0.27", -] - [[package]] name = "env_logger" version = "0.6.2" @@ -1520,6 +1497,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -4938,6 +4916,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "tracing-futures" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.1.1" @@ -5362,20 +5350,18 @@ dependencies = [ name = "veloren_network" version = "0.1.0" dependencies = [ + "async-std", "bincode", "byteorder 1.3.4", - "enumset", "futures 0.3.5", "lazy_static", - "mio", - "mio-extras", "prometheus", "rand 0.7.3", "serde", "tlid", "tracing", + "tracing-futures", "tracing-subscriber", - "uuid 0.8.1", "uvth", ] diff --git a/network/Cargo.toml b/network/Cargo.toml index 100bccb97a..6f2e905d5f 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -8,8 +8,6 @@ edition = "2018" [dependencies] -enumset = { version = "0.4", features = ["serde"] } -uuid = { version = "0.8", features = ["serde", "v4"] } tlid = { path = "../../tlid", features = ["serde"]} #threadpool uvth = "3.1" @@ -18,13 +16,13 @@ bincode = "1.2" serde = "1.0" byteorder = "1.3" #sending -mio = "0.6" -mio-extras = "2.0" +async-std = { version = "1.5", features = ["std", "unstable"] } #tracing and metrics tracing = "0.1" +tracing-futures = "0.2" prometheus = "0.7" #async -futures = "0.3" +futures = { version = "0.3", features = ["thread-pool"] } #mpsc channel registry lazy_static = "1.4" rand = "0.7" diff --git a/network/src/api.rs b/network/src/api.rs index fe9f7bb97e..21f92d4db9 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -1,377 +1,251 @@ use crate::{ - channel::{Channel, ChannelProtocols}, - controller::Controller, message::{self, InCommingMessage, OutGoingMessage}, - metrics::NetworkMetrics, - mpsc::MpscChannel, - tcp::TcpChannel, - types::{CtrlMsg, Pid, Sid, TokenObjects}, + scheduler::Scheduler, + types::{Mid, Pid, Prio, Promises, Sid}, }; -use enumset::*; -use futures::stream::StreamExt; -use mio::{ - self, - net::{TcpListener, TcpStream}, - PollOpt, Ready, +use async_std::{sync::RwLock, task}; +use futures::{ + channel::{mpsc, oneshot}, + sink::SinkExt, + stream::StreamExt, }; -use mio_extras; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use std::{ collections::HashMap, - sync::{atomic::AtomicBool, mpsc, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; -use tlid; use tracing::*; -use uuid::Uuid; +use tracing_futures::Instrument; use uvth::ThreadPool; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum Address { Tcp(std::net::SocketAddr), Udp(std::net::SocketAddr), Mpsc(u64), } -#[derive(Serialize, Deserialize, EnumSetType, Debug)] -#[enumset(serialize_repr = "u8")] -pub enum Promise { - InOrder, - NoCorrupt, - GuaranteedDelivery, - Encrypted, -} - -#[derive(Clone)] +#[derive(Debug)] pub struct Participant { + local_pid: Pid, remote_pid: Pid, - network_controller: Arc>, + stream_open_sender: RwLock)>>, + stream_opened_receiver: RwLock>, + shutdown_receiver: RwLock>, + closed: AtomicBool, + disconnect_sender: Option>, } +#[derive(Debug)] pub struct Stream { + pid: Pid, sid: Sid, - remote_pid: Pid, + mid: Mid, + prio: Prio, + promises: Promises, + msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, + msg_recv_receiver: mpsc::UnboundedReceiver, + shutdown_receiver: oneshot::Receiver<()>, closed: AtomicBool, - closed_rx: mpsc::Receiver<()>, - msg_rx: futures::channel::mpsc::UnboundedReceiver, - ctr_tx: mio_extras::channel::Sender, + shutdown_sender: Option>, } +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct NetworkError {} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct ParticipantError {} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct StreamError {} + pub struct Network { - _token_pool: tlid::Pool>, - _worker_pool: tlid::Pool>, - controller: Arc>, - _thread_pool: Arc, - participant_id: Pid, - sid_backup_per_participant: Arc>>>>, - participants: RwLock>, - _metrics: Arc>, + local_pid: Pid, + participants: RwLock>>, + listen_sender: RwLock>, + connect_sender: RwLock)>>, + connected_receiver: RwLock>, + shutdown_sender: Option>, } impl Network { - pub fn new(participant_id: Uuid, thread_pool: Arc) -> Self { - let mut token_pool = tlid::Pool::new_full(); - let mut worker_pool = tlid::Pool::new_full(); - let sid_backup_per_participant = Arc::new(RwLock::new(HashMap::new())); - for _ in 0..participant_id.as_u128().rem_euclid(64) { - worker_pool.next(); - //random offset from 0 for tests where multiple networks are - // created and we do not want to polute the traces with - // network pid everywhere - } - let metrics = Arc::new(None); - let controller = Arc::new(vec![Controller::new( - worker_pool.next(), - participant_id, - thread_pool.clone(), - token_pool.subpool(1000000).unwrap(), - metrics.clone(), - sid_backup_per_participant.clone(), - )]); - let participants = RwLock::new(vec![]); + pub fn new(participant_id: Pid, thread_pool: &ThreadPool) -> Self { + //let participants = RwLock::new(vec![]); + let p = participant_id; + debug!(?p, "starting Network"); + let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) = + Scheduler::new(participant_id); + thread_pool.execute(move || { + let _handle = task::block_on( + scheduler + .run() + .instrument(tracing::info_span!("scheduler", ?p)), + ); + }); Self { - _token_pool: token_pool, - _worker_pool: worker_pool, - controller, - _thread_pool: thread_pool, - participant_id, - sid_backup_per_participant, - participants, - _metrics: metrics, + local_pid: participant_id, + participants: RwLock::new(HashMap::new()), + listen_sender: RwLock::new(listen_sender), + connect_sender: RwLock::new(connect_sender), + connected_receiver: RwLock::new(connected_receiver), + shutdown_sender: Some(shutdown_sender), } } - fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc>) -> &'a Controller { &list[0] } - - pub fn listen(&self, address: &Address) -> Result<(), NetworkError> { - let span = span!(Level::TRACE, "listen", ?address); - let worker = Self::get_lowest_worker(&self.controller); - let _enter = span.enter(); - match address { - Address::Tcp(a) => { - let tcp_listener = TcpListener::bind(&a)?; - info!("listening"); - worker.get_tx().send(CtrlMsg::Register( - TokenObjects::TcpListener(tcp_listener), - Ready::readable(), - PollOpt::edge(), - ))?; - }, - Address::Udp(_) => unimplemented!( - "UDP is currently not supportet problem is in internal worker - channel view. I \ - except to have every Channel it#s own socket, but UDP shares a Socket with \ - everyone on it. So there needs to be a instance that detects new connections \ - inside worker and then creates a new channel for them, while handling needs to \ - be done in UDP layer... however i am to lazy to build it yet." - ), - Address::Mpsc(a) => { - let (listen_tx, listen_rx) = mio_extras::channel::channel(); - let (connect_tx, conntect_rx) = mio_extras::channel::channel(); - let mut registry = (*crate::mpsc::MPSC_REGISTRY).write().unwrap(); - registry.insert(*a, Mutex::new((listen_tx, conntect_rx))); - info!("listening"); - let mpsc_channel = MpscChannel::new(connect_tx, listen_rx); - let mut channel = Channel::new( - self.participant_id, - ChannelProtocols::Mpsc(mpsc_channel), - self.sid_backup_per_participant.clone(), - None, - ); - channel.handshake(); - channel.tick_send(); - worker.get_tx().send(CtrlMsg::Register( - TokenObjects::Channel(channel), - Ready::readable() | Ready::writable(), - PollOpt::edge(), - ))?; - }, - }; + pub fn listen(&self, address: Address) -> Result<(), NetworkError> { + task::block_on(async { self.listen_sender.write().await.send(address).await }).unwrap(); Ok(()) } - pub async fn connect(&self, address: &Address) -> Result { - let worker = Self::get_lowest_worker(&self.controller); - let sid_backup_per_participant = self.sid_backup_per_participant.clone(); - let span = span!(Level::INFO, "connect", ?address); - let _enter = span.enter(); - match address { - Address::Tcp(a) => { - info!("connecting"); - let tcp_stream = TcpStream::connect(&a)?; - let tcp_channel = TcpChannel::new(tcp_stream); - let (ctrl_tx, ctrl_rx) = mpsc::channel::(); - let channel = Channel::new( - self.participant_id, - ChannelProtocols::Tcp(tcp_channel), - sid_backup_per_participant, - Some(ctrl_tx), - ); - worker.get_tx().send(CtrlMsg::Register( - TokenObjects::Channel(channel), - Ready::readable() | Ready::writable(), - PollOpt::edge(), - ))?; - let remote_pid = ctrl_rx.recv().unwrap(); - info!(?remote_pid, " sucessfully connected to"); - let part = Participant { - remote_pid, - network_controller: self.controller.clone(), - }; - self.participants.write().unwrap().push(part.clone()); - return Ok(part); + pub async fn connect(&self, address: Address) -> Result, NetworkError> { + let (pid_sender, pid_receiver) = oneshot::channel::(); + self.connect_sender + .write() + .await + .send((address, pid_sender)) + .await + .unwrap(); + match pid_receiver.await { + Ok(participant) => { + let pid = participant.remote_pid; + debug!(?pid, "received Participant from remote"); + let participant = Arc::new(participant); + self.participants + .write() + .await + .insert(participant.remote_pid, participant.clone()); + Ok(participant) }, - Address::Udp(_) => unimplemented!("lazy me"), - Address::Mpsc(a) => { - let mut registry = (*crate::mpsc::MPSC_REGISTRY).write().unwrap(); - let (listen_tx, conntect_rx) = match registry.remove(a) { - Some(x) => x.into_inner().unwrap(), - None => { - error!("could not connect to mpsc"); - return Err(NetworkError::NetworkDestroyed); - }, - }; - info!("connect to mpsc"); - let mpsc_channel = MpscChannel::new(listen_tx, conntect_rx); - let (ctrl_tx, ctrl_rx) = mpsc::channel::(); - let channel = Channel::new( - self.participant_id, - ChannelProtocols::Mpsc(mpsc_channel), - self.sid_backup_per_participant.clone(), - Some(ctrl_tx), - ); - worker.get_tx().send(CtrlMsg::Register( - TokenObjects::Channel(channel), - Ready::readable() | Ready::writable(), - PollOpt::edge(), - ))?; + Err(_) => Err(NetworkError {}), + } + } - let remote_pid = ctrl_rx.recv().unwrap(); - info!(?remote_pid, " sucessfully connected to"); - let part = Participant { - remote_pid, - network_controller: self.controller.clone(), - }; - self.participants.write().unwrap().push(part.clone()); - return Ok(part); + pub async fn connected(&self) -> Result, NetworkError> { + match self.connected_receiver.write().await.next().await { + Some(participant) => { + let participant = Arc::new(participant); + self.participants + .write() + .await + .insert(participant.remote_pid, participant.clone()); + Ok(participant) }, + None => Err(NetworkError {}), } } - pub fn disconnect(&self, _participant: Participant) -> Result<(), NetworkError> { - //todo: close all channels to a participant! - unimplemented!("sda"); - } + pub async fn disconnect(&self, participant: Arc) -> Result<(), NetworkError> { + // Remove, Close and try_unwrap error when unwrap fails! + let participant = self + .participants + .write() + .await + .remove(&participant.remote_pid) + .unwrap(); + participant.closed.store(true, Ordering::Relaxed); - pub fn participants(&self) -> std::sync::RwLockReadGuard> { - self.participants.read().unwrap() - } - - pub async fn connected(&self) -> Result { - // returns if a Participant connected and is ready - loop { - //ARRGGG - for worker in self.controller.iter() { - //TODO harden! - worker.tick(); - if let Ok(remote_pid) = worker.get_participant_connect_rx().try_recv() { - let part = Participant { - remote_pid, - network_controller: self.controller.clone(), - }; - self.participants.write().unwrap().push(part.clone()); - return Ok(part); - }; - } - std::thread::sleep(std::time::Duration::from_millis(1)); - } - } - - pub fn multisend( - &self, - streams: Vec, - msg: M, - ) -> Result<(), NetworkError> { - let messagebuffer = Arc::new(message::serialize(&msg)); - //TODO: why do we need a look here, i want my own local directory which is - // updated by workes via a channel and needs to be intepreted on a send but it - // should almost ever be empty except for new channel creations and stream - // creations! - for stream in streams { - stream - .ctr_tx - .send(CtrlMsg::Send(OutGoingMessage { - buffer: messagebuffer.clone(), - cursor: 0, - mid: None, - sid: stream.sid, - })) - .unwrap(); - } + if Arc::try_unwrap(participant).is_err() { + warn!( + "you are disconnecting and still keeping a reference to this participant, this is \ + a bad idea. Participant will only be dropped when you drop your last reference" + ); + }; Ok(()) } } +//TODO: HANDLE SHUTDOWN_RECEIVER + impl Participant { - pub fn open(&self, prio: u8, promises: EnumSet) -> Result { - let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); - for controller in self.network_controller.iter() { - //trigger tick: - controller.tick(); - let parts = controller.participants(); - let (stream_close_tx, stream_close_rx) = mpsc::channel(); - let sid = match parts.get(&self.remote_pid) { - Some(p) => { - let sid = p.sid_pool.write().unwrap().next(); - //prepare the closing of the new stream already - p.stream_close_txs - .write() - .unwrap() - .insert(sid, stream_close_tx); - sid - }, - None => return Err(ParticipantError::ParticipantDisconected), /* TODO: participant was never connected in the first case maybe... */ - }; - let tx = controller.get_tx(); - tx.send(CtrlMsg::OpenStream { - pid: self.remote_pid, - sid, - prio, - promises, - msg_tx, - }) - .unwrap(); - info!(?sid, " sucessfully opened stream"); - return Ok(Stream::new( - sid, - self.remote_pid, - stream_close_rx, - msg_rx, - tx, - )); + pub(crate) fn new( + local_pid: Pid, + remote_pid: Pid, + stream_open_sender: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender)>, + stream_opened_receiver: mpsc::UnboundedReceiver, + shutdown_receiver: oneshot::Receiver<()>, + disconnect_sender: mpsc::UnboundedSender, + ) -> Self { + Self { + local_pid, + remote_pid, + stream_open_sender: RwLock::new(stream_open_sender), + stream_opened_receiver: RwLock::new(stream_opened_receiver), + shutdown_receiver: RwLock::new(shutdown_receiver), + closed: AtomicBool::new(false), + disconnect_sender: Some(disconnect_sender), + } + } + + pub async fn open(&self, prio: u8, promises: Promises) -> Result { + let (sid_sender, sid_receiver) = oneshot::channel(); + self.stream_open_sender + .write() + .await + .send((prio, promises, sid_sender)) + .await + .unwrap(); + match sid_receiver.await { + Ok(stream) => { + let sid = stream.sid; + debug!(?sid, "opened stream"); + Ok(stream) + }, + Err(_) => Err(ParticipantError {}), } - Err(ParticipantError::ParticipantDisconected) } pub async fn opened(&self) -> Result { - //TODO: make this async native! - loop { - // Going to all workers in a network, but only receive on specific channels! - for worker in self.network_controller.iter() { - worker.tick(); - let parts = worker.participants(); - if let Some(p) = parts.get(&self.remote_pid) { - if let Ok(stream) = p.stream_open_rx.try_recv() { - //need a try, as i depend on the tick, it's the same thread... - debug!("delivering a stream"); - return Ok(stream); - }; - } - } + match self.stream_opened_receiver.write().await.next().await { + Some(stream) => Ok(stream), + None => Err(ParticipantError {}), } } } impl Stream { - //TODO: What about SEND instead of Serializeable if it goes via PIPE ? - //TODO: timeout per message or per stream ? stream or ? like for Position Data, - // if not transmitted within 1 second, throw away... pub(crate) fn new( + pid: Pid, sid: Sid, - remote_pid: Pid, - closed_rx: mpsc::Receiver<()>, - msg_rx: futures::channel::mpsc::UnboundedReceiver, - ctr_tx: mio_extras::channel::Sender, + prio: Prio, + promises: Promises, + msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, + msg_recv_receiver: mpsc::UnboundedReceiver, + shutdown_receiver: oneshot::Receiver<()>, + shutdown_sender: mpsc::UnboundedSender, ) -> Self { Self { + pid, sid, - remote_pid, + mid: 0, + prio, + promises, + msg_send_sender, + msg_recv_receiver, + shutdown_receiver, closed: AtomicBool::new(false), - closed_rx, - msg_rx, - ctr_tx, + shutdown_sender: Some(shutdown_sender), } } - pub fn send(&self, msg: M) -> Result<(), StreamError> { - if self.is_closed() { - return Err(StreamError::StreamClosed); - } + pub async fn send(&mut self, msg: M) -> Result<(), StreamError> { let messagebuffer = Arc::new(message::serialize(&msg)); - self.ctr_tx - .send(CtrlMsg::Send(OutGoingMessage { + self.msg_send_sender + .send((self.prio, self.pid, self.sid, OutGoingMessage { buffer: messagebuffer, cursor: 0, - mid: None, + mid: self.mid, sid: self.sid, })) .unwrap(); + self.mid += 1; Ok(()) } pub async fn recv(&mut self) -> Result { - if self.is_closed() { - return Err(StreamError::StreamClosed); - } - match self.msg_rx.next().await { + match self.msg_recv_receiver.next().await { Some(msg) => { info!(?msg, "delivering a message"); Ok(message::deserialize(msg.buffer)) @@ -382,68 +256,47 @@ impl Stream { ), } } + //Todo: ERROR: TODO: implement me and the disconnecting! +} - pub fn close(mut self) -> Result<(), StreamError> { self.intclose() } - - fn is_closed(&self) -> bool { - use core::sync::atomic::Ordering; - if self.closed.load(Ordering::Relaxed) { - true - } else { - if let Ok(()) = self.closed_rx.try_recv() { - self.closed.store(true, Ordering::SeqCst); //TODO: Is this the right Ordering? - true - } else { - false - } - } +impl Drop for Network { + fn drop(&mut self) { + let p = self.local_pid; + debug!(?p, "shutting down Network"); + self.shutdown_sender.take().unwrap().send(()).unwrap(); } +} - fn intclose(&mut self) -> Result<(), StreamError> { - use core::sync::atomic::Ordering; - if self.is_closed() { - return Err(StreamError::StreamClosed); +impl Drop for Participant { + fn drop(&mut self) { + if !self.closed.load(Ordering::Relaxed) { + let p = self.remote_pid; + debug!(?p, "shutting down Participant"); + task::block_on(async { + self.disconnect_sender + .take() + .unwrap() + .send(self.remote_pid) + .await + .unwrap() + }); } - self.ctr_tx - .send(CtrlMsg::CloseStream { - pid: self.remote_pid, - sid: self.sid, - }) - .unwrap(); - self.closed.store(true, Ordering::SeqCst); //TODO: Is this the right Ordering? - Ok(()) } } impl Drop for Stream { fn drop(&mut self) { - let _ = self.intclose().map_err( - |e| error!(?self.sid, ?e, "could not properly shutdown stream, which got out of scope"), - ); + if !self.closed.load(Ordering::Relaxed) { + let s = self.sid; + debug!(?s, "shutting down Stream"); + task::block_on(async { + self.shutdown_sender + .take() + .unwrap() + .send(self.sid) + .await + .unwrap() + }); + } } } - -#[derive(Debug)] -pub enum NetworkError { - NetworkDestroyed, - WorkerDestroyed, - IoError(std::io::Error), -} - -#[derive(Debug, PartialEq)] -pub enum ParticipantError { - ParticipantDisconected, -} - -#[derive(Debug, PartialEq)] -pub enum StreamError { - StreamClosed, -} - -impl From for NetworkError { - fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) } -} - -impl From> for NetworkError { - fn from(_err: mio_extras::channel::SendError) -> Self { NetworkError::WorkerDestroyed } -} diff --git a/network/src/async_serde.rs b/network/src/async_serde.rs new file mode 100644 index 0000000000..37fd6f2eb8 --- /dev/null +++ b/network/src/async_serde.rs @@ -0,0 +1,178 @@ +/* +use ::uvth::ThreadPool; +use bincode; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, +}; + +pub struct SerializeFuture { + shared_state: Arc>, +} + +struct SerializeSharedState { + result: Option>, + waker: Option, +} + +pub struct DeserializeFuture { + shared_state: Arc>>, +} + +struct DeserializeSharedState { + result: Option, + waker: Option, +} + +impl Future for SerializeFuture { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut shared_state = self.shared_state.lock().unwrap(); + if shared_state.result.is_some() { + Poll::Ready(shared_state.result.take().unwrap()) + } else { + shared_state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +impl SerializeFuture { + pub fn new(message: M, pool: &ThreadPool) -> Self { + let shared_state = Arc::new(Mutex::new(SerializeSharedState { + result: None, + waker: None, + })); + // Spawn the new thread + let thread_shared_state = shared_state.clone(); + pool.execute(move || { + let mut writer = { + let actual_size = bincode::serialized_size(&message).unwrap(); + Vec::::with_capacity(actual_size as usize) + }; + if let Err(e) = bincode::serialize_into(&mut writer, &message) { + panic!( + "bincode serialize error, probably undefined behavior somewhere else, check \ + the possible error types of `bincode::serialize_into`: {}", + e + ); + }; + + let mut shared_state = thread_shared_state.lock().unwrap(); + shared_state.result = Some(writer); + if let Some(waker) = shared_state.waker.take() { + waker.wake() + } + }); + + Self { shared_state } + } +} + +impl Future for DeserializeFuture { + type Output = M; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut shared_state = self.shared_state.lock().unwrap(); + if shared_state.result.is_some() { + Poll::Ready(shared_state.result.take().unwrap()) + } else { + shared_state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +impl DeserializeFuture { + pub fn new(data: Vec, pool: &ThreadPool) -> Self { + let shared_state = Arc::new(Mutex::new(DeserializeSharedState { + result: None, + waker: None, + })); + // Spawn the new thread + let thread_shared_state = shared_state.clone(); + pool.execute(move || { + let decoded: M = bincode::deserialize(data.as_slice()).unwrap(); + + let mut shared_state = thread_shared_state.lock().unwrap(); + shared_state.result = Some(decoded); + if let Some(waker) = shared_state.waker.take() { + waker.wake() + } + }); + + Self { shared_state } + } +} +*/ +/* +#[cfg(test)] +mod tests { + use crate::{ + async_serde::*, + message::{MessageBuffer, OutGoingMessage}, + types::{Frame, Sid}, + }; + use std::{collections::VecDeque, sync::Arc}; + use uvth::ThreadPoolBuilder; + + use async_std::{ + io::BufReader, + net::{TcpListener, TcpStream, ToSocketAddrs}, + prelude::*, + task, + }; + #[macro_use] use futures; + + async fn tick_tock(msg: String, pool: &ThreadPool) { + let serialized = SerializeFuture::new(msg.clone(), pool).await; + let deserialized = DeserializeFuture::::new(serialized, pool).await; + assert_eq!(msg, deserialized) + } + + #[test] + fn multiple_serialize() { + let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string(); + let pool = ThreadPoolBuilder::new().build(); + let (r1, r2, r3) = task::block_on(async { + let s1 = SerializeFuture::new(msg.clone(), &pool); + let s2 = SerializeFuture::new(msg.clone(), &pool); + let s3 = SerializeFuture::new(msg.clone(), &pool); + futures::join!(s1, s2, s3) + }); + assert_eq!(r1.len(), 108); + assert_eq!(r2.len(), 108); + assert_eq!(r3.len(), 108); + } + + #[test] + fn await_serialize() { + let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string(); + let pool = ThreadPoolBuilder::new().build(); + task::block_on(async { + let r1 = SerializeFuture::new(msg.clone(), &pool).await; + let r2 = SerializeFuture::new(msg.clone(), &pool).await; + let r3 = SerializeFuture::new(msg.clone(), &pool).await; + assert_eq!(r1.len(), 108); + assert_eq!(r2.len(), 108); + assert_eq!(r3.len(), 108); + }); + } + + #[test] + fn multiple_serialize_deserialize() { + let msg = "ThisMessageisexactly100charactersLongToPrecislyMeassureSerialisation_SoYoucanSimplyCountThe123inhere".to_string(); + let pool = ThreadPoolBuilder::new().build(); + task::block_on(async { + let s1 = tick_tock(msg.clone(), &pool); + let s2 = tick_tock(msg.clone(), &pool); + let s3 = tick_tock(msg.clone(), &pool); + futures::join!(s1, s2, s3) + }); + } +} +*/ diff --git a/network/src/channel.rs b/network/src/channel.rs index d82e0400d5..8a5d84c1ca 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,560 +1,306 @@ use crate::{ - api::Promise, - message::{InCommingMessage, MessageBuffer, OutGoingMessage}, - mpsc::MpscChannel, - tcp::TcpChannel, + frames::Frame, types::{ - Frame, IntStream, Pid, RtrnMsg, Sid, DEFAULT_SID_SIZE, VELOREN_MAGIC_NUMBER, + Cid, NetworkBuffer, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION, }, - udp::UdpChannel, }; -use enumset::EnumSet; -use futures::{executor::block_on, sink::SinkExt}; -use rand::{thread_rng, Rng}; -use std::{ - collections::{HashMap, VecDeque}, - sync::{mpsc, Arc, RwLock}, -}; -use tlid; +use async_std::{net::TcpStream, prelude::*, sync::RwLock}; +use futures::{channel::mpsc, future::FutureExt, select, sink::SinkExt, stream::StreamExt}; use tracing::*; +//use futures::prelude::*; -pub(crate) trait ChannelProtocol { - type Handle: ?Sized + mio::Evented; - /// Execute when ready to read - fn read(&mut self) -> Vec; - /// Execute when ready to write - fn write>(&mut self, frames: &mut I); - /// used for mio - fn get_handle(&self) -> &Self::Handle; -} - -#[derive(Debug)] -pub(crate) enum ChannelProtocols { - Tcp(TcpChannel), - Udp(UdpChannel), - Mpsc(MpscChannel), -} - -#[derive(Debug)] pub(crate) struct Channel { - pub stream_id_pool: Option>>, /* TODO: stream_id unique per - * participant */ - // participantd - pub randomno: u64, - pub local_pid: Pid, - pub remote_pid: Option, - pub sid_backup_per_participant: Arc>>>>, - pub streams: Vec, - pub send_queue: VecDeque, - pub protocol: ChannelProtocols, - pub return_pid_to: Option>, //use for network::connect() - pub send_handshake: bool, - pub send_pid: bool, - pub send_config: bool, - pub send_shutdown: bool, - pub recv_handshake: bool, - pub recv_pid: bool, - pub recv_config: bool, - pub recv_shutdown: bool, + cid: Cid, + local_pid: Pid, + remote_pid: RwLock>, + send_state: RwLock, + recv_state: RwLock, } -/* - Participant A - Participant B - A sends Handshake - B receives Handshake and answers with Handshake - A receives Handshake and answers with ParticipantId - B receives ParticipantId and answeres with ParticipantId - A receives ParticipantId and answers with Configuration for Streams and Messages - --- - A and B can now concurrently open Streams and send messages - --- - Shutdown phase -*/ +#[derive(Debug, PartialEq)] +enum ChannelState { + None, + Handshake, + Pid, + Shutdown, +} impl Channel { const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \ veloren server.\nWe are not sure if you are a valid \ veloren client.\nClosing the connection" .as_bytes(); - const WRONG_VERSION: &'static str = "Handshake does not contain a correct magic number, but \ + const WRONG_VERSION: &'static str = "Handshake does contain a correct magic number, but \ invalid version.\nWe don't know how to communicate with \ - you.\n"; + you.\nClosing the connection"; - pub fn new( - local_pid: Pid, - protocol: ChannelProtocols, - sid_backup_per_participant: Arc>>>>, - return_pid_to: Option>, - ) -> Self { - let randomno = thread_rng().gen(); - warn!(?randomno, "new channel,yay "); + pub fn new(cid: u64, local_pid: Pid) -> Self { Self { - randomno, - stream_id_pool: None, + cid, local_pid, - remote_pid: None, - sid_backup_per_participant, - streams: Vec::new(), - send_queue: VecDeque::new(), - protocol, - return_pid_to, - send_handshake: false, - send_pid: false, - send_config: false, - send_shutdown: false, - recv_handshake: false, - recv_pid: false, - recv_config: false, - recv_shutdown: false, + remote_pid: RwLock::new(None), + send_state: RwLock::new(ChannelState::None), + recv_state: RwLock::new(ChannelState::None), } } - pub fn can_send(&self) -> bool { - self.remote_pid.is_some() - && self.recv_handshake - && self.send_pid - && self.recv_pid - && (self.send_config || self.recv_config) - && !self.send_shutdown - && !self.recv_shutdown - } - - pub fn tick_recv( - &mut self, - worker_participants: &mut HashMap>>, - rtrn_tx: &mpsc::Sender, + /// (prot|part)_(in|out)_(sender|receiver) + /// prot: TO/FROM PROTOCOL = TCP + /// part: TO/FROM PARTICIPANT + /// in: FROM + /// out: TO + /// sender: mpsc::Sender + /// receiver: mpsc::Receiver + pub async fn run( + self, + protocol: TcpStream, + part_in_receiver: mpsc::UnboundedReceiver, + part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>, + configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>, ) { - match &mut self.protocol { - ChannelProtocols::Tcp(c) => { - for frame in c.read() { - self.handle(frame, worker_participants, rtrn_tx); - } - }, - ChannelProtocols::Udp(c) => { - for frame in c.read() { - self.handle(frame, worker_participants, rtrn_tx); - } - }, - ChannelProtocols::Mpsc(c) => { - for frame in c.read() { - self.handle(frame, worker_participants, rtrn_tx); - } - }, - } + let (prot_in_sender, prot_in_receiver) = mpsc::unbounded::(); + let (prot_out_sender, prot_out_receiver) = mpsc::unbounded::(); + + futures::join!( + self.read(protocol.clone(), prot_in_sender), + self.write(protocol, prot_out_receiver, part_in_receiver), + self.frame_handler( + prot_in_receiver, + prot_out_sender, + part_out_sender, + configured_sender + ) + ); + + //return part_out_receiver; } - pub fn tick_send(&mut self) { - self.tick_streams(); - match &mut self.protocol { - ChannelProtocols::Tcp(c) => { - c.write(&mut self.send_queue.drain(..)); - }, - ChannelProtocols::Udp(c) => { - c.write(&mut self.send_queue.drain(..)); - }, - ChannelProtocols::Mpsc(c) => { - c.write(&mut self.send_queue.drain(..)); - }, - } - } - - fn handle( - &mut self, - frame: Frame, - worker_participants: &mut HashMap>>, - rtrn_tx: &mpsc::Sender, + pub async fn frame_handler( + &self, + mut frames: mpsc::UnboundedReceiver, + mut frame_sender: mpsc::UnboundedSender, + mut external_frame_sender: mpsc::UnboundedSender<(Cid, Frame)>, + mut configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>, ) { - match frame { - Frame::Handshake { - magic_number, - version, - } => { - if magic_number != VELOREN_MAGIC_NUMBER { - error!( - ?magic_number, - "connection with invalid magic_number, closing connection" - ); - self.wrong_shutdown(Self::WRONG_NUMBER); - } - if version != VELOREN_NETWORK_VERSION { - error!(?version, "tcp connection with wrong network version"); - self.wrong_shutdown( + const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \ + something went wrong on network layer and connection will be closed"; + while let Some(frame) = frames.next().await { + trace!(?frame, "recv frame"); + match frame { + Frame::Handshake { + magic_number, + version, + } => { + if self + .verify_handshake(magic_number, version, &mut frame_sender) + .await + .is_ok() + { + debug!("handshake completed"); + *self.recv_state.write().await = ChannelState::Handshake; + if *self.send_state.read().await == ChannelState::Handshake { + self.send_pid(&mut frame_sender).await; + } else { + self.send_handshake(&mut frame_sender).await; + } + }; + }, + Frame::ParticipantId { pid } => { + if self.remote_pid.read().await.is_some() { + error!(?pid, "invalid message, cant change participantId"); + return; + } + *self.remote_pid.write().await = Some(pid); + *self.recv_state.write().await = ChannelState::Pid; + debug!(?pid, "Participant send their ID"); + let stream_id_offset = if *self.send_state.read().await != ChannelState::Pid { + self.send_pid(&mut frame_sender).await; + STREAM_ID_OFFSET2 + } else { + STREAM_ID_OFFSET1 + }; + info!(?pid, "this channel is now configured!"); + configured_sender + .send((self.cid, pid, stream_id_offset)) + .await + .unwrap(); + }, + Frame::Shutdown => { + info!("shutdown signal received"); + *self.recv_state.write().await = ChannelState::Shutdown; + }, + /* Sending RAW is only used for debug purposes in case someone write a + * new API against veloren Server! */ + Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + }, + _ => { + trace!("forward frame"); + external_frame_sender.send((self.cid, frame)).await.unwrap(); + }, + } + } + } + + pub async fn read( + &self, + mut protocol: TcpStream, + mut frame_handler: mpsc::UnboundedSender, + ) { + let mut buffer = NetworkBuffer::new(); + loop { + match protocol.read(buffer.get_write_slice(2048)).await { + Ok(0) => { + debug!(?buffer, "shutdown of tcp channel detected"); + frame_handler.send(Frame::Shutdown).await.unwrap(); + break; + }, + Ok(n) => { + buffer.actually_written(n); + trace!("incomming message with len: {}", n); + let slice = buffer.get_read_slice(); + let mut cur = std::io::Cursor::new(slice); + let mut read_ok = 0; + while cur.position() < n as u64 { + let round_start = cur.position() as usize; + let r: Result = bincode::deserialize_from(&mut cur); + match r { + Ok(frame) => { + frame_handler.send(frame).await.unwrap(); + read_ok = cur.position() as usize; + }, + Err(e) => { + // Probably we have to wait for moare data! + let first_bytes_of_msg = + &slice[round_start..std::cmp::min(n, round_start + 16)]; + debug!( + ?buffer, + ?e, + ?n, + ?round_start, + ?first_bytes_of_msg, + "message cant be parsed, probably because we need to wait for \ + more data" + ); + break; + }, + } + } + buffer.actually_read(read_ok); + }, + Err(e) => panic!("{}", e), + } + } + } + + pub async fn write( + &self, + mut protocol: TcpStream, + mut internal_frame_receiver: mpsc::UnboundedReceiver, + mut external_frame_receiver: mpsc::UnboundedReceiver, + ) { + while let Some(frame) = select! { + next = internal_frame_receiver.next().fuse() => next, + next = external_frame_receiver.next().fuse() => next, + } { + //dezerialize here as this is executed in a seperate thread PER channel. + // Limites Throughput per single Receiver but stays in same thread (maybe as its + // in a threadpool) + trace!(?frame, "going to send frame via tcp"); + let data = bincode::serialize(&frame).unwrap(); + protocol.write_all(data.as_slice()).await.unwrap(); + } + } + + async fn verify_handshake( + &self, + magic_number: String, + version: [u32; 3], + frame_sender: &mut mpsc::UnboundedSender, + ) -> Result<(), ()> { + if magic_number != VELOREN_MAGIC_NUMBER { + error!(?magic_number, "connection with invalid magic_number"); + #[cfg(debug_assertions)] + { + debug!("sending client instructions before killing"); + frame_sender + .send(Frame::Raw(Self::WRONG_NUMBER.to_vec())) + .await + .unwrap(); + frame_sender.send(Frame::Shutdown).await.unwrap(); + *self.send_state.write().await = ChannelState::Shutdown; + } + return Err(()); + } + if version != VELOREN_NETWORK_VERSION { + error!(?version, "connection with wrong network version"); + #[cfg(debug_assertions)] + { + debug!("sending client instructions before killing"); + frame_sender + .send(Frame::Raw( format!( "{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection", Self::WRONG_VERSION, VELOREN_NETWORK_VERSION, version, ) - .as_bytes(), - ); - } - debug!("handshake completed"); - self.recv_handshake = true; - if self.send_handshake { - self.send_queue.push_back(Frame::ParticipantId { - pid: self.local_pid, - }); - self.send_pid = true; - } else { - self.send_queue.push_back(Frame::Handshake { - magic_number: VELOREN_MAGIC_NUMBER.to_string(), - version: VELOREN_NETWORK_VERSION, - }); - self.send_handshake = true; - } - }, - Frame::ParticipantId { pid } => { - if self.remote_pid.is_some() { - error!(?pid, "invalid message, cant change participantId"); - return; - } - self.remote_pid = Some(pid); - debug!(?pid, "Participant send their ID"); - self.recv_pid = true; - if self.send_pid { - //If participant is unknown to worker, assign some range from global pool - if !worker_participants.contains_key(&pid) { - let mut global_participants = - self.sid_backup_per_participant.write().unwrap(); - //if this is the first time a participant connects to this Controller - if !global_participants.contains_key(&pid) { - // I dont no participant, so i can safely assume that they don't know - // me. so HERE we gonna fill local network pool - global_participants.insert(pid, tlid::Pool::new_full()); - } - //grab a range for controller - let global_part_pool = global_participants.get_mut(&pid).unwrap(); - - let mut local_controller_sids = - tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); - let remote_controller_sids = - tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); - let mut local_worker_sids = - tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); - let remote_worker_sids = - tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); - - let local_controller_range = - tlid::RemoveAllocation::new(&mut local_controller_sids); - let local_worker_range = - tlid::RemoveAllocation::new(&mut local_worker_sids); - - worker_participants.insert(pid.clone(), local_worker_sids); - self.send_queue.push_back(Frame::Configure { - sender_controller_sids: local_controller_range, - sender_worker_sids: local_worker_range, - receiver_controller_sids: remote_controller_sids, - receiver_worker_sids: remote_worker_sids, - }); - self.send_config = true; - info!(?pid, "this channel is now configured!"); - if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant { - controller_sids: local_controller_sids, - pid, - }) { - error!(?err, "couldn't notify, is network already closed ?"); - } - } else { - warn!( - "a known participant opened an additional channel, UNCHECKED BECAUSE \ - NO TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!" - ); - } - } else { - self.send_queue.push_back(Frame::ParticipantId { - pid: self.local_pid, - }); - self.send_pid = true; - } - }, - Frame::Configure { - sender_controller_sids, - sender_worker_sids, - mut receiver_controller_sids, - mut receiver_worker_sids, - } => { - let pid = match self.remote_pid { - Some(pid) => pid, - None => { - error!("Cant configure a Channel without a PID first!"); - return; - }, - }; - self.recv_config = true; - //Check if worker already knows about this participant - if !worker_participants.contains_key(&pid) { - let mut global_participants = self.sid_backup_per_participant.write().unwrap(); - if !global_participants.contains_key(&pid) { - // I dont no participant, so i can safely assume that they don't know me. so - // HERE we gonna fill local network pool - global_participants.insert(pid, tlid::Pool::new_full()); - } - //grab a range for controller - let global_part_pool = global_participants.get_mut(&pid).unwrap(); - - sender_controller_sids - .remove_from(global_part_pool) - .unwrap(); - sender_worker_sids.remove_from(global_part_pool).unwrap(); - tlid::RemoveAllocation::new(&mut receiver_controller_sids) - .remove_from(global_part_pool) - .unwrap(); - tlid::RemoveAllocation::new(&mut receiver_worker_sids) - .remove_from(global_part_pool) - .unwrap(); - - worker_participants.insert(pid.clone(), receiver_worker_sids); - if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant { - pid, - controller_sids: receiver_controller_sids, - }) { - error!(?err, "couldn't notify, is network already closed ?"); - } - if let Some(send) = &self.return_pid_to { - if let Err(err) = send.send(pid) { - error!( - ?err, - "couldn't notify of connected participant, is network already \ - closed ?" - ); - } - }; - self.return_pid_to = None; - } else { - warn!( - "a known participant opened an additional channel, UNCHECKED BECAUSE NO \ - TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!" - ); - } - info!("recv config. This channel is now configured!"); - }, - Frame::Shutdown => { - self.recv_shutdown = true; - info!("shutting down channel"); - if let Err(err) = rtrn_tx.send(RtrnMsg::Shutdown) { - error!(?err, "couldn't notify of shutdown"); - } - }, - Frame::OpenStream { - sid, - prio, - promises, - } => { - if let Some(pid) = self.remote_pid { - let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); - let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); - - trace!(?self.streams, "-OPEN STREAM- going to modify streams"); - self.streams.push(stream); - trace!(?self.streams, "-OPEN STREAM- did to modify streams"); - info!("opened a stream"); - if let Err(err) = rtrn_tx.send(RtrnMsg::OpendStream { - pid, - sid, - prio, - msg_rx, - promises, - }) { - error!(?err, "couldn't notify of opened stream"); - } - } else { - error!("called OpenStream before PartcipantID!"); - } - }, - Frame::CloseStream { sid } => { - if let Some(pid) = self.remote_pid { - trace!(?self.streams, "-CLOSE STREAM- going to modify streams"); - self.streams.retain(|stream| stream.sid() != sid); - trace!(?self.streams, "-CLOSE STREAM- did to modify streams"); - info!("closed a stream"); - if let Err(err) = rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }) { - error!(?err, "couldn't notify of closed stream"); - } - } - }, - Frame::DataHeader { mid, sid, length } => { - debug!("Data Header {}", sid); - let imsg = InCommingMessage { - buffer: MessageBuffer { data: Vec::new() }, - length, - mid, - sid, - }; - let mut found = false; - for s in &mut self.streams { - if s.sid() == sid { - //TODO: move to Hashmap, so slow - s.to_receive.push_back(imsg); - found = true; - break; - } - } - if !found { - error!("couldn't find stream with sid: {}", sid); - } - }, - Frame::Data { - id, - start: _, //TODO: use start to verify! - mut data, - } => { - debug!("Data Package {}, len: {}", id, data.len()); - let mut found = false; - for s in &mut self.streams { - let mut pos = None; - for i in 0..s.to_receive.len() { - let m = &mut s.to_receive[i]; - if m.mid == id { - found = true; - m.buffer.data.append(&mut data); - if m.buffer.data.len() as u64 == m.length { - pos = Some(i); - break; - }; - }; - } - if let Some(pos) = pos { - let sid = s.sid(); - let mut tx = s.msg_tx(); - for m in s.to_receive.drain(pos..pos + 1) { - info!(?sid, ? m.mid, "received message"); - //TODO: I dislike that block_on here! - block_on(async { - if let Err(err) = tx.send(m).await { - error!( - ?err, - "cannot notify that message was received, probably stream \ - is already closed" - ); - }; - }); - } - } - } - if !found { - error!("couldn't find stream with mid: {}", id); - } - }, - Frame::Raw(data) => { - info!("Got a Raw Package {:?}", data); - }, + .as_bytes() + .to_vec(), + )) + .await + .unwrap(); + frame_sender.send(Frame::Shutdown {}).await.unwrap(); + *self.send_state.write().await = ChannelState::Shutdown; + } + return Err(()); } + Ok(()) } - // This function will tick all streams according to priority and add them to the - // send queue - fn tick_streams(&mut self) { - //ignoring prio for now - //TODO: fix prio - for s in &mut self.streams { - let mut remove = false; - let sid = s.sid(); - if let Some(m) = s.to_send.front_mut() { - let to_send = std::cmp::min(m.buffer.data.len() as u64 - m.cursor, 1400); - if to_send > 0 { - if m.cursor == 0 { - let mid = s.mid_pool.next(); - m.mid = Some(mid); - self.send_queue.push_back(Frame::DataHeader { - mid, - sid, - length: m.buffer.data.len() as u64, - }); - } - self.send_queue.push_back(Frame::Data { - id: m.mid.unwrap(), - start: m.cursor, - data: m.buffer.data[m.cursor as usize..(m.cursor + to_send) as usize] - .to_vec(), - }); - }; - m.cursor += to_send; - if m.cursor == m.buffer.data.len() as u64 { - remove = true; - debug!(?m.mid, "finish message") - } - } - if remove { - s.to_send.pop_front(); + pub(crate) async fn send_handshake(&self, part_in_sender: &mut mpsc::UnboundedSender) { + part_in_sender + .send(Frame::Handshake { + magic_number: VELOREN_MAGIC_NUMBER.to_string(), + version: VELOREN_NETWORK_VERSION, + }) + .await + .unwrap(); + *self.send_state.write().await = ChannelState::Handshake; + } + + pub(crate) async fn send_pid(&self, part_in_sender: &mut mpsc::UnboundedSender) { + part_in_sender + .send(Frame::ParticipantId { + pid: self.local_pid, + }) + .await + .unwrap(); + *self.send_state.write().await = ChannelState::Pid; + } + /* + pub async fn run(&mut self) { + //let (incomming_sender, incomming_receiver) = mpsc::unbounded(); + futures::join!(self.listen_manager(), self.send_outgoing()); + } + + pub async fn listen_manager(&self) { + let (mut listen_sender, mut listen_receiver) = mpsc::unbounded::
(); + + while self.closed.load(Ordering::Relaxed) { + while let Some(address) = listen_receiver.next().await { + let (end_sender, end_receiver) = oneshot::channel::<()>(); + task::spawn(channel_creator(address, end_receiver)); } } } - fn wrong_shutdown(&mut self, raw: &[u8]) { - #[cfg(debug_assertions)] - { - debug!("sending client instructions before killing"); - self.send_queue.push_back(Frame::Raw(raw.to_vec())); - self.send_queue.push_back(Frame::Shutdown {}); - self.send_shutdown = true; + pub async fn send_outgoing(&self) { + //let prios = prios::PrioManager; + while self.closed.load(Ordering::Relaxed) { + } - } - - pub(crate) fn open_stream( - &mut self, - sid: Sid, - prio: u8, - promises: EnumSet, - msg_tx: futures::channel::mpsc::UnboundedSender, - ) { - // validate promises - trace!(?sid, "going to open a new stream"); - let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); - trace!(?sid, "1"); - self.streams.push(stream); - trace!(?sid, "2"); - trace!(?self.streams, ?self.randomno, "2b"); - if self.streams.len() >= 0 { - // breakpoint here - let a = self.streams.len(); - if a > 1000 { - //this will never happen but is a blackbox to catch a - panic!("dasd"); - } - } - self.send_queue.push_back(Frame::OpenStream { - sid, - prio, - promises, - }); - } - - pub(crate) fn close_stream(&mut self, sid: Sid) { - trace!(?self.streams, "--CLOSE STREAM-- going to modify streams"); - self.streams.retain(|stream| stream.sid() != sid); - trace!(?self.streams, "--CLOSE STREAM-- did to modify streams"); - self.send_queue.push_back(Frame::CloseStream { sid }); - } - - pub(crate) fn handshake(&mut self) { - self.send_queue.push_back(Frame::Handshake { - magic_number: VELOREN_MAGIC_NUMBER.to_string(), - version: VELOREN_NETWORK_VERSION, - }); - self.send_handshake = true; - } - - pub(crate) fn shutdown(&mut self) { - self.send_queue.push_back(Frame::Shutdown {}); - self.send_shutdown = true; - } - - pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { - trace!(?outgoing.sid, "3"); - trace!(?self.streams, ?self.randomno, "3b"); - - for s in self.streams.iter_mut() { - if s.sid() == outgoing.sid { - s.to_send.push_back(outgoing); - return; - } - } - trace!(?outgoing.sid, "4"); - let sid = &outgoing.sid; - error!(?sid, "couldn't send message, didn't found sid") - } - - pub(crate) fn get_protocol(&self) -> &ChannelProtocols { &self.protocol } + }*/ } diff --git a/network/src/controller.rs b/network/src/controller.rs deleted file mode 100644 index ce9bf2dcc6..0000000000 --- a/network/src/controller.rs +++ /dev/null @@ -1,178 +0,0 @@ -/* - Most of the internals take place in it's own worker-thread. - This folder contains all this outsourced calculation. - This controller contains the interface to communicate with the thread, - communication is done via channels. -*/ -use crate::{ - api::Stream, - metrics::NetworkMetrics, - types::{CtrlMsg, Pid, RtrnMsg, Sid}, - worker::Worker, -}; -use mio::{self, Poll, PollOpt, Ready, Token}; -use mio_extras::channel; -use std::{ - collections::HashMap, - sync::{mpsc, Arc, RwLock, RwLockReadGuard}, -}; -use tlid; -use tracing::*; -use uvth::ThreadPool; - -pub struct ControllerParticipant { - pub sid_pool: RwLock>>, - //TODO: move this in a future aware variant! via futures Channels - stream_open_tx: mpsc::Sender, - pub stream_open_rx: mpsc::Receiver, - pub stream_close_txs: RwLock>>, -} - -/* - The MioWorker runs in it's own thread, - it has a given set of Channels to work with. - It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers -*/ -pub struct Controller { - ctrl_tx: channel::Sender, - rtrn_rx: mpsc::Receiver, - - participant_connect_tx: mpsc::Sender, - participant_connect_rx: mpsc::Receiver, - - participants: RwLock>, -} - -impl Controller { - pub const CTRL_TOK: Token = Token(0); - - pub fn new( - wid: u64, - pid: uuid::Uuid, - thread_pool: Arc, - mut token_pool: tlid::Pool>, - metrics: Arc>, - sid_backup_per_participant: Arc>>>>, - ) -> Self { - let poll = Arc::new(Poll::new().unwrap()); - - let (ctrl_tx, ctrl_rx) = channel::channel(); - let (rtrn_tx, rtrn_rx) = mpsc::channel(); - let (participant_connect_tx, participant_connect_rx) = mpsc::channel(); - poll.register(&ctrl_rx, Self::CTRL_TOK, Ready::readable(), PollOpt::edge()) - .unwrap(); - // reserve 10 tokens in case they start with 0, //TODO: cleaner method - for _ in 0..10 { - token_pool.next(); - } - - thread_pool.execute(move || { - let w = wid; - let span = span!(Level::INFO, "worker", ?w); - let _enter = span.enter(); - let mut worker = Worker::new( - pid, - poll, - metrics, - sid_backup_per_participant, - token_pool, - ctrl_rx, - rtrn_tx, - ); - worker.run(); - }); - let participants = RwLock::new(HashMap::new()); - Controller { - ctrl_tx, - rtrn_rx, - participant_connect_rx, - participant_connect_tx, - participants, - } - } - - //TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers - - pub(crate) fn get_tx(&self) -> channel::Sender { self.ctrl_tx.clone() } - - pub(crate) fn get_participant_connect_rx(&self) -> &mpsc::Receiver { - &self.participant_connect_rx - } - - pub(crate) fn tick(&self) { - for msg in self.rtrn_rx.try_iter() { - match msg { - /*TODO: WAIT, THIS ASSUMES CONNECTED PARTICIPANT IS ONLY EVER TRIGGERED ONCE PER CONTROLLER - that means, that it can happen multiple time for the same participant on multiple controller, - and even multiple channel on one worker shouldn't trigger it*/ - RtrnMsg::ConnectedParticipant { - pid, - controller_sids, - } => { - let mut parts = self.participants.write().unwrap(); - debug!( - ?pid, - "A new participant connected to this channel, we assign it the sid pool" - ); - let (stream_open_tx, stream_open_rx) = mpsc::channel(); - let part = ControllerParticipant { - sid_pool: RwLock::new(controller_sids), - stream_open_tx, - stream_open_rx, - stream_close_txs: RwLock::new(HashMap::new()), - }; - parts.insert(pid.clone(), part); - self.participant_connect_tx.send(pid).unwrap(); - }, - RtrnMsg::OpendStream { - pid, - sid, - prio: _, - msg_rx, - promises: _, - } => { - trace!( - ?pid, - ?sid, - "A new stream was opened on this channel, we assign it the participant" - ); - let parts = self.participants.read().unwrap(); - if let Some(p) = parts.get(&pid) { - let (stream_close_tx, stream_close_rx) = mpsc::channel(); - p.stream_close_txs - .write() - .unwrap() - .insert(sid, stream_close_tx); - p.stream_open_tx - .send(Stream::new( - sid, - pid, - stream_close_rx, - msg_rx, - self.ctrl_tx.clone(), - )) - .unwrap(); - } - }, - RtrnMsg::ClosedStream { pid, sid } => { - trace!(?pid, ?sid, "Stream got closeed, will route message"); - let parts = self.participants.read().unwrap(); - if let Some(p) = parts.get(&pid) { - if let Some(tx) = p.stream_close_txs.read().unwrap().get(&sid) { - tx.send(()).unwrap(); - trace!(?pid, ?sid, "routed message"); - } - } - }, - _ => {}, - } - } - } - - pub(crate) fn participants(&self) -> RwLockReadGuard> { - self.participants.read().unwrap() - } -} -impl Drop for Controller { - fn drop(&mut self) { let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); } -} diff --git a/network/src/frames.rs b/network/src/frames.rs new file mode 100644 index 0000000000..37c4e956dd --- /dev/null +++ b/network/src/frames.rs @@ -0,0 +1,37 @@ +use crate::types::{Mid, Pid, Prio, Promises, Sid}; +use serde::{Deserialize, Serialize}; + +// Used for Communication between Channel <----(TCP/UDP)----> Channel +#[derive(Serialize, Deserialize, Debug)] +pub enum Frame { + Handshake { + magic_number: String, + version: [u32; 3], + }, + ParticipantId { + pid: Pid, + }, + Shutdown, /* Shutsdown this channel gracefully, if all channels are shut down, Participant + * is deleted */ + OpenStream { + sid: Sid, + prio: Prio, + promises: Promises, + }, + CloseStream { + sid: Sid, + }, + DataHeader { + mid: Mid, + sid: Sid, + length: u64, + }, + Data { + id: Mid, + start: u64, + data: Vec, + }, + /* WARNING: Sending RAW is only used for debug purposes in case someone write a new API + * against veloren Server! */ + Raw(Vec), +} diff --git a/network/src/lib.rs b/network/src/lib.rs index 943dc9679f..65ac2c542f 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,16 +1,27 @@ #![feature(trait_alias)] mod api; +mod async_serde; mod channel; -mod controller; +mod frames; mod message; mod metrics; mod mpsc; +mod participant; mod prios; +mod scheduler; mod tcp; mod types; mod udp; -mod worker; +pub use api::{Address, Network}; +pub use scheduler::Scheduler; +pub use types::{ + Pid, Promises, PROMISES_COMPRESSED, PROMISES_CONSISTENCY, PROMISES_ENCRYPTED, + PROMISES_GUARANTEED_DELIVERY, PROMISES_NONE, PROMISES_ORDERED, +}; + +/* pub use api::{ Address, Network, NetworkError, Participant, ParticipantError, Promise, Stream, StreamError, }; +*/ diff --git a/network/src/message.rs b/network/src/message.rs index 1d4de83202..9aec484321 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -16,7 +16,7 @@ pub(crate) struct MessageBuffer { pub(crate) struct OutGoingMessage { pub buffer: Arc, pub cursor: u64, - pub mid: Option, + pub mid: Mid, pub sid: Sid, } diff --git a/network/src/metrics.rs b/network/src/metrics.rs index 71cb59fca8..8b13789179 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,144 +1 @@ -use prometheus::{IntGauge, IntGaugeVec, Opts, Registry}; -use std::{ - error::Error, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, -}; -// 1 NetworkMetrics per Network -pub struct NetworkMetrics { - pub participants_connected: IntGauge, - pub channels_connected: IntGauge, - pub streams_open: IntGauge, - pub worker_count: IntGauge, - pub network_info: IntGauge, - // Frames, seperated by CHANNEL (add PART and PROTOCOL) AND FRAME TYPE, - pub frames_count: IntGaugeVec, - // send Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL), - pub message_count: IntGaugeVec, - // send Messages bytes, seperated by STREAM (add PART and PROTOCOL, CHANNEL), - pub bytes_send: IntGaugeVec, - // queued Messages, seperated by STREAM (add PART and PROTOCOL, CHANNEL), - pub queue_count: IntGaugeVec, - // worker seperated by CHANNEL (add PART and PROTOCOL), - pub worker_work_time: IntGaugeVec, - // worker seperated by CHANNEL (add PART and PROTOCOL), - pub worker_idle_time: IntGaugeVec, - // ping calculated based on last msg - pub participants_ping: IntGaugeVec, - tick: Arc, -} - -impl NetworkMetrics { - pub fn new(registry: &Registry, tick: Arc) -> Result> { - let participants_connected = IntGauge::with_opts(Opts::new( - "participants_connected", - "shows the number of participants connected to the network", - ))?; - let channels_connected = IntGauge::with_opts(Opts::new( - "channels_connected", - "number of all channels currently connected on the network", - ))?; - let streams_open = IntGauge::with_opts(Opts::new( - "streams_open", - "number of all streams currently open on the network", - ))?; - let worker_count = IntGauge::with_opts(Opts::new( - "worker_count", - "number of workers currently running", - ))?; - let opts = Opts::new("network_info", "Static Network information").const_label( - "version", - &format!( - "{}.{}.{}", - &crate::types::VELOREN_NETWORK_VERSION[0], - &crate::types::VELOREN_NETWORK_VERSION[1], - &crate::types::VELOREN_NETWORK_VERSION[2] - ), - ); - let network_info = IntGauge::with_opts(opts)?; - - let frames_count = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "frames_count", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - let message_count = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "message_count", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - let bytes_send = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "bytes_send", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - let queue_count = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "queue_count", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - let worker_work_time = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "worker_work_time", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - let worker_idle_time = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "worker_idle_time", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - let participants_ping = IntGaugeVec::from(IntGaugeVec::new( - Opts::new( - "participants_ping", - "time in ns requiered for a tick of the server", - ), - &["channel"], - )?); - - registry.register(Box::new(participants_connected.clone()))?; - registry.register(Box::new(channels_connected.clone()))?; - registry.register(Box::new(streams_open.clone()))?; - registry.register(Box::new(worker_count.clone()))?; - registry.register(Box::new(network_info.clone()))?; - registry.register(Box::new(frames_count.clone()))?; - registry.register(Box::new(message_count.clone()))?; - registry.register(Box::new(bytes_send.clone()))?; - registry.register(Box::new(queue_count.clone()))?; - registry.register(Box::new(worker_work_time.clone()))?; - registry.register(Box::new(worker_idle_time.clone()))?; - registry.register(Box::new(participants_ping.clone()))?; - - Ok(Self { - participants_connected, - channels_connected, - streams_open, - worker_count, - network_info, - frames_count, - message_count, - bytes_send, - queue_count, - worker_work_time, - worker_idle_time, - participants_ping, - tick, - }) - } - - pub fn _is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } -} diff --git a/network/src/mpsc.rs b/network/src/mpsc.rs index 598bc3d092..8b13789179 100644 --- a/network/src/mpsc.rs +++ b/network/src/mpsc.rs @@ -1,84 +1 @@ -use crate::{channel::ChannelProtocol, types::Frame}; -use lazy_static::lazy_static; // 1.4.0 -use mio_extras::channel::{Receiver, Sender}; -use std::{ - collections::HashMap, - sync::{Mutex, RwLock}, -}; -use tracing::*; -lazy_static! { - pub(crate) static ref MPSC_REGISTRY: RwLock, Receiver)>>> = - RwLock::new(HashMap::new()); -} - -pub(crate) struct MpscChannel { - endpoint_sender: Sender, - endpoint_receiver: Receiver, -} - -impl MpscChannel { - pub fn new(endpoint_sender: Sender, endpoint_receiver: Receiver) -> Self { - Self { - endpoint_sender, - endpoint_receiver, - } - } -} - -impl ChannelProtocol for MpscChannel { - type Handle = Receiver; - - /// Execute when ready to read - fn read(&mut self) -> Vec { - let mut result = Vec::new(); - loop { - match self.endpoint_receiver.try_recv() { - Ok(frame) => { - trace!("incomming message"); - result.push(frame); - }, - Err(std::sync::mpsc::TryRecvError::Empty) => { - debug!("read would block"); - break; - }, - Err(std::sync::mpsc::TryRecvError::Disconnected) => { - trace!(?self, "shutdown of mpsc channel detected"); - result.push(Frame::Shutdown); - break; - }, - }; - } - result - } - - fn write>(&mut self, frames: &mut I) { - for frame in frames { - match self.endpoint_sender.send(frame) { - Ok(()) => { - trace!("sended"); - }, - Err(mio_extras::channel::SendError::Io(e)) - if e.kind() == std::io::ErrorKind::WouldBlock => - { - debug!("write would block"); - return; - } - Err(mio_extras::channel::SendError::Disconnected(frame)) => { - trace!(?frame, ?self, "shutdown of mpsc channel detected"); - return; - }, - Err(e) => { - panic!("{}", e); - }, - }; - } - } - - fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } -} - -impl std::fmt::Debug for MpscChannel { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", "MPSC") } -} diff --git a/network/src/participant.rs b/network/src/participant.rs new file mode 100644 index 0000000000..e693d52c55 --- /dev/null +++ b/network/src/participant.rs @@ -0,0 +1,294 @@ +use crate::{ + api::Stream, + frames::Frame, + message::{InCommingMessage, MessageBuffer, OutGoingMessage}, + types::{Cid, Pid, Prio, Promises, Sid}, +}; +use async_std::sync::RwLock; +use futures::{ + channel::{mpsc, oneshot}, + sink::SinkExt, + stream::StreamExt, +}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; +use tracing::*; + +#[derive(Debug)] +struct ControlChannels { + stream_open_receiver: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender)>, + stream_opened_sender: mpsc::UnboundedSender, + transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender)>, + frame_recv_receiver: mpsc::UnboundedReceiver, + shutdown_api_receiver: mpsc::UnboundedReceiver, + shutdown_api_sender: mpsc::UnboundedSender, + send_outgoing: Arc>>, //api + frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler +} + +#[derive(Debug)] +pub struct BParticipant { + remote_pid: Pid, + offset_sid: Sid, + channels: RwLock)>>, + streams: RwLock< + HashMap< + Sid, + ( + Prio, + Promises, + mpsc::UnboundedSender, + oneshot::Sender<()>, + ), + >, + >, + run_channels: Option, +} + +impl BParticipant { + pub(crate) fn new( + remote_pid: Pid, + offset_sid: Sid, + send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, + ) -> ( + Self, + mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender)>, + mpsc::UnboundedReceiver, + mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender)>, + mpsc::UnboundedSender, + mpsc::UnboundedSender<(Pid, Sid, Frame)>, + ) { + let (stream_open_sender, stream_open_receiver) = + mpsc::unbounded::<(Prio, Promises, oneshot::Sender)>(); + let (stream_opened_sender, stream_opened_receiver) = mpsc::unbounded::(); + let (transfer_channel_sender, transfer_channel_receiver) = + mpsc::unbounded::<(Cid, mpsc::UnboundedSender)>(); + let (frame_recv_sender, frame_recv_receiver) = mpsc::unbounded::(); + //let (shutdown1_sender, shutdown1_receiver) = oneshot::channel(); + let (shutdown_api_sender, shutdown_api_receiver) = mpsc::unbounded(); + let (frame_send_sender, frame_send_receiver) = mpsc::unbounded::<(Pid, Sid, Frame)>(); + + let run_channels = Some(ControlChannels { + stream_open_receiver, + stream_opened_sender, + transfer_channel_receiver, + frame_recv_receiver, + //shutdown_sender: shutdown1_sender, + shutdown_api_receiver, + shutdown_api_sender, + send_outgoing: Arc::new(Mutex::new(send_outgoing)), + frame_send_receiver, + }); + + ( + Self { + remote_pid, + offset_sid, + channels: RwLock::new(vec![]), + streams: RwLock::new(HashMap::new()), + run_channels, + }, + stream_open_sender, + stream_opened_receiver, + transfer_channel_sender, + frame_recv_sender, + frame_send_sender, + //shutdown1_receiver, + ) + } + + pub async fn run(mut self) { + let run_channels = self.run_channels.take().unwrap(); + futures::join!( + self.transfer_channel_manager(run_channels.transfer_channel_receiver), + self.open_manager( + run_channels.stream_open_receiver, + run_channels.shutdown_api_sender.clone(), + run_channels.send_outgoing.clone(), + ), + self.handle_frames( + run_channels.frame_recv_receiver, + run_channels.stream_opened_sender, + run_channels.shutdown_api_sender, + run_channels.send_outgoing.clone(), + ), + self.send_manager(run_channels.frame_send_receiver), + self.shutdown_manager(run_channels.shutdown_api_receiver,), + ); + } + + async fn send_frame(&self, frame: Frame) { + // find out ideal channel + //TODO: just take first + if let Some((_cid, channel)) = self.channels.write().await.get_mut(0) { + channel.send(frame).await.unwrap(); + } else { + error!("participant has no channel to communicate on"); + } + } + + async fn handle_frames( + &self, + mut frame_recv_receiver: mpsc::UnboundedReceiver, + mut stream_opened_sender: mpsc::UnboundedSender, + shutdown_api_sender: mpsc::UnboundedSender, + send_outgoing: Arc>>, + ) { + trace!("start handle_frames"); + let send_outgoing = { send_outgoing.lock().unwrap().clone() }; + let mut messages = HashMap::new(); + while let Some(frame) = frame_recv_receiver.next().await { + debug!("handling frame"); + match frame { + Frame::OpenStream { + sid, + prio, + promises, + } => { + let send_outgoing = send_outgoing.clone(); + let stream = self + .create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender) + .await; + stream_opened_sender.send(stream).await.unwrap(); + trace!("opened frame from remote"); + }, + Frame::CloseStream { sid } => { + if let Some((_, _, _, sender)) = self.streams.write().await.remove(&sid) { + sender.send(()).unwrap(); + } else { + error!("unreachable, coudln't send close stream event!"); + } + trace!("closed frame from remote"); + }, + Frame::DataHeader { mid, sid, length } => { + let imsg = InCommingMessage { + buffer: MessageBuffer { data: Vec::new() }, + length, + mid, + sid, + }; + messages.insert(mid, imsg); + }, + Frame::Data { + id, + start: _, + mut data, + } => { + let finished = if let Some(imsg) = messages.get_mut(&id) { + imsg.buffer.data.append(&mut data); + imsg.buffer.data.len() as u64 == imsg.length + } else { + false + }; + if finished { + debug!(?id, "finished receiving message"); + let imsg = messages.remove(&id).unwrap(); + if let Some((_, _, sender, _)) = + self.streams.write().await.get_mut(&imsg.sid) + { + sender.send(imsg).await.unwrap(); + } + } + }, + _ => unreachable!("never reaches frame!"), + } + } + trace!("stop handle_frames"); + } + + async fn send_manager( + &self, + mut frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, + ) { + trace!("start send_manager"); + while let Some((_, _, frame)) = frame_send_receiver.next().await { + self.send_frame(frame).await; + } + trace!("stop send_manager"); + } + + async fn transfer_channel_manager( + &self, + mut transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender)>, + ) { + trace!("start transfer_channel_manager"); + while let Some((cid, sender)) = transfer_channel_receiver.next().await { + debug!(?cid, "got a new channel to listen on"); + self.channels.write().await.push((cid, sender)); + } + trace!("stop transfer_channel_manager"); + } + + async fn open_manager( + &self, + mut stream_open_receiver: mpsc::UnboundedReceiver<( + Prio, + Promises, + oneshot::Sender, + )>, + shutdown_api_sender: mpsc::UnboundedSender, + send_outgoing: Arc>>, + ) { + trace!("start open_manager"); + let send_outgoing = { + //fighting the borrow checker ;) + send_outgoing.lock().unwrap().clone() + }; + let mut stream_ids = self.offset_sid; + while let Some((prio, promises, sender)) = stream_open_receiver.next().await { + debug!(?prio, ?promises, "got request to open a new steam"); + let send_outgoing = send_outgoing.clone(); + let sid = stream_ids; + let stream = self + .create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender) + .await; + self.send_frame(Frame::OpenStream { + sid, + prio, + promises, + }) + .await; + sender.send(stream).unwrap(); + stream_ids += 1; + } + trace!("stop open_manager"); + } + + async fn shutdown_manager(&self, mut shutdown_api_receiver: mpsc::UnboundedReceiver) { + trace!("start shutdown_manager"); + while let Some(sid) = shutdown_api_receiver.next().await { + trace!(?sid, "got request to close steam"); + self.streams.write().await.remove(&sid); + self.send_frame(Frame::CloseStream { sid }).await; + } + trace!("stop shutdown_manager"); + } + + async fn create_stream( + &self, + sid: Sid, + prio: Prio, + promises: Promises, + send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, + shutdown_api_sender: &mpsc::UnboundedSender, + ) -> Stream { + let (msg_recv_sender, msg_recv_receiver) = mpsc::unbounded::(); + let (shutdown1_sender, shutdown1_receiver) = oneshot::channel(); + self.streams + .write() + .await + .insert(sid, (prio, promises, msg_recv_sender, shutdown1_sender)); + Stream::new( + self.remote_pid, + sid, + prio, + promises, + send_outgoing, + msg_recv_receiver, + shutdown1_receiver, + shutdown_api_sender.clone(), + ) + } +} diff --git a/network/src/prios.rs b/network/src/prios.rs index 9abb6d3305..eaccb435d2 100644 --- a/network/src/prios.rs +++ b/network/src/prios.rs @@ -1,65 +1,29 @@ -/* - -This will become a single class, -it contains a list of all open Channels and all Participants and all streams. -Important, we need to change stream ids to be unique per participant -and msg ids need to be unique per participant too. The other way would be to always send sid with Data Frame but this is to much overhead. - -We need a external (like timer like) Future that opens a thread in threadpool, and is Ready once serialized - -We should focus in this implementation on the routing side, Prio and choosing the correct Protocol. -A Message should be delivered over 2 Channels, e.g. Create Info via TCP and data via UDP. keep in mind that UDP might be read before TCP is read... - -maybe even a future that builds together a message from incremental steps. - -Or a future that sends a message, however on each seend prio needs to be considered, maybe overkill. - - -it should be quite easy as all is in one thread now, but i am still not sure if its in the same as the network, or if we still have a sperate one, -probably start with a seperate thread for now. - -Focus on the routing for now, and ignoring protocols and details... -*/ - /* Priorities are handled the following way. Prios from 0-63 are allowed. all 5 numbers the throughput i halved. E.g. in the same time 100 prio0 messages are send, only 50 prio5, 25 prio10, 12 prio15 or 6 prio20 messages are send. -Node: TODO: prio0 will be send immeadiatly when found! +Note: TODO: prio0 will be send immeadiatly when found! */ -/* -algo: -let past = [u64, 100] = [0,0,0,0..] -send_prio0() -past[0] += 100; -#check_next_prio -if past[0] - past[1] > prio_numbers[1] { - sendprio1(); - past[1] += 100; - if past[0] - past[2] > prio_numbers[2] { - sendprio2(); - past[2] += 100; - } -} - - -*/ - -use crate::{message::OutGoingMessage, types::Frame}; +use crate::{ + frames::Frame, + message::OutGoingMessage, + types::{Pid, Prio, Sid}, +}; use std::{ collections::{HashSet, VecDeque}, sync::mpsc::{channel, Receiver, Sender}, }; +use tracing::*; + const PRIO_MAX: usize = 64; -struct PrioManager { +pub(crate) struct PrioManager { points: [u32; PRIO_MAX], - messages: [VecDeque; PRIO_MAX], - messages_tx: Sender<(u8, OutGoingMessage)>, - messages_rx: Receiver<(u8, OutGoingMessage)>, + messages: [VecDeque<(Pid, Sid, OutGoingMessage)>; PRIO_MAX], + messages_rx: Receiver<(Prio, Pid, Sid, OutGoingMessage)>, queued: HashSet, } @@ -73,89 +37,91 @@ impl PrioManager { 310419, 356578, 409600, 470507, 540470, 620838, ]; - pub fn new() -> Self { + pub fn new() -> (Self, Sender<(Prio, Pid, Sid, OutGoingMessage)>) { let (messages_tx, messages_rx) = channel(); - Self { - points: [0; PRIO_MAX], - messages: [ - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - VecDeque::new(), - ], + ( + Self { + points: [0; PRIO_MAX], + messages: [ + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + VecDeque::new(), + ], + messages_rx, + queued: HashSet::new(), //TODO: optimize with u64 and 64 bits + }, messages_tx, - messages_rx, - queued: HashSet::new(), //TODO: optimize with u64 and 64 bits - } + ) } fn tick(&mut self) { // Check Range - for (prio, msg) in self.messages_rx.try_iter() { + for (prio, pid, sid, msg) in self.messages_rx.try_iter() { debug_assert!(prio as usize <= PRIO_MAX); - println!("tick {}", prio); + trace!(?prio, ?sid, ?pid, "tick"); self.queued.insert(prio); - self.messages[prio as usize].push_back(msg); + self.messages[prio as usize].push_back((pid, sid, msg)); } } @@ -178,30 +144,30 @@ impl PrioManager { } /// returns if msg is empty - fn tick_msg>(msg: &mut OutGoingMessage, frames: &mut E) -> bool { + fn tick_msg>( + msg: &mut OutGoingMessage, + msg_pid: Pid, + msg_sid: Sid, + frames: &mut E, + ) -> bool { let to_send = std::cmp::min( msg.buffer.data.len() as u64 - msg.cursor, Self::FRAME_DATA_SIZE, ); if to_send > 0 { if msg.cursor == 0 { - //TODO: OutGoingMessage MUST HAVE A MID AT THIS POINT ALREADY! AS I HAVE NO - // IDEA OF STREAMS HERE! - debug_assert!(msg.mid.is_some()); - frames.extend(std::iter::once(Frame::DataHeader { - mid: msg - .mid - .expect("read comment 3 lines above this error message 41231255661"), + frames.extend(std::iter::once((msg_pid, msg_sid, Frame::DataHeader { + mid: msg.mid, sid: msg.sid, length: msg.buffer.data.len() as u64, - })); + }))); } - frames.extend(std::iter::once(Frame::Data { - id: msg.mid.unwrap(), + frames.extend(std::iter::once((msg_pid, msg_sid, Frame::Data { + id: msg.mid, start: msg.cursor, data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize] .to_vec(), - })); + }))); }; msg.cursor += to_send; msg.cursor >= msg.buffer.data.len() as u64 @@ -216,26 +182,30 @@ impl PrioManager { /// high prio messages! /// - if no_of_frames is too low you wont saturate your Socket fully, thus /// have a lower bandwidth as possible - pub fn fill_frames>(&mut self, no_of_frames: usize, frames: &mut E) { + pub fn fill_frames>( + &mut self, + no_of_frames: usize, + frames: &mut E, + ) { self.tick(); for _ in 0..no_of_frames { match self.calc_next_prio() { Some(prio) => { - println!("dasd {}", prio); + trace!(?prio, "handle next prio"); self.points[prio as usize] += Self::PRIOS[prio as usize]; //pop message from front of VecDeque, handle it and push it back, so that all // => messages with same prio get a fair chance :) //TODO: evalaute not poping every time match self.messages[prio as usize].pop_front() { - Some(mut msg) => { - if Self::tick_msg(&mut msg, frames) { + Some((pid, sid, mut msg)) => { + if Self::tick_msg(&mut msg, pid, sid, frames) { //debug!(?m.mid, "finish message"); //check if prio is empty if self.messages[prio as usize].is_empty() { self.queued.remove(&prio); } } else { - self.messages[prio as usize].push_back(msg); + self.messages[prio as usize].push_back((pid, sid, msg)); //trace!(?m.mid, "repush message"); } }, @@ -251,47 +221,60 @@ impl PrioManager { } } } +} - pub fn get_tx(&self) -> &Sender<(u8, OutGoingMessage)> { &self.messages_tx } +impl std::fmt::Debug for PrioManager { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut cnt = 0; + for m in self.messages.iter() { + cnt += m.len(); + } + write!(f, "PrioManager(len: {}, queued: {:?})", cnt, &self.queued,) + } } #[cfg(test)] mod tests { use crate::{ + frames::Frame, message::{MessageBuffer, OutGoingMessage}, prios::*, - types::{Frame, Mid, Sid}, + types::{Pid, Prio, Sid}, }; use std::{collections::VecDeque, sync::Arc}; - fn mock_out(prio: u8, sid: Sid) -> (u8, OutGoingMessage) { - (prio, OutGoingMessage { + const SIZE: u64 = PrioManager::FRAME_DATA_SIZE; + const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize; + + fn mock_out(prio: Prio, sid: Sid) -> (Prio, Pid, Sid, OutGoingMessage) { + (prio, Pid::fake(0), sid, OutGoingMessage { buffer: Arc::new(MessageBuffer { data: vec![48, 49, 50], }), cursor: 0, - mid: Some(1), + mid: 1, sid, }) } - fn mock_out_large(prio: u8, sid: Sid) -> (u8, OutGoingMessage) { - const MSG_SIZE: usize = PrioManager::FRAME_DATA_SIZE as usize; - let mut data = vec![48; MSG_SIZE]; - data.append(&mut vec![49; MSG_SIZE]); + fn mock_out_large(prio: Prio, sid: Sid) -> (Prio, Pid, Sid, OutGoingMessage) { + let mut data = vec![48; USIZE]; + data.append(&mut vec![49; USIZE]); data.append(&mut vec![50; 20]); - (prio, OutGoingMessage { + (prio, Pid::fake(0), sid, OutGoingMessage { buffer: Arc::new(MessageBuffer { data }), cursor: 0, - mid: Some(1), + mid: 1, sid, }) } - fn assert_header(frames: &mut VecDeque, f_sid: Sid, f_length: u64) { + fn assert_header(frames: &mut VecDeque<(Pid, Sid, Frame)>, f_sid: Sid, f_length: u64) { let frame = frames .pop_front() - .expect("frames vecdeque doesn't contain enough frames!"); + .expect("frames vecdeque doesn't contain enough frames!") + .2; if let Frame::DataHeader { mid, sid, length } = frame { assert_eq!(mid, 1); assert_eq!(sid, f_sid); @@ -301,10 +284,11 @@ mod tests { } } - fn assert_data(frames: &mut VecDeque, f_start: u64, f_data: Vec) { + fn assert_data(frames: &mut VecDeque<(Pid, Sid, Frame)>, f_start: u64, f_data: Vec) { let frame = frames .pop_front() - .expect("frames vecdeque doesn't contain enough frames!"); + .expect("frames vecdeque doesn't contain enough frames!") + .2; if let Frame::Data { id, start, data } = frame { assert_eq!(id, 1); assert_eq!(start, f_start); @@ -316,8 +300,8 @@ mod tests { #[test] fn single_p16() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out(16, 1337)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out(16, 1337)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(100, &mut frames); @@ -328,9 +312,9 @@ mod tests { #[test] fn single_p16_p20() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out(16, 1337)).unwrap(); - mgr.get_tx().send(mock_out(20, 42)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out(16, 1337)).unwrap(); + tx.send(mock_out(20, 42)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(100, &mut frames); @@ -343,9 +327,9 @@ mod tests { #[test] fn single_p20_p16() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out(20, 42)).unwrap(); - mgr.get_tx().send(mock_out(16, 1337)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out(20, 42)).unwrap(); + tx.send(mock_out(16, 1337)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(100, &mut frames); @@ -358,20 +342,20 @@ mod tests { #[test] fn multiple_p16_p20() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out(20, 2)).unwrap(); - mgr.get_tx().send(mock_out(16, 1)).unwrap(); - mgr.get_tx().send(mock_out(16, 3)).unwrap(); - mgr.get_tx().send(mock_out(16, 5)).unwrap(); - mgr.get_tx().send(mock_out(20, 4)).unwrap(); - mgr.get_tx().send(mock_out(20, 7)).unwrap(); - mgr.get_tx().send(mock_out(16, 6)).unwrap(); - mgr.get_tx().send(mock_out(20, 10)).unwrap(); - mgr.get_tx().send(mock_out(16, 8)).unwrap(); - mgr.get_tx().send(mock_out(20, 12)).unwrap(); - mgr.get_tx().send(mock_out(16, 9)).unwrap(); - mgr.get_tx().send(mock_out(16, 11)).unwrap(); - mgr.get_tx().send(mock_out(20, 13)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out(20, 2)).unwrap(); + tx.send(mock_out(16, 1)).unwrap(); + tx.send(mock_out(16, 3)).unwrap(); + tx.send(mock_out(16, 5)).unwrap(); + tx.send(mock_out(20, 4)).unwrap(); + tx.send(mock_out(20, 7)).unwrap(); + tx.send(mock_out(16, 6)).unwrap(); + tx.send(mock_out(20, 10)).unwrap(); + tx.send(mock_out(16, 8)).unwrap(); + tx.send(mock_out(20, 12)).unwrap(); + tx.send(mock_out(16, 9)).unwrap(); + tx.send(mock_out(16, 11)).unwrap(); + tx.send(mock_out(20, 13)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(100, &mut frames); @@ -384,20 +368,20 @@ mod tests { #[test] fn multiple_fill_frames_p16_p20() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out(20, 2)).unwrap(); - mgr.get_tx().send(mock_out(16, 1)).unwrap(); - mgr.get_tx().send(mock_out(16, 3)).unwrap(); - mgr.get_tx().send(mock_out(16, 5)).unwrap(); - mgr.get_tx().send(mock_out(20, 4)).unwrap(); - mgr.get_tx().send(mock_out(20, 7)).unwrap(); - mgr.get_tx().send(mock_out(16, 6)).unwrap(); - mgr.get_tx().send(mock_out(20, 10)).unwrap(); - mgr.get_tx().send(mock_out(16, 8)).unwrap(); - mgr.get_tx().send(mock_out(20, 12)).unwrap(); - mgr.get_tx().send(mock_out(16, 9)).unwrap(); - mgr.get_tx().send(mock_out(16, 11)).unwrap(); - mgr.get_tx().send(mock_out(20, 13)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out(20, 2)).unwrap(); + tx.send(mock_out(16, 1)).unwrap(); + tx.send(mock_out(16, 3)).unwrap(); + tx.send(mock_out(16, 5)).unwrap(); + tx.send(mock_out(20, 4)).unwrap(); + tx.send(mock_out(20, 7)).unwrap(); + tx.send(mock_out(16, 6)).unwrap(); + tx.send(mock_out(20, 10)).unwrap(); + tx.send(mock_out(16, 8)).unwrap(); + tx.send(mock_out(20, 12)).unwrap(); + tx.send(mock_out(16, 9)).unwrap(); + tx.send(mock_out(16, 11)).unwrap(); + tx.send(mock_out(20, 13)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(3, &mut frames); for i in 1..4 { @@ -415,107 +399,72 @@ mod tests { #[test] fn single_large_p16() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out_large(16, 1)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out_large(16, 1)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(100, &mut frames); - assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20); - assert_data(&mut frames, 0, vec![ - 48; - PrioManager::FRAME_DATA_SIZE as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ - 49; - PrioManager::FRAME_DATA_SIZE - as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert_header(&mut frames, 1, SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![48; USIZE]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); + assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert!(frames.is_empty()); } #[test] fn multiple_large_p16() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out_large(16, 1)).unwrap(); - mgr.get_tx().send(mock_out_large(16, 2)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out_large(16, 1)).unwrap(); + tx.send(mock_out_large(16, 2)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(100, &mut frames); - assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20); - assert_data(&mut frames, 0, vec![ - 48; - PrioManager::FRAME_DATA_SIZE as usize - ]); - assert_header(&mut frames, 2, PrioManager::FRAME_DATA_SIZE * 2 + 20); - assert_data(&mut frames, 0, vec![ - 48; - PrioManager::FRAME_DATA_SIZE as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ - 49; - PrioManager::FRAME_DATA_SIZE - as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ - 49; - PrioManager::FRAME_DATA_SIZE - as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert_header(&mut frames, 1, SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![48; USIZE]); + assert_header(&mut frames, 2, SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![48; USIZE]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); + assert_data(&mut frames, SIZE * 2, vec![50; 20]); + assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert!(frames.is_empty()); } #[test] fn multiple_large_p16_sudden_p0() { - let mut mgr = PrioManager::new(); - mgr.get_tx().send(mock_out_large(16, 1)).unwrap(); - mgr.get_tx().send(mock_out_large(16, 2)).unwrap(); + let (mut mgr, tx) = PrioManager::new(); + tx.send(mock_out_large(16, 1)).unwrap(); + tx.send(mock_out_large(16, 2)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(3, &mut frames); - assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20); - assert_data(&mut frames, 0, vec![ - 48; - PrioManager::FRAME_DATA_SIZE as usize - ]); - assert_header(&mut frames, 2, PrioManager::FRAME_DATA_SIZE * 2 + 20); - assert_data(&mut frames, 0, vec![ - 48; - PrioManager::FRAME_DATA_SIZE as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ - 49; - PrioManager::FRAME_DATA_SIZE - as usize - ]); + assert_header(&mut frames, 1, SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![48; USIZE]); + assert_header(&mut frames, 2, SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![48; USIZE]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); - mgr.get_tx().send(mock_out(0, 3)).unwrap(); + tx.send(mock_out(0, 3)).unwrap(); mgr.fill_frames(100, &mut frames); assert_header(&mut frames, 3, 3); assert_data(&mut frames, 0, vec![48, 49, 50]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ - 49; - PrioManager::FRAME_DATA_SIZE - as usize - ]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); - assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert_data(&mut frames, SIZE, vec![49; USIZE]); + assert_data(&mut frames, SIZE * 2, vec![50; 20]); + assert_data(&mut frames, SIZE * 2, vec![50; 20]); assert!(frames.is_empty()); } #[test] fn single_p20_thousand_p16_at_once() { - let mut mgr = PrioManager::new(); + let (mut mgr, tx) = PrioManager::new(); for _ in 0..998 { - mgr.get_tx().send(mock_out(16, 2)).unwrap(); + tx.send(mock_out(16, 2)).unwrap(); } - mgr.get_tx().send(mock_out(20, 1)).unwrap(); - mgr.get_tx().send(mock_out(16, 2)).unwrap(); - mgr.get_tx().send(mock_out(16, 2)).unwrap(); + tx.send(mock_out(20, 1)).unwrap(); + tx.send(mock_out(16, 2)).unwrap(); + tx.send(mock_out(16, 2)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(2000, &mut frames); @@ -531,16 +480,16 @@ mod tests { #[test] fn single_p20_thousand_p16_later() { - let mut mgr = PrioManager::new(); + let (mut mgr, tx) = PrioManager::new(); for _ in 0..998 { - mgr.get_tx().send(mock_out(16, 2)).unwrap(); + tx.send(mock_out(16, 2)).unwrap(); } let mut frames = VecDeque::new(); mgr.fill_frames(2000, &mut frames); //^unimportant frames, gonna be dropped - mgr.get_tx().send(mock_out(20, 1)).unwrap(); - mgr.get_tx().send(mock_out(16, 2)).unwrap(); - mgr.get_tx().send(mock_out(16, 2)).unwrap(); + tx.send(mock_out(20, 1)).unwrap(); + tx.send(mock_out(16, 2)).unwrap(); + tx.send(mock_out(16, 2)).unwrap(); let mut frames = VecDeque::new(); mgr.fill_frames(2000, &mut frames); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs new file mode 100644 index 0000000000..7620001961 --- /dev/null +++ b/network/src/scheduler.rs @@ -0,0 +1,649 @@ +use crate::{ + api::{Address, Participant}, + channel::Channel, + frames::Frame, + message::OutGoingMessage, + participant::BParticipant, + prios::PrioManager, + types::{Cid, Pid, Prio, Sid}, +}; +use async_std::sync::RwLock; +use futures::{ + channel::{mpsc, oneshot}, + executor::ThreadPool, + future::FutureExt, + select, + sink::SinkExt, + stream::StreamExt, +}; +use std::{ + collections::{HashMap, VecDeque}, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, +}; +use tracing::*; +use tracing_futures::Instrument; +//use futures::prelude::*; + +#[derive(Debug)] +struct ControlChannels { + listen_receiver: mpsc::UnboundedReceiver
, + connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender)>, + connected_sender: mpsc::UnboundedSender, + shutdown_receiver: oneshot::Receiver<()>, + prios: PrioManager, + prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, +} + +#[derive(Debug)] +pub struct Scheduler { + local_pid: Pid, + closed: AtomicBool, + pool: Arc, + run_channels: Option, + participants: Arc< + RwLock< + HashMap< + Pid, + ( + mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender)>, + mpsc::UnboundedSender, + mpsc::UnboundedSender<(Pid, Sid, Frame)>, + ), + >, + >, + >, + participant_from_channel: Arc>>, + channel_ids: Arc, + channel_listener: RwLock>>, + unknown_channels: Arc< + RwLock< + HashMap< + Cid, + ( + mpsc::UnboundedSender, + Option>, + ), + >, + >, + >, +} + +impl Scheduler { + pub fn new( + local_pid: Pid, + ) -> ( + Self, + mpsc::UnboundedSender
, + mpsc::UnboundedSender<(Address, oneshot::Sender)>, + mpsc::UnboundedReceiver, + oneshot::Sender<()>, + ) { + let (listen_sender, listen_receiver) = mpsc::unbounded::
(); + let (connect_sender, connect_receiver) = + mpsc::unbounded::<(Address, oneshot::Sender)>(); + let (connected_sender, connected_receiver) = mpsc::unbounded::(); + let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); + let (prios, prios_sender) = PrioManager::new(); + + let run_channels = Some(ControlChannels { + listen_receiver, + connect_receiver, + connected_sender, + shutdown_receiver, + prios, + prios_sender, + }); + + ( + Self { + local_pid, + closed: AtomicBool::new(false), + pool: Arc::new(ThreadPool::new().unwrap()), + run_channels, + participants: Arc::new(RwLock::new(HashMap::new())), + participant_from_channel: Arc::new(RwLock::new(HashMap::new())), + channel_ids: Arc::new(AtomicU64::new(0)), + channel_listener: RwLock::new(HashMap::new()), + unknown_channels: Arc::new(RwLock::new(HashMap::new())), + }, + listen_sender, + connect_sender, + connected_receiver, + shutdown_sender, + ) + } + + pub async fn run(mut self) { + let (part_out_sender, part_out_receiver) = mpsc::unbounded::<(Cid, Frame)>(); + let (configured_sender, configured_receiver) = mpsc::unbounded::<(Cid, Pid, Sid)>(); + let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::(); + let run_channels = self.run_channels.take().unwrap(); + + futures::join!( + self.listen_manager( + run_channels.listen_receiver, + part_out_sender.clone(), + configured_sender.clone(), + ), + self.connect_manager( + run_channels.connect_receiver, + part_out_sender, + configured_sender, + ), + self.disconnect_manager(disconnect_receiver,), + self.send_outgoing(run_channels.prios), + self.shutdown_manager(run_channels.shutdown_receiver), + self.handle_frames(part_out_receiver), + self.channel_configurer( + run_channels.connected_sender, + configured_receiver, + disconnect_sender, + run_channels.prios_sender.clone(), + ), + ); + } + + async fn listen_manager( + &self, + mut listen_receiver: mpsc::UnboundedReceiver
, + part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>, + configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>, + ) { + trace!("start listen_manager"); + while let Some(address) = listen_receiver.next().await { + debug!(?address, "got request to open a channel_creator"); + let (end_sender, end_receiver) = oneshot::channel::<()>(); + self.channel_listener + .write() + .await + .insert(address.clone(), end_sender); + self.pool.spawn_ok(Self::channel_creator( + self.channel_ids.clone(), + self.local_pid, + address.clone(), + end_receiver, + self.pool.clone(), + part_out_sender.clone(), + configured_sender.clone(), + self.unknown_channels.clone(), + )); + } + trace!("stop listen_manager"); + } + + async fn connect_manager( + &self, + mut connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender)>, + part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>, + configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>, + ) { + trace!("start connect_manager"); + while let Some((addr, pid_sender)) = connect_receiver.next().await { + match addr { + Address::Tcp(addr) => { + let stream = async_std::net::TcpStream::connect(addr).await.unwrap(); + info!("Connectiong TCP to: {}", stream.peer_addr().unwrap()); + let (part_in_sender, part_in_receiver) = mpsc::unbounded::(); + //channels are unknown till PID is known! + let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); + self.unknown_channels + .write() + .await + .insert(cid, (part_in_sender, Some(pid_sender))); + self.pool.spawn_ok( + Channel::new(cid, self.local_pid) + .run( + stream, + part_in_receiver, + part_out_sender.clone(), + configured_sender.clone(), + ) + .instrument(tracing::info_span!("channel", ?addr)), + ); + }, + _ => unimplemented!(), + } + } + trace!("stop connect_manager"); + } + + async fn disconnect_manager(&self, mut disconnect_receiver: mpsc::UnboundedReceiver) { + trace!("start disconnect_manager"); + while let Some(pid) = disconnect_receiver.next().await { + error!(?pid, "I need to disconnect the pid"); + } + trace!("stop disconnect_manager"); + } + + async fn send_outgoing(&self, mut prios: PrioManager) { + //This time equals the MINIMUM Latency in average, so keep it down and //Todo: + // make it configureable or switch to await E.g. Prio 0 = await, prio 50 + // wait for more messages + const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(10); + trace!("start send_outgoing"); + while !self.closed.load(Ordering::Relaxed) { + let mut frames = VecDeque::new(); + prios.fill_frames(3, &mut frames); + for (pid, sid, frame) in frames { + if let Some((_, _, sender)) = self.participants.write().await.get_mut(&pid) { + sender.send((pid, sid, frame)).await.unwrap(); + } + } + async_std::task::sleep(TICK_TIME).await; + } + trace!("stop send_outgoing"); + } + + async fn handle_frames(&self, mut part_out_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>) { + trace!("start handle_frames"); + while let Some((cid, frame)) = part_out_receiver.next().await { + trace!("handling frame"); + if let Some(pid) = self.participant_from_channel.read().await.get(&cid) { + if let Some((_, sender, _)) = self.participants.write().await.get_mut(&pid) { + sender.send(frame).await.unwrap(); + } + } else { + error!("dropping frame, unreachable, got a frame from a non existing channel"); + } + } + trace!("stop handle_frames"); + } + + // + async fn channel_configurer( + &self, + mut connected_sender: mpsc::UnboundedSender, + mut receiver: mpsc::UnboundedReceiver<(Cid, Pid, Sid)>, + disconnect_sender: mpsc::UnboundedSender, + prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, + ) { + trace!("start channel_activator"); + while let Some((cid, pid, offset_sid)) = receiver.next().await { + if let Some((frame_sender, pid_oneshot)) = + self.unknown_channels.write().await.remove(&cid) + { + trace!( + ?cid, + ?pid, + "detected that my channel is ready!, activating it :)" + ); + let mut participants = self.participants.write().await; + if !participants.contains_key(&pid) { + debug!(?cid, "new participant connected via a channel"); + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + + let ( + bparticipant, + stream_open_sender, + stream_opened_receiver, + mut transfer_channel_receiver, + frame_recv_sender, + frame_send_sender, + ) = BParticipant::new(pid, offset_sid, prios_sender.clone()); + + let participant = Participant::new( + self.local_pid, + pid, + stream_open_sender, + stream_opened_receiver, + shutdown_receiver, + disconnect_sender.clone(), + ); + if let Some(pid_oneshot) = pid_oneshot { + // someone is waiting with connect, so give them their PID + pid_oneshot.send(participant).unwrap(); + } else { + // noone is waiting on this Participant, return in to Network + connected_sender.send(participant).await.unwrap(); + } + transfer_channel_receiver + .send((cid, frame_sender)) + .await + .unwrap(); + participants.insert( + pid, + ( + transfer_channel_receiver, + frame_recv_sender, + frame_send_sender, + ), + ); + self.participant_from_channel.write().await.insert(cid, pid); + self.pool.spawn_ok( + bparticipant + .run() + .instrument(tracing::info_span!("participant", ?pid)), + ); + } else { + error!( + "2ND channel of participants opens, but we cannot verify that this is not \ + a attack to " + ) + } + } + } + trace!("stop channel_activator"); + } + + pub async fn shutdown_manager(&self, receiver: oneshot::Receiver<()>) { + trace!("start shutdown_manager"); + receiver.await.unwrap(); + self.closed.store(true, Ordering::Relaxed); + trace!("stop shutdown_manager"); + } + + pub async fn channel_creator( + channel_ids: Arc, + local_pid: Pid, + addr: Address, + end_receiver: oneshot::Receiver<()>, + pool: Arc, + part_out_sender: mpsc::UnboundedSender<(Cid, Frame)>, + configured_sender: mpsc::UnboundedSender<(Cid, Pid, Sid)>, + unknown_channels: Arc< + RwLock< + HashMap< + Cid, + ( + mpsc::UnboundedSender, + Option>, + ), + >, + >, + >, + ) { + info!(?addr, "start up channel creator"); + match addr { + Address::Tcp(addr) => { + let listener = async_std::net::TcpListener::bind(addr).await.unwrap(); + let mut incoming = listener.incoming(); + let mut end_receiver = end_receiver.fuse(); + while let Some(stream) = select! { + next = incoming.next().fuse() => next, + _ = end_receiver => None, + } { + let stream = stream.unwrap(); + info!("Accepting TCP from: {}", stream.peer_addr().unwrap()); + let (mut part_in_sender, part_in_receiver) = mpsc::unbounded::(); + //channels are unknown till PID is known! + /* When A connects to a NETWORK, we, the listener answers with a Handshake. + Pro: - Its easier to debug, as someone who opens a port gets a magic number back! + Contra: - DOS posibility because we answer fist + - Speed, because otherwise the message can be send with the creation + */ + let cid = channel_ids.fetch_add(1, Ordering::Relaxed); + let channel = Channel::new(cid, local_pid); + channel.send_handshake(&mut part_in_sender).await; + pool.spawn_ok( + channel + .run( + stream, + part_in_receiver, + part_out_sender.clone(), + configured_sender.clone(), + ) + .instrument(tracing::info_span!("channel", ?addr)), + ); + unknown_channels + .write() + .await + .insert(cid, (part_in_sender, None)); + } + }, + _ => unimplemented!(), + } + info!(?addr, "ending channel creator"); + } +} + +/* +use crate::{ + async_serde, + channel::{Channel, ChannelProtocol, ChannelProtocols}, + controller::Controller, + metrics::NetworkMetrics, + prios::PrioManager, + tcp::TcpChannel, + types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, +}; +use std::{ + collections::{HashMap, VecDeque}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, + mpsc::TryRecvError, + Arc, + }, + time::Instant, +}; +use tlid; +use tracing::*; +use crate::types::Protocols; +use crate::frames::{ChannelFrame, ParticipantFrame, StreamFrame, Frame}; + +/* +The worker lives in a own thread and only communcates with the outside via a Channel + +Prios are done per participant, but their throughput is split equalli, +That allows indepentend calculation of prios (no global hotspot) while no Participant is starved as the total throughput is measured and aproximated :) + +streams are per participant, and channels are per participants, streams dont have a specific channel! +*/ + +use async_std::sync::RwLock; +use async_std::io::prelude::*; +use crate::async_serde::{SerializeFuture, DeserializeFuture}; +use uvth::ThreadPoolBuilder; +use async_std::stream::Stream; +use async_std::sync::{self, Sender, Receiver}; +use crate::types::{VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION,}; +use crate::message::InCommingMessage; + +use futures::channel::mpsc; +use futures::sink::SinkExt; +use futures::{select, FutureExt}; + +#[derive(Debug)] +struct BStream { + sid: Sid, + prio: u8, + promises: u8, +} + +struct BChannel { + remote_pid: Option, + stream: RwLock, + send_stream: Sender, + recv_stream: Receiver, + send_participant: Sender, + recv_participant: Receiver, + + send_handshake: bool, + send_pid: bool, + send_shutdown: bool, + recv_handshake: bool, + recv_pid: bool, + recv_shutdown: bool, +} + +struct BAcceptor { + listener: RwLock, +} + +struct BParticipant { + remote_pid: Pid, + channels: HashMap>, + streams: Vec, + sid_pool: tlid::Pool>, + prios: RwLock, + closed: AtomicBool, +} + +pub(crate) struct Scheduler { + local_pid: Pid, + metrics: Arc>, + participants: HashMap, + pending_channels: HashMap>, + /* ctrl_rx: Receiver, + * rtrn_tx: mpsc::Sender, */ +} + +impl BStream { + +} + +impl BChannel { + /* + /// Execute when ready to read + pub async fn recv(&self) -> Vec { + let mut buffer: [u8; 2000] = [0; 2000]; + let read = self.stream.write().await.read(&mut buffer).await; + match read { + Ok(n) => { + let x = DeserializeFuture::new(buffer[0..n].to_vec(), &ThreadPoolBuilder::new().build()).await; + return vec!(x); + }, + Err(e) => { + panic!("woops {}", e); + } + } + } + /// Execute when ready to write + pub async fn send>(&self, frames: &mut I) { + for frame in frames { + let x = SerializeFuture::new(frame, &ThreadPoolBuilder::new().build()).await; + self.stream.write().await.write_all(&x).await; + } + } + */ + + pub fn get_tx(&self) -> &Sender { + &self.send_stream + } + + pub fn get_rx(&self) -> &Receiver { + &self.recv_stream + } + + pub fn get_participant_tx(&self) -> &Sender { + &self.send_participant + } + + pub fn get_participant_rx(&self) -> &Receiver { + &self.recv_participant + } +} + + + +impl BParticipant { + pub async fn read(&self) { + while self.closed.load(Ordering::Relaxed) { + for channels in self.channels.values() { + for channel in channels.iter() { + //let frames = channel.recv().await; + let frame = channel.get_rx().recv().await.unwrap(); + match frame { + Frame::Channel(cf) => channel.handle(cf).await, + Frame::Participant(pf) => self.handle(pf).await, + Frame::Stream(sf) => {}, + } + } + } + async_std::task::sleep(std::time::Duration::from_millis(100)).await; + } + } + + pub async fn write(&self) { + let mut frames = VecDeque::<(u8, StreamFrame)>::new(); + while self.closed.load(Ordering::Relaxed) { + let todo_synced_amount_and_reasonable_choosen_throughput_based_on_feedback = 100; + self.prios.write().await.fill_frames( + todo_synced_amount_and_reasonable_choosen_throughput_based_on_feedback, + &mut frames, + ); + for (promises, frame) in frames.drain(..) { + let channel = self.chose_channel(promises); + channel.get_tx().send(Frame::Stream(frame)).await; + } + } + } + + pub async fn handle(&self, frame: ParticipantFrame) { + info!("got a frame to handle"); + /* + match frame { + ParticipantFrame::OpenStream { + sid, + prio, + promises, + } => { + if let Some(pid) = self.remote_pid { + let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); + let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); + + trace!(?self.streams, "-OPEN STREAM- going to modify streams"); + self.streams.push(stream); + trace!(?self.streams, "-OPEN STREAM- did to modify streams"); + info!("opened a stream"); + if let Err(err) = rtrn_tx.send(RtrnMsg::OpendStream { + pid, + sid, + prio, + msg_rx, + promises, + }) { + error!(?err, "couldn't notify of opened stream"); + } + } else { + error!("called OpenStream before PartcipantID!"); + } + }, + ParticipantFrame::CloseStream { sid } => { + if let Some(pid) = self.remote_pid { + trace!(?self.streams, "-CLOSE STREAM- going to modify streams"); + self.streams.retain(|stream| stream.sid() != sid); + trace!(?self.streams, "-CLOSE STREAM- did to modify streams"); + info!("closed a stream"); + if let Err(err) = rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }) { + error!(?err, "couldn't notify of closed stream"); + } + } + }, + }*/ + } + + /// Endless task that will cover sending for Participant + pub async fn run(&mut self) { + let (incomming_sender, incomming_receiver) = mpsc::unbounded(); + futures::join!(self.read(), self.write()); + } + + pub fn chose_channel(&self, + promises: u8, /* */ + ) -> &BChannel { + for v in self.channels.values() { + for c in v { + return c; + } + } + panic!("No Channel!"); + } +} + +impl Scheduler { + pub fn new( + pid: Pid, + metrics: Arc>, + sid_backup_per_participant: Arc>>>>, + token_pool: tlid::Pool>, + ) -> Self { + panic!("asd"); + } + + pub fn run(&mut self) { loop {} } +} +*/ diff --git a/network/src/tcp.rs b/network/src/tcp.rs index 87fdb0e870..8b13789179 100644 --- a/network/src/tcp.rs +++ b/network/src/tcp.rs @@ -1,145 +1 @@ -use crate::{ - channel::ChannelProtocol, - types::{Frame, NetworkBuffer}, -}; -use bincode; -use mio::net::TcpStream; -use std::io::{Read, Write}; -use tracing::*; -pub(crate) struct TcpChannel { - endpoint: TcpStream, - read_buffer: NetworkBuffer, - write_buffer: NetworkBuffer, -} - -impl TcpChannel { - pub fn new(endpoint: TcpStream) -> Self { - Self { - endpoint, - read_buffer: NetworkBuffer::new(), - write_buffer: NetworkBuffer::new(), - } - } -} - -impl ChannelProtocol for TcpChannel { - type Handle = TcpStream; - - /// Execute when ready to read - fn read(&mut self) -> Vec { - let mut result = Vec::new(); - loop { - match self.endpoint.read(self.read_buffer.get_write_slice(2048)) { - Ok(0) => { - //Shutdown - trace!(?self, "shutdown of tcp channel detected"); - result.push(Frame::Shutdown); - break; - }, - Ok(n) => { - self.read_buffer.actually_written(n); - trace!("incomming message with len: {}", n); - let slice = self.read_buffer.get_read_slice(); - let mut cur = std::io::Cursor::new(slice); - let mut read_ok = 0; - while cur.position() < n as u64 { - let round_start = cur.position() as usize; - let r: Result = bincode::deserialize_from(&mut cur); - match r { - Ok(frame) => { - result.push(frame); - read_ok = cur.position() as usize; - }, - Err(e) => { - // Probably we have to wait for moare data! - let first_bytes_of_msg = - &slice[round_start..std::cmp::min(n, round_start + 16)]; - debug!( - ?self, - ?e, - ?n, - ?round_start, - ?first_bytes_of_msg, - "message cant be parsed, probably because we need to wait for \ - more data" - ); - break; - }, - } - } - self.read_buffer.actually_read(read_ok); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - break; - }, - Err(e) => panic!("{}", e), - }; - } - result - } - - /// Execute when ready to write - fn write>(&mut self, frames: &mut I) { - loop { - //serialize when len < MTU 1500, then write - if self.write_buffer.get_read_slice().len() < 1500 { - match frames.next() { - Some(frame) => { - if let Ok(size) = bincode::serialized_size(&frame) { - let slice = self.write_buffer.get_write_slice(size as usize); - if let Err(err) = bincode::serialize_into(slice, &frame) { - error!( - ?err, - "serialising frame was unsuccessful, this should never \ - happen! dropping frame!" - ) - } - self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent! - } else { - error!( - "getting size of frame was unsuccessful, this should never \ - happen! dropping frame!" - ) - }; - }, - None => break, - } - } - - match self.endpoint.write(self.write_buffer.get_read_slice()) { - Ok(n) => { - self.write_buffer.actually_read(n); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("can't send tcp yet, would block"); - return; - }, - Err(e) => panic!("{}", e), - } - } - } - - fn get_handle(&self) -> &Self::Handle { &self.endpoint } -} - -impl std::fmt::Debug for TcpChannel { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.endpoint) - } -} - -impl std::fmt::Debug for NetworkBuffer { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "NetworkBuffer(len: {}, read: {}, write: {})", - self.data.len(), - self.read_idx, - self.write_idx - ) - } -} diff --git a/network/src/types.rs b/network/src/types.rs index d78be0613a..9d67c1e9dc 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -1,156 +1,24 @@ -use crate::{ - api::Promise, - channel::Channel, - message::{InCommingMessage, OutGoingMessage}, -}; -use enumset::EnumSet; -use futures; -use mio::{self, net::TcpListener, PollOpt, Ready}; +use rand::Rng; use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; use tracing::*; -use uuid::Uuid; -//Participant Ids are randomly chosen -pub type Pid = Uuid; -//Stream Ids are unique per Participant* and are split in 2 ranges, one for -// every Network involved Every Channel gets a subrange during their handshake -// protocol from one of the 2 ranges -//*otherwise extra synchronization would be needed pub type Sid = u64; -//Message Ids are unique per Stream* and are split in 2 ranges, one for every -// Channel involved -//*otherwise extra synchronization would be needed pub type Mid = u64; +pub type Cid = u64; +pub type Prio = u8; +pub type Promises = u8; + +pub const PROMISES_NONE: Promises = 0; +pub const PROMISES_ORDERED: Promises = 1; +pub const PROMISES_CONSISTENCY: Promises = 2; +pub const PROMISES_GUARANTEED_DELIVERY: Promises = 4; +pub const PROMISES_COMPRESSED: Promises = 8; +pub const PROMISES_ENCRYPTED: Promises = 16; pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN"; pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 2, 0]; -pub const DEFAULT_SID_SIZE: u64 = 1 << 48; - -// Used for Communication between Controller <--> Worker -pub(crate) enum CtrlMsg { - Shutdown, - Register(TokenObjects, Ready, PollOpt), - OpenStream { - pid: Pid, - sid: Sid, - prio: u8, - promises: EnumSet, - msg_tx: futures::channel::mpsc::UnboundedSender, - }, - CloseStream { - pid: Pid, - sid: Sid, - }, - Send(OutGoingMessage), -} - -pub(crate) enum RtrnMsg { - Shutdown, - ConnectedParticipant { - pid: Pid, - controller_sids: tlid::Pool>, - }, - OpendStream { - pid: Pid, - sid: Sid, - prio: u8, - msg_rx: futures::channel::mpsc::UnboundedReceiver, - promises: EnumSet, - }, - ClosedStream { - pid: Pid, - sid: Sid, - }, -} - -#[derive(Debug)] -pub(crate) enum TokenObjects { - TcpListener(TcpListener), - Channel(Channel), -} - -#[derive(Debug)] -pub(crate) struct IntStream { - sid: Sid, - prio: u8, - promises: EnumSet, - pub mid_pool: tlid::Pool>, - msg_tx: futures::channel::mpsc::UnboundedSender, - pub to_send: VecDeque, - pub to_receive: VecDeque, -} - -impl IntStream { - pub fn new( - sid: Sid, - prio: u8, - promises: EnumSet, - msg_tx: futures::channel::mpsc::UnboundedSender, - ) -> Self { - IntStream { - sid, - prio, - promises, - mid_pool: tlid::Pool::new_full(), - msg_tx, - to_send: VecDeque::new(), - to_receive: VecDeque::new(), - } - } - - pub fn sid(&self) -> Sid { self.sid } - - pub fn prio(&self) -> u8 { self.prio } - - pub fn msg_tx(&self) -> futures::channel::mpsc::UnboundedSender { - self.msg_tx.clone() - } - - pub fn promises(&self) -> EnumSet { self.promises } -} - -// Used for Communication between Channel <----(TCP/UDP)----> Channel -#[derive(Serialize, Deserialize, Debug)] -pub(crate) enum Frame { - Handshake { - magic_number: String, - version: [u32; 3], - }, - Configure { - //only one Participant will send this package and give the other a range to use - sender_controller_sids: tlid::RemoveAllocation, - sender_worker_sids: tlid::RemoveAllocation, - receiver_controller_sids: tlid::Pool>, - receiver_worker_sids: tlid::Pool>, - }, - ParticipantId { - pid: Pid, - }, - Shutdown, /* Shutsdown this channel gracefully, if all channels are shut down, Participant - * is deleted */ - OpenStream { - sid: Sid, - prio: u8, - promises: EnumSet, - }, - CloseStream { - sid: Sid, - }, - DataHeader { - mid: Mid, - sid: Sid, - length: u64, - }, - Data { - id: Mid, - start: u64, - data: Vec, - }, - /* WARNING: Sending RAW is only used for debug purposes in case someone write a new API - * against veloren Server! */ - Raw(Vec), -} +pub(crate) const STREAM_ID_OFFSET1: Sid = 0; +pub(crate) const STREAM_ID_OFFSET2: Sid = u64::MAX / 2; pub(crate) struct NetworkBuffer { pub(crate) data: Vec, @@ -158,6 +26,29 @@ pub(crate) struct NetworkBuffer { pub(crate) write_idx: usize, } +#[derive(PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] +pub struct Pid { + internal: u128, +} + +impl Pid { + pub fn new() -> Self { + Self { + internal: rand::thread_rng().gen(), + } + } + + /// don't use fake! just for testing! + /// This will panic if pid i greater than 7, as i do not want you to use + /// this in production! + pub fn fake(pid: u8) -> Self { + assert!(pid < 8); + Self { + internal: pid as u128, + } + } +} + /// NetworkBuffer to use for streamed access /// valid data is between read_idx and write_idx! /// everything before read_idx is already processed and no longer important @@ -224,15 +115,22 @@ impl NetworkBuffer { } } -fn chose_protocol( - available_protocols: u8, /* 1 = TCP, 2= UDP, 4 = MPSC */ - promises: u8, /* */ -) -> u8 /*1,2 or 4*/ { - if available_protocols & (1 << 3) != 0 { - 4 - } else if available_protocols & (1 << 1) != 0 { - 1 - } else { - 2 +impl std::fmt::Debug for NetworkBuffer { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "NetworkBuffer(len: {}, read: {}, write: {})", + self.data.len(), + self.read_idx, + self.write_idx + ) + } +} + +impl std::fmt::Debug for Pid { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.internal) } } diff --git a/network/src/udp.rs b/network/src/udp.rs index c12cc838b4..8b13789179 100644 --- a/network/src/udp.rs +++ b/network/src/udp.rs @@ -1,131 +1 @@ -use crate::{ - channel::ChannelProtocol, - types::{Frame, NetworkBuffer}, -}; -use bincode; -use mio::net::UdpSocket; -use tracing::*; -pub(crate) struct UdpChannel { - endpoint: UdpSocket, - read_buffer: NetworkBuffer, - write_buffer: NetworkBuffer, -} - -impl UdpChannel { - pub fn _new(endpoint: UdpSocket) -> Self { - Self { - endpoint, - read_buffer: NetworkBuffer::new(), - write_buffer: NetworkBuffer::new(), - } - } -} - -impl ChannelProtocol for UdpChannel { - type Handle = UdpSocket; - - /// Execute when ready to read - fn read(&mut self) -> Vec { - let mut result = Vec::new(); - loop { - match self.endpoint.recv(self.read_buffer.get_write_slice(2048)) { - Ok(0) => { - //Shutdown - trace!(?self, "shutdown of tcp channel detected"); - result.push(Frame::Shutdown); - break; - }, - Ok(n) => { - self.read_buffer.actually_written(n); - trace!("incomming message with len: {}", n); - let slice = self.read_buffer.get_read_slice(); - let mut cur = std::io::Cursor::new(slice); - let mut read_ok = 0; - while cur.position() < n as u64 { - let round_start = cur.position() as usize; - let r: Result = bincode::deserialize_from(&mut cur); - match r { - Ok(frame) => { - result.push(frame); - read_ok = cur.position() as usize; - }, - Err(e) => { - // Probably we have to wait for moare data! - let first_bytes_of_msg = - &slice[round_start..std::cmp::min(n, round_start + 16)]; - debug!( - ?self, - ?e, - ?n, - ?round_start, - ?first_bytes_of_msg, - "message cant be parsed, probably because we need to wait for \ - more data" - ); - break; - }, - } - } - self.read_buffer.actually_read(read_ok); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - break; - }, - Err(e) => panic!("{}", e), - }; - } - result - } - - /// Execute when ready to write - fn write>(&mut self, frames: &mut I) { - loop { - //serialize when len < MTU 1500, then write - if self.write_buffer.get_read_slice().len() < 1500 { - match frames.next() { - Some(frame) => { - if let Ok(size) = bincode::serialized_size(&frame) { - let slice = self.write_buffer.get_write_slice(size as usize); - if let Err(err) = bincode::serialize_into(slice, &frame) { - error!( - ?err, - "serialising frame was unsuccessful, this should never \ - happen! dropping frame!" - ) - } - self.write_buffer.actually_written(size as usize); //I have to rely on those informations to be consistent! - } else { - error!( - "getting size of frame was unsuccessful, this should never \ - happen! dropping frame!" - ) - }; - }, - None => break, - } - } - - match self.endpoint.send(self.write_buffer.get_read_slice()) { - Ok(n) => { - self.write_buffer.actually_read(n); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("can't send tcp yet, would block"); - return; - }, - Err(e) => panic!("{}", e), - } - } - } - - fn get_handle(&self) -> &Self::Handle { &self.endpoint } -} - -impl std::fmt::Debug for UdpChannel { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.endpoint) - } -} diff --git a/network/src/worker.rs b/network/src/worker.rs deleted file mode 100644 index b47cd2e4ed..0000000000 --- a/network/src/worker.rs +++ /dev/null @@ -1,301 +0,0 @@ -use crate::{ - channel::{Channel, ChannelProtocol, ChannelProtocols}, - controller::Controller, - metrics::NetworkMetrics, - tcp::TcpChannel, - types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, -}; -use mio::{self, Poll, PollOpt, Ready, Token}; -use mio_extras::channel::Receiver; -use std::{ - collections::HashMap, - sync::{mpsc, mpsc::TryRecvError, Arc, RwLock}, - time::Instant, -}; -use tlid; -use tracing::*; -/* -The worker lives in a own thread and only communcates with the outside via a Channel -*/ - -pub(crate) struct MioTokens { - pool: tlid::Pool>, - pub tokens: HashMap, //TODO: move to Vec for faster lookup -} - -impl MioTokens { - pub fn new(pool: tlid::Pool>) -> Self { - MioTokens { - pool, - tokens: HashMap::new(), - } - } - - pub fn construct(&mut self) -> Token { Token(self.pool.next()) } - - pub fn insert(&mut self, tok: Token, obj: TokenObjects) { - trace!(?tok, ?obj, "added new token"); - self.tokens.insert(tok, obj); - } -} - -pub(crate) struct Worker { - pid: Pid, - poll: Arc, - metrics: Arc>, - sid_backup_per_participant: Arc>>>>, - participants: HashMap>>, - ctrl_rx: Receiver, - rtrn_tx: mpsc::Sender, - mio_tokens: MioTokens, - time_before_poll: Instant, - time_after_poll: Instant, -} - -impl Worker { - pub fn new( - pid: Pid, - poll: Arc, - metrics: Arc>, - sid_backup_per_participant: Arc>>>>, - token_pool: tlid::Pool>, - ctrl_rx: Receiver, - rtrn_tx: mpsc::Sender, - ) -> Self { - let mio_tokens = MioTokens::new(token_pool); - Worker { - pid, - poll, - metrics, - sid_backup_per_participant, - participants: HashMap::new(), - ctrl_rx, - rtrn_tx, - mio_tokens, - time_before_poll: Instant::now(), - time_after_poll: Instant::now(), - } - } - - pub fn run(&mut self) { - let mut events = mio::Events::with_capacity(1024); - loop { - self.time_before_poll = Instant::now(); - if let Err(err) = self.poll.poll(&mut events, None) { - error!("network poll error: {}", err); - return; - } - self.time_after_poll = Instant::now(); - for event in &events { - trace!(?event, "event"); - match event.token() { - Controller::CTRL_TOK => { - if self.handle_ctl() { - return; - } - }, - _ => self.handle_tok(&event), - }; - } - self.handle_statistics(); - } - } - - fn handle_ctl(&mut self) -> bool { - info!("start in handle_ctl"); - loop { - info!("recv in handle_ctl"); - - let msg = match self.ctrl_rx.try_recv() { - Ok(msg) => msg, - Err(TryRecvError::Empty) => { - return false; - }, - Err(err) => { - panic!("Unexpected error '{}'", err); - }, - }; - info!("Loop in handle_ctl"); - - match msg { - CtrlMsg::Shutdown => { - debug!("Shutting Down"); - for (_, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::Channel(channel) = obj { - channel.shutdown(); - channel.tick_send(); - } - } - return true; - }, - CtrlMsg::Register(handle, interest, opts) => { - let tok = self.mio_tokens.construct(); - match &handle { - TokenObjects::TcpListener(h) => { - self.poll.register(h, tok, interest, opts).unwrap() - }, - TokenObjects::Channel(channel) => { - match channel.get_protocol() { - ChannelProtocols::Tcp(c) => { - self.poll.register(c.get_handle(), tok, interest, opts) - }, - ChannelProtocols::Udp(c) => { - self.poll.register(c.get_handle(), tok, interest, opts) - }, - ChannelProtocols::Mpsc(c) => { - self.poll.register(c.get_handle(), tok, interest, opts) - }, - } - .unwrap(); - }, - } - debug!(?handle, ?tok, "Registered new handle"); - self.mio_tokens.insert(tok, handle); - }, - CtrlMsg::OpenStream { - pid, - sid, - prio, - promises, - msg_tx, - } => { - let mut handled = false; - for (_, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::Channel(channel) = obj { - if Some(pid) == channel.remote_pid { - info!(?channel.streams, "-CTR- going to open stream"); - channel.open_stream(sid, prio, promises, msg_tx); - info!(?channel.streams, "-CTR- going to tick"); - channel.tick_send(); - info!(?channel.streams, "-CTR- did to open stream"); - handled = true; - break; - } - } - } - if !handled { - error!(?pid, "couldn't open Stream, didn't found pid"); - } - }, - CtrlMsg::CloseStream { pid, sid } => { - let mut handled = false; - for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::Channel(channel) = to { - if Some(pid) == channel.remote_pid { - info!(?channel.streams, "-CTR- going to close stream"); - channel.close_stream(sid); //TODO: check participant - info!(?channel.streams, "-CTR- going to tick"); - channel.tick_send(); - info!(?channel.streams, "-CTR- did to close stream"); - handled = true; - break; - } - } - } - if !handled { - error!(?pid, "couldn't close Stream, didn't found pid"); - } - }, - CtrlMsg::Send(outgoing) => { - let mut handled = false; - for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::Channel(channel) = to { - info!(?channel.streams, "-CTR- going to send msg"); - channel.send(outgoing); //TODO: check participant - info!(?channel.streams, "-CTR- going to tick"); - channel.tick_send(); - info!(?channel.streams, "-CTR- did to send msg"); - handled = true; - break; - } - } - if !handled { - error!( - "help, we should check here for stream data, but its in channel ...." - ); - } - }, - }; - } - } - - fn handle_tok(&mut self, event: &mio::Event) { - let obj = match self.mio_tokens.tokens.get_mut(&event.token()) { - Some(obj) => obj, - None => panic!("Unexpected event token '{:?}'", &event.token()), - }; - - match obj { - TokenObjects::TcpListener(listener) => match listener.accept() { - Ok((remote_stream, _)) => { - info!(?remote_stream, "remote connected"); - - let tok = self.mio_tokens.construct(); - self.poll - .register( - &remote_stream, - tok, - Ready::readable() | Ready::writable(), - PollOpt::edge(), - ) - .unwrap(); - trace!(?remote_stream, ?tok, "registered"); - let tcp_channel = TcpChannel::new(remote_stream); - let mut channel = Channel::new( - self.pid, - ChannelProtocols::Tcp(tcp_channel), - self.sid_backup_per_participant.clone(), - None, - ); - channel.handshake(); - channel.tick_send(); - - self.mio_tokens - .tokens - .insert(tok, TokenObjects::Channel(channel)); - }, - Err(err) => { - error!(?err, "error during remote connected"); - }, - }, - TokenObjects::Channel(channel) => { - if event.readiness().is_readable() { - let protocol = channel.get_protocol(); - trace!(?protocol, "channel readable"); - channel.tick_recv(&mut self.participants, &self.rtrn_tx); - } else { - trace!("channel not readable"); - } - if event.readiness().is_writable() { - let protocol = channel.get_protocol(); - trace!(?protocol, "channel writeable"); - channel.tick_send(); - } else { - trace!("channel not writeable"); - let protocol = channel.get_protocol(); - if let ChannelProtocols::Mpsc(_) = &protocol { - channel.tick_send(); //workaround for MPSC!!! ONLY for MPSC - } - } - }, - }; - } - - fn handle_statistics(&mut self) { - let time_after_work = Instant::now(); - - let idle = self.time_after_poll.duration_since(self.time_before_poll); - let work = time_after_work.duration_since(self.time_after_poll); - - if let Some(metric) = &*self.metrics { - metric - .worker_idle_time - .with_label_values(&["message"]) - .add(idle.as_millis() as i64); //todo convert correctly ! - metric - .worker_work_time - .with_label_values(&["message"]) - .add(work.as_millis() as i64); - } - } -} diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 834315edf1..f447fde09b 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -1,19 +1,14 @@ use lazy_static::*; -use std::{sync::Arc, thread, time::Duration}; +use std::{ + net::SocketAddr, + sync::atomic::{AtomicU16, Ordering}, + thread, + time::Duration, +}; use tracing::*; use tracing_subscriber::EnvFilter; -use uvth::{ThreadPool, ThreadPoolBuilder}; - -pub fn setup(tracing: bool, mut sleep: u64) -> (Arc, u64) { - lazy_static! { - static ref THREAD_POOL: Arc = Arc::new( - ThreadPoolBuilder::new() - .name("veloren-network-test".into()) - .num_threads(2) - .build(), - ); - } +pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { if tracing { sleep += 1000 } @@ -49,5 +44,13 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (Arc, u64) { None }; - (THREAD_POOL.clone(), 0) + (0, 0) +} + +pub fn tcp() -> veloren_network::Address { + lazy_static! { + static ref PORTS: AtomicU16 = AtomicU16::new(5000); + } + let port = PORTS.fetch_add(1, Ordering::Relaxed); + veloren_network::Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))) } diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 88e848eca9..45f95617c8 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -1,110 +1,77 @@ -use futures::executor::block_on; -use std::{net::SocketAddr, thread, time::Duration}; -use uuid::Uuid; -use veloren_network::{Address, Network, Promise}; - +use async_std::{sync::RwLock, task}; +use futures::{ + channel::{mpsc, oneshot}, + executor::ThreadPool, + sink::SinkExt, +}; +use std::sync::{atomic::AtomicU64, Arc}; +use veloren_network::{Network, Pid, Scheduler}; mod helper; +use std::collections::HashMap; +use tracing::*; +use uvth::ThreadPoolBuilder; -/* #[test] -fn tcp_simple() { - let (thread_pool, _) = helper::setup(true, 100); - let n1 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let n2 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); - let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001))); - n1.listen(&a1).unwrap(); //await - n2.listen(&a2).unwrap(); // only requiered here, but doesnt hurt on n1 - thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! +fn network() { + let (_, _) = helper::setup(true, 100); + { + let addr1 = helper::tcp(); + let pool = ThreadPoolBuilder::new().num_threads(2).build(); + let n1 = Network::new(Pid::fake(1), &pool); + let n2 = Network::new(Pid::fake(2), &pool); - let p1 = block_on(n1.connect(&a2)).unwrap(); //await - let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); + n1.listen(addr1.clone()).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); - assert!(s1.send("Hello World").is_ok()); + let pid1 = task::block_on(n2.connect(addr1)).unwrap(); + warn!("yay connected"); - let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 - let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 + let pid2 = task::block_on(n1.connected()).unwrap(); + warn!("yay connected"); - let s: Result = block_on(s1_n2.recv()); - assert_eq!(s, Ok("Hello World".to_string())); + let mut sid1_p1 = task::block_on(pid1.open(10, 0)).unwrap(); + let mut sid1_p2 = task::block_on(pid2.opened()).unwrap(); - assert!(s1.close().is_ok()); + task::block_on(sid1_p1.send("Hello World")).unwrap(); + let m1: Result = task::block_on(sid1_p2.recv()); + assert_eq!(m1, Ok("Hello World".to_string())); + + //assert_eq!(pid, Pid::fake(1)); + + std::thread::sleep(std::time::Duration::from_secs(10)); + } + std::thread::sleep(std::time::Duration::from_secs(2)); } -*/ -/* #[test] -fn tcp_5streams() { - let (thread_pool, _) = helper::setup(false, 200); - let n1 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let n2 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010))); - let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011))); - - n1.listen(&a1).unwrap(); //await - n2.listen(&a2).unwrap(); // only requiered here, but doesnt hurt on n1 - thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! - - let p1 = block_on(n1.connect(&a2)).unwrap(); //await - - let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); - let s2 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); - let s3 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); - let s4 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); - let s5 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); - - assert!(s3.send("Hello World3").is_ok()); - assert!(s1.send("Hello World1").is_ok()); - assert!(s5.send("Hello World5").is_ok()); - assert!(s2.send("Hello World2").is_ok()); - assert!(s4.send("Hello World4").is_ok()); - - let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 - let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 - let mut s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2 - let mut s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3 - let mut s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4 - let mut s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5 - - info!("all streams opened"); - - let s: Result = block_on(s3_n2.recv()); - assert_eq!(s, Ok("Hello World3".to_string())); - let s: Result = block_on(s1_n2.recv()); - assert_eq!(s, Ok("Hello World1".to_string())); - let s: Result = block_on(s2_n2.recv()); - assert_eq!(s, Ok("Hello World2".to_string())); - let s: Result = block_on(s5_n2.recv()); - assert_eq!(s, Ok("Hello World5".to_string())); - let s: Result = block_on(s4_n2.recv()); - assert_eq!(s, Ok("Hello World4".to_string())); - - assert!(s1.close().is_ok()); +#[ignore] +fn scheduler() { + let (_, _) = helper::setup(true, 100); + let addr = helper::tcp(); + let (scheduler, mut listen_tx, _, _, _) = Scheduler::new(Pid::new()); + task::block_on(listen_tx.send(addr)).unwrap(); + task::block_on(scheduler.run()); } -*/ + #[test] -fn mpsc_simple() { - let (thread_pool, _) = helper::setup(true, 2300); - let n1 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let n2 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Mpsc(42); - let a2 = Address::Mpsc(1337); - //n1.listen(&a1).unwrap(); //await //TODO: evaluate if this should be allowed - // or is forbidden behavior... - n2.listen(&a2).unwrap(); // only requiered here, but doesnt hurt on n1 - thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! - - let p1 = block_on(n1.connect(&a2)).unwrap(); //await - let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); - - assert!(s1.send("Hello World").is_ok()); - - thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! - let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 - let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 - - let s: Result = block_on(s1_n2.recv()); - assert_eq!(s, Ok("Hello World".to_string())); - - assert!(s1.close().is_ok()); +#[ignore] +fn channel_creator_test() { + let (_, _) = helper::setup(true, 100); + let (_end_sender, end_receiver) = oneshot::channel::<()>(); + let (part_out_sender, _part_out_receiver) = mpsc::unbounded(); + let (configured_sender, _configured_receiver) = mpsc::unbounded::<(u64, Pid, u64)>(); + let addr = helper::tcp(); + task::block_on(async { + Scheduler::channel_creator( + Arc::new(AtomicU64::new(0)), + Pid::new(), + addr, + end_receiver, + Arc::new(ThreadPool::new().unwrap()), + part_out_sender, + configured_sender, + Arc::new(RwLock::new(HashMap::new())), + ) + .await; + }); }