Code/Dependency Cleanup

This commit is contained in:
Marcel Märtens 2020-03-04 16:52:30 +01:00
parent 641df53f4a
commit 9354952a7f
14 changed files with 139 additions and 134 deletions

1
Cargo.lock generated
View File

@ -5210,7 +5210,6 @@ dependencies = [
"mio-extras", "mio-extras",
"prometheus", "prometheus",
"serde", "serde",
"serde_derive",
"tlid", "tlid",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View File

@ -8,20 +8,23 @@ edition = "2018"
[dependencies] [dependencies]
uvth = "3.1"
enumset = { version = "0.4", features = ["serde"] } enumset = { version = "0.4", features = ["serde"] }
bincode = "1.2"
serde = "1.0"
serde_derive = "1.0"
mio = "0.6"
tracing = "0.1"
byteorder = "1.3"
mio-extras = "2.0"
prometheus = "0.7"
futures = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]} tlid = { path = "../../tlid", features = ["serde"]}
#threadpool
uvth = "3.1"
#serialisation
bincode = "1.2"
serde = "1.0"
byteorder = "1.3"
#sending
mio = "0.6"
mio-extras = "2.0"
#tracing and metrics
tracing = "0.1"
prometheus = "0.7"
#async
futures = "0.3"
[dev-dependencies] [dev-dependencies]
futures = "0.3"
tracing-subscriber = "0.2.0-alpha.4" tracing-subscriber = "0.2.0-alpha.4"

View File

@ -7,7 +7,7 @@ use crate::{
types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects}, types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects},
}; };
use enumset::*; use enumset::*;
use futures::{future::poll_fn, stream::StreamExt}; use futures::stream::StreamExt;
use mio::{ use mio::{
self, self,
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
@ -16,10 +16,7 @@ use mio::{
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{ sync::{mpsc, Arc, RwLock},
mpsc::{self, TryRecvError},
Arc, RwLock,
},
}; };
use tlid; use tlid;
use tracing::*; use tracing::*;
@ -47,8 +44,6 @@ pub struct Participant {
network_controller: Arc<Vec<Controller>>, network_controller: Arc<Vec<Controller>>,
} }
pub struct Connection {}
pub struct Stream { pub struct Stream {
sid: Sid, sid: Sid,
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>, msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
@ -121,7 +116,7 @@ impl Network {
let worker = Self::get_lowest_worker(&self.controller); let worker = Self::get_lowest_worker(&self.controller);
let pid = self.participant_id; let pid = self.participant_id;
let remotes = self.remotes.clone(); let remotes = self.remotes.clone();
let mut span = span!(Level::INFO, "connect", ?address); let span = span!(Level::INFO, "connect", ?address);
let _enter = span.enter(); let _enter = span.enter();
match address { match address {
Address::Tcp(a) => { Address::Tcp(a) => {
@ -129,7 +124,7 @@ impl Network {
let tcp_stream = TcpStream::connect(&a)?; let tcp_stream = TcpStream::connect(&a)?;
let tcp_channel = TcpChannel::new(tcp_stream); let tcp_channel = TcpChannel::new(tcp_stream);
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Pid>(); let (ctrl_tx, ctrl_rx) = mpsc::channel::<Pid>();
let mut channel = Channel::new( let channel = Channel::new(
pid, pid,
ChannelProtocols::Tcp(tcp_channel), ChannelProtocols::Tcp(tcp_channel),
remotes, remotes,
@ -190,7 +185,23 @@ impl Network {
streams: Vec<Stream>, streams: Vec<Stream>,
msg: M, msg: M,
) -> Result<(), NetworkError> { ) -> Result<(), NetworkError> {
panic!("sda"); let messagebuffer = Arc::new(message::serialize(&msg));
//TODO: why do we need a look here, i want my own local directory which is
// updated by workes via a channel and needs to be intepreted on a send but it
// should almost ever be empty except for new channel creations and stream
// creations!
for stream in streams {
stream
.ctr_tx
.send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(),
cursor: 0,
mid: None,
sid: stream.sid,
}))
.unwrap();
}
Ok(())
} }
} }
@ -227,7 +238,18 @@ impl Participant {
Err(ParticipantError::ParticipantDisconected) Err(ParticipantError::ParticipantDisconected)
} }
pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> { Ok(()) } pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> {
for controller in self.network_controller.iter() {
let tx = controller.get_tx();
tx.send(CtrlMsg::CloseStream {
pid: self.remote_pid,
sid: stream.sid,
})
.unwrap();
return Ok(());
}
Err(ParticipantError::ParticipantDisconected)
}
pub async fn opened(&self) -> Result<Stream, ParticipantError> { pub async fn opened(&self) -> Result<Stream, ParticipantError> {
loop { loop {
@ -255,7 +277,7 @@ impl Participant {
} }
pub async fn _closed(&self) -> Result<Stream, ParticipantError> { pub async fn _closed(&self) -> Result<Stream, ParticipantError> {
panic!("sda"); panic!("aaa");
} }
} }
@ -272,7 +294,7 @@ impl Stream {
// creations! // creations!
self.ctr_tx self.ctr_tx
.send(CtrlMsg::Send(OutGoingMessage { .send(CtrlMsg::Send(OutGoingMessage {
buffer: messagebuffer.clone(), buffer: messagebuffer,
cursor: 0, cursor: 0,
mid: None, mid: None,
sid: self.sid, sid: self.sid,
@ -317,5 +339,5 @@ impl From<std::io::Error> for NetworkError {
} }
impl<T> From<mio_extras::channel::SendError<T>> for NetworkError { impl<T> From<mio_extras::channel::SendError<T>> for NetworkError {
fn from(err: mio_extras::channel::SendError<T>) -> Self { NetworkError::WorkerDestroyed } fn from(_err: mio_extras::channel::SendError<T>) -> Self { NetworkError::WorkerDestroyed }
} }

View File

@ -14,7 +14,7 @@ use futures::{executor::block_on, sink::SinkExt};
use mio_extras::channel::Sender; use mio_extras::channel::Sender;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
sync::{mpsc, Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tracing::*; use tracing::*;
@ -220,7 +220,13 @@ impl Channel {
}); });
self.send_config = true; self.send_config = true;
info!(?pid, "this channel is now configured!"); info!(?pid, "this channel is now configured!");
rtrn_tx.send(RtrnMsg::ConnectedParticipant { pid }); if let Err(err) = rtrn_tx.send(RtrnMsg::ConnectedParticipant { pid }) {
error!(
?err,
"couldn't notify of connected participant, is network already \
closed ?"
);
}
} }
} else { } else {
self.send_queue.push_back(Frame::ParticipantId { self.send_queue.push_back(Frame::ParticipantId {
@ -240,12 +246,19 @@ impl Channel {
if !remotes.contains_key(&pid) { if !remotes.contains_key(&pid) {
remotes.insert(pid, RemoteParticipant::new()); remotes.insert(pid, RemoteParticipant::new());
} }
if let Some(rp) = remotes.get_mut(&pid) { if let Some(_rp) = remotes.get_mut(&pid) {
//TODO: make use of RemoteParticipant
self.stream_id_pool = Some(stream_id_pool); self.stream_id_pool = Some(stream_id_pool);
self.msg_id_pool = Some(msg_id_pool); self.msg_id_pool = Some(msg_id_pool);
} }
if let Some(send) = &self.return_pid_to { if let Some(send) = &self.return_pid_to {
send.send(pid); if let Err(err) = send.send(pid) {
error!(
?err,
"couldn't notify of connected participant, is network already \
closed ?"
);
}
}; };
self.return_pid_to = None; self.return_pid_to = None;
} else { } else {
@ -256,7 +269,9 @@ impl Channel {
Frame::Shutdown {} => { Frame::Shutdown {} => {
self.recv_shutdown = true; self.recv_shutdown = true;
info!("shutting down channel"); info!("shutting down channel");
rtrn_tx.send(RtrnMsg::Shutdown); if let Err(err) = rtrn_tx.send(RtrnMsg::Shutdown) {
error!(?err, "couldn't notify of shutdown");
}
}, },
Frame::OpenStream { Frame::OpenStream {
sid, sid,
@ -268,13 +283,15 @@ impl Channel {
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
self.streams.push(stream); self.streams.push(stream);
info!("opened a stream"); info!("opened a stream");
rtrn_tx.send(RtrnMsg::OpendStream { if let Err(err) = rtrn_tx.send(RtrnMsg::OpendStream {
pid, pid,
sid, sid,
prio, prio,
msg_rx, msg_rx,
promises, promises,
}); }) {
error!(?err, "couldn't notify of opened stream");
}
} else { } else {
error!("called OpenStream before PartcipantID!"); error!("called OpenStream before PartcipantID!");
} }
@ -283,7 +300,9 @@ impl Channel {
if let Some(pid) = self.remote_pid { if let Some(pid) = self.remote_pid {
self.streams.retain(|stream| stream.sid() != sid); self.streams.retain(|stream| stream.sid() != sid);
info!("closed a stream"); info!("closed a stream");
rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }); if let Err(err) = rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid }) {
error!(?err, "couldn't notify of closed stream");
}
} }
}, },
Frame::DataHeader { mid, sid, length } => { Frame::DataHeader { mid, sid, length } => {
@ -309,7 +328,7 @@ impl Channel {
}, },
Frame::Data { Frame::Data {
id, id,
start, start: _, //TODO: use start to verify!
mut data, mut data,
} => { } => {
debug!("Data Package {}, len: {}", id, data.len()); debug!("Data Package {}, len: {}", id, data.len());
@ -334,7 +353,13 @@ impl Channel {
info!(?sid, ? m.mid, "received message"); info!(?sid, ? m.mid, "received message");
//TODO: I dislike that block_on here! //TODO: I dislike that block_on here!
block_on(async { block_on(async {
tx.send(m).await; if let Err(err) = tx.send(m).await {
error!(
?err,
"cannot notify that message was received, probably stream \
is already closed"
);
};
}); });
} }
} }

View File

@ -25,7 +25,6 @@ use uvth::ThreadPool;
It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers It is monitored, and when it's thread is fully loaded it can be splitted up into 2 MioWorkers
*/ */
pub struct Controller { pub struct Controller {
poll: Arc<Poll>,
ctrl_tx: Sender<CtrlMsg>, ctrl_tx: Sender<CtrlMsg>,
rtrn_rx: Receiver<RtrnMsg>, rtrn_rx: Receiver<RtrnMsg>,
} }
@ -42,7 +41,6 @@ impl Controller {
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>, remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
) -> Self { ) -> Self {
let poll = Arc::new(Poll::new().unwrap()); let poll = Arc::new(Poll::new().unwrap());
let poll_clone = poll.clone();
let (ctrl_tx, ctrl_rx) = channel(); let (ctrl_tx, ctrl_rx) = channel();
let (rtrn_tx, rtrn_rx) = channel(); let (rtrn_tx, rtrn_rx) = channel();
@ -57,16 +55,10 @@ impl Controller {
let w = wid; let w = wid;
let span = span!(Level::INFO, "worker", ?w); let span = span!(Level::INFO, "worker", ?w);
let _enter = span.enter(); let _enter = span.enter();
let mut worker = Worker::new( let mut worker = Worker::new(pid, poll, metrics, remotes, token_pool, ctrl_rx, rtrn_tx);
pid, poll_clone, metrics, remotes, token_pool, ctrl_rx, rtrn_tx,
);
worker.run(); worker.run();
}); });
Controller { Controller { ctrl_tx, rtrn_rx }
poll,
ctrl_tx,
rtrn_rx,
}
} }
//TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers //TODO: split 4->5 MioWorkers and merge 5->4 MioWorkers

View File

@ -48,21 +48,6 @@ pub mod tests {
.init(); .init();
} }
pub fn block_on_recv(stream: &Stream) -> Result<String, StreamError> {
let mut s: Result<Option<String>, StreamError> = stream.recv();
while let Ok(None) = s {
thread::sleep(Duration::from_millis(1));
s = stream.recv();
}
if let Ok(Some(s)) = s {
return Ok(s);
}
if let Err(e) = s {
return Err(e);
}
unreachable!("invalid test");
}
#[test] #[test]
fn aaa() { test_tracing(); } fn aaa() { test_tracing(); }
@ -88,9 +73,9 @@ pub mod tests {
assert!(s1.send("Hello World").is_ok()); assert!(s1.send("Hello World").is_ok());
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
let s = block_on_recv(&s1_n2); let s: Result<String, _> = block_on(s1_n2.recv());
assert_eq!(s, Ok("Hello World".to_string())); assert_eq!(s, Ok("Hello World".to_string()));
assert!(p1.close(s1).is_ok()); assert!(p1.close(s1).is_ok());
@ -128,23 +113,23 @@ pub mod tests {
assert!(s4.send("Hello World4").is_ok()); assert!(s4.send("Hello World4").is_ok());
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1 let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1 let mut s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
let s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2 let mut s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2
let s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3 let mut s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3
let s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4 let mut s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4
let s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5 let mut s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5
info!("all streams opened"); info!("all streams opened");
let s = block_on_recv(&s3_n2); let s: Result<String, _> = block_on(s3_n2.recv());
assert_eq!(s, Ok("Hello World3".to_string())); assert_eq!(s, Ok("Hello World3".to_string()));
let s = block_on_recv(&s1_n2); let s: Result<String, _> = block_on(s1_n2.recv());
assert_eq!(s, Ok("Hello World1".to_string())); assert_eq!(s, Ok("Hello World1".to_string()));
let s = block_on_recv(&s2_n2); let s: Result<String, _> = block_on(s2_n2.recv());
assert_eq!(s, Ok("Hello World2".to_string())); assert_eq!(s, Ok("Hello World2".to_string()));
let s = block_on_recv(&s5_n2); let s: Result<String, _> = block_on(s5_n2.recv());
assert_eq!(s, Ok("Hello World5".to_string())); assert_eq!(s, Ok("Hello World5".to_string()));
let s = block_on_recv(&s4_n2); let s: Result<String, _> = block_on(s4_n2.recv());
assert_eq!(s, Ok("Hello World4".to_string())); assert_eq!(s, Ok("Hello World4".to_string()));
assert!(p1.close(s1).is_ok()); assert!(p1.close(s1).is_ok());

View File

@ -140,5 +140,5 @@ impl NetworkMetrics {
}) })
} }
pub fn is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 } pub fn _is_100th_tick(&self) -> bool { self.tick.load(Ordering::Relaxed).rem_euclid(100) == 0 }
} }

View File

@ -36,7 +36,7 @@ impl ChannelProtocol for MpscChannel {
fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) { fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
for frame in frames { for frame in frames {
match self.endpoint_sender.send(frame) { match self.endpoint_sender.send(frame) {
Ok(n) => { Ok(()) => {
trace!("sended"); trace!("sended");
}, },
Err(mio_extras::channel::SendError::Io(e)) Err(mio_extras::channel::SendError::Io(e))

View File

@ -1,10 +1,7 @@
use crate::{channel::ChannelProtocol, types::Frame}; use crate::{channel::ChannelProtocol, types::Frame};
use bincode; use bincode;
use mio::net::TcpStream; use mio::net::TcpStream;
use std::{ use std::io::{Read, Write};
io::{Read, Write},
ops::Range,
};
use tracing::*; use tracing::*;
pub(crate) struct TcpChannel { pub(crate) struct TcpChannel {
@ -12,7 +9,6 @@ pub(crate) struct TcpChannel {
//these buffers only ever contain 1 FRAME ! //these buffers only ever contain 1 FRAME !
read_buffer: NetworkBuffer, read_buffer: NetworkBuffer,
write_buffer: NetworkBuffer, write_buffer: NetworkBuffer,
need_to_send_till: usize,
} }
struct NetworkBuffer { struct NetworkBuffer {
@ -27,7 +23,6 @@ impl TcpChannel {
endpoint, endpoint,
read_buffer: NetworkBuffer::new(), read_buffer: NetworkBuffer::new(),
write_buffer: NetworkBuffer::new(), write_buffer: NetworkBuffer::new(),
need_to_send_till: 0,
} }
} }
} }
@ -98,15 +93,6 @@ impl NetworkBuffer {
} }
} }
fn move_in_vec(vec: &mut Vec<u8>, src: Range<usize>, dest: Range<usize>) {
debug_assert_eq!(src.end - src.start, dest.end - dest.start);
let mut i2 = dest.start;
for i in src {
vec[i2] = vec[i];
i2 += 1;
}
}
impl ChannelProtocol for TcpChannel { impl ChannelProtocol for TcpChannel {
type Handle = TcpStream; type Handle = TcpStream;
@ -165,10 +151,11 @@ impl ChannelProtocol for TcpChannel {
if self.write_buffer.get_read_slice().len() < 1500 { if self.write_buffer.get_read_slice().len() < 1500 {
match frames.next() { match frames.next() {
Some(frame) => { Some(frame) => {
if let Ok(mut size) = bincode::serialized_size(&frame) { if let Ok(size) = bincode::serialized_size(&frame) {
let slice = self.write_buffer.get_write_slice(size as usize); let slice = self.write_buffer.get_write_slice(size as usize);
if let Err(e) = bincode::serialize_into(slice, &frame) { if let Err(err) = bincode::serialize_into(slice, &frame) {
error!( error!(
?err,
"serialising frame was unsuccessful, this should never \ "serialising frame was unsuccessful, this should never \
happen! dropping frame!" happen! dropping frame!"
) )

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
api::{Address, Promise}, api::Promise,
channel::Channel, channel::Channel,
message::{InCommingMessage, OutGoingMessage}, message::{InCommingMessage, OutGoingMessage},
}; };
@ -144,20 +144,6 @@ pub(crate) enum Frame {
Raw(Vec<u8>), Raw(Vec<u8>),
} }
pub(crate) enum Protocol {
Tcp,
Udp,
}
impl Address {
pub(crate) fn get_protocol(&self) -> Protocol {
match self {
Address::Tcp(_) => Protocol::Tcp,
Address::Udp(_) => Protocol::Udp,
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct RemoteParticipant { pub struct RemoteParticipant {
pub stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>, pub stream_id_pool: tlid::Pool<tlid::Wrapping<Sid>>,

View File

@ -6,15 +6,15 @@ use tracing::*;
pub(crate) struct UdpChannel { pub(crate) struct UdpChannel {
endpoint: UdpSocket, endpoint: UdpSocket,
read_buffer: Vec<u8>, read_buffer: Vec<u8>,
write_buffer: Vec<u8>, _write_buffer: Vec<u8>,
} }
impl UdpChannel { impl UdpChannel {
pub fn new(endpoint: UdpSocket) -> Self { pub fn _new(endpoint: UdpSocket) -> Self {
Self { Self {
endpoint, endpoint,
read_buffer: Vec::new(), read_buffer: Vec::new(),
write_buffer: Vec::new(), _write_buffer: Vec::new(),
} }
} }
} }
@ -26,7 +26,7 @@ impl ChannelProtocol for UdpChannel {
fn read(&mut self) -> Vec<Frame> { fn read(&mut self) -> Vec<Frame> {
let mut result = Vec::new(); let mut result = Vec::new();
match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) { match self.endpoint.recv_from(self.read_buffer.as_mut_slice()) {
Ok((n, remote)) => { Ok((n, _)) => {
trace!("incomming message with len: {}", n); trace!("incomming message with len: {}", n);
let mut cur = std::io::Cursor::new(&self.read_buffer[..n]); let mut cur = std::io::Cursor::new(&self.read_buffer[..n]);
while cur.position() < n as u64 { while cur.position() < n as u64 {
@ -59,13 +59,13 @@ impl ChannelProtocol for UdpChannel {
/// Execute when ready to write /// Execute when ready to write
fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) { fn write<I: std::iter::Iterator<Item = Frame>>(&mut self, frames: &mut I) {
for frame in frames { for frame in frames {
if let Ok(mut data) = bincode::serialize(&frame) { if let Ok(data) = bincode::serialize(&frame) {
let total = data.len(); let total = data.len();
match self.endpoint.send(&data) { match self.endpoint.send(&data) {
Ok(n) if n == total => { Ok(n) if n == total => {
trace!("send {} bytes", n); trace!("send {} bytes", n);
}, },
Ok(n) => { Ok(_) => {
error!("could only send part"); error!("could only send part");
}, },
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {

View File

@ -114,7 +114,7 @@ impl Worker {
match msg { match msg {
CtrlMsg::Shutdown => { CtrlMsg::Shutdown => {
debug!("Shutting Down"); debug!("Shutting Down");
for (tok, obj) in self.mio_tokens.tokens.iter_mut() { for (_, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj { if let TokenObjects::Channel(channel) = obj {
channel.shutdown(); channel.shutdown();
channel.tick_send(); channel.tick_send();
@ -154,11 +154,17 @@ impl Worker {
return_sid, return_sid,
} => { } => {
let mut handled = false; let mut handled = false;
for (tok, obj) in self.mio_tokens.tokens.iter_mut() { for (_, obj) in self.mio_tokens.tokens.iter_mut() {
if let TokenObjects::Channel(channel) = obj { if let TokenObjects::Channel(channel) = obj {
if Some(pid) == channel.remote_pid { if Some(pid) == channel.remote_pid {
let sid = channel.open_stream(prio, promises, msg_tx); let sid = channel.open_stream(prio, promises, msg_tx);
return_sid.send(sid); if let Err(err) = return_sid.send(sid) {
error!(
?err,
"cannot send that a stream opened, probably channel was \
already closed!"
);
};
channel.tick_send(); channel.tick_send();
handled = true; handled = true;
break; break;

View File

@ -1,7 +1,7 @@
use chrono::prelude::*; use chrono::prelude::*;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use futures::executor::block_on; use futures::executor::block_on;
use network::{Address, Network, Participant, Promise, Stream}; use network::{Address, Network, Promise, Stream};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
@ -91,39 +91,39 @@ fn server(port: u16) {
let t1 = thread::spawn(move || { let t1 = thread::spawn(move || {
if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { if let Ok(Msg::Ping(id)) = block_on(s1.recv()) {
thread::sleep(Duration::from_millis(3000)); thread::sleep(Duration::from_millis(3000));
s1.send(Msg::Pong(id)); s1.send(Msg::Pong(id)).unwrap();
println!("[{}], send s1_1", Utc::now().time()); println!("[{}], send s1_1", Utc::now().time());
} }
if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { if let Ok(Msg::Ping(id)) = block_on(s1.recv()) {
thread::sleep(Duration::from_millis(3000)); thread::sleep(Duration::from_millis(3000));
s1.send(Msg::Pong(id)); s1.send(Msg::Pong(id)).unwrap();
println!("[{}], send s1_2", Utc::now().time()); println!("[{}], send s1_2", Utc::now().time());
} }
}); });
let t2 = thread::spawn(move || { let t2 = thread::spawn(move || {
if let Ok(Msg::Ping(id)) = block_on(s2.recv()) { if let Ok(Msg::Ping(id)) = block_on(s2.recv()) {
thread::sleep(Duration::from_millis(1000)); thread::sleep(Duration::from_millis(1000));
s2.send(Msg::Pong(id)); s2.send(Msg::Pong(id)).unwrap();
println!("[{}], send s2_1", Utc::now().time()); println!("[{}], send s2_1", Utc::now().time());
} }
if let Ok(Msg::Ping(id)) = block_on(s2.recv()) { if let Ok(Msg::Ping(id)) = block_on(s2.recv()) {
thread::sleep(Duration::from_millis(1000)); thread::sleep(Duration::from_millis(1000));
s2.send(Msg::Pong(id)); s2.send(Msg::Pong(id)).unwrap();
println!("[{}], send s2_2", Utc::now().time()); println!("[{}], send s2_2", Utc::now().time());
} }
}); });
t1.join(); t1.join().unwrap();
t2.join(); t2.join().unwrap();
thread::sleep(Duration::from_millis(50)); thread::sleep(Duration::from_millis(50));
} }
async fn async_task1(mut s: Stream) -> u64 { async fn async_task1(mut s: Stream) -> u64 {
s.send(Msg::Ping(100)); s.send(Msg::Ping(100)).unwrap();
println!("[{}], s1_1...", Utc::now().time()); println!("[{}], s1_1...", Utc::now().time());
let m1: Result<Msg, _> = s.recv().await; let m1: Result<Msg, _> = s.recv().await;
println!("[{}], s1_1: {:?}", Utc::now().time(), m1); println!("[{}], s1_1: {:?}", Utc::now().time(), m1);
thread::sleep(Duration::from_millis(1000)); thread::sleep(Duration::from_millis(1000));
s.send(Msg::Ping(101)); s.send(Msg::Ping(101)).unwrap();
println!("[{}], s1_2...", Utc::now().time()); println!("[{}], s1_2...", Utc::now().time());
let m2: Result<Msg, _> = s.recv().await; let m2: Result<Msg, _> = s.recv().await;
println!("[{}], s1_2: {:?}", Utc::now().time(), m2); println!("[{}], s1_2: {:?}", Utc::now().time(), m2);
@ -134,12 +134,12 @@ async fn async_task1(mut s: Stream) -> u64 {
} }
async fn async_task2(mut s: Stream) -> u64 { async fn async_task2(mut s: Stream) -> u64 {
s.send(Msg::Ping(200)); s.send(Msg::Ping(200)).unwrap();
println!("[{}], s2_1...", Utc::now().time()); println!("[{}], s2_1...", Utc::now().time());
let m1: Result<Msg, _> = s.recv().await; let m1: Result<Msg, _> = s.recv().await;
println!("[{}], s2_1: {:?}", Utc::now().time(), m1); println!("[{}], s2_1: {:?}", Utc::now().time(), m1);
thread::sleep(Duration::from_millis(5000)); thread::sleep(Duration::from_millis(5000));
s.send(Msg::Ping(201)); s.send(Msg::Ping(201)).unwrap();
println!("[{}], s2_2...", Utc::now().time()); println!("[{}], s2_2...", Utc::now().time());
let m2: Result<Msg, _> = s.recv().await; let m2: Result<Msg, _> = s.recv().await;
println!("[{}], s2_2: {:?}", Utc::now().time(), m2); println!("[{}], s2_2: {:?}", Utc::now().time(), m2);
@ -161,13 +161,13 @@ fn client(port: u16) {
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1
let mut s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let mut s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2 let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2
let before = Instant::now(); let before = Instant::now();
block_on(async { block_on(async {
let f1 = async_task1(s1); let f1 = async_task1(s1);
let f2 = async_task2(s2); let f2 = async_task2(s2);
let x = futures::join!(f1, f2); let _ = futures::join!(f1, f2);
}); });
if before.elapsed() < Duration::from_secs(13) { if before.elapsed() < Duration::from_secs(13) {
println!("IT WORKS!"); println!("IT WORKS!");

View File

@ -93,9 +93,9 @@ fn server(port: u16) {
loop { loop {
let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 let p1 = block_on(server.connected()).unwrap(); //remote representation of p1
let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
loop { loop {
let m: Result<Option<Msg>, _> = s1.recv(); let m: Result<Option<Msg>, _> = block_on(s1.recv());
match m { match m {
Ok(Some(Msg::Ping { id, data })) => { Ok(Some(Msg::Ping { id, data })) => {
//s1.send(Msg::Pong {id, data}); //s1.send(Msg::Pong {id, data});
@ -120,7 +120,7 @@ fn client(port: u16) {
loop { loop {
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 let mut s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let mut last = Instant::now(); let mut last = Instant::now();
let mut id = 0u64; let mut id = 0u64;
loop { loop {
@ -135,7 +135,7 @@ fn client(port: u16) {
last = new; last = new;
println!("1.000.000 took {}", diff.as_millis()); println!("1.000.000 took {}", diff.as_millis());
} }
let _: Result<Option<Msg>, _> = s1.recv(); //let _: Result<Option<Msg>, _> = block_on(s1.recv());
} }
} }
} }