2022-07-01 11:23:46 +00:00
|
|
|
#![feature(assert_matches)]
|
2021-01-15 13:04:32 +00:00
|
|
|
use std::sync::Arc;
|
|
|
|
use tokio::runtime::Runtime;
|
2020-06-08 09:47:39 +00:00
|
|
|
use veloren_network::{NetworkError, StreamError};
|
2020-03-10 00:07:36 +00:00
|
|
|
mod helper;
|
2021-11-19 08:36:39 +00:00
|
|
|
use helper::{mpsc, network_participant_stream, quic, tcp, udp, SLEEP_EXTERNAL, SLEEP_INTERNAL};
|
2020-05-27 11:43:29 +00:00
|
|
|
use std::io::ErrorKind;
|
2022-07-01 11:23:46 +00:00
|
|
|
use veloren_network::{ConnectAddr, ListenAddr, Network, ParticipantEvent, Pid, Promises};
|
2020-03-10 00:07:36 +00:00
|
|
|
|
2020-03-22 13:47:21 +00:00
|
|
|
#[test]
|
2020-04-08 14:26:42 +00:00
|
|
|
fn stream_simple() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
|
2020-04-08 14:26:42 +00:00
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
2020-03-10 00:07:36 +00:00
|
|
|
|
2020-10-15 11:22:34 +00:00
|
|
|
#[test]
|
|
|
|
fn stream_try_recv() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-03-03 09:39:21 +00:00
|
|
|
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
|
2020-10-15 11:22:34 +00:00
|
|
|
|
|
|
|
s1_a.send(4242u32).unwrap();
|
2021-11-19 08:36:39 +00:00
|
|
|
std::thread::sleep(SLEEP_EXTERNAL);
|
2020-10-15 11:22:34 +00:00
|
|
|
assert_eq!(s1_b.try_recv(), Ok(Some(4242u32)));
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
2020-10-15 11:22:34 +00:00
|
|
|
}
|
|
|
|
|
2020-03-22 13:47:21 +00:00
|
|
|
#[test]
|
2020-04-08 14:26:42 +00:00
|
|
|
fn stream_simple_3msg() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
|
2020-04-08 14:26:42 +00:00
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
|
|
|
s1_a.send(1337).unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
|
2020-04-08 14:26:42 +00:00
|
|
|
s1_a.send("3rdMessage").unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
2020-04-08 14:26:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2021-02-10 10:37:42 +00:00
|
|
|
fn stream_simple_mpsc() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc());
|
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn stream_simple_mpsc_3msg() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(mpsc());
|
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
|
|
|
s1_a.send(1337).unwrap();
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
|
|
|
|
s1_a.send("3rdMessage").unwrap();
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
|
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
|
|
|
}
|
|
|
|
|
2021-04-15 08:16:42 +00:00
|
|
|
#[test]
|
|
|
|
fn stream_simple_quic() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic());
|
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn stream_simple_quic_3msg() {
|
2021-04-27 15:59:36 +00:00
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-04-15 08:16:42 +00:00
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic());
|
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
|
|
|
s1_a.send(1337).unwrap();
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
|
|
|
|
s1_a.send("3rdMessage").unwrap();
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
|
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
|
|
|
}
|
|
|
|
|
2021-02-10 10:37:42 +00:00
|
|
|
#[test]
|
|
|
|
#[ignore]
|
2020-04-08 14:26:42 +00:00
|
|
|
fn stream_simple_udp() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
|
2020-04-08 14:26:42 +00:00
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
2020-04-08 14:26:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2021-02-10 10:37:42 +00:00
|
|
|
#[ignore]
|
2020-04-08 14:26:42 +00:00
|
|
|
fn stream_simple_udp_3msg() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(udp());
|
2020-04-08 14:26:42 +00:00
|
|
|
|
|
|
|
s1_a.send("Hello World").unwrap();
|
|
|
|
s1_a.send(1337).unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
|
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
|
2020-04-08 14:26:42 +00:00
|
|
|
s1_a.send("3rdMessage").unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
2020-03-10 00:07:36 +00:00
|
|
|
}
|
2020-05-26 13:06:03 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
#[ignore]
|
|
|
|
fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let r = Arc::new(Runtime::new().unwrap());
|
2021-03-03 09:39:21 +00:00
|
|
|
let network = Network::new(Pid::new(), &r);
|
|
|
|
let remote = Network::new(Pid::new(), &r);
|
2021-01-15 13:04:32 +00:00
|
|
|
r.block_on(async {
|
2021-01-19 08:48:33 +00:00
|
|
|
let network = network;
|
|
|
|
let remote = remote;
|
2020-05-26 13:06:03 +00:00
|
|
|
remote
|
2021-04-15 08:16:42 +00:00
|
|
|
.listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
2020-05-26 13:06:03 +00:00
|
|
|
.await?;
|
|
|
|
remote
|
2021-04-15 08:16:42 +00:00
|
|
|
.listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
2020-05-26 13:06:03 +00:00
|
|
|
.await?;
|
|
|
|
let p1 = network
|
2021-04-15 08:16:42 +00:00
|
|
|
.connect(ConnectAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
|
2020-05-26 13:06:03 +00:00
|
|
|
.await?;
|
|
|
|
let p2 = network
|
2021-04-15 08:16:42 +00:00
|
|
|
.connect(ConnectAddr::Udp("127.0.0.1:2001".parse().unwrap()))
|
2020-05-26 13:06:03 +00:00
|
|
|
.await?;
|
2020-07-09 07:58:21 +00:00
|
|
|
assert_eq!(&p1, &p2);
|
2020-05-26 13:06:03 +00:00
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|
2020-05-27 11:43:29 +00:00
|
|
|
|
|
|
|
#[test]
|
2021-02-10 10:37:42 +00:00
|
|
|
#[ignore]
|
2020-05-27 11:43:29 +00:00
|
|
|
fn failed_listen_on_used_ports() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let r = Arc::new(Runtime::new().unwrap());
|
2021-03-03 09:39:21 +00:00
|
|
|
let network = Network::new(Pid::new(), &r);
|
2020-05-27 11:43:29 +00:00
|
|
|
let udp1 = udp();
|
|
|
|
let tcp1 = tcp();
|
2021-04-15 08:16:42 +00:00
|
|
|
r.block_on(network.listen(udp1.0.clone()))?;
|
|
|
|
r.block_on(network.listen(tcp1.0.clone()))?;
|
2021-11-19 08:36:39 +00:00
|
|
|
std::thread::sleep(SLEEP_INTERNAL);
|
2020-05-27 11:43:29 +00:00
|
|
|
|
2021-03-03 09:39:21 +00:00
|
|
|
let network2 = Network::new(Pid::new(), &r);
|
2021-04-15 08:16:42 +00:00
|
|
|
let e1 = r.block_on(network2.listen(udp1.0));
|
|
|
|
let e2 = r.block_on(network2.listen(tcp1.0));
|
2020-05-27 11:43:29 +00:00
|
|
|
match e1 {
|
|
|
|
Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (),
|
2020-06-30 22:01:09 +00:00
|
|
|
_ => panic!(),
|
2020-05-27 11:43:29 +00:00
|
|
|
};
|
|
|
|
match e2 {
|
|
|
|
Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (),
|
2020-06-30 22:01:09 +00:00
|
|
|
_ => panic!(),
|
2020-05-27 11:43:29 +00:00
|
|
|
};
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((network, network2)); //clean teardown
|
2020-05-27 11:43:29 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
2020-05-27 15:58:57 +00:00
|
|
|
|
|
|
|
/// There is a bug an impris-desktop-1 which fails the DOC tests,
|
|
|
|
/// it fails exactly `api_stream_send_main` and `api_stream_recv_main` by
|
|
|
|
/// deadlocking at different times!
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
/// So i rather put the same test into a unit test, these are now duplicate to
|
|
|
|
/// the api, but are left here, just to be save!
|
2020-05-27 15:58:57 +00:00
|
|
|
#[test]
|
|
|
|
fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
// Create a Network, listen on Port `1200` and wait for a Stream to be opened,
|
2020-05-27 15:58:57 +00:00
|
|
|
// then answer `Hello World`
|
2021-01-15 13:04:32 +00:00
|
|
|
let r = Arc::new(Runtime::new().unwrap());
|
2021-03-03 09:39:21 +00:00
|
|
|
let network = Network::new(Pid::new(), &r);
|
|
|
|
let remote = Network::new(Pid::new(), &r);
|
2021-01-15 13:04:32 +00:00
|
|
|
r.block_on(async {
|
2021-01-19 08:48:33 +00:00
|
|
|
let network = network;
|
|
|
|
let remote = remote;
|
2020-05-27 15:58:57 +00:00
|
|
|
network
|
2021-04-15 08:16:42 +00:00
|
|
|
.listen(ListenAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
2020-05-27 15:58:57 +00:00
|
|
|
.await?;
|
|
|
|
let remote_p = remote
|
2021-04-15 08:16:42 +00:00
|
|
|
.connect(ConnectAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
|
2020-05-27 15:58:57 +00:00
|
|
|
.await?;
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
// keep it alive
|
|
|
|
let _stream_p = remote_p
|
2021-02-18 00:01:57 +00:00
|
|
|
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
|
2020-05-27 15:58:57 +00:00
|
|
|
.await?;
|
|
|
|
let participant_a = network.connected().await?;
|
|
|
|
let mut stream_a = participant_a.opened().await?;
|
|
|
|
//Send Message
|
|
|
|
stream_a.send("Hello World")?;
|
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
// Create a Network, listen on Port `1220` and wait for a Stream to be opened,
|
2020-05-27 15:58:57 +00:00
|
|
|
// then listen on it
|
2021-01-15 13:04:32 +00:00
|
|
|
let r = Arc::new(Runtime::new().unwrap());
|
2021-03-03 09:39:21 +00:00
|
|
|
let network = Network::new(Pid::new(), &r);
|
|
|
|
let remote = Network::new(Pid::new(), &r);
|
2021-01-15 13:04:32 +00:00
|
|
|
r.block_on(async {
|
2021-01-19 08:48:33 +00:00
|
|
|
let network = network;
|
|
|
|
let remote = remote;
|
2020-05-27 15:58:57 +00:00
|
|
|
network
|
2021-04-15 08:16:42 +00:00
|
|
|
.listen(ListenAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
2020-05-27 15:58:57 +00:00
|
|
|
.await?;
|
|
|
|
let remote_p = remote
|
2021-04-15 08:16:42 +00:00
|
|
|
.connect(ConnectAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
|
2020-05-27 15:58:57 +00:00
|
|
|
.await?;
|
|
|
|
let mut stream_p = remote_p
|
2021-02-18 00:01:57 +00:00
|
|
|
.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0)
|
2020-05-27 15:58:57 +00:00
|
|
|
.await?;
|
|
|
|
stream_p.send("Hello World")?;
|
|
|
|
let participant_a = network.connected().await?;
|
|
|
|
let mut stream_a = participant_a.opened().await?;
|
|
|
|
//Send Message
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
assert_eq!("Hello World".to_string(), stream_a.recv::<String>().await?);
|
2020-05-27 15:58:57 +00:00
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn wrong_parse() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-01-15 13:04:32 +00:00
|
|
|
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
|
|
|
|
s1_a.send(1337).unwrap();
|
2021-01-15 13:04:32 +00:00
|
|
|
match r.block_on(s1_b.recv::<String>()) {
|
2020-08-25 13:32:42 +00:00
|
|
|
Err(StreamError::Deserialize(_)) => (),
|
2020-06-30 22:01:09 +00:00
|
|
|
_ => panic!("this should fail, but it doesnt!"),
|
2020-06-08 09:47:39 +00:00
|
|
|
}
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These droped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
}
|
2020-10-15 11:22:34 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn multiple_try_recv() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
2021-03-03 09:39:21 +00:00
|
|
|
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
|
2020-10-15 11:22:34 +00:00
|
|
|
|
|
|
|
s1_a.send("asd").unwrap();
|
|
|
|
s1_a.send(11u32).unwrap();
|
2021-11-19 08:36:39 +00:00
|
|
|
std::thread::sleep(SLEEP_EXTERNAL);
|
2020-10-15 11:22:34 +00:00
|
|
|
assert_eq!(s1_b.try_recv(), Ok(Some("asd".to_string())));
|
|
|
|
assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(11u32)));
|
|
|
|
assert_eq!(s1_b.try_recv::<String>(), Ok(None));
|
|
|
|
|
|
|
|
drop(s1_a);
|
2021-11-19 08:36:39 +00:00
|
|
|
std::thread::sleep(SLEEP_EXTERNAL);
|
2020-10-15 11:22:34 +00:00
|
|
|
assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
|
2021-01-19 08:48:33 +00:00
|
|
|
drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
|
2020-10-15 11:22:34 +00:00
|
|
|
}
|
2022-01-29 17:18:30 +00:00
|
|
|
|
|
|
|
/// If we listen on a IPv6 UNSPECIFIED address, on linux it will automatically
|
|
|
|
/// listen on the respective IPv4 address. This must not be as we should behave
|
|
|
|
/// similar under windows and linux.
|
2022-01-30 20:42:59 +00:00
|
|
|
///
|
|
|
|
/// As most CI servers don't have IPv6 configured, this would return
|
|
|
|
/// ConnectFailed(Io(Os { code: 99, kind: AddrNotAvailable, message: "Cannot
|
|
|
|
/// assign requested address" })) we have to disable this test in CI, but it was
|
|
|
|
/// manually tested on linux and windows
|
|
|
|
///
|
|
|
|
/// On Windows this test must be executed as root to listen on IPv6::UNSPECIFIED
|
2022-01-29 17:18:30 +00:00
|
|
|
#[test]
|
2022-01-30 20:42:59 +00:00
|
|
|
#[ignore]
|
2022-01-29 17:18:30 +00:00
|
|
|
fn listen_on_ipv6_doesnt_block_ipv4() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
|
|
|
let tcpv4 = tcp();
|
|
|
|
let port = if let ListenAddr::Tcp(x) = tcpv4.0 {
|
|
|
|
x.port()
|
|
|
|
} else {
|
|
|
|
unreachable!()
|
|
|
|
};
|
|
|
|
let tcpv6 = (
|
|
|
|
ListenAddr::Tcp(std::net::SocketAddr::from((
|
|
|
|
std::net::Ipv6Addr::UNSPECIFIED,
|
|
|
|
port,
|
|
|
|
))),
|
|
|
|
ConnectAddr::Tcp(std::net::SocketAddr::from((
|
|
|
|
std::net::Ipv6Addr::UNSPECIFIED,
|
|
|
|
port,
|
|
|
|
))),
|
|
|
|
);
|
|
|
|
|
|
|
|
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcpv6);
|
|
|
|
std::thread::sleep(SLEEP_EXTERNAL);
|
|
|
|
let (_r2, _n_a2, _p_a2, mut s1_a2, _n_b2, _p_b2, mut s1_b2) = network_participant_stream(tcpv4);
|
|
|
|
|
|
|
|
s1_a.send(42u32).unwrap();
|
|
|
|
s1_a2.send(1337u32).unwrap();
|
|
|
|
std::thread::sleep(SLEEP_EXTERNAL);
|
|
|
|
assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(42u32)));
|
|
|
|
assert_eq!(s1_b2.try_recv::<u32>(), Ok(Some(1337u32)));
|
|
|
|
|
|
|
|
drop((s1_a, s1_b, _n_a, _n_b, _p_a, _p_b));
|
|
|
|
drop((s1_a2, s1_b2, _n_a2, _n_b2, _p_a2, _p_b2)); //clean teardown
|
|
|
|
}
|
2022-07-01 11:23:46 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn check_correct_channel_events() {
|
|
|
|
let (_, _) = helper::setup(false, 0);
|
|
|
|
let con_addr = tcp();
|
|
|
|
let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(con_addr.clone());
|
|
|
|
|
|
|
|
let event_a = r.block_on(p_a.fetch_event()).unwrap();
|
|
|
|
let event_b = r.block_on(p_b.fetch_event()).unwrap();
|
|
|
|
if let ConnectAddr::Tcp(listen_addr) = con_addr.1 {
|
|
|
|
match event_a {
|
|
|
|
ParticipantEvent::ChannelCreated(ConnectAddr::Tcp(socket_addr)) => {
|
|
|
|
assert_ne!(socket_addr, listen_addr);
|
|
|
|
assert_eq!(socket_addr.ip(), std::net::Ipv4Addr::LOCALHOST);
|
|
|
|
},
|
|
|
|
e => panic!("wrong event {:?}", e),
|
|
|
|
}
|
|
|
|
match event_b {
|
|
|
|
ParticipantEvent::ChannelCreated(ConnectAddr::Tcp(socket_addr)) => {
|
|
|
|
assert_eq!(socket_addr, listen_addr);
|
|
|
|
},
|
|
|
|
e => panic!("wrong event {:?}", e),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
unreachable!();
|
|
|
|
}
|
|
|
|
|
|
|
|
std::thread::sleep(SLEEP_EXTERNAL);
|
|
|
|
drop((_n_a, _n_b)); //drop network
|
|
|
|
|
|
|
|
let event_a = r.block_on(p_a.fetch_event()).unwrap();
|
|
|
|
let event_b = r.block_on(p_b.fetch_event()).unwrap();
|
|
|
|
if let ConnectAddr::Tcp(listen_addr) = con_addr.1 {
|
|
|
|
match event_a {
|
|
|
|
ParticipantEvent::ChannelDeleted(ConnectAddr::Tcp(socket_addr)) => {
|
|
|
|
assert_ne!(socket_addr, listen_addr);
|
|
|
|
assert_eq!(socket_addr.ip(), std::net::Ipv4Addr::LOCALHOST);
|
|
|
|
},
|
|
|
|
e => panic!("wrong event {:?}", e),
|
|
|
|
}
|
|
|
|
match event_b {
|
|
|
|
ParticipantEvent::ChannelDeleted(ConnectAddr::Tcp(socket_addr)) => {
|
|
|
|
assert_eq!(socket_addr, listen_addr);
|
|
|
|
},
|
|
|
|
e => panic!("wrong event {:?}", e),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
unreachable!();
|
|
|
|
}
|
|
|
|
|
|
|
|
drop((p_a, p_b)); //clean teardown
|
|
|
|
}
|