diff --git a/Cargo.lock b/Cargo.lock index d6d4264afa..1dbc92c4fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,9 +74,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785" +checksum = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff" [[package]] name = "anymap" @@ -375,7 +375,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "parking_lot 0.10.0", + "parking_lot 0.10.1", "slab", ] @@ -423,9 +423,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.2.1" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12ae9db68ad7fac5fe51304d20f016c911539251075a214f8e663babefa35187" +checksum = "1f359dc14ff8911330a51ef78022d376f25ed00248912803b58f00cb1c27f742" [[package]] name = "byteorder" @@ -695,9 +695,9 @@ dependencies = [ [[package]] name = "const-tweaker" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7081900ff8f4b89046f8898eb8af6ed26be5a47299c56147d5a7dac74298b0" +checksum = "7fbe3e1d2fccd896d451adb486910a0bfc233fd6dcafdb4e13bac7de72f8f250" dependencies = [ "anyhow", "async-std", @@ -1066,9 +1066,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "3.7.0" +version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "010ef3f25ed5bb93505a3238d19957622190268640526aab07174c66ccf5d611" +checksum = "0f87a04c37da1d3d27db1fb7f372802b72fb8c3ff3e9c0914530995127f4a6a1" dependencies = [ "ahash 0.3.2", "cfg-if", @@ -3178,12 +3178,12 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92e98c49ab0b7ce5b222f2cc9193fc4efe11c6d0bd4f648e374684a6857b1cfc" +checksum = "6fdfcb5f20930a79e326f7ec992a9fdb5b7bd809254b1e735bdd5a99f78bee0d" dependencies = [ "lock_api", - "parking_lot_core 0.7.0", + "parking_lot_core 0.7.2", ] [[package]] @@ -3215,9 +3215,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7582838484df45743c8434fbff785e8edf260c28748353d44bc0da32e0ceabf1" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" dependencies = [ "cfg-if", "cloudabi", @@ -3420,9 +3420,9 @@ dependencies = [ [[package]] name = "proc-macro-nested" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" @@ -4144,18 +4144,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.105" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff" +checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.105" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac5d00fc561ba2724df6758a17de23df5914f20e41cb00f94d5b7ae42fffaff8" +checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" dependencies = [ "proc-macro2 1.0.9", "quote 1.0.3", @@ -5086,9 +5086,9 @@ dependencies = [ [[package]] name = "vek" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c98f7e1c1400d5b1704baee82cbc56a3fde406769555ead0f2306e43ebab967" +checksum = "761f71ebd4296be71d1c584aa41a1ab8f3e5e646357fefce387b54381c151926" dependencies = [ "approx 0.3.2", "num-integer", @@ -5120,7 +5120,7 @@ dependencies = [ "num_cpus", "specs", "uvth", - "vek 0.10.0", + "vek 0.10.2", "veloren-common", ] @@ -5154,7 +5154,7 @@ dependencies = [ "specs", "specs-idvs", "sum_type", - "vek 0.10.0", + "vek 0.10.2", ] [[package]] @@ -5164,6 +5164,7 @@ dependencies = [ "bincode", "byteorder 1.3.4", "enumset", + "futures 0.3.4", "mio", "mio-extras", "serde", @@ -5202,7 +5203,7 @@ dependencies = [ "specs", "specs-idvs", "uvth", - "vek 0.10.0", + "vek 0.10.2", "veloren-common", "veloren-world", ] @@ -5260,7 +5261,7 @@ dependencies = [ "specs-idvs", "treeculler", "uvth", - "vek 0.10.0", + "vek 0.10.2", "veloren-client", "veloren-common", "veloren-server", @@ -5295,7 +5296,7 @@ dependencies = [ "roots", "serde", "serde_derive", - "vek 0.10.0", + "vek 0.10.2", "veloren-common", ] diff --git a/network/Cargo.toml b/network/Cargo.toml index b72633bc6b..c08e1032cf 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -18,5 +18,6 @@ tracing = "0.1" tracing-subscriber = "0.2.0-alpha.4" byteorder = "1.3" mio-extras = "2.0" +futures = "0.3" uuid = { version = "0.8", features = ["serde", "v4"] } tlid = { path = "../../tlid", features = ["serde"]} \ No newline at end of file diff --git a/network/src/api.rs b/network/src/api.rs index 00fed03bfd..a3e534a57f 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -2,10 +2,8 @@ use crate::{ internal::RemoteParticipant, message::{self, OutGoingMessage}, worker::{ - channel::Channel, - tcp::TcpChannel, types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, - Controller, + Channel, Controller, TcpChannel, }, }; use enumset::*; @@ -42,6 +40,7 @@ pub enum Promise { pub struct Participant { addr: Address, + remote_pid: Pid, } pub struct Connection {} @@ -114,12 +113,15 @@ impl Network { // should almost ever be empty except for new channel creations and stream // creations! for worker in self.controller.iter() { - worker.get_tx().send(CtrlMsg::Send(OutGoingMessage { - buffer: messagebuffer.clone(), - cursor: 0, - mid: None, - sid: stream.sid, - })); + worker + .get_tx() + .send(CtrlMsg::Send(OutGoingMessage { + buffer: messagebuffer.clone(), + cursor: 0, + mid: None, + sid: stream.sid, + })) + .unwrap(); } } @@ -146,72 +148,154 @@ impl Network { None } - pub fn listen(&self, addr: &Address) { - let worker = Self::get_lowest_worker(&self.controller); - let pipe = worker.get_tx(); - let address = addr.clone(); - self.thread_pool.execute(move || { - let span = span!(Level::INFO, "listen", ?address); - let _enter = span.enter(); - match address { - Address::Tcp(a) => { - info!("listening"); - let tcp_listener = TcpListener::bind(&a).unwrap(); - pipe.send(CtrlMsg::Register( - TokenObjects::TcpListener(tcp_listener), - Ready::readable(), - PollOpt::edge(), - )) - .unwrap(); - }, - Address::Udp(_) => unimplemented!("lazy me"), - } - }); - } - - pub fn connect(&self, addr: &Address) -> Participant { - let worker = Self::get_lowest_worker(&self.controller); - let pipe = worker.get_tx(); - let address = addr.clone(); - let pid = self.participant_id; - let remotes = self.remotes.clone(); - self.thread_pool.execute(move || { - let mut span = span!(Level::INFO, "connect", ?address); - let _enter = span.enter(); - match address { - Address::Tcp(a) => { - info!("connecting"); - let tcp_stream = match TcpStream::connect(&a) { - Err(err) => { - error!("could not open connection: {}", err); - return; - }, - Ok(s) => s, - }; - let mut channel = TcpChannel::new(tcp_stream, pid, remotes); - pipe.send(CtrlMsg::Register( - TokenObjects::TcpChannel(channel), - Ready::readable() | Ready::writable(), - PollOpt::edge(), - )) - .unwrap(); - }, - Address::Udp(_) => unimplemented!("lazy me"), - } - }); - Participant { addr: addr.clone() } - } - - pub fn open(&self, part: Participant, prio: u8, promises: EnumSet) -> Stream { + pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet) -> Stream { for worker in self.controller.iter() { - worker.get_tx().send(CtrlMsg::OpenStream { - pid: uuid::Uuid::new_v4(), - prio, - promises, - }); + worker + .get_tx() + .send(CtrlMsg::OpenStream { + pid: uuid::Uuid::new_v4(), + prio, + promises, + }) + .unwrap(); } Stream { sid: 0 } } pub fn close(&self, stream: Stream) {} + + pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> { + let span = span!(Level::TRACE, "listen", ?address); + let worker = Self::get_lowest_worker(&self.controller); + let _enter = span.enter(); + match address { + Address::Tcp(a) => { + let tcp_listener = TcpListener::bind(&a)?; + info!("listening"); + worker.get_tx().send(CtrlMsg::Register( + TokenObjects::TcpListener(tcp_listener), + Ready::readable(), + PollOpt::edge(), + ))?; + }, + Address::Udp(_) => unimplemented!("lazy me"), + }; + Ok(()) + } + + pub async fn connect(&self, address: &Address) -> Result { + let worker = Self::get_lowest_worker(&self.controller); + let pid = self.participant_id; + let remotes = self.remotes.clone(); + let mut span = span!(Level::INFO, "connect", ?address); + let _enter = span.enter(); + match address { + Address::Tcp(a) => { + info!("connecting"); + let tcp_stream = TcpStream::connect(&a)?; + let tcp_channel = TcpChannel::new(tcp_stream); + let mut channel = Channel::new(pid, tcp_channel, remotes); + let (ctrl_tx, ctrl_rx) = mio_extras::channel::channel::(); + worker.get_tx().send(CtrlMsg::Register( + TokenObjects::TcpChannel(channel, Some(ctrl_tx)), + Ready::readable() | Ready::writable(), + PollOpt::edge(), + ))?; + // wait for a return + }, + Address::Udp(_) => unimplemented!("lazy me"), + } + + Ok(Participant { + addr: address.clone(), + remote_pid: uuid::Uuid::new_v4(), + }) + } + + //TODO: evaluate if move to Participant + pub async fn _disconnect(&self, participant: Participant) -> Result<(), NetworkError> { + panic!("sda"); + } + + pub fn participants(&self) -> Vec { + panic!("sda"); + } + + pub async fn _connected(&self) -> Result { + // returns if a Participant connected and is ready + panic!("sda"); + } + + pub async fn _disconnected(&self) -> Result { + // returns if a Participant connected and is ready + panic!("sda"); + } + + pub async fn multisend( + &self, + streams: Vec, + msg: M, + ) -> Result<(), NetworkError> { + panic!("sda"); + } +} + +impl Participant { + pub async fn _open( + &self, + prio: u8, + promises: EnumSet, + ) -> Result { + panic!("sda"); + } + + pub async fn _close(&self, stream: Stream) -> Result<(), ParticipantError> { + panic!("sda"); + } + + pub async fn _opened(&self) -> Result { + panic!("sda"); + } + + pub async fn _closed(&self) -> Result { + panic!("sda"); + } +} + +impl Stream { + //TODO: What about SEND instead of Serializeable if it goes via PIPE ? + //TODO: timeout per message or per stream ? stream or ? + + pub async fn _send(&self, msg: M) -> Result<(), StreamError> { + panic!("sda"); + } + + pub async fn _recv(&self) -> Result { + panic!("sda"); + } +} + +#[derive(Debug)] +pub enum NetworkError { + NetworkDestroyed, + WorkerDestroyed, + IoError(std::io::Error), +} + +#[derive(Debug)] +pub enum ParticipantError { + ParticipantDisconected, +} + +#[derive(Debug)] +pub enum StreamError { + StreamClosed, +} + +impl From for NetworkError { + fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) } +} + +impl From> for NetworkError { + fn from(err: mio_extras::channel::SendError) -> Self { NetworkError::WorkerDestroyed } } diff --git a/network/src/lib.rs b/network/src/lib.rs index 80dda8e210..27aff0dea2 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -7,6 +7,7 @@ mod worker; #[cfg(test)] pub mod tests { use crate::api::*; + use futures::executor::block_on; use std::{net::SocketAddr, sync::Arc}; use tracing::*; use uuid::Uuid; @@ -27,8 +28,6 @@ pub mod tests { } pub fn test_tracing() { - use tracing::Level; - tracing_subscriber::FmtSubscriber::builder() // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) // will be written to stdout. @@ -43,8 +42,51 @@ pub mod tests { assert_eq!(2 + 2, 4); } + /* + #[test] + #[ignore] + fn client_server() { + let thread_pool = Arc::new( + ThreadPoolBuilder::new() + .name("veloren-network-test".into()) + .build(), + ); + test_tracing(); + let n1 = Network::::new(Uuid::new_v4(), thread_pool.clone()); + let n2 = Network::::new(Uuid::new_v4(), thread_pool.clone()); + let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); + let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001))); + n1.listen(&a1); //await + n2.listen(&a2); // only requiered here, but doesnt hurt on n1 + std::thread::sleep(std::time::Duration::from_millis(20)); + + let p1 = n1.connect(&a2); //await + //n2.OnRemoteConnectionOpen triggered + std::thread::sleep(std::time::Duration::from_millis(20)); + + let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); + std::thread::sleep(std::time::Duration::from_millis(20)); + //n2.OnRemoteStreamOpen triggered + + n1.send("Hello World", &s1); + std::thread::sleep(std::time::Duration::from_millis(20)); + // receive on n2 now + + let s: Option = n2.recv(&s1); + for _ in 1..4 { + error!("{:?}", s); + } + assert_eq!(s, Some("Hello World".to_string())); + + n1.close(s1); + //n2.OnRemoteStreamClose triggered + + std::thread::sleep(std::time::Duration::from_millis(20000)); + } + */ + #[test] - fn client_server() { + fn client_server_stream() { let thread_pool = Arc::new( ThreadPoolBuilder::new() .name("veloren-network-test".into()) @@ -53,33 +95,28 @@ pub mod tests { test_tracing(); let n1 = Network::::new(Uuid::new_v4(), thread_pool.clone()); let n2 = Network::::new(Uuid::new_v4(), thread_pool.clone()); - let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000))); - let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001))); - n1.listen(&a1); //await - n2.listen(&a2); // only requiered here, but doesnt hurt on n1 + let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010))); + let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011))); + block_on(n1.listen(&a1)).unwrap(); //await + block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1 std::thread::sleep(std::time::Duration::from_millis(20)); - let p1 = n1.connect(&a2); //await - //n2.OnRemoteConnectionOpen triggered + let p1 = block_on(n1.connect(&a2)); //await + let p1 = p1.unwrap(); std::thread::sleep(std::time::Duration::from_millis(20)); - let s1 = n1.open(p1, 16, Promise::InOrder | Promise::NoCorrupt); + let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); + //let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt); std::thread::sleep(std::time::Duration::from_millis(20)); - //n2.OnRemoteStreamOpen triggered n1.send("Hello World", &s1); std::thread::sleep(std::time::Duration::from_millis(20)); - // receive on n2 now + + std::thread::sleep(std::time::Duration::from_millis(1000)); let s: Option = n2.recv(&s1); - for _ in 1..4 { - error!("{:?}", s); - } assert_eq!(s, Some("Hello World".to_string())); n1.close(s1); - //n2.OnRemoteStreamClose triggered - - std::thread::sleep(std::time::Duration::from_millis(20000)); } } diff --git a/network/src/worker/channel.rs b/network/src/worker/channel.rs index 5c687acda5..b943c5cb9f 100644 --- a/network/src/worker/channel.rs +++ b/network/src/worker/channel.rs @@ -9,33 +9,21 @@ use mio_extras::channel::Sender; use std::{ collections::{HashMap, VecDeque}, sync::{Arc, RwLock}, - time::Instant, }; use tracing::*; -pub(crate) trait Channel { - /* - uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing - aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls - */ +pub(crate) trait ChannelProtocol { + type Handle: ?Sized + mio::Evented; /// Execute when ready to read - fn read( - &mut self, - uninitialized_dirty_speed_buffer: &mut [u8; 65000], - aprox_time: Instant, - rtrn_tx: &Sender, - ); + fn read(&mut self) -> Vec; /// Execute when ready to write - fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant); - fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32; - fn close_stream(&mut self, sid: u32); - fn handshake(&mut self); - fn shutdown(&mut self); - fn send(&mut self, outgoing: OutGoingMessage); + fn write(&mut self, frame: Frame); + /// used for mio + fn get_handle(&self) -> &Self::Handle; } #[derive(Debug)] -pub(crate) struct ChannelState { +pub(crate) struct Channel { pub stream_id_pool: Option>>, /* TODO: stream_id unique per * participant */ pub msg_id_pool: Option>>, //TODO: msg_id unique per @@ -46,6 +34,7 @@ pub(crate) struct ChannelState { pub streams: Vec, pub send_queue: VecDeque, pub recv_queue: VecDeque, + pub protocol: P, pub send_handshake: bool, pub send_pid: bool, pub send_config: bool, @@ -70,7 +59,7 @@ pub(crate) struct ChannelState { Shutdown phase */ -impl ChannelState { +impl Channel

{ const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \ veloren server.\nWe are not sure if you are a valid \ veloren client.\nClosing the connection" @@ -79,8 +68,12 @@ impl ChannelState { invalid version.\nWe don't know how to communicate with \ you.\n"; - pub fn new(local_pid: Pid, remotes: Arc>>) -> Self { - ChannelState { + pub fn new( + local_pid: Pid, + protocol: P, + remotes: Arc>>, + ) -> Self { + Self { stream_id_pool: None, msg_id_pool: None, local_pid, @@ -89,6 +82,7 @@ impl ChannelState { streams: Vec::new(), send_queue: VecDeque::new(), recv_queue: VecDeque::new(), + protocol, send_handshake: false, send_pid: false, send_config: false, @@ -110,7 +104,20 @@ impl ChannelState { && !self.recv_shutdown } - pub fn handle(&mut self, frame: Frame, rtrn_tx: &Sender) { + pub fn tick_recv(&mut self, rtrn_tx: &Sender) { + for frame in self.protocol.read() { + self.handle(frame, rtrn_tx); + } + } + + pub fn tick_send(&mut self) { + self.tick_streams(); + while let Some(frame) = self.send_queue.pop_front() { + self.protocol.write(frame) + } + } + + fn handle(&mut self, frame: Frame, rtrn_tx: &Sender) { match frame { Frame::Handshake { magic_number, @@ -261,9 +268,9 @@ impl ChannelState { } if let Some(pos) = pos { for m in s.to_receive.drain(pos..pos + 1) { - info!("receied message: {}", m.mid); + info!("received message: {}", m.mid); //self.recv_queue.push_back(m); - rtrn_tx.send(RtrnMsg::Receive(m)); + rtrn_tx.send(RtrnMsg::Receive(m)).unwrap(); } } } @@ -279,7 +286,7 @@ impl ChannelState { // This function will tick all streams according to priority and add them to the // send queue - pub(crate) fn tick_streams(&mut self) { + fn tick_streams(&mut self) { //ignoring prio for now //TODO: fix prio if let Some(msg_id_pool) = &mut self.msg_id_pool { @@ -327,4 +334,50 @@ impl ChannelState { self.send_shutdown = true; } } + + pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32 { + // validate promises + if let Some(stream_id_pool) = &mut self.stream_id_pool { + let sid = stream_id_pool.next(); + let stream = Stream::new(sid, prio, promises.clone()); + self.streams.push(stream); + self.send_queue.push_back(Frame::OpenStream { + sid, + prio, + promises, + }); + return sid; + } + error!("fix me"); + return 0; + //TODO: fix me + } + + pub(crate) fn close_stream(&mut self, sid: u32) { + self.streams.retain(|stream| stream.sid() != sid); + self.send_queue.push_back(Frame::CloseStream { sid }); + } + + pub(crate) fn handshake(&mut self) { + self.send_queue.push_back(Frame::Handshake { + magic_number: VELOREN_MAGIC_NUMBER.to_string(), + version: VELOREN_NETWORK_VERSION, + }); + self.send_handshake = true; + } + + pub(crate) fn shutdown(&mut self) { + self.send_queue.push_back(Frame::Shutdown {}); + self.send_shutdown = true; + } + + pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { + //TODO: fix me + for s in self.streams.iter_mut() { + s.to_send.push_back(outgoing); + break; + } + } + + pub(crate) fn get_handle(&self) -> &P::Handle { self.protocol.get_handle() } } diff --git a/network/src/worker/mod.rs b/network/src/worker/mod.rs index 05835c82cc..6d9b158cc2 100644 --- a/network/src/worker/mod.rs +++ b/network/src/worker/mod.rs @@ -5,10 +5,17 @@ communication is done via channels. */ pub mod channel; +pub mod mpsc; pub mod tcp; pub mod types; +pub mod udp; pub mod worker; +pub(crate) use channel::Channel; +pub(crate) use mpsc::MpscChannel; +pub(crate) use tcp::TcpChannel; +pub(crate) use udp::UdpChannel; + use crate::{ internal::RemoteParticipant, worker::{ diff --git a/network/src/worker/mpsc.rs b/network/src/worker/mpsc.rs new file mode 100644 index 0000000000..76899f7370 --- /dev/null +++ b/network/src/worker/mpsc.rs @@ -0,0 +1,55 @@ +use crate::worker::{channel::ChannelProtocol, types::Frame}; +use mio_extras::channel::{Receiver, Sender}; +use tracing::*; + +pub(crate) struct MpscChannel { + endpoint_sender: Sender, + endpoint_receiver: Receiver, +} + +impl MpscChannel {} + +impl ChannelProtocol for MpscChannel { + type Handle = Receiver; + + /// Execute when ready to read + fn read(&mut self) -> Vec { + let mut result = Vec::new(); + loop { + match self.endpoint_receiver.try_recv() { + Ok(frame) => { + trace!("incomming message"); + result.push(frame); + }, + Err(std::sync::mpsc::TryRecvError::Empty) => { + debug!("would block"); + break; + }, + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + panic!("disconnected"); + }, + }; + } + result + } + + /// Execute when ready to write + fn write(&mut self, frame: Frame) { + match self.endpoint_sender.send(frame) { + Ok(n) => { + trace!("semded"); + }, + Err(mio_extras::channel::SendError::Io(e)) + if e.kind() == std::io::ErrorKind::WouldBlock => + { + debug!("would block"); + return; + } + Err(e) => { + panic!("{}", e); + }, + }; + } + + fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } +} diff --git a/network/src/worker/tcp.rs b/network/src/worker/tcp.rs index d0394b1054..0c25739d6c 100644 --- a/network/src/worker/tcp.rs +++ b/network/src/worker/tcp.rs @@ -1,69 +1,51 @@ -use crate::{ - api::Promise, - internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION}, - message::OutGoingMessage, - worker::{ - channel::{Channel, ChannelState}, - types::{Pid, RtrnMsg, Stream, TcpFrame}, - }, -}; +use crate::worker::{channel::ChannelProtocol, types::Frame}; use bincode; -use enumset::EnumSet; -use mio::{self, net::TcpStream}; -use mio_extras::channel::Sender; -use std::{ - collections::HashMap, - io::{Read, Write}, - sync::{Arc, RwLock}, - time::Instant, -}; +use mio::net::TcpStream; +use std::io::{Read, Write}; use tracing::*; #[derive(Debug)] pub(crate) struct TcpChannel { - state: ChannelState, - pub tcpstream: TcpStream, + endpoint: TcpStream, + //these buffers only ever contain 1 FRAME ! + read_buffer: Vec, + write_buffer: Vec, } impl TcpChannel { - pub fn new( - tcpstream: TcpStream, - local_pid: Pid, - remotes: Arc>>, - ) -> Self { - TcpChannel { - state: ChannelState::new(local_pid, remotes), - tcpstream, + pub fn new(endpoint: TcpStream) -> Self { + let mut b = vec![0; 200]; + Self { + endpoint, + read_buffer: b.clone(), + write_buffer: b, } } } -impl Channel for TcpChannel { - fn read( - &mut self, - uninitialized_dirty_speed_buffer: &mut [u8; 65000], - aprox_time: Instant, - rtrn_tx: &Sender, - ) { - let pid = self.state.remote_pid; - let span = span!(Level::INFO, "channel", ?pid); - let _enter = span.enter(); - match self.tcpstream.read(uninitialized_dirty_speed_buffer) { +impl ChannelProtocol for TcpChannel { + type Handle = TcpStream; + + /// Execute when ready to read + fn read(&mut self) -> Vec { + let mut result = Vec::new(); + match self.endpoint.read(self.read_buffer.as_mut_slice()) { Ok(n) => { trace!("incomming message with len: {}", n); - let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]); + let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); while cur.position() < n as u64 { - let r: Result = bincode::deserialize_from(&mut cur); + let r: Result = bincode::deserialize_from(&mut cur); match r { - Ok(frame) => self.state.handle(frame, rtrn_tx), + Ok(frame) => result.push(frame), Err(e) => { error!( ?self, ?e, "failure parsing a message with len: {}, starting with: {:?}", n, - &uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)] + &self.read_buffer[0..std::cmp::min(n, 10)] ); + break; }, } } @@ -75,85 +57,33 @@ impl Channel for TcpChannel { panic!("{}", e); }, }; + result } - fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) { - let pid = self.state.remote_pid; - let span = span!(Level::INFO, "channel", ?pid); - let _enter = span.enter(); - loop { - while let Some(elem) = self.state.send_queue.pop_front() { - if let Ok(mut data) = bincode::serialize(&elem) { - let total = data.len(); - match self.tcpstream.write(&data) { - Ok(n) if n == total => {}, - Ok(n) => { - error!("could only send part"); - //let data = data.drain(n..).collect(); //TODO: - // validate n.. is correct - // to_send.push_front(data); - }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - debug!("would block"); - return; - }, - Err(e) => { - panic!("{}", e); - }, - }; - }; - } - // run streams - self.state.tick_streams(); - if self.state.send_queue.is_empty() { - break; - } - } + /// Execute when ready to write + fn write(&mut self, frame: Frame) { + if let Ok(mut data) = bincode::serialize(&frame) { + let total = data.len(); + match self.endpoint.write(&data) { + Ok(n) if n == total => { + trace!("send!"); + }, + Ok(n) => { + error!("could only send part"); + //let data = data.drain(n..).collect(); //TODO: + // validate n.. is correct + // to_send.push_front(data); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + return; + }, + Err(e) => { + panic!("{}", e); + }, + }; + }; } - fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32 { - // validate promises - if let Some(stream_id_pool) = &mut self.state.stream_id_pool { - let sid = stream_id_pool.next(); - let stream = Stream::new(sid, prio, promises.clone()); - self.state.streams.push(stream); - self.state.send_queue.push_back(TcpFrame::OpenStream { - sid, - prio, - promises, - }); - return sid; - } - error!("fix me"); - return 0; - //TODO: fix me - } - - fn close_stream(&mut self, sid: u32) { - self.state.streams.retain(|stream| stream.sid() != sid); - self.state - .send_queue - .push_back(TcpFrame::CloseStream { sid }); - } - - fn handshake(&mut self) { - self.state.send_queue.push_back(TcpFrame::Handshake { - magic_number: VELOREN_MAGIC_NUMBER.to_string(), - version: VELOREN_NETWORK_VERSION, - }); - self.state.send_handshake = true; - } - - fn shutdown(&mut self) { - self.state.send_queue.push_back(TcpFrame::Shutdown {}); - self.state.send_shutdown = true; - } - - fn send(&mut self, outgoing: OutGoingMessage) { - //TODO: fix me - for s in self.state.streams.iter_mut() { - s.to_send.push_back(outgoing); - break; - } - } + fn get_handle(&self) -> &Self::Handle { &self.endpoint } } diff --git a/network/src/worker/types.rs b/network/src/worker/types.rs index 590d4d473f..5bec57070c 100644 --- a/network/src/worker/types.rs +++ b/network/src/worker/types.rs @@ -1,10 +1,11 @@ use crate::{ api::Promise, message::{InCommingMessage, OutGoingMessage}, - worker::tcp::TcpChannel, + worker::{Channel, MpscChannel, TcpChannel, UdpChannel}, }; use enumset::EnumSet; use mio::{self, net::TcpListener, PollOpt, Ready}; +use mio_extras::channel::Sender; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use uuid::Uuid; @@ -50,10 +51,23 @@ pub struct Statistics { pub nano_busy: u128, } -#[derive(Debug)] pub(crate) enum TokenObjects { TcpListener(TcpListener), - TcpChannel(TcpChannel), + TcpChannel(Channel, Option>), + UdpChannel(Channel, Option>), + MpscChannel(Channel, Option>), +} + +impl std::fmt::Debug for TokenObjects { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TokenObjects::TcpListener(l) => write!(f, "{:?}", l), + TokenObjects::TcpChannel(c, _) => write!(f, "{:?}", c), + TokenObjects::UdpChannel(c, _) => write!(f, "{:?}", c), + TokenObjects::MpscChannel(c, _) => unimplemented!("MPSC"), + } + } } #[derive(Debug)] @@ -121,6 +135,3 @@ pub(crate) enum Frame { * against veloren Server! */ Raw(Vec), } - -pub(crate) type TcpFrame = Frame; -pub(crate) type UdpFrame = Frame; diff --git a/network/src/worker/udp.rs b/network/src/worker/udp.rs new file mode 100644 index 0000000000..bf58e2cdd5 --- /dev/null +++ b/network/src/worker/udp.rs @@ -0,0 +1,84 @@ +use crate::worker::{channel::ChannelProtocol, types::Frame}; +use bincode; +use mio::net::UdpSocket; +use tracing::*; + +#[derive(Debug)] +pub(crate) struct UdpChannel { + endpoint: UdpSocket, + read_buffer: Vec, + write_buffer: Vec, +} + +impl UdpChannel { + pub fn new(endpoint: UdpSocket) -> Self { + Self { + endpoint, + read_buffer: Vec::new(), + write_buffer: Vec::new(), + } + } +} + +impl ChannelProtocol for UdpChannel { + type Handle = UdpSocket; + + /// Execute when ready to read + fn read(&mut self) -> Vec { + let mut result = Vec::new(); + match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) { + Ok((n, remote)) => { + trace!("incomming message with len: {}", n); + let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); + while cur.position() < n as u64 { + let r: Result = bincode::deserialize_from(&mut cur); + match r { + Ok(frame) => result.push(frame), + Err(e) => { + error!( + ?self, + ?e, + "failure parsing a message with len: {}, starting with: {:?}", + n, + &self.read_buffer[0..std::cmp::min(n, 10)] + ); + break; + }, + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + }, + Err(e) => { + panic!("{}", e); + }, + }; + result + } + + /// Execute when ready to write + fn write(&mut self, frame: Frame) { + if let Ok(mut data) = bincode::serialize(&frame) { + let total = data.len(); + match self.endpoint.send(&data) { + Ok(n) if n == total => {}, + Ok(n) => { + error!("could only send part"); + //let data = data.drain(n..).collect(); //TODO: + // validate n.. is correct + // to_send.push_front(data); + }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + debug!("would block"); + return; + }, + Err(e) => { + panic!("{}", e); + }, + }; + }; + } + + fn get_handle(&self) -> &Self::Handle { &self.endpoint } +} diff --git a/network/src/worker/worker.rs b/network/src/worker/worker.rs index cf14fa9242..e113f19a83 100644 --- a/network/src/worker/worker.rs +++ b/network/src/worker/worker.rs @@ -1,10 +1,8 @@ use crate::{ internal::RemoteParticipant, worker::{ - channel::Channel, - tcp::TcpChannel, types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects}, - Controller, + Channel, Controller, TcpChannel, }, }; use mio::{self, Poll, PollOpt, Ready, Token}; @@ -49,7 +47,6 @@ pub(crate) struct Worker { ctrl_rx: Receiver, rtrn_tx: Sender, mio_tokens: MioTokens, - buf: [u8; 65000], time_before_poll: Instant, time_after_poll: Instant, } @@ -73,7 +70,6 @@ impl Worker { ctrl_rx, rtrn_tx, mio_tokens, - buf: [0; 65000], time_before_poll: Instant::now(), time_after_poll: Instant::now(), } @@ -118,9 +114,9 @@ impl Worker { CtrlMsg::Shutdown => { debug!("Shutting Down"); for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel) = obj { + if let TokenObjects::TcpChannel(channel, _) = obj { channel.shutdown(); - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); } } return true; @@ -131,9 +127,17 @@ impl Worker { TokenObjects::TcpListener(h) => { self.poll.register(h, tok, interest, opts).unwrap() }, - TokenObjects::TcpChannel(channel) => self + TokenObjects::TcpChannel(channel, _) => self .poll - .register(&channel.tcpstream, tok, interest, opts) + .register(channel.get_handle(), tok, interest, opts) + .unwrap(), + TokenObjects::UdpChannel(channel, _) => self + .poll + .register(channel.get_handle(), tok, interest, opts) + .unwrap(), + TokenObjects::MpscChannel(channel, _) => self + .poll + .register(channel.get_handle(), tok, interest, opts) .unwrap(), } debug!(?handle, ?tok, "Registered new handle"); @@ -145,9 +149,9 @@ impl Worker { promises, } => { for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel) = obj { + if let TokenObjects::TcpChannel(channel, _) = obj { channel.open_stream(prio, promises); //TODO: check participant - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); } } //TODO: @@ -155,18 +159,18 @@ impl Worker { CtrlMsg::CloseStream { pid, sid } => { //TODO: for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel) = to { + if let TokenObjects::TcpChannel(channel, _) = to { channel.close_stream(sid); //TODO: check participant - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); } } }, CtrlMsg::Send(outgoing) => { //TODO: for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel) = to { + if let TokenObjects::TcpChannel(channel, _) = to { channel.send(outgoing); //TODO: check participant - channel.write(&mut self.buf, self.time_after_poll); + channel.tick_send(); break; } } @@ -196,26 +200,51 @@ impl Worker { ) .unwrap(); trace!(?remote_stream, ?tok, "registered"); - let mut channel = - TcpChannel::new(remote_stream, self.pid, self.remotes.clone()); + let tcp_channel = TcpChannel::new(remote_stream); + let mut channel = Channel::new(self.pid, tcp_channel, self.remotes.clone()); channel.handshake(); + channel.tick_send(); self.mio_tokens .tokens - .insert(tok, TokenObjects::TcpChannel(channel)); + .insert(tok, TokenObjects::TcpChannel(channel, None)); }, Err(err) => { error!(?err, "error during remote connected"); }, }, - TokenObjects::TcpChannel(channel) => { + TokenObjects::TcpChannel(channel, _) => { if event.readiness().is_readable() { - trace!(?channel.tcpstream, "stream readable"); - channel.read(&mut self.buf, self.time_after_poll, &self.rtrn_tx); + let handle = channel.get_handle(); + trace!(?handle, "stream readable"); + channel.tick_recv(&self.rtrn_tx); } if event.readiness().is_writable() { - trace!(?channel.tcpstream, "stream writeable"); - channel.write(&mut self.buf, self.time_after_poll); + let handle = channel.get_handle(); + trace!(?handle, "stream writeable"); + channel.tick_send(); + } + }, + TokenObjects::UdpChannel(channel, _) => { + if event.readiness().is_readable() { + let handle = channel.get_handle(); + trace!(?handle, "stream readable"); + channel.tick_recv(&self.rtrn_tx); + } + if event.readiness().is_writable() { + let handle = channel.get_handle(); + trace!(?handle, "stream writeable"); + channel.tick_send(); + } + }, + TokenObjects::MpscChannel(channel, _) => { + if event.readiness().is_readable() { + let handle = channel.get_handle(); + channel.tick_recv(&self.rtrn_tx); + } + if event.readiness().is_writable() { + let handle = channel.get_handle(); + channel.tick_send(); } }, };