diff --git a/network/src/api.rs b/network/src/api.rs index 0c363396cd..15ccdb2e87 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -2,6 +2,7 @@ use crate::{ internal::RemoteParticipant, message::{self, OutGoingMessage}, worker::{ + channel::ChannelProtocols, metrics::NetworkMetrics, types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects}, Channel, Controller, TcpChannel, @@ -154,17 +155,25 @@ impl Network { } pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet) -> Stream { - for worker in self.controller.iter() { - worker + let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::(); + for controller in self.controller.iter() { + controller .get_tx() .send(CtrlMsg::OpenStream { - pid: uuid::Uuid::new_v4(), + pid: part.remote_pid, prio, promises, + return_sid: ctrl_tx, }) .unwrap(); + break; } - Stream { sid: 0 } + // I dont like the fact that i need to wait on the worker thread for getting my + // sid back :/ we could avoid this by introducing a Thread Local Network + // which owns some sids we can take without waiting + let sid = ctrl_rx.recv().unwrap(); + info!(?sid, " sucessfully opened stream"); + Stream { sid } } pub fn close(&self, stream: Stream) {} @@ -199,22 +208,28 @@ impl Network { info!("connecting"); let tcp_stream = TcpStream::connect(&a)?; let tcp_channel = TcpChannel::new(tcp_stream); - let mut channel = Channel::new(pid, tcp_channel, remotes); - let (ctrl_tx, ctrl_rx) = mio_extras::channel::channel::(); + let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::(); + let mut channel = Channel::new( + pid, + ChannelProtocols::Tcp(tcp_channel), + remotes, + Some(ctrl_tx), + ); worker.get_tx().send(CtrlMsg::Register( - TokenObjects::TcpChannel(channel, Some(ctrl_tx)), + TokenObjects::Channel(channel), Ready::readable() | Ready::writable(), PollOpt::edge(), ))?; - // wait for a return + let remote_pid = ctrl_rx.recv().unwrap(); + info!(?remote_pid, " sucessfully connected to"); + return Ok(Participant { + addr: address.clone(), + remote_pid, + }); }, Address::Udp(_) => unimplemented!("lazy me"), } - - Ok(Participant { - addr: address.clone(), - remote_pid: uuid::Uuid::new_v4(), - }) + Err(NetworkError::Todo_Error_For_Wrong_Connection) } //TODO: evaluate if move to Participant @@ -284,6 +299,7 @@ impl Stream { pub enum NetworkError { NetworkDestroyed, WorkerDestroyed, + Todo_Error_For_Wrong_Connection, IoError(std::io::Error), } diff --git a/network/src/worker/channel.rs b/network/src/worker/channel.rs index b943c5cb9f..2165115bab 100644 --- a/network/src/worker/channel.rs +++ b/network/src/worker/channel.rs @@ -2,7 +2,12 @@ use crate::{ api::Promise, internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION}, message::{InCommingMessage, MessageBuffer, OutGoingMessage}, - worker::types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream}, + worker::{ + mpsc::MpscChannel, + tcp::TcpChannel, + types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream}, + udp::UdpChannel, + }, }; use enumset::EnumSet; use mio_extras::channel::Sender; @@ -23,7 +28,14 @@ pub(crate) trait ChannelProtocol { } #[derive(Debug)] -pub(crate) struct Channel { +pub(crate) enum ChannelProtocols { + Tcp(TcpChannel), + Udp(UdpChannel), + Mpsc(MpscChannel), +} + +#[derive(Debug)] +pub(crate) struct Channel { pub stream_id_pool: Option>>, /* TODO: stream_id unique per * participant */ pub msg_id_pool: Option>>, //TODO: msg_id unique per @@ -34,7 +46,8 @@ pub(crate) struct Channel { pub streams: Vec, pub send_queue: VecDeque, pub recv_queue: VecDeque, - pub protocol: P, + pub protocol: ChannelProtocols, + pub return_pid_to: Option>, pub send_handshake: bool, pub send_pid: bool, pub send_config: bool, @@ -59,7 +72,7 @@ pub(crate) struct Channel { Shutdown phase */ -impl Channel

{ +impl Channel { const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \ veloren server.\nWe are not sure if you are a valid \ veloren client.\nClosing the connection" @@ -70,8 +83,9 @@ impl Channel

{ pub fn new( local_pid: Pid, - protocol: P, + protocol: ChannelProtocols, remotes: Arc>>, + return_pid_to: Option>, ) -> Self { Self { stream_id_pool: None, @@ -83,6 +97,7 @@ impl Channel

{ send_queue: VecDeque::new(), recv_queue: VecDeque::new(), protocol, + return_pid_to, send_handshake: false, send_pid: false, send_config: false, @@ -105,15 +120,43 @@ impl Channel

{ } pub fn tick_recv(&mut self, rtrn_tx: &Sender) { - for frame in self.protocol.read() { - self.handle(frame, rtrn_tx); + match &mut self.protocol { + ChannelProtocols::Tcp(c) => { + for frame in c.read() { + self.handle(frame, rtrn_tx); + } + }, + ChannelProtocols::Udp(c) => { + for frame in c.read() { + self.handle(frame, rtrn_tx); + } + }, + ChannelProtocols::Mpsc(c) => { + for frame in c.read() { + self.handle(frame, rtrn_tx); + } + }, } } pub fn tick_send(&mut self) { self.tick_streams(); - while let Some(frame) = self.send_queue.pop_front() { - self.protocol.write(frame) + match &mut self.protocol { + ChannelProtocols::Tcp(c) => { + while let Some(frame) = self.send_queue.pop_front() { + c.write(frame) + } + }, + ChannelProtocols::Udp(c) => { + while let Some(frame) = self.send_queue.pop_front() { + c.write(frame) + } + }, + ChannelProtocols::Mpsc(c) => { + while let Some(frame) = self.send_queue.pop_front() { + c.write(frame) + } + }, } } @@ -154,24 +197,6 @@ impl Channel

{ self.send_handshake = true; } }, - Frame::Configure { - stream_id_pool, - msg_id_pool, - } => { - self.recv_config = true; - //TODO remove range from rp! as this could probably cause duplicate ID !!! - let mut remotes = self.remotes.write().unwrap(); - if let Some(pid) = self.remote_pid { - if !remotes.contains_key(&pid) { - remotes.insert(pid, RemoteParticipant::new()); - } - if let Some(rp) = remotes.get_mut(&pid) { - self.stream_id_pool = Some(stream_id_pool); - self.msg_id_pool = Some(msg_id_pool); - } - } - info!("recv config. This channel is now configured!"); - }, Frame::ParticipantId { pid } => { if self.remote_pid.is_some() { error!(?pid, "invalid message, cant change participantId"); @@ -184,6 +209,11 @@ impl Channel

{ let mut remotes = self.remotes.write().unwrap(); if !remotes.contains_key(&pid) { remotes.insert(pid, RemoteParticipant::new()); + } else { + warn!( + "a known participant opened an additional channel, UNCHECKED BECAUSE \ + NO TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!" + ); } if let Some(rp) = remotes.get_mut(&pid) { self.stream_id_pool = Some(rp.stream_id_pool.subpool(1000000).unwrap()); @@ -202,6 +232,31 @@ impl Channel

{ self.send_pid = true; } }, + Frame::Configure { + stream_id_pool, + msg_id_pool, + } => { + self.recv_config = true; + //TODO remove range from rp! as this could probably cause duplicate ID !!! + let mut remotes = self.remotes.write().unwrap(); + if let Some(pid) = self.remote_pid { + if !remotes.contains_key(&pid) { + remotes.insert(pid, RemoteParticipant::new()); + } + if let Some(rp) = remotes.get_mut(&pid) { + self.stream_id_pool = Some(stream_id_pool); + self.msg_id_pool = Some(msg_id_pool); + } + if let Some(send) = &self.return_pid_to { + info!("asdasd"); + send.send(pid); + }; + self.return_pid_to = None; + } else { + warn!(?self, "Protocol is done wrong!"); + } + info!("recv config. This channel is now configured!"); + }, Frame::Shutdown {} => { self.recv_shutdown = true; info!("shutting down channel"); @@ -335,10 +390,11 @@ impl Channel

{ } } - pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet) -> u32 { + pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet) -> Sid { // validate promises if let Some(stream_id_pool) = &mut self.stream_id_pool { let sid = stream_id_pool.next(); + trace!(?sid, "going to open a new stream"); let stream = Stream::new(sid, prio, promises.clone()); self.streams.push(stream); self.send_queue.push_back(Frame::OpenStream { @@ -347,13 +403,12 @@ impl Channel

{ promises, }); return sid; + } else { + panic!("cant open stream because connection isn't initialized"); } - error!("fix me"); - return 0; - //TODO: fix me } - pub(crate) fn close_stream(&mut self, sid: u32) { + pub(crate) fn close_stream(&mut self, sid: Sid) { self.streams.retain(|stream| stream.sid() != sid); self.send_queue.push_back(Frame::CloseStream { sid }); } @@ -372,12 +427,16 @@ impl Channel

{ } pub(crate) fn send(&mut self, outgoing: OutGoingMessage) { - //TODO: fix me for s in self.streams.iter_mut() { - s.to_send.push_back(outgoing); - break; + warn!("{}", s.sid()); + if s.sid() == outgoing.sid { + s.to_send.push_back(outgoing); + return; + } } + let sid = &outgoing.sid; + error!(?sid, "couldn't send message, didn't found sid") } - pub(crate) fn get_handle(&self) -> &P::Handle { self.protocol.get_handle() } + pub(crate) fn get_protocol(&self) -> &ChannelProtocols { &self.protocol } } diff --git a/network/src/worker/mpsc.rs b/network/src/worker/mpsc.rs index 76899f7370..7073df4106 100644 --- a/network/src/worker/mpsc.rs +++ b/network/src/worker/mpsc.rs @@ -53,3 +53,8 @@ impl ChannelProtocol for MpscChannel { fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver } } + +impl std::fmt::Debug for MpscChannel { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", "MPSC") } +} diff --git a/network/src/worker/tcp.rs b/network/src/worker/tcp.rs index 0c25739d6c..16b5ca10d4 100644 --- a/network/src/worker/tcp.rs +++ b/network/src/worker/tcp.rs @@ -4,7 +4,6 @@ use mio::net::TcpStream; use std::io::{Read, Write}; use tracing::*; -#[derive(Debug)] pub(crate) struct TcpChannel { endpoint: TcpStream, //these buffers only ever contain 1 FRAME ! @@ -87,3 +86,10 @@ impl ChannelProtocol for TcpChannel { fn get_handle(&self) -> &Self::Handle { &self.endpoint } } + +impl std::fmt::Debug for TcpChannel { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.endpoint) + } +} diff --git a/network/src/worker/types.rs b/network/src/worker/types.rs index ce9559e08e..8ac523d901 100644 --- a/network/src/worker/types.rs +++ b/network/src/worker/types.rs @@ -1,17 +1,24 @@ use crate::{ api::Promise, message::{InCommingMessage, OutGoingMessage}, - worker::{Channel, MpscChannel, TcpChannel, UdpChannel}, + worker::Channel, }; use enumset::EnumSet; use mio::{self, net::TcpListener, PollOpt, Ready}; -use mio_extras::channel::Sender; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use uuid::Uuid; +//Participant Ids are randomly chosen pub type Pid = Uuid; +//Stream Ids are unique per Participant* and are split in 2 ranges, one for +// every Network involved Every Channel gets a subrange during their handshake +// protocol from one of the 2 ranges +//*otherwise extra synchronization would be needed pub type Sid = u32; +//Message Ids are unique per Stream* and are split in 2 ranges, one for every +// Channel involved +//*otherwise extra synchronization would be needed pub type Mid = u64; // Used for Communication between Controller <--> Worker @@ -22,6 +29,7 @@ pub(crate) enum CtrlMsg { pid: Pid, prio: u8, promises: EnumSet, + return_sid: std::sync::mpsc::Sender, }, CloseStream { pid: Pid, @@ -44,23 +52,10 @@ pub(crate) enum RtrnMsg { Receive(InCommingMessage), } +#[derive(Debug)] pub(crate) enum TokenObjects { TcpListener(TcpListener), - TcpChannel(Channel, Option>), - UdpChannel(Channel, Option>), - MpscChannel(Channel, Option>), -} - -impl std::fmt::Debug for TokenObjects { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TokenObjects::TcpListener(l) => write!(f, "{:?}", l), - TokenObjects::TcpChannel(c, _) => write!(f, "{:?}", c), - TokenObjects::UdpChannel(c, _) => write!(f, "{:?}", c), - TokenObjects::MpscChannel(c, _) => unimplemented!("MPSC"), - } - } + Channel(Channel), } #[derive(Debug)] diff --git a/network/src/worker/udp.rs b/network/src/worker/udp.rs index bf58e2cdd5..84287cc9ec 100644 --- a/network/src/worker/udp.rs +++ b/network/src/worker/udp.rs @@ -3,7 +3,6 @@ use bincode; use mio::net::UdpSocket; use tracing::*; -#[derive(Debug)] pub(crate) struct UdpChannel { endpoint: UdpSocket, read_buffer: Vec, @@ -82,3 +81,10 @@ impl ChannelProtocol for UdpChannel { fn get_handle(&self) -> &Self::Handle { &self.endpoint } } + +impl std::fmt::Debug for UdpChannel { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.endpoint) + } +} diff --git a/network/src/worker/worker.rs b/network/src/worker/worker.rs index f8e0e3a88d..68ce137d39 100644 --- a/network/src/worker/worker.rs +++ b/network/src/worker/worker.rs @@ -1,6 +1,7 @@ use crate::{ internal::RemoteParticipant, worker::{ + channel::{ChannelProtocol, ChannelProtocols}, metrics::NetworkMetrics, types::{CtrlMsg, Pid, RtrnMsg, TokenObjects}, Channel, Controller, TcpChannel, @@ -115,7 +116,7 @@ impl Worker { CtrlMsg::Shutdown => { debug!("Shutting Down"); for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel, _) = obj { + if let TokenObjects::Channel(channel) = obj { channel.shutdown(); channel.tick_send(); } @@ -128,18 +129,20 @@ impl Worker { TokenObjects::TcpListener(h) => { self.poll.register(h, tok, interest, opts).unwrap() }, - TokenObjects::TcpChannel(channel, _) => self - .poll - .register(channel.get_handle(), tok, interest, opts) - .unwrap(), - TokenObjects::UdpChannel(channel, _) => self - .poll - .register(channel.get_handle(), tok, interest, opts) - .unwrap(), - TokenObjects::MpscChannel(channel, _) => self - .poll - .register(channel.get_handle(), tok, interest, opts) - .unwrap(), + TokenObjects::Channel(channel) => { + match channel.get_protocol() { + ChannelProtocols::Tcp(c) => { + self.poll.register(c.get_handle(), tok, interest, opts) + }, + ChannelProtocols::Udp(c) => { + self.poll.register(c.get_handle(), tok, interest, opts) + }, + ChannelProtocols::Mpsc(c) => { + self.poll.register(c.get_handle(), tok, interest, opts) + }, + } + .unwrap(); + }, } debug!(?handle, ?tok, "Registered new handle"); self.mio_tokens.insert(tok, handle); @@ -148,33 +151,53 @@ impl Worker { pid, prio, promises, + return_sid, } => { + let mut handled = false; for (tok, obj) in self.mio_tokens.tokens.iter_mut() { - if let TokenObjects::TcpChannel(channel, _) = obj { - channel.open_stream(prio, promises); //TODO: check participant - channel.tick_send(); + if let TokenObjects::Channel(channel) = obj { + if Some(pid) == channel.remote_pid { + let sid = channel.open_stream(prio, promises); + return_sid.send(sid); + channel.tick_send(); + handled = true; + break; + } } } - //TODO: + if !handled { + error!(?pid, "couldn't open Stream, didn't found pid"); + } }, CtrlMsg::CloseStream { pid, sid } => { - //TODO: + let mut handled = false; for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel, _) = to { - channel.close_stream(sid); //TODO: check participant - channel.tick_send(); + if let TokenObjects::Channel(channel) = to { + if Some(pid) == channel.remote_pid { + channel.close_stream(sid); //TODO: check participant + channel.tick_send(); + handled = true; + break; + } } } + if !handled { + error!(?pid, "couldn't close Stream, didn't found pid"); + } }, CtrlMsg::Send(outgoing) => { - //TODO: + let mut handled = false; for to in self.mio_tokens.tokens.values_mut() { - if let TokenObjects::TcpChannel(channel, _) = to { + if let TokenObjects::Channel(channel) = to { channel.send(outgoing); //TODO: check participant channel.tick_send(); + handled = true; break; } } + if !handled { + error!("help, we should check here for stream data, but its in channel ...."); + } }, }; false @@ -202,49 +225,32 @@ impl Worker { .unwrap(); trace!(?remote_stream, ?tok, "registered"); let tcp_channel = TcpChannel::new(remote_stream); - let mut channel = Channel::new(self.pid, tcp_channel, self.remotes.clone()); + let mut channel = Channel::new( + self.pid, + ChannelProtocols::Tcp(tcp_channel), + self.remotes.clone(), + None, + ); channel.handshake(); channel.tick_send(); self.mio_tokens .tokens - .insert(tok, TokenObjects::TcpChannel(channel, None)); + .insert(tok, TokenObjects::Channel(channel)); }, Err(err) => { error!(?err, "error during remote connected"); }, }, - TokenObjects::TcpChannel(channel, _) => { + TokenObjects::Channel(channel) => { if event.readiness().is_readable() { - let handle = channel.get_handle(); - trace!(?handle, "stream readable"); + let protocol = channel.get_protocol(); + trace!(?protocol, "channel readable"); channel.tick_recv(&self.rtrn_tx); } if event.readiness().is_writable() { - let handle = channel.get_handle(); - trace!(?handle, "stream writeable"); - channel.tick_send(); - } - }, - TokenObjects::UdpChannel(channel, _) => { - if event.readiness().is_readable() { - let handle = channel.get_handle(); - trace!(?handle, "stream readable"); - channel.tick_recv(&self.rtrn_tx); - } - if event.readiness().is_writable() { - let handle = channel.get_handle(); - trace!(?handle, "stream writeable"); - channel.tick_send(); - } - }, - TokenObjects::MpscChannel(channel, _) => { - if event.readiness().is_readable() { - let handle = channel.get_handle(); - channel.tick_recv(&self.rtrn_tx); - } - if event.readiness().is_writable() { - let handle = channel.get_handle(); + let protocol = channel.get_protocol(); + trace!(?protocol, "channel writeable"); channel.tick_send(); } },