From 641df53f4a33c81dafd032bfa8a7b953a58ae375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Wed, 4 Mar 2020 11:59:19 +0100 Subject: [PATCH] Got some async test to work --- network/Cargo.toml | 1 + network/src/api.rs | 18 +++++++++++------- network/src/channel.rs | 15 +++++++-------- network/src/types.rs | 13 ++++++++----- network/tools/async_recv/src/main.rs | 12 ++++++------ 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/network/Cargo.toml b/network/Cargo.toml index 65a111de1e..fa37ae7e70 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -18,6 +18,7 @@ tracing = "0.1" byteorder = "1.3" mio-extras = "2.0" prometheus = "0.7" +futures = "0.3" uuid = { version = "0.8", features = ["serde", "v4"] } tlid = { path = "../../tlid", features = ["serde"]} diff --git a/network/src/api.rs b/network/src/api.rs index 923c4ec1e9..d5b96420b3 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -7,6 +7,7 @@ use crate::{ types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects}, }; use enumset::*; +use futures::{future::poll_fn, stream::StreamExt}; use mio::{ self, net::{TcpListener, TcpStream}, @@ -16,7 +17,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ collections::HashMap, sync::{ - mpsc::{self, Receiver, TryRecvError}, + mpsc::{self, TryRecvError}, Arc, RwLock, }, }; @@ -50,7 +51,7 @@ pub struct Connection {} pub struct Stream { sid: Sid, - msg_rx: Receiver, + msg_rx: futures::channel::mpsc::UnboundedReceiver, ctr_tx: mio_extras::channel::Sender, } @@ -200,7 +201,7 @@ impl Participant { promises: EnumSet, ) -> Result { let (ctrl_tx, ctrl_rx) = mpsc::channel::(); - let (msg_tx, msg_rx) = mpsc::channel::(); + let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); for controller in self.network_controller.iter() { let tx = controller.get_tx(); tx.send(CtrlMsg::OpenStream { @@ -280,13 +281,16 @@ impl Stream { Ok(()) } - pub async fn recv(&self) -> Result { - match self.msg_rx.recv() { - Ok(msg) => { + pub async fn recv(&mut self) -> Result { + match self.msg_rx.next().await { + Some(msg) => { info!(?msg, "delivering a message"); Ok(message::deserialize(msg.buffer)) }, - Err(err) => panic!("Unexpected error '{}'", err), + None => panic!( + "Unexpected error, probably stream was destroyed... maybe i dont know yet, no \ + idea of async stuff" + ), } } } diff --git a/network/src/channel.rs b/network/src/channel.rs index 350215c962..0d6a9a0092 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -10,6 +10,7 @@ use crate::{ udp::UdpChannel, }; use enumset::EnumSet; +use futures::{executor::block_on, sink::SinkExt}; use mio_extras::channel::Sender; use std::{ collections::{HashMap, VecDeque}, @@ -263,7 +264,7 @@ impl Channel { promises, } => { if let Some(pid) = self.remote_pid { - let (msg_tx, msg_rx) = mpsc::channel::(); + let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::(); let stream = IntStream::new(sid, prio, promises.clone(), msg_tx); self.streams.push(stream); info!("opened a stream"); @@ -328,14 +329,12 @@ impl Channel { } if let Some(pos) = pos { let sid = s.sid(); - let tx = s.msg_tx(); + let mut tx = s.msg_tx(); for m in s.to_receive.drain(pos..pos + 1) { info!(?sid, ? m.mid, "received message"); - tx.send(m).map_err(|err| { - error!( - ?err, - "Couldn't deliver message, as stream no longer exists!" - ) + //TODO: I dislike that block_on here! + block_on(async { + tx.send(m).await; }); } } @@ -405,7 +404,7 @@ impl Channel { &mut self, prio: u8, promises: EnumSet, - msg_tx: mpsc::Sender, + msg_tx: futures::channel::mpsc::UnboundedSender, ) -> Sid { // validate promises if let Some(stream_id_pool) = &mut self.stream_id_pool { diff --git a/network/src/types.rs b/network/src/types.rs index ab41ded729..b45c26afc7 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -4,6 +4,7 @@ use crate::{ message::{InCommingMessage, OutGoingMessage}, }; use enumset::EnumSet; +use futures; use mio::{self, net::TcpListener, PollOpt, Ready}; use serde::{Deserialize, Serialize}; use std::{collections::VecDeque, sync::mpsc}; @@ -32,7 +33,7 @@ pub(crate) enum CtrlMsg { pid: Pid, prio: u8, promises: EnumSet, - msg_tx: mpsc::Sender, + msg_tx: futures::channel::mpsc::UnboundedSender, return_sid: mpsc::Sender, }, CloseStream { @@ -51,7 +52,7 @@ pub(crate) enum RtrnMsg { pid: Pid, sid: Sid, prio: u8, - msg_rx: mpsc::Receiver, + msg_rx: futures::channel::mpsc::UnboundedReceiver, promises: EnumSet, }, ClosedStream { @@ -71,7 +72,7 @@ pub(crate) struct IntStream { sid: Sid, prio: u8, promises: EnumSet, - msg_tx: mpsc::Sender, + msg_tx: futures::channel::mpsc::UnboundedSender, pub to_send: VecDeque, pub to_receive: VecDeque, } @@ -81,7 +82,7 @@ impl IntStream { sid: Sid, prio: u8, promises: EnumSet, - msg_tx: mpsc::Sender, + msg_tx: futures::channel::mpsc::UnboundedSender, ) -> Self { IntStream { sid, @@ -97,7 +98,9 @@ impl IntStream { pub fn prio(&self) -> u8 { self.prio } - pub fn msg_tx(&self) -> mpsc::Sender { self.msg_tx.clone() } + pub fn msg_tx(&self) -> futures::channel::mpsc::UnboundedSender { + self.msg_tx.clone() + } pub fn promises(&self) -> EnumSet { self.promises } } diff --git a/network/tools/async_recv/src/main.rs b/network/tools/async_recv/src/main.rs index f7e3866381..4a7ebb20e8 100644 --- a/network/tools/async_recv/src/main.rs +++ b/network/tools/async_recv/src/main.rs @@ -86,8 +86,8 @@ fn server(port: u16) { println!("waiting for client"); let p1 = block_on(server.connected()).unwrap(); //remote representation of p1 - let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 - let s2 = block_on(p1.opened()).unwrap(); //remote representation of s2 + let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1 + let mut s2 = block_on(p1.opened()).unwrap(); //remote representation of s2 let t1 = thread::spawn(move || { if let Ok(Msg::Ping(id)) = block_on(s1.recv()) { thread::sleep(Duration::from_millis(3000)); @@ -117,7 +117,7 @@ fn server(port: u16) { thread::sleep(Duration::from_millis(50)); } -async fn async_task1(s: Stream) -> u64 { +async fn async_task1(mut s: Stream) -> u64 { s.send(Msg::Ping(100)); println!("[{}], s1_1...", Utc::now().time()); let m1: Result = s.recv().await; @@ -133,7 +133,7 @@ async fn async_task1(s: Stream) -> u64 { } } -async fn async_task2(s: Stream) -> u64 { +async fn async_task2(mut s: Stream) -> u64 { s.send(Msg::Ping(200)); println!("[{}], s2_1...", Utc::now().time()); let m1: Result = s.recv().await; @@ -161,8 +161,8 @@ fn client(port: u16) { thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly! let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1 - let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 - let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2 + let mut s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1 + let mut s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2 let before = Instant::now(); block_on(async { let f1 = async_task1(s1);