remove worker folder - flatten file structure

This commit is contained in:
Marcel Märtens 2020-02-21 14:08:34 +01:00
parent e388b40c54
commit 10863eed14
12 changed files with 89 additions and 95 deletions

View File

@ -1,12 +1,10 @@
use crate::{ use crate::{
internal::RemoteParticipant, channel::{Channel, ChannelProtocols},
controller::Controller,
message::{self, OutGoingMessage}, message::{self, OutGoingMessage},
worker::{ metrics::NetworkMetrics,
channel::ChannelProtocols, tcp::TcpChannel,
metrics::NetworkMetrics, types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects},
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
Channel, Controller, TcpChannel,
},
}; };
use enumset::*; use enumset::*;
use mio::{ use mio::{
@ -154,7 +152,7 @@ impl<E: Events> Network<E> {
None None
} }
pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet<Promise>) -> Stream { pub async fn open(&self, part: &Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::<Sid>(); let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::<Sid>();
for controller in self.controller.iter() { for controller in self.controller.iter() {
controller controller
@ -229,7 +227,6 @@ impl<E: Events> Network<E> {
}, },
Address::Udp(_) => unimplemented!("lazy me"), Address::Udp(_) => unimplemented!("lazy me"),
} }
Err(NetworkError::Todo_Error_For_Wrong_Connection)
} }
//TODO: evaluate if move to Participant //TODO: evaluate if move to Participant
@ -299,7 +296,6 @@ impl Stream {
pub enum NetworkError { pub enum NetworkError {
NetworkDestroyed, NetworkDestroyed,
WorkerDestroyed, WorkerDestroyed,
Todo_Error_For_Wrong_Connection,
IoError(std::io::Error), IoError(std::io::Error),
} }

View File

@ -1,13 +1,13 @@
use crate::{ use crate::{
api::Promise, api::Promise,
internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
message::{InCommingMessage, MessageBuffer, OutGoingMessage}, message::{InCommingMessage, MessageBuffer, OutGoingMessage},
worker::{ mpsc::MpscChannel,
mpsc::MpscChannel, tcp::TcpChannel,
tcp::TcpChannel, types::{
types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream}, Frame, Mid, Pid, RemoteParticipant, RtrnMsg, Sid, Stream, VELOREN_MAGIC_NUMBER,
udp::UdpChannel, VELOREN_NETWORK_VERSION,
}, },
udp::UdpChannel,
}; };
use enumset::EnumSet; use enumset::EnumSet;
use mio_extras::channel::Sender; use mio_extras::channel::Sender;
@ -248,7 +248,6 @@ impl Channel {
self.msg_id_pool = Some(msg_id_pool); self.msg_id_pool = Some(msg_id_pool);
} }
if let Some(send) = &self.return_pid_to { if let Some(send) = &self.return_pid_to {
info!("asdasd");
send.send(pid); send.send(pid);
}; };
self.return_pid_to = None; self.return_pid_to = None;
@ -428,7 +427,6 @@ impl Channel {
pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { pub(crate) fn send(&mut self, outgoing: OutGoingMessage) {
for s in self.streams.iter_mut() { for s in self.streams.iter_mut() {
warn!("{}", s.sid());
if s.sid() == outgoing.sid { if s.sid() == outgoing.sid {
s.to_send.push_back(outgoing); s.to_send.push_back(outgoing);
return; return;

View File

@ -1,29 +1,13 @@
/* /*
Most of the internals take place in it's own worker-thread. Most of the internals take place in it's own worker-thread.
This folder contains all this outsourced calculation. This folder contains all this outsourced calculation.
This mod.rs contains the interface to communicate with the thread, This controller contains the interface to communicate with the thread,
communication is done via channels. communication is done via channels.
*/ */
pub mod channel;
pub mod metrics;
pub mod mpsc;
pub mod tcp;
pub mod types;
pub mod udp;
pub mod worker;
pub(crate) use channel::Channel;
pub(crate) use mpsc::MpscChannel;
pub(crate) use tcp::TcpChannel;
pub(crate) use udp::UdpChannel;
use crate::{ use crate::{
internal::RemoteParticipant, metrics::NetworkMetrics,
worker::{ types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg},
metrics::NetworkMetrics, worker::Worker,
types::{CtrlMsg, Pid, RtrnMsg},
worker::Worker,
},
}; };
use mio::{self, Poll, PollOpt, Ready, Token}; use mio::{self, Poll, PollOpt, Ready, Token};
use mio_extras::channel::{channel, Receiver, Sender}; use mio_extras::channel::{channel, Receiver, Sender};

View File

@ -1,36 +0,0 @@
use crate::{
api::Address,
worker::types::{Mid, Sid},
};
pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0];
pub(crate) enum Protocol {
Tcp,
Udp,
}
impl Address {
pub(crate) fn get_protocol(&self) -> Protocol {
match self {
Address::Tcp(_) => Protocol::Tcp,
Address::Udp(_) => Protocol::Udp,
}
}
}
#[derive(Debug)]
pub struct RemoteParticipant {
pub stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,
pub msg_id_pool: tlid::Pool<tlid::Wrapping<Mid>>,
}
impl RemoteParticipant {
pub(crate) fn new() -> Self {
Self {
stream_id_pool: tlid::Pool::new_full(),
msg_id_pool: tlid::Pool::new_full(),
}
}
}

View File

@ -1,7 +1,13 @@
#![feature(trait_alias)] #![feature(trait_alias)]
mod api; mod api;
mod internal; mod channel;
mod controller;
mod message; mod message;
mod metrics;
mod mpsc;
mod tcp;
mod types;
mod udp;
mod worker; mod worker;
#[cfg(test)] #[cfg(test)]
@ -10,6 +16,7 @@ pub mod tests {
use futures::executor::block_on; use futures::executor::block_on;
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use tracing::*; use tracing::*;
use tracing_subscriber::EnvFilter;
use uuid::Uuid; use uuid::Uuid;
use uvth::ThreadPoolBuilder; use uvth::ThreadPoolBuilder;
@ -28,11 +35,24 @@ pub mod tests {
} }
pub fn test_tracing() { pub fn test_tracing() {
let filter = EnvFilter::from_default_env()
//.add_directive("[worker]=trace".parse().unwrap())
//.add_directive("trace".parse().unwrap())
.add_directive("veloren_network::worker=debug".parse().unwrap())
.add_directive("veloren_network::controller=trace".parse().unwrap())
.add_directive("veloren_network::channel=trace".parse().unwrap())
.add_directive("veloren_network::message=trace".parse().unwrap())
.add_directive("veloren_network::metrics=trace".parse().unwrap())
.add_directive("veloren_network::types=trace".parse().unwrap())
.add_directive("veloren_network::mpsc=debug".parse().unwrap())
.add_directive("veloren_network::udp=debug".parse().unwrap())
.add_directive("veloren_network::tcp=debug".parse().unwrap());
tracing_subscriber::FmtSubscriber::builder() tracing_subscriber::FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.) // all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
// will be written to stdout. // will be written to stdout.
.with_max_level(Level::TRACE) .with_max_level(Level::TRACE)
//.with_env_filter("veloren_network::api=info,my_crate::my_mod=debug,[my_span]=trace") .with_env_filter(filter)
// sets this to be the default, global subscriber for this application. // sets this to be the default, global subscriber for this application.
.init(); .init();
} }
@ -105,7 +125,7 @@ pub mod tests {
let p1 = p1.unwrap(); let p1 = p1.unwrap();
std::thread::sleep(std::time::Duration::from_millis(20)); std::thread::sleep(std::time::Duration::from_millis(20));
let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); let s1 = block_on(n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt));
//let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); //let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
std::thread::sleep(std::time::Duration::from_millis(20)); std::thread::sleep(std::time::Duration::from_millis(20));

View File

@ -1,7 +1,7 @@
use bincode; use bincode;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
//use std::collections::VecDeque; //use std::collections::VecDeque;
use crate::worker::types::{Mid, Sid}; use crate::types::{Mid, Sid};
use std::sync::Arc; use std::sync::Arc;
use tracing::*; use tracing::*;

View File

@ -53,9 +53,9 @@ impl NetworkMetrics {
"version", "version",
&format!( &format!(
"{}.{}.{}", "{}.{}.{}",
&crate::internal::VELOREN_NETWORK_VERSION[0], &crate::types::VELOREN_NETWORK_VERSION[0],
&crate::internal::VELOREN_NETWORK_VERSION[1], &crate::types::VELOREN_NETWORK_VERSION[1],
&crate::internal::VELOREN_NETWORK_VERSION[2] &crate::types::VELOREN_NETWORK_VERSION[2]
), ),
); );
let network_info = IntGauge::with_opts(opts)?; let network_info = IntGauge::with_opts(opts)?;

View File

@ -1,4 +1,4 @@
use crate::worker::{channel::ChannelProtocol, types::Frame}; use crate::{channel::ChannelProtocol, types::Frame};
use mio_extras::channel::{Receiver, Sender}; use mio_extras::channel::{Receiver, Sender};
use tracing::*; use tracing::*;

View File

@ -1,4 +1,4 @@
use crate::worker::{channel::ChannelProtocol, types::Frame}; use crate::{channel::ChannelProtocol, types::Frame};
use bincode; use bincode;
use mio::net::TcpStream; use mio::net::TcpStream;
use std::io::{Read, Write}; use std::io::{Read, Write};
@ -65,7 +65,7 @@ impl ChannelProtocol for TcpChannel {
let total = data.len(); let total = data.len();
match self.endpoint.write(&data) { match self.endpoint.write(&data) {
Ok(n) if n == total => { Ok(n) if n == total => {
trace!("send!"); trace!("send {} bytes", n);
}, },
Ok(n) => { Ok(n) => {
error!("could only send part"); error!("could only send part");

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
api::Promise, api::{Address, Promise},
channel::Channel,
message::{InCommingMessage, OutGoingMessage}, message::{InCommingMessage, OutGoingMessage},
worker::Channel,
}; };
use enumset::EnumSet; use enumset::EnumSet;
use mio::{self, net::TcpListener, PollOpt, Ready}; use mio::{self, net::TcpListener, PollOpt, Ready};
@ -21,6 +21,9 @@ pub type Sid = u32;
//*otherwise extra synchronization would be needed //*otherwise extra synchronization would be needed
pub type Mid = u64; pub type Mid = u64;
pub(crate) const VELOREN_MAGIC_NUMBER: &str = "VELOREN";
pub const VELOREN_NETWORK_VERSION: [u32; 3] = [0, 1, 0];
// Used for Communication between Controller <--> Worker // Used for Communication between Controller <--> Worker
pub(crate) enum CtrlMsg { pub(crate) enum CtrlMsg {
Shutdown, Shutdown,
@ -123,3 +126,32 @@ pub(crate) enum Frame {
* against veloren Server! */ * against veloren Server! */
Raw(Vec<u8>), Raw(Vec<u8>),
} }
pub(crate) enum Protocol {
Tcp,
Udp,
}
impl Address {
pub(crate) fn get_protocol(&self) -> Protocol {
match self {
Address::Tcp(_) => Protocol::Tcp,
Address::Udp(_) => Protocol::Udp,
}
}
}
#[derive(Debug)]
pub struct RemoteParticipant {
pub stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,
pub msg_id_pool: tlid::Pool<tlid::Wrapping<Mid>>,
}
impl RemoteParticipant {
pub(crate) fn new() -> Self {
Self {
stream_id_pool: tlid::Pool::new_full(),
msg_id_pool: tlid::Pool::new_full(),
}
}
}

View File

@ -1,4 +1,4 @@
use crate::worker::{channel::ChannelProtocol, types::Frame}; use crate::{channel::ChannelProtocol, types::Frame};
use bincode; use bincode;
use mio::net::UdpSocket; use mio::net::UdpSocket;
use tracing::*; use tracing::*;
@ -61,7 +61,9 @@ impl ChannelProtocol for UdpChannel {
if let Ok(mut data) = bincode::serialize(&frame) { if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len(); let total = data.len();
match self.endpoint.send(&data) { match self.endpoint.send(&data) {
Ok(n) if n == total => {}, Ok(n) if n == total => {
trace!("send {} bytes", n);
},
Ok(n) => { Ok(n) => {
error!("could only send part"); error!("could only send part");
//let data = data.drain(n..).collect(); //TODO: //let data = data.drain(n..).collect(); //TODO:

View File

@ -1,11 +1,9 @@
use crate::{ use crate::{
internal::RemoteParticipant, channel::{Channel, ChannelProtocol, ChannelProtocols},
worker::{ controller::Controller,
channel::{ChannelProtocol, ChannelProtocols}, metrics::NetworkMetrics,
metrics::NetworkMetrics, tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, TokenObjects}, types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, TokenObjects},
Channel, Controller, TcpChannel,
},
}; };
use mio::{self, Poll, PollOpt, Ready, Token}; use mio::{self, Poll, PollOpt, Ready, Token};
use mio_extras::channel::{Receiver, Sender}; use mio_extras::channel::{Receiver, Sender};