Got some async test to work

This commit is contained in:
Marcel Märtens 2020-03-04 11:59:19 +01:00
parent 74143e13d3
commit 641df53f4a
5 changed files with 33 additions and 26 deletions

View File

@ -18,6 +18,7 @@ tracing = "0.1"
byteorder = "1.3"
mio-extras = "2.0"
prometheus = "0.7"
futures = "0.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
tlid = { path = "../../tlid", features = ["serde"]}

View File

@ -7,6 +7,7 @@ use crate::{
types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects},
};
use enumset::*;
use futures::{future::poll_fn, stream::StreamExt};
use mio::{
self,
net::{TcpListener, TcpStream},
@ -16,7 +17,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{
mpsc::{self, Receiver, TryRecvError},
mpsc::{self, TryRecvError},
Arc, RwLock,
},
};
@ -50,7 +51,7 @@ pub struct Connection {}
pub struct Stream {
sid: Sid,
msg_rx: Receiver<InCommingMessage>,
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
ctr_tx: mio_extras::channel::Sender<CtrlMsg>,
}
@ -200,7 +201,7 @@ impl Participant {
promises: EnumSet<Promise>,
) -> Result<Stream, ParticipantError> {
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Sid>();
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::<InCommingMessage>();
for controller in self.network_controller.iter() {
let tx = controller.get_tx();
tx.send(CtrlMsg::OpenStream {
@ -280,13 +281,16 @@ impl Stream {
Ok(())
}
pub async fn recv<M: DeserializeOwned>(&self) -> Result<M, StreamError> {
match self.msg_rx.recv() {
Ok(msg) => {
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
match self.msg_rx.next().await {
Some(msg) => {
info!(?msg, "delivering a message");
Ok(message::deserialize(msg.buffer))
},
Err(err) => panic!("Unexpected error '{}'", err),
None => panic!(
"Unexpected error, probably stream was destroyed... maybe i dont know yet, no \
idea of async stuff"
),
}
}
}

View File

@ -10,6 +10,7 @@ use crate::{
udp::UdpChannel,
};
use enumset::EnumSet;
use futures::{executor::block_on, sink::SinkExt};
use mio_extras::channel::Sender;
use std::{
collections::{HashMap, VecDeque},
@ -263,7 +264,7 @@ impl Channel {
promises,
} => {
if let Some(pid) = self.remote_pid {
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded::<InCommingMessage>();
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
self.streams.push(stream);
info!("opened a stream");
@ -328,14 +329,12 @@ impl Channel {
}
if let Some(pos) = pos {
let sid = s.sid();
let tx = s.msg_tx();
let mut tx = s.msg_tx();
for m in s.to_receive.drain(pos..pos + 1) {
info!(?sid, ? m.mid, "received message");
tx.send(m).map_err(|err| {
error!(
?err,
"Couldn't deliver message, as stream no longer exists!"
)
//TODO: I dislike that block_on here!
block_on(async {
tx.send(m).await;
});
}
}
@ -405,7 +404,7 @@ impl Channel {
&mut self,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: mpsc::Sender<InCommingMessage>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
) -> Sid {
// validate promises
if let Some(stream_id_pool) = &mut self.stream_id_pool {

View File

@ -4,6 +4,7 @@ use crate::{
message::{InCommingMessage, OutGoingMessage},
};
use enumset::EnumSet;
use futures;
use mio::{self, net::TcpListener, PollOpt, Ready};
use serde::{Deserialize, Serialize};
use std::{collections::VecDeque, sync::mpsc};
@ -32,7 +33,7 @@ pub(crate) enum CtrlMsg {
pid: Pid,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: mpsc::Sender<InCommingMessage>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
return_sid: mpsc::Sender<Sid>,
},
CloseStream {
@ -51,7 +52,7 @@ pub(crate) enum RtrnMsg {
pid: Pid,
sid: Sid,
prio: u8,
msg_rx: mpsc::Receiver<InCommingMessage>,
msg_rx: futures::channel::mpsc::UnboundedReceiver<InCommingMessage>,
promises: EnumSet<Promise>,
},
ClosedStream {
@ -71,7 +72,7 @@ pub(crate) struct IntStream {
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: mpsc::Sender<InCommingMessage>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
pub to_send: VecDeque<OutGoingMessage>,
pub to_receive: VecDeque<InCommingMessage>,
}
@ -81,7 +82,7 @@ impl IntStream {
sid: Sid,
prio: u8,
promises: EnumSet<Promise>,
msg_tx: mpsc::Sender<InCommingMessage>,
msg_tx: futures::channel::mpsc::UnboundedSender<InCommingMessage>,
) -> Self {
IntStream {
sid,
@ -97,7 +98,9 @@ impl IntStream {
pub fn prio(&self) -> u8 { self.prio }
pub fn msg_tx(&self) -> mpsc::Sender<InCommingMessage> { self.msg_tx.clone() }
pub fn msg_tx(&self) -> futures::channel::mpsc::UnboundedSender<InCommingMessage> {
self.msg_tx.clone()
}
pub fn promises(&self) -> EnumSet<Promise> { self.promises }
}

View File

@ -86,8 +86,8 @@ fn server(port: u16) {
println!("waiting for client");
let p1 = block_on(server.connected()).unwrap(); //remote representation of p1
let s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
let s2 = block_on(p1.opened()).unwrap(); //remote representation of s2
let mut s1 = block_on(p1.opened()).unwrap(); //remote representation of s1
let mut s2 = block_on(p1.opened()).unwrap(); //remote representation of s2
let t1 = thread::spawn(move || {
if let Ok(Msg::Ping(id)) = block_on(s1.recv()) {
thread::sleep(Duration::from_millis(3000));
@ -117,7 +117,7 @@ fn server(port: u16) {
thread::sleep(Duration::from_millis(50));
}
async fn async_task1(s: Stream) -> u64 {
async fn async_task1(mut s: Stream) -> u64 {
s.send(Msg::Ping(100));
println!("[{}], s1_1...", Utc::now().time());
let m1: Result<Msg, _> = s.recv().await;
@ -133,7 +133,7 @@ async fn async_task1(s: Stream) -> u64 {
}
}
async fn async_task2(s: Stream) -> u64 {
async fn async_task2(mut s: Stream) -> u64 {
s.send(Msg::Ping(200));
println!("[{}], s2_1...", Utc::now().time());
let m1: Result<Msg, _> = s.recv().await;
@ -161,8 +161,8 @@ fn client(port: u16) {
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
let p1 = block_on(client.connect(&address)).unwrap(); //remote representation of p1
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2
let mut s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s1
let mut s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap(); //remote representation of s2
let before = Instant::now();
block_on(async {
let f1 = async_task1(s1);