diff --git a/client/src/lib.rs b/client/src/lib.rs index 1c5e28bf6c..187c33306f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1383,8 +1383,10 @@ impl Client { ); }, ServerMsg::Disconnect => { + debug!("finally sendinge ClientMsg::Terminate"); frontend_events.push(Event::Disconnect); self.singleton_stream.send(ClientMsg::Terminate)?; + break Ok(()); }, ServerMsg::CharacterListUpdate(character_list) => { self.character_list.characters = character_list; diff --git a/network/src/channel.rs b/network/src/channel.rs index 63062ef113..6e9a4c5e43 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,6 +1,7 @@ #[cfg(feature = "metrics")] use crate::metrics::NetworkMetrics; use crate::{ + participant::C2pFrame, protocols::Protocols, types::{ Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER, @@ -9,7 +10,7 @@ use crate::{ }; use futures::{ channel::{mpsc, oneshot}, - join, select, + join, sink::SinkExt, stream::StreamExt, FutureExt, @@ -41,8 +42,8 @@ impl Channel { pub async fn run( mut self, protocol: Protocols, - mut w2c_cid_frame_s: mpsc::UnboundedSender<(Cid, Frame)>, - mut leftover_cid_frame: Vec<(Cid, Frame)>, + mut w2c_cid_frame_s: mpsc::UnboundedSender, + mut leftover_cid_frame: Vec, ) { let c2w_frame_r = self.c2w_frame_r.take().unwrap(); let read_stop_receiver = self.read_stop_receiver.take().unwrap(); @@ -58,15 +59,15 @@ impl Channel { trace!(?self.cid, "Start up channel"); match protocol { Protocols::Tcp(tcp) => { - select!( - _ = tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (), - _ = tcp.write_to_wire(self.cid, c2w_frame_r).fuse() => (), + join!( + tcp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), + tcp.write_to_wire(self.cid, c2w_frame_r), ); }, Protocols::Udp(udp) => { - select!( - _ = udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver).fuse() => (), - _ = udp.write_to_wire(self.cid, c2w_frame_r).fuse() => (), + join!( + udp.read_from_wire(self.cid, &mut w2c_cid_frame_s, read_stop_receiver), + udp.write_to_wire(self.cid, c2w_frame_r), ); }, } @@ -113,12 +114,9 @@ impl Handshake { } } - pub async fn setup( - self, - protocol: &Protocols, - ) -> Result<(Pid, Sid, u128, Vec<(Cid, Frame)>), ()> { + pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128, Vec), ()> { let (c2w_frame_s, c2w_frame_r) = mpsc::unbounded::(); - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let handler_future = @@ -160,7 +158,7 @@ impl Handshake { async fn frame_handler( &self, - w2c_cid_frame_r: &mut mpsc::UnboundedReceiver<(Cid, Frame)>, + w2c_cid_frame_r: &mut mpsc::UnboundedReceiver, mut c2w_frame_s: mpsc::UnboundedSender, read_stop_sender: oneshot::Sender<()>, ) -> Result<(Pid, Sid, u128), ()> { @@ -176,7 +174,7 @@ impl Handshake { let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); #[cfg(feature = "metrics")] { - if let Some(ref frame) = frame { + if let Some(Ok(ref frame)) = frame { self.metrics .frames_in_total .with_label_values(&["", &cid_string, &frame.get_string()]) @@ -184,10 +182,10 @@ impl Handshake { } } let r = match frame { - Some(Frame::Handshake { + Some(Ok(Frame::Handshake { magic_number, version, - }) => { + })) => { trace!(?magic_number, ?version, "Recv handshake"); if magic_number != VELOREN_MAGIC_NUMBER { error!(?magic_number, "Connection with invalid magic_number"); @@ -221,18 +219,24 @@ impl Handshake { Ok(()) } }, - Some(Frame::Shutdown) => { - info!("Shutdown signal received"); - Err(()) - }, - Some(Frame::Raw(bytes)) => { - match std::str::from_utf8(bytes.as_slice()) { - Ok(string) => error!(?string, ERR_S), - _ => error!(?bytes, ERR_S), + Some(Ok(frame)) => { + #[cfg(feature = "metrics")] + self.metrics + .frames_in_total + .with_label_values(&["", &cid_string, frame.get_string()]) + .inc(); + if let Frame::Raw(bytes) = frame { + match std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + } } Err(()) }, - Some(_) => Err(()), + Some(Err(())) => { + info!("Protocol got interrupted"); + Err(()) + }, None => Err(()), }; if let Err(()) = r { @@ -248,7 +252,7 @@ impl Handshake { let frame = w2c_cid_frame_r.next().await.map(|(_cid, frame)| frame); let r = match frame { - Some(Frame::Init { pid, secret }) => { + Some(Ok(Frame::Init { pid, secret })) => { debug!(?pid, "Participant send their ID"); let pid_string = pid.to_string(); #[cfg(feature = "metrics")] @@ -265,22 +269,24 @@ impl Handshake { info!(?pid, "This Handshake is now configured!"); Ok((pid, stream_id_offset, secret)) }, - Some(frame) => { + Some(Ok(frame)) => { #[cfg(feature = "metrics")] self.metrics .frames_in_total .with_label_values(&["", &cid_string, frame.get_string()]) .inc(); - match frame { - Frame::Shutdown => info!("Shutdown signal received"), - Frame::Raw(bytes) => match std::str::from_utf8(bytes.as_slice()) { + if let Frame::Raw(bytes) = frame { + match std::str::from_utf8(bytes.as_slice()) { Ok(string) => error!(?string, ERR_S), _ => error!(?bytes, ERR_S), - }, - _ => (), + } } Err(()) }, + Some(Err(())) => { + info!("Protocol got interrupted"); + Err(()) + }, None => Err(()), }; if r.is_err() { diff --git a/network/src/participant.rs b/network/src/participant.rs index eca9945931..607b83e925 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -28,7 +28,8 @@ use tracing::*; use tracing_futures::Instrument; pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender); -pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>); +pub(crate) type C2pFrame = (Cid, Result); +pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec, oneshot::Sender<()>); pub(crate) type S2bShutdownBparticipant = oneshot::Sender>; pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -143,7 +144,7 @@ impl BParticipant { oneshot::channel(); let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel(); let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel(); - let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::(); let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new( #[cfg(feature = "metrics")] self.metrics.clone(), @@ -255,27 +256,29 @@ impl BParticipant { //TODO: just take first let mut lock = self.channels.write().await; if let Some(ci) = lock.values_mut().next() { - //note: this is technically wrong we should only increase when it succeeded, - // but this requiered me to clone `frame` which is a to big - // performance impact for error handling + //we are increasing metrics without checking the result to please + // borrow_checker. otherwise we would need to close `frame` what we + // dont want! #[cfg(feature = "metrics")] frames_out_total_cache .with_label_values(ci.cid, &frame) .inc(); if let Err(e) = ci.b2w_frame_s.send(frame).await { let cid = ci.cid; - warn!( - ?e, - ?cid, - "channel no longer available, maybe cleanup in process?" - ); + info!(?e, ?cid, "channel no longer available"); + if let Some(ci) = self.channels.write().await.remove(&cid) { + trace!(?cid, "stopping read protocol"); + if let Err(e) = ci.b2r_read_shutdown.send(()) { + trace!(?cid, ?e, "seems like was already shut down"); + } + } //TODO FIXME tags: takeover channel multiple info!( "FIXME: the frame is actually drop. which is fine for now as the participant \ will be closed, but not if we do channel-takeover" ); //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant - self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) + self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) .await; false } else { @@ -297,16 +300,13 @@ impl BParticipant { } else { guard.1 += 1; } - //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant - self.close_api(Some(ParticipantError::ProtocolFailedUnrecoverable)) - .await; false } } async fn handle_frames_mgr( &self, - mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>, + mut w2b_frames_r: mpsc::UnboundedReceiver, mut b2a_stream_opened_s: mpsc::UnboundedSender, a2b_close_stream_s: mpsc::UnboundedSender, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, @@ -318,8 +318,22 @@ impl BParticipant { let mut dropped_cnt = 0u64; let mut dropped_sid = Sid::new(0); - while let Some((cid, frame)) = w2b_frames_r.next().await { - //trace!("handling frame"); + while let Some((cid, result_frame)) = w2b_frames_r.next().await { + //trace!(?result_frame, "handling frame"); + let frame = match result_frame { + Ok(frame) => frame, + Err(()) => { + // The read protocol stopped, i need to make sure that write gets stopped + debug!("read protocol was closed. Stopping write protocol"); + if let Some(ci) = self.channels.write().await.get_mut(&cid) { + ci.b2w_frame_s + .close() + .await + .expect("couldn't stop write protocol"); + } + continue; + }, + }; #[cfg(feature = "metrics")] { let cid_string = cid.to_string(); @@ -328,8 +342,6 @@ impl BParticipant { .with_label_values(&[&self.remote_pid_string, &cid_string, frame.get_string()]) .inc(); } - #[cfg(not(feature = "metrics"))] - let _cid = cid; match frame { Frame::OpenStream { sid, @@ -400,7 +412,7 @@ impl BParticipant { false }; if finished { - //debug!(?mid, "finished receiving message"); + //trace!(?mid, "finished receiving message"); let imsg = messages.remove(&mid).unwrap(); if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) { if let Err(e) = si.b2a_msg_recv_s.send(imsg).await { @@ -453,7 +465,7 @@ impl BParticipant { async fn create_channel_mgr( &self, s2b_create_channel_r: mpsc::UnboundedReceiver, - w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>, + w2b_frames_s: mpsc::UnboundedSender, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); trace!("Start create_channel_mgr"); @@ -489,17 +501,13 @@ impl BParticipant { .channels_disconnected_total .with_label_values(&[&self.remote_pid_string]) .inc(); - info!(?cid, "Channel got closed, cleaning up"); - //stopping read in case write triggered the failure - if let Some(ci) = channels.write().await.remove(&cid) { - if let Err(e) = ci.b2r_read_shutdown.send(()) { - debug!(?e, "read channel was already shut down"); - }; - } //None means it prob got closed by closing the participant + info!(?cid, "Channel got closed"); + //maybe channel got already dropped, we don't know. + channels.write().await.remove(&cid); trace!(?cid, "Channel cleanup completed"); //TEMP FIX: as we dont have channel takeover yet drop the whole // bParticipant - self.close_api(None).await; + self.close_write_api(None).await; } }, ) @@ -578,6 +586,9 @@ impl BParticipant { trace!("Start participant_shutdown_mgr"); let sender = s2b_shutdown_bparticipant_r.await.unwrap(); + let mut send_cache = + PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid); + self.close_api(None).await; debug!("Closing all managers"); @@ -588,6 +599,21 @@ impl BParticipant { } b2b_prios_flushed_r.await.unwrap(); + if Some(ParticipantError::ParticipantDisconnected) != self.shutdown_info.read().await.error + { + debug!("Sending shutdown frame after flushed all prios"); + if !self + .send_frame( + Frame::Shutdown, + #[cfg(feature = "metrics")] + &mut send_cache, + ) + .await + { + warn!("couldn't send shutdown frame, are channels already closed?"); + } + } + debug!("Closing all channels, after flushed prios"); for (cid, ci) in self.channels.write().await.drain() { if let Err(e) = ci.b2r_read_shutdown.send(()) { @@ -732,21 +758,29 @@ impl BParticipant { ) } - /// close streams and set err - async fn close_api(&self, reason: Option) { - //closing api::Participant is done by closing all channels, exepct for the - // shutdown channel at this point! + async fn close_write_api(&self, reason: Option) { + trace!(?reason, "close_api"); let mut lock = self.shutdown_info.write().await; if let Some(r) = reason { lock.error = Some(r); } lock.b2a_stream_opened_s.close_channel(); + debug!("Closing all streams for write"); + for (sid, si) in self.streams.write().await.iter() { + trace!(?sid, "Shutting down Stream for write"); + si.send_closed.store(true, Ordering::Relaxed); + } + } + + ///closing api::Participant is done by closing all channels, exepct for the + /// shutdown channel at this point! + async fn close_api(&self, reason: Option) { + self.close_write_api(reason).await; debug!("Closing all streams"); for (sid, si) in self.streams.write().await.drain() { trace!(?sid, "Shutting down Stream"); si.b2a_msg_recv_s.close_channel(); - si.send_closed.store(true, Ordering::Relaxed); } } } diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 0178bd805d..910b11d5c6 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -1,6 +1,9 @@ #[cfg(feature = "metrics")] use crate::metrics::{CidFrameCache, NetworkMetrics}; -use crate::types::{Cid, Frame, Mid, Pid, Sid}; +use crate::{ + participant::C2pFrame, + types::{Cid, Frame, Mid, Pid, Sid}, +}; use async_std::{ net::{TcpStream, UdpSocket}, prelude::*, @@ -68,9 +71,11 @@ impl TcpProtocol { /// read_except and if it fails, close the protocol async fn read_or_close( + cid: Cid, mut stream: &TcpStream, mut bytes: &mut [u8], mut end_receiver: &mut Fuse>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, ) -> bool { match select! { r = stream.read_exact(&mut bytes).fuse() => Some(r), @@ -79,6 +84,13 @@ impl TcpProtocol { Some(Ok(_)) => false, Some(Err(e)) => { info!(?e, "Closing tcp protocol due to read error"); + //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as every + // channel is holding a sender! thats why Ne need a explicit + // STOP here + w2c_cid_frame_s + .send((cid, Err(()))) + .await + .expect("Channel or Participant seems no longer to exist"); true }, None => { @@ -91,7 +103,7 @@ impl TcpProtocol { pub async fn read_from_wire( &self, cid: Cid, - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, end_r: oneshot::Receiver<()>, ) { trace!("Starting up tcp read()"); @@ -107,7 +119,7 @@ impl TcpProtocol { macro_rules! read_or_close { ($x:expr) => { - if TcpProtocol::read_or_close(&stream, $x, &mut end_r).await { + if TcpProtocol::read_or_close(cid, &stream, $x, &mut end_r, w2c_cid_frame_s).await { trace!("read_or_close requested a shutdown"); break; } @@ -202,7 +214,7 @@ impl TcpProtocol { #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); w2c_cid_frame_s - .send((cid, frame)) + .send((cid, Ok(frame))) .await .expect("Channel or Participant seems no longer to exist"); } @@ -331,7 +343,7 @@ impl UdpProtocol { pub async fn read_from_wire( &self, cid: Cid, - w2c_cid_frame_s: &mut mpsc::UnboundedSender<(Cid, Frame)>, + w2c_cid_frame_s: &mut mpsc::UnboundedSender, end_r: oneshot::Receiver<()>, ) { trace!("Starting up udp read()"); @@ -345,7 +357,14 @@ impl UdpProtocol { let mut data_in = self.data_in.lock().await; let mut end_r = end_r.fuse(); while let Some(bytes) = select! { - r = data_in.next().fuse() => r, + r = data_in.next().fuse() => match r { + Some(r) => Some(r), + None => { + info!("Udp read ended"); + w2c_cid_frame_s.send((cid, Err(()))).await.expect("Channel or Participant seems no longer to exist"); + None + } + }, _ = end_r => None, } { trace!("Got raw UDP message with len: {}", bytes.len()); @@ -443,7 +462,7 @@ impl UdpProtocol { }; #[cfg(feature = "metrics")] metrics_cache.with_label_values(&frame).inc(); - w2c_cid_frame_s.send((cid, frame)).await.unwrap(); + w2c_cid_frame_s.send((cid, Ok(frame))).await.unwrap(); } trace!("Shutting down udp read()"); } @@ -554,10 +573,7 @@ impl UdpProtocol { #[cfg(test)] mod tests { use super::*; - use crate::{ - metrics::NetworkMetrics, - types::{Cid, Pid}, - }; + use crate::{metrics::NetworkMetrics, types::Pid}; use async_std::net; use futures::{executor::block_on, stream::StreamExt}; use std::sync::Arc; @@ -584,7 +600,7 @@ mod tests { client.flush(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { @@ -597,10 +613,10 @@ mod tests { //async_std::task::sleep(std::time::Duration::from_millis(1000)); let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); - if let Frame::Handshake { + if let Ok(Frame::Handshake { magic_number, version, - } = frame + }) = frame { assert_eq!(&magic_number, b"HELLOWO"); assert_eq!(version, [1337, 0, 42]); @@ -633,7 +649,7 @@ mod tests { client.flush(); //handle data - let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::<(Cid, Frame)>(); + let (mut w2c_cid_frame_s, mut w2c_cid_frame_r) = mpsc::unbounded::(); let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let cid2 = cid; let t = std::thread::spawn(move || { @@ -645,7 +661,7 @@ mod tests { // Assert than we get some value back! Its a Raw! let (cid_r, frame) = w2c_cid_frame_r.next().await.unwrap(); assert_eq!(cid, cid_r); - if let Frame::Raw(data) = frame { + if let Ok(Frame::Raw(data)) = frame { assert_eq!(&data.as_slice(), b"x4hrtzsektfhxugzdtz5r78gzrtzfhxf"); } else { panic!("wrong frame type"); diff --git a/network/src/types.rs b/network/src/types.rs index 3ede7fd302..51fd1843e3 100644 --- a/network/src/types.rs +++ b/network/src/types.rs @@ -146,10 +146,18 @@ impl Pid { /// This will panic if pid i greater than 7, as I do not want you to use /// this in production! #[doc(hidden)] - pub fn fake(pid: u8) -> Self { - assert!(pid < 8); + pub fn fake(pid_offset: u8) -> Self { + assert!(pid_offset < 8); + let o = pid_offset as u128; + const OFF: [u128; 5] = [ + 0x40, + 0x40 * 0x40, + 0x40 * 0x40 * 0x40, + 0x40 * 0x40 * 0x40 * 0x40, + 0x40 * 0x40 * 0x40 * 0x40 * 0x40, + ]; Self { - internal: pid as u128, + internal: o + o * OFF[0] + o * OFF[1] + o * OFF[2] + o * OFF[3] + o * OFF[4], } } @@ -251,7 +259,9 @@ mod tests { #[test] fn frame_creation() { Pid::new(); - assert_eq!(format!("{}", Pid::fake(2)), "CAAAAA"); + assert_eq!(format!("{}", Pid::fake(0)), "AAAAAA"); + assert_eq!(format!("{}", Pid::fake(1)), "BBBBBB"); + assert_eq!(format!("{}", Pid::fake(2)), "CCCCCC"); } #[test] diff --git a/network/tests/closing.rs b/network/tests/closing.rs index af0674460a..9c26a3ac43 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -42,8 +42,11 @@ fn close_participant() { let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); block_on(p1_a.disconnect()).unwrap(); - //As no more read/write is done on p1_b the disconnect is successful - block_on(p1_b.disconnect()).unwrap(); + //As no more read/write is run disconnect is successful or already disconnected + match block_on(p1_b.disconnect()) { + Ok(_) | Err(ParticipantError::ParticipantDisconnected) => (), + e => panic!("wrong disconnect type {:?}", e), + }; assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!( @@ -225,15 +228,6 @@ fn close_network_then_disconnect_part() { } #[test] -/* -FLANKY: ----- opened_stream_before_remote_part_is_closed stdout ---- -thread 'opened_stream_before_remote_part_is_closed' panicked at 'assertion failed: `(left == right)` - left: `Err(StreamClosed)`, - right: `Ok("HelloWorld")`', network/tests/closing.rs:236:5 -note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace - -*/ fn opened_stream_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let (_n_a, p_a, _, _n_b, p_b, _) = block_on(network_participant_stream(tcp())); @@ -291,9 +285,9 @@ fn failed_stream_open_after_remote_part_is_closed() { #[test] fn open_participant_before_remote_part_is_closed() { - let (n_a, f) = Network::new(Pid::fake(1)); + let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(2)); + let (n_b, f) = Network::new(Pid::fake(1)); std::thread::spawn(f); let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); @@ -311,9 +305,9 @@ fn open_participant_before_remote_part_is_closed() { #[test] fn open_participant_after_remote_part_is_closed() { - let (n_a, f) = Network::new(Pid::fake(1)); + let (n_a, f) = Network::new(Pid::fake(0)); std::thread::spawn(f); - let (n_b, f) = Network::new(Pid::fake(2)); + let (n_b, f) = Network::new(Pid::fake(1)); std::thread::spawn(f); let addr = tcp(); block_on(n_a.listen(addr.clone())).unwrap(); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 53ce8e0c47..91987cbbc5 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -49,9 +49,9 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { pub async fn network_participant_stream( addr: ProtocolAddr, ) -> (Network, Participant, Stream, Network, Participant, Stream) { - let (n_a, f_a) = Network::new(Pid::fake(1)); + let (n_a, f_a) = Network::new(Pid::fake(0)); std::thread::spawn(f_a); - let (n_b, f_b) = Network::new(Pid::fake(2)); + let (n_b, f_b) = Network::new(Pid::fake(1)); std::thread::spawn(f_b); n_a.listen(addr.clone()).await.unwrap();