Fixed the unclean disconnecting of participants.

Till now, we just dropped the TCP connection and registered this as a clean shutdown.
The prodocol reader intereted this and send a Frame::Shutdown frame to it's local processor.
This is ofc wrong.
So now the protocol reader will detect a Frame::Shutdown frame and send it over. if the Tcp connection gets closed it will return an Error up.
The processor will then pick up this error and request a unclear shutdown and notifies the user.
Also when doing a clean shutdown we are sending a Frame::Shutdown now to the remote side to trigger this behavior.

Before we wrongly added the feature of only using a `select` in channel. This is WRONG,
 as it could mean that the write maybe fails, but the read still had some Frames buffered which then get dropped.
Its fixed now by the clean shutdown mechanims defined before.

Also when a channel is closed now inside a participant we are closing the whole participant as a protection.
However, we must not close the recv channel as the `handle_frames_mgr` might still be working on them, so we only stop writing/sending.

Debugging this also let me introduce some smaller fixes:
 - PID in tests are now 0 and 1+1*64+1*64*64+... this makes the traces appear as AAAAAA and BBBBBB instead of ABAAAA and ACAAAA
 - veloren client now better seperates between clean shutdown and unclear shutdown.
 - added a new type: C2pFrame for `(cid, Result<Frame, ()>)`
 - wrong frames inside the handshare are not counted in metrics
 -
This commit is contained in:
Marcel Märtens 2020-08-21 14:01:49 +02:00
parent daffe8bee3
commit 926d334082
7 changed files with 167 additions and 105 deletions

@ -1383,8 +1383,10 @@ impl Client {
);
},
ServerMsg::Disconnect => {
debug!("finally sendinge ClientMsg::Terminate");
frontend_events.push(Event::Disconnect);
self.singleton_stream.send(ClientMsg::Terminate)?;
break Ok(());
},
ServerMsg::CharacterListUpdate(character_list) => {
self.character_list.characters = character_list;

@ -1,6 +1,7 @@
#[cfg(feature = "metrics")]
use crate::metrics::NetworkMetrics;
use crate::{
participant::C2pFrame,
protocols::Protocols,
types::{
Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
@ -9,7 +10,7 @@ use crate::{
};
use futures::{
channel::{mpsc, oneshot},
join, select,
join,
sink::SinkExt,
stream::StreamExt,
FutureExt,
@ -41,8 +42,8 @@ impl Channel {
pub async fn run(
mut self,
protocol: Protocols,
mut w2c_cid_frame_s: mpsc::UnboundedSender<(Cid, Frame)>,
mut leftover_cid_frame: Vec<(Cid, Frame)>,
mut w2c_cid_frame_s: mpsc::UnboundedSender<C2pFrame>,
mut leftover_cid_frame: Vec<C2pFrame>,
) {
let c2w_frame_r = self.c2w_frame_r.take().unwrap();
let read_stop_receiver = self.read_stop_receiver.take().unwrap();
@ -58,15 +59,15 @@ impl Channel {
trace!(?self.cid, "Start up channel");
match protocol {
Protocols::Tcp(tcp) => {
select!(
_ = tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (),
_ = tcp.write_to_wire(self.cid, c2w_frame_r).fuse() => (),
join!(
tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
tcp.write_to_wire(self.cid, c2w_frame_r),
);
},
Protocols::Udp(udp) => {
select!(
_ = udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (),
_ = udp.write_to_wire(self.cid, c2w_frame_r).fuse() => (),
join!(
udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver),
udp.write_to_wire(self.cid, c2w_frame_r),
);
},
}
@ -113,12 +114,9 @@ impl Handshake {
}
}
pub async fn setup(
self,
protocol: &Protocols,
) -> Result<(Pid, Sid, u128, Vec<(Cid, Frame)>), ()> {
pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec<C2pFrame>), ()> {
let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::<Frame>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let handler_future =
@ -160,7 +158,7 @@ impl Handshake {
async fn frame_handler(
&self,
w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>,
w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<C2pFrame>,
mut c2w_frame_s: mpsc::UnboundedSender<Frame>,
read_stop_sender: oneshot::Sender<()>,
) -> Result<(Pid, Sid, u128), ()> {
@ -176,7 +174,7 @@ impl Handshake {
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
#[cfg(feature = "metrics")]
{
if let Some(ref frame) = frame {
if let Some(Ok(ref frame)) = frame {
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, &frame.get_string()])
@ -184,10 +182,10 @@ impl Handshake {
}
}
let r = match frame {
Some(Frame::Handshake {
Some(Ok(Frame::Handshake {
magic_number,
version,
}) => {
})) => {
trace!(?magic_number, ?version, "Recv handshake");
if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "Connection with invalid magic_number");
@ -221,18 +219,24 @@ impl Handshake {
Ok(())
}
},
Some(Frame::Shutdown) => {
info!("Shutdown signal received");
Err(())
},
Some(Frame::Raw(bytes)) => {
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
Some(Ok(frame)) => {
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()])
.inc();
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
}
}
Err(())
},
Some(_) => Err(()),
Some(Err(())) => {
info!("Protocol got interrupted");
Err(())
},
None => Err(()),
};
if let Err(()) = r {
@ -248,7 +252,7 @@ impl Handshake {
let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame);
let r = match frame {
Some(Frame::Init { pid, secret }) => {
Some(Ok(Frame::Init { pid, secret })) => {
debug!(?pid, "Participant send their ID");
let pid_string = pid.to_string();
#[cfg(feature = "metrics")]
@ -265,22 +269,24 @@ impl Handshake {
info!(?pid, "This Handshake is now configured!");
Ok((pid, stream_id_offset, secret))
},
Some(frame) => {
Some(Ok(frame)) => {
#[cfg(feature = "metrics")]
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, frame.get_string()])
.inc();
match frame {
Frame::Shutdown => info!("Shutdown signal received"),
Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) {
if let Frame::Raw(bytes) = frame {
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
},
_ => (),
}
}
Err(())
},
Some(Err(())) => {
info!("Protocol got interrupted");
Err(())
},
None => Err(()),
};
if r.is_err() {

@ -28,7 +28,8 @@ use tracing::*;
use tracing_futures::Instrument;
pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender<Stream>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>);
pub(crate) type C2pFrame = (Cid, Result<Frame, ()>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<C2pFrame>, oneshot::Sender<()>);
pub(crate) type S2bShutdownBparticipant = oneshot::Sender<Result<(), ParticipantError>>;
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
@ -143,7 +144,7 @@ impl BParticipant {
oneshot::channel();
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<C2pFrame>();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new(
#[cfg(feature = "metrics")]
self.metrics.clone(),
@ -255,27 +256,29 @@ impl BParticipant {
//TODO: just take first
let mut lock = self.channels.write().await;
if let Some(ci) = lock.values_mut().next() {
//note: this is technically wrong we should only increase when it succeeded,
// but this requiered me to clone `frame` which is a to big
// performance impact for error handling
//we are increasing metrics without checking the result to please
// borrow_checker. otherwise we would need to close `frame` what we
// dont want!
#[cfg(feature = "metrics")]
frames_out_total_cache
.with_label_values(ci.cid, &frame)
.inc();
if let Err(e) = ci.b2w_frame_s.send(frame).await {
let cid = ci.cid;
warn!(
?e,
?cid,
"channel no longer available, maybe cleanup in process?"
);
info!(?e, ?cid, "channel no longer available");
if let Some(ci) = self.channels.write().await.remove(&cid) {
trace!(?cid, "stopping read protocol");
if let Err(e) = ci.b2r_read_shutdown.send(()) {
trace!(?cid, ?e, "seems like was already shut down");
}
}
//TODO FIXME tags: takeover channel multiple
info!(
"FIXME: the frame is actually drop. which is fine for now as the participant \
will be closed, but not if we do channel-takeover"
);
//TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
.await;
false
} else {
@ -297,16 +300,13 @@ impl BParticipant {
} else {
guard.1 += 1;
}
//TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
.await;
false
}
}
async fn handle_frames_mgr(
&self,
mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut w2b_frames_r: mpsc::UnboundedReceiver<C2pFrame>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
@ -318,8 +318,22 @@ impl BParticipant {
let mut dropped_cnt = 0u64;
let mut dropped_sid = Sid::new(0);
while let Some((cid, frame)) = w2b_frames_r.next().await {
//trace!("handling frame");
while let Some((cid, result_frame)) = w2b_frames_r.next().await {
//trace!(?result_frame, "handling frame");
let frame = match result_frame {
Ok(frame) => frame,
Err(()) => {
// The read protocol stopped, i need to make sure that write gets stopped
debug!("read protocol was closed. Stopping write protocol");
if let Some(ci) = self.channels.write().await.get_mut(&cid) {
ci.b2w_frame_s
.close()
.await
.expect("couldn't stop write protocol");
}
continue;
},
};
#[cfg(feature = "metrics")]
{
let cid_string = cid.to_string();
@ -328,8 +342,6 @@ impl BParticipant {
.with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()])
.inc();
}
#[cfg(not(feature = "metrics"))]
let _cid = cid;
match frame {
Frame::OpenStream {
sid,
@ -400,7 +412,7 @@ impl BParticipant {
false
};
if finished {
//debug!(?mid, "finished receiving message");
//trace!(?mid, "finished receiving message");
let imsg = messages.remove(&mid).unwrap();
if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) {
if let Err(e) = si.b2a_msg_recv_s.send(imsg).await {
@ -453,7 +465,7 @@ impl BParticipant {
async fn create_channel_mgr(
&self,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>,
w2b_frames_s: mpsc::UnboundedSender<C2pFrame>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start create_channel_mgr");
@ -489,17 +501,13 @@ impl BParticipant {
.channels_disconnected_total
.with_label_values(&[&self.remote_pid_string])
.inc();
info!(?cid, "Channel got closed, cleaning up");
//stopping read in case write triggered the failure
if let Some(ci) = channels.write().await.remove(&cid) {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, "read channel was already shut down");
};
} //None means it prob got closed by closing the participant
info!(?cid, "Channel got closed");
//maybe channel got already dropped, we don't know.
channels.write().await.remove(&cid);
trace!(?cid, "Channel cleanup completed");
//TEMP FIX: as we dont have channel takeover yet drop the whole
// bParticipant
self.close_api(None).await;
self.close_write_api(None).await;
}
},
)
@ -578,6 +586,9 @@ impl BParticipant {
trace!("Start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap();
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
self.close_api(None).await;
debug!("Closing all managers");
@ -588,6 +599,21 @@ impl BParticipant {
}
b2b_prios_flushed_r.await.unwrap();
if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error
{
debug!("Sending shutdown frame after flushed all prios");
if !self
.send_frame(
Frame::Shutdown,
#[cfg(feature = "metrics")]
&mut send_cache,
)
.await
{
warn!("couldn't send shutdown frame, are channels already closed?");
}
}
debug!("Closing all channels, after flushed prios");
for (cid, ci) in self.channels.write().await.drain() {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
@ -732,21 +758,29 @@ impl BParticipant {
)
}
/// close streams and set err
async fn close_api(&self, reason: Option<ParticipantError>) {
//closing api::Participant is done by closing all channels, exepct for the
// shutdown channel at this point!
async fn close_write_api(&self, reason: Option<ParticipantError>) {
trace!(?reason, "close_api");
let mut lock = self.shutdown_info.write().await;
if let Some(r) = reason {
lock.error = Some(r);
}
lock.b2a_stream_opened_s.close_channel();
debug!("Closing all streams for write");
for (sid, si) in self.streams.write().await.iter() {
trace!(?sid, "Shutting down Stream for write");
si.send_closed.store(true, Ordering::Relaxed);
}
}
///closing api::Participant is done by closing all channels, exepct for the
/// shutdown channel at this point!
async fn close_api(&self, reason: Option<ParticipantError>) {
self.close_write_api(reason).await;
debug!("Closing all streams");
for (sid, si) in self.streams.write().await.drain() {
trace!(?sid, "Shutting down Stream");
si.b2a_msg_recv_s.close_channel();
si.send_closed.store(true, Ordering::Relaxed);
}
}
}

@ -1,6 +1,9 @@
#[cfg(feature = "metrics")]
use crate::metrics::{CidFrameCache, NetworkMetrics};
use crate::types::{Cid, Frame, Mid, Pid, Sid};
use crate::{
participant::C2pFrame,
types::{Cid, Frame, Mid, Pid, Sid},
};
use async_std::{
net::{TcpStream, UdpSocket},
prelude::*,
@ -68,9 +71,11 @@ impl TcpProtocol {
/// read_except and if it fails, close the protocol
async fn read_or_close(
cid: Cid,
mut stream: &TcpStream,
mut bytes: &mut [u8],
mut end_receiver: &mut Fuse<oneshot::Receiver<()>>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
) -> bool {
match select! {
r = stream.read_exact(&mut bytes).fuse() => Some(r),
@ -79,6 +84,13 @@ impl TcpProtocol {
Some(Ok(_)) => false,
Some(Err(e)) => {
info!(?e, "Closing tcp protocol due to read error");
//w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every
// channel is holding a sender! thats why Ne need a explicit
// STOP here
w2c_cid_frame_s
.send((cid, Err(())))
.await
.expect("Channel or Participant seems no longer to exist");
true
},
None => {
@ -91,7 +103,7 @@ impl TcpProtocol {
pub async fn read_from_wire(
&self,
cid: Cid,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
end_r: oneshot::Receiver<()>,
) {
trace!("Starting up tcp read()");
@ -107,7 +119,7 @@ impl TcpProtocol {
macro_rules! read_or_close {
($x:expr) => {
if TcpProtocol::read_or_close(&stream, $x, &mut end_r).await {
if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await {
trace!("read_or_close requested a shutdown");
break;
}
@ -202,7 +214,7 @@ impl TcpProtocol {
#[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s
.send((cid, frame))
.send((cid, Ok(frame)))
.await
.expect("Channel or Participant seems no longer to exist");
}
@ -331,7 +343,7 @@ impl UdpProtocol {
pub async fn read_from_wire(
&self,
cid: Cid,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>,
w2c_cid_frame_s: &mut mpsc::UnboundedSender<C2pFrame>,
end_r: oneshot::Receiver<()>,
) {
trace!("Starting up udp read()");
@ -345,7 +357,14 @@ impl UdpProtocol {
let mut data_in = self.data_in.lock().await;
let mut end_r = end_r.fuse();
while let Some(bytes) = select! {
r = data_in.next().fuse() => r,
r = data_in.next().fuse() => match r {
Some(r) => Some(r),
None => {
info!("Udp read ended");
w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist");
None
}
},
_ = end_r => None,
} {
trace!("Got raw UDP message with len: {}", bytes.len());
@ -443,7 +462,7 @@ impl UdpProtocol {
};
#[cfg(feature = "metrics")]
metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, frame)).await.unwrap();
w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap();
}
trace!("Shutting down udp read()");
}
@ -554,10 +573,7 @@ impl UdpProtocol {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
metrics::NetworkMetrics,
types::{Cid, Pid},
};
use crate::{metrics::NetworkMetrics, types::Pid};
use async_std::net;
use futures::{executor::block_on, stream::StreamExt};
use std::sync::Arc;
@ -584,7 +600,7 @@ mod tests {
client.flush();
//handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid;
let t = std::thread::spawn(move || {
@ -597,10 +613,10 @@ mod tests {
//async_std::task::sleep(std::time::Duration::from_millis(1000));
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r);
if let Frame::Handshake {
if let Ok(Frame::Handshake {
magic_number,
version,
} = frame
}) = frame
{
assert_eq!(&magic_number, b"HELLOWO");
assert_eq!(version, [1337, 0, 42]);
@ -633,7 +649,7 @@ mod tests {
client.flush();
//handle data
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>();
let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<C2pFrame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let cid2 = cid;
let t = std::thread::spawn(move || {
@ -645,7 +661,7 @@ mod tests {
// Assert than we get some value back! Its a Raw!
let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap();
assert_eq!(cid, cid_r);
if let Frame::Raw(data) = frame {
if let Ok(Frame::Raw(data)) = frame {
assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf");
} else {
panic!("wrong frame type");

@ -146,10 +146,18 @@ impl Pid {
/// This will panic if pid i greater than 7, as I do not want you to use
/// this in production!
#[doc(hidden)]
pub fn fake(pid: u8) -> Self {
assert!(pid < 8);
pub fn fake(pid_offset: u8) -> Self {
assert!(pid_offset < 8);
let o = pid_offset as u128;
const OFF: [u128; 5] = [
0x40,
0x40 * 0x40,
0x40 * 0x40 * 0x40,
0x40 * 0x40 * 0x40 * 0x40,
0x40 * 0x40 * 0x40 * 0x40 * 0x40,
];
Self {
internal: pid as u128,
internal: o + o * OFF[0] + o * OFF[1] + o * OFF[2] + o * OFF[3] + o * OFF[4],
}
}
@ -251,7 +259,9 @@ mod tests {
#[test]
fn frame_creation() {
Pid::new();
assert_eq!(format!("{}", Pid::fake(2)), "CAAAAA");
assert_eq!(format!("{}", Pid::fake(0)), "AAAAAA");
assert_eq!(format!("{}", Pid::fake(1)), "BBBBBB");
assert_eq!(format!("{}", Pid::fake(2)), "CCCCCC");
}
#[test]

@ -42,8 +42,11 @@ fn close_participant() {
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(p1_a.disconnect()).unwrap();
//As no more read/write is done on p1_b the disconnect is successful
block_on(p1_b.disconnect()).unwrap();
//As no more read/write is run disconnect is successful or already disconnected
match block_on(p1_b.disconnect()) {
Ok(_) | Err(ParticipantError::ParticipantDisconnected) => (),
e => panic!("wrong disconnect type {:?}", e),
};
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(
@ -225,15 +228,6 @@ fn close_network_then_disconnect_part() {
}
#[test]
/*
FLANKY:
---- opened_stream_before_remote_part_is_closed stdout ----
thread 'opened_stream_before_remote_part_is_closed' panicked at 'assertion failed: `(left == right)`
left: `Err(StreamClosed)`,
right: `Ok("HelloWorld")`', network/tests/closing.rs:236:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
*/
fn opened_stream_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp()));
@ -291,9 +285,9 @@ fn failed_stream_open_after_remote_part_is_closed() {
#[test]
fn open_participant_before_remote_part_is_closed() {
let (n_a, f) = Network::new(Pid::fake(1));
let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(2));
let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f);
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();
@ -311,9 +305,9 @@ fn open_participant_before_remote_part_is_closed() {
#[test]
fn open_participant_after_remote_part_is_closed() {
let (n_a, f) = Network::new(Pid::fake(1));
let (n_a, f) = Network::new(Pid::fake(0));
std::thread::spawn(f);
let (n_b, f) = Network::new(Pid::fake(2));
let (n_b, f) = Network::new(Pid::fake(1));
std::thread::spawn(f);
let addr = tcp();
block_on(n_a.listen(addr.clone())).unwrap();

@ -49,9 +49,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
pub async fn network_participant_stream(
addr: ProtocolAddr,
) -> (Network, Participant, Stream, Network, Participant, Stream) {
let (n_a, f_a) = Network::new(Pid::fake(1));
let (n_a, f_a) = Network::new(Pid::fake(0));
std::thread::spawn(f_a);
let (n_b, f_b) = Network::new(Pid::fake(2));
let (n_b, f_b) = Network::new(Pid::fake(1));
std::thread::spawn(f_b);
n_a.listen(addr.clone()).await.unwrap();