diff --git a/network/src/api.rs b/network/src/api.rs index dd31c482ca..dc028dc797 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -1,7 +1,7 @@ use crate::{ channel::ProtocolsError, message::{partial_eq_bincode, Message}, - participant::{A2bReportChannel, A2bStreamOpen, S2bShutdownBparticipant}, + participant::{A2bStreamOpen, S2bShutdownBparticipant}, scheduler::{A2sConnect, Scheduler}, }; use bytes::Bytes; @@ -61,7 +61,6 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, - a2b_report_channel_s: Mutex>, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -521,7 +520,6 @@ impl Participant { remote_pid: Pid, a2b_open_stream_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, - a2b_report_channel_s: mpsc::UnboundedSender, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, ) -> Self { @@ -530,7 +528,6 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), - a2b_report_channel_s: Mutex::new(a2b_report_channel_s), b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } @@ -755,37 +752,6 @@ impl Participant { } } - /// Returns a list of [`ConnectAddr`] that can be used to connect to the - /// respective remote. This only reports the current state of the - /// Participant at the point of calling. Also there is no guarantee that - /// the remote is listening on this address. Note: Due to timing - /// problems even if you call this repeatedly you might miss some addr that - /// got connected and disconnected quickly, though this is more of a - /// theoretical problem. - /// - /// [`ConnectAddr`]: ConnectAddr - pub async fn report_current_connect_addr(&self) -> Result, ParticipantError> { - let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::>(); - match self - .a2b_report_channel_s - .lock() - .await - .send(p2a_return_report_s) - { - Ok(()) => match p2a_return_report_r.await { - Ok(list) => Ok(list), - Err(_) => { - debug!("p2a_return_report_r failed, closing participant"); - Err(ParticipantError::ParticipantDisconnected) - }, - }, - Err(e) => { - debug!(?e, "bParticipant is already closed, notifying"); - Err(ParticipantError::ParticipantDisconnected) - }, - } - } - /// Returns the current approximation on the maximum bandwidth available. /// This WILL fluctuate based on the amount/size of send messages. pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } diff --git a/network/src/lib.rs b/network/src/lib.rs index df35f28a1c..742ab23eb7 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -2,7 +2,6 @@ #![cfg_attr(test, deny(rust_2018_idioms))] #![cfg_attr(test, deny(warnings))] #![deny(clippy::clone_on_ref_ptr)] -#![feature(assert_matches)] //! Crate to handle high level networking of messages with different //! requirements and priorities over a number of protocols diff --git a/network/src/participant.rs b/network/src/participant.rs index 54c3b1f1dc..9564f55a81 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -28,7 +28,6 @@ use tracing::*; pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender); pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, ConnectAddr, oneshot::Sender<()>); -pub(crate) type A2bReportChannel = oneshot::Sender>; pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender>); pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -55,7 +54,6 @@ struct ControlChannels { a2b_open_stream_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, s2b_create_channel_r: mpsc::UnboundedReceiver, - a2b_report_channel_r: mpsc::UnboundedReceiver, b2a_bandwidth_stats_s: watch::Sender, s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } @@ -84,7 +82,6 @@ impl BParticipant { // We use integer instead of Barrier to not block mgr from freeing at the end const BARR_CHANNEL: i32 = 1; const BARR_RECV: i32 = 4; - const BARR_REPORT: i32 = 8; const BARR_SEND: i32 = 2; const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS); const TICK_TIME_MS: u64 = 5; @@ -99,7 +96,6 @@ impl BParticipant { mpsc::UnboundedSender, mpsc::UnboundedReceiver, mpsc::UnboundedSender, - mpsc::UnboundedSender, oneshot::Sender, watch::Receiver, ) { @@ -107,15 +103,12 @@ impl BParticipant { let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); - let (a2b_report_channel_s, a2b_report_channel_r) = - mpsc::unbounded_channel::(); let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::(0.0); let run_channels = Some(ControlChannels { a2b_open_stream_r, b2a_stream_opened_s, s2b_create_channel_r, - a2b_report_channel_r, b2a_bandwidth_stats_s, s2b_shutdown_bparticipant_r, }); @@ -129,7 +122,7 @@ impl BParticipant { channels: Arc::new(RwLock::new(HashMap::new())), streams: RwLock::new(HashMap::new()), shutdown_barrier: AtomicI32::new( - Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV + Self::BARR_REPORT, + Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV, ), run_channels, metrics, @@ -138,7 +131,6 @@ impl BParticipant { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) @@ -153,7 +145,6 @@ impl BParticipant { async_channel::unbounded::(); let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); - let (b2b_close_report_channel_s, b2b_close_report_channel_r) = oneshot::channel(); let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) = crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>(); let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) = @@ -195,15 +186,10 @@ impl BParticipant { b2b_add_send_protocol_s, b2b_add_recv_protocol_s, ), - self.report_channels_mgr( - run_channels.a2b_report_channel_r, - b2b_close_report_channel_r - ), self.participant_shutdown_mgr( run_channels.s2b_shutdown_bparticipant_r, b2b_close_send_protocol_s.clone(), b2b_force_close_recv_protocol_s, - b2b_close_report_channel_s, ), ); } @@ -614,39 +600,6 @@ impl BParticipant { .fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst); } - async fn report_channels_mgr( - &self, - mut a2b_report_channel_r: mpsc::UnboundedReceiver, - b2b_close_report_channel_r: oneshot::Receiver<()>, - ) { - let mut b2b_close_report_channel_r = b2b_close_report_channel_r.fuse(); - loop { - let report = select!( - n = a2b_report_channel_r.recv().fuse() => n, - _ = &mut b2b_close_report_channel_r => break, - ); - match report { - Some(b2a_report_channel_s) => { - let data = { - let lock = self.channels.read().await; - let mut data = Vec::new(); - for (_, c) in lock.iter() { - data.push(c.lock().await.remote_con_addr.clone()); - } - data - }; - if b2a_report_channel_s.send(data).is_err() { - warn!("couldn't report back connect_addrs"); - }; - }, - None => break, - } - } - trace!("Stop report_channels_mgr"); - self.shutdown_barrier - .fetch_sub(Self::BARR_REPORT, Ordering::SeqCst); - } - /// sink shutdown: /// Situation AS, AR, BS, BR. A wants to close. /// AS shutdown. @@ -674,7 +627,6 @@ impl BParticipant { s2b_shutdown_bparticipant_r: oneshot::Receiver, b2b_close_send_protocol_s: async_channel::Sender, b2b_force_close_recv_protocol_s: async_channel::Sender, - b2b_close_report_channel_s: oneshot::Sender<()>, ) { let wait_for_manager = || async { let mut sleep = 0.01f64; @@ -716,13 +668,6 @@ impl BParticipant { } } drop(lock); - trace!("close report_channel_mgr"); - if let Err(e) = b2b_close_report_channel_s.send(()) { - debug!( - ?e, - "report_channel_mgr could be closed if Participant was dropped already here" - ); - } trace!("wait for other managers"); let timeout = tokio::time::sleep( @@ -844,7 +789,6 @@ mod tests { mpsc::UnboundedSender, mpsc::UnboundedReceiver, mpsc::UnboundedSender, - mpsc::UnboundedSender, oneshot::Sender, mpsc::UnboundedReceiver, watch::Receiver, @@ -861,7 +805,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) = runtime_clone.block_on(async move { @@ -879,7 +822,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, b2a_bandwidth_stats_r, @@ -913,7 +855,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -927,7 +868,6 @@ mod tests { let before = Instant::now(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -951,7 +891,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -965,7 +904,6 @@ mod tests { let before = Instant::now(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(2), s)) .unwrap(); @@ -990,7 +928,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -1026,7 +963,6 @@ mod tests { let (s, r) = oneshot::channel(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -1047,7 +983,6 @@ mod tests { a2b_open_stream_s, mut b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -1074,7 +1009,6 @@ mod tests { let (s, r) = oneshot::channel(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -1087,48 +1021,4 @@ mod tests { drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r)); drop(runtime); } - - #[test] - fn report_conn_addr() { - let ( - runtime, - a2b_open_stream_s, - _b2a_stream_opened_r, - mut s2b_create_channel_s, - a2b_report_channel_s, - s2b_shutdown_bparticipant_s, - b2s_prio_statistic_r, - _b2a_bandwidth_stats_r, - handle, - ) = mock_bparticipant(); - - let _remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s)); - std::thread::sleep(Duration::from_millis(50)); - - let result = runtime.block_on(async { - let (s, r) = oneshot::channel(); - a2b_report_channel_s.send(s).unwrap(); - r.await.unwrap() - }); - - assert_eq!(result.len(), 1); - if !matches!(result[0], ConnectAddr::Mpsc(42)) { - panic!("wrong code"); - } - - let (s, r) = oneshot::channel(); - runtime.block_on(async { - drop(s2b_create_channel_s); - drop(a2b_report_channel_s); - s2b_shutdown_bparticipant_s - .send((Duration::from_secs(1), s)) - .unwrap(); - r.await.unwrap().unwrap(); - }); - - runtime.block_on(handle).unwrap(); - - drop((a2b_open_stream_s, b2s_prio_statistic_r)); - drop(runtime); - } } diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 7ee534b4e7..63a37356c5 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -419,7 +419,6 @@ impl Scheduler { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics)); @@ -429,7 +428,6 @@ impl Scheduler { pid, a2b_open_stream_s, b2a_stream_opened_r, - a2b_report_channel_s, b2a_bandwidth_stats_r, participant_channels.a2s_disconnect_s, ); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 863840b94b..84d54d6211 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -1,6 +1,4 @@ -#![feature(assert_matches)] - -use std::{assert_matches::assert_matches, sync::Arc}; +use std::sync::Arc; use tokio::runtime::Runtime; use veloren_network::{NetworkError, StreamError}; mod helper; @@ -309,19 +307,3 @@ fn listen_on_ipv6_doesnt_block_ipv4() { drop((s1_a, s1_b, _n_a, _n_b, _p_a, _p_b)); drop((s1_a2, s1_b2, _n_a2, _n_b2, _p_a2, _p_b2)); //clean teardown } - -#[test] -fn report_ip() { - let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, _s1_a, _n_b, p_b, _s1_b) = network_participant_stream(tcp()); - - let list_a = r.block_on(p_a.report_current_connect_addr()).unwrap(); - assert_eq!(list_a.len(), 1); - assert_matches!(list_a[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))); - - let list_b = r.block_on(p_b.report_current_connect_addr()).unwrap(); - assert_eq!(list_b.len(), 1); - assert_matches!(list_b[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))); - assert_matches!((list_a[0].clone() , list_b[0].clone()), (ConnectAddr::Tcp(a), ConnectAddr::Tcp(b)) if a.port() != b.port()); // ports need to be different - drop((_n_a, _n_b, p_a, p_b)); //clean teardown -}