From 10863eed1401a30d486630cd2c8f8d0cc1f08c89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Fri, 21 Feb 2020 14:08:34 +0100 Subject: [PATCH] remove worker folder - flatten file structure --- network/src/api.rs | 16 ++++----- network/src/{worker => }/channel.rs | 14 ++++---- network/src/{worker/mod.rs => controller.rs} | 24 +++---------- network/src/internal.rs | 36 -------------------- network/src/lib.rs | 26 ++++++++++++-- network/src/message.rs | 2 +- network/src/{worker => }/metrics.rs | 6 ++-- network/src/{worker => }/mpsc.rs | 2 +- network/src/{worker => }/tcp.rs | 4 +-- network/src/{worker => }/types.rs | 36 ++++++++++++++++++-- network/src/{worker => }/udp.rs | 6 ++-- network/src/{worker => }/worker.rs | 12 +++---- 12 files changed, 89 insertions(+), 95 deletions(-) rename network/src/{worker => }/channel.rs (98%) rename network/src/{worker/mod.rs => controller.rs} (82%) delete mode 100644 network/src/internal.rs rename network/src/{worker => }/metrics.rs (96%) rename network/src/{worker => }/mpsc.rs (96%) rename network/src/{worker => }/tcp.rs (96%) rename network/src/{worker => }/types.rs (80%) rename network/src/{worker => }/udp.rs (94%) rename network/src/{worker => }/worker.rs (97%) diff --git a/network/src/api.rs b/network/src/api.rs index 15ccdb2e87..580c7cb888 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -1,12 +1,10 @@ use crate::{ - internal::RemoteParticipant, + channel::{Channel, ChannelProtocols}, + controller::Controller, message::{self, OutGoingMessage}, - worker::{ - channel::ChannelProtocols, - metrics::NetworkMetrics, - types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, - Channel, Controller, TcpChannel, - }, + metrics::NetworkMetrics, + tcp::TcpChannel, + types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects}, }; use enumset::*; use mio::{ @@ -154,7 +152,7 @@ impl Network { None } - pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet) -> Stream { + pub async fn open(&self, part: &Participant, prio: u8, promises: EnumSet) -> Stream { let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::(); for controller in self.controller.iter() { controller @@ -229,7 +227,6 @@ impl Network { }, Address::Udp(_) => unimplemented!("lazy me"), } - Err(NetworkError::Todo_Error_For_Wrong_Connection) } //TODO: evaluate if move to Participant @@ -299,7 +296,6 @@ impl Stream { pub enum NetworkError { NetworkDestroyed, WorkerDestroyed, - Todo_Error_For_Wrong_Connection, IoError(std::io::Error), } diff --git a/network/src/worker/channel.rs b/network/src/channel.rs similarity index 98% rename from network/src/worker/channel.rs rename to network/src/channel.rs index 2165115bab..ffd0246dc5 100644 --- a/network/src/worker/channel.rs +++ b/network/src/channel.rs @@ -1,13 +1,13 @@ use crate::{ api::Promise, - internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION}, message::{InCommingMessage, MessageBuffer, OutGoingMessage}, - worker::{ - mpsc::MpscChannel, - tcp::TcpChannel, - types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream}, - udp::UdpChannel, + mpsc::MpscChannel, + tcp::TcpChannel, + types::{ + Frame, Mid, Pid, RemoteParticipant, RtrnMsg, Sid, Stream, VELOREN_MAGIC_NUMBER, + VELOREN_NETWORK_VERSION, }, + udp::UdpChannel, }; use enumset::EnumSet; use mio_extras::channel::Sender; @@ -248,7 +248,6 @@ impl Channel { self.msg_id_pool = Some(msg_id_pool); } if let Some(send) = &self.return_pid_to { - info!("asdasd"); send.send(pid); }; self.return_pid_to = None; @@ -428,7 +427,6 @@ impl Channel { pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { for s in self.streams.iter_mut() { - warn!("{}", s.sid()); if s.sid() == outgoing.sid { s.to_send.push_back(outgoing); return; diff --git a/network/src/worker/mod.rs b/network/src/controller.rs similarity index 82% rename from network/src/worker/mod.rs rename to network/src/controller.rs index 4320f4e40a..05e8513dfc 100644 --- a/network/src/worker/mod.rs +++ b/network/src/controller.rs @@ -1,29 +1,13 @@ /* Most of the internals take place in it's own worker-thread. This folder contains all this outsourced calculation. - This mod.rs contains the interface to communicate with the thread, + This controller contains the interface to communicate with the thread, communication is done via channels. */ -pub mod channel; -pub mod metrics; -pub mod mpsc; -pub mod tcp; -pub mod types; -pub mod udp; -pub mod worker; - -pub(crate) use channel::Channel; -pub(crate) use mpsc::MpscChannel; -pub(crate) use tcp::TcpChannel; -pub(crate) use udp::UdpChannel; - use crate::{ - internal::RemoteParticipant, - worker::{ - metrics::NetworkMetrics, - types::{CtrlMsg, Pid, RtrnMsg}, - worker::Worker, - }, + metrics::NetworkMetrics, + types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg}, + worker::Worker, }; use mio::{self, Poll, PollOpt, Ready, Token}; use mio_extras::channel::{channel, Receiver, Sender}; diff --git a/network/src/internal.rs b/network/src/internal.rs deleted file mode 100644 index 3a177f8ae7..0000000000 --- a/network/src/internal.rs +++ /dev/null @@ -1,36 +0,0 @@ -use crate::{ - api::Address, - worker::types::{Mid, Sid}, -}; - -pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN"; -pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0]; - -pub(crate) enum Protocol { - Tcp, - Udp, -} - -impl Address { - pub(crate) fn get_protocol(&self) -> Protocol { - match self { - Address::Tcp(_) => Protocol::Tcp, - Address::Udp(_) => Protocol::Udp, - } - } -} - -#[derive(Debug)] -pub struct RemoteParticipant { - pub stream_id_pool: tlid::Pool>, - pub msg_id_pool: tlid::Pool>, -} - -impl RemoteParticipant { - pub(crate) fn new() -> Self { - Self { - stream_id_pool: tlid::Pool::new_full(), - msg_id_pool: tlid::Pool::new_full(), - } - } -} diff --git a/network/src/lib.rs b/network/src/lib.rs index 27aff0dea2..e8bd60b1dd 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,7 +1,13 @@ #![feature(trait_alias)] mod api; -mod internal; +mod channel; +mod controller; mod message; +mod metrics; +mod mpsc; +mod tcp; +mod types; +mod udp; mod worker; #[cfg(test)] @@ -10,6 +16,7 @@ pub mod tests { use futures::executor::block_on; use std::{net::SocketAddr, sync::Arc}; use tracing::*; + use tracing_subscriber::EnvFilter; use uuid::Uuid; use uvth::ThreadPoolBuilder; @@ -28,11 +35,24 @@ pub mod tests { } pub fn test_tracing() { + let filter = EnvFilter::from_default_env() + //.add_directive("[worker]=trace".parse().unwrap()) + //.add_directive("trace".parse().unwrap()) + .add_directive("veloren_network::worker=debug".parse().unwrap()) + .add_directive("veloren_network::controller=trace".parse().unwrap()) + .add_directive("veloren_network::channel=trace".parse().unwrap()) + .add_directive("veloren_network::message=trace".parse().unwrap()) + .add_directive("veloren_network::metrics=trace".parse().unwrap()) + .add_directive("veloren_network::types=trace".parse().unwrap()) + .add_directive("veloren_network::mpsc=debug".parse().unwrap()) + .add_directive("veloren_network::udp=debug".parse().unwrap()) + .add_directive("veloren_network::tcp=debug".parse().unwrap()); + tracing_subscriber::FmtSubscriber::builder() // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) // will be written to stdout. .with_max_level(Level::TRACE) - //.with_env_filter("veloren_network::api=info,my_crate::my_mod=debug,[my_span]=trace") + .with_env_filter(filter) // sets this to be the default, global subscriber for this application. .init(); } @@ -105,7 +125,7 @@ pub mod tests { let p1 = p1.unwrap(); std::thread::sleep(std::time::Duration::from_millis(20)); - let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); + let s1 = block_on(n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt)); //let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); std::thread::sleep(std::time::Duration::from_millis(20)); diff --git a/network/src/message.rs b/network/src/message.rs index 7230c85aab..5e5882154f 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -1,7 +1,7 @@ use bincode; use serde::{de::DeserializeOwned, Serialize}; //use std::collections::VecDeque; -use crate::worker::types::{Mid, Sid}; +use crate::types::{Mid, Sid}; use std::sync::Arc; use tracing::*; diff --git a/network/src/worker/metrics.rs b/network/src/metrics.rs similarity index 96% rename from network/src/worker/metrics.rs rename to network/src/metrics.rs index a1ad1fd13a..9ce3030cbe 100644 --- a/network/src/worker/metrics.rs +++ b/network/src/metrics.rs @@ -53,9 +53,9 @@ impl NetworkMetrics { "version", &format!( "{}.{}.{}", - &crate::internal::VELOREN_NETWORK_VERSION[0], - &crate::internal::VELOREN_NETWORK_VERSION[1], - &crate::internal::VELOREN_NETWORK_VERSION[2] + &crate::types::VELOREN_NETWORK_VERSION[0], + &crate::types::VELOREN_NETWORK_VERSION[1], + &crate::types::VELOREN_NETWORK_VERSION[2] ), ); let network_info = IntGauge::with_opts(opts)?; diff --git a/network/src/worker/mpsc.rs b/network/src/mpsc.rs similarity index 96% rename from network/src/worker/mpsc.rs rename to network/src/mpsc.rs index 7073df4106..e782421744 100644 --- a/network/src/worker/mpsc.rs +++ b/network/src/mpsc.rs @@ -1,4 +1,4 @@ -use crate::worker::{channel::ChannelProtocol, types::Frame}; +use crate::{channel::ChannelProtocol, types::Frame}; use mio_extras::channel::{Receiver, Sender}; use tracing::*; diff --git a/network/src/worker/tcp.rs b/network/src/tcp.rs similarity index 96% rename from network/src/worker/tcp.rs rename to network/src/tcp.rs index 16b5ca10d4..1e92f2f8f1 100644 --- a/network/src/worker/tcp.rs +++ b/network/src/tcp.rs @@ -1,4 +1,4 @@ -use crate::worker::{channel::ChannelProtocol, types::Frame}; +use crate::{channel::ChannelProtocol, types::Frame}; use bincode; use mio::net::TcpStream; use std::io::{Read, Write}; @@ -65,7 +65,7 @@ impl ChannelProtocol for TcpChannel { let total = data.len(); match self.endpoint.write(&data) { Ok(n) if n == total => { - trace!("send!"); + trace!("send {} bytes", n); }, Ok(n) => { error!("could only send part"); diff --git a/network/src/worker/types.rs b/network/src/types.rs similarity index 80% rename from network/src/worker/types.rs rename to network/src/types.rs index 8ac523d901..c073283cf5 100644 --- a/network/src/worker/types.rs +++ b/network/src/types.rs @@ -1,7 +1,7 @@ use crate::{ - api::Promise, + api::{Address, Promise}, + channel::Channel, message::{InCommingMessage, OutGoingMessage}, - worker::Channel, }; use enumset::EnumSet; use mio::{self, net::TcpListener, PollOpt, Ready}; @@ -21,6 +21,9 @@ pub type Sid = u32; //*otherwise extra synchronization would be needed pub type Mid = u64; +pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN"; +pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0]; + // Used for Communication between Controller <--> Worker pub(crate) enum CtrlMsg { Shutdown, @@ -123,3 +126,32 @@ pub(crate) enum Frame { * against veloren Server! */ Raw(Vec), } + +pub(crate) enum Protocol { + Tcp, + Udp, +} + +impl Address { + pub(crate) fn get_protocol(&self) -> Protocol { + match self { + Address::Tcp(_) => Protocol::Tcp, + Address::Udp(_) => Protocol::Udp, + } + } +} + +#[derive(Debug)] +pub struct RemoteParticipant { + pub stream_id_pool: tlid::Pool>, + pub msg_id_pool: tlid::Pool>, +} + +impl RemoteParticipant { + pub(crate) fn new() -> Self { + Self { + stream_id_pool: tlid::Pool::new_full(), + msg_id_pool: tlid::Pool::new_full(), + } + } +} diff --git a/network/src/worker/udp.rs b/network/src/udp.rs similarity index 94% rename from network/src/worker/udp.rs rename to network/src/udp.rs index 84287cc9ec..009338d031 100644 --- a/network/src/worker/udp.rs +++ b/network/src/udp.rs @@ -1,4 +1,4 @@ -use crate::worker::{channel::ChannelProtocol, types::Frame}; +use crate::{channel::ChannelProtocol, types::Frame}; use bincode; use mio::net::UdpSocket; use tracing::*; @@ -61,7 +61,9 @@ impl ChannelProtocol for UdpChannel { if let Ok(mut data) = bincode::serialize(&frame) { let total = data.len(); match self.endpoint.send(&data) { - Ok(n) if n == total => {}, + Ok(n) if n == total => { + trace!("send {} bytes", n); + }, Ok(n) => { error!("could only send part"); //let data = data.drain(n..).collect(); //TODO: diff --git a/network/src/worker/worker.rs b/network/src/worker.rs similarity index 97% rename from network/src/worker/worker.rs rename to network/src/worker.rs index 68ce137d39..bf4799a20b 100644 --- a/network/src/worker/worker.rs +++ b/network/src/worker.rs @@ -1,11 +1,9 @@ use crate::{ - internal::RemoteParticipant, - worker::{ - channel::{ChannelProtocol, ChannelProtocols}, - metrics::NetworkMetrics, - types::{CtrlMsg, Pid, RtrnMsg, TokenObjects}, - Channel, Controller, TcpChannel, - }, + channel::{Channel, ChannelProtocol, ChannelProtocols}, + controller::Controller, + metrics::NetworkMetrics, + tcp::TcpChannel, + types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, TokenObjects}, }; use mio::{self, Poll, PollOpt, Ready, Token}; use mio_extras::channel::{Receiver, Sender};