Converting the API interface to Async and experimenting with a Channel implementation for TCP, UDP, MPSC, which will later be reverted

It should compile and tests run fine now.
If not, the 2nd last squashed commit message said it currently only send frames but not incomming messages, also recv would only handle frames. The last one said i added internal messages and a reverse path (prob for .recv)
This commit is contained in:
Marcel Märtens
2020-02-10 18:25:47 +01:00
parent 15edf6dbb4
commit 0777d70a0e
11 changed files with 583 additions and 291 deletions

55
Cargo.lock generated
View File

@ -74,9 +74,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.27" version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785" checksum = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff"
[[package]] [[package]]
name = "anymap" name = "anymap"
@ -375,7 +375,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"parking_lot 0.10.0", "parking_lot 0.10.1",
"slab", "slab",
] ]
@ -423,9 +423,9 @@ dependencies = [
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.2.1" version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12ae9db68ad7fac5fe51304d20f016c911539251075a214f8e663babefa35187" checksum = "1f359dc14ff8911330a51ef78022d376f25ed00248912803b58f00cb1c27f742"
[[package]] [[package]]
name = "byteorder" name = "byteorder"
@ -695,9 +695,9 @@ dependencies = [
[[package]] [[package]]
name = "const-tweaker" name = "const-tweaker"
version = "0.2.5" version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7081900ff8f4b89046f8898eb8af6ed26be5a47299c56147d5a7dac74298b0" checksum = "7fbe3e1d2fccd896d451adb486910a0bfc233fd6dcafdb4e13bac7de72f8f250"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-std", "async-std",
@ -1066,9 +1066,9 @@ dependencies = [
[[package]] [[package]]
name = "dashmap" name = "dashmap"
version = "3.7.0" version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "010ef3f25ed5bb93505a3238d19957622190268640526aab07174c66ccf5d611" checksum = "0f87a04c37da1d3d27db1fb7f372802b72fb8c3ff3e9c0914530995127f4a6a1"
dependencies = [ dependencies = [
"ahash 0.3.2", "ahash 0.3.2",
"cfg-if", "cfg-if",
@ -3178,12 +3178,12 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.10.0" version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e98c49ab0b7ce5b222f2cc9193fc4efe11c6d0bd4f648e374684a6857b1cfc" checksum = "6fdfcb5f20930a79e326f7ec992a9fdb5b7bd809254b1e735bdd5a99f78bee0d"
dependencies = [ dependencies = [
"lock_api", "lock_api",
"parking_lot_core 0.7.0", "parking_lot_core 0.7.2",
] ]
[[package]] [[package]]
@ -3215,9 +3215,9 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot_core" name = "parking_lot_core"
version = "0.7.0" version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7582838484df45743c8434fbff785e8edf260c28748353d44bc0da32e0ceabf1" checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"cloudabi", "cloudabi",
@ -3420,9 +3420,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro-nested" name = "proc-macro-nested"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
@ -4144,18 +4144,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.105" version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff" checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.105" version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac5d00fc561ba2724df6758a17de23df5914f20e41cb00f94d5b7ae42fffaff8" checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c"
dependencies = [ dependencies = [
"proc-macro2 1.0.9", "proc-macro2 1.0.9",
"quote 1.0.3", "quote 1.0.3",
@ -5086,9 +5086,9 @@ dependencies = [
[[package]] [[package]]
name = "vek" name = "vek"
version = "0.10.0" version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c98f7e1c1400d5b1704baee82cbc56a3fde406769555ead0f2306e43ebab967" checksum = "761f71ebd4296be71d1c584aa41a1ab8f3e5e646357fefce387b54381c151926"
dependencies = [ dependencies = [
"approx 0.3.2", "approx 0.3.2",
"num-integer", "num-integer",
@ -5120,7 +5120,7 @@ dependencies = [
"num_cpus", "num_cpus",
"specs", "specs",
"uvth", "uvth",
"vek 0.10.0", "vek 0.10.2",
"veloren-common", "veloren-common",
] ]
@ -5154,7 +5154,7 @@ dependencies = [
"specs", "specs",
"specs-idvs", "specs-idvs",
"sum_type", "sum_type",
"vek 0.10.0", "vek 0.10.2",
] ]
[[package]] [[package]]
@ -5164,6 +5164,7 @@ dependencies = [
"bincode", "bincode",
"byteorder 1.3.4", "byteorder 1.3.4",
"enumset", "enumset",
"futures 0.3.4",
"mio", "mio",
"mio-extras", "mio-extras",
"serde", "serde",
@ -5202,7 +5203,7 @@ dependencies = [
"specs", "specs",
"specs-idvs", "specs-idvs",
"uvth", "uvth",
"vek 0.10.0", "vek 0.10.2",
"veloren-common", "veloren-common",
"veloren-world", "veloren-world",
] ]
@ -5260,7 +5261,7 @@ dependencies = [
"specs-idvs", "specs-idvs",
"treeculler", "treeculler",
"uvth", "uvth",
"vek 0.10.0", "vek 0.10.2",
"veloren-client", "veloren-client",
"veloren-common", "veloren-common",
"veloren-server", "veloren-server",
@ -5295,7 +5296,7 @@ dependencies = [
"roots", "roots",
"serde", "serde",
"serde_derive", "serde_derive",
"vek 0.10.0", "vek 0.10.2",
"veloren-common", "veloren-common",
] ]

View File

@ -18,5 +18,6 @@ tracing = "0.1"
tracing-subscriber = "0.2.0-alpha.4" tracing-subscriber = "0.2.0-alpha.4"
byteorder = "1.3" byteorder = "1.3"
mio-extras = "2.0" mio-extras = "2.0"
futures = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]} tlid = { path = "../../tlid", features = ["serde"]}

View File

@ -2,10 +2,8 @@ use crate::{
internal::RemoteParticipant, internal::RemoteParticipant,
message::{self, OutGoingMessage}, message::{self, OutGoingMessage},
worker::{ worker::{
channel::Channel,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
Controller, Channel, Controller, TcpChannel,
}, },
}; };
use enumset::*; use enumset::*;
@ -42,6 +40,7 @@ pub enum Promise {
pub struct Participant { pub struct Participant {
addr: Address, addr: Address,
remote_pid: Pid,
} }
pub struct Connection {} pub struct Connection {}
@ -114,12 +113,15 @@ impl<E: Events> Network<E> {
// should almost ever be empty except for new channel creations and stream // should almost ever be empty except for new channel creations and stream
// creations! // creations!
for worker in self.controller.iter() { for worker in self.controller.iter() {
worker.get_tx().send(CtrlMsg::Send(OutGoingMessage { worker
.get_tx()
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(), buffer: messagebuffer.clone(),
cursor: 0, cursor: 0,
mid: None, mid: None,
sid: stream.sid, sid: stream.sid,
})); }))
.unwrap();
} }
} }
@ -146,72 +148,154 @@ impl<E: Events> Network<E> {
None None
} }
pub fn listen(&self, addr: &Address) { pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
let worker = Self::get_lowest_worker(&self.controller);
let pipe = worker.get_tx();
let address = addr.clone();
self.thread_pool.execute(move || {
let span = span!(Level::INFO, "listen", ?address);
let _enter = span.enter();
match address {
Address::Tcp(a) => {
info!("listening");
let tcp_listener = TcpListener::bind(&a).unwrap();
pipe.send(CtrlMsg::Register(
TokenObjects::TcpListener(tcp_listener),
Ready::readable(),
PollOpt::edge(),
))
.unwrap();
},
Address::Udp(_) => unimplemented!("lazy me"),
}
});
}
pub fn connect(&self, addr: &Address) -> Participant {
let worker = Self::get_lowest_worker(&self.controller);
let pipe = worker.get_tx();
let address = addr.clone();
let pid = self.participant_id;
let remotes = self.remotes.clone();
self.thread_pool.execute(move || {
let mut span = span!(Level::INFO, "connect", ?address);
let _enter = span.enter();
match address {
Address::Tcp(a) => {
info!("connecting");
let tcp_stream = match TcpStream::connect(&a) {
Err(err) => {
error!("could not open connection: {}", err);
return;
},
Ok(s) => s,
};
let mut channel = TcpChannel::new(tcp_stream, pid, remotes);
pipe.send(CtrlMsg::Register(
TokenObjects::TcpChannel(channel),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
))
.unwrap();
},
Address::Udp(_) => unimplemented!("lazy me"),
}
});
Participant { addr: addr.clone() }
}
pub fn open(&self, part: Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
for worker in self.controller.iter() { for worker in self.controller.iter() {
worker.get_tx().send(CtrlMsg::OpenStream { worker
.get_tx()
.send(CtrlMsg::OpenStream {
pid: uuid::Uuid::new_v4(), pid: uuid::Uuid::new_v4(),
prio, prio,
promises, promises,
}); })
.unwrap();
} }
Stream { sid: 0 } Stream { sid: 0 }
} }
pub fn close(&self, stream: Stream) {} pub fn close(&self, stream: Stream) {}
pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> {
let span = span!(Level::TRACE, "listen", ?address);
let worker = Self::get_lowest_worker(&self.controller);
let _enter = span.enter();
match address {
Address::Tcp(a) => {
let tcp_listener = TcpListener::bind(&a)?;
info!("listening");
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::TcpListener(tcp_listener),
Ready::readable(),
PollOpt::edge(),
))?;
},
Address::Udp(_) => unimplemented!("lazy me"),
};
Ok(())
}
pub async fn connect(&self, address: &Address) -> Result<Participant, NetworkError> {
let worker = Self::get_lowest_worker(&self.controller);
let pid = self.participant_id;
let remotes = self.remotes.clone();
let mut span = span!(Level::INFO, "connect", ?address);
let _enter = span.enter();
match address {
Address::Tcp(a) => {
info!("connecting");
let tcp_stream = TcpStream::connect(&a)?;
let tcp_channel = TcpChannel::new(tcp_stream);
let mut channel = Channel::new(pid, tcp_channel, remotes);
let (ctrl_tx, ctrl_rx) = mio_extras::channel::channel::<Pid>();
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::TcpChannel(channel, Some(ctrl_tx)),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
))?;
// wait for a return
},
Address::Udp(_) => unimplemented!("lazy me"),
}
Ok(Participant {
addr: address.clone(),
remote_pid: uuid::Uuid::new_v4(),
})
}
//TODO: evaluate if move to Participant
pub async fn _disconnect(&self, participant: Participant) -> Result<(), NetworkError> {
panic!("sda");
}
pub fn participants(&self) -> Vec<Participant> {
panic!("sda");
}
pub async fn _connected(&self) -> Result<Participant, NetworkError> {
// returns if a Participant connected and is ready
panic!("sda");
}
pub async fn _disconnected(&self) -> Result<Participant, NetworkError> {
// returns if a Participant connected and is ready
panic!("sda");
}
pub async fn multisend<M: Serialize>(
&self,
streams: Vec<Stream>,
msg: M,
) -> Result<(), NetworkError> {
panic!("sda");
}
}
impl Participant {
pub async fn _open(
&self,
prio: u8,
promises: EnumSet<Promise>,
) -> Result<Stream, ParticipantError> {
panic!("sda");
}
pub async fn _close(&self, stream: Stream) -> Result<(), ParticipantError> {
panic!("sda");
}
pub async fn _opened(&self) -> Result<Stream, ParticipantError> {
panic!("sda");
}
pub async fn _closed(&self) -> Result<Stream, ParticipantError> {
panic!("sda");
}
}
impl Stream {
//TODO: What about SEND instead of Serializeable if it goes via PIPE ?
//TODO: timeout per message or per stream ? stream or ?
pub async fn _send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
panic!("sda");
}
pub async fn _recv<M: DeserializeOwned>(&self) -> Result<M, StreamError> {
panic!("sda");
}
}
#[derive(Debug)]
pub enum NetworkError {
NetworkDestroyed,
WorkerDestroyed,
IoError(std::io::Error),
}
#[derive(Debug)]
pub enum ParticipantError {
ParticipantDisconected,
}
#[derive(Debug)]
pub enum StreamError {
StreamClosed,
}
impl From<std::io::Error> for NetworkError {
fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) }
}
impl<T> From<mio_extras::channel::SendError<T>> for NetworkError {
fn from(err: mio_extras::channel::SendError<T>) -> Self { NetworkError::WorkerDestroyed }
} }

View File

@ -7,6 +7,7 @@ mod worker;
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use crate::api::*; use crate::api::*;
use futures::executor::block_on;
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use tracing::*; use tracing::*;
use uuid::Uuid; use uuid::Uuid;
@ -27,8 +28,6 @@ pub mod tests {
} }
pub fn test_tracing() { pub fn test_tracing() {
use tracing::Level;
tracing_subscriber::FmtSubscriber::builder() tracing_subscriber::FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.) // all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
// will be written to stdout. // will be written to stdout.
@ -43,7 +42,9 @@ pub mod tests {
assert_eq!(2 + 2, 4); assert_eq!(2 + 2, 4);
} }
/*
#[test] #[test]
#[ignore]
fn client_server() { fn client_server() {
let thread_pool = Arc::new( let thread_pool = Arc::new(
ThreadPoolBuilder::new() ThreadPoolBuilder::new()
@ -63,7 +64,7 @@ pub mod tests {
//n2.OnRemoteConnectionOpen triggered //n2.OnRemoteConnectionOpen triggered
std::thread::sleep(std::time::Duration::from_millis(20)); std::thread::sleep(std::time::Duration::from_millis(20));
let s1 = n1.open(p1, 16, Promise::InOrder | Promise::NoCorrupt); let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
std::thread::sleep(std::time::Duration::from_millis(20)); std::thread::sleep(std::time::Duration::from_millis(20));
//n2.OnRemoteStreamOpen triggered //n2.OnRemoteStreamOpen triggered
@ -82,4 +83,40 @@ pub mod tests {
std::thread::sleep(std::time::Duration::from_millis(20000)); std::thread::sleep(std::time::Duration::from_millis(20000));
} }
*/
#[test]
fn client_server_stream() {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-test".into())
.build(),
);
test_tracing();
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010)));
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011)));
block_on(n1.listen(&a1)).unwrap(); //await
block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1
std::thread::sleep(std::time::Duration::from_millis(20));
let p1 = block_on(n1.connect(&a2)); //await
let p1 = p1.unwrap();
std::thread::sleep(std::time::Duration::from_millis(20));
let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
//let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
std::thread::sleep(std::time::Duration::from_millis(20));
n1.send("Hello World", &s1);
std::thread::sleep(std::time::Duration::from_millis(20));
std::thread::sleep(std::time::Duration::from_millis(1000));
let s: Option<String> = n2.recv(&s1);
assert_eq!(s, Some("Hello World".to_string()));
n1.close(s1);
}
} }

View File

@ -9,33 +9,21 @@ use mio_extras::channel::Sender;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
sync::{Arc, RwLock}, sync::{Arc, RwLock},
time::Instant,
}; };
use tracing::*; use tracing::*;
pub(crate) trait Channel { pub(crate) trait ChannelProtocol {
/* type Handle: ?Sized + mio::Evented;
uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing
aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls
*/
/// Execute when ready to read /// Execute when ready to read
fn read( fn read(&mut self) -> Vec<Frame>;
&mut self,
uninitialized_dirty_speed_buffer: &mut [u8; 65000],
aprox_time: Instant,
rtrn_tx: &Sender<RtrnMsg>,
);
/// Execute when ready to write /// Execute when ready to write
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant); fn write(&mut self, frame: Frame);
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32; /// used for mio
fn close_stream(&mut self, sid: u32); fn get_handle(&self) -> &Self::Handle;
fn handshake(&mut self);
fn shutdown(&mut self);
fn send(&mut self, outgoing: OutGoingMessage);
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct ChannelState { pub(crate) struct Channel<P: ChannelProtocol> {
pub stream_id_pool: Option<tlid::Pool<tlid::Wrapping<Sid>>>, /* TODO: stream_id unique per pub stream_id_pool: Option<tlid::Pool<tlid::Wrapping<Sid>>>, /* TODO: stream_id unique per
* participant */ * participant */
pub msg_id_pool: Option<tlid::Pool<tlid::Wrapping<Mid>>>, //TODO: msg_id unique per pub msg_id_pool: Option<tlid::Pool<tlid::Wrapping<Mid>>>, //TODO: msg_id unique per
@ -46,6 +34,7 @@ pub(crate) struct ChannelState {
pub streams: Vec<Stream>, pub streams: Vec<Stream>,
pub send_queue: VecDeque<Frame>, pub send_queue: VecDeque<Frame>,
pub recv_queue: VecDeque<InCommingMessage>, pub recv_queue: VecDeque<InCommingMessage>,
pub protocol: P,
pub send_handshake: bool, pub send_handshake: bool,
pub send_pid: bool, pub send_pid: bool,
pub send_config: bool, pub send_config: bool,
@ -70,7 +59,7 @@ pub(crate) struct ChannelState {
Shutdown phase Shutdown phase
*/ */
impl ChannelState { impl<P: ChannelProtocol> Channel<P> {
const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \ const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
veloren server.\nWe are not sure if you are a valid \ veloren server.\nWe are not sure if you are a valid \
veloren client.\nClosing the connection" veloren client.\nClosing the connection"
@ -79,8 +68,12 @@ impl ChannelState {
invalid version.\nWe don't know how to communicate with \ invalid version.\nWe don't know how to communicate with \
you.\n"; you.\n";
pub fn new(local_pid: Pid, remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>) -> Self { pub fn new(
ChannelState { local_pid: Pid,
protocol: P,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
) -> Self {
Self {
stream_id_pool: None, stream_id_pool: None,
msg_id_pool: None, msg_id_pool: None,
local_pid, local_pid,
@ -89,6 +82,7 @@ impl ChannelState {
streams: Vec::new(), streams: Vec::new(),
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
recv_queue: VecDeque::new(), recv_queue: VecDeque::new(),
protocol,
send_handshake: false, send_handshake: false,
send_pid: false, send_pid: false,
send_config: false, send_config: false,
@ -110,7 +104,20 @@ impl ChannelState {
&& !self.recv_shutdown && !self.recv_shutdown
} }
pub fn handle(&mut self, frame: Frame, rtrn_tx: &Sender<RtrnMsg>) { pub fn tick_recv(&mut self, rtrn_tx: &Sender<RtrnMsg>) {
for frame in self.protocol.read() {
self.handle(frame, rtrn_tx);
}
}
pub fn tick_send(&mut self) {
self.tick_streams();
while let Some(frame) = self.send_queue.pop_front() {
self.protocol.write(frame)
}
}
fn handle(&mut self, frame: Frame, rtrn_tx: &Sender<RtrnMsg>) {
match frame { match frame {
Frame::Handshake { Frame::Handshake {
magic_number, magic_number,
@ -261,9 +268,9 @@ impl ChannelState {
} }
if let Some(pos) = pos { if let Some(pos) = pos {
for m in s.to_receive.drain(pos..pos + 1) { for m in s.to_receive.drain(pos..pos + 1) {
info!("receied message: {}", m.mid); info!("received message: {}", m.mid);
//self.recv_queue.push_back(m); //self.recv_queue.push_back(m);
rtrn_tx.send(RtrnMsg::Receive(m)); rtrn_tx.send(RtrnMsg::Receive(m)).unwrap();
} }
} }
} }
@ -279,7 +286,7 @@ impl ChannelState {
// This function will tick all streams according to priority and add them to the // This function will tick all streams according to priority and add them to the
// send queue // send queue
pub(crate) fn tick_streams(&mut self) { fn tick_streams(&mut self) {
//ignoring prio for now //ignoring prio for now
//TODO: fix prio //TODO: fix prio
if let Some(msg_id_pool) = &mut self.msg_id_pool { if let Some(msg_id_pool) = &mut self.msg_id_pool {
@ -327,4 +334,50 @@ impl ChannelState {
self.send_shutdown = true; self.send_shutdown = true;
} }
} }
pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 {
// validate promises
if let Some(stream_id_pool) = &mut self.stream_id_pool {
let sid = stream_id_pool.next();
let stream = Stream::new(sid, prio, promises.clone());
self.streams.push(stream);
self.send_queue.push_back(Frame::OpenStream {
sid,
prio,
promises,
});
return sid;
}
error!("fix me");
return 0;
//TODO: fix me
}
pub(crate) fn close_stream(&mut self, sid: u32) {
self.streams.retain(|stream| stream.sid() != sid);
self.send_queue.push_back(Frame::CloseStream { sid });
}
pub(crate) fn handshake(&mut self) {
self.send_queue.push_back(Frame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
version: VELOREN_NETWORK_VERSION,
});
self.send_handshake = true;
}
pub(crate) fn shutdown(&mut self) {
self.send_queue.push_back(Frame::Shutdown {});
self.send_shutdown = true;
}
pub(crate) fn send(&mut self, outgoing: OutGoingMessage) {
//TODO: fix me
for s in self.streams.iter_mut() {
s.to_send.push_back(outgoing);
break;
}
}
pub(crate) fn get_handle(&self) -> &P::Handle { self.protocol.get_handle() }
} }

View File

@ -5,10 +5,17 @@
communication is done via channels. communication is done via channels.
*/ */
pub mod channel; pub mod channel;
pub mod mpsc;
pub mod tcp; pub mod tcp;
pub mod types; pub mod types;
pub mod udp;
pub mod worker; pub mod worker;
pub(crate) use channel::Channel;
pub(crate) use mpsc::MpscChannel;
pub(crate) use tcp::TcpChannel;
pub(crate) use udp::UdpChannel;
use crate::{ use crate::{
internal::RemoteParticipant, internal::RemoteParticipant,
worker::{ worker::{

View File

@ -0,0 +1,55 @@
use crate::worker::{channel::ChannelProtocol, types::Frame};
use mio_extras::channel::{Receiver, Sender};
use tracing::*;
pub(crate) struct MpscChannel {
endpoint_sender: Sender<Frame>,
endpoint_receiver: Receiver<Frame>,
}
impl MpscChannel {}
impl ChannelProtocol for MpscChannel {
type Handle = Receiver<Frame>;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
loop {
match self.endpoint_receiver.try_recv() {
Ok(frame) => {
trace!("incomming message");
result.push(frame);
},
Err(std::sync::mpsc::TryRecvError::Empty) => {
debug!("would block");
break;
},
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
panic!("disconnected");
},
};
}
result
}
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
match self.endpoint_sender.send(frame) {
Ok(n) => {
trace!("semded");
},
Err(mio_extras::channel::SendError::Io(e))
if e.kind() == std::io::ErrorKind::WouldBlock =>
{
debug!("would block");
return;
}
Err(e) => {
panic!("{}", e);
},
};
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver }
}

View File

@ -1,69 +1,51 @@
use crate::{ use crate::worker::{channel::ChannelProtocol, types::Frame};
api::Promise,
internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
message::OutGoingMessage,
worker::{
channel::{Channel, ChannelState},
types::{Pid, RtrnMsg, Stream, TcpFrame},
},
};
use bincode; use bincode;
use enumset::EnumSet; use mio::net::TcpStream;
use mio::{self, net::TcpStream}; use std::io::{Read, Write};
use mio_extras::channel::Sender;
use std::{
collections::HashMap,
io::{Read, Write},
sync::{Arc, RwLock},
time::Instant,
};
use tracing::*; use tracing::*;
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct TcpChannel { pub(crate) struct TcpChannel {
state: ChannelState, endpoint: TcpStream,
pub tcpstream: TcpStream, //these buffers only ever contain 1 FRAME !
read_buffer: Vec<u8>,
write_buffer: Vec<u8>,
} }
impl TcpChannel { impl TcpChannel {
pub fn new( pub fn new(endpoint: TcpStream) -> Self {
tcpstream: TcpStream, let mut b = vec![0; 200];
local_pid: Pid, Self {
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>, endpoint,
) -> Self { read_buffer: b.clone(),
TcpChannel { write_buffer: b,
state: ChannelState::new(local_pid, remotes),
tcpstream,
} }
} }
} }
impl Channel for TcpChannel { impl ChannelProtocol for TcpChannel {
fn read( type Handle = TcpStream;
&mut self,
uninitialized_dirty_speed_buffer: &mut [u8; 65000], /// Execute when ready to read
aprox_time: Instant, fn read(&mut self) -> Vec<Frame> {
rtrn_tx: &Sender<RtrnMsg>, let mut result = Vec::new();
) { match self.endpoint.read(self.read_buffer.as_mut_slice()) {
let pid = self.state.remote_pid;
let span = span!(Level::INFO, "channel", ?pid);
let _enter = span.enter();
match self.tcpstream.read(uninitialized_dirty_speed_buffer) {
Ok(n) => { Ok(n) => {
trace!("incomming message with len: {}", n); trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]); let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 { while cur.position() < n as u64 {
let r: Result<TcpFrame, _> = bincode::deserialize_from(&mut cur); let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r { match r {
Ok(frame) => self.state.handle(frame, rtrn_tx), Ok(frame) => result.push(frame),
Err(e) => { Err(e) => {
error!( error!(
?self, ?self,
?e, ?e,
"failure parsing a message with len: {}, starting with: {:?}", "failure parsing a message with len: {}, starting with: {:?}",
n, n,
&uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)] &self.read_buffer[0..std::cmp::min(n, 10)]
); );
break;
}, },
} }
} }
@ -75,18 +57,17 @@ impl Channel for TcpChannel {
panic!("{}", e); panic!("{}", e);
}, },
}; };
result
} }
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) { /// Execute when ready to write
let pid = self.state.remote_pid; fn write(&mut self, frame: Frame) {
let span = span!(Level::INFO, "channel", ?pid); if let Ok(mut data) = bincode::serialize(&frame) {
let _enter = span.enter();
loop {
while let Some(elem) = self.state.send_queue.pop_front() {
if let Ok(mut data) = bincode::serialize(&elem) {
let total = data.len(); let total = data.len();
match self.tcpstream.write(&data) { match self.endpoint.write(&data) {
Ok(n) if n == total => {}, Ok(n) if n == total => {
trace!("send!");
},
Ok(n) => { Ok(n) => {
error!("could only send part"); error!("could only send part");
//let data = data.drain(n..).collect(); //TODO: //let data = data.drain(n..).collect(); //TODO:
@ -103,57 +84,6 @@ impl Channel for TcpChannel {
}; };
}; };
} }
// run streams
self.state.tick_streams();
if self.state.send_queue.is_empty() {
break;
}
}
}
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 { fn get_handle(&self) -> &Self::Handle { &self.endpoint }
// validate promises
if let Some(stream_id_pool) = &mut self.state.stream_id_pool {
let sid = stream_id_pool.next();
let stream = Stream::new(sid, prio, promises.clone());
self.state.streams.push(stream);
self.state.send_queue.push_back(TcpFrame::OpenStream {
sid,
prio,
promises,
});
return sid;
}
error!("fix me");
return 0;
//TODO: fix me
}
fn close_stream(&mut self, sid: u32) {
self.state.streams.retain(|stream| stream.sid() != sid);
self.state
.send_queue
.push_back(TcpFrame::CloseStream { sid });
}
fn handshake(&mut self) {
self.state.send_queue.push_back(TcpFrame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
version: VELOREN_NETWORK_VERSION,
});
self.state.send_handshake = true;
}
fn shutdown(&mut self) {
self.state.send_queue.push_back(TcpFrame::Shutdown {});
self.state.send_shutdown = true;
}
fn send(&mut self, outgoing: OutGoingMessage) {
//TODO: fix me
for s in self.state.streams.iter_mut() {
s.to_send.push_back(outgoing);
break;
}
}
} }

View File

@ -1,10 +1,11 @@
use crate::{ use crate::{
api::Promise, api::Promise,
message::{InCommingMessage, OutGoingMessage}, message::{InCommingMessage, OutGoingMessage},
worker::tcp::TcpChannel, worker::{Channel, MpscChannel, TcpChannel, UdpChannel},
}; };
use enumset::EnumSet; use enumset::EnumSet;
use mio::{self, net::TcpListener, PollOpt, Ready}; use mio::{self, net::TcpListener, PollOpt, Ready};
use mio_extras::channel::Sender;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::VecDeque; use std::collections::VecDeque;
use uuid::Uuid; use uuid::Uuid;
@ -50,10 +51,23 @@ pub struct Statistics {
pub nano_busy: u128, pub nano_busy: u128,
} }
#[derive(Debug)]
pub(crate) enum TokenObjects { pub(crate) enum TokenObjects {
TcpListener(TcpListener), TcpListener(TcpListener),
TcpChannel(TcpChannel), TcpChannel(Channel<TcpChannel>, Option<Sender<Pid>>),
UdpChannel(Channel<UdpChannel>, Option<Sender<Pid>>),
MpscChannel(Channel<MpscChannel>, Option<Sender<Pid>>),
}
impl std::fmt::Debug for TokenObjects {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TokenObjects::TcpListener(l) => write!(f, "{:?}", l),
TokenObjects::TcpChannel(c, _) => write!(f, "{:?}", c),
TokenObjects::UdpChannel(c, _) => write!(f, "{:?}", c),
TokenObjects::MpscChannel(c, _) => unimplemented!("MPSC"),
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -121,6 +135,3 @@ pub(crate) enum Frame {
* against veloren Server! */ * against veloren Server! */
Raw(Vec<u8>), Raw(Vec<u8>),
} }
pub(crate) type TcpFrame = Frame;
pub(crate) type UdpFrame = Frame;

84
network/src/worker/udp.rs Normal file
View File

@ -0,0 +1,84 @@
use crate::worker::{channel::ChannelProtocol, types::Frame};
use bincode;
use mio::net::UdpSocket;
use tracing::*;
#[derive(Debug)]
pub(crate) struct UdpChannel {
endpoint: UdpSocket,
read_buffer: Vec<u8>,
write_buffer: Vec<u8>,
}
impl UdpChannel {
pub fn new(endpoint: UdpSocket) -> Self {
Self {
endpoint,
read_buffer: Vec::new(),
write_buffer: Vec::new(),
}
}
}
impl ChannelProtocol for UdpChannel {
type Handle = UdpSocket;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) {
Ok((n, remote)) => {
trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 {
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => result.push(frame),
Err(e) => {
error!(
?self,
?e,
"failure parsing a message with len: {}, starting with: {:?}",
n,
&self.read_buffer[0..std::cmp::min(n, 10)]
);
break;
},
}
}
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
},
Err(e) => {
panic!("{}", e);
},
};
result
}
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len();
match self.endpoint.send(&data) {
Ok(n) if n == total => {},
Ok(n) => {
error!("could only send part");
//let data = data.drain(n..).collect(); //TODO:
// validate n.. is correct
// to_send.push_front(data);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
return;
},
Err(e) => {
panic!("{}", e);
},
};
};
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}

View File

@ -1,10 +1,8 @@
use crate::{ use crate::{
internal::RemoteParticipant, internal::RemoteParticipant,
worker::{ worker::{
channel::Channel,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects}, types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects},
Controller, Channel, Controller, TcpChannel,
}, },
}; };
use mio::{self, Poll, PollOpt, Ready, Token}; use mio::{self, Poll, PollOpt, Ready, Token};
@ -49,7 +47,6 @@ pub(crate) struct Worker {
ctrl_rx: Receiver<CtrlMsg>, ctrl_rx: Receiver<CtrlMsg>,
rtrn_tx: Sender<RtrnMsg>, rtrn_tx: Sender<RtrnMsg>,
mio_tokens: MioTokens, mio_tokens: MioTokens,
buf: [u8; 65000],
time_before_poll: Instant, time_before_poll: Instant,
time_after_poll: Instant, time_after_poll: Instant,
} }
@ -73,7 +70,6 @@ impl Worker {
ctrl_rx, ctrl_rx,
rtrn_tx, rtrn_tx,
mio_tokens, mio_tokens,
buf: [0; 65000],
time_before_poll: Instant::now(), time_before_poll: Instant::now(),
time_after_poll: Instant::now(), time_after_poll: Instant::now(),
} }
@ -118,9 +114,9 @@ impl Worker {
CtrlMsg::Shutdown => { CtrlMsg::Shutdown => {
debug!("Shutting Down"); debug!("Shutting Down");
for (tok, obj) in self.mio_tokens.tokens.iter_mut() { for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::TcpChannel(channel) = obj { if let TokenObjects::TcpChannel(channel, _) = obj {
channel.shutdown(); channel.shutdown();
channel.write(&mut self.buf, self.time_after_poll); channel.tick_send();
} }
} }
return true; return true;
@ -131,9 +127,17 @@ impl Worker {
TokenObjects::TcpListener(h) => { TokenObjects::TcpListener(h) => {
self.poll.register(h, tok, interest, opts).unwrap() self.poll.register(h, tok, interest, opts).unwrap()
}, },
TokenObjects::TcpChannel(channel) => self TokenObjects::TcpChannel(channel, _) => self
.poll .poll
.register(&channel.tcpstream, tok, interest, opts) .register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::UdpChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::MpscChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(), .unwrap(),
} }
debug!(?handle, ?tok, "Registered new handle"); debug!(?handle, ?tok, "Registered new handle");
@ -145,9 +149,9 @@ impl Worker {
promises, promises,
} => { } => {
for (tok, obj) in self.mio_tokens.tokens.iter_mut() { for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::TcpChannel(channel) = obj { if let TokenObjects::TcpChannel(channel, _) = obj {
channel.open_stream(prio, promises); //TODO: check participant channel.open_stream(prio, promises); //TODO: check participant
channel.write(&mut self.buf, self.time_after_poll); channel.tick_send();
} }
} }
//TODO: //TODO:
@ -155,18 +159,18 @@ impl Worker {
CtrlMsg::CloseStream { pid, sid } => { CtrlMsg::CloseStream { pid, sid } => {
//TODO: //TODO:
for to in self.mio_tokens.tokens.values_mut() { for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::TcpChannel(channel) = to { if let TokenObjects::TcpChannel(channel, _) = to {
channel.close_stream(sid); //TODO: check participant channel.close_stream(sid); //TODO: check participant
channel.write(&mut self.buf, self.time_after_poll); channel.tick_send();
} }
} }
}, },
CtrlMsg::Send(outgoing) => { CtrlMsg::Send(outgoing) => {
//TODO: //TODO:
for to in self.mio_tokens.tokens.values_mut() { for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::TcpChannel(channel) = to { if let TokenObjects::TcpChannel(channel, _) = to {
channel.send(outgoing); //TODO: check participant channel.send(outgoing); //TODO: check participant
channel.write(&mut self.buf, self.time_after_poll); channel.tick_send();
break; break;
} }
} }
@ -196,26 +200,51 @@ impl Worker {
) )
.unwrap(); .unwrap();
trace!(?remote_stream, ?tok, "registered"); trace!(?remote_stream, ?tok, "registered");
let mut channel = let tcp_channel = TcpChannel::new(remote_stream);
TcpChannel::new(remote_stream, self.pid, self.remotes.clone()); let mut channel = Channel::new(self.pid, tcp_channel, self.remotes.clone());
channel.handshake(); channel.handshake();
channel.tick_send();
self.mio_tokens self.mio_tokens
.tokens .tokens
.insert(tok, TokenObjects::TcpChannel(channel)); .insert(tok, TokenObjects::TcpChannel(channel, None));
}, },
Err(err) => { Err(err) => {
error!(?err, "error during remote connected"); error!(?err, "error during remote connected");
}, },
}, },
TokenObjects::TcpChannel(channel) => { TokenObjects::TcpChannel(channel, _) => {
if event.readiness().is_readable() { if event.readiness().is_readable() {
trace!(?channel.tcpstream, "stream readable"); let handle = channel.get_handle();
channel.read(&mut self.buf, self.time_after_poll, &self.rtrn_tx); trace!(?handle, "stream readable");
channel.tick_recv(&self.rtrn_tx);
} }
if event.readiness().is_writable() { if event.readiness().is_writable() {
trace!(?channel.tcpstream, "stream writeable"); let handle = channel.get_handle();
channel.write(&mut self.buf, self.time_after_poll); trace!(?handle, "stream writeable");
channel.tick_send();
}
},
TokenObjects::UdpChannel(channel, _) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
trace!(?handle, "stream readable");
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
trace!(?handle, "stream writeable");
channel.tick_send();
}
},
TokenObjects::MpscChannel(channel, _) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
channel.tick_send();
} }
}, },
}; };