veloren/network/tests/closing.rs

389 lines
14 KiB
Rust
Raw Normal View History

//! How to read those tests:
//! - in the first line we call the helper, this is only debug code. in case
//! you want to have tracing for a special test you set the bool = true
//! and the sleep to 10000 and your test will start 10 sec delayed with
2020-08-25 12:21:25 +00:00
//! tracing. You need a delay as otherwise the other tests pollute your trace
//! - the second line is to simulate a client and a server
//! `network_participant_stream` will return
//! - 2 networks
//! - 2 participants
//! - 2 streams
//! each one `linked` to their counterpart.
//! You see a cryptic use of rust `_` this is because we are testing the
//! `drop` behavior here.
//! - A `_` means this is directly dropped after the line executes, thus
//! immediately executing its `Drop` impl.
//! - A `_p1_a` e.g. means we don't use that Participant yet, but we must
//! not `drop` it yet as we might want to use the Streams.
//! - You sometimes see sleep(1000ms) this is used when we rely on the
//! underlying TCP functionality, as this simulates client and server
use async_std::task;
use task::block_on;
use veloren_network::{Network, ParticipantError, Pid, Promises, StreamError};
mod helper;
use helper::{network_participant_stream, tcp};
/// Once both networks are gone, the streams on either side must report
/// `StreamClosed` for sending as well as for receiving.
#[test]
fn close_network() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    // The two `_` slots (the networks, going by the bindings the sibling tests
    // use) are not bound and therefore dropped as soon as this statement ends.
    // Participants and streams stay alive.
    let (_, _p1_a, mut s1_a, _, _p1_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    // Wait a moment before probing the streams, so the close can take effect.
    thread::sleep(Duration::from_secs(1));

    assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
    let received: Result<String, _> = task::block_on(s1_b.recv());
    assert_eq!(received, Err(StreamError::StreamClosed));
}
/// After both participants were disconnected explicitly, their streams must
/// answer `send` and `recv` with `StreamClosed`.
#[test]
fn close_participant() {
    let _ = helper::setup(false, 0);
    let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    // Disconnect side A first, then side B; both calls have to succeed.
    task::block_on(p1_a.disconnect()).unwrap();
    task::block_on(p1_b.disconnect()).unwrap();

    assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
    assert_eq!(
        task::block_on(s1_b.recv::<String>()),
        Err(StreamError::StreamClosed)
    );
}
/// With the counterpart stream dropped, the surviving stream must report
/// `StreamClosed` for both `send` and `recv`.
#[test]
fn close_stream() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    // Only the networks and `s1_a` are bound. Everything matched by `_`
    // (including `s1_b`) is dropped directly after this statement.
    let (_n_a, _, mut s1_a, _n_b, _, _) = task::block_on(network_participant_stream(tcp()));

    // Wait a moment so the remote close can reach the local stream.
    thread::sleep(Duration::from_secs(1));

    assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
    assert_eq!(
        task::block_on(s1_a.recv::<String>()),
        Err(StreamError::StreamClosed)
    );
}
Fixing the DEADLOCK in handshake -> channel creation - this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :) - When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport however the protocol could already catch non handshake data any more and push in into this mpsc::Channel. Then this channel got dropped and a fresh one was created for the network::Channel. These droped Frames are ofc a BUG! I tried multiple things to solve this: - dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1. This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)> to handle ALL the network::channel. If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out Bad Idea... - using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the scheduler doesnt know the remote_pid yet - i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what So i switched over to the simply method now: - Do everything like before with 2 mpsc::Channels - after the handshake. 
close the receiver and listen for all remaining (cid, frame) combinations - when starting the channel, reapply them to the new sender/listener combination - added tracing - switched Protocol RwLock to Mutex, as it's only ever 1 - Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema - Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail - fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed - add extra test to verify that a send message is received even if the Stream is already closed - changed OutGoing to Outgoing - fixed a bug that `metrics.tick()` was never called - removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// Guards against a panic when a stream is `drop`ed inside `block_on`.
///
/// The original note calls this an actual bug that luckily does not trigger
/// at the moment, but that must be kept in check with newer async-std.
#[test]
fn close_streams_in_block_on() {
    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, s1_a, _n_b, _p_b, s1_b) =
        task::block_on(network_participant_stream(tcp()));

    task::block_on(async {
        // Re-bind both streams locally so they are owned by (and dropped
        // within) this async block.
        let mut sender = s1_a;
        let mut receiver = s1_b;

        sender.send("ping").unwrap();
        assert_eq!(receiver.recv().await, Ok("ping".to_string()));

        // The explicit drop happens while still inside `block_on`.
        drop(sender);
    });
}
/// Three messages of different types are sent and received in order; after
/// the sending stream is dropped, the other side must fail to send with
/// `StreamClosed`.
#[test]
fn stream_simple_3msg_then_close() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    s1_a.send(1u8).unwrap();
    s1_a.send(42).unwrap();
    s1_a.send("3rdMessage").unwrap();

    assert_eq!(task::block_on(s1_b.recv()), Ok(1u8));
    assert_eq!(task::block_on(s1_b.recv()), Ok(42));
    assert_eq!(task::block_on(s1_b.recv()), Ok("3rdMessage".to_string()));

    drop(s1_a);
    // Wait a moment so the close can reach `s1_b`.
    thread::sleep(Duration::from_secs(1));
    assert_eq!(s1_b.send("Hello World"), Err(StreamError::StreamClosed));
}
/// Messages that were queued before the stream got closed must still be
/// receivable afterwards; only sending has to fail with `StreamClosed`.
#[test]
fn stream_send_first_then_receive() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    s1_a.send(1u8).unwrap();
    s1_a.send(42).unwrap();
    s1_a.send("3rdMessage").unwrap();

    // Close the sending side before anything has been read.
    drop(s1_a);
    thread::sleep(Duration::from_secs(1));

    assert_eq!(task::block_on(s1_b.recv()), Ok(1u8));
    assert_eq!(task::block_on(s1_b.recv()), Ok(42));
    assert_eq!(task::block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
    assert_eq!(s1_b.send("Hello World"), Err(StreamError::StreamClosed));
}
Fixing the DEADLOCK in handshake -> channel creation - this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at fist :) - When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport however the protocol could already catch non handshake data any more and push in into this mpsc::Channel. Then this channel got dropped and a fresh one was created for the network::Channel. These droped Frames are ofc a BUG! I tried multiple things to solve this: - dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1. This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)> to handle ALL the network::channel. If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out Bad Idea... - using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the scheduler doesnt know the remote_pid yet - i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what So i switched over to the simply method now: - Do everything like before with 2 mpsc::Channels - after the handshake. 
close the receiver and listen for all remaining (cid, frame) combinations - when starting the channel, reapply them to the new sender/listener combination - added tracing - switched Protocol RwLock to Mutex, as it's only ever 1 - Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema - Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail - fixd a but in api_send_send_main, i need to store the stream_p otherwise it's immeadiatly closed and a stream_a.send() isn't guaranteed - add extra test to verify that a send message is received even if the Stream is already closed - changed OutGoing to Outgoing - fixed a bug that `metrics.tick()` was never called - removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
/// A single message sent right before the sending stream is dropped must
/// still arrive at the receiving side.
#[test]
fn stream_send_1_then_close_stream() {
    use std::{thread, time::Duration};

    // Shared by the send and the expectation so the two cannot drift apart.
    let payload = "this message must be received, even if stream is closed already!";

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    s1_a.send(payload).unwrap();
    drop(s1_a);
    // Wait a moment after the drop before reading on the other side.
    thread::sleep(Duration::from_secs(1));

    let exp = Ok(payload.to_string());
    assert_eq!(task::block_on(s1_b.recv()), exp);
    println!("all received and done");
}
/// Sends 100000 messages, drops the sending stream and then expects every
/// single message to be received on the other side.
#[test]
fn stream_send_100000_then_close_stream() {
    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    (0..100_000).for_each(|_| s1_a.send("woop_PARTY_HARD_woop").unwrap());
    drop(s1_a);

    let exp = Ok("woop_PARTY_HARD_woop".to_string());
    println!("start receiving");
    // All receives share a single `block_on` call.
    task::block_on(async {
        for _ in 0..100_000 {
            assert_eq!(s1_b.recv().await, exp);
        }
    });
    println!("all received and done");
}
/// Sends 100000 messages and then drops the local stream followed by the
/// remote one, without ever receiving anything. The test passes when nothing
/// panics.
#[test]
fn stream_send_100000_then_close_stream_remote() {
    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, s1_b) =
        task::block_on(network_participant_stream(tcp()));

    (0..100_000).for_each(|_| s1_a.send("woop_PARTY_HARD_woop").unwrap());

    // Local side first, remote side directly afterwards; nothing is received.
    drop(s1_a);
    drop(s1_b);
}
/// Sends 100000 messages, drops the remote stream first, waits, and only then
/// drops the local stream. Nothing is ever received; the test passes when
/// nothing panics.
#[test]
fn stream_send_100000_then_close_stream_remote2() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, s1_b) =
        task::block_on(network_participant_stream(tcp()));

    for _ in 0..100_000 {
        s1_a.send("woop_PARTY_HARD_woop").unwrap();
    }

    // Remote side goes away while the local stream is still alive.
    drop(s1_b);
    thread::sleep(Duration::from_secs(1));
    drop(s1_a);
    // no receiving
}
/// Sends 100000 messages, drops the remote stream first, waits, and only then
/// drops the local stream. Nothing is ever received; the test passes when
/// nothing panics.
///
/// NOTE(review): the body performs the same steps as
/// `stream_send_100000_then_close_stream_remote2` - confirm whether a
/// different drop order was intended here.
#[test]
fn stream_send_100000_then_close_stream_remote3() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, s1_b) =
        task::block_on(network_participant_stream(tcp()));

    for _ in 0..100_000 {
        s1_a.send("woop_PARTY_HARD_woop").unwrap();
    }

    // Remote side goes away while the local stream is still alive.
    drop(s1_b);
    thread::sleep(Duration::from_secs(1));
    drop(s1_a);
    // no receiving
}
/// Drops the participant first and the network afterwards, with a pause after
/// each drop. The test passes when nothing panics.
#[test]
fn close_part_then_network() {
    use std::{thread, time::Duration};

    let pause = Duration::from_secs(1);

    let _ = helper::setup(false, 0);
    let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) =
        task::block_on(network_participant_stream(tcp()));

    // Put some traffic on the stream before tearing things down.
    (0..1000).for_each(|_| s1_a.send("woop_PARTY_HARD_woop").unwrap());

    drop(p_a);
    thread::sleep(pause);
    drop(n_a);
    thread::sleep(pause);
}
/// Drops the network first and the participant afterwards, with a pause after
/// each drop. The test passes when nothing panics.
#[test]
fn close_network_then_part() {
    use std::{thread, time::Duration};

    let pause = Duration::from_secs(1);

    let _ = helper::setup(false, 0);
    let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) =
        task::block_on(network_participant_stream(tcp()));

    // Put some traffic on the stream before tearing things down.
    (0..1000).for_each(|_| s1_a.send("woop_PARTY_HARD_woop").unwrap());

    drop(n_a);
    thread::sleep(pause);
    drop(p_a);
    thread::sleep(pause);
}
/// Disconnecting a participant after its network was dropped must return an
/// error.
#[test]
fn close_network_then_disconnect_part() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) =
        task::block_on(network_participant_stream(tcp()));

    // Put some traffic on the stream before tearing things down.
    (0..1000).for_each(|_| s1_a.send("woop_PARTY_HARD_woop").unwrap());

    drop(n_a);
    // No pause here: the disconnect is attempted right after the drop.
    assert!(task::block_on(p_a.disconnect()).is_err());
    thread::sleep(Duration::from_secs(1));
}
/// A stream that was accepted via `opened()` before the remote participant
/// got dropped must still deliver the message that was sent on it.
#[test]
fn opened_stream_before_remote_part_is_closed() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    // The initial streams are matched by `_` and dropped right away.
    let (_n_a, p_a, _, _n_b, p_b, _) = task::block_on(network_participant_stream(tcp()));

    let mut outgoing = task::block_on(p_a.open(10, Promises::empty())).unwrap();
    outgoing.send("HelloWorld").unwrap();
    // Accept the stream while the remote participant is still alive.
    let mut incoming = task::block_on(p_b.opened()).unwrap();

    drop(p_a);
    thread::sleep(Duration::from_secs(1));

    assert_eq!(task::block_on(incoming.recv()), Ok("HelloWorld".to_string()));
}
/// A stream opened by the remote side can still be accepted and read after the
/// remote participant was dropped; a further `opened()` call must then fail
/// with `ParticipantDisconnected`.
#[test]
fn opened_stream_after_remote_part_is_closed() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    // The initial streams are matched by `_` and dropped right away.
    let (_n_a, p_a, _, _n_b, p_b, _) = task::block_on(network_participant_stream(tcp()));

    let mut outgoing = task::block_on(p_a.open(10, Promises::empty())).unwrap();
    outgoing.send("HelloWorld").unwrap();

    drop(p_a);
    thread::sleep(Duration::from_secs(1));

    // Accepting happens only after the remote participant is gone.
    let mut incoming = task::block_on(p_b.opened()).unwrap();
    assert_eq!(task::block_on(incoming.recv()), Ok("HelloWorld".to_string()));

    let second_accept = task::block_on(p_b.opened());
    assert_eq!(
        second_accept.unwrap_err(),
        ParticipantError::ParticipantDisconnected
    );
}
/// After the remote participant was dropped, an already announced stream can
/// still be accepted and read, but actively opening a new stream must fail
/// with `ParticipantDisconnected`.
#[test]
fn open_stream_after_remote_part_is_closed() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    // The initial streams are matched by `_` and dropped right away.
    let (_n_a, p_a, _, _n_b, p_b, _) = task::block_on(network_participant_stream(tcp()));

    let mut outgoing = task::block_on(p_a.open(10, Promises::empty())).unwrap();
    outgoing.send("HelloWorld").unwrap();

    drop(p_a);
    thread::sleep(Duration::from_secs(1));

    let mut incoming = task::block_on(p_b.opened()).unwrap();
    assert_eq!(task::block_on(incoming.recv()), Ok("HelloWorld".to_string()));

    let new_stream = task::block_on(p_b.open(20, Promises::empty()));
    assert_eq!(
        new_stream.unwrap_err(),
        ParticipantError::ParticipantDisconnected
    );
}
/// When the remote participant was dropped without opening any further
/// stream, `opened()` must fail with `ParticipantDisconnected`.
#[test]
fn failed_stream_open_after_remote_part_is_closed() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    // The initial streams are matched by `_` and dropped right away.
    let (_n_a, p_a, _, _n_b, p_b, _) = task::block_on(network_participant_stream(tcp()));

    drop(p_a);
    thread::sleep(Duration::from_secs(1));

    let accept = task::block_on(p_b.opened());
    assert_eq!(
        accept.unwrap_err(),
        ParticipantError::ParticipantDisconnected
    );
}
/// The listening side picks up the participant via `connected()` before the
/// connecting side is torn down. The stream and its message must still be
/// available afterwards.
#[test]
fn open_participant_before_remote_part_is_closed() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);

    // Each `Network::new` also returns a closure, which is run on its own
    // thread.
    let (n_a, run_a) = Network::new(Pid::fake(0));
    thread::spawn(run_a);
    let (n_b, run_b) = Network::new(Pid::fake(1));
    thread::spawn(run_b);

    let addr = tcp();
    task::block_on(n_a.listen(addr.clone())).unwrap();
    let p_b = task::block_on(n_b.connect(addr)).unwrap();
    let mut s1_b = task::block_on(p_b.open(10, Promises::empty())).unwrap();
    s1_b.send("HelloWorld").unwrap();

    // Accept the participant while side B is still fully alive.
    let p_a = task::block_on(n_a.connected()).unwrap();

    // Tear down side B: stream, participant, network.
    drop(s1_b);
    drop(p_b);
    drop(n_b);
    thread::sleep(Duration::from_secs(1));

    let mut s1_a = task::block_on(p_a.opened()).unwrap();
    assert_eq!(task::block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
}
/// The listening side picks up the participant via `connected()` only after
/// the connecting side was torn down. The participant, its stream and the
/// message must still be available.
#[test]
fn open_participant_after_remote_part_is_closed() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);

    // Each `Network::new` also returns a closure, which is run on its own
    // thread.
    let (n_a, run_a) = Network::new(Pid::fake(0));
    thread::spawn(run_a);
    let (n_b, run_b) = Network::new(Pid::fake(1));
    thread::spawn(run_b);

    let addr = tcp();
    task::block_on(n_a.listen(addr.clone())).unwrap();
    let p_b = task::block_on(n_b.connect(addr)).unwrap();
    let mut s1_b = task::block_on(p_b.open(10, Promises::empty())).unwrap();
    s1_b.send("HelloWorld").unwrap();

    // Tear down side B: stream, participant, network.
    drop(s1_b);
    drop(p_b);
    drop(n_b);
    thread::sleep(Duration::from_secs(1));

    // Side A accepts only now, after side B is gone.
    let p_a = task::block_on(n_a.connected()).unwrap();
    let mut s1_a = task::block_on(p_a.opened()).unwrap();
    assert_eq!(task::block_on(s1_a.recv()), Ok("HelloWorld".to_string()));
}
/// After both networks are dropped, the threads running the closures returned
/// by `Network::new` must finish, so that joining them succeeds.
#[test]
fn close_network_scheduler_completely() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);

    // Keep the join handles this time; they are joined at the end.
    let (n_a, run_a) = Network::new(Pid::fake(0));
    let handle_a = thread::spawn(run_a);
    let (n_b, run_b) = Network::new(Pid::fake(1));
    let handle_b = thread::spawn(run_b);

    let addr = tcp();
    task::block_on(n_a.listen(addr.clone())).unwrap();
    let p_b = task::block_on(n_b.connect(addr)).unwrap();
    let mut s1_b = task::block_on(p_b.open(10, Promises::empty())).unwrap();
    s1_b.send("HelloWorld").unwrap();

    let p_a = task::block_on(n_a.connected()).unwrap();
    let mut s1_a = task::block_on(p_a.opened()).unwrap();
    assert_eq!(task::block_on(s1_a.recv()), Ok("HelloWorld".to_string()));

    drop(n_a);
    drop(n_b);
    thread::sleep(Duration::from_secs(1));

    handle_a.join().unwrap();
    handle_b.join().unwrap();
}
/// Calling `try_recv` repeatedly on a closed stream must keep returning
/// `StreamClosed` instead of panicking.
#[test]
fn dont_panic_on_multiply_recv_after_close() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    s1_a.send(11u32).unwrap();
    drop(s1_a);
    thread::sleep(Duration::from_secs(1));

    // The queued message is still delivered, then the stream reports closed.
    assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(11u32)));
    assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
    // futures::channels used to have the "feature" of panicking when recv was
    // called once more after it had already signalled end of stream.
    assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
}
/// A `send` that follows a `try_recv` on an already closed stream must return
/// `StreamClosed` instead of panicking.
#[test]
fn dont_panic_on_recv_send_after_close() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    s1_a.send(11u32).unwrap();
    drop(s1_a);
    thread::sleep(Duration::from_secs(1));

    // The queued message is still delivered, then the stream reports closed.
    assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(11u32)));
    assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
    assert_eq!(s1_b.send("foobar"), Err(StreamError::StreamClosed));
}
/// Sending several times on a closed stream must return `StreamClosed` each
/// time instead of panicking. Here the remote participant is dropped as well.
#[test]
fn dont_panic_on_multiple_send_after_close() {
    use std::{thread, time::Duration};

    let _ = helper::setup(false, 0);
    let (_n_a, p_a, mut s1_a, _n_b, _p_b, mut s1_b) =
        task::block_on(network_participant_stream(tcp()));

    s1_a.send(11u32).unwrap();
    // Drop the stream first, then the participant it belongs to.
    drop(s1_a);
    drop(p_a);
    thread::sleep(Duration::from_secs(1));

    assert_eq!(s1_b.try_recv::<u32>(), Ok(Some(11u32)));
    assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
    assert_eq!(s1_b.send("foobar"), Err(StreamError::StreamClosed));
    assert_eq!(s1_b.send("foobar"), Err(StreamError::StreamClosed));
}