Converting the API interface to Async and experimenting with a Channel implementation for TCP, UDP, MPSC, which will later be reverted

It should compile and tests run fine now.
If not, the 2nd-last squashed commit message said it currently only sends frames but not incoming messages, and recv would only handle frames. The last one said I added internal messages and a reverse path (probably for .recv)
This commit is contained in:
Marcel Märtens 2020-02-10 18:25:47 +01:00
parent 5c5b33bd2a
commit f3251c0879
11 changed files with 583 additions and 291 deletions

55
Cargo.lock generated
View File

@ -74,9 +74,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.27"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785"
checksum = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff"
[[package]]
name = "anymap"
@ -375,7 +375,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"parking_lot 0.10.0",
"parking_lot 0.10.1",
"slab",
]
@ -423,9 +423,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.2.1"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12ae9db68ad7fac5fe51304d20f016c911539251075a214f8e663babefa35187"
checksum = "1f359dc14ff8911330a51ef78022d376f25ed00248912803b58f00cb1c27f742"
[[package]]
name = "byteorder"
@ -695,9 +695,9 @@ dependencies = [
[[package]]
name = "const-tweaker"
version = "0.2.5"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7081900ff8f4b89046f8898eb8af6ed26be5a47299c56147d5a7dac74298b0"
checksum = "7fbe3e1d2fccd896d451adb486910a0bfc233fd6dcafdb4e13bac7de72f8f250"
dependencies = [
"anyhow",
"async-std",
@ -1066,9 +1066,9 @@ dependencies = [
[[package]]
name = "dashmap"
version = "3.7.0"
version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "010ef3f25ed5bb93505a3238d19957622190268640526aab07174c66ccf5d611"
checksum = "0f87a04c37da1d3d27db1fb7f372802b72fb8c3ff3e9c0914530995127f4a6a1"
dependencies = [
"ahash 0.3.2",
"cfg-if",
@ -3178,12 +3178,12 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.10.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e98c49ab0b7ce5b222f2cc9193fc4efe11c6d0bd4f648e374684a6857b1cfc"
checksum = "6fdfcb5f20930a79e326f7ec992a9fdb5b7bd809254b1e735bdd5a99f78bee0d"
dependencies = [
"lock_api",
"parking_lot_core 0.7.0",
"parking_lot_core 0.7.2",
]
[[package]]
@ -3215,9 +3215,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.7.0"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7582838484df45743c8434fbff785e8edf260c28748353d44bc0da32e0ceabf1"
checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
dependencies = [
"cfg-if",
"cloudabi",
@ -3420,9 +3420,9 @@ dependencies = [
[[package]]
name = "proc-macro-nested"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e"
checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
[[package]]
name = "proc-macro2"
@ -4144,18 +4144,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.105"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff"
checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.105"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac5d00fc561ba2724df6758a17de23df5914f20e41cb00f94d5b7ae42fffaff8"
checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c"
dependencies = [
"proc-macro2 1.0.9",
"quote 1.0.3",
@ -5086,9 +5086,9 @@ dependencies = [
[[package]]
name = "vek"
version = "0.10.0"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c98f7e1c1400d5b1704baee82cbc56a3fde406769555ead0f2306e43ebab967"
checksum = "761f71ebd4296be71d1c584aa41a1ab8f3e5e646357fefce387b54381c151926"
dependencies = [
"approx 0.3.2",
"num-integer",
@ -5120,7 +5120,7 @@ dependencies = [
"num_cpus",
"specs",
"uvth",
"vek 0.10.0",
"vek 0.10.2",
"veloren-common",
]
@ -5154,7 +5154,7 @@ dependencies = [
"specs",
"specs-idvs",
"sum_type",
"vek 0.10.0",
"vek 0.10.2",
]
[[package]]
@ -5164,6 +5164,7 @@ dependencies = [
"bincode",
"byteorder 1.3.4",
"enumset",
"futures 0.3.4",
"mio",
"mio-extras",
"serde",
@ -5202,7 +5203,7 @@ dependencies = [
"specs",
"specs-idvs",
"uvth",
"vek 0.10.0",
"vek 0.10.2",
"veloren-common",
"veloren-world",
]
@ -5260,7 +5261,7 @@ dependencies = [
"specs-idvs",
"treeculler",
"uvth",
"vek 0.10.0",
"vek 0.10.2",
"veloren-client",
"veloren-common",
"veloren-server",
@ -5295,7 +5296,7 @@ dependencies = [
"roots",
"serde",
"serde_derive",
"vek 0.10.0",
"vek 0.10.2",
"veloren-common",
]

View File

@ -18,5 +18,6 @@ tracing = "0.1"
tracing-subscriber = "0.2.0-alpha.4"
byteorder = "1.3"
mio-extras = "2.0"
futures = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]}

View File

@ -2,10 +2,8 @@ use crate::{
internal::RemoteParticipant,
message::{self, OutGoingMessage},
worker::{
channel::Channel,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, Sid, TokenObjects},
Controller,
Channel, Controller, TcpChannel,
},
};
use enumset::*;
@ -42,6 +40,7 @@ pub enum Promise {
pub struct Participant {
addr: Address,
remote_pid: Pid,
}
pub struct Connection {}
@ -114,12 +113,15 @@ impl<E: Events> Network<E> {
// should almost ever be empty except for new channel creations and stream
// creations!
for worker in self.controller.iter() {
worker.get_tx().send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
cursor: 0,
mid: None,
sid: stream.sid,
}));
worker
.get_tx()
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
cursor: 0,
mid: None,
sid: stream.sid,
}))
.unwrap();
}
}
@ -146,72 +148,154 @@ impl<E: Events> Network<E> {
None
}
    /// Pre-async listener: spawns a thread-pool job that binds a `TcpListener`
    /// on `addr` and registers it (readable, edge-triggered) with the
    /// lowest-loaded worker. Fire-and-forget: bind errors inside the job panic
    /// via `unwrap`.
    /// NOTE(review): superseded by the `async fn listen` later in this commit.
    pub fn listen(&self, addr: &Address) {
        let worker = Self::get_lowest_worker(&self.controller);
        let pipe = worker.get_tx();
        let address = addr.clone();
        self.thread_pool.execute(move || {
            let span = span!(Level::INFO, "listen", ?address);
            let _enter = span.enter();
            match address {
                Address::Tcp(a) => {
                    info!("listening");
                    let tcp_listener = TcpListener::bind(&a).unwrap();
                    pipe.send(CtrlMsg::Register(
                        TokenObjects::TcpListener(tcp_listener),
                        Ready::readable(),
                        PollOpt::edge(),
                    ))
                    .unwrap();
                },
                Address::Udp(_) => unimplemented!("lazy me"),
            }
        });
    }
    /// Pre-async connect: spawns a thread-pool job that opens a `TcpStream` to
    /// `addr`, wraps it in a `TcpChannel` and registers it with the
    /// lowest-loaded worker. Returns a `Participant` immediately — before the
    /// connection is actually established; connect failures are only logged.
    /// NOTE(review): superseded by the `async fn connect` later in this commit.
    pub fn connect(&self, addr: &Address) -> Participant {
        let worker = Self::get_lowest_worker(&self.controller);
        let pipe = worker.get_tx();
        let address = addr.clone();
        let pid = self.participant_id;
        let remotes = self.remotes.clone();
        self.thread_pool.execute(move || {
            let mut span = span!(Level::INFO, "connect", ?address);
            let _enter = span.enter();
            match address {
                Address::Tcp(a) => {
                    info!("connecting");
                    let tcp_stream = match TcpStream::connect(&a) {
                        Err(err) => {
                            error!("could not open connection: {}", err);
                            return;
                        },
                        Ok(s) => s,
                    };
                    let mut channel = TcpChannel::new(tcp_stream, pid, remotes);
                    pipe.send(CtrlMsg::Register(
                        TokenObjects::TcpChannel(channel),
                        Ready::readable() | Ready::writable(),
                        PollOpt::edge(),
                    ))
                    .unwrap();
                },
                Address::Udp(_) => unimplemented!("lazy me"),
            }
        });
        Participant { addr: addr.clone() }
    }
pub fn open(&self, part: Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
pub fn open(&self, part: &Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
for worker in self.controller.iter() {
worker.get_tx().send(CtrlMsg::OpenStream {
pid: uuid::Uuid::new_v4(),
prio,
promises,
});
worker
.get_tx()
.send(CtrlMsg::OpenStream {
pid: uuid::Uuid::new_v4(),
prio,
promises,
})
.unwrap();
}
Stream { sid: 0 }
}
pub fn close(&self, stream: Stream) {}
    /// Binds a TCP listener on `address` and registers it (readable,
    /// edge-triggered) with the lowest-loaded worker.
    ///
    /// # Errors
    /// `NetworkError::IoError` if the bind fails;
    /// `NetworkError::WorkerDestroyed` if the worker's control channel is gone.
    ///
    /// NOTE(review): despite being `async`, the body never awaits — the bind
    /// and the register send happen synchronously.
    pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> {
        let span = span!(Level::TRACE, "listen", ?address);
        let worker = Self::get_lowest_worker(&self.controller);
        let _enter = span.enter();
        match address {
            Address::Tcp(a) => {
                let tcp_listener = TcpListener::bind(&a)?;
                info!("listening");
                worker.get_tx().send(CtrlMsg::Register(
                    TokenObjects::TcpListener(tcp_listener),
                    Ready::readable(),
                    PollOpt::edge(),
                ))?;
            },
            Address::Udp(_) => unimplemented!("lazy me"),
        };
        Ok(())
    }
    /// Opens a TCP connection to `address`, wraps it in a [`Channel`] and
    /// registers it (read+write, edge-triggered) with the lowest-loaded worker.
    ///
    /// # Errors
    /// `NetworkError::IoError` if the connect fails;
    /// `NetworkError::WorkerDestroyed` if the worker's control channel is gone.
    pub async fn connect(&self, address: &Address) -> Result<Participant, NetworkError> {
        let worker = Self::get_lowest_worker(&self.controller);
        let pid = self.participant_id;
        let remotes = self.remotes.clone();
        let mut span = span!(Level::INFO, "connect", ?address);
        let _enter = span.enter();
        match address {
            Address::Tcp(a) => {
                info!("connecting");
                let tcp_stream = TcpStream::connect(&a)?;
                let tcp_channel = TcpChannel::new(tcp_stream);
                let mut channel = Channel::new(pid, tcp_channel, remotes);
                // ctrl_tx is handed to the worker — presumably so it can report
                // the remote's Pid back; ctrl_rx is currently dropped unread.
                let (ctrl_tx, ctrl_rx) = mio_extras::channel::channel::<Pid>();
                worker.get_tx().send(CtrlMsg::Register(
                    TokenObjects::TcpChannel(channel, Some(ctrl_tx)),
                    Ready::readable() | Ready::writable(),
                    PollOpt::edge(),
                ))?;
                // wait for a return
            },
            Address::Udp(_) => unimplemented!("lazy me"),
        }
        // NOTE(review): remote_pid is a freshly generated placeholder, not the
        // peer's real Pid — the reverse path above is not wired up yet.
        Ok(Participant {
            addr: address.clone(),
            remote_pid: uuid::Uuid::new_v4(),
        })
    }
//TODO: evaluate if move to Participant
pub async fn _disconnect(&self, participant: Participant) -> Result<(), NetworkError> {
panic!("sda");
}
pub fn participants(&self) -> Vec<Participant> {
panic!("sda");
}
pub async fn _connected(&self) -> Result<Participant, NetworkError> {
// returns if a Participant connected and is ready
panic!("sda");
}
pub async fn _disconnected(&self) -> Result<Participant, NetworkError> {
// returns if a Participant connected and is ready
panic!("sda");
}
pub async fn multisend<M: Serialize>(
&self,
streams: Vec<Stream>,
msg: M,
) -> Result<(), NetworkError> {
panic!("sda");
}
}
impl Participant {
    /// Opens a new stream to this participant with the given priority and
    /// promises. Not yet implemented.
    pub async fn _open(
        &self,
        prio: u8,
        promises: EnumSet<Promise>,
    ) -> Result<Stream, ParticipantError> {
        // `unimplemented!` instead of meaningless `panic!("sda")` stubs so the
        // panic message states what is actually missing.
        unimplemented!("Participant::_open is not implemented yet");
    }

    /// Closes the given stream. Not yet implemented.
    pub async fn _close(&self, stream: Stream) -> Result<(), ParticipantError> {
        unimplemented!("Participant::_close is not implemented yet");
    }

    /// NOTE(review): presumably resolves once the remote side opened a stream
    /// — confirm when implementing. Not yet implemented.
    pub async fn _opened(&self) -> Result<Stream, ParticipantError> {
        unimplemented!("Participant::_opened is not implemented yet");
    }

    /// NOTE(review): presumably resolves once the remote side closed a stream
    /// — confirm when implementing. Not yet implemented.
    pub async fn _closed(&self) -> Result<Stream, ParticipantError> {
        unimplemented!("Participant::_closed is not implemented yet");
    }
}
impl Stream {
    //TODO: What about SEND instead of Serializeable if it goes via PIPE ?
    //TODO: timeout per message or per stream ? stream or ?
    /// Serializes and sends `msg` over this stream. Not yet implemented.
    pub async fn _send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
        // `unimplemented!` instead of meaningless `panic!("sda")` stubs so the
        // panic message states what is actually missing.
        unimplemented!("Stream::_send is not implemented yet");
    }

    /// Receives and deserializes the next message from this stream.
    /// Not yet implemented.
    pub async fn _recv<M: DeserializeOwned>(&self) -> Result<M, StreamError> {
        unimplemented!("Stream::_recv is not implemented yet");
    }
}
/// Errors surfaced by [`Network`] operations.
#[derive(Debug)]
pub enum NetworkError {
    NetworkDestroyed,
    // The worker's control channel is closed; produced by the
    // `From<mio_extras::channel::SendError<T>>` conversion.
    WorkerDestroyed,
    // Underlying socket/listener I/O failure.
    IoError(std::io::Error),
}

/// Errors surfaced by [`Participant`] operations.
// NOTE(review): "Disconected" is misspelled, but renaming the public variant
// would break callers — fix in a follow-up.
#[derive(Debug)]
pub enum ParticipantError {
    ParticipantDisconected,
}

/// Errors surfaced by [`Stream`] operations.
#[derive(Debug)]
pub enum StreamError {
    StreamClosed,
}
/// Lets `?` convert socket/listener I/O failures into `NetworkError`.
impl From<std::io::Error> for NetworkError {
    fn from(err: std::io::Error) -> Self { NetworkError::IoError(err) }
}

/// Lets `?` convert a failed worker-channel send into `NetworkError`: the
/// only way the send fails is the worker's receiving end being gone.
impl<T> From<mio_extras::channel::SendError<T>> for NetworkError {
    // `_err` (was `err`) — the payload is intentionally discarded, so the
    // binding is underscored to silence the unused-variable warning.
    fn from(_err: mio_extras::channel::SendError<T>) -> Self { NetworkError::WorkerDestroyed }
}

View File

@ -7,6 +7,7 @@ mod worker;
#[cfg(test)]
pub mod tests {
use crate::api::*;
use futures::executor::block_on;
use std::{net::SocketAddr, sync::Arc};
use tracing::*;
use uuid::Uuid;
@ -27,8 +28,6 @@ pub mod tests {
}
pub fn test_tracing() {
use tracing::Level;
tracing_subscriber::FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
// will be written to stdout.
@ -43,8 +42,51 @@ pub mod tests {
assert_eq!(2 + 2, 4);
}
/*
#[test]
#[ignore]
fn client_server() {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-test".into())
.build(),
);
test_tracing();
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
n1.listen(&a1); //await
n2.listen(&a2); // only requiered here, but doesnt hurt on n1
std::thread::sleep(std::time::Duration::from_millis(20));
let p1 = n1.connect(&a2); //await
//n2.OnRemoteConnectionOpen triggered
std::thread::sleep(std::time::Duration::from_millis(20));
let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
std::thread::sleep(std::time::Duration::from_millis(20));
//n2.OnRemoteStreamOpen triggered
n1.send("Hello World", &s1);
std::thread::sleep(std::time::Duration::from_millis(20));
// receive on n2 now
let s: Option<String> = n2.recv(&s1);
for _ in 1..4 {
error!("{:?}", s);
}
assert_eq!(s, Some("Hello World".to_string()));
n1.close(s1);
//n2.OnRemoteStreamClose triggered
std::thread::sleep(std::time::Duration::from_millis(20000));
}
*/
#[test]
fn client_server() {
fn client_server_stream() {
let thread_pool = Arc::new(
ThreadPoolBuilder::new()
.name("veloren-network-test".into())
@ -53,33 +95,28 @@ pub mod tests {
test_tracing();
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
n1.listen(&a1); //await
n2.listen(&a2); // only requiered here, but doesnt hurt on n1
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010)));
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011)));
block_on(n1.listen(&a1)).unwrap(); //await
block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1
std::thread::sleep(std::time::Duration::from_millis(20));
let p1 = n1.connect(&a2); //await
//n2.OnRemoteConnectionOpen triggered
let p1 = block_on(n1.connect(&a2)); //await
let p1 = p1.unwrap();
std::thread::sleep(std::time::Duration::from_millis(20));
let s1 = n1.open(p1, 16, Promise::InOrder | Promise::NoCorrupt);
let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
//let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
std::thread::sleep(std::time::Duration::from_millis(20));
//n2.OnRemoteStreamOpen triggered
n1.send("Hello World", &s1);
std::thread::sleep(std::time::Duration::from_millis(20));
// receive on n2 now
std::thread::sleep(std::time::Duration::from_millis(1000));
let s: Option<String> = n2.recv(&s1);
for _ in 1..4 {
error!("{:?}", s);
}
assert_eq!(s, Some("Hello World".to_string()));
n1.close(s1);
//n2.OnRemoteStreamClose triggered
std::thread::sleep(std::time::Duration::from_millis(20000));
}
}

View File

@ -9,33 +9,21 @@ use mio_extras::channel::Sender;
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, RwLock},
time::Instant,
};
use tracing::*;
pub(crate) trait Channel {
/*
uninitialized_dirty_speed_buffer: is just a already allocated buffer, that probably is already dirty because it's getting reused to save allocations, feel free to use it, but expect nothing
aprox_time is the time taken when the events come in, you can reuse it for message timeouts, to not make any more syscalls
*/
pub(crate) trait ChannelProtocol {
type Handle: ?Sized + mio::Evented;
/// Execute when ready to read
fn read(
&mut self,
uninitialized_dirty_speed_buffer: &mut [u8; 65000],
aprox_time: Instant,
rtrn_tx: &Sender<RtrnMsg>,
);
fn read(&mut self) -> Vec<Frame>;
/// Execute when ready to write
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant);
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32;
fn close_stream(&mut self, sid: u32);
fn handshake(&mut self);
fn shutdown(&mut self);
fn send(&mut self, outgoing: OutGoingMessage);
fn write(&mut self, frame: Frame);
/// used for mio
fn get_handle(&self) -> &Self::Handle;
}
#[derive(Debug)]
pub(crate) struct ChannelState {
pub(crate) struct Channel<P: ChannelProtocol> {
pub stream_id_pool: Option<tlid::Pool<tlid::Wrapping<Sid>>>, /* TODO: stream_id unique per
* participant */
pub msg_id_pool: Option<tlid::Pool<tlid::Wrapping<Mid>>>, //TODO: msg_id unique per
@ -46,6 +34,7 @@ pub(crate) struct ChannelState {
pub streams: Vec<Stream>,
pub send_queue: VecDeque<Frame>,
pub recv_queue: VecDeque<InCommingMessage>,
pub protocol: P,
pub send_handshake: bool,
pub send_pid: bool,
pub send_config: bool,
@ -70,7 +59,7 @@ pub(crate) struct ChannelState {
Shutdown phase
*/
impl ChannelState {
impl<P: ChannelProtocol> Channel<P> {
const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
veloren server.\nWe are not sure if you are a valid \
veloren client.\nClosing the connection"
@ -79,8 +68,12 @@ impl ChannelState {
invalid version.\nWe don't know how to communicate with \
you.\n";
pub fn new(local_pid: Pid, remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>) -> Self {
ChannelState {
pub fn new(
local_pid: Pid,
protocol: P,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
) -> Self {
Self {
stream_id_pool: None,
msg_id_pool: None,
local_pid,
@ -89,6 +82,7 @@ impl ChannelState {
streams: Vec::new(),
send_queue: VecDeque::new(),
recv_queue: VecDeque::new(),
protocol,
send_handshake: false,
send_pid: false,
send_config: false,
@ -110,7 +104,20 @@ impl ChannelState {
&& !self.recv_shutdown
}
pub fn handle(&mut self, frame: Frame, rtrn_tx: &Sender<RtrnMsg>) {
    /// Drains every frame currently readable from the underlying protocol and
    /// dispatches each through `handle`, which may emit `RtrnMsg`s on
    /// `rtrn_tx`.
    pub fn tick_recv(&mut self, rtrn_tx: &Sender<RtrnMsg>) {
        for frame in self.protocol.read() {
            self.handle(frame, rtrn_tx);
        }
    }

    /// Moves pending per-stream data into `send_queue` (via `tick_streams`)
    /// and then flushes every queued frame to the underlying protocol.
    pub fn tick_send(&mut self) {
        self.tick_streams();
        while let Some(frame) = self.send_queue.pop_front() {
            self.protocol.write(frame)
        }
    }
fn handle(&mut self, frame: Frame, rtrn_tx: &Sender<RtrnMsg>) {
match frame {
Frame::Handshake {
magic_number,
@ -261,9 +268,9 @@ impl ChannelState {
}
if let Some(pos) = pos {
for m in s.to_receive.drain(pos..pos + 1) {
info!("receied message: {}", m.mid);
info!("received message: {}", m.mid);
//self.recv_queue.push_back(m);
rtrn_tx.send(RtrnMsg::Receive(m));
rtrn_tx.send(RtrnMsg::Receive(m)).unwrap();
}
}
}
@ -279,7 +286,7 @@ impl ChannelState {
// This function will tick all streams according to priority and add them to the
// send queue
pub(crate) fn tick_streams(&mut self) {
fn tick_streams(&mut self) {
//ignoring prio for now
//TODO: fix prio
if let Some(msg_id_pool) = &mut self.msg_id_pool {
@ -327,4 +334,50 @@ impl ChannelState {
self.send_shutdown = true;
}
}
    /// Allocates a new stream id, tracks the stream locally and queues an
    /// `OpenStream` frame for the remote side. Returns the new stream id, or
    /// 0 when no stream-id pool has been negotiated yet (see TODO below).
    pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 {
        // validate promises
        if let Some(stream_id_pool) = &mut self.stream_id_pool {
            let sid = stream_id_pool.next();
            let stream = Stream::new(sid, prio, promises.clone());
            self.streams.push(stream);
            self.send_queue.push_back(Frame::OpenStream {
                sid,
                prio,
                promises,
            });
            return sid;
        }
        error!("fix me");
        return 0;
        //TODO: fix me
    }

    /// Drops the local stream state and queues a `CloseStream` frame.
    pub(crate) fn close_stream(&mut self, sid: u32) {
        self.streams.retain(|stream| stream.sid() != sid);
        self.send_queue.push_back(Frame::CloseStream { sid });
    }

    /// Queues the magic-number/version handshake frame and records that it
    /// was sent.
    pub(crate) fn handshake(&mut self) {
        self.send_queue.push_back(Frame::Handshake {
            magic_number: VELOREN_MAGIC_NUMBER.to_string(),
            version: VELOREN_NETWORK_VERSION,
        });
        self.send_handshake = true;
    }

    /// Queues a `Shutdown` frame and records that it was sent.
    pub(crate) fn shutdown(&mut self) {
        self.send_queue.push_back(Frame::Shutdown {});
        self.send_shutdown = true;
    }

    /// Queues an outgoing message on a stream.
    /// NOTE(review): always picks the first stream and ignores the message's
    /// sid — acknowledged by the TODO.
    pub(crate) fn send(&mut self, outgoing: OutGoingMessage) {
        //TODO: fix me
        for s in self.streams.iter_mut() {
            s.to_send.push_back(outgoing);
            break;
        }
    }

    /// used for mio registration
    pub(crate) fn get_handle(&self) -> &P::Handle { self.protocol.get_handle() }
}

View File

@ -5,10 +5,17 @@
communication is done via channels.
*/
pub mod channel;
pub mod mpsc;
pub mod tcp;
pub mod types;
pub mod udp;
pub mod worker;
pub(crate) use channel::Channel;
pub(crate) use mpsc::MpscChannel;
pub(crate) use tcp::TcpChannel;
pub(crate) use udp::UdpChannel;
use crate::{
internal::RemoteParticipant,
worker::{

View File

@ -0,0 +1,55 @@
use crate::worker::{channel::ChannelProtocol, types::Frame};
use mio_extras::channel::{Receiver, Sender};
use tracing::*;
/// In-process channel protocol: frames are exchanged over a pair of
/// `mio_extras` mpsc endpoints instead of a socket.
pub(crate) struct MpscChannel {
    // Outgoing frames are pushed here.
    endpoint_sender: Sender<Frame>,
    // Incoming frames are drained from here; also the mio-registered handle.
    endpoint_receiver: Receiver<Frame>,
}

impl MpscChannel {}
impl ChannelProtocol for MpscChannel {
    type Handle = Receiver<Frame>;

    /// Execute when ready to read: drain every frame currently queued on the
    /// mpsc receiver without blocking.
    fn read(&mut self) -> Vec<Frame> {
        let mut result = Vec::new();
        loop {
            match self.endpoint_receiver.try_recv() {
                Ok(frame) => {
                    // fixed log typo: was "incomming message"
                    trace!("incoming message");
                    result.push(frame);
                },
                Err(std::sync::mpsc::TryRecvError::Empty) => {
                    debug!("would block");
                    break;
                },
                Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                    //TODO: surface this as an error instead of panicking
                    panic!("disconnected");
                },
            };
        }
        result
    }

    /// Execute when ready to write: push a single frame to the mpsc sender.
    fn write(&mut self, frame: Frame) {
        match self.endpoint_sender.send(frame) {
            // `Ok(())` — send returns no payload; the old `Ok(n)` bound an
            // unused `()` and triggered an unused-variable warning.
            Ok(()) => {
                // fixed log typo: was "semded"
                trace!("sent");
            },
            Err(mio_extras::channel::SendError::Io(e))
                if e.kind() == std::io::ErrorKind::WouldBlock =>
            {
                debug!("would block");
            }
            Err(e) => {
                panic!("{}", e);
            },
        };
    }

    /// used for mio registration
    fn get_handle(&self) -> &Self::Handle { &self.endpoint_receiver }
}

View File

@ -1,69 +1,51 @@
use crate::{
api::Promise,
internal::{RemoteParticipant, VELOREN_MAGIC_NUMBER, VELOREN_NETWORK_VERSION},
message::OutGoingMessage,
worker::{
channel::{Channel, ChannelState},
types::{Pid, RtrnMsg, Stream, TcpFrame},
},
};
use crate::worker::{channel::ChannelProtocol, types::Frame};
use bincode;
use enumset::EnumSet;
use mio::{self, net::TcpStream};
use mio_extras::channel::Sender;
use std::{
collections::HashMap,
io::{Read, Write},
sync::{Arc, RwLock},
time::Instant,
};
use mio::net::TcpStream;
use std::io::{Read, Write};
use tracing::*;
#[derive(Debug)]
pub(crate) struct TcpChannel {
state: ChannelState,
pub tcpstream: TcpStream,
endpoint: TcpStream,
//these buffers only ever contain 1 FRAME !
read_buffer: Vec<u8>,
write_buffer: Vec<u8>,
}
impl TcpChannel {
pub fn new(
tcpstream: TcpStream,
local_pid: Pid,
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
) -> Self {
TcpChannel {
state: ChannelState::new(local_pid, remotes),
tcpstream,
pub fn new(endpoint: TcpStream) -> Self {
let mut b = vec![0; 200];
Self {
endpoint,
read_buffer: b.clone(),
write_buffer: b,
}
}
}
impl Channel for TcpChannel {
fn read(
&mut self,
uninitialized_dirty_speed_buffer: &mut [u8; 65000],
aprox_time: Instant,
rtrn_tx: &Sender<RtrnMsg>,
) {
let pid = self.state.remote_pid;
let span = span!(Level::INFO, "channel", ?pid);
let _enter = span.enter();
match self.tcpstream.read(uninitialized_dirty_speed_buffer) {
impl ChannelProtocol for TcpChannel {
type Handle = TcpStream;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
match self.endpoint.read(self.read_buffer.as_mut_slice()) {
Ok(n) => {
trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&uninitialized_dirty_speed_buffer[..n]);
let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 {
let r: Result<TcpFrame, _> = bincode::deserialize_from(&mut cur);
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => self.state.handle(frame, rtrn_tx),
Ok(frame) => result.push(frame),
Err(e) => {
error!(
?self,
?e,
"failure parsing a message with len: {}, starting with: {:?}",
n,
&uninitialized_dirty_speed_buffer[0..std::cmp::min(n, 10)]
&self.read_buffer[0..std::cmp::min(n, 10)]
);
break;
},
}
}
@ -75,85 +57,33 @@ impl Channel for TcpChannel {
panic!("{}", e);
},
};
result
}
fn write(&mut self, uninitialized_dirty_speed_buffer: &mut [u8; 65000], aprox_time: Instant) {
let pid = self.state.remote_pid;
let span = span!(Level::INFO, "channel", ?pid);
let _enter = span.enter();
loop {
while let Some(elem) = self.state.send_queue.pop_front() {
if let Ok(mut data) = bincode::serialize(&elem) {
let total = data.len();
match self.tcpstream.write(&data) {
Ok(n) if n == total => {},
Ok(n) => {
error!("could only send part");
//let data = data.drain(n..).collect(); //TODO:
// validate n.. is correct
// to_send.push_front(data);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
return;
},
Err(e) => {
panic!("{}", e);
},
};
};
}
// run streams
self.state.tick_streams();
if self.state.send_queue.is_empty() {
break;
}
}
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len();
match self.endpoint.write(&data) {
Ok(n) if n == total => {
trace!("send!");
},
Ok(n) => {
error!("could only send part");
//let data = data.drain(n..).collect(); //TODO:
// validate n.. is correct
// to_send.push_front(data);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
return;
},
Err(e) => {
panic!("{}", e);
},
};
};
}
fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> u32 {
// validate promises
if let Some(stream_id_pool) = &mut self.state.stream_id_pool {
let sid = stream_id_pool.next();
let stream = Stream::new(sid, prio, promises.clone());
self.state.streams.push(stream);
self.state.send_queue.push_back(TcpFrame::OpenStream {
sid,
prio,
promises,
});
return sid;
}
error!("fix me");
return 0;
//TODO: fix me
}
fn close_stream(&mut self, sid: u32) {
self.state.streams.retain(|stream| stream.sid() != sid);
self.state
.send_queue
.push_back(TcpFrame::CloseStream { sid });
}
fn handshake(&mut self) {
self.state.send_queue.push_back(TcpFrame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER.to_string(),
version: VELOREN_NETWORK_VERSION,
});
self.state.send_handshake = true;
}
fn shutdown(&mut self) {
self.state.send_queue.push_back(TcpFrame::Shutdown {});
self.state.send_shutdown = true;
}
fn send(&mut self, outgoing: OutGoingMessage) {
//TODO: fix me
for s in self.state.streams.iter_mut() {
s.to_send.push_back(outgoing);
break;
}
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}

View File

@ -1,10 +1,11 @@
use crate::{
api::Promise,
message::{InCommingMessage, OutGoingMessage},
worker::tcp::TcpChannel,
worker::{Channel, MpscChannel, TcpChannel, UdpChannel},
};
use enumset::EnumSet;
use mio::{self, net::TcpListener, PollOpt, Ready};
use mio_extras::channel::Sender;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use uuid::Uuid;
@ -50,10 +51,23 @@ pub struct Statistics {
pub nano_busy: u128,
}
#[derive(Debug)]
pub(crate) enum TokenObjects {
TcpListener(TcpListener),
TcpChannel(TcpChannel),
TcpChannel(Channel<TcpChannel>, Option<Sender<Pid>>),
UdpChannel(Channel<UdpChannel>, Option<Sender<Pid>>),
MpscChannel(Channel<MpscChannel>, Option<Sender<Pid>>),
}
impl std::fmt::Debug for TokenObjects {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TokenObjects::TcpListener(l) => write!(f, "{:?}", l),
TokenObjects::TcpChannel(c, _) => write!(f, "{:?}", c),
TokenObjects::UdpChannel(c, _) => write!(f, "{:?}", c),
TokenObjects::MpscChannel(c, _) => unimplemented!("MPSC"),
}
}
}
#[derive(Debug)]
@ -121,6 +135,3 @@ pub(crate) enum Frame {
* against veloren Server! */
Raw(Vec<u8>),
}
pub(crate) type TcpFrame = Frame;
pub(crate) type UdpFrame = Frame;

84
network/src/worker/udp.rs Normal file
View File

@ -0,0 +1,84 @@
use crate::worker::{channel::ChannelProtocol, types::Frame};
use bincode;
use mio::net::UdpSocket;
use tracing::*;
/// UDP-backed channel protocol endpoint.
#[derive(Debug)]
pub(crate) struct UdpChannel {
    endpoint: UdpSocket,
    // Scratch buffer that `recv_from` writes incoming datagrams into; its
    // length bounds how much of a datagram can be received.
    read_buffer: Vec<u8>,
    // NOTE(review): write_buffer is never touched by the ChannelProtocol impl
    // visible here — confirm whether it can be removed.
    write_buffer: Vec<u8>,
}
impl UdpChannel {
    /// Wraps a mio `UdpSocket` and pre-allocates the I/O scratch buffers.
    pub fn new(endpoint: UdpSocket) -> Self {
        // BUG FIX: the buffers were `Vec::new()` (length 0). `recv_from`
        // fills the slice it is given, so a zero-length read_buffer silently
        // truncated every incoming datagram to 0 bytes. 65000 covers the
        // maximum UDP payload.
        let buf = vec![0; 65000];
        Self {
            endpoint,
            read_buffer: buf.clone(),
            write_buffer: buf,
        }
    }
}
impl ChannelProtocol for UdpChannel {
type Handle = UdpSocket;
/// Execute when ready to read
fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new();
match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) {
Ok((n, remote)) => {
trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 {
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => result.push(frame),
Err(e) => {
error!(
?self,
?e,
"failure parsing a message with len: {}, starting with: {:?}",
n,
&self.read_buffer[0..std::cmp::min(n, 10)]
);
break;
},
}
}
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
},
Err(e) => {
panic!("{}", e);
},
};
result
}
/// Execute when ready to write
fn write(&mut self, frame: Frame) {
if let Ok(mut data) = bincode::serialize(&frame) {
let total = data.len();
match self.endpoint.send(&data) {
Ok(n) if n == total => {},
Ok(n) => {
error!("could only send part");
//let data = data.drain(n..).collect(); //TODO:
// validate n.. is correct
// to_send.push_front(data);
},
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
debug!("would block");
return;
},
Err(e) => {
panic!("{}", e);
},
};
};
}
fn get_handle(&self) -> &Self::Handle { &self.endpoint }
}

View File

@ -1,10 +1,8 @@
use crate::{
internal::RemoteParticipant,
worker::{
channel::Channel,
tcp::TcpChannel,
types::{CtrlMsg, Pid, RtrnMsg, Statistics, TokenObjects},
Controller,
Channel, Controller, TcpChannel,
},
};
use mio::{self, Poll, PollOpt, Ready, Token};
@ -49,7 +47,6 @@ pub(crate) struct Worker {
ctrl_rx: Receiver<CtrlMsg>,
rtrn_tx: Sender<RtrnMsg>,
mio_tokens: MioTokens,
buf: [u8; 65000],
time_before_poll: Instant,
time_after_poll: Instant,
}
@ -73,7 +70,6 @@ impl Worker {
ctrl_rx,
rtrn_tx,
mio_tokens,
buf: [0; 65000],
time_before_poll: Instant::now(),
time_after_poll: Instant::now(),
}
@ -118,9 +114,9 @@ impl Worker {
CtrlMsg::Shutdown => {
debug!("Shutting Down");
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::TcpChannel(channel) = obj {
if let TokenObjects::TcpChannel(channel, _) = obj {
channel.shutdown();
channel.write(&mut self.buf, self.time_after_poll);
channel.tick_send();
}
}
return true;
@ -131,9 +127,17 @@ impl Worker {
TokenObjects::TcpListener(h) => {
self.poll.register(h, tok, interest, opts).unwrap()
},
TokenObjects::TcpChannel(channel) => self
TokenObjects::TcpChannel(channel, _) => self
.poll
.register(&channel.tcpstream, tok, interest, opts)
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::UdpChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
TokenObjects::MpscChannel(channel, _) => self
.poll
.register(channel.get_handle(), tok, interest, opts)
.unwrap(),
}
debug!(?handle, ?tok, "Registered new handle");
@ -145,9 +149,9 @@ impl Worker {
promises,
} => {
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::TcpChannel(channel) = obj {
if let TokenObjects::TcpChannel(channel, _) = obj {
channel.open_stream(prio, promises); //TODO: check participant
channel.write(&mut self.buf, self.time_after_poll);
channel.tick_send();
}
}
//TODO:
@ -155,18 +159,18 @@ impl Worker {
CtrlMsg::CloseStream { pid, sid } => {
//TODO:
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::TcpChannel(channel) = to {
if let TokenObjects::TcpChannel(channel, _) = to {
channel.close_stream(sid); //TODO: check participant
channel.write(&mut self.buf, self.time_after_poll);
channel.tick_send();
}
}
},
CtrlMsg::Send(outgoing) => {
//TODO:
for to in self.mio_tokens.tokens.values_mut() {
if let TokenObjects::TcpChannel(channel) = to {
if let TokenObjects::TcpChannel(channel, _) = to {
channel.send(outgoing); //TODO: check participant
channel.write(&mut self.buf, self.time_after_poll);
channel.tick_send();
break;
}
}
@ -196,26 +200,51 @@ impl Worker {
)
.unwrap();
trace!(?remote_stream, ?tok, "registered");
let mut channel =
TcpChannel::new(remote_stream, self.pid, self.remotes.clone());
let tcp_channel = TcpChannel::new(remote_stream);
let mut channel = Channel::new(self.pid, tcp_channel, self.remotes.clone());
channel.handshake();
channel.tick_send();
self.mio_tokens
.tokens
.insert(tok, TokenObjects::TcpChannel(channel));
.insert(tok, TokenObjects::TcpChannel(channel, None));
},
Err(err) => {
error!(?err, "error during remote connected");
},
},
TokenObjects::TcpChannel(channel) => {
TokenObjects::TcpChannel(channel, _) => {
if event.readiness().is_readable() {
trace!(?channel.tcpstream, "stream readable");
channel.read(&mut self.buf, self.time_after_poll, &self.rtrn_tx);
let handle = channel.get_handle();
trace!(?handle, "stream readable");
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
trace!(?channel.tcpstream, "stream writeable");
channel.write(&mut self.buf, self.time_after_poll);
let handle = channel.get_handle();
trace!(?handle, "stream writeable");
channel.tick_send();
}
},
TokenObjects::UdpChannel(channel, _) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
trace!(?handle, "stream readable");
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
trace!(?handle, "stream writeable");
channel.tick_send();
}
},
TokenObjects::MpscChannel(channel, _) => {
if event.readiness().is_readable() {
let handle = channel.get_handle();
channel.tick_recv(&self.rtrn_tx);
}
if event.readiness().is_writable() {
let handle = channel.get_handle();
channel.tick_send();
}
},
};