Till now all operations where oneshots, now i actually wait for a participant handshake to complete and being able to return their PID

also fixed the correct pid, sid beeing send
This commit is contained in:
Marcel Märtens 2020-02-20 17:04:58 +01:00
parent 88f6b36a4e
commit e388b40c54
7 changed files with 213 additions and 120 deletions

View File

@ -2,6 +2,7 @@ use crate::{
internal::RemoteParticipant,
message::{self, OutGoingMessage},
worker::{
channel::ChannelProtocols,
metrics::NetworkMetrics,
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
Channel, Controller, TcpChannel,
@ -154,17 +155,25 @@ impl<E: Events> Network<E> {
}
pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
for worker in self.controller.iter() {
worker
let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::<Sid>();
for controller in self.controller.iter() {
controller
.get_tx()
.send(CtrlMsg::OpenStream {
pid: uuid::Uuid::new_v4(),
pid: part.remote_pid,
prio,
promises,
return_sid: ctrl_tx,
})
.unwrap();
break;
}
Stream { sid: 0 }
// I dont like the fact that i need to wait on the worker thread for getting my
// sid back :/ we could avoid this by introducing a Thread Local Network
// which owns some sids we can take without waiting
let sid = ctrl_rx.recv().unwrap();
info!(?sid, " sucessfully opened stream");
Stream { sid }
}
pub fn close(&self, stream: Stream) {}
@ -199,22 +208,28 @@ impl<E: Events> Network<E> {
info!("connecting");
let tcp_stream = TcpStream::connect(&a)?;
let tcp_channel = TcpChannel::new(tcp_stream);
let mut channel = Channel::new(pid, tcp_channel, remotes);
let (ctrl_tx, ctrl_rx) = mio_extras::channel::channel::<Pid>();
let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::<Pid>();
let mut channel = Channel::new(
pid,
ChannelProtocols::Tcp(tcp_channel),
remotes,
Some(ctrl_tx),
);
worker.get_tx().send(CtrlMsg::Register(
TokenObjects::TcpChannel(channel, Some(ctrl_tx)),
TokenObjects::Channel(channel),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
))?;
// wait for a return
let remote_pid = ctrl_rx.recv().unwrap();
info!(?remote_pid, " sucessfully connected to");
return Ok(Participant {
addr: address.clone(),
remote_pid,
});
},
Address::Udp(_) => unimplemented!("lazy me"),
}
Ok(Participant {
addr: address.clone(),
remote_pid: uuid::Uuid::new_v4(),
})
Err(NetworkError::Todo_Error_For_Wrong_Connection)
}
//TODO: evaluate if move to Participant
@ -284,6 +299,7 @@ impl Stream {
pub enum NetworkError {
NetworkDestroyed,
WorkerDestroyed,
Todo_Error_For_Wrong_Connection,
IoError(std::io::Error),
}

View File

@ -2,7 +2,12 @@ use crate::{
api::Promise,
internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
worker::types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream},
worker::{
mpsc::MpscChannel,
tcp::TcpChannel,
types::{Frame, Mid, Pid, RtrnMsg, Sid, Stream},
udp::UdpChannel,
},
};
use enumset::EnumSet;
use mio_extras::channel::Sender;
@ -23,7 +28,14 @@ pub(crate) trait ChannelProtocol {
}
#[derive(Debug)]
pub(crate) struct Channel<P: ChannelProtocol> {
pub(crate) enum ChannelProtocols {
Tcp(TcpChannel),
Udp(UdpChannel),
Mpsc(MpscChannel),
}
#[derive(Debug)]
pub(crate) struct Channel {
pub stream_id_pool: Option<tlid::Pool<tlid::Wrapping<Sid>>>, /* TODO: stream_id unique per
* participant */
pub msg_id_pool: Option<tlid::Pool<tlid::Wrapping<Mid>>>, //TODO: msg_id unique per
@ -34,7 +46,8 @@ pub(crate) struct Channel<P: ChannelProtocol> {
pub streams: Vec<Stream>,
pub send_queue: VecDeque<Frame>,
pub recv_queue: VecDeque<InCommingMessage>,
pub protocol: P,
pub protocol: ChannelProtocols,
pub return_pid_to: Option<std::sync::mpsc::Sender<Pid>>,
pub send_handshake: bool,
pub send_pid: bool,
pub send_config: bool,
@ -59,7 +72,7 @@ pub(crate) struct Channel<P: ChannelProtocol> {
Shutdown phase
*/
impl<P: ChannelProtocol> Channel<P> {
impl Channel {
const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
veloren server.\nWe are not sure if you are a valid \
veloren client.\nClosing the connection"
@ -70,8 +83,9 @@ impl<P: ChannelProtocol> Channel<P> {
pub fn new(
local_pid: Pid,
protocol: P,
protocol: ChannelProtocols,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
return_pid_to: Option<std::sync::mpsc::Sender<Pid>>,
) -> Self {
Self {
stream_id_pool: None,
@ -83,6 +97,7 @@ impl<P: ChannelProtocol> Channel<P> {
send_queue: VecDeque::new(),
recv_queue: VecDeque::new(),
protocol,
return_pid_to,
send_handshake: false,
send_pid: false,
send_config: false,
@ -105,15 +120,43 @@ impl<P: ChannelProtocol> Channel<P> {
}
pub fn tick_recv(&mut self, rtrn_tx: &Sender<RtrnMsg>) {
for frame in self.protocol.read() {
self.handle(frame, rtrn_tx);
match &mut self.protocol {
ChannelProtocols::Tcp(c) => {
for frame in c.read() {
self.handle(frame, rtrn_tx);
}
},
ChannelProtocols::Udp(c) => {
for frame in c.read() {
self.handle(frame, rtrn_tx);
}
},
ChannelProtocols::Mpsc(c) => {
for frame in c.read() {
self.handle(frame, rtrn_tx);
}
},
}
}
pub fn tick_send(&mut self) {
self.tick_streams();
while let Some(frame) = self.send_queue.pop_front() {
self.protocol.write(frame)
match &mut self.protocol {
ChannelProtocols::Tcp(c) => {
while let Some(frame) = self.send_queue.pop_front() {
c.write(frame)
}
},
ChannelProtocols::Udp(c) => {
while let Some(frame) = self.send_queue.pop_front() {
c.write(frame)
}
},
ChannelProtocols::Mpsc(c) => {
while let Some(frame) = self.send_queue.pop_front() {
c.write(frame)
}
},
}
}
@ -154,24 +197,6 @@ impl<P: ChannelProtocol> Channel<P> {
self.send_handshake = true;
}
},
Frame::Configure {
stream_id_pool,
msg_id_pool,
} => {
self.recv_config = true;
//TODO remove range from rp! as this could probably cause duplicate ID !!!
let mut remotes = self.remotes.write().unwrap();
if let Some(pid) = self.remote_pid {
if !remotes.contains_key(&pid) {
remotes.insert(pid, RemoteParticipant::new());
}
if let Some(rp) = remotes.get_mut(&pid) {
self.stream_id_pool = Some(stream_id_pool);
self.msg_id_pool = Some(msg_id_pool);
}
}
info!("recv config. This channel is now configured!");
},
Frame::ParticipantId { pid } => {
if self.remote_pid.is_some() {
error!(?pid, "invalid message, cant change participantId");
@ -184,6 +209,11 @@ impl<P: ChannelProtocol> Channel<P> {
let mut remotes = self.remotes.write().unwrap();
if !remotes.contains_key(&pid) {
remotes.insert(pid, RemoteParticipant::new());
} else {
warn!(
"a known participant opened an additional channel, UNCHECKED BECAUSE \
NO TOKEN WAS IMPLEMENTED IN THE HANDSHAKE!"
);
}
if let Some(rp) = remotes.get_mut(&pid) {
self.stream_id_pool = Some(rp.stream_id_pool.subpool(1000000).unwrap());
@ -202,6 +232,31 @@ impl<P: ChannelProtocol> Channel<P> {
self.send_pid = true;
}
},
Frame::Configure {
stream_id_pool,
msg_id_pool,
} => {
self.recv_config = true;
//TODO remove range from rp! as this could probably cause duplicate ID !!!
let mut remotes = self.remotes.write().unwrap();
if let Some(pid) = self.remote_pid {
if !remotes.contains_key(&pid) {
remotes.insert(pid, RemoteParticipant::new());
}
if let Some(rp) = remotes.get_mut(&pid) {
self.stream_id_pool = Some(stream_id_pool);
self.msg_id_pool = Some(msg_id_pool);
}
if let Some(send) = &self.return_pid_to {
info!("asdasd");
send.send(pid);
};
self.return_pid_to = None;
} else {
warn!(?self, "Protocol is done wrong!");
}
info!("recv config. This channel is now configured!");
},
Frame::Shutdown {} => {
self.recv_shutdown = true;
info!("shutting down channel");
@ -335,10 +390,11 @@ impl<P: ChannelProtocol> Channel<P> {
}
}
pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 {
pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> Sid {
// validate promises
if let Some(stream_id_pool) = &mut self.stream_id_pool {
let sid = stream_id_pool.next();
trace!(?sid, "going to open a new stream");
let stream = Stream::new(sid, prio, promises.clone());
self.streams.push(stream);
self.send_queue.push_back(Frame::OpenStream {
@ -347,13 +403,12 @@ impl<P: ChannelProtocol> Channel<P> {
promises,
});
return sid;
} else {
panic!("cant open stream because connection isn't initialized");
}
error!("fix me");
return 0;
//TODO: fix me
}
pub(crate) fn close_stream(&mut self, sid: u32) {
pub(crate) fn close_stream(&mut self, sid: Sid) {
self.streams.retain(|stream| stream.sid() != sid);
self.send_queue.push_back(Frame::CloseStream { sid });
}
@ -372,12 +427,16 @@ impl<P: ChannelProtocol> Channel<P> {
}
pub(crate) fn send(&mut self, outgoing: OutGoingMessage) {
//TODO: fix me
for s in self.streams.iter_mut() {
s.to_send.push_back(outgoing);
break;
warn!("{}", s.sid());
if s.sid() == outgoing.sid {
s.to_send.push_back(outgoing);
return;
}
}
let sid = &outgoing.sid;
error!(?sid, "couldn't send message, didn't found sid")
}
pub(crate) fn get_handle(&self) -> &P::Handle { self.protocol.get_handle() }
pub(crate) fn get_protocol(&self) -> &ChannelProtocols { &self.protocol }
}

View File

@ -53,3 +53,8 @@ impl ChannelProtocol for MpscChannel {
fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver }
}
impl std::fmt::Debug for MpscChannel {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", "MPSC") }
}

View File

@ -4,7 +4,6 @@ use mio::net::TcpStream;
use std::io::{Read, Write};
use tracing::*;
#[derive(Debug)]
pub(crate) struct TcpChannel {
endpoint: TcpStream,
//these buffers only ever contain 1 FRAME !
@ -87,3 +86,10 @@ impl ChannelProtocol for TcpChannel {
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}
impl std::fmt::Debug for TcpChannel {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.endpoint)
}
}

View File

@ -1,17 +1,24 @@
use crate::{
api::Promise,
message::{InCommingMessage, OutGoingMessage},
worker::{Channel, MpscChannel, TcpChannel, UdpChannel},
worker::Channel,
};
use enumset::EnumSet;
use mio::{self, net::TcpListener, PollOpt, Ready};
use mio_extras::channel::Sender;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use uuid::Uuid;
//Participant Ids are randomly chosen
pub type Pid = Uuid;
//Stream Ids are unique per Participant* and are split in 2 ranges, one for
// every Network involved Every Channel gets a subrange during their handshake
// protocol from one of the 2 ranges
//*otherwise extra synchronization would be needed
pub type Sid = u32;
//Message Ids are unique per Stream* and are split in 2 ranges, one for every
// Channel involved
//*otherwise extra synchronization would be needed
pub type Mid = u64;
// Used for Communication between Controller <--> Worker
@ -22,6 +29,7 @@ pub(crate) enum CtrlMsg {
pid: Pid,
prio: u8,
promises: EnumSet<Promise>,
return_sid: std::sync::mpsc::Sender<Sid>,
},
CloseStream {
pid: Pid,
@ -44,23 +52,10 @@ pub(crate) enum RtrnMsg {
Receive(InCommingMessage),
}
#[derive(Debug)]
pub(crate) enum TokenObjects {
TcpListener(TcpListener),
TcpChannel(Channel<TcpChannel>, Option<Sender<Pid>>),
UdpChannel(Channel<UdpChannel>, Option<Sender<Pid>>),
MpscChannel(Channel<MpscChannel>, Option<Sender<Pid>>),
}
impl std::fmt::Debug for TokenObjects {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TokenObjects::TcpListener(l) => write!(f, "{:?}", l),
TokenObjects::TcpChannel(c, _) => write!(f, "{:?}", c),
TokenObjects::UdpChannel(c, _) => write!(f, "{:?}", c),
TokenObjects::MpscChannel(c, _) => unimplemented!("MPSC"),
}
}
Channel(Channel),
}
#[derive(Debug)]

View File

@ -3,7 +3,6 @@ use bincode;
use mio::net::UdpSocket;
use tracing::*;
#[derive(Debug)]
pub(crate) struct UdpChannel {
endpoint: UdpSocket,
read_buffer: Vec<u8>,
@ -82,3 +81,10 @@ impl ChannelProtocol for UdpChannel {
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}
impl std::fmt::Debug for UdpChannel {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.endpoint)
}
}

View File

@ -1,6 +1,7 @@
use crate::{
internal::RemoteParticipant,
worker::{
channel::{ChannelProtocol, ChannelProtocols},
metrics::NetworkMetrics,
types::{CtrlMsg, Pid, RtrnMsg, TokenObjects},
Channel, Controller, TcpChannel,
@ -115,7 +116,7 @@ impl Worker {
CtrlMsg::Shutdown => {
debug!("Shutting Down");
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::TcpChannel(channel, _) = obj {
if let TokenObjects::Channel(channel) = obj {
channel.shutdown();
channel.tick_send();
}
@ -128,18 +129,20 @@ impl Worker {
TokenObjects::TcpListener(h) => {
self.poll.register(h, tok, interest, opts).unwrap()
},
TokenObjects::TcpChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::UdpChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::MpscChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::Channel(channel) => {
match channel.get_protocol() {
ChannelProtocols::Tcp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Udp(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
ChannelProtocols::Mpsc(c) => {
self.poll.register(c.get_handle(), tok, interest, opts)
},
}
.unwrap();
},
}
debug!(?handle, ?tok, "Registered new handle");
self.mio_tokens.insert(tok, handle);
@ -148,33 +151,53 @@ impl Worker {
pid,
prio,
promises,
return_sid,
} => {
let mut handled = false;
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::TcpChannel(channel, _) = obj {
channel.open_stream(prio, promises); //TODO: check participant
channel.tick_send();
if let TokenObjects::Channel(channel) = obj {
if Some(pid) == channel.remote_pid {
let sid = channel.open_stream(prio, promises);
return_sid.send(sid);
channel.tick_send();
handled = true;
break;
}
}
}
//TODO:
if !handled {
error!(?pid, "couldn't open Stream, didn't found pid");
}
},
CtrlMsg::CloseStream { pid, sid } => {
//TODO:
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::TcpChannel(channel, _) = to {
channel.close_stream(sid); //TODO: check participant
channel.tick_send();
if let TokenObjects::Channel(channel) = to {
if Some(pid) == channel.remote_pid {
channel.close_stream(sid); //TODO: check participant
channel.tick_send();
handled = true;
break;
}
}
}
if !handled {
error!(?pid, "couldn't close Stream, didn't found pid");
}
},
CtrlMsg::Send(outgoing) => {
//TODO:
let mut handled = false;
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::TcpChannel(channel, _) = to {
if let TokenObjects::Channel(channel) = to {
channel.send(outgoing); //TODO: check participant
channel.tick_send();
handled = true;
break;
}
}
if !handled {
error!("help, we should check here for stream data, but its in channel ....");
}
},
};
false
@ -202,49 +225,32 @@ impl Worker {
.unwrap();
trace!(?remote_stream, ?tok, "registered");
let tcp_channel = TcpChannel::new(remote_stream);
let mut channel = Channel::new(self.pid, tcp_channel, self.remotes.clone());
let mut channel = Channel::new(
self.pid,
ChannelProtocols::Tcp(tcp_channel),
self.remotes.clone(),
None,
);
channel.handshake();
channel.tick_send();
self.mio_tokens
.tokens
.insert(tok, TokenObjects::TcpChannel(channel, None));
.insert(tok, TokenObjects::Channel(channel));
},
Err(err) => {
error!(?err, "error during remote connected");
},
},
TokenObjects::TcpChannel(channel, _) => {
TokenObjects::Channel(channel) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
trace!(?handle, "stream readable");
let protocol = channel.get_protocol();
trace!(?protocol, "channel readable");
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
trace!(?handle, "stream writeable");
channel.tick_send();
}
},
TokenObjects::UdpChannel(channel, _) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
trace!(?handle, "stream readable");
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
trace!(?handle, "stream writeable");
channel.tick_send();
}
},
TokenObjects::MpscChannel(channel, _) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
let protocol = channel.get_protocol();
trace!(?protocol, "channel writeable");
channel.tick_send();
}
},