From 0777d70a0e3de9c5a75884518423dbcc679def5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 10 Feb 2020 18:25:47 +0100 Subject: [PATCH] Converting the API interface to Async and experimenting with a Channel implementation for TCP, UDP, MPSC, which will later be reverted It should compile and tests run fine now. If not, the 2nd last squashed commit message said it currently only send frames but not incomming messages, also recv would only handle frames. The last one said i added internal messages and a reverse path (prob for .recv) --- Cargo.lock | 55 +++++---- network/Cargo.toml | 1 + network/src/api.rs | 226 +++++++++++++++++++++++----------- network/src/lib.rs | 73 ++++++++--- network/src/worker/channel.rs | 105 ++++++++++++---- network/src/worker/mod.rs | 7 ++ network/src/worker/mpsc.rs | 55 +++++++++ network/src/worker/tcp.rs | 170 ++++++++----------------- network/src/worker/types.rs | 23 +++- network/src/worker/udp.rs | 84 +++++++++++++ network/src/worker/worker.rs | 75 +++++++---- 11 files changed, 583 insertions(+), 291 deletions(-) create mode 100644 network/src/worker/mpsc.rs create mode 100644 network/src/worker/udp.rs diff --git a/Cargo.lock b/Cargo.lock index d6d4264afa..1dbc92c4fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,9 +74,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785" +checksum = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff" [[package]] name = "anymap" @@ -375,7 +375,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "parking_lot 0.10.0", + "parking_lot 0.10.1", "slab", ] @@ -423,9 +423,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.2.1" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12ae9db68ad7fac5fe51304d20f016c911539251075a214f8e663babefa35187" +checksum = "1f359dc14ff8911330a51ef78022d376f25ed00248912803b58f00cb1c27f742" [[package]] name = "byteorder" @@ -695,9 +695,9 @@ dependencies = [ [[package]] name = "const-tweaker" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7081900ff8f4b89046f8898eb8af6ed26be5a47299c56147d5a7dac74298b0" +checksum = "7fbe3e1d2fccd896d451adb486910a0bfc233fd6dcafdb4e13bac7de72f8f250" dependencies = [ "anyhow", "async-std", @@ -1066,9 +1066,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "3.7.0" +version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "010ef3f25ed5bb93505a3238d19957622190268640526aab07174c66ccf5d611" +checksum = "0f87a04c37da1d3d27db1fb7f372802b72fb8c3ff3e9c0914530995127f4a6a1" dependencies = [ "ahash 0.3.2", "cfg-if", @@ -3178,12 +3178,12 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92e98c49ab0b7ce5b222f2cc9193fc4efe11c6d0bd4f648e374684a6857b1cfc" +checksum = "6fdfcb5f20930a79e326f7ec992a9fdb5b7bd809254b1e735bdd5a99f78bee0d" dependencies = [ "lock_api", - "parking_lot_core 0.7.0", + "parking_lot_core 0.7.2", ] [[package]] @@ -3215,9 +3215,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7582838484df45743c8434fbff785e8edf260c28748353d44bc0da32e0ceabf1" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" dependencies = [ "cfg-if", "cloudabi", @@ -3420,9 +3420,9 @@ dependencies = [ [[package]] name = "proc-macro-nested" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" @@ -4144,18 +4144,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.105" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff" +checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.105" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac5d00fc561ba2724df6758a17de23df5914f20e41cb00f94d5b7ae42fffaff8" +checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" dependencies = [ "proc-macro2 1.0.9", "quote 1.0.3", @@ -5086,9 +5086,9 @@ dependencies = [ [[package]] name = "vek" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c98f7e1c1400d5b1704baee82cbc56a3fde406769555ead0f2306e43ebab967" +checksum = "761f71ebd4296be71d1c584aa41a1ab8f3e5e646357fefce387b54381c151926" dependencies = [ "approx 0.3.2", "num-integer", @@ -5120,7 +5120,7 @@ dependencies = [ "num_cpus", "specs", "uvth", - "vek 0.10.0", + "vek 0.10.2", "veloren-common", ] @@ -5154,7 +5154,7 @@ dependencies = [ "specs", "specs-idvs", "sum_type", - "vek 0.10.0", + "vek 0.10.2", ] [[package]] @@ -5164,6 +5164,7 @@ dependencies = [ "bincode", "byteorder 1.3.4", "enumset", + "futures 0.3.4", "mio", "mio-extras", "serde", @@ -5202,7 +5203,7 @@ dependencies = [ "specs", "specs-idvs", "uvth", - "vek 0.10.0", + "vek 0.10.2", "veloren-common", "veloren-world", ] @@ -5260,7 +5261,7 @@ dependencies = [ "specs-idvs", "treeculler", "uvth", - "vek 0.10.0", + "vek 0.10.2", "veloren-client", "veloren-common", "veloren-server", @@ -5295,7 +5296,7 @@ dependencies = [ "roots", "serde", "serde_derive", - "vek 0.10.0", + "vek 0.10.2", "veloren-common", ] diff --git a/network/Cargo.toml b/network/Cargo.toml index b72633bc6b..c08e1032cf 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -18,5 +18,6 @@ tracing = "0.1" tracing-subscriber = "0.2.0-alpha.4" byteorder = "1.3" mio-extras = "2.0" +futures = "0.3" uuid = { version = "0.8", features = ["serde", "v4"] } tlid = { path = "../../tlid", features = ["serde"]} \ No newline at end of file diff --git a/network/src/api.rs b/network/src/api.rs index 00fed03bfd..a3e534a57f 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -2,10 +2,8 @@ use crate::{ internal::RemoteParticipant, message::{self, OutGoingMessage}, worker::{ - channel::Channel, - tcp::TcpChannel, types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, - Controller, + Channel, Controller, TcpChannel, }, }; use enumset::*; @@ -42,6 +40,7 @@ pub enum Promise { pub struct Participant { addr: Address, + remote_pid: Pid, } pub struct Connection {} @@ -114,12 +113,15 @@ impl Network { // should almost ever be empty except for new channel creations and stream // creations! for worker in self.controller.iter() { - worker.get_tx().send(CtrlMsg::Send(OutGoingMessage { - buffer: messagebuffer.clone(), - cursor: 0, - mid: None, - sid: stream.sid, - })); + worker + .get_tx() + .send(CtrlMsg::Send(OutGoingMessage { + buffer: messagebuffer.clone(), + cursor: 0, + mid: None, + sid: stream.sid, + })) + .unwrap(); } } @@ -146,72 +148,154 @@ impl Network { None } - pub fn listen(&self, addr: &Address) { - let worker = Self::get_lowest_worker(&self.controller); - let pipe = worker.get_tx(); - let address = addr.clone(); - self.thread_pool.execute(move || { - let span = span!(Level::INFO, "listen", ?address); - let _enter = span.enter(); - match address { - Address::Tcp(a) => { - info!("listening"); - let tcp_listener = TcpListener::bind(&a).unwrap(); - pipe.send(CtrlMsg::Register( - TokenObjects::TcpListener(tcp_listener), - Ready::readable(), - PollOpt::edge(), - )) - .unwrap(); - }, - Address::Udp(_) => unimplemented!("lazy me"), - } - }); - } - - pub fn connect(&self, addr: &Address) -> Participant { - let worker = Self::get_lowest_worker(&self.controller); - let pipe = worker.get_tx(); - let address = addr.clone(); - let pid = self.participant_id; - let remotes = self.remotes.clone(); - self.thread_pool.execute(move || { - let mut span = span!(Level::INFO, "connect", ?address); - let _enter = span.enter(); - match address { - Address::Tcp(a) => { - info!("connecting"); - let tcp_stream = match TcpStream::connect(&a) { - Err(err) => { - error!("could not open connection: {}", err); - return; - }, - Ok(s) => s, - }; - let mut channel = TcpChannel::new(tcp_stream, pid, remotes); - pipe.send(CtrlMsg::Register( - TokenObjects::TcpChannel(channel), - Ready::readable() | Ready::writable(), - PollOpt::edge(), - )) - .unwrap(); - }, - Address::Udp(_) => unimplemented!("lazy me"), - } - }); - Participant { addr: addr.clone() } - } - - pub fn open(&self, part: Participant, prio: u8, promises: EnumSet) -> Stream { + pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet) -> Stream { for worker in self.controller.iter() { - worker.get_tx().send(CtrlMsg::OpenStream { - pid: uuid::Uuid::new_v4(), - prio, - promises, - }); + worker + .get_tx() + .send(CtrlMsg::OpenStream { + pid: uuid::Uuid::new_v4(), + prio, + promises, + }) + .unwrap(); } Stream { sid: 0 } } pub fn close(&self, stream: Stream) {} + + pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> { + let span = span!(Level::TRACE, "listen", ?address); + let worker = Self::get_lowest_worker(&self.controller); + let _enter = span.enter(); + match address { + Address::Tcp(a) => { + let tcp_listener = TcpListener::bind(&a)?; + info!("listening"); + worker.get_tx().send(CtrlMsg::Register( + TokenObjects::TcpListener(tcp_listener), + Ready::readable(), + PollOpt::edge(), + ))?; + }, + Address::Udp(_) => unimplemented!("lazy me"), + }; + Ok(()) + } + + pub async fn connect(&self, address: &Address) -> Result { + let worker = Self::get_lowest_worker(&self.controller); + let pid = self.participant_id; + let remotes = self.remotes.clone(); + let mut span = span!(Level::INFO, "connect", ?address); + let _enter = span.enter(); + match address { + Address::Tcp(a) => { + info!("connecting"); + let tcp_stream = TcpStream::connect(&a)?; + let tcp_channel = TcpChannel::new(tcp_stream); + let mut channel = Channel::new(pid, tcp_channel, remotes); + let (ctrl_tx, ctrl_rx) = mio_extras::channel::channel::(); + worker.get_tx().send(CtrlMsg::Register( + TokenObjects::TcpChannel(channel, Some(ctrl_tx)), + Ready::readable() | Ready::writable(), + PollOpt::edge(), + ))?; + // wait for a return + }, + Address::Udp(_) => unimplemented!("lazy me"), + } + + Ok(Participant { + addr: address.clone(), + remote_pid: uuid::Uuid::new_v4(), + }) + } + + //TODO: evaluate if move to Participant + pub async fn _disconnect(&self, participant: Participant) -> Result<(), NetworkError> { + panic!("sda"); + } + + pub fn participants(&self) -> Vec { + panic!("sda"); + } + + pub async fn _connected(&self) -> Result { + // returns if a Participant connected and is ready + panic!("sda"); + } + + pub async fn _disconnected(&self) -> Result { + // returns if a Participant connected and is ready + panic!("sda"); + } + + pub async fn multisend( + &self, + streams: Vec, + msg: M, + ) -> Result<(), NetworkError> { + panic!("sda"); + } +} + +impl Participant { + pub async fn _open( + &self, + prio: u8, + promises: EnumSet, + ) -> Result { + panic!("sda"); + } + + pub async fn _close(&self, stream: Stream) -> Result<(), ParticipantError> { + panic!("sda"); + } + + pub async fn _opened(&self) -> Result { + panic!("sda"); + } + + pub async fn _closed(&self) -> Result { + panic!("sda"); + } +} + +impl Stream { + //TODO: What about SEND instead of Serializeable if it goes via PIPE ? + //TODO: timeout per message or per stream ? stream or ? + + pub async fn _send(&self, msg: M) -> Result<(), StreamError> { + panic!("sda"); + } + + pub async fn _recv(&self) -> Result { + panic!("sda"); + } +} + +#[derive(Debug)] +pub enum NetworkError { + NetworkDestroyed, + WorkerDestroyed, + IoError(std::io::Error), +} + +#[derive(Debug)] +pub enum ParticipantError { + ParticipantDisconected, +} + +#[derive(Debug)] +pub enum StreamError { + StreamClosed, +} + +impl From for NetworkError { + fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) } +} + +impl From> for NetworkError { + fn from(err: mio_extras::channel::SendError) -> Self { NetworkError::WorkerDestroyed } } diff --git a/network/src/lib.rs b/network/src/lib.rs index 80dda8e210..27aff0dea2 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -7,6 +7,7 @@ mod worker; #[cfg(test)] pub mod tests { use crate::api::*; + use futures::executor::block_on; use std::{net::SocketAddr, sync::Arc}; use tracing::*; use uuid::Uuid; @@ -27,8 +28,6 @@ pub mod tests { } pub fn test_tracing() { - use tracing::Level; - tracing_subscriber::FmtSubscriber::builder() // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) // will be written to stdout. @@ -43,8 +42,51 @@ pub mod tests { assert_eq!(2 + 2, 4); } + /* + #[test] + #[ignore] + fn client_server() { + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-test".into()) + .build(), + ); + test_tracing(); + let n1 = Network::::new(Uuid::new_v4(), thread_pool.clone()); + let n2 = Network::::new(Uuid::new_v4(), thread_pool.clone()); + let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); + let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001))); + n1.listen(&a1); //await + n2.listen(&a2); // only requiered here, but doesnt hurt on n1 + std::thread::sleep(std::time::Duration::from_millis(20)); + + let p1 = n1.connect(&a2); //await + //n2.OnRemoteConnectionOpen triggered + std::thread::sleep(std::time::Duration::from_millis(20)); + + let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); + std::thread::sleep(std::time::Duration::from_millis(20)); + //n2.OnRemoteStreamOpen triggered + + n1.send("Hello World", &s1); + std::thread::sleep(std::time::Duration::from_millis(20)); + // receive on n2 now + + let s: Option = n2.recv(&s1); + for _ in 1..4 { + error!("{:?}", s); + } + assert_eq!(s, Some("Hello World".to_string())); + + n1.close(s1); + //n2.OnRemoteStreamClose triggered + + std::thread::sleep(std::time::Duration::from_millis(20000)); + } + */ + #[test] - fn client_server() { + fn client_server_stream() { let thread_pool = Arc::new( ThreadPoolBuilder::new() .name("veloren-network-test".into()) @@ -53,33 +95,28 @@ pub mod tests { test_tracing(); let n1 = Network::::new(Uuid::new_v4(), thread_pool.clone()); let n2 = Network::::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); - let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001))); - n1.listen(&a1); //await - n2.listen(&a2); // only requiered here, but doesnt hurt on n1 + let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010))); + let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011))); + block_on(n1.listen(&a1)).unwrap(); //await + block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1 std::thread::sleep(std::time::Duration::from_millis(20)); - let p1 = n1.connect(&a2); //await - //n2.OnRemoteConnectionOpen triggered + let p1 = block_on(n1.connect(&a2)); //await + let p1 = p1.unwrap(); std::thread::sleep(std::time::Duration::from_millis(20)); - let s1 = n1.open(p1, 16, Promise::InOrder | Promise::NoCorrupt); + let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); + //let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); std::thread::sleep(std::time::Duration::from_millis(20)); - //n2.OnRemoteStreamOpen triggered n1.send("Hello World", &s1); std::thread::sleep(std::time::Duration::from_millis(20)); - // receive on n2 now + + std::thread::sleep(std::time::Duration::from_millis(1000)); let s: Option = n2.recv(&s1); - for _ in 1..4 { - error!("{:?}", s); - } assert_eq!(s, Some("Hello World".to_string())); n1.close(s1); - //n2.OnRemoteStreamClose triggered - - std::thread::sleep(std::time::Duration::from_millis(20000)); } } diff --git a/network/src/worker/channel.rs b/network/src/worker/channel.rs index 5c687acda5..b943c5cb9f 100644 --- a/network/src/worker/channel.rs +++ b/network/src/worker/channel.rs @@ -9,33 +9,21 @@ use mio_extras::channel::Sender; use std::{ collections::{HashMap, VecDeque}, sync::{Arc, RwLock}, - time::Instant, }; use tracing::*; -pub(crate) trait Channel { - /* - uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing - aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls - */ +pub(crate) trait ChannelProtocol { + type Handle: ?Sized + mio::Evented; /// Execute when ready to read - fn read( - &mut self, - uninitialized_dirty_speed_buffer: &mut [u8; 65000], - aprox_time: Instant, - rtrn_tx: &Sender, - ); + fn read(&mut self) -> Vec; /// Execute when ready to write - fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant); - fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32; - fn close_stream(&mut self, sid: u32); - fn handshake(&mut self); - fn shutdown(&mut self); - fn send(&mut self, outgoing: OutGoingMessage); + fn write(&mut self, frame: Frame); + /// used for mio + fn get_handle(&self) -> &Self::Handle; } #[derive(Debug)] -pub(crate) struct ChannelState { +pub(crate) struct Channel { pub stream_id_pool: Option>>, /* TODO: stream_id unique per * participant */ pub msg_id_pool: Option>>, //TODO: msg_id unique per @@ -46,6 +34,7 @@ pub(crate) struct ChannelState { pub streams: Vec, pub send_queue: VecDeque, pub recv_queue: VecDeque, + pub protocol: P, pub send_handshake: bool, pub send_pid: bool, pub send_config: bool, @@ -70,7 +59,7 @@ pub(crate) struct ChannelState { Shutdown phase */ -impl ChannelState { +impl Channel

{ const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \ veloren server.\nWe are not sure if you are a valid \ veloren client.\nClosing the connection" @@ -79,8 +68,12 @@ impl ChannelState { invalid version.\nWe don't know how to communicate with \ you.\n"; - pub fn new(local_pid: Pid, remotes: Arc>>) -> Self { - ChannelState { + pub fn new( + local_pid: Pid, + protocol: P, + remotes: Arc>>, + ) -> Self { + Self { stream_id_pool: None, msg_id_pool: None, local_pid, @@ -89,6 +82,7 @@ impl ChannelState { streams: Vec::new(), send_queue: VecDeque::new(), recv_queue: VecDeque::new(), + protocol, send_handshake: false, send_pid: false, send_config: false, @@ -110,7 +104,20 @@ impl ChannelState { && !self.recv_shutdown } - pub fn handle(&mut self, frame: Frame, rtrn_tx: &Sender) { + pub fn tick_recv(&mut self, rtrn_tx: &Sender) { + for frame in self.protocol.read() { + self.handle(frame, rtrn_tx); + } + } + + pub fn tick_send(&mut self) { + self.tick_streams(); + while let Some(frame) = self.send_queue.pop_front() { + self.protocol.write(frame) + } + } + + fn handle(&mut self, frame: Frame, rtrn_tx: &Sender) { match frame { Frame::Handshake { magic_number, @@ -261,9 +268,9 @@ impl ChannelState { } if let Some(pos) = pos { for m in s.to_receive.drain(pos..pos + 1) { - info!("receied message: {}", m.mid); + info!("received message: {}", m.mid); //self.recv_queue.push_back(m); - rtrn_tx.send(RtrnMsg::Receive(m)); + rtrn_tx.send(RtrnMsg::Receive(m)).unwrap(); } } } @@ -279,7 +286,7 @@ impl ChannelState { // This function will tick all streams according to priority and add them to the // send queue - pub(crate) fn tick_streams(&mut self) { + fn tick_streams(&mut self) { //ignoring prio for now //TODO: fix prio if let Some(msg_id_pool) = &mut self.msg_id_pool { @@ -327,4 +334,50 @@ impl ChannelState { self.send_shutdown = true; } } + + pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32 { + // validate promises + if let Some(stream_id_pool) = &mut self.stream_id_pool { + let sid = stream_id_pool.next(); + let stream = Stream::new(sid, prio, promises.clone()); + self.streams.push(stream); + self.send_queue.push_back(Frame::OpenStream { + sid, + prio, + promises, + }); + return sid; + } + error!("fix me"); + return 0; + //TODO: fix me + } + + pub(crate) fn close_stream(&mut self, sid: u32) { + self.streams.retain(|stream| stream.sid() != sid); + self.send_queue.push_back(Frame::CloseStream { sid }); + } + + pub(crate) fn handshake(&mut self) { + self.send_queue.push_back(Frame::Handshake { + magic_number: VELOREN_MAGIC_NUMBER.to_string(), + version: VELOREN_NETWORK_VERSION, + }); + self.send_handshake = true; + } + + pub(crate) fn shutdown(&mut self) { + self.send_queue.push_back(Frame::Shutdown {}); + self.send_shutdown = true; + } + + pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { + //TODO: fix me + for s in self.streams.iter_mut() { + s.to_send.push_back(outgoing); + break; + } + } + + pub(crate) fn get_handle(&self) -> &P::Handle { self.protocol.get_handle() } } diff --git a/network/src/worker/mod.rs b/network/src/worker/mod.rs index 05835c82cc..6d9b158cc2 100644 --- a/network/src/worker/mod.rs +++ b/network/src/worker/mod.rs @@ -5,10 +5,17 @@ communication is done via channels. */ pub mod channel; +pub mod mpsc; pub mod tcp; pub mod types; +pub mod udp; pub mod worker; +pub(crate) use channel::Channel; +pub(crate) use mpsc::MpscChannel; +pub(crate) use tcp::TcpChannel; +pub(crate) use udp::UdpChannel; + use crate::{ internal::RemoteParticipant, worker::{ diff --git a/network/src/worker/mpsc.rs b/network/src/worker/mpsc.rs new file mode 100644 index 0000000000..76899f7370 --- /dev/null +++ b/network/src/worker/mpsc.rs @@ -0,0 +1,55 @@ +use crate::worker::{channel::ChannelProtocol, types::Frame}; +use mio_extras::channel::{Receiver, Sender}; +use tracing::*; + +pub(crate) struct MpscChannel { + endpoint_sender: Sender, + endpoint_receiver: Receiver, +} + +impl MpscChannel {} + +impl ChannelProtocol for MpscChannel { + type Handle = Receiver; + + /// Execute when ready to read + fn read(&mut self) -> Vec { + let mut result = Vec::new(); + loop { + match self.endpoint_receiver.try_recv() { + Ok(frame) => { + trace!("incomming message"); + result.push(frame); + }, + Err(std::sync::mpsc::TryRecvError::Empty) => { + debug!("would block"); + break; + }, + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + panic!("disconnected"); + }, + }; + } + result + } + + /// Execute when ready to write + fn write(&mut self, frame: Frame) { + match self.endpoint_sender.send(frame) { + Ok(n) => { + trace!("semded"); + }, + Err(mio_extras::channel::SendError::Io(e)) + if e.kind() == std::io::ErrorKind::WouldBlock => + { + debug!("would block"); + return; + } + Err(e) => { + panic!("{}", e); + }, + }; + } + + fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } +} diff --git a/network/src/worker/tcp.rs b/network/src/worker/tcp.rs index d0394b1054..0c25739d6c 100644 --- a/network/src/worker/tcp.rs +++ b/network/src/worker/tcp.rs @@ -1,69 +1,51 @@ -use crate::{ - api::Promise, - internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION}, - message::OutGoingMessage, - worker::{ - channel::{Channel, ChannelState}, - types::{Pid, RtrnMsg, Stream, TcpFrame}, - }, -}; +use crate::worker::{channel::ChannelProtocol, types::Frame}; use bincode; -use enumset::EnumSet; -use mio::{self, net::TcpStream}; -use mio_extras::channel::Sender; -use std::{ - collections::HashMap, - io::{Read, Write}, - sync::{Arc, RwLock}, - time::Instant, -}; +use mio::net::TcpStream; +use std::io::{Read, Write}; use tracing::*; #[derive(Debug)] pub(crate) struct TcpChannel { - state: ChannelState, - pub tcpstream: TcpStream, + endpoint: TcpStream, + //these buffers only ever contain 1 FRAME ! + read_buffer: Vec, + write_buffer: Vec, } impl TcpChannel { - pub fn new( - tcpstream: TcpStream, - local_pid: Pid, - remotes: Arc>>, - ) -> Self { - TcpChannel { - state: ChannelState::new(local_pid, remotes), - tcpstream, + pub fn new(endpoint: TcpStream) -> Self { + let mut b = vec![0; 200]; + Self { + endpoint, + read_buffer: b.clone(), + write_buffer: b, } } } -impl Channel for TcpChannel { - fn read( - &mut self, - uninitialized_dirty_speed_buffer: &mut [u8; 65000], - aprox_time: Instant, - rtrn_tx: &Sender, - ) { - let pid = self.state.remote_pid; - let span = span!(Level::INFO, "channel", ?pid); - let _enter = span.enter(); - match self.tcpstream.read(uninitialized_dirty_speed_buffer) { +impl ChannelProtocol for TcpChannel { + type Handle = TcpStream; + + /// Execute when ready to read + fn read(&mut self) -> Vec { + let mut result = Vec::new(); + match self.endpoint.read(self.read_buffer.as_mut_slice()) { Ok(n) => { trace!("incomming message with len: {}", n); - let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]); + let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); while cur.position() < n as u64 { - let r: Result = bincode::deserialize_from(&mut cur); + let r: Result = bincode::deserialize_from(&mut cur); match r { - Ok(frame) => self.state.handle(frame, rtrn_tx), + Ok(frame) => result.push(frame), Err(e) => { error!( ?self, ?e, "failure parsing a message with len: {}, starting with: {:?}", n, - &uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)] + &self.read_buffer[0..std::cmp::min(n, 10)] ); + break; }, } } @@ -75,85 +57,33 @@ impl Channel for TcpChannel { panic!("{}", e); }, }; + result } - fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) { - let pid = self.state.remote_pid; - let span = span!(Level::INFO, "channel", ?pid); - let _enter = span.enter(); - loop { - while let Some(elem) = self.state.send_queue.pop_front() { - if let Ok(mut data) = bincode::serialize(&elem) { - let total = data.len(); - match self.tcpstream.write(&data) { - Ok(n) if n == total => {}, - Ok(n) => { - error!("could only send part"); - //let data = data.drain(n..).collect(); //TODO: - // validate n.. is correct - // to_send.push_front(data); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - return; - }, - Err(e) => { - panic!("{}", e); - }, - }; - }; - } - // run streams - self.state.tick_streams(); - if self.state.send_queue.is_empty() { - break; - } - } + /// Execute when ready to write + fn write(&mut self, frame: Frame) { + if let Ok(mut data) = bincode::serialize(&frame) { + let total = data.len(); + match self.endpoint.write(&data) { + Ok(n) if n == total => { + trace!("send!"); + }, + Ok(n) => { + error!("could only send part"); + //let data = data.drain(n..).collect(); //TODO: + // validate n.. is correct + // to_send.push_front(data); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + return; + }, + Err(e) => { + panic!("{}", e); + }, + }; + }; } - fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32 { - // validate promises - if let Some(stream_id_pool) = &mut self.state.stream_id_pool { - let sid = stream_id_pool.next(); - let stream = Stream::new(sid, prio, promises.clone()); - self.state.streams.push(stream); - self.state.send_queue.push_back(TcpFrame::OpenStream { - sid, - prio, - promises, - }); - return sid; - } - error!("fix me"); - return 0; - //TODO: fix me - } - - fn close_stream(&mut self, sid: u32) { - self.state.streams.retain(|stream| stream.sid() != sid); - self.state - .send_queue - .push_back(TcpFrame::CloseStream { sid }); - } - - fn handshake(&mut self) { - self.state.send_queue.push_back(TcpFrame::Handshake { - magic_number: VELOREN_MAGIC_NUMBER.to_string(), - version: VELOREN_NETWORK_VERSION, - }); - self.state.send_handshake = true; - } - - fn shutdown(&mut self) { - self.state.send_queue.push_back(TcpFrame::Shutdown {}); - self.state.send_shutdown = true; - } - - fn send(&mut self, outgoing: OutGoingMessage) { - //TODO: fix me - for s in self.state.streams.iter_mut() { - s.to_send.push_back(outgoing); - break; - } - } + fn get_handle(&self) -> &Self::Handle { &self.endpoint } } diff --git a/network/src/worker/types.rs b/network/src/worker/types.rs index 590d4d473f..5bec57070c 100644 --- a/network/src/worker/types.rs +++ b/network/src/worker/types.rs @@ -1,10 +1,11 @@ use crate::{ api::Promise, message::{InCommingMessage, OutGoingMessage}, - worker::tcp::TcpChannel, + worker::{Channel, MpscChannel, TcpChannel, UdpChannel}, }; use enumset::EnumSet; use mio::{self, net::TcpListener, PollOpt, Ready}; +use mio_extras::channel::Sender; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use uuid::Uuid; @@ -50,10 +51,23 @@ pub struct Statistics { pub nano_busy: u128, } -#[derive(Debug)] pub(crate) enum TokenObjects { TcpListener(TcpListener), - TcpChannel(TcpChannel), + TcpChannel(Channel, Option>), + UdpChannel(Channel, Option>), + MpscChannel(Channel, Option>), +} + +impl std::fmt::Debug for TokenObjects { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TokenObjects::TcpListener(l) => write!(f, "{:?}", l), + TokenObjects::TcpChannel(c, _) => write!(f, "{:?}", c), + TokenObjects::UdpChannel(c, _) => write!(f, "{:?}", c), + TokenObjects::MpscChannel(c, _) => unimplemented!("MPSC"), + } + } } #[derive(Debug)] @@ -121,6 +135,3 @@ pub(crate) enum Frame { * against veloren Server! */ Raw(Vec), } - -pub(crate) type TcpFrame = Frame; -pub(crate) type UdpFrame = Frame; diff --git a/network/src/worker/udp.rs b/network/src/worker/udp.rs new file mode 100644 index 0000000000..bf58e2cdd5 --- /dev/null +++ b/network/src/worker/udp.rs @@ -0,0 +1,84 @@ +use crate::worker::{channel::ChannelProtocol, types::Frame}; +use bincode; +use mio::net::UdpSocket; +use tracing::*; + +#[derive(Debug)] +pub(crate) struct UdpChannel { + endpoint: UdpSocket, + read_buffer: Vec, + write_buffer: Vec, +} + +impl UdpChannel { + pub fn new(endpoint: UdpSocket) -> Self { + Self { + endpoint, + read_buffer: Vec::new(), + write_buffer: Vec::new(), + } + } +} + +impl ChannelProtocol for UdpChannel { + type Handle = UdpSocket; + + /// Execute when ready to read + fn read(&mut self) -> Vec { + let mut result = Vec::new(); + match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) { + Ok((n, remote)) => { + trace!("incomming message with len: {}", n); + let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); + while cur.position() < n as u64 { + let r: Result = bincode::deserialize_from(&mut cur); + match r { + Ok(frame) => result.push(frame), + Err(e) => { + error!( + ?self, + ?e, + "failure parsing a message with len: {}, starting with: {:?}", + n, + &self.read_buffer[0..std::cmp::min(n, 10)] + ); + break; + }, + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + }, + Err(e) => { + panic!("{}", e); + }, + }; + result + } + + /// Execute when ready to write + fn write(&mut self, frame: Frame) { + if let Ok(mut data) = bincode::serialize(&frame) { + let total = data.len(); + match self.endpoint.send(&data) { + Ok(n) if n == total => {}, + Ok(n) => { + error!("could only send part"); + //let data = data.drain(n..).collect(); //TODO: + // validate n.. is correct + // to_send.push_front(data); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + return; + }, + Err(e) => { + panic!("{}", e); + }, + }; + }; + } + + fn get_handle(&self) -> &Self::Handle { &self.endpoint } +} diff --git a/network/src/worker/worker.rs b/network/src/worker/worker.rs index cf14fa9242..e113f19a83 100644 --- a/network/src/worker/worker.rs +++ b/network/src/worker/worker.rs @@ -1,10 +1,8 @@ use crate::{ internal::RemoteParticipant, worker::{ - channel::Channel, - tcp::TcpChannel, types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects}, - Controller, + Channel, Controller, TcpChannel, }, }; use mio::{self, Poll, PollOpt, Ready, Token}; @@ -49,7 +47,6 @@ pub(crate) struct Worker { ctrl_rx: Receiver, rtrn_tx: Sender, mio_tokens: MioTokens, - buf: [u8; 65000], time_before_poll: Instant, time_after_poll: Instant, } @@ -73,7 +70,6 @@ impl Worker { ctrl_rx, rtrn_tx, mio_tokens, - buf: [0; 65000], time_before_poll: Instant::now(), time_after_poll: Instant::now(), } @@ -118,9 +114,9 @@ impl Worker { CtrlMsg::Shutdown => { debug!("Shutting Down"); for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel) = obj { + if let TokenObjects::TcpChannel(channel, _) = obj { channel.shutdown(); - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); } } return true; @@ -131,9 +127,17 @@ impl Worker { TokenObjects::TcpListener(h) => { self.poll.register(h, tok, interest, opts).unwrap() }, - TokenObjects::TcpChannel(channel) => self + TokenObjects::TcpChannel(channel, _) => self .poll - .register(&channel.tcpstream, tok, interest, opts) + .register(channel.get_handle(), tok, interest, opts) + .unwrap(), + TokenObjects::UdpChannel(channel, _) => self + .poll + .register(channel.get_handle(), tok, interest, opts) + .unwrap(), + TokenObjects::MpscChannel(channel, _) => self + .poll + .register(channel.get_handle(), tok, interest, opts) .unwrap(), } debug!(?handle, ?tok, "Registered new handle"); @@ -145,9 +149,9 @@ impl Worker { promises, } => { for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel) = obj { + if let TokenObjects::TcpChannel(channel, _) = obj { channel.open_stream(prio, promises); //TODO: check participant - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); } } //TODO: @@ -155,18 +159,18 @@ impl Worker { CtrlMsg::CloseStream { pid, sid } => { //TODO: for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel) = to { + if let TokenObjects::TcpChannel(channel, _) = to { channel.close_stream(sid); //TODO: check participant - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); } } }, CtrlMsg::Send(outgoing) => { //TODO: for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel) = to { + if let TokenObjects::TcpChannel(channel, _) = to { channel.send(outgoing); //TODO: check participant - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); break; } } @@ -196,26 +200,51 @@ impl Worker { ) .unwrap(); trace!(?remote_stream, ?tok, "registered"); - let mut channel = - TcpChannel::new(remote_stream, self.pid, self.remotes.clone()); + let tcp_channel = TcpChannel::new(remote_stream); + let mut channel = Channel::new(self.pid, tcp_channel, self.remotes.clone()); channel.handshake(); + channel.tick_send(); self.mio_tokens .tokens - .insert(tok, TokenObjects::TcpChannel(channel)); + .insert(tok, TokenObjects::TcpChannel(channel, None)); }, Err(err) => { error!(?err, "error during remote connected"); }, }, - TokenObjects::TcpChannel(channel) => { + TokenObjects::TcpChannel(channel, _) => { if event.readiness().is_readable() { - trace!(?channel.tcpstream, "stream readable"); - channel.read(&mut self.buf, self.time_after_poll, &self.rtrn_tx); + let handle = channel.get_handle(); + trace!(?handle, "stream readable"); + channel.tick_recv(&self.rtrn_tx); } if event.readiness().is_writable() { - trace!(?channel.tcpstream, "stream writeable"); - channel.write(&mut self.buf, self.time_after_poll); + let handle = channel.get_handle(); + trace!(?handle, "stream writeable"); + channel.tick_send(); + } + }, + TokenObjects::UdpChannel(channel, _) => { + if event.readiness().is_readable() { + let handle = channel.get_handle(); + trace!(?handle, "stream readable"); + channel.tick_recv(&self.rtrn_tx); + } + if event.readiness().is_writable() { + let handle = channel.get_handle(); + trace!(?handle, "stream writeable"); + channel.tick_send(); + } + }, + TokenObjects::MpscChannel(channel, _) => { + if event.readiness().is_readable() { + let handle = channel.get_handle(); + channel.tick_recv(&self.rtrn_tx); + } + if event.readiness().is_writable() { + let handle = channel.get_handle(); + channel.tick_send(); } }, };