From 499a895922e27bb6a50666ba2518ca5f6565032d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Tue, 10 Mar 2020 01:07:36 +0100 Subject: [PATCH] shutdown and udp/mpsc - theorectically closing of streams and shutdown - mpsc and udp preparations - cleanup and build better tests --- Cargo.lock | 44 +- Cargo.toml | 2 +- network/Cargo.toml | 7 +- network/src/api.rs | 306 ++++++++----- network/src/channel.rs | 259 +++++++---- network/src/controller.rs | 130 +++++- network/src/lib.rs | 123 +----- network/src/mpsc.rs | 31 +- network/src/prios.rs | 559 ++++++++++++++++++++++++ network/src/tcp.rs | 84 +--- network/src/types.rs | 108 ++++- network/src/udp.rs | 140 +++--- network/src/worker.rs | 54 ++- network/tests/helper.rs | 53 +++ network/tests/integration.rs | 110 +++++ network/tools/async_recv/Cargo.toml | 4 +- network/tools/async_recv/src/main.rs | 8 +- network/tools/network-speed/Cargo.toml | 4 +- network/tools/network-speed/src/main.rs | 17 +- 19 files changed, 1521 insertions(+), 522 deletions(-) create mode 100644 network/src/prios.rs create mode 100644 network/tests/helper.rs create mode 100644 network/tests/integration.rs diff --git a/Cargo.lock b/Cargo.lock index 75f967c88b..734cd258f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -169,7 +169,7 @@ dependencies = [ "tracing-subscriber", "uuid 0.8.1", "uvth", - "veloren-network", + "veloren_network", ] [[package]] @@ -2864,7 +2864,7 @@ dependencies = [ "tracing-subscriber", "uuid 0.8.1", "uvth", - "veloren-network", + "veloren_network", ] [[package]] @@ -5234,25 +5234,6 @@ dependencies = [ "vek 0.10.4", ] -[[package]] -name = "veloren-network" -version = "0.1.0" -dependencies = [ - "bincode", - "byteorder 1.3.4", - "enumset", - "futures 0.3.5", - "mio", - "mio-extras", - "prometheus", - "serde", - "tlid", - "tracing", - "tracing-subscriber", - "uuid 0.8.1", - "uvth", -] - [[package]] name = "veloren-server" version = "0.6.0" @@ -5377,6 +5358,27 @@ dependencies = [ "veloren-common", ] +[[package]] +name = "veloren_network" +version = "0.1.0" +dependencies = [ + "bincode", + "byteorder 1.3.4", + "enumset", + "futures 0.3.5", + "lazy_static", + "mio", + "mio-extras", + "prometheus", + "rand 0.7.3", + "serde", + "tlid", + "tracing", + "tracing-subscriber", + "uuid 0.8.1", + "uvth", +] + [[package]] name = "version_check" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index ba5085b3bf..1e329dce3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ incremental = true # All dependencies (but not this crate itself) [profile.dev.package."*"] opt-level = 3 -[profile.dev.package."veloren-network"] +[profile.dev.package."veloren_network"] opt-level = 2 [profile.dev.package."veloren-common"] opt-level = 2 diff --git a/network/Cargo.toml b/network/Cargo.toml index e2cefa5411..100bccb97a 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "veloren-network" +name = "veloren_network" version = "0.1.0" authors = ["Marcel Märtens "] edition = "2018" @@ -25,6 +25,9 @@ tracing = "0.1" prometheus = "0.7" #async futures = "0.3" +#mpsc channel registry +lazy_static = "1.4" +rand = "0.7" [dev-dependencies] -tracing-subscriber = "0.2.0-alpha.4" \ No newline at end of file +tracing-subscriber = "0.2.3" \ No newline at end of file diff --git a/network/src/api.rs b/network/src/api.rs index f6dd5c32f9..fe9f7bb97e 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -3,8 +3,9 @@ use crate::{ controller::Controller, message::{self, InCommingMessage, OutGoingMessage}, metrics::NetworkMetrics, 
+ mpsc::MpscChannel, tcp::TcpChannel, - types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects}, + types::{CtrlMsg, Pid, Sid, TokenObjects}, }; use enumset::*; use futures::stream::StreamExt; @@ -13,10 +14,11 @@ use mio::{ net::{TcpListener, TcpStream}, PollOpt, Ready, }; +use mio_extras; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ collections::HashMap, - sync::{mpsc, Arc, RwLock}, + sync::{atomic::AtomicBool, mpsc, Arc, Mutex, RwLock}, }; use tlid; use tracing::*; @@ -27,6 +29,7 @@ use uvth::ThreadPool; pub enum Address { Tcp(std::net::SocketAddr), Udp(std::net::SocketAddr), + Mpsc(u64), } #[derive(Serialize, Deserialize, EnumSetType, Debug)] @@ -38,38 +41,42 @@ pub enum Promise { Encrypted, } +#[derive(Clone)] pub struct Participant { - addr: Address, remote_pid: Pid, network_controller: Arc>, } pub struct Stream { sid: Sid, + remote_pid: Pid, + closed: AtomicBool, + closed_rx: mpsc::Receiver<()>, msg_rx: futures::channel::mpsc::UnboundedReceiver, ctr_tx: mio_extras::channel::Sender, } pub struct Network { - token_pool: tlid::Pool>, - worker_pool: tlid::Pool>, + _token_pool: tlid::Pool>, + _worker_pool: tlid::Pool>, controller: Arc>, - thread_pool: Arc, + _thread_pool: Arc, participant_id: Pid, - remotes: Arc>>, - metrics: Arc>, + sid_backup_per_participant: Arc>>>>, + participants: RwLock>, + _metrics: Arc>, } impl Network { pub fn new(participant_id: Uuid, thread_pool: Arc) -> Self { let mut token_pool = tlid::Pool::new_full(); let mut worker_pool = tlid::Pool::new_full(); - let remotes = Arc::new(RwLock::new(HashMap::new())); + let sid_backup_per_participant = Arc::new(RwLock::new(HashMap::new())); for _ in 0..participant_id.as_u128().rem_euclid(64) { worker_pool.next(); //random offset from 0 for tests where multiple networks are // created and we do not want to polute the traces with - // network pid everytime + // network pid everywhere } let metrics = Arc::new(None); let controller = Arc::new(vec![Controller::new( @@ -78,22 +85,24 @@ impl Network { thread_pool.clone(), token_pool.subpool(1000000).unwrap(), metrics.clone(), - remotes.clone(), + sid_backup_per_participant.clone(), )]); + let participants = RwLock::new(vec![]); Self { - token_pool, - worker_pool, + _token_pool: token_pool, + _worker_pool: worker_pool, controller, - thread_pool, + _thread_pool: thread_pool, participant_id, - remotes, - metrics, + sid_backup_per_participant, + participants, + _metrics: metrics, } } fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc>) -> &'a Controller { &list[0] } - pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> { + pub fn listen(&self, address: &Address) -> Result<(), NetworkError> { let span = span!(Level::TRACE, "listen", ?address); let worker = Self::get_lowest_worker(&self.controller); let _enter = span.enter(); @@ -107,15 +116,41 @@ impl Network { PollOpt::edge(), ))?; }, - Address::Udp(_) => unimplemented!("lazy me"), + Address::Udp(_) => unimplemented!( + "UDP is currently not supportet problem is in internal worker - channel view. I \ + except to have every Channel it#s own socket, but UDP shares a Socket with \ + everyone on it. So there needs to be a instance that detects new connections \ + inside worker and then creates a new channel for them, while handling needs to \ + be done in UDP layer... however i am to lazy to build it yet." 
+ ), + Address::Mpsc(a) => { + let (listen_tx, listen_rx) = mio_extras::channel::channel(); + let (connect_tx, conntect_rx) = mio_extras::channel::channel(); + let mut registry = (*crate::mpsc::MPSC_REGISTRY).write().unwrap(); + registry.insert(*a, Mutex::new((listen_tx, conntect_rx))); + info!("listening"); + let mpsc_channel = MpscChannel::new(connect_tx, listen_rx); + let mut channel = Channel::new( + self.participant_id, + ChannelProtocols::Mpsc(mpsc_channel), + self.sid_backup_per_participant.clone(), + None, + ); + channel.handshake(); + channel.tick_send(); + worker.get_tx().send(CtrlMsg::Register( + TokenObjects::Channel(channel), + Ready::readable() | Ready::writable(), + PollOpt::edge(), + ))?; + }, }; Ok(()) } pub async fn connect(&self, address: &Address) -> Result { let worker = Self::get_lowest_worker(&self.controller); - let pid = self.participant_id; - let remotes = self.remotes.clone(); + let sid_backup_per_participant = self.sid_backup_per_participant.clone(); let span = span!(Level::INFO, "connect", ?address); let _enter = span.enter(); match address { @@ -125,9 +160,9 @@ impl Network { let tcp_channel = TcpChannel::new(tcp_stream); let (ctrl_tx, ctrl_rx) = mpsc::channel::(); let channel = Channel::new( - pid, + self.participant_id, ChannelProtocols::Tcp(tcp_channel), - remotes, + sid_backup_per_participant, Some(ctrl_tx), ); worker.get_tx().send(CtrlMsg::Register( @@ -137,23 +172,57 @@ impl Network { ))?; let remote_pid = ctrl_rx.recv().unwrap(); info!(?remote_pid, " sucessfully connected to"); - return Ok(Participant { - addr: address.clone(), + let part = Participant { remote_pid, network_controller: self.controller.clone(), - }); + }; + self.participants.write().unwrap().push(part.clone()); + return Ok(part); }, Address::Udp(_) => unimplemented!("lazy me"), + Address::Mpsc(a) => { + let mut registry = (*crate::mpsc::MPSC_REGISTRY).write().unwrap(); + let (listen_tx, conntect_rx) = match registry.remove(a) { + Some(x) => x.into_inner().unwrap(), + None => { + error!("could not connect to mpsc"); + return Err(NetworkError::NetworkDestroyed); + }, + }; + info!("connect to mpsc"); + let mpsc_channel = MpscChannel::new(listen_tx, conntect_rx); + let (ctrl_tx, ctrl_rx) = mpsc::channel::(); + let channel = Channel::new( + self.participant_id, + ChannelProtocols::Mpsc(mpsc_channel), + self.sid_backup_per_participant.clone(), + Some(ctrl_tx), + ); + worker.get_tx().send(CtrlMsg::Register( + TokenObjects::Channel(channel), + Ready::readable() | Ready::writable(), + PollOpt::edge(), + ))?; + + let remote_pid = ctrl_rx.recv().unwrap(); + info!(?remote_pid, " sucessfully connected to"); + let part = Participant { + remote_pid, + network_controller: self.controller.clone(), + }; + self.participants.write().unwrap().push(part.clone()); + return Ok(part); + }, } } - //TODO: evaluate if move to Participant - pub async fn _disconnect(&self, participant: Participant) -> Result<(), NetworkError> { - panic!("sda"); + pub fn disconnect(&self, _participant: Participant) -> Result<(), NetworkError> { + //todo: close all channels to a participant! + unimplemented!("sda"); } - pub fn participants(&self) -> Vec { - panic!("sda"); + pub fn participants(&self) -> std::sync::RwLockReadGuard> { + self.participants.read().unwrap() } pub async fn connected(&self) -> Result { @@ -162,25 +231,21 @@ impl Network { //ARRGGG for worker in self.controller.iter() { //TODO harden! 
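For orientation, the `Address::Mpsc` path added to `listen`/`connect` above pairs up through `MPSC_REGISTRY`: `listen` parks one half of two `mio_extras` channels under the address, and `connect` removes it again, so each MPSC address carries exactly one connection. A minimal sketch, modeled on the tests this patch adds (the thread-pool setup is an assumption taken from the existing tests):

```rust
use futures::executor::block_on;
use std::sync::Arc;
use uuid::Uuid;
use uvth::ThreadPoolBuilder;
use veloren_network::{Address, Network};

fn mpsc_loopback() {
    let pool = Arc::new(ThreadPoolBuilder::new().build());
    let n1 = Network::new(Uuid::new_v4(), pool.clone());
    let n2 = Network::new(Uuid::new_v4(), pool.clone());
    // registers a (listen_tx, connect_rx) pair under address 42 in
    // MPSC_REGISTRY; note that listen() is synchronous after this patch
    n1.listen(&Address::Mpsc(42)).unwrap();
    // takes the entry out of the registry again and runs the handshake
    let _participant = block_on(n2.connect(&Address::Mpsc(42))).unwrap();
}
```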
- if let Ok(msg) = worker.get_rx().try_recv() { - if let RtrnMsg::ConnectedParticipant { pid } = msg { - return Ok(Participant { - addr: Address::Tcp(std::net::SocketAddr::from(([1, 3, 3, 7], 1337))), /* TODO: FIXME */ - remote_pid: pid, - network_controller: self.controller.clone(), - }); - } + worker.tick(); + if let Ok(remote_pid) = worker.get_participant_connect_rx().try_recv() { + let part = Participant { + remote_pid, + network_controller: self.controller.clone(), + }; + self.participants.write().unwrap().push(part.clone()); + return Ok(part); }; } + std::thread::sleep(std::time::Duration::from_millis(1)); } } - pub async fn _disconnected(&self) -> Result { - // returns if a Participant connected and is ready - panic!("sda"); - } - - pub async fn multisend( + pub fn multisend( &self, streams: Vec, msg: M, @@ -206,92 +271,91 @@ impl Network { } impl Participant { - pub async fn open( - &self, - prio: u8, - promises: EnumSet, - ) -> Result { - let (ctrl_tx, ctrl_rx) = mpsc::channel::(); + pub fn open(&self, prio: u8, promises: EnumSet) -> Result { let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); for controller in self.network_controller.iter() { + //trigger tick: + controller.tick(); + let parts = controller.participants(); + let (stream_close_tx, stream_close_rx) = mpsc::channel(); + let sid = match parts.get(&self.remote_pid) { + Some(p) => { + let sid = p.sid_pool.write().unwrap().next(); + //prepare the closing of the new stream already + p.stream_close_txs + .write() + .unwrap() + .insert(sid, stream_close_tx); + sid + }, + None => return Err(ParticipantError::ParticipantDisconected), /* TODO: participant was never connected in the first case maybe... */ + }; let tx = controller.get_tx(); tx.send(CtrlMsg::OpenStream { pid: self.remote_pid, + sid, prio, promises, - return_sid: ctrl_tx, msg_tx, }) .unwrap(); - - // I dont like the fact that i need to wait on the worker thread for getting my - // sid back :/ we could avoid this by introducing a Thread Local Network - // which owns some sids we can take without waiting - let sid = ctrl_rx.recv().unwrap(); info!(?sid, " sucessfully opened stream"); - return Ok(Stream { + return Ok(Stream::new( sid, + self.remote_pid, + stream_close_rx, msg_rx, - ctr_tx: tx, - }); - } - Err(ParticipantError::ParticipantDisconected) - } - - pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> { - for controller in self.network_controller.iter() { - let tx = controller.get_tx(); - tx.send(CtrlMsg::CloseStream { - pid: self.remote_pid, - sid: stream.sid, - }) - .unwrap(); - return Ok(()); + tx, + )); } Err(ParticipantError::ParticipantDisconected) } pub async fn opened(&self) -> Result { + //TODO: make this async native! loop { - //ARRGGG + // Going to all workers in a network, but only receive on specific channels! for worker in self.network_controller.iter() { - //TODO harden! - if let Ok(msg) = worker.get_rx().try_recv() { - if let RtrnMsg::OpendStream { - pid, - sid, - prio, - msg_rx, - promises, - } = msg - { - return Ok(Stream { - sid, - msg_rx, - ctr_tx: worker.get_tx(), - }); - } - }; + worker.tick(); + let parts = worker.participants(); + if let Some(p) = parts.get(&self.remote_pid) { + if let Ok(stream) = p.stream_open_rx.try_recv() { + //need a try, as i depend on the tick, it's the same thread... + debug!("delivering a stream"); + return Ok(stream); + }; + } } } } - - pub async fn _closed(&self) -> Result { - panic!("aaa"); - } } impl Stream { //TODO: What about SEND instead of Serializeable if it goes via PIPE ? 
- //TODO: timeout per message or per stream ? stream or ? + //TODO: timeout per message or per stream ? stream or ? like for Position Data, + // if not transmitted within 1 second, throw away... + pub(crate) fn new( + sid: Sid, + remote_pid: Pid, + closed_rx: mpsc::Receiver<()>, + msg_rx: futures::channel::mpsc::UnboundedReceiver, + ctr_tx: mio_extras::channel::Sender, + ) -> Self { + Self { + sid, + remote_pid, + closed: AtomicBool::new(false), + closed_rx, + msg_rx, + ctr_tx, + } + } pub fn send(&self, msg: M) -> Result<(), StreamError> { + if self.is_closed() { + return Err(StreamError::StreamClosed); + } let messagebuffer = Arc::new(message::serialize(&msg)); - //transfer message to right worker to right channel to correct stream - //TODO: why do we need a look here, i want my own local directory which is - // updated by workes via a channel and needs to be intepreted on a send but it - // should almost ever be empty except for new channel creations and stream - // creations! self.ctr_tx .send(CtrlMsg::Send(OutGoingMessage { buffer: messagebuffer, @@ -304,6 +368,9 @@ impl Stream { } pub async fn recv(&mut self) -> Result { + if self.is_closed() { + return Err(StreamError::StreamClosed); + } match self.msg_rx.next().await { Some(msg) => { info!(?msg, "delivering a message"); @@ -315,6 +382,45 @@ impl Stream { ), } } + + pub fn close(mut self) -> Result<(), StreamError> { self.intclose() } + + fn is_closed(&self) -> bool { + use core::sync::atomic::Ordering; + if self.closed.load(Ordering::Relaxed) { + true + } else { + if let Ok(()) = self.closed_rx.try_recv() { + self.closed.store(true, Ordering::SeqCst); //TODO: Is this the right Ordering? + true + } else { + false + } + } + } + + fn intclose(&mut self) -> Result<(), StreamError> { + use core::sync::atomic::Ordering; + if self.is_closed() { + return Err(StreamError::StreamClosed); + } + self.ctr_tx + .send(CtrlMsg::CloseStream { + pid: self.remote_pid, + sid: self.sid, + }) + .unwrap(); + self.closed.store(true, Ordering::SeqCst); //TODO: Is this the right Ordering? 
+ Ok(()) + } +} + +impl Drop for Stream { + fn drop(&mut self) { + let _ = self.intclose().map_err( + |e| error!(?self.sid, ?e, "could not properly shutdown stream, which got out of scope"), + ); + } } #[derive(Debug)] diff --git a/network/src/channel.rs b/network/src/channel.rs index 64905a2ab1..d82e0400d5 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -4,18 +4,19 @@ use crate::{ mpsc::MpscChannel, tcp::TcpChannel, types::{ - Frame, IntStream, Mid, Pid, RemoteParticipant, RtrnMsg, Sid, VELOREN_MAGIC_NUMBER, + Frame, IntStream, Pid, RtrnMsg, Sid, DEFAULT_SID_SIZE, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION, }, udp::UdpChannel, }; use enumset::EnumSet; use futures::{executor::block_on, sink::SinkExt}; -use mio_extras::channel::Sender; +use rand::{thread_rng, Rng}; use std::{ collections::{HashMap, VecDeque}, - sync::{Arc, RwLock}, + sync::{mpsc, Arc, RwLock}, }; +use tlid; use tracing::*; pub(crate) trait ChannelProtocol { @@ -39,11 +40,11 @@ pub(crate) enum ChannelProtocols { pub(crate) struct Channel { pub stream_id_pool: Option>>, /* TODO: stream_id unique per * participant */ - pub msg_id_pool: Option>>, //TODO: msg_id unique per - // participant + // participantd + pub randomno: u64, pub local_pid: Pid, pub remote_pid: Option, - pub remotes: Arc>>, + pub sid_backup_per_participant: Arc>>>>, pub streams: Vec, pub send_queue: VecDeque, pub protocol: ChannelProtocols, @@ -84,15 +85,17 @@ impl Channel { pub fn new( local_pid: Pid, protocol: ChannelProtocols, - remotes: Arc>>, + sid_backup_per_participant: Arc>>>>, return_pid_to: Option>, ) -> Self { + let randomno = thread_rng().gen(); + warn!(?randomno, "new channel,yay "); Self { + randomno, stream_id_pool: None, - msg_id_pool: None, local_pid, remote_pid: None, - remotes, + sid_backup_per_participant, streams: Vec::new(), send_queue: VecDeque::new(), protocol, @@ -118,21 +121,25 @@ impl Channel { && !self.recv_shutdown } - pub fn tick_recv(&mut self, rtrn_tx: &Sender) { + pub fn tick_recv( + &mut self, + worker_participants: &mut HashMap>>, + rtrn_tx: &mpsc::Sender, + ) { match &mut self.protocol { ChannelProtocols::Tcp(c) => { for frame in c.read() { - self.handle(frame, rtrn_tx); + self.handle(frame, worker_participants, rtrn_tx); } }, ChannelProtocols::Udp(c) => { for frame in c.read() { - self.handle(frame, rtrn_tx); + self.handle(frame, worker_participants, rtrn_tx); } }, ChannelProtocols::Mpsc(c) => { for frame in c.read() { - self.handle(frame, rtrn_tx); + self.handle(frame, worker_participants, rtrn_tx); } }, } @@ -153,7 +160,12 @@ impl Channel { } } - fn handle(&mut self, frame: Frame, rtrn_tx: &Sender) { + fn handle( + &mut self, + frame: Frame, + worker_participants: &mut HashMap>>, + rtrn_tx: &mpsc::Sender, + ) { match frame { Frame::Handshake { magic_number, @@ -202,32 +214,54 @@ impl Channel { debug!(?pid, "Participant send their ID"); self.recv_pid = true; if self.send_pid { - let mut remotes = self.remotes.write().unwrap(); - if !remotes.contains_key(&pid) { - remotes.insert(pid, RemoteParticipant::new()); + //If participant is unknown to worker, assign some range from global pool + if !worker_participants.contains_key(&pid) { + let mut global_participants = + self.sid_backup_per_participant.write().unwrap(); + //if this is the first time a participant connects to this Controller + if !global_participants.contains_key(&pid) { + // I dont no participant, so i can safely assume that they don't know + // me. 
so HERE we gonna fill local network pool + global_participants.insert(pid, tlid::Pool::new_full()); + } + //grab a range for controller + let global_part_pool = global_participants.get_mut(&pid).unwrap(); + + let mut local_controller_sids = + tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); + let remote_controller_sids = + tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); + let mut local_worker_sids = + tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); + let remote_worker_sids = + tlid::subpool_wrapping(global_part_pool, DEFAULT_SID_SIZE).unwrap(); + + let local_controller_range = + tlid::RemoveAllocation::new(&mut local_controller_sids); + let local_worker_range = + tlid::RemoveAllocation::new(&mut local_worker_sids); + + worker_participants.insert(pid.clone(), local_worker_sids); + self.send_queue.push_back(Frame::Configure { + sender_controller_sids: local_controller_range, + sender_worker_sids: local_worker_range, + receiver_controller_sids: remote_controller_sids, + receiver_worker_sids: remote_worker_sids, + }); + self.send_config = true; + info!(?pid, "this channel is now configured!"); + if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant { + controller_sids: local_controller_sids, + pid, + }) { + error!(?err, "couldn't notify, is network already closed ?"); + } } else { warn!( "a known participant opened an additional channel, UNCHECKED BECAUSE \ NO TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!" ); } - if let Some(rp) = remotes.get_mut(&pid) { - self.stream_id_pool = Some(rp.stream_id_pool.subpool(1000000).unwrap()); - self.msg_id_pool = Some(rp.msg_id_pool.subpool(1000000).unwrap()); - self.send_queue.push_back(Frame::Configure { - stream_id_pool: rp.stream_id_pool.subpool(1000000).unwrap(), - msg_id_pool: rp.msg_id_pool.subpool(1000000).unwrap(), - }); - self.send_config = true; - info!(?pid, "this channel is now configured!"); - if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant { pid }) { - error!( - ?err, - "couldn't notify of connected participant, is network already \ - closed ?" - ); - } - } } else { self.send_queue.push_back(Frame::ParticipantId { pid: self.local_pid, @@ -236,20 +270,47 @@ impl Channel { } }, Frame::Configure { - stream_id_pool, - msg_id_pool, + sender_controller_sids, + sender_worker_sids, + mut receiver_controller_sids, + mut receiver_worker_sids, } => { + let pid = match self.remote_pid { + Some(pid) => pid, + None => { + error!("Cant configure a Channel without a PID first!"); + return; + }, + }; self.recv_config = true; - //TODO remove range from rp! as this could probably cause duplicate ID !!! - let mut remotes = self.remotes.write().unwrap(); - if let Some(pid) = self.remote_pid { - if !remotes.contains_key(&pid) { - remotes.insert(pid, RemoteParticipant::new()); + //Check if worker already knows about this participant + if !worker_participants.contains_key(&pid) { + let mut global_participants = self.sid_backup_per_participant.write().unwrap(); + if !global_participants.contains_key(&pid) { + // I dont no participant, so i can safely assume that they don't know me. 
so + // HERE we gonna fill local network pool + global_participants.insert(pid, tlid::Pool::new_full()); } - if let Some(_rp) = remotes.get_mut(&pid) { - //TODO: make use of RemoteParticipant - self.stream_id_pool = Some(stream_id_pool); - self.msg_id_pool = Some(msg_id_pool); + //grab a range for controller + let global_part_pool = global_participants.get_mut(&pid).unwrap(); + + sender_controller_sids + .remove_from(global_part_pool) + .unwrap(); + sender_worker_sids.remove_from(global_part_pool).unwrap(); + tlid::RemoveAllocation::new(&mut receiver_controller_sids) + .remove_from(global_part_pool) + .unwrap(); + tlid::RemoveAllocation::new(&mut receiver_worker_sids) + .remove_from(global_part_pool) + .unwrap(); + + worker_participants.insert(pid.clone(), receiver_worker_sids); + if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant { + pid, + controller_sids: receiver_controller_sids, + }) { + error!(?err, "couldn't notify, is network already closed ?"); } if let Some(send) = &self.return_pid_to { if let Err(err) = send.send(pid) { @@ -262,11 +323,14 @@ impl Channel { }; self.return_pid_to = None; } else { - warn!(?self, "Protocol is done wrong!"); + warn!( + "a known participant opened an additional channel, UNCHECKED BECAUSE NO \ + TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!" + ); } info!("recv config. This channel is now configured!"); }, - Frame::Shutdown {} => { + Frame::Shutdown => { self.recv_shutdown = true; info!("shutting down channel"); if let Err(err) = rtrn_tx.send(RtrnMsg::Shutdown) { @@ -281,7 +345,10 @@ impl Channel { if let Some(pid) = self.remote_pid { let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); + + trace!(?self.streams, "-OPEN STREAM- going to modify streams"); self.streams.push(stream); + trace!(?self.streams, "-OPEN STREAM- did to modify streams"); info!("opened a stream"); if let Err(err) = rtrn_tx.send(RtrnMsg::OpendStream { pid, @@ -298,7 +365,9 @@ impl Channel { }, Frame::CloseStream { sid } => { if let Some(pid) = self.remote_pid { + trace!(?self.streams, "-CLOSE STREAM- going to modify streams"); self.streams.retain(|stream| stream.sid() != sid); + trace!(?self.streams, "-CLOSE STREAM- did to modify streams"); info!("closed a stream"); if let Err(err) = rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }) { error!(?err, "couldn't notify of closed stream"); @@ -379,38 +448,36 @@ impl Channel { fn tick_streams(&mut self) { //ignoring prio for now //TODO: fix prio - if let Some(msg_id_pool) = &mut self.msg_id_pool { - for s in &mut self.streams { - let mut remove = false; - let sid = s.sid(); - if let Some(m) = s.to_send.front_mut() { - let to_send = std::cmp::min(m.buffer.data.len() as u64 - m.cursor, 1400); - if to_send > 0 { - if m.cursor == 0 { - let mid = msg_id_pool.next(); - m.mid = Some(mid); - self.send_queue.push_back(Frame::DataHeader { - mid, - sid, - length: m.buffer.data.len() as u64, - }); - } - self.send_queue.push_back(Frame::Data { - id: m.mid.unwrap(), - start: m.cursor, - data: m.buffer.data[m.cursor as usize..(m.cursor + to_send) as usize] - .to_vec(), + for s in &mut self.streams { + let mut remove = false; + let sid = s.sid(); + if let Some(m) = s.to_send.front_mut() { + let to_send = std::cmp::min(m.buffer.data.len() as u64 - m.cursor, 1400); + if to_send > 0 { + if m.cursor == 0 { + let mid = s.mid_pool.next(); + m.mid = Some(mid); + self.send_queue.push_back(Frame::DataHeader { + mid, + sid, + length: m.buffer.data.len() as u64, }); - }; - m.cursor += 
to_send; - if m.cursor == m.buffer.data.len() as u64 { - remove = true; - debug!(?m.mid, "finish message") } + self.send_queue.push_back(Frame::Data { + id: m.mid.unwrap(), + start: m.cursor, + data: m.buffer.data[m.cursor as usize..(m.cursor + to_send) as usize] + .to_vec(), + }); + }; + m.cursor += to_send; + if m.cursor == m.buffer.data.len() as u64 { + remove = true; + debug!(?m.mid, "finish message") } - if remove { - s.to_send.pop_front(); - } + } + if remove { + s.to_send.pop_front(); } } } @@ -427,29 +494,37 @@ impl Channel { pub(crate) fn open_stream( &mut self, + sid: Sid, prio: u8, promises: EnumSet, msg_tx: futures::channel::mpsc::UnboundedSender, - ) -> Sid { + ) { // validate promises - if let Some(stream_id_pool) = &mut self.stream_id_pool { - let sid = stream_id_pool.next(); - trace!(?sid, "going to open a new stream"); - let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); - self.streams.push(stream); - self.send_queue.push_back(Frame::OpenStream { - sid, - prio, - promises, - }); - return sid; - } else { - panic!("cant open stream because connection isn't initialized"); + trace!(?sid, "going to open a new stream"); + let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); + trace!(?sid, "1"); + self.streams.push(stream); + trace!(?sid, "2"); + trace!(?self.streams, ?self.randomno, "2b"); + if self.streams.len() >= 0 { + // breakpoint here + let a = self.streams.len(); + if a > 1000 { + //this will never happen but is a blackbox to catch a + panic!("dasd"); + } } + self.send_queue.push_back(Frame::OpenStream { + sid, + prio, + promises, + }); } pub(crate) fn close_stream(&mut self, sid: Sid) { + trace!(?self.streams, "--CLOSE STREAM-- going to modify streams"); self.streams.retain(|stream| stream.sid() != sid); + trace!(?self.streams, "--CLOSE STREAM-- did to modify streams"); self.send_queue.push_back(Frame::CloseStream { sid }); } @@ -467,12 +542,16 @@ impl Channel { } pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { + trace!(?outgoing.sid, "3"); + trace!(?self.streams, ?self.randomno, "3b"); + for s in self.streams.iter_mut() { if s.sid() == outgoing.sid { s.to_send.push_back(outgoing); return; } } + trace!(?outgoing.sid, "4"); let sid = &outgoing.sid; error!(?sid, "couldn't send message, didn't found sid") } diff --git a/network/src/controller.rs b/network/src/controller.rs index 54204f67b3..ce9bf2dcc6 100644 --- a/network/src/controller.rs +++ b/network/src/controller.rs @@ -5,28 +5,42 @@ communication is done via channels. */ use crate::{ + api::Stream, metrics::NetworkMetrics, - types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg}, + types::{CtrlMsg, Pid, RtrnMsg, Sid}, worker::Worker, }; use mio::{self, Poll, PollOpt, Ready, Token}; -use mio_extras::channel::{channel, Receiver, Sender}; +use mio_extras::channel; use std::{ collections::HashMap, - sync::{Arc, RwLock}, + sync::{mpsc, Arc, RwLock, RwLockReadGuard}, }; use tlid; use tracing::*; use uvth::ThreadPool; +pub struct ControllerParticipant { + pub sid_pool: RwLock>>, + //TODO: move this in a future aware variant! via futures Channels + stream_open_tx: mpsc::Sender, + pub stream_open_rx: mpsc::Receiver, + pub stream_close_txs: RwLock>>, +} + /* The MioWorker runs in it's own thread, it has a given set of Channels to work with. 
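A note on the id flow: each Controller additionally receives a per-participant `controller_sids` range via `RtrnMsg::ConnectedParticipant`. During the channel handshake the shared per-participant pool is split into four disjoint ranges of `DEFAULT_SID_SIZE` (2^48) ids each - local controller, local worker, remote controller, remote worker - so both sides can hand out stream ids without any further synchronization.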
It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers */ pub struct Controller { - ctrl_tx: Sender, - rtrn_rx: Receiver, + ctrl_tx: channel::Sender, + rtrn_rx: mpsc::Receiver, + + participant_connect_tx: mpsc::Sender, + participant_connect_rx: mpsc::Receiver, + + participants: RwLock>, } impl Controller { @@ -38,12 +52,13 @@ impl Controller { thread_pool: Arc, mut token_pool: tlid::Pool>, metrics: Arc>, - remotes: Arc>>, + sid_backup_per_participant: Arc>>>>, ) -> Self { let poll = Arc::new(Poll::new().unwrap()); - let (ctrl_tx, ctrl_rx) = channel(); - let (rtrn_tx, rtrn_rx) = channel(); + let (ctrl_tx, ctrl_rx) = channel::channel(); + let (rtrn_tx, rtrn_rx) = mpsc::channel(); + let (participant_connect_tx, participant_connect_rx) = mpsc::channel(); poll.register(&ctrl_rx, Self::CTRL_TOK, Ready::readable(), PollOpt::edge()) .unwrap(); // reserve 10 tokens in case they start with 0, //TODO: cleaner method @@ -55,17 +70,108 @@ impl Controller { let w = wid; let span = span!(Level::INFO, "worker", ?w); let _enter = span.enter(); - let mut worker = Worker::new(pid, poll, metrics, remotes, token_pool, ctrl_rx, rtrn_tx); + let mut worker = Worker::new( + pid, + poll, + metrics, + sid_backup_per_participant, + token_pool, + ctrl_rx, + rtrn_tx, + ); worker.run(); }); - Controller { ctrl_tx, rtrn_rx } + let participants = RwLock::new(HashMap::new()); + Controller { + ctrl_tx, + rtrn_rx, + participant_connect_rx, + participant_connect_tx, + participants, + } } //TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers - pub(crate) fn get_tx(&self) -> Sender { self.ctrl_tx.clone() } + pub(crate) fn get_tx(&self) -> channel::Sender { self.ctrl_tx.clone() } - pub(crate) fn get_rx(&self) -> &Receiver { &self.rtrn_rx } + pub(crate) fn get_participant_connect_rx(&self) -> &mpsc::Receiver { + &self.participant_connect_rx + } + + pub(crate) fn tick(&self) { + for msg in self.rtrn_rx.try_iter() { + match msg { + /*TODO: WAIT, THIS ASSUMES CONNECTED PARTICIPANT IS ONLY EVER TRIGGERED ONCE PER CONTROLLER + that means, that it can happen multiple time for the same participant on multiple controller, + and even multiple channel on one worker shouldn't trigger it*/ + RtrnMsg::ConnectedParticipant { + pid, + controller_sids, + } => { + let mut parts = self.participants.write().unwrap(); + debug!( + ?pid, + "A new participant connected to this channel, we assign it the sid pool" + ); + let (stream_open_tx, stream_open_rx) = mpsc::channel(); + let part = ControllerParticipant { + sid_pool: RwLock::new(controller_sids), + stream_open_tx, + stream_open_rx, + stream_close_txs: RwLock::new(HashMap::new()), + }; + parts.insert(pid.clone(), part); + self.participant_connect_tx.send(pid).unwrap(); + }, + RtrnMsg::OpendStream { + pid, + sid, + prio: _, + msg_rx, + promises: _, + } => { + trace!( + ?pid, + ?sid, + "A new stream was opened on this channel, we assign it the participant" + ); + let parts = self.participants.read().unwrap(); + if let Some(p) = parts.get(&pid) { + let (stream_close_tx, stream_close_rx) = mpsc::channel(); + p.stream_close_txs + .write() + .unwrap() + .insert(sid, stream_close_tx); + p.stream_open_tx + .send(Stream::new( + sid, + pid, + stream_close_rx, + msg_rx, + self.ctrl_tx.clone(), + )) + .unwrap(); + } + }, + RtrnMsg::ClosedStream { pid, sid } => { + trace!(?pid, ?sid, "Stream got closeed, will route message"); + let parts = self.participants.read().unwrap(); + if let Some(p) = parts.get(&pid) { + if let Some(tx) = 
p.stream_close_txs.read().unwrap().get(&sid) { + tx.send(()).unwrap(); + trace!(?pid, ?sid, "routed message"); + } + } + }, + _ => {}, + } + } + } + + pub(crate) fn participants(&self) -> RwLockReadGuard> { + self.participants.read().unwrap() + } } impl Drop for Controller { fn drop(&mut self) { let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); } diff --git a/network/src/lib.rs b/network/src/lib.rs index 6f593db72f..943dc9679f 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -5,6 +5,7 @@ mod controller; mod message; mod metrics; mod mpsc; +mod prios; mod tcp; mod types; mod udp; @@ -13,125 +14,3 @@ mod worker; pub use api::{ Address, Network, NetworkError, Participant, ParticipantError, Promise, Stream, StreamError, }; - -#[cfg(test)] -pub mod tests { - use crate::api::*; - use futures::executor::block_on; - use std::{net::SocketAddr, sync::Arc, thread, time::Duration}; - use tracing::*; - use tracing_subscriber::EnvFilter; - use uuid::Uuid; - use uvth::ThreadPoolBuilder; - - pub fn test_tracing() { - let filter = EnvFilter::from_default_env() - //.add_directive("[worker]=trace".parse().unwrap()) - .add_directive("trace".parse().unwrap()) - .add_directive("veloren_network::tests=trace".parse().unwrap()) - .add_directive("veloren_network::worker=debug".parse().unwrap()) - .add_directive("veloren_network::controller=trace".parse().unwrap()) - .add_directive("veloren_network::channel=trace".parse().unwrap()) - .add_directive("veloren_network::message=trace".parse().unwrap()) - .add_directive("veloren_network::metrics=trace".parse().unwrap()) - .add_directive("veloren_network::types=trace".parse().unwrap()) - .add_directive("veloren_network::mpsc=debug".parse().unwrap()) - .add_directive("veloren_network::udp=debug".parse().unwrap()) - .add_directive("veloren_network::tcp=debug".parse().unwrap()); - - tracing_subscriber::FmtSubscriber::builder() - // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) - // will be written to stdout. - .with_max_level(Level::TRACE) - .with_env_filter(filter) - // sets this to be the default, global subscriber for this application. - .init(); - } - - #[test] - fn aaa() { test_tracing(); } - - #[test] - fn client_server() { - let thread_pool = Arc::new( - ThreadPoolBuilder::new() - .name("veloren-network-test".into()) - .build(), - ); - thread::sleep(Duration::from_millis(200)); - let n1 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let n2 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); - let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001))); - block_on(n1.listen(&a1)).unwrap(); //await - block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1 - thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! 
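These inline tests move to the new `network/tests/integration.rs`; against the reworked API the same round trip reads roughly as follows (a sketch: `open` is synchronous now, and `Participant::close` is replaced by `Stream::close`, which also fires on `Drop`):

```rust
use futures::executor::block_on;
use veloren_network::{Address, Network, Promise};

// sketch of the TCP round trip, assuming n1/n2 are set up and n2 listens on a2
fn round_trip(n1: &Network, n2: &Network, a2: &Address) {
    let p1 = block_on(n1.connect(a2)).unwrap();
    let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); // no block_on needed
    s1.send("Hello World").unwrap();

    let p1_n2 = block_on(n2.connected()).unwrap(); // remote view of p1
    let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); // remote view of s1
    let msg: Result<String, _> = block_on(s1_n2.recv());
    assert_eq!(msg, Ok("Hello World".to_string()));

    s1.close().unwrap(); // consumes the stream; Drop would also close it
}
```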
- - let p1 = block_on(n1.connect(&a2)).unwrap(); //await - let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - - assert!(s1.send("Hello World").is_ok()); - - let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 - let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 - - let s: Result = block_on(s1_n2.recv()); - assert_eq!(s, Ok("Hello World".to_string())); - - assert!(p1.close(s1).is_ok()); - } - - #[test] - fn client_server_stream() { - let thread_pool = Arc::new( - ThreadPoolBuilder::new() - .name("veloren-network-test".into()) - .build(), - ); - thread::sleep(Duration::from_millis(400)); - let n1 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let n2 = Network::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010))); - let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011))); - - block_on(n1.listen(&a1)).unwrap(); //await - block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1 - thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! - - let p1 = block_on(n1.connect(&a2)).unwrap(); //await - - let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - let s3 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - let s4 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - let s5 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); - - assert!(s3.send("Hello World3").is_ok()); - assert!(s1.send("Hello World1").is_ok()); - assert!(s5.send("Hello World5").is_ok()); - assert!(s2.send("Hello World2").is_ok()); - assert!(s4.send("Hello World4").is_ok()); - - let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 - let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 - let mut s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2 - let mut s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3 - let mut s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4 - let mut s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5 - - info!("all streams opened"); - - let s: Result = block_on(s3_n2.recv()); - assert_eq!(s, Ok("Hello World3".to_string())); - let s: Result = block_on(s1_n2.recv()); - assert_eq!(s, Ok("Hello World1".to_string())); - let s: Result = block_on(s2_n2.recv()); - assert_eq!(s, Ok("Hello World2".to_string())); - let s: Result = block_on(s5_n2.recv()); - assert_eq!(s, Ok("Hello World5".to_string())); - let s: Result = block_on(s4_n2.recv()); - assert_eq!(s, Ok("Hello World4".to_string())); - - assert!(p1.close(s1).is_ok()); - } -} diff --git a/network/src/mpsc.rs b/network/src/mpsc.rs index d1b70604ee..598bc3d092 100644 --- a/network/src/mpsc.rs +++ b/network/src/mpsc.rs @@ -1,13 +1,30 @@ use crate::{channel::ChannelProtocol, types::Frame}; +use lazy_static::lazy_static; // 1.4.0 use mio_extras::channel::{Receiver, Sender}; +use std::{ + collections::HashMap, + sync::{Mutex, RwLock}, +}; use tracing::*; +lazy_static! 
{
+    pub(crate) static ref MPSC_REGISTRY: RwLock<HashMap<u64, Mutex<(Sender<Frame>, Receiver<Frame>)>>> =
+        RwLock::new(HashMap::new());
+}
+
 pub(crate) struct MpscChannel {
     endpoint_sender: Sender<Frame>,
     endpoint_receiver: Receiver<Frame>,
 }
 
-impl MpscChannel {}
+impl MpscChannel {
+    pub fn new(endpoint_sender: Sender<Frame>, endpoint_receiver: Receiver<Frame>) -> Self {
+        Self {
+            endpoint_sender,
+            endpoint_receiver,
+        }
+    }
+}
 
 impl ChannelProtocol for MpscChannel {
     type Handle = Receiver<Frame>;
@@ -22,11 +39,13 @@ impl ChannelProtocol for MpscChannel {
                     result.push(frame);
                 },
                 Err(std::sync::mpsc::TryRecvError::Empty) => {
-                    debug!("would block");
+                    debug!("read would block");
                     break;
                 },
                 Err(std::sync::mpsc::TryRecvError::Disconnected) => {
-                    panic!("disconnected");
+                    trace!(?self, "shutdown of mpsc channel detected");
+                    result.push(Frame::Shutdown);
+                    break;
                 },
             };
         }
@@ -42,9 +61,13 @@ impl ChannelProtocol for MpscChannel {
                 Err(mio_extras::channel::SendError::Io(e))
                     if e.kind() == std::io::ErrorKind::WouldBlock =>
                 {
-                    debug!("would block");
+                    debug!("write would block");
                     return;
                 }
+                Err(mio_extras::channel::SendError::Disconnected(frame)) => {
+                    trace!(?frame, ?self, "shutdown of mpsc channel detected");
+                    return;
+                },
                 Err(e) => {
                     panic!("{}", e);
                 },
diff --git a/network/src/prios.rs b/network/src/prios.rs
new file mode 100644
index 0000000000..9abb6d3305
--- /dev/null
+++ b/network/src/prios.rs
@@ -0,0 +1,559 @@
+/*
+
+This will become a single class;
+it contains a list of all open Channels, all Participants and all streams.
+Important: we need to change stream ids to be unique per participant,
+and msg ids need to be unique per participant too. The alternative would be to always send the sid with every Data frame, but that is too much overhead.
+
+We need an external (timer-like) Future that opens a thread in the threadpool and is Ready once serialized.
+
+We should focus in this implementation on the routing side: prio and choosing the correct protocol.
+A message might be delivered over 2 channels, e.g. creation info via TCP and data via UDP. Keep in mind that UDP might be read before TCP is read...
+
+Maybe even a future that builds a message together from incremental steps.
+
+Or a future that sends a message; however, on each send the prio needs to be considered - maybe overkill.
+
+
+It should be quite easy as all is in one thread now, but I am still not sure if it lives in the same thread as the network or if we still have a separate one;
+probably start with a separate thread for now.
+
+Focus on the routing for now, ignoring protocols and details...
+*/
+
+/*
+Priorities are handled the following way.
+Prios from 0-63 are allowed.
+Every 5 prio levels the throughput is halved (PRIOS[p] below is 100 * 2^(p/5), rounded).
+E.g. in the same time 100 prio0 messages are sent, only 50 prio5, 25 prio10, 12 prio15 or 6 prio20 messages are sent.
+Note: TODO: prio0 will be sent immediately when found!
+*/
+
+/*
+algo:
+let past: [u64; 100] = [0,0,0,0..]
+send_prio0()
+past[0] += 100;
+#check_next_prio
+if past[0] - past[1] > prio_numbers[1] {
+    sendprio1();
+    past[1] += 100;
+    if past[0] - past[2] > prio_numbers[2] {
+        sendprio2();
+        past[2] += 100;
+    }
+}
+
+
+*/
+
+use crate::{message::OutGoingMessage, types::Frame};
+use std::{
+    collections::{HashSet, VecDeque},
+    sync::mpsc::{channel, Receiver, Sender},
+};
+
+const PRIO_MAX: usize = 64;
+
+struct PrioManager {
+    points: [u32; PRIO_MAX],
+    messages: [VecDeque<OutGoingMessage>; PRIO_MAX],
+    messages_tx: Sender<(u8, OutGoingMessage)>,
+    messages_rx: Receiver<(u8, OutGoingMessage)>,
+    queued: HashSet<u8>,
+}
+
+impl PrioManager {
+    const FRAME_DATA_SIZE: u64 = 1400;
+    const PRIOS: [u32; PRIO_MAX] = [
+        100, 115, 132, 152, 174, 200, 230, 264, 303, 348, 400, 459, 528, 606, 696, 800, 919, 1056,
+        1213, 1393, 1600, 1838, 2111, 2425, 2786, 3200, 3676, 4222, 4850, 5572, 6400, 7352, 8445,
+        9701, 11143, 12800, 14703, 16890, 19401, 22286, 25600, 29407, 33779, 38802, 44572, 51200,
+        58813, 67559, 77605, 89144, 102400, 117627, 135118, 155209, 178289, 204800, 235253, 270235,
+        310419, 356578, 409600, 470507, 540470, 620838,
+    ];
+
+    pub fn new() -> Self {
+        let (messages_tx, messages_rx) = channel();
+        Self {
+            points: [0; PRIO_MAX],
+            messages: [
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+                VecDeque::new(),
+            ],
+            messages_tx,
+            messages_rx,
+            queued: HashSet::new(), //TODO: optimize with u64 and 64 bits
+        }
+    }
+
+    fn tick(&mut self) {
+        // Check Range
+        for (prio, msg) in self.messages_rx.try_iter() {
+            debug_assert!((prio as usize) < PRIO_MAX);
+            println!("tick {}", prio);
+            self.queued.insert(prio);
+            self.messages[prio as usize].push_back(msg);
+        }
+    }
+
+    //if None is returned, we are empty!
+    fn calc_next_prio(&self) -> Option<u8> {
+        // compare all queued prios, max 64 operations
+        let mut lowest = std::u32::MAX;
+        let mut lowest_id = None;
+        for &n in &self.queued {
+            let n_points = self.points[n as usize];
+            if n_points < lowest {
+                lowest = n_points;
+                lowest_id = Some(n)
+            } else if n_points == lowest && lowest_id.is_some() && n < lowest_id.unwrap() {
+                //on equal points, the lowest prio number goes first!
+ lowest_id = Some(n) + } + } + lowest_id + } + + /// returns if msg is empty + fn tick_msg>(msg: &mut OutGoingMessage, frames: &mut E) -> bool { + let to_send = std::cmp::min( + msg.buffer.data.len() as u64 - msg.cursor, + Self::FRAME_DATA_SIZE, + ); + if to_send > 0 { + if msg.cursor == 0 { + //TODO: OutGoingMessage MUST HAVE A MID AT THIS POINT ALREADY! AS I HAVE NO + // IDEA OF STREAMS HERE! + debug_assert!(msg.mid.is_some()); + frames.extend(std::iter::once(Frame::DataHeader { + mid: msg + .mid + .expect("read comment 3 lines above this error message 41231255661"), + sid: msg.sid, + length: msg.buffer.data.len() as u64, + })); + } + frames.extend(std::iter::once(Frame::Data { + id: msg.mid.unwrap(), + start: msg.cursor, + data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize] + .to_vec(), + })); + }; + msg.cursor += to_send; + msg.cursor >= msg.buffer.data.len() as u64 + } + + /// no_of_frames = frames.len() + /// Your goal is to try to find a realistic no_of_frames! + /// no_of_frames should be choosen so, that all Frames can be send out till + /// the next tick! + /// - if no_of_frames is too high you will fill either the Socket buffer, + /// or your internal buffer. In that case you will increase latency for + /// high prio messages! + /// - if no_of_frames is too low you wont saturate your Socket fully, thus + /// have a lower bandwidth as possible + pub fn fill_frames>(&mut self, no_of_frames: usize, frames: &mut E) { + self.tick(); + for _ in 0..no_of_frames { + match self.calc_next_prio() { + Some(prio) => { + println!("dasd {}", prio); + self.points[prio as usize] += Self::PRIOS[prio as usize]; + //pop message from front of VecDeque, handle it and push it back, so that all + // => messages with same prio get a fair chance :) + //TODO: evalaute not poping every time + match self.messages[prio as usize].pop_front() { + Some(mut msg) => { + if Self::tick_msg(&mut msg, frames) { + //debug!(?m.mid, "finish message"); + //check if prio is empty + if self.messages[prio as usize].is_empty() { + self.queued.remove(&prio); + } + } else { + self.messages[prio as usize].push_back(msg); + //trace!(?m.mid, "repush message"); + } + }, + None => unreachable!("msg not in VecDeque, but queued"), + } + }, + None => { + //QUEUE is empty, we are clearing the POINTS to not build up huge pipes of + // POINTS on a prio from the past + self.points = [0; PRIO_MAX]; + break; + }, + } + } + } + + pub fn get_tx(&self) -> &Sender<(u8, OutGoingMessage)> { &self.messages_tx } +} + +#[cfg(test)] +mod tests { + use crate::{ + message::{MessageBuffer, OutGoingMessage}, + prios::*, + types::{Frame, Mid, Sid}, + }; + use std::{collections::VecDeque, sync::Arc}; + + fn mock_out(prio: u8, sid: Sid) -> (u8, OutGoingMessage) { + (prio, OutGoingMessage { + buffer: Arc::new(MessageBuffer { + data: vec![48, 49, 50], + }), + cursor: 0, + mid: Some(1), + sid, + }) + } + + fn mock_out_large(prio: u8, sid: Sid) -> (u8, OutGoingMessage) { + const MSG_SIZE: usize = PrioManager::FRAME_DATA_SIZE as usize; + let mut data = vec![48; MSG_SIZE]; + data.append(&mut vec![49; MSG_SIZE]); + data.append(&mut vec![50; 20]); + (prio, OutGoingMessage { + buffer: Arc::new(MessageBuffer { data }), + cursor: 0, + mid: Some(1), + sid, + }) + } + + fn assert_header(frames: &mut VecDeque, f_sid: Sid, f_length: u64) { + let frame = frames + .pop_front() + .expect("frames vecdeque doesn't contain enough frames!"); + if let Frame::DataHeader { mid, sid, length } = frame { + assert_eq!(mid, 1); + assert_eq!(sid, f_sid); + 
assert_eq!(length, f_length); + } else { + panic!("wrong frame type!, expected DataHeader"); + } + } + + fn assert_data(frames: &mut VecDeque, f_start: u64, f_data: Vec) { + let frame = frames + .pop_front() + .expect("frames vecdeque doesn't contain enough frames!"); + if let Frame::Data { id, start, data } = frame { + assert_eq!(id, 1); + assert_eq!(start, f_start); + assert_eq!(data, f_data); + } else { + panic!("wrong frame type!, expected Data"); + } + } + + #[test] + fn single_p16() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out(16, 1337)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(100, &mut frames); + + assert_header(&mut frames, 1337, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert!(frames.is_empty()); + } + + #[test] + fn single_p16_p20() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out(16, 1337)).unwrap(); + mgr.get_tx().send(mock_out(20, 42)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(100, &mut frames); + + assert_header(&mut frames, 1337, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 42, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert!(frames.is_empty()); + } + + #[test] + fn single_p20_p16() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out(20, 42)).unwrap(); + mgr.get_tx().send(mock_out(16, 1337)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(100, &mut frames); + + assert_header(&mut frames, 1337, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 42, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert!(frames.is_empty()); + } + + #[test] + fn multiple_p16_p20() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out(20, 2)).unwrap(); + mgr.get_tx().send(mock_out(16, 1)).unwrap(); + mgr.get_tx().send(mock_out(16, 3)).unwrap(); + mgr.get_tx().send(mock_out(16, 5)).unwrap(); + mgr.get_tx().send(mock_out(20, 4)).unwrap(); + mgr.get_tx().send(mock_out(20, 7)).unwrap(); + mgr.get_tx().send(mock_out(16, 6)).unwrap(); + mgr.get_tx().send(mock_out(20, 10)).unwrap(); + mgr.get_tx().send(mock_out(16, 8)).unwrap(); + mgr.get_tx().send(mock_out(20, 12)).unwrap(); + mgr.get_tx().send(mock_out(16, 9)).unwrap(); + mgr.get_tx().send(mock_out(16, 11)).unwrap(); + mgr.get_tx().send(mock_out(20, 13)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(100, &mut frames); + + for i in 1..14 { + assert_header(&mut frames, i, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + } + assert!(frames.is_empty()); + } + + #[test] + fn multiple_fill_frames_p16_p20() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out(20, 2)).unwrap(); + mgr.get_tx().send(mock_out(16, 1)).unwrap(); + mgr.get_tx().send(mock_out(16, 3)).unwrap(); + mgr.get_tx().send(mock_out(16, 5)).unwrap(); + mgr.get_tx().send(mock_out(20, 4)).unwrap(); + mgr.get_tx().send(mock_out(20, 7)).unwrap(); + mgr.get_tx().send(mock_out(16, 6)).unwrap(); + mgr.get_tx().send(mock_out(20, 10)).unwrap(); + mgr.get_tx().send(mock_out(16, 8)).unwrap(); + mgr.get_tx().send(mock_out(20, 12)).unwrap(); + mgr.get_tx().send(mock_out(16, 9)).unwrap(); + mgr.get_tx().send(mock_out(16, 11)).unwrap(); + mgr.get_tx().send(mock_out(20, 13)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(3, &mut frames); + for i in 1..4 { + assert_header(&mut frames, i, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + } + assert!(frames.is_empty()); + mgr.fill_frames(11, &mut frames); + for i in 4..14 { 
+ assert_header(&mut frames, i, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + } + assert!(frames.is_empty()); + } + + #[test] + fn single_large_p16() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out_large(16, 1)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(100, &mut frames); + + assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![ + 48; + PrioManager::FRAME_DATA_SIZE as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ + 49; + PrioManager::FRAME_DATA_SIZE + as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert!(frames.is_empty()); + } + + #[test] + fn multiple_large_p16() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out_large(16, 1)).unwrap(); + mgr.get_tx().send(mock_out_large(16, 2)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(100, &mut frames); + + assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![ + 48; + PrioManager::FRAME_DATA_SIZE as usize + ]); + assert_header(&mut frames, 2, PrioManager::FRAME_DATA_SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![ + 48; + PrioManager::FRAME_DATA_SIZE as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ + 49; + PrioManager::FRAME_DATA_SIZE + as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ + 49; + PrioManager::FRAME_DATA_SIZE + as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert!(frames.is_empty()); + } + + #[test] + fn multiple_large_p16_sudden_p0() { + let mut mgr = PrioManager::new(); + mgr.get_tx().send(mock_out_large(16, 1)).unwrap(); + mgr.get_tx().send(mock_out_large(16, 2)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(3, &mut frames); + + assert_header(&mut frames, 1, PrioManager::FRAME_DATA_SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![ + 48; + PrioManager::FRAME_DATA_SIZE as usize + ]); + assert_header(&mut frames, 2, PrioManager::FRAME_DATA_SIZE * 2 + 20); + assert_data(&mut frames, 0, vec![ + 48; + PrioManager::FRAME_DATA_SIZE as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ + 49; + PrioManager::FRAME_DATA_SIZE + as usize + ]); + + mgr.get_tx().send(mock_out(0, 3)).unwrap(); + mgr.fill_frames(100, &mut frames); + + assert_header(&mut frames, 3, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE, vec![ + 49; + PrioManager::FRAME_DATA_SIZE + as usize + ]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert_data(&mut frames, PrioManager::FRAME_DATA_SIZE * 2, vec![50; 20]); + assert!(frames.is_empty()); + } + + #[test] + fn single_p20_thousand_p16_at_once() { + let mut mgr = PrioManager::new(); + for _ in 0..998 { + mgr.get_tx().send(mock_out(16, 2)).unwrap(); + } + mgr.get_tx().send(mock_out(20, 1)).unwrap(); + mgr.get_tx().send(mock_out(16, 2)).unwrap(); + mgr.get_tx().send(mock_out(16, 2)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(2000, &mut frames); + + assert_header(&mut frames, 2, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 1, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 2, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 2, 
3); + //unimportant + } + + #[test] + fn single_p20_thousand_p16_later() { + let mut mgr = PrioManager::new(); + for _ in 0..998 { + mgr.get_tx().send(mock_out(16, 2)).unwrap(); + } + let mut frames = VecDeque::new(); + mgr.fill_frames(2000, &mut frames); + //^unimportant frames, gonna be dropped + mgr.get_tx().send(mock_out(20, 1)).unwrap(); + mgr.get_tx().send(mock_out(16, 2)).unwrap(); + mgr.get_tx().send(mock_out(16, 2)).unwrap(); + let mut frames = VecDeque::new(); + mgr.fill_frames(2000, &mut frames); + + //important in that test is, that after the first frames got cleared i reset + // the Points even though 998 prio 16 messages have been send at this + // point and 0 prio20 messages the next mesasge is a prio16 message + // again, and only then prio20! we dont want to build dept over a idling + // connection + assert_header(&mut frames, 2, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 1, 3); + assert_data(&mut frames, 0, vec![48, 49, 50]); + assert_header(&mut frames, 2, 3); + //unimportant + } +} diff --git a/network/src/tcp.rs b/network/src/tcp.rs index 69296bf2aa..87fdb0e870 100644 --- a/network/src/tcp.rs +++ b/network/src/tcp.rs @@ -1,4 +1,7 @@ -use crate::{channel::ChannelProtocol, types::Frame}; +use crate::{ + channel::ChannelProtocol, + types::{Frame, NetworkBuffer}, +}; use bincode; use mio::net::TcpStream; use std::io::{Read, Write}; @@ -6,17 +9,10 @@ use tracing::*; pub(crate) struct TcpChannel { endpoint: TcpStream, - //these buffers only ever contain 1 FRAME ! read_buffer: NetworkBuffer, write_buffer: NetworkBuffer, } -struct NetworkBuffer { - data: Vec, - read_idx: usize, - write_idx: usize, -} - impl TcpChannel { pub fn new(endpoint: TcpStream) -> Self { Self { @@ -27,72 +23,6 @@ impl TcpChannel { } } -/// NetworkBuffer to use for streamed access -/// valid data is between read_idx and write_idx! -/// everything before read_idx is already processed and no longer important -/// everything after write_idx is either 0 or random data buffered -impl NetworkBuffer { - fn new() -> Self { - NetworkBuffer { - data: vec![0; 2048], - read_idx: 0, - write_idx: 0, - } - } - - fn get_write_slice(&mut self, min_size: usize) -> &mut [u8] { - if self.data.len() < self.write_idx + min_size { - trace!( - ?self, - ?min_size, - "need to resize because buffer is to small" - ); - self.data.resize(self.write_idx + min_size, 0); - } - &mut self.data[self.write_idx..] 
diff --git a/network/src/tcp.rs b/network/src/tcp.rs
index 69296bf2aa..87fdb0e870 100644
--- a/network/src/tcp.rs
+++ b/network/src/tcp.rs
@@ -1,4 +1,7 @@
-use crate::{channel::ChannelProtocol, types::Frame};
+use crate::{
+    channel::ChannelProtocol,
+    types::{Frame, NetworkBuffer},
+};
 use bincode;
 use mio::net::TcpStream;
 use std::io::{Read, Write};
@@ -6,17 +9,10 @@ use tracing::*;
 
 pub(crate) struct TcpChannel {
     endpoint: TcpStream,
-    //these buffers only ever contain 1 FRAME !
     read_buffer: NetworkBuffer,
     write_buffer: NetworkBuffer,
 }
 
-struct NetworkBuffer {
-    data: Vec<u8>,
-    read_idx: usize,
-    write_idx: usize,
-}
-
 impl TcpChannel {
     pub fn new(endpoint: TcpStream) -> Self {
         Self {
@@ -27,72 +23,6 @@ impl TcpChannel {
     }
 }
 
-/// NetworkBuffer to use for streamed access
-/// valid data is between read_idx and write_idx!
-/// everything before read_idx is already processed and no longer important
-/// everything after write_idx is either 0 or random data buffered
-impl NetworkBuffer {
-    fn new() -> Self {
-        NetworkBuffer {
-            data: vec![0; 2048],
-            read_idx: 0,
-            write_idx: 0,
-        }
-    }
-
-    fn get_write_slice(&mut self, min_size: usize) -> &mut [u8] {
-        if self.data.len() < self.write_idx + min_size {
-            trace!(
-                ?self,
-                ?min_size,
-                "need to resize because buffer is to small"
-            );
-            self.data.resize(self.write_idx + min_size, 0);
-        }
-        &mut self.data[self.write_idx..]
-    }
-
-    fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; }
-
-    fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] }
-
-    fn actually_read(&mut self, cnt: usize) {
-        self.read_idx += cnt;
-        if self.read_idx == self.write_idx {
-            if self.read_idx > 10485760 {
-                trace!(?self, "buffer empty, resetting indices");
-            }
-            self.read_idx = 0;
-            self.write_idx = 0;
-        }
-        if self.write_idx > 10485760 {
-            if self.write_idx - self.read_idx < 65536 {
-                debug!(
-                    ?self,
-                    "This buffer is filled over 10 MB, but the actual data diff is less then \
-                     65kB, which is a sign of stressing this connection much as always new data \
-                     comes in - nevertheless, in order to handle this we will remove some data \
-                     now so that this buffer doesn't grow endlessly"
-                );
-                let mut i2 = 0;
-                for i in self.read_idx..self.write_idx {
-                    self.data[i2] = self.data[i];
-                    i2 += 1;
-                }
-                self.read_idx = 0;
-                self.write_idx = i2;
-            }
-            if self.data.len() > 67108864 {
-                warn!(
-                    ?self,
-                    "over 64Mbyte used, something seems fishy, len: {}",
-                    self.data.len()
-                );
-            }
-        }
-    }
-}
-
 impl ChannelProtocol for TcpChannel {
     type Handle = TcpStream;
 
@@ -101,6 +31,12 @@ impl ChannelProtocol for TcpChannel {
         let mut result = Vec::new();
         loop {
             match self.endpoint.read(self.read_buffer.get_write_slice(2048)) {
+                Ok(0) => {
+                    //Shutdown
+                    trace!(?self, "shutdown of tcp channel detected");
+                    result.push(Frame::Shutdown);
+                    break;
+                },
                 Ok(n) => {
                     self.read_buffer.actually_written(n);
                     trace!("incomming message with len: {}", n);
diff --git a/network/src/types.rs b/network/src/types.rs
index 160d90cbdc..d78be0613a 100644
--- a/network/src/types.rs
+++ b/network/src/types.rs
@@ -7,7 +7,8 @@ use enumset::EnumSet;
 use futures;
 use mio::{self, net::TcpListener, PollOpt, Ready};
 use serde::{Deserialize, Serialize};
-use std::{collections::VecDeque, sync::mpsc};
+use std::collections::VecDeque;
+use tracing::*;
 use uuid::Uuid;
 
 //Participant Ids are randomly chosen
@@ -16,14 +17,15 @@ pub type Pid = Uuid;
 // every Network involved Every Channel gets a subrange during their handshake
 // protocol from one of the 2 ranges
 //*otherwise extra synchronization would be needed
-pub type Sid = u32;
+pub type Sid = u64;
 
 //Message Ids are unique per Stream* and are split in 2 ranges, one for every
 // Channel involved
 //*otherwise extra synchronization would be needed
 pub type Mid = u64;
 
 pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
-pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0];
+pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 2, 0];
+pub const DEFAULT_SID_SIZE: u64 = 1 << 48;
 
 // Used for Communication between Controller <--> Worker
 pub(crate) enum CtrlMsg {
@@ -31,10 +33,10 @@ pub(crate) enum CtrlMsg {
     Register(TokenObjects, Ready, PollOpt),
     OpenStream {
         pid: Pid,
+        sid: Sid,
         prio: u8,
         promises: EnumSet<Promise>,
         msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
-        return_sid: mpsc::Sender<Sid>,
     },
     CloseStream {
         pid: Pid,
@@ -47,6 +49,7 @@ pub(crate) enum RtrnMsg {
     Shutdown,
     ConnectedParticipant {
         pid: Pid,
+        controller_sids: tlid::Pool<tlid::Wrapping<Sid>>,
     },
     OpendStream {
         pid: Pid,
@@ -72,6 +75,7 @@ pub(crate) struct IntStream {
     sid: Sid,
     prio: u8,
     promises: EnumSet<Promise>,
+    pub mid_pool: tlid::Pool<tlid::Wrapping<Mid>>,
     msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
     pub to_send: VecDeque<OutGoingMessage>,
     pub to_receive: VecDeque<InCommingMessage>,
@@ -88,6 +92,7 @@ impl IntStream {
             sid,
             prio,
             promises,
+            mid_pool: tlid::Pool::new_full(),
             msg_tx,
             to_send: VecDeque::new(),
             to_receive: VecDeque::new(),
@@ -114,13 +119,16 @@ pub(crate) enum Frame {
     },
     Configure {
         //only one Participant will send this package and give the other a range to use
-        stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,
-        msg_id_pool: tlid::Pool<tlid::Wrapping<Mid>>,
+        sender_controller_sids: tlid::RemoveAllocation<Sid>,
+        sender_worker_sids: tlid::RemoveAllocation<Sid>,
+        receiver_controller_sids: tlid::Pool<tlid::Wrapping<Sid>>,
+        receiver_worker_sids: tlid::Pool<tlid::Wrapping<Sid>>,
     },
     ParticipantId {
         pid: Pid,
     },
-    Shutdown {/* Shutsdown this channel gracefully, if all channels are shut down, Participant is deleted */},
+    Shutdown, /* Shuts down this channel gracefully; if all channels are shut down, the
+               * Participant is deleted */
     OpenStream {
         sid: Sid,
         prio: u8,
@@ -144,17 +152,87 @@ pub(crate) enum Frame {
     Raw(Vec<u8>),
 }
 
-#[derive(Debug)]
-pub struct RemoteParticipant {
-    pub stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,
-    pub msg_id_pool: tlid::Pool<tlid::Wrapping<Mid>>,
+pub(crate) struct NetworkBuffer {
+    pub(crate) data: Vec<u8>,
+    pub(crate) read_idx: usize,
+    pub(crate) write_idx: usize,
 }
 
-impl RemoteParticipant {
+/// NetworkBuffer to use for streamed access
+/// valid data is between read_idx and write_idx!
+/// everything before read_idx is already processed and no longer important
+/// everything after write_idx is either 0 or random data buffered
+impl NetworkBuffer {
     pub(crate) fn new() -> Self {
-        Self {
-            stream_id_pool: tlid::Pool::new_full(),
-            msg_id_pool: tlid::Pool::new_full(),
+        NetworkBuffer {
+            data: vec![0; 2048],
+            read_idx: 0,
+            write_idx: 0,
+        }
+    }
+
+    pub(crate) fn get_write_slice(&mut self, min_size: usize) -> &mut [u8] {
+        if self.data.len() < self.write_idx + min_size {
+            trace!(
+                ?self,
+                ?min_size,
+                "need to resize because buffer is too small"
+            );
+            self.data.resize(self.write_idx + min_size, 0);
+        }
+        &mut self.data[self.write_idx..]
+    }
+
+    pub(crate) fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; }
+
+    pub(crate) fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] }
+
+    pub(crate) fn actually_read(&mut self, cnt: usize) {
+        self.read_idx += cnt;
+        if self.read_idx == self.write_idx {
+            if self.read_idx > 10485760 {
+                trace!(?self, "buffer empty, resetting indices");
+            }
+            self.read_idx = 0;
+            self.write_idx = 0;
+        }
+        if self.write_idx > 10485760 {
+            if self.write_idx - self.read_idx < 65536 {
+                debug!(
+                    ?self,
+                    "This buffer is filled over 10 MiB, but the unread data is less than \
+                     64 KiB - a sign that this connection is under constant pressure as \
+                     new data keeps arriving. To keep the buffer from growing endlessly \
+                     we compact it now"
+                );
+                let mut i2 = 0;
+                for i in self.read_idx..self.write_idx {
+                    self.data[i2] = self.data[i];
+                    i2 += 1;
+                }
+                self.read_idx = 0;
+                self.write_idx = i2;
+            }
+            if self.data.len() > 67108864 {
+                warn!(
+                    ?self,
+                    "over 64 MiB used, something seems fishy, len: {}",
+                    self.data.len()
+                );
+            }
         }
     }
 }
+
+fn chose_protocol(
+    available_protocols: u8, /* 1 = TCP, 2 = UDP, 4 = MPSC */
+    promises: u8,
+) -> u8 /* 1, 2 or 4 */ {
+    if available_protocols & (1 << 2) != 0 {
+        4
+    } else if available_protocols & (1 << 0) != 0 {
+        1
+    } else {
+        2
+    }
+}
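Editor's note: NetworkBuffer moves from tcp.rs into types.rs above so the TCP and UDP channels share one reserve/commit/consume contract. A self-contained model of that contract (method names mirror the code above; sizes are shrunk for the example, and the compaction logic is omitted):

struct Buffer {
    data: Vec<u8>,
    read_idx: usize,
    write_idx: usize,
}

impl Buffer {
    fn new() -> Self { Buffer { data: vec![0; 8], read_idx: 0, write_idx: 0 } }

    // reserve at least `min` writable bytes after write_idx
    fn get_write_slice(&mut self, min: usize) -> &mut [u8] {
        if self.data.len() < self.write_idx + min {
            self.data.resize(self.write_idx + min, 0);
        }
        &mut self.data[self.write_idx..]
    }

    fn actually_written(&mut self, n: usize) { self.write_idx += n; }

    // everything between read_idx and write_idx is valid, unparsed data
    fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] }

    fn actually_read(&mut self, n: usize) {
        self.read_idx += n;
        if self.read_idx == self.write_idx {
            // fully drained: reset indices so the buffer never grows needlessly
            self.read_idx = 0;
            self.write_idx = 0;
        }
    }
}

fn main() {
    let mut b = Buffer::new();
    // producer side: reserve, copy, commit
    b.get_write_slice(3)[..3].copy_from_slice(b"abc");
    b.actually_written(3);
    // consumer side: peek, then consume only what was actually parsed
    assert_eq!(b.get_read_slice(), b"abc");
    b.actually_read(2); // e.g. one complete frame took 2 bytes
    assert_eq!(b.get_read_slice(), b"c");
}

The split between get_read_slice and actually_read is what lets a parser look at the buffered bytes, decode as many complete frames as it can, and leave a partial frame in place for the next read.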
diff --git a/network/src/udp.rs b/network/src/udp.rs
index ae685cf3b9..c12cc838b4 100644
--- a/network/src/udp.rs
+++ b/network/src/udp.rs
@@ -1,20 +1,23 @@
-use crate::{channel::ChannelProtocol, types::Frame};
+use crate::{
+    channel::ChannelProtocol,
+    types::{Frame, NetworkBuffer},
+};
 use bincode;
 use mio::net::UdpSocket;
 use tracing::*;
 
 pub(crate) struct UdpChannel {
     endpoint: UdpSocket,
-    read_buffer: Vec<u8>,
-    _write_buffer: Vec<u8>,
+    read_buffer: NetworkBuffer,
+    write_buffer: NetworkBuffer,
 }
 
 impl UdpChannel {
     pub fn _new(endpoint: UdpSocket) -> Self {
         Self {
             endpoint,
-            read_buffer: Vec::new(),
-            _write_buffer: Vec::new(),
+            read_buffer: NetworkBuffer::new(),
+            write_buffer: NetworkBuffer::new(),
         }
     }
 }
@@ -25,58 +28,95 @@ impl ChannelProtocol for UdpChannel {
 
     /// Execute when ready to read
     fn read(&mut self) -> Vec<Frame> {
         let mut result = Vec::new();
-        match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) {
-            Ok((n, _)) => {
-                trace!("incomming message with len: {}", n);
-                let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
-                while cur.position() < n as u64 {
-                    let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
-                    match r {
-                        Ok(frame) => result.push(frame),
-                        Err(e) => {
-                            error!(
-                                ?self,
-                                ?e,
-                                "failure parsing a message with len: {}, starting with: {:?}",
-                                n,
-                                &self.read_buffer[0..std::cmp::min(n, 10)]
-                            );
-                            break;
-                        },
+        loop {
+            match self.endpoint.recv(self.read_buffer.get_write_slice(2048)) {
+                Ok(0) => {
+                    //Shutdown
+                    trace!(?self, "shutdown of udp channel detected");
+                    result.push(Frame::Shutdown);
+                    break;
+                },
+                Ok(n) => {
+                    self.read_buffer.actually_written(n);
+                    trace!("incoming message with len: {}", n);
+                    let slice = self.read_buffer.get_read_slice();
+                    let mut cur = std::io::Cursor::new(slice);
+                    let mut read_ok = 0;
+                    while cur.position() < n as u64 {
+                        let round_start = cur.position() as usize;
+                        let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
+                        match r {
+                            Ok(frame) => {
+                                result.push(frame);
+                                read_ok = cur.position() as usize;
+                            },
+                            Err(e) => {
+                                // Probably we have to wait for more data!
+                                let first_bytes_of_msg =
+                                    &slice[round_start..std::cmp::min(n, round_start + 16)];
+                                debug!(
+                                    ?self,
+                                    ?e,
+                                    ?n,
+                                    ?round_start,
+                                    ?first_bytes_of_msg,
+                                    "message can't be parsed, probably because we need to wait \
+                                     for more data"
+                                );
+                                break;
+                            },
+                        }
                     }
-                }
-            },
-            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
-                debug!("would block");
-            },
-            Err(e) => {
-                panic!("{}", e);
-            },
-        };
+                    self.read_buffer.actually_read(read_ok);
+                },
+                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                    debug!("would block");
+                    break;
+                },
+                Err(e) => panic!("{}", e),
+            };
+        }
         result
     }
 
     /// Execute when ready to write
     fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
-        for frame in frames {
-            if let Ok(data) = bincode::serialize(&frame) {
-                let total = data.len();
-                match self.endpoint.send(&data) {
-                    Ok(n) if n == total => {
-                        trace!("send {} bytes", n);
+        loop {
+            //serialize while buffered len < MTU (1500), then write
+            if self.write_buffer.get_read_slice().len() < 1500 {
+                match frames.next() {
+                    Some(frame) => {
+                        if let Ok(size) = bincode::serialized_size(&frame) {
+                            let slice = self.write_buffer.get_write_slice(size as usize);
+                            if let Err(err) = bincode::serialize_into(slice, &frame) {
+                                error!(
+                                    ?err,
+                                    "serialising frame was unsuccessful, this should never \
+                                     happen! dropping frame!"
+                                )
+                            }
+                            self.write_buffer.actually_written(size as usize); //I have to rely on this information being consistent!
+                        } else {
+                            error!(
+                                "getting size of frame was unsuccessful, this should never \
+                                 happen! dropping frame!"
+                            )
+                        };
+                    },
-                    Ok(_) => {
-                        error!("could only send part");
-                    },
-                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
-                        debug!("would block");
-                        return;
-                    },
-                    Err(e) => {
-                        panic!("{}", e);
-                    },
-                };
-            };
+                    None => break,
+                }
+            }
+
+            match self.endpoint.send(self.write_buffer.get_read_slice()) {
+                Ok(n) => {
+                    self.write_buffer.actually_read(n);
+                },
+                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                    debug!("can't send udp yet, would block");
+                    return;
+                },
+                Err(e) => panic!("{}", e),
+            }
         }
     }
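Editor's note: the rewritten UDP read path now uses the same incremental decode idiom as TCP: deserialize frames from a cursor over the read slice, remember the position after the last complete frame, and treat a decode failure as "wait for more bytes". The idiom in isolation, against bincode 1.x's public API (the Frame enum here is a stand-in, not the crate's; requires the bincode and serde-derive crates):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
enum Frame {
    Ping,
    Raw(Vec<u8>),
}

fn main() {
    // two frames back to back, with the second one cut off mid-transmission
    let mut buf = bincode::serialize(&Frame::Ping).unwrap();
    buf.extend(bincode::serialize(&Frame::Raw(vec![1, 2, 3])).unwrap());
    buf.truncate(buf.len() - 1);

    let mut cur = std::io::Cursor::new(&buf[..]);
    let mut consumed = 0usize;
    loop {
        match bincode::deserialize_from::<_, Frame>(&mut cur) {
            Ok(frame) => {
                // only advance past frames that decoded completely
                consumed = cur.position() as usize;
                println!("got {:?}", frame);
            },
            // usually just means: wait for more bytes, then try again
            Err(_) => break,
        }
    }
    println!("consumed {} of {} buffered bytes", consumed, buf.len());
}

Only `consumed` is handed to actually_read in the real code, so the cut-off frame stays buffered until the next readiness event delivers the rest of it.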
diff --git a/network/src/worker.rs b/network/src/worker.rs
index 281547ce03..b47cd2e4ed 100644
--- a/network/src/worker.rs
+++ b/network/src/worker.rs
@@ -3,13 +3,13 @@ use crate::{
     controller::Controller,
     metrics::NetworkMetrics,
     tcp::TcpChannel,
-    types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, TokenObjects},
+    types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
 };
 use mio::{self, Poll, PollOpt, Ready, Token};
-use mio_extras::channel::{Receiver, Sender};
+use mio_extras::channel::Receiver;
 use std::{
     collections::HashMap,
-    sync::{mpsc::TryRecvError, Arc, RwLock},
+    sync::{mpsc, mpsc::TryRecvError, Arc, RwLock},
     time::Instant,
 };
 use tlid;
@@ -43,9 +43,10 @@ pub(crate) struct Worker {
     pid: Pid,
     poll: Arc<Poll>,
     metrics: Arc<Option<NetworkMetrics>>,
-    remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
+    sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Wrapping<Sid>>>>>,
+    participants: HashMap<Pid, tlid::Pool<tlid::Wrapping<Sid>>>,
     ctrl_rx: Receiver<CtrlMsg>,
-    rtrn_tx: Sender<RtrnMsg>,
+    rtrn_tx: mpsc::Sender<RtrnMsg>,
     mio_tokens: MioTokens,
     time_before_poll: Instant,
     time_after_poll: Instant,
@@ -56,17 +57,18 @@ impl Worker {
         pid: Pid,
         poll: Arc<Poll>,
         metrics: Arc<Option<NetworkMetrics>>,
-        remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
+        sid_backup_per_participant: Arc<RwLock<HashMap<Pid, tlid::Pool<tlid::Wrapping<Sid>>>>>,
         token_pool: tlid::Pool<tlid::Wrapping<usize>>,
         ctrl_rx: Receiver<CtrlMsg>,
-        rtrn_tx: Sender<RtrnMsg>,
+        rtrn_tx: mpsc::Sender<RtrnMsg>,
     ) -> Self {
         let mio_tokens = MioTokens::new(token_pool);
         Worker {
             pid,
             poll,
             metrics,
-            remotes,
+            sid_backup_per_participant,
+            participants: HashMap::new(),
             ctrl_rx,
             rtrn_tx,
             mio_tokens,
@@ -100,7 +102,10 @@ impl Worker {
     }
 
     fn handle_ctl(&mut self) -> bool {
+        info!("start in handle_ctl");
         loop {
+            info!("recv in handle_ctl");
+
             let msg = match self.ctrl_rx.try_recv() {
                 Ok(msg) => msg,
                 Err(TryRecvError::Empty) => {
@@ -110,6 +115,7 @@
                     panic!("Unexpected error '{}'", err);
                 },
             };
+            info!("loop in handle_ctl");
 
             match msg {
                 CtrlMsg::Shutdown => {
@@ -148,24 +154,20 @@
                 },
                 CtrlMsg::OpenStream {
                     pid,
+                    sid,
                     prio,
                     promises,
                     msg_tx,
-                    return_sid,
                 } => {
                     let mut handled = false;
                     for (_, obj) in self.mio_tokens.tokens.iter_mut() {
                         if let TokenObjects::Channel(channel) = obj {
                             if Some(pid) == channel.remote_pid {
-                                let sid = channel.open_stream(prio, promises, msg_tx);
-                                if let Err(err) = return_sid.send(sid) {
-                                    error!(
-                                        ?err,
-                                        "cannot send that a stream opened, probably channel was \
-                                         already closed!"
-                                    );
-                                };
+                                info!(?channel.streams, "-CTR- going to open stream");
+                                channel.open_stream(sid, prio, promises, msg_tx);
+                                info!(?channel.streams, "-CTR- going to tick");
                                 channel.tick_send();
+                                info!(?channel.streams, "-CTR- did open stream");
                                 handled = true;
                                 break;
                             }
@@ -180,8 +182,11 @@
                     for to in self.mio_tokens.tokens.values_mut() {
                         if let TokenObjects::Channel(channel) = to {
                             if Some(pid) == channel.remote_pid {
+                                info!(?channel.streams, "-CTR- going to close stream");
                                 channel.close_stream(sid); //TODO: check participant
+                                info!(?channel.streams, "-CTR- going to tick");
                                 channel.tick_send();
+                                info!(?channel.streams, "-CTR- did close stream");
                                 handled = true;
                                 break;
                             }
@@ -195,8 +200,11 @@
                     let mut handled = false;
                     for to in self.mio_tokens.tokens.values_mut() {
                         if let TokenObjects::Channel(channel) = to {
+                            info!(?channel.streams, "-CTR- going to send msg");
                             channel.send(outgoing); //TODO: check participant
+                            info!(?channel.streams, "-CTR- going to tick");
                             channel.tick_send();
+                            info!(?channel.streams, "-CTR- did send msg");
                             handled = true;
                             break;
                         }
@@ -236,7 +244,7 @@
                 let mut channel = Channel::new(
                     self.pid,
                     ChannelProtocols::Tcp(tcp_channel),
-                    self.remotes.clone(),
+                    self.sid_backup_per_participant.clone(),
                     None,
                 );
                 channel.handshake();
@@ -254,12 +262,20 @@
             if event.readiness().is_readable() {
                 let protocol = channel.get_protocol();
                 trace!(?protocol, "channel readable");
-                channel.tick_recv(&self.rtrn_tx);
+                channel.tick_recv(&mut self.participants, &self.rtrn_tx);
+            } else {
+                trace!("channel not readable");
             }
             if event.readiness().is_writable() {
                 let protocol = channel.get_protocol();
                 trace!(?protocol, "channel writeable");
                 channel.tick_send();
+            } else {
+                trace!("channel not writeable");
+                let protocol = channel.get_protocol();
+                if let ChannelProtocols::Mpsc(_) = &protocol {
+                    channel.tick_send(); //workaround for MPSC!!! ONLY for MPSC
+                }
             }
         },
     };
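Editor's note: handle_ctl above drains the controller channel without blocking: Empty just ends this tick, Disconnected is fatal. The same pattern in miniature with std's mpsc (the message type is a stand-in):

use std::sync::mpsc::{channel, TryRecvError};

fn main() {
    let (ctrl_tx, ctrl_rx) = channel();
    ctrl_tx.send("OpenStream").unwrap();
    ctrl_tx.send("Send").unwrap();
    loop {
        match ctrl_rx.try_recv() {
            Ok(msg) => println!("handling {}", msg),
            // no more work this tick; go back to polling
            Err(TryRecvError::Empty) => break,
            // the controller side is gone, shut the worker down
            Err(TryRecvError::Disconnected) => panic!("controller dropped"),
        }
    }
}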
diff --git a/network/tests/helper.rs b/network/tests/helper.rs
new file mode 100644
index 0000000000..834315edf1
--- /dev/null
+++ b/network/tests/helper.rs
@@ -0,0 +1,53 @@
+use lazy_static::*;
+use std::{sync::Arc, thread, time::Duration};
+use tracing::*;
+use tracing_subscriber::EnvFilter;
+use uvth::{ThreadPool, ThreadPoolBuilder};
+
+pub fn setup(tracing: bool, mut sleep: u64) -> (Arc<ThreadPool>, u64) {
+    lazy_static! {
+        static ref THREAD_POOL: Arc<ThreadPool> = Arc::new(
+            ThreadPoolBuilder::new()
+                .name("veloren-network-test".into())
+                .num_threads(2)
+                .build(),
+        );
+    }
+
+    if tracing {
+        sleep += 1000
+    }
+    if sleep > 0 {
+        thread::sleep(Duration::from_millis(sleep));
+    }
+
+    let _subscriber = if tracing {
+        let filter = EnvFilter::from_default_env()
+            //.add_directive("[worker]=trace".parse().unwrap())
+            .add_directive("trace".parse().unwrap())
+            .add_directive("veloren_network::tests=trace".parse().unwrap())
+            .add_directive("veloren_network::worker=debug".parse().unwrap())
+            .add_directive("veloren_network::controller=trace".parse().unwrap())
+            .add_directive("veloren_network::channel=trace".parse().unwrap())
+            .add_directive("veloren_network::message=trace".parse().unwrap())
+            .add_directive("veloren_network::metrics=trace".parse().unwrap())
+            .add_directive("veloren_network::types=trace".parse().unwrap())
+            .add_directive("veloren_network::mpsc=debug".parse().unwrap())
+            .add_directive("veloren_network::udp=debug".parse().unwrap())
+            .add_directive("veloren_network::tcp=debug".parse().unwrap());
+
+        Some(
+            tracing_subscriber::FmtSubscriber::builder()
+                // all spans/events with level TRACE or higher (e.g. info, warn, etc.)
+                // will be written to stdout.
+                .with_max_level(Level::TRACE)
+                .with_env_filter(filter)
+                // sets this to be the default, global subscriber for this application.
+                .try_init(),
+        )
+    } else {
+        None
+    };
+
+    (THREAD_POOL.clone(), 0)
+}
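Editor's note: setup() relies on lazy_static so that all tests share a single thread pool: the static is initialized on first access, and every later call only clones the Arc. The pattern in isolation (requires the lazy_static crate; a String stands in for the ThreadPool):

use lazy_static::lazy_static;
use std::sync::Arc;

lazy_static! {
    // initialized once, on first access, no matter which test runs first
    static ref SHARED: Arc<String> = Arc::new("veloren-network-test".into());
}

fn setup_like() -> Arc<String> { SHARED.clone() }

fn main() {
    let a = setup_like();
    let b = setup_like();
    assert!(Arc::ptr_eq(&a, &b)); // every caller gets the same instance
}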
diff --git a/network/tests/integration.rs b/network/tests/integration.rs
new file mode 100644
index 0000000000..88e848eca9
--- /dev/null
+++ b/network/tests/integration.rs
@@ -0,0 +1,110 @@
+use futures::executor::block_on;
+use std::{net::SocketAddr, thread, time::Duration};
+use uuid::Uuid;
+use veloren_network::{Address, Network, Promise};
+
+mod helper;
+
+/*
+#[test]
+fn tcp_simple() {
+    let (thread_pool, _) = helper::setup(true, 100);
+    let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
+    let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
+    let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
+    let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
+    n1.listen(&a1).unwrap(); //await
+    n2.listen(&a2).unwrap(); // only required here, but doesn't hurt on n1
+    thread::sleep(Duration::from_millis(3)); //TODO: listening still doesn't block correctly!
+
+    let p1 = block_on(n1.connect(&a2)).unwrap(); //await
+    let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+
+    assert!(s1.send("Hello World").is_ok());
+
+    let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
+    let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
+
+    let s: Result = block_on(s1_n2.recv());
+    assert_eq!(s, Ok("Hello World".to_string()));
+
+    assert!(s1.close().is_ok());
+}
+*/
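+
+// Editor's note: an illustrative, self-contained test added here to show the
+// block_on idiom used throughout this file - futures' executor drives the
+// async API to completion from a synchronous #[test] body.
+#[test]
+fn block_on_idiom() {
+    async fn answer() -> u32 { 42 }
+    assert_eq!(block_on(answer()), 42);
+}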
+
+/*
+#[test]
+fn tcp_5streams() {
+    let (thread_pool, _) = helper::setup(false, 200);
+    let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
+    let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
+    let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010)));
+    let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011)));
+
+    n1.listen(&a1).unwrap(); //await
+    n2.listen(&a2).unwrap(); // only required here, but doesn't hurt on n1
+    thread::sleep(Duration::from_millis(3)); //TODO: listening still doesn't block correctly!
+
+    let p1 = block_on(n1.connect(&a2)).unwrap(); //await
+
+    let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+    let s2 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+    let s3 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+    let s4 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+    let s5 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+
+    assert!(s3.send("Hello World3").is_ok());
+    assert!(s1.send("Hello World1").is_ok());
+    assert!(s5.send("Hello World5").is_ok());
+    assert!(s2.send("Hello World2").is_ok());
+    assert!(s4.send("Hello World4").is_ok());
+
+    let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
+    let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
+    let mut s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2
+    let mut s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3
+    let mut s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4
+    let mut s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5
+
+    info!("all streams opened");
+
+    let s: Result = block_on(s3_n2.recv());
+    assert_eq!(s, Ok("Hello World3".to_string()));
+    let s: Result = block_on(s1_n2.recv());
+    assert_eq!(s, Ok("Hello World1".to_string()));
+    let s: Result = block_on(s2_n2.recv());
+    assert_eq!(s, Ok("Hello World2".to_string()));
+    let s: Result = block_on(s5_n2.recv());
+    assert_eq!(s, Ok("Hello World5".to_string()));
+    let s: Result = block_on(s4_n2.recv());
+    assert_eq!(s, Ok("Hello World4".to_string()));
+
+    assert!(s1.close().is_ok());
+}
+*/
+#[test]
+fn mpsc_simple() {
+    let (thread_pool, _) = helper::setup(true, 2300);
+    let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
+    let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
+    let a1 = Address::Mpsc(42);
+    let a2 = Address::Mpsc(1337);
+    //n1.listen(&a1).unwrap(); //await //TODO: evaluate if this should be allowed
+    // or is forbidden behavior...
+    n2.listen(&a2).unwrap(); // only required here, but doesn't hurt on n1
+    thread::sleep(Duration::from_millis(3)); //TODO: listening still doesn't block correctly!
+
+    let p1 = block_on(n1.connect(&a2)).unwrap(); //await
+    let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap();
+
+    assert!(s1.send("Hello World").is_ok());
+
+    thread::sleep(Duration::from_millis(3)); //TODO: listening still doesn't block correctly!
+ let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 + let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 + + let s: Result = block_on(s1_n2.recv()); + assert_eq!(s, Ok("Hello World".to_string())); + + assert!(s1.close().is_ok()); +} diff --git a/network/tools/async_recv/Cargo.toml b/network/tools/async_recv/Cargo.toml index 961a932669..36793d1079 100644 --- a/network/tools/async_recv/Cargo.toml +++ b/network/tools/async_recv/Cargo.toml @@ -8,12 +8,12 @@ edition = "2018" [dependencies] uvth = "3.1" -network = { package = "veloren-network", path = "../../../network" } +network = { package = "veloren_network", path = "../../../network" } clap = "2.33" uuid = { version = "0.8", features = ["serde", "v4"] } futures = "0.3" tracing = "0.1" chrono = "0.4" -tracing-subscriber = "0.2.0-alpha.4" +tracing-subscriber = "0.2.3" bincode = "1.2" serde = "1.0" \ No newline at end of file diff --git a/network/tools/async_recv/src/main.rs b/network/tools/async_recv/src/main.rs index f3b0653037..25133c2c9d 100644 --- a/network/tools/async_recv/src/main.rs +++ b/network/tools/async_recv/src/main.rs @@ -47,7 +47,7 @@ fn main() { ) .get_matches(); - let filter = EnvFilter::from_default_env().add_directive("error".parse().unwrap()); + let filter = EnvFilter::from_default_env().add_directive("trace".parse().unwrap()); //.add_directive("veloren_network::tests=trace".parse().unwrap()); tracing_subscriber::FmtSubscriber::builder() @@ -81,7 +81,7 @@ fn server(port: u16) { thread::sleep(Duration::from_millis(200)); let server = Network::new(Uuid::new_v4(), thread_pool.clone()); let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); - block_on(server.listen(&address)).unwrap(); //await + server.listen(&address).unwrap(); //await thread::sleep(Duration::from_millis(10)); //TODO: listeing still doesnt block correctly! println!("waiting for client"); @@ -161,8 +161,8 @@ fn client(port: u16) { thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! 
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 - let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 - let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2 + let s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); //remote representation of s1 + let s2 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); //remote representation of s2 let before = Instant::now(); block_on(async { let f1 = async_task1(s1); diff --git a/network/tools/network-speed/Cargo.toml b/network/tools/network-speed/Cargo.toml index 55e9a1006d..5648ff14c9 100644 --- a/network/tools/network-speed/Cargo.toml +++ b/network/tools/network-speed/Cargo.toml @@ -8,11 +8,11 @@ edition = "2018" [dependencies] uvth = "3.1" -network = { package = "veloren-network", path = "../../../network" } +network = { package = "veloren_network", path = "../../../network" } clap = "2.33" uuid = { version = "0.8", features = ["serde", "v4"] } futures = "0.3" tracing = "0.1" -tracing-subscriber = "0.2.0-alpha.4" +tracing-subscriber = "0.2.3" bincode = "1.2" serde = "1.0" \ No newline at end of file diff --git a/network/tools/network-speed/src/main.rs b/network/tools/network-speed/src/main.rs index 64a12ba772..c3b1ec759f 100644 --- a/network/tools/network-speed/src/main.rs +++ b/network/tools/network-speed/src/main.rs @@ -64,7 +64,7 @@ fn main() { .with_env_filter(filter) // sets this to be the default, global subscriber for this application. .init(); - + /* if let Some(matches) = matches.subcommand_matches("listen") { let port = matches .value_of("port") @@ -76,7 +76,12 @@ fn main() { .value_of("port") .map_or(52000, |v| v.parse::().unwrap_or(52000)); client(port); - }; + };*/ + thread::spawn(|| { + server(52000); + }); + thread::sleep(Duration::from_millis(3)); + client(52000); } fn server(port: u16) { @@ -88,7 +93,9 @@ fn server(port: u16) { thread::sleep(Duration::from_millis(200)); let server = Network::new(Uuid::new_v4(), thread_pool.clone()); let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); - block_on(server.listen(&address)).unwrap(); //await + //let address = Address::Mpsc(port as u64); + //let address = Address::Udp(SocketAddr::from(([127, 0, 0, 1], port))); + server.listen(&address).unwrap(); //await thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! loop { @@ -116,11 +123,13 @@ fn client(port: u16) { thread::sleep(Duration::from_millis(200)); let client = Network::new(Uuid::new_v4(), thread_pool.clone()); let address = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], port))); + //let address = Address::Mpsc(port as u64); + //let address = Address::Udp(SocketAddr::from(([127, 0, 0, 1], port))); thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! loop { let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 - let mut s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 + let mut s1 = p1.open(16, Promise::InOrder | Promise::NoCorrupt).unwrap(); //remote representation of s1 let mut last = Instant::now(); let mut id = 0u64; loop {