mirror of
https://gitlab.com/veloren/veloren.git
synced 2024-08-30 18:12:32 +00:00
Cleanup:
- We can now get rid of most sleep and get true remote part and stream working, however there seems to be a deadlock after registered new handle trace with 10% spawn chance - removal of the events trait, as we use channels - streams now directly communicate with each other for performance reasons, somewhere are still deadlocks, oonce directly at listening somehow and after the first message has read, but i also got it to run perfectly through at this state without code change, maybe a sleep or more detailed rust-dgb session would help here!
This commit is contained in:
parent
10863eed14
commit
35233d07f9
@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
channel::{Channel, ChannelProtocols},
|
||||
controller::Controller,
|
||||
message::{self, OutGoingMessage},
|
||||
message::{self, InCommingMessage, OutGoingMessage},
|
||||
metrics::NetworkMetrics,
|
||||
tcp::TcpChannel,
|
||||
types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, Sid, TokenObjects},
|
||||
@ -16,7 +16,10 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
marker::PhantomData,
|
||||
sync::{mpsc::TryRecvError, Arc, RwLock},
|
||||
sync::{
|
||||
mpsc::{self, Receiver, Sender, TryRecvError},
|
||||
Arc, RwLock,
|
||||
},
|
||||
};
|
||||
use tlid;
|
||||
use tracing::*;
|
||||
@ -41,30 +44,18 @@ pub enum Promise {
|
||||
pub struct Participant {
|
||||
addr: Address,
|
||||
remote_pid: Pid,
|
||||
network_controller: Arc<Vec<Controller>>,
|
||||
}
|
||||
|
||||
pub struct Connection {}
|
||||
|
||||
pub struct Stream {
|
||||
sid: Sid,
|
||||
msg_rx: Receiver<InCommingMessage>,
|
||||
network_controller: Arc<Vec<Controller>>,
|
||||
}
|
||||
|
||||
pub trait Events {
|
||||
fn on_remote_connection_open(net: &Network<Self>, con: &Connection)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
fn on_remote_connection_close(net: &Network<Self>, con: &Connection)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
fn on_remote_stream_open(net: &Network<Self>, st: &Stream)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
fn on_remote_stream_close(net: &Network<Self>, st: &Stream)
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
}
|
||||
|
||||
pub struct Network<E: Events> {
|
||||
pub struct Network {
|
||||
token_pool: tlid::Pool<tlid::Wrapping<usize>>,
|
||||
worker_pool: tlid::Pool<tlid::Wrapping<u64>>,
|
||||
controller: Arc<Vec<Controller>>,
|
||||
@ -72,10 +63,9 @@ pub struct Network<E: Events> {
|
||||
participant_id: Pid,
|
||||
remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
metrics: Arc<Option<NetworkMetrics>>,
|
||||
_pe: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<E: Events> Network<E> {
|
||||
impl Network {
|
||||
pub fn new(participant_id: Uuid, thread_pool: Arc<ThreadPool>) -> Self {
|
||||
let mut token_pool = tlid::Pool::new_full();
|
||||
let mut worker_pool = tlid::Pool::new_full();
|
||||
@ -103,79 +93,11 @@ impl<E: Events> Network<E> {
|
||||
participant_id,
|
||||
remotes,
|
||||
metrics,
|
||||
_pe: PhantomData::<E> {},
|
||||
}
|
||||
}
|
||||
|
||||
fn get_lowest_worker<'a: 'b, 'b>(list: &'a Arc<Vec<Controller>>) -> &'a Controller { &list[0] }
|
||||
|
||||
pub fn send<M: Serialize>(&self, msg: M, stream: &Stream) {
|
||||
let messagebuffer = Arc::new(message::serialize(&msg));
|
||||
//transfer message to right worker to right channel to correct stream
|
||||
//TODO: why do we need a look here, i want my own local directory which is
|
||||
// updated by workes via a channel and needs to be intepreted on a send but it
|
||||
// should almost ever be empty except for new channel creations and stream
|
||||
// creations!
|
||||
for worker in self.controller.iter() {
|
||||
worker
|
||||
.get_tx()
|
||||
.send(CtrlMsg::Send(OutGoingMessage {
|
||||
buffer: messagebuffer.clone(),
|
||||
cursor: 0,
|
||||
mid: None,
|
||||
sid: stream.sid,
|
||||
}))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn recv<M: DeserializeOwned>(&self, stream: &Stream) -> Option<M> {
|
||||
for worker in self.controller.iter() {
|
||||
let msg = match worker.get_rx().try_recv() {
|
||||
Ok(msg) => msg,
|
||||
Err(TryRecvError::Empty) => {
|
||||
return None;
|
||||
},
|
||||
Err(err) => {
|
||||
panic!("Unexpected error '{}'", err);
|
||||
},
|
||||
};
|
||||
|
||||
match msg {
|
||||
RtrnMsg::Receive(m) => {
|
||||
info!("delivering a message");
|
||||
return Some(message::deserialize(m.buffer));
|
||||
},
|
||||
_ => unimplemented!("woopsie"),
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub async fn open(&self, part: &Participant, prio: u8, promises: EnumSet<Promise>) -> Stream {
|
||||
let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::<Sid>();
|
||||
for controller in self.controller.iter() {
|
||||
controller
|
||||
.get_tx()
|
||||
.send(CtrlMsg::OpenStream {
|
||||
pid: part.remote_pid,
|
||||
prio,
|
||||
promises,
|
||||
return_sid: ctrl_tx,
|
||||
})
|
||||
.unwrap();
|
||||
break;
|
||||
}
|
||||
// I dont like the fact that i need to wait on the worker thread for getting my
|
||||
// sid back :/ we could avoid this by introducing a Thread Local Network
|
||||
// which owns some sids we can take without waiting
|
||||
let sid = ctrl_rx.recv().unwrap();
|
||||
info!(?sid, " sucessfully opened stream");
|
||||
Stream { sid }
|
||||
}
|
||||
|
||||
pub fn close(&self, stream: Stream) {}
|
||||
|
||||
pub async fn listen(&self, address: &Address) -> Result<(), NetworkError> {
|
||||
let span = span!(Level::TRACE, "listen", ?address);
|
||||
let worker = Self::get_lowest_worker(&self.controller);
|
||||
@ -206,7 +128,7 @@ impl<E: Events> Network<E> {
|
||||
info!("connecting");
|
||||
let tcp_stream = TcpStream::connect(&a)?;
|
||||
let tcp_channel = TcpChannel::new(tcp_stream);
|
||||
let (ctrl_tx, ctrl_rx) = std::sync::mpsc::channel::<Pid>();
|
||||
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Pid>();
|
||||
let mut channel = Channel::new(
|
||||
pid,
|
||||
ChannelProtocols::Tcp(tcp_channel),
|
||||
@ -223,6 +145,7 @@ impl<E: Events> Network<E> {
|
||||
return Ok(Participant {
|
||||
addr: address.clone(),
|
||||
remote_pid,
|
||||
network_controller: self.controller.clone(),
|
||||
});
|
||||
},
|
||||
Address::Udp(_) => unimplemented!("lazy me"),
|
||||
@ -238,9 +161,23 @@ impl<E: Events> Network<E> {
|
||||
panic!("sda");
|
||||
}
|
||||
|
||||
pub async fn _connected(&self) -> Result<Participant, NetworkError> {
|
||||
pub async fn connected(&self) -> Result<Participant, NetworkError> {
|
||||
// returns if a Participant connected and is ready
|
||||
panic!("sda");
|
||||
loop {
|
||||
//ARRGGG
|
||||
for worker in self.controller.iter() {
|
||||
//TODO harden!
|
||||
if let Ok(msg) = worker.get_rx().try_recv() {
|
||||
if let RtrnMsg::ConnectedParticipant { pid } = msg {
|
||||
return Ok(Participant {
|
||||
addr: Address::Tcp(std::net::SocketAddr::from(([1, 3, 3, 7], 1337))), /* TODO: FIXME */
|
||||
remote_pid: pid,
|
||||
network_controller: self.controller.clone(),
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn _disconnected(&self) -> Result<Participant, NetworkError> {
|
||||
@ -258,20 +195,63 @@ impl<E: Events> Network<E> {
|
||||
}
|
||||
|
||||
impl Participant {
|
||||
pub async fn _open(
|
||||
pub async fn open(
|
||||
&self,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
) -> Result<Stream, ParticipantError> {
|
||||
panic!("sda");
|
||||
let (ctrl_tx, ctrl_rx) = mpsc::channel::<Sid>();
|
||||
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
|
||||
for controller in self.network_controller.iter() {
|
||||
controller
|
||||
.get_tx()
|
||||
.send(CtrlMsg::OpenStream {
|
||||
pid: self.remote_pid,
|
||||
prio,
|
||||
promises,
|
||||
return_sid: ctrl_tx,
|
||||
msg_tx,
|
||||
})
|
||||
.unwrap();
|
||||
break;
|
||||
}
|
||||
// I dont like the fact that i need to wait on the worker thread for getting my
|
||||
// sid back :/ we could avoid this by introducing a Thread Local Network
|
||||
// which owns some sids we can take without waiting
|
||||
let sid = ctrl_rx.recv().unwrap();
|
||||
info!(?sid, " sucessfully opened stream");
|
||||
Ok(Stream {
|
||||
sid,
|
||||
msg_rx,
|
||||
network_controller: self.network_controller.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn _close(&self, stream: Stream) -> Result<(), ParticipantError> {
|
||||
panic!("sda");
|
||||
}
|
||||
pub fn close(&self, stream: Stream) -> Result<(), ParticipantError> { Ok(()) }
|
||||
|
||||
pub async fn _opened(&self) -> Result<Stream, ParticipantError> {
|
||||
panic!("sda");
|
||||
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
|
||||
loop {
|
||||
//ARRGGG
|
||||
for worker in self.network_controller.iter() {
|
||||
//TODO harden!
|
||||
if let Ok(msg) = worker.get_rx().try_recv() {
|
||||
if let RtrnMsg::OpendStream {
|
||||
pid,
|
||||
sid,
|
||||
prio,
|
||||
msg_rx,
|
||||
promises,
|
||||
} = msg
|
||||
{
|
||||
return Ok(Stream {
|
||||
sid,
|
||||
msg_rx,
|
||||
network_controller: self.network_controller.clone(),
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn _closed(&self) -> Result<Stream, ParticipantError> {
|
||||
@ -283,12 +263,38 @@ impl Stream {
|
||||
//TODO: What about SEND instead of Serializeable if it goes via PIPE ?
|
||||
//TODO: timeout per message or per stream ? stream or ?
|
||||
|
||||
pub async fn _send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
|
||||
panic!("sda");
|
||||
pub fn send<M: Serialize>(&self, msg: M) -> Result<(), StreamError> {
|
||||
let messagebuffer = Arc::new(message::serialize(&msg));
|
||||
//transfer message to right worker to right channel to correct stream
|
||||
//TODO: why do we need a look here, i want my own local directory which is
|
||||
// updated by workes via a channel and needs to be intepreted on a send but it
|
||||
// should almost ever be empty except for new channel creations and stream
|
||||
// creations!
|
||||
for worker in self.network_controller.iter() {
|
||||
worker
|
||||
.get_tx()
|
||||
.send(CtrlMsg::Send(OutGoingMessage {
|
||||
buffer: messagebuffer.clone(),
|
||||
cursor: 0,
|
||||
mid: None,
|
||||
sid: self.sid,
|
||||
}))
|
||||
.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn _recv<M: DeserializeOwned>(&self) -> Result<M, StreamError> {
|
||||
panic!("sda");
|
||||
pub fn recv<M: DeserializeOwned>(&self) -> Result<Option<M>, StreamError> {
|
||||
match self.msg_rx.try_recv() {
|
||||
Ok(msg) => {
|
||||
info!(?msg, "delivering a message");
|
||||
Ok(Some(message::deserialize(msg.buffer)))
|
||||
},
|
||||
Err(TryRecvError::Empty) => Ok(None),
|
||||
Err(err) => {
|
||||
panic!("Unexpected error '{}'", err);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -299,12 +305,12 @@ pub enum NetworkError {
|
||||
IoError(std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ParticipantError {
|
||||
ParticipantDisconected,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum StreamError {
|
||||
StreamClosed,
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ use crate::{
|
||||
mpsc::MpscChannel,
|
||||
tcp::TcpChannel,
|
||||
types::{
|
||||
Frame, Mid, Pid, RemoteParticipant, RtrnMsg, Sid, Stream, VELOREN_MAGIC_NUMBER,
|
||||
Frame, IntStream, Mid, Pid, RemoteParticipant, RtrnMsg, Sid, VELOREN_MAGIC_NUMBER,
|
||||
VELOREN_NETWORK_VERSION,
|
||||
},
|
||||
udp::UdpChannel,
|
||||
@ -13,7 +13,7 @@ use enumset::EnumSet;
|
||||
use mio_extras::channel::Sender;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::{Arc, RwLock},
|
||||
sync::{mpsc, Arc, RwLock},
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
@ -43,11 +43,10 @@ pub(crate) struct Channel {
|
||||
pub local_pid: Pid,
|
||||
pub remote_pid: Option<Pid>,
|
||||
pub remotes: Arc<RwLock<HashMap<Pid, RemoteParticipant>>>,
|
||||
pub streams: Vec<Stream>,
|
||||
pub streams: Vec<IntStream>,
|
||||
pub send_queue: VecDeque<Frame>,
|
||||
pub recv_queue: VecDeque<InCommingMessage>,
|
||||
pub protocol: ChannelProtocols,
|
||||
pub return_pid_to: Option<std::sync::mpsc::Sender<Pid>>,
|
||||
pub return_pid_to: Option<std::sync::mpsc::Sender<Pid>>, //use for network::connect()
|
||||
pub send_handshake: bool,
|
||||
pub send_pid: bool,
|
||||
pub send_config: bool,
|
||||
@ -95,7 +94,6 @@ impl Channel {
|
||||
remotes,
|
||||
streams: Vec::new(),
|
||||
send_queue: VecDeque::new(),
|
||||
recv_queue: VecDeque::new(),
|
||||
protocol,
|
||||
return_pid_to,
|
||||
send_handshake: false,
|
||||
@ -224,6 +222,7 @@ impl Channel {
|
||||
});
|
||||
self.send_config = true;
|
||||
info!(?pid, "this channel is now configured!");
|
||||
rtrn_tx.send(RtrnMsg::ConnectedParticipant { pid });
|
||||
}
|
||||
} else {
|
||||
self.send_queue.push_back(Frame::ParticipantId {
|
||||
@ -259,6 +258,7 @@ impl Channel {
|
||||
Frame::Shutdown {} => {
|
||||
self.recv_shutdown = true;
|
||||
info!("shutting down channel");
|
||||
rtrn_tx.send(RtrnMsg::Shutdown);
|
||||
},
|
||||
Frame::OpenStream {
|
||||
sid,
|
||||
@ -266,9 +266,17 @@ impl Channel {
|
||||
promises,
|
||||
} => {
|
||||
if let Some(pid) = self.remote_pid {
|
||||
let stream = Stream::new(sid, prio, promises.clone());
|
||||
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
|
||||
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
|
||||
self.streams.push(stream);
|
||||
info!("opened a stream");
|
||||
rtrn_tx.send(RtrnMsg::OpendStream {
|
||||
pid,
|
||||
sid,
|
||||
prio,
|
||||
msg_rx,
|
||||
promises,
|
||||
});
|
||||
} else {
|
||||
error!("called OpenStream before PartcipantID!");
|
||||
}
|
||||
@ -277,6 +285,7 @@ impl Channel {
|
||||
if let Some(pid) = self.remote_pid {
|
||||
self.streams.retain(|stream| stream.sid() != sid);
|
||||
info!("closed a stream");
|
||||
rtrn_tx.send(RtrnMsg::ClosedStream { pid, sid });
|
||||
}
|
||||
},
|
||||
Frame::DataHeader { mid, sid, length } => {
|
||||
@ -321,10 +330,11 @@ impl Channel {
|
||||
};
|
||||
}
|
||||
if let Some(pos) = pos {
|
||||
let sid = s.sid();
|
||||
let tx = s.msg_tx();
|
||||
for m in s.to_receive.drain(pos..pos + 1) {
|
||||
info!("received message: {}", m.mid);
|
||||
//self.recv_queue.push_back(m);
|
||||
rtrn_tx.send(RtrnMsg::Receive(m)).unwrap();
|
||||
info!(?sid, ? m.mid, "received message");
|
||||
tx.send(m).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -389,12 +399,17 @@ impl Channel {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn open_stream(&mut self, prio: u8, promises: EnumSet<Promise>) -> Sid {
|
||||
pub(crate) fn open_stream(
|
||||
&mut self,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
msg_tx: mpsc::Sender<InCommingMessage>,
|
||||
) -> Sid {
|
||||
// validate promises
|
||||
if let Some(stream_id_pool) = &mut self.stream_id_pool {
|
||||
let sid = stream_id_pool.next();
|
||||
trace!(?sid, "going to open a new stream");
|
||||
let stream = Stream::new(sid, prio, promises.clone());
|
||||
let stream = IntStream::new(sid, prio, promises.clone(), msg_tx);
|
||||
self.streams.push(stream);
|
||||
self.send_queue.push_back(Frame::OpenStream {
|
||||
sid,
|
||||
|
@ -14,30 +14,17 @@ mod worker;
|
||||
pub mod tests {
|
||||
use crate::api::*;
|
||||
use futures::executor::block_on;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::{net::SocketAddr, sync::Arc, thread, time::Duration};
|
||||
use tracing::*;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use uuid::Uuid;
|
||||
use uvth::ThreadPoolBuilder;
|
||||
|
||||
struct N {
|
||||
_id: u8,
|
||||
}
|
||||
|
||||
impl Events for N {
|
||||
fn on_remote_connection_open(_net: &Network<N>, _con: &Connection) {}
|
||||
|
||||
fn on_remote_connection_close(_net: &Network<N>, _con: &Connection) {}
|
||||
|
||||
fn on_remote_stream_open(_net: &Network<N>, _st: &Stream) {}
|
||||
|
||||
fn on_remote_stream_close(_net: &Network<N>, _st: &Stream) {}
|
||||
}
|
||||
|
||||
pub fn test_tracing() {
|
||||
let filter = EnvFilter::from_default_env()
|
||||
//.add_directive("[worker]=trace".parse().unwrap())
|
||||
//.add_directive("trace".parse().unwrap())
|
||||
.add_directive("trace".parse().unwrap())
|
||||
.add_directive("veloren_network::tests=trace".parse().unwrap())
|
||||
.add_directive("veloren_network::worker=debug".parse().unwrap())
|
||||
.add_directive("veloren_network::controller=trace".parse().unwrap())
|
||||
.add_directive("veloren_network::channel=trace".parse().unwrap())
|
||||
@ -57,53 +44,54 @@ pub mod tests {
|
||||
.init();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
assert_eq!(2 + 2, 4);
|
||||
pub fn block_on_recv(stream: &Stream) -> Result<String, StreamError> {
|
||||
let mut s: Result<Option<String>, StreamError> = stream.recv();
|
||||
while let Ok(None) = s {
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
s = stream.recv();
|
||||
}
|
||||
if let Ok(Some(s)) = s {
|
||||
return Ok(s);
|
||||
}
|
||||
if let Err(e) = s {
|
||||
return Err(e);
|
||||
}
|
||||
unreachable!("invalid test");
|
||||
}
|
||||
|
||||
/*
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn client_server() {
|
||||
let thread_pool = Arc::new(
|
||||
ThreadPoolBuilder::new()
|
||||
.name("veloren-network-test".into())
|
||||
.build(),
|
||||
);
|
||||
test_tracing();
|
||||
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
|
||||
n1.listen(&a1); //await
|
||||
n2.listen(&a2); // only requiered here, but doesnt hurt on n1
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
#[test]
|
||||
fn aaa() { test_tracing(); }
|
||||
|
||||
let p1 = n1.connect(&a2); //await
|
||||
//n2.OnRemoteConnectionOpen triggered
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn client_server() {
|
||||
let thread_pool = Arc::new(
|
||||
ThreadPoolBuilder::new()
|
||||
.name("veloren-network-test".into())
|
||||
.build(),
|
||||
);
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52000)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52001)));
|
||||
block_on(n1.listen(&a1)).unwrap(); //await
|
||||
block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1
|
||||
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
|
||||
|
||||
let s1 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
//n2.OnRemoteStreamOpen triggered
|
||||
let p1 = block_on(n1.connect(&a2)).unwrap(); //await
|
||||
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
|
||||
|
||||
n1.send("Hello World", &s1);
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
// receive on n2 now
|
||||
s1.send("Hello World");
|
||||
|
||||
let s: Option<String> = n2.recv(&s1);
|
||||
for _ in 1..4 {
|
||||
error!("{:?}", s);
|
||||
}
|
||||
assert_eq!(s, Some("Hello World".to_string()));
|
||||
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
|
||||
let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
|
||||
|
||||
n1.close(s1);
|
||||
//n2.OnRemoteStreamClose triggered
|
||||
let s = block_on_recv(&s1_n2);
|
||||
assert_eq!(s, Ok("Hello World".to_string()));
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(20000));
|
||||
}
|
||||
*/
|
||||
p1.close(s1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn client_server_stream() {
|
||||
@ -112,31 +100,58 @@ pub mod tests {
|
||||
.name("veloren-network-test".into())
|
||||
.build(),
|
||||
);
|
||||
test_tracing();
|
||||
let n1 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let n2 = Network::<N>::new(Uuid::new_v4(), thread_pool.clone());
|
||||
thread::sleep(Duration::from_millis(400));
|
||||
let n1 = Network::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let n2 = Network::new(Uuid::new_v4(), thread_pool.clone());
|
||||
let a1 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52010)));
|
||||
let a2 = Address::Tcp(SocketAddr::from(([127, 0, 0, 1], 52011)));
|
||||
|
||||
block_on(n1.listen(&a1)).unwrap(); //await
|
||||
block_on(n2.listen(&a2)).unwrap(); // only requiered here, but doesnt hurt on n1
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
thread::sleep(Duration::from_millis(3)); //TODO: listeing still doesnt block correctly!
|
||||
|
||||
let p1 = block_on(n1.connect(&a2)); //await
|
||||
let p1 = p1.unwrap();
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
let p1 = block_on(n1.connect(&a2)).unwrap(); //await
|
||||
|
||||
let s1 = block_on(n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt));
|
||||
//let s2 = n1.open(&p1, 16, Promise::InOrder | Promise::NoCorrupt);
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
let s1 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
|
||||
let s2 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
|
||||
let s3 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
|
||||
let s4 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
|
||||
let s5 = block_on(p1.open(16, Promise::InOrder | Promise::NoCorrupt)).unwrap();
|
||||
|
||||
n1.send("Hello World", &s1);
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
thread::sleep(Duration::from_millis(3));
|
||||
s3.send("Hello World3");
|
||||
thread::sleep(Duration::from_millis(3));
|
||||
s1.send("Hello World1");
|
||||
s5.send("Hello World5");
|
||||
s2.send("Hello World2");
|
||||
s4.send("Hello World4");
|
||||
thread::sleep(Duration::from_millis(3));
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
let p1_n2 = block_on(n2.connected()).unwrap(); //remote representation of p1
|
||||
let s1_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s1
|
||||
let s2_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s2
|
||||
let s3_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s3
|
||||
let s4_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s4
|
||||
let s5_n2 = block_on(p1_n2.opened()).unwrap(); //remote representation of s5
|
||||
|
||||
let s: Option<String> = n2.recv(&s1);
|
||||
assert_eq!(s, Some("Hello World".to_string()));
|
||||
info!("all streams opened");
|
||||
|
||||
n1.close(s1);
|
||||
let s = block_on_recv(&s3_n2);
|
||||
assert_eq!(s, Ok("Hello World3".to_string()));
|
||||
info!("1 read");
|
||||
let s = block_on_recv(&s1_n2);
|
||||
assert_eq!(s, Ok("Hello World1".to_string()));
|
||||
info!("2 read");
|
||||
let s = block_on_recv(&s2_n2);
|
||||
assert_eq!(s, Ok("Hello World2".to_string()));
|
||||
info!("3 read");
|
||||
let s = block_on_recv(&s5_n2);
|
||||
assert_eq!(s, Ok("Hello World5".to_string()));
|
||||
info!("4 read");
|
||||
let s = block_on_recv(&s4_n2);
|
||||
assert_eq!(s, Ok("Hello World4".to_string()));
|
||||
info!("5 read");
|
||||
|
||||
p1.close(s1);
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ use crate::{
|
||||
use enumset::EnumSet;
|
||||
use mio::{self, net::TcpListener, PollOpt, Ready};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::{collections::VecDeque, sync::mpsc};
|
||||
use uuid::Uuid;
|
||||
|
||||
//Participant Ids are randomly chosen
|
||||
@ -32,7 +32,8 @@ pub(crate) enum CtrlMsg {
|
||||
pid: Pid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
return_sid: std::sync::mpsc::Sender<Sid>,
|
||||
msg_tx: mpsc::Sender<InCommingMessage>,
|
||||
return_sid: mpsc::Sender<Sid>,
|
||||
},
|
||||
CloseStream {
|
||||
pid: Pid,
|
||||
@ -43,16 +44,20 @@ pub(crate) enum CtrlMsg {
|
||||
|
||||
pub(crate) enum RtrnMsg {
|
||||
Shutdown,
|
||||
ConnectedParticipant {
|
||||
pid: Pid,
|
||||
},
|
||||
OpendStream {
|
||||
pid: Pid,
|
||||
sid: Sid,
|
||||
prio: u8,
|
||||
msg_rx: mpsc::Receiver<InCommingMessage>,
|
||||
promises: EnumSet<Promise>,
|
||||
},
|
||||
ClosedStream {
|
||||
pid: Pid,
|
||||
sid: Sid,
|
||||
},
|
||||
Receive(InCommingMessage),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -62,20 +67,27 @@ pub(crate) enum TokenObjects {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Stream {
|
||||
pub(crate) struct IntStream {
|
||||
sid: Sid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
msg_tx: mpsc::Sender<InCommingMessage>,
|
||||
pub to_send: VecDeque<OutGoingMessage>,
|
||||
pub to_receive: VecDeque<InCommingMessage>,
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn new(sid: Sid, prio: u8, promises: EnumSet<Promise>) -> Self {
|
||||
Stream {
|
||||
impl IntStream {
|
||||
pub fn new(
|
||||
sid: Sid,
|
||||
prio: u8,
|
||||
promises: EnumSet<Promise>,
|
||||
msg_tx: mpsc::Sender<InCommingMessage>,
|
||||
) -> Self {
|
||||
IntStream {
|
||||
sid,
|
||||
prio,
|
||||
promises,
|
||||
msg_tx,
|
||||
to_send: VecDeque::new(),
|
||||
to_receive: VecDeque::new(),
|
||||
}
|
||||
@ -85,6 +97,8 @@ impl Stream {
|
||||
|
||||
pub fn prio(&self) -> u8 { self.prio }
|
||||
|
||||
pub fn msg_tx(&self) -> mpsc::Sender<InCommingMessage> { self.msg_tx.clone() }
|
||||
|
||||
pub fn promises(&self) -> EnumSet<Promise> { self.promises }
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
use crate::{
|
||||
channel::{Channel, ChannelProtocol, ChannelProtocols},
|
||||
controller::Controller,
|
||||
message::InCommingMessage,
|
||||
metrics::NetworkMetrics,
|
||||
tcp::TcpChannel,
|
||||
types::{CtrlMsg, Pid, RemoteParticipant, RtrnMsg, TokenObjects},
|
||||
@ -9,7 +10,7 @@ use mio::{self, Poll, PollOpt, Ready, Token};
|
||||
use mio_extras::channel::{Receiver, Sender};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{mpsc::TryRecvError, Arc, RwLock},
|
||||
sync::{mpsc, mpsc::TryRecvError, Arc, RwLock},
|
||||
time::Instant,
|
||||
};
|
||||
use tlid;
|
||||
@ -149,15 +150,18 @@ impl Worker {
|
||||
pid,
|
||||
prio,
|
||||
promises,
|
||||
msg_tx,
|
||||
return_sid,
|
||||
} => {
|
||||
let mut handled = false;
|
||||
for (tok, obj) in self.mio_tokens.tokens.iter_mut() {
|
||||
if let TokenObjects::Channel(channel) = obj {
|
||||
if Some(pid) == channel.remote_pid {
|
||||
let sid = channel.open_stream(prio, promises);
|
||||
let (msg_tx, msg_rx) = mpsc::channel::<InCommingMessage>();
|
||||
let sid = channel.open_stream(prio, promises, msg_tx);
|
||||
return_sid.send(sid);
|
||||
channel.tick_send();
|
||||
error!("handle msg_tx");
|
||||
handled = true;
|
||||
break;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user