From 1435d8d6be6916b07074ff4b74102be3636a5886 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Fri, 12 Jun 2020 11:53:59 +0200 Subject: [PATCH] remove unused files --- network/src/mio_worker.rs | 308 ------------------------------------- network/src/tcp_channel.rs | 190 ----------------------- 2 files changed, 498 deletions(-) delete mode 100644 network/src/mio_worker.rs delete mode 100644 network/src/tcp_channel.rs diff --git a/network/src/mio_worker.rs b/network/src/mio_worker.rs deleted file mode 100644 index 805adf82ee..0000000000 --- a/network/src/mio_worker.rs +++ /dev/null @@ -1,308 +0,0 @@ -use crate::{api::Promise, internal::Channel, message::OutGoingMessage, tcp_channel::TcpChannel}; -use enumset::EnumSet; -use mio::{self, net::TcpListener, Poll, PollOpt, Ready, Token}; -use mio_extras::channel::{channel, Receiver, Sender}; -use std::{ - collections::HashMap, - sync::{mpsc::TryRecvError, Arc, RwLock}, - time::Instant, -}; -use tlid; -use tracing::{debug, error, info, span, trace, warn, Level}; -use uvth::ThreadPool; - -#[derive(Debug)] -pub(crate) enum TokenObjects { - TcpListener(TcpListener), - TcpChannel(TcpChannel), -} - -pub(crate) struct MioTokens { - pool: tlid::Pool>, - pub tokens: HashMap, //TODO: move to Vec for faster lookup -} - -impl MioTokens { - pub fn new(pool: tlid::Pool>) -> Self { - MioTokens { - pool, - tokens: HashMap::new(), - } - } - - pub fn construct(&mut self) -> Token { Token(self.pool.next()) } - - pub fn insert(&mut self, tok: Token, obj: TokenObjects) { - trace!(?tok, ?obj, "added new token"); - self.tokens.insert(tok, obj); - } -} - -// MioStatistics should be copied in order to not hold locks for long -#[derive(Clone, Default)] -pub struct MioStatistics { - nano_wait: u128, - nano_busy: u128, -} - -pub(crate) enum CtrlMsg { - Shutdown, - Register(TokenObjects, Ready, PollOpt), - OpenStream { - pid: uuid::Uuid, - prio: u8, - promises: EnumSet, - }, - CloseStream { - pid: uuid::Uuid, - sid: u32, - }, - Send(OutGoingMessage), -} - -/* - The MioWorker runs in it's own thread, - it has a given set of Channels to work with. - It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers -*/ -pub struct MioWorker { - tag: u64, /* only relevant for logs */ - pid: uuid::Uuid, - poll: Arc, - mio_statistics: Arc>, - ctrl_tx: Sender, -} - -impl MioWorker { - pub const CTRL_TOK: Token = Token(0); - - pub fn new( - tag: u64, - pid: uuid::Uuid, - thread_pool: Arc, - mut token_pool: tlid::Pool>, - ) -> Self { - let poll = Arc::new(Poll::new().unwrap()); - let poll_clone = poll.clone(); - let mio_statistics = Arc::new(RwLock::new(MioStatistics::default())); - let mio_statistics_clone = mio_statistics.clone(); - - let (ctrl_tx, ctrl_rx) = channel(); - poll.register(&ctrl_rx, Self::CTRL_TOK, Ready::readable(), PollOpt::edge()) - .unwrap(); - // reserve 10 tokens in case they start with 0, //TODO: cleaner method - for _ in 0..10 { - token_pool.next(); - } - - let mw = MioWorker { - tag, - pid, - poll, - mio_statistics, - ctrl_tx, - }; - thread_pool.execute(move || { - mio_worker( - tag, - pid, - poll_clone, - mio_statistics_clone, - token_pool, - ctrl_rx, - ) - }); - mw - } - - pub fn get_load_ratio(&self) -> f32 { - let statistics = self.mio_statistics.read().unwrap(); - statistics.nano_busy as f32 / (statistics.nano_busy + statistics.nano_wait + 1) as f32 - } - - //TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers - - pub(crate) fn get_tx(&self) -> Sender { self.ctrl_tx.clone() } -} - -impl Drop for MioWorker { - fn drop(&mut self) { let _ = self.ctrl_tx.send(CtrlMsg::Shutdown); } -} - -fn mio_worker( - tag: u64, - pid: uuid::Uuid, - poll: Arc, - mio_statistics: Arc>, - mut token_pool: tlid::Pool>, - ctrl_rx: Receiver, -) { - let mut mio_tokens = MioTokens::new(token_pool); - let mut events = mio::Events::with_capacity(1024); - let mut buf: [u8; 65000] = [0; 65000]; - let span = span!(Level::INFO, "mio worker", ?tag); - let _enter = span.enter(); - loop { - let time_before_poll = Instant::now(); - if let Err(err) = poll.poll(&mut events, None) { - error!("network poll error: {}", err); - return; - } - let time_after_poll = Instant::now(); - for event in &events { - match event.token() { - MioWorker::CTRL_TOK => { - if handle_ctl(&ctrl_rx, &mut mio_tokens, &poll, &mut buf, time_after_poll) { - return; - } - }, - _ => handle_tok( - pid, - event, - &mut mio_tokens, - &poll, - &mut buf, - time_after_poll, - ), - }; - } - handle_statistics(&mio_statistics, time_before_poll, time_after_poll); - } -} - -fn handle_ctl( - ctrl_rx: &Receiver, - mio_tokens: &mut MioTokens, - poll: &Arc, - buf: &mut [u8; 65000], - time_after_poll: Instant, -) -> bool { - match ctrl_rx.try_recv() { - Ok(CtrlMsg::Shutdown) => { - debug!("Shutting Down"); - return true; - }, - Ok(CtrlMsg::Register(handle, interest, opts)) => { - let tok = mio_tokens.construct(); - match &handle { - TokenObjects::TcpListener(h) => poll.register(h, tok, interest, opts).unwrap(), - TokenObjects::TcpChannel(channel) => poll - .register(&channel.tcpstream, tok, interest, opts) - .unwrap(), - } - debug!(?handle, ?tok, "Registered new handle"); - mio_tokens.insert(tok, handle); - }, - Ok(CtrlMsg::OpenStream { - pid, - prio, - promises, - }) => { - for (tok, obj) in mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel) = obj { - channel.open_stream(prio, promises); //TODO: check participant - channel.write(buf, time_after_poll); - } - } - //TODO: - }, - Ok(CtrlMsg::CloseStream { pid, sid }) => { - //TODO: - for to in mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel) = to { - channel.close_stream(sid); //TODO: check participant - channel.write(buf, time_after_poll); - } - } - }, - Ok(_) => unimplemented!("dad"), - Err(TryRecvError::Empty) => {}, - Err(err) => { - //postbox_tx.send(Err(err.into()))?; - return true; - }, - } - false -} - -fn handle_tok( - pid: uuid::Uuid, - event: mio::Event, - mio_tokens: &mut MioTokens, - poll: &Arc, - buf: &mut [u8; 65000], - time_after_poll: Instant, -) { - match mio_tokens.tokens.get_mut(&event.token()) { - Some(e) => { - trace!(?event, "event"); - match e { - TokenObjects::TcpListener(listener) => match listener.accept() { - Ok((mut remote_stream, _)) => { - info!(?remote_stream, "remote connected"); - - let tok = mio_tokens.construct(); - poll.register( - &remote_stream, - tok, - Ready::readable() | Ready::writable(), - PollOpt::edge(), - ) - .unwrap(); - trace!(?remote_stream, ?tok, "registered"); - let mut channel = TcpChannel::new(remote_stream); - channel.handshake(); - channel.participant_id(pid); - - mio_tokens - .tokens - .insert(tok, TokenObjects::TcpChannel(channel)); - }, - Err(err) => { - error!(?err, "error during remote connected"); - }, - }, - TokenObjects::TcpChannel(channel) => { - if event.readiness().is_readable() { - trace!(?channel.tcpstream, "stream readable"); - channel.read(buf, time_after_poll); - } - if event.readiness().is_writable() { - trace!(?channel.tcpstream, "stream writeable"); - channel.write(buf, time_after_poll); - } - }, - } - }, - None => panic!("Unexpected event token '{:?}'", &event.token()), - }; -} - -fn handle_statistics( - mio_statistics: &Arc>, - time_before_poll: Instant, - time_after_poll: Instant, -) { - let time_after_work = Instant::now(); - match mio_statistics.try_write() { - Ok(mut mio_statistics) => { - const OLD_KEEP_FACTOR: f64 = 0.995; - //in order to weight new data stronger than older we fade them out with a - // factor < 1. for 0.995 under full load (500 ticks a 1ms) we keep 8% of the old - // value this means, that we start to see load comming up after - // 500ms, but not for small spikes - as reordering for smaller spikes would be - // to slow - mio_statistics.nano_wait = (mio_statistics.nano_wait as f64 * OLD_KEEP_FACTOR) as u128 - + time_after_poll.duration_since(time_before_poll).as_nanos(); - mio_statistics.nano_busy = (mio_statistics.nano_busy as f64 * OLD_KEEP_FACTOR) as u128 - + time_after_work.duration_since(time_after_poll).as_nanos(); - - trace!( - "current Load {}", - mio_statistics.nano_busy as f32 - / (mio_statistics.nano_busy + mio_statistics.nano_wait + 1) as f32 - ); - }, - Err(e) => warn!("statistics dropped because they are currently accecssed"), - } -} diff --git a/network/src/tcp_channel.rs b/network/src/tcp_channel.rs deleted file mode 100644 index b3c28a1563..0000000000 --- a/network/src/tcp_channel.rs +++ /dev/null @@ -1,190 +0,0 @@ -use crate::{ - api::Promise, - internal::{Channel, Stream, TcpFrame, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION}, -}; -use bincode; -use enumset::EnumSet; -use mio::{self, net::TcpStream}; -use std::{ - collections::VecDeque, - io::{Read, Write}, - time::Instant, -}; -use tracing::*; - -#[derive(Debug)] -pub(crate) struct TcpChannel { - stream_id_pool: tlid::Pool>, //TODO: stream_id unique per participant - msg_id_pool: tlid::Pool>, //TODO: msg_id unique per participant - participant_id: Option, - pub tcpstream: TcpStream, - pub streams: Vec, - pub send_queue: VecDeque, - pub recv_queue: VecDeque, -} - -impl TcpChannel { - pub fn new(tcpstream: TcpStream) -> Self { - TcpChannel { - stream_id_pool: tlid::Pool::new_full(), - msg_id_pool: tlid::Pool::new_full(), - participant_id: None, - tcpstream, - streams: Vec::new(), - send_queue: VecDeque::new(), - recv_queue: VecDeque::new(), - } - } - - fn handle_frame(&mut self, frame: TcpFrame) { - match frame { - TcpFrame::Handshake { - magic_number, - version, - } => { - if magic_number != VELOREN_MAGIC_NUMBER { - error!("tcp connection with invalid handshake, closing connection"); - #[cfg(debug_assertions)] - { - debug!("sending client instructions before killing"); - let _ = self.tcpstream.write( - "Handshake does not contain the magic number requiered by veloren \ - server.\nWe are not sure if you are a valid veloren client.\nClosing \ - the connection" - .as_bytes(), - ); - } - } - if version != VELOREN_NETWORK_VERSION { - error!("tcp connection with wrong network version"); - #[cfg(debug_assertions)] - { - debug!("sending client instructions before killing"); - let _ = self.tcpstream.write( - format!( - "Handshake does not contain a correct magic number, but invalid \ - version.\nWe don't know how to communicate with you.\nOur \ - Version: {:?}\nYour Version: {:?}\nClosing the connection", - VELOREN_NETWORK_VERSION, version, - ) - .as_bytes(), - ); - } - } - info!(?self, "handshake completed"); - }, - TcpFrame::ParticipantId { pid } => { - self.participant_id = Some(pid); - info!("Participant: {} send their ID", pid); - }, - TcpFrame::OpenStream { - sid, - prio, - promises, - } => { - if let Some(pid) = self.participant_id { - let sid = self.stream_id_pool.next(); - let stream = Stream::new(sid, prio, promises.clone()); - self.streams.push(stream); - info!("Participant: {} opened a stream", pid); - } - }, - TcpFrame::CloseStream { sid } => { - if let Some(pid) = self.participant_id { - self.streams.retain(|stream| stream.sid() != sid); - info!("Participant: {} closed a stream", pid); - } - }, - TcpFrame::DataHeader { id, length } => { - info!("Data Header {}", id); - }, - TcpFrame::Data { id, frame_no, data } => { - info!("Data Package {}", id); - }, - } - } -} - -impl Channel for TcpChannel { - fn read(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) { - match self.tcpstream.read(uninitialized_dirty_speed_buffer) { - Ok(n) => { - trace!("incomming message with len: {}", n); - let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]); - while cur.position() < n as u64 { - let r: Result = bincode::deserialize_from(&mut cur); - match r { - Ok(frame) => self.handle_frame(frame), - Err(e) => { - error!( - ?self, - ?e, - "failure parsing a message with len: {}, starting with: {:?}", - n, - &uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)] - ); - }, - } - } - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - }, - Err(e) => { - panic!("{}", e); - }, - }; - } - - fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) { - while let Some(elem) = self.send_queue.pop_front() { - if let Ok(mut data) = bincode::serialize(&elem) { - let total = data.len(); - match self.tcpstream.write(&data) { - Ok(n) if n == total => {}, - Ok(n) => { - error!("could only send part"); - //let data = data.drain(n..).collect(); //TODO: - // validate n.. is correct - // to_send.push_front(data); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - }, - Err(e) => { - panic!("{}", e); - }, - }; - }; - } - } - - fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32 { - // validate promises - let sid = self.stream_id_pool.next(); - let stream = Stream::new(sid, prio, promises.clone()); - self.streams.push(stream); - self.send_queue.push_back(TcpFrame::OpenStream { - sid, - prio, - promises, - }); - sid - } - - fn close_stream(&mut self, sid: u32) { - self.streams.retain(|stream| stream.sid() != sid); - self.send_queue.push_back(TcpFrame::CloseStream { sid }); - } - - fn handshake(&mut self) { - self.send_queue.push_back(TcpFrame::Handshake { - magic_number: VELOREN_MAGIC_NUMBER.to_string(), - version: VELOREN_NETWORK_VERSION, - }); - } - - fn participant_id(&mut self, pid: uuid::Uuid) { - self.send_queue.push_back(TcpFrame::ParticipantId { pid }); - } -}