From 5b6303550670127cb31e6baec2991ff82682e223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 20 Jun 2022 00:11:49 +0200 Subject: [PATCH 1/4] Add a new/unstable functionality report_channel. This will ask the bparticipant for a list of all channels and their respective connection arguments. With that one could prob reach the remote side. The data is gathered by scheduler (or channel for the listener code). It requeres some read logs so we shouldn't abuse that function call. in bparticipant we have a new manager that also properly shuts down as the Participant holds the sender to the respective receiver. The sender is always dropped. inside the Mutex via disconnect and outside via Drop (we need 2 Options as otherwise we would create a runtime inside async context implicitly o.O ) (also i didn't liked the alternative by just overwriting the sender with a fake one, i want a propper Option that can be taken) The code might also come handy in the future when we implement a auto-reconnect feature in the bparticipant. --- network/src/api.rs | 41 +++++++++- network/src/channel.rs | 23 ++++-- network/src/lib.rs | 1 + network/src/participant.rs | 153 ++++++++++++++++++++++++++++------- network/src/scheduler.rs | 13 +-- network/tests/integration.rs | 20 ++++- 6 files changed, 207 insertions(+), 44 deletions(-) diff --git a/network/src/api.rs b/network/src/api.rs index dc028dc797..c5972af28d 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -1,7 +1,7 @@ use crate::{ channel::ProtocolsError, message::{partial_eq_bincode, Message}, - participant::{A2bStreamOpen, S2bShutdownBparticipant}, + participant::{A2bReportChannel, A2bStreamOpen, S2bShutdownBparticipant}, scheduler::{A2sConnect, Scheduler}, }; use bytes::Bytes; @@ -61,6 +61,7 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, + a2b_report_channel_s: Option>>>, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -520,6 +521,7 @@ impl Participant { remote_pid: Pid, a2b_open_stream_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, + a2b_report_channel_s: mpsc::UnboundedSender, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, ) -> Self { @@ -528,6 +530,7 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), + a2b_report_channel_s: Some(Mutex::new(Some(a2b_report_channel_s))), b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } @@ -709,6 +712,9 @@ impl Participant { pub async fn disconnect(self) -> Result<(), ParticipantError> { // Remove, Close and try_unwrap error when unwrap fails! debug!("Closing participant from network"); + if let Some(s) = &self.a2b_report_channel_s { + let _ = s.lock().await.take(); // drop so that bparticipant can close + } //Streams will be closed by BParticipant match self.a2s_disconnect_s.lock().await.take() { @@ -752,6 +758,38 @@ impl Participant { } } + /// Returns a list of [`ConnectAddr`] that can be used to connect to the + /// respective remote. This only reports the current state of the + /// Participant at the point of calling. Also there is no guarantee that + /// the remote is listening on this address. Note: Due to timing + /// problems even if you call this repeatedly you might miss some addr that + /// got connected and disconnected quickly, though this is more of a + /// theoretical problem. + /// + /// [`ConnectAddr`]: ConnectAddr + pub async fn report_current_connect_addr(&self) -> Result, ParticipantError> { + let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::>(); + if let Some(s) = &self.a2b_report_channel_s { + if let Some(s) = &*s.lock().await { + match s.send(p2a_return_report_s) { + Ok(()) => match p2a_return_report_r.await { + Ok(list) => return Ok(list), + Err(_) => { + debug!("p2a_return_report_r failed, closing participant"); + return Err(ParticipantError::ParticipantDisconnected); + }, + }, + Err(e) => { + debug!(?e, "bParticipant is already closed, notifying"); + return Err(ParticipantError::ParticipantDisconnected); + }, + } + } + } + debug!("Participant is already closed, notifying"); + Err(ParticipantError::ParticipantDisconnected) + } + /// Returns the current approximation on the maximum bandwidth available. /// This WILL fluctuate based on the amount/size of send messages. pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } @@ -1148,6 +1186,7 @@ impl Drop for Participant { // ignore closed, as we need to send it even though we disconnected the // participant from network debug!("Shutting down Participant"); + let _ = self.a2b_report_channel_s.take(); // drop so that bparticipant can close match self.a2s_disconnect_s.try_lock() { Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"), diff --git a/network/src/channel.rs b/network/src/channel.rs index 92fb798ec3..592780d874 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,4 +1,4 @@ -use crate::api::NetworkConnectError; +use crate::api::{ConnectAddr, NetworkConnectError}; use async_trait::async_trait; use bytes::BytesMut; use futures_util::FutureExt; @@ -65,6 +65,7 @@ pub(crate) type C2cMpscConnect = ( mpsc::Sender, oneshot::Sender>, ); +pub(crate) type C2sProtocol = (Protocols, ConnectAddr, Cid); impl Protocols { const MPSC_CHANNEL_BOUND: usize = 1000; @@ -92,7 +93,7 @@ impl Protocols { cids: Arc, metrics: Arc, s2s_stop_listening_r: oneshot::Receiver<()>, - c2s_protocol_s: mpsc::UnboundedSender<(Self, Cid)>, + c2s_protocol_s: mpsc::UnboundedSender, ) -> std::io::Result<()> { use socket2::{Domain, Socket, Type}; let domain = Domain::for_address(addr); @@ -132,7 +133,11 @@ impl Protocols { let cid = cids.fetch_add(1, Ordering::Relaxed); info!(?remote_addr, ?cid, "Accepting Tcp from"); let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics)); - let _ = c2s_protocol_s.send((Self::new_tcp(stream, metrics.clone()), cid)); + let _ = c2s_protocol_s.send(( + Self::new_tcp(stream, metrics.clone()), + ConnectAddr::Tcp(remote_addr), + cid, + )); } }); Ok(()) @@ -192,7 +197,7 @@ impl Protocols { cids: Arc, metrics: Arc, s2s_stop_listening_r: oneshot::Receiver<()>, - c2s_protocol_s: mpsc::UnboundedSender<(Self, Cid)>, + c2s_protocol_s: mpsc::UnboundedSender, ) -> io::Result<()> { let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel(); MPSC_POOL.lock().await.insert(addr, mpsc_s); @@ -214,6 +219,7 @@ impl Protocols { let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics)); let _ = c2s_protocol_s.send(( Self::new_mpsc(local_to_remote_s, remote_to_local_r, metrics.clone()), + ConnectAddr::Mpsc(addr), cid, )); } @@ -276,7 +282,7 @@ impl Protocols { cids: Arc, metrics: Arc, s2s_stop_listening_r: oneshot::Receiver<()>, - c2s_protocol_s: mpsc::UnboundedSender<(Self, Cid)>, + c2s_protocol_s: mpsc::UnboundedSender, ) -> io::Result<()> { let (_endpoint, mut listener) = match quinn::Endpoint::server(server_config, addr) { Ok(v) => v, @@ -303,7 +309,12 @@ impl Protocols { let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics)); match Protocols::new_quic(connection, true, metrics).await { Ok(quic) => { - let _ = c2s_protocol_s.send((quic, cid)); + let connect_addr = ConnectAddr::Quic( + addr, + quinn::ClientConfig::with_native_roots(), + "TODO_remote_hostname".to_string(), + ); + let _ = c2s_protocol_s.send((quic, connect_addr, cid)); }, Err(e) => { trace!(?e, "failed to start quic"); diff --git a/network/src/lib.rs b/network/src/lib.rs index 742ab23eb7..df35f28a1c 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -2,6 +2,7 @@ #![cfg_attr(test, deny(rust_2018_idioms))] #![cfg_attr(test, deny(warnings))] #![deny(clippy::clone_on_ref_ptr)] +#![feature(assert_matches)] //! Crate to handle high level networking of messages with different //! requirements and priorities over a number of protocols diff --git a/network/src/participant.rs b/network/src/participant.rs index 2f8f38bca6..ab3a50912a 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -1,5 +1,5 @@ use crate::{ - api::{ParticipantError, Stream}, + api::{ConnectAddr, ParticipantError, Stream}, channel::{Protocols, ProtocolsError, RecvProtocols, SendProtocols}, metrics::NetworkMetrics, util::DeferredTracer, @@ -27,7 +27,8 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender); -pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, oneshot::Sender<()>); +pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, ConnectAddr, oneshot::Sender<()>); +pub(crate) type A2bReportChannel = oneshot::Sender>; pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender>); pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -36,6 +37,7 @@ pub(crate) type B2sPrioStatistic = (Pid, u64, u64); struct ChannelInfo { cid: Cid, cid_string: String, //optimisationmetrics + remote_con_addr: ConnectAddr, } #[derive(Debug)] @@ -53,6 +55,7 @@ struct ControlChannels { a2b_open_stream_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, s2b_create_channel_r: mpsc::UnboundedReceiver, + a2b_report_channel_r: mpsc::UnboundedReceiver, b2a_bandwidth_stats_s: watch::Sender, s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } @@ -81,6 +84,7 @@ impl BParticipant { // We use integer instead of Barrier to not block mgr from freeing at the end const BARR_CHANNEL: i32 = 1; const BARR_RECV: i32 = 4; + const BARR_REPORT: i32 = 8; const BARR_SEND: i32 = 2; const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS); const TICK_TIME_MS: u64 = 5; @@ -95,6 +99,7 @@ impl BParticipant { mpsc::UnboundedSender, mpsc::UnboundedReceiver, mpsc::UnboundedSender, + mpsc::UnboundedSender, oneshot::Sender, watch::Receiver, ) { @@ -102,12 +107,15 @@ impl BParticipant { let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); + let (a2b_report_channel_s, a2b_report_channel_r) = + mpsc::unbounded_channel::(); let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::(0.0); let run_channels = Some(ControlChannels { a2b_open_stream_r, b2a_stream_opened_s, s2b_create_channel_r, + a2b_report_channel_r, b2a_bandwidth_stats_s, s2b_shutdown_bparticipant_r, }); @@ -121,7 +129,7 @@ impl BParticipant { channels: Arc::new(RwLock::new(HashMap::new())), streams: RwLock::new(HashMap::new()), shutdown_barrier: AtomicI32::new( - Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV, + Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV + Self::BARR_REPORT, ), run_channels, metrics, @@ -130,6 +138,7 @@ impl BParticipant { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) @@ -185,6 +194,7 @@ impl BParticipant { b2b_add_send_protocol_s, b2b_add_recv_protocol_s, ), + self.report_channels_mgr(run_channels.a2b_report_channel_r), self.participant_shutdown_mgr( run_channels.s2b_shutdown_bparticipant_r, b2b_close_send_protocol_s.clone(), @@ -560,41 +570,67 @@ impl BParticipant { ) { let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r); s2b_create_channel_r - .for_each_concurrent(None, |(cid, _, protocol, b2s_create_channel_done_s)| { - // This channel is now configured, and we are running it in scope of the - // participant. - let channels = Arc::clone(&self.channels); - let b2b_add_send_protocol_s = b2b_add_send_protocol_s.clone(); - let b2b_add_recv_protocol_s = b2b_add_recv_protocol_s.clone(); - async move { - let mut lock = channels.write().await; - let mut channel_no = lock.len(); - lock.insert( - cid, - Mutex::new(ChannelInfo { + .for_each_concurrent( + None, + |(cid, _, protocol, remote_con_addr, b2s_create_channel_done_s)| { + // This channel is now configured, and we are running it in scope of the + // participant. + let channels = Arc::clone(&self.channels); + let b2b_add_send_protocol_s = b2b_add_send_protocol_s.clone(); + let b2b_add_recv_protocol_s = b2b_add_recv_protocol_s.clone(); + async move { + let mut lock = channels.write().await; + let mut channel_no = lock.len(); + lock.insert( cid, - cid_string: cid.to_string(), - }), - ); - drop(lock); - let (send, recv) = protocol.split(); - b2b_add_send_protocol_s.send((cid, send)).unwrap(); - b2b_add_recv_protocol_s.send((cid, recv)).unwrap(); - b2s_create_channel_done_s.send(()).unwrap(); - if channel_no > 5 { - debug!(?channel_no, "metrics will overwrite channel #5"); - channel_no = 5; + Mutex::new(ChannelInfo { + cid, + cid_string: cid.to_string(), + remote_con_addr, + }), + ); + drop(lock); + let (send, recv) = protocol.split(); + b2b_add_send_protocol_s.send((cid, send)).unwrap(); + b2b_add_recv_protocol_s.send((cid, recv)).unwrap(); + b2s_create_channel_done_s.send(()).unwrap(); + if channel_no > 5 { + debug!(?channel_no, "metrics will overwrite channel #5"); + channel_no = 5; + } + self.metrics + .channels_connected(&self.remote_pid_string, channel_no, cid); } - self.metrics - .channels_connected(&self.remote_pid_string, channel_no, cid); - } - }) + }, + ) .await; trace!("Stop create_channel_mgr"); self.shutdown_barrier .fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst); } + async fn report_channels_mgr( + &self, + mut a2b_report_channel_r: mpsc::UnboundedReceiver, + ) { + while let Some(b2a_report_channel_s) = a2b_report_channel_r.recv().await { + let data = { + let lock = self.channels.read().await; + let mut data = Vec::new(); + for (_, c) in lock.iter() { + data.push(c.lock().await.remote_con_addr.clone()); + } + data + }; + if b2a_report_channel_s.send(data).is_err() { + warn!("couldn't report back connect_addrs"); + }; + } + trace!("Stop report_channels_mgr"); + self.shutdown_barrier + .fetch_sub(Self::BARR_REPORT, Ordering::SeqCst); + } + /// sink shutdown: /// Situation AS, AR, BS, BR. A wants to close. /// AS shutdown. @@ -784,6 +820,7 @@ mod tests { mpsc::UnboundedSender, mpsc::UnboundedReceiver, mpsc::UnboundedSender, + mpsc::UnboundedSender, oneshot::Sender, mpsc::UnboundedReceiver, watch::Receiver, @@ -800,6 +837,7 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) = runtime_clone.block_on(async move { @@ -817,6 +855,7 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, b2a_bandwidth_stats_r, @@ -836,7 +875,7 @@ mod tests { let p1 = Protocols::new_mpsc(s1, r2, metrics); let (complete_s, complete_r) = oneshot::channel(); create_channel - .send((cid, Sid::new(0), p1, complete_s)) + .send((cid, Sid::new(0), p1, ConnectAddr::Mpsc(42), complete_s)) .unwrap(); complete_r.await.unwrap(); let metrics = ProtocolMetricCache::new(&cid.to_string(), met); @@ -850,6 +889,7 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -863,6 +903,7 @@ mod tests { let before = Instant::now(); runtime.block_on(async { drop(s2b_create_channel_s); + drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -886,6 +927,7 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -899,6 +941,7 @@ mod tests { let before = Instant::now(); runtime.block_on(async { drop(s2b_create_channel_s); + drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(2), s)) .unwrap(); @@ -923,6 +966,7 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -958,6 +1002,7 @@ mod tests { let (s, r) = oneshot::channel(); runtime.block_on(async { drop(s2b_create_channel_s); + drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -978,6 +1023,7 @@ mod tests { a2b_open_stream_s, mut b2a_stream_opened_r, mut s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -1004,6 +1050,7 @@ mod tests { let (s, r) = oneshot::channel(); runtime.block_on(async { drop(s2b_create_channel_s); + drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -1016,4 +1063,48 @@ mod tests { drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r)); drop(runtime); } + + #[test] + fn report_conn_addr() { + let ( + runtime, + a2b_open_stream_s, + _b2a_stream_opened_r, + mut s2b_create_channel_s, + a2b_report_channel_s, + s2b_shutdown_bparticipant_s, + b2s_prio_statistic_r, + _b2a_bandwidth_stats_r, + handle, + ) = mock_bparticipant(); + + let _remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s)); + std::thread::sleep(Duration::from_millis(50)); + + let result = runtime.block_on(async { + let (s, r) = oneshot::channel(); + a2b_report_channel_s.send(s).unwrap(); + r.await.unwrap() + }); + + assert_eq!(result.len(), 1); + if !matches!(result[0], ConnectAddr::Mpsc(42)) { + panic!("wrong code"); + } + + let (s, r) = oneshot::channel(); + runtime.block_on(async { + drop(s2b_create_channel_s); + drop(a2b_report_channel_s); + s2b_shutdown_bparticipant_s + .send((Duration::from_secs(1), s)) + .unwrap(); + r.await.unwrap().unwrap(); + }); + + runtime.block_on(handle).unwrap(); + + drop((a2b_open_stream_s, b2s_prio_statistic_r)); + drop(runtime); + } } diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 48d51be32e..7ee534b4e7 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -223,8 +223,8 @@ impl Scheduler { }; let _ = s2a_listen_result_s.send(res); - while let Some((prot, cid)) = c2s_protocol_r.recv().await { - self.init_protocol(prot, cid, None, true).await; + while let Some((prot, con_addr, cid)) = c2s_protocol_r.recv().await { + self.init_protocol(prot, con_addr, cid, None, true).await; } } }) @@ -239,7 +239,7 @@ impl Scheduler { let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&self.protocol_metrics)); self.metrics.connect_request(&addr); - let protocol = match addr { + let protocol = match addr.clone() { ConnectAddr::Tcp(addr) => Protocols::with_tcp_connect(addr, metrics).await, #[cfg(feature = "quic")] ConnectAddr::Quic(addr, ref config, name) => { @@ -255,7 +255,7 @@ impl Scheduler { continue; }, }; - self.init_protocol(protocol, cid, Some(pid_sender), false) + self.init_protocol(protocol, addr, cid, Some(pid_sender), false) .await; } trace!("Stop connect_mgr"); @@ -375,6 +375,7 @@ impl Scheduler { async fn init_protocol( &self, mut protocol: Protocols, + con_addr: ConnectAddr, //address necessary to connect to the remote cid: Cid, s2a_return_pid_s: Option>>, send_handshake: bool, @@ -418,6 +419,7 @@ impl Scheduler { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, + a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics)); @@ -427,6 +429,7 @@ impl Scheduler { pid, a2b_open_stream_s, b2a_stream_opened_r, + a2b_report_channel_s, b2a_bandwidth_stats_r, participant_channels.a2s_disconnect_s, ); @@ -451,7 +454,7 @@ impl Scheduler { oneshot::channel(); //From now on wire connects directly with bparticipant! s2b_create_channel_s - .send((cid, sid, protocol, b2s_create_channel_done_s)) + .send((cid, sid, protocol, con_addr, b2s_create_channel_done_s)) .unwrap(); b2s_create_channel_done_r.await.unwrap(); if let Some(pid_oneshot) = s2a_return_pid_s { diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 84d54d6211..863840b94b 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -1,4 +1,6 @@ -use std::sync::Arc; +#![feature(assert_matches)] + +use std::{assert_matches::assert_matches, sync::Arc}; use tokio::runtime::Runtime; use veloren_network::{NetworkError, StreamError}; mod helper; @@ -307,3 +309,19 @@ fn listen_on_ipv6_doesnt_block_ipv4() { drop((s1_a, s1_b, _n_a, _n_b, _p_a, _p_b)); drop((s1_a2, s1_b2, _n_a2, _n_b2, _p_a2, _p_b2)); //clean teardown } + +#[test] +fn report_ip() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, p_a, _s1_a, _n_b, p_b, _s1_b) = network_participant_stream(tcp()); + + let list_a = r.block_on(p_a.report_current_connect_addr()).unwrap(); + assert_eq!(list_a.len(), 1); + assert_matches!(list_a[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))); + + let list_b = r.block_on(p_b.report_current_connect_addr()).unwrap(); + assert_eq!(list_b.len(), 1); + assert_matches!(list_b[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))); + assert_matches!((list_a[0].clone() , list_b[0].clone()), (ConnectAddr::Tcp(a), ConnectAddr::Tcp(b)) if a.port() != b.port()); // ports need to be different + drop((_n_a, _n_b, p_a, p_b)); //clean teardown +} From f3e4f022cb55bf7e749d42a17f777871e63f5bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 20 Jun 2022 09:47:46 +0200 Subject: [PATCH 2/4] rather than letting the api/Participant handling the cleanup, we had to move over to the bParticipant::shutdownmanager. Because the old case didn't account for the Scheduler got dropped but Participant keept around. Shutdown behavior is quite easy now: bParticipant sends a oneshot, when that hits we stop. also when Participant gets droped we still stop as there woul dbe no sense in continue running the report_mgr ... --- network/src/api.rs | 43 +++++++++++++++----------------- network/src/participant.rs | 50 ++++++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 37 deletions(-) diff --git a/network/src/api.rs b/network/src/api.rs index c5972af28d..dd31c482ca 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -61,7 +61,7 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, - a2b_report_channel_s: Option>>>, + a2b_report_channel_s: Mutex>, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -530,7 +530,7 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), - a2b_report_channel_s: Some(Mutex::new(Some(a2b_report_channel_s))), + a2b_report_channel_s: Mutex::new(a2b_report_channel_s), b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } @@ -712,9 +712,6 @@ impl Participant { pub async fn disconnect(self) -> Result<(), ParticipantError> { // Remove, Close and try_unwrap error when unwrap fails! debug!("Closing participant from network"); - if let Some(s) = &self.a2b_report_channel_s { - let _ = s.lock().await.take(); // drop so that bparticipant can close - } //Streams will be closed by BParticipant match self.a2s_disconnect_s.lock().await.take() { @@ -769,25 +766,24 @@ impl Participant { /// [`ConnectAddr`]: ConnectAddr pub async fn report_current_connect_addr(&self) -> Result, ParticipantError> { let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::>(); - if let Some(s) = &self.a2b_report_channel_s { - if let Some(s) = &*s.lock().await { - match s.send(p2a_return_report_s) { - Ok(()) => match p2a_return_report_r.await { - Ok(list) => return Ok(list), - Err(_) => { - debug!("p2a_return_report_r failed, closing participant"); - return Err(ParticipantError::ParticipantDisconnected); - }, - }, - Err(e) => { - debug!(?e, "bParticipant is already closed, notifying"); - return Err(ParticipantError::ParticipantDisconnected); - }, - } - } + match self + .a2b_report_channel_s + .lock() + .await + .send(p2a_return_report_s) + { + Ok(()) => match p2a_return_report_r.await { + Ok(list) => Ok(list), + Err(_) => { + debug!("p2a_return_report_r failed, closing participant"); + Err(ParticipantError::ParticipantDisconnected) + }, + }, + Err(e) => { + debug!(?e, "bParticipant is already closed, notifying"); + Err(ParticipantError::ParticipantDisconnected) + }, } - debug!("Participant is already closed, notifying"); - Err(ParticipantError::ParticipantDisconnected) } /// Returns the current approximation on the maximum bandwidth available. @@ -1186,7 +1182,6 @@ impl Drop for Participant { // ignore closed, as we need to send it even though we disconnected the // participant from network debug!("Shutting down Participant"); - let _ = self.a2b_report_channel_s.take(); // drop so that bparticipant can close match self.a2s_disconnect_s.try_lock() { Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"), diff --git a/network/src/participant.rs b/network/src/participant.rs index ab3a50912a..54c3b1f1dc 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -153,6 +153,7 @@ impl BParticipant { async_channel::unbounded::(); let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); + let (b2b_close_report_channel_s, b2b_close_report_channel_r) = oneshot::channel(); let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) = crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>(); let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) = @@ -194,11 +195,15 @@ impl BParticipant { b2b_add_send_protocol_s, b2b_add_recv_protocol_s, ), - self.report_channels_mgr(run_channels.a2b_report_channel_r), + self.report_channels_mgr( + run_channels.a2b_report_channel_r, + b2b_close_report_channel_r + ), self.participant_shutdown_mgr( run_channels.s2b_shutdown_bparticipant_r, b2b_close_send_protocol_s.clone(), b2b_force_close_recv_protocol_s, + b2b_close_report_channel_s, ), ); } @@ -612,19 +617,30 @@ impl BParticipant { async fn report_channels_mgr( &self, mut a2b_report_channel_r: mpsc::UnboundedReceiver, + b2b_close_report_channel_r: oneshot::Receiver<()>, ) { - while let Some(b2a_report_channel_s) = a2b_report_channel_r.recv().await { - let data = { - let lock = self.channels.read().await; - let mut data = Vec::new(); - for (_, c) in lock.iter() { - data.push(c.lock().await.remote_con_addr.clone()); - } - data - }; - if b2a_report_channel_s.send(data).is_err() { - warn!("couldn't report back connect_addrs"); - }; + let mut b2b_close_report_channel_r = b2b_close_report_channel_r.fuse(); + loop { + let report = select!( + n = a2b_report_channel_r.recv().fuse() => n, + _ = &mut b2b_close_report_channel_r => break, + ); + match report { + Some(b2a_report_channel_s) => { + let data = { + let lock = self.channels.read().await; + let mut data = Vec::new(); + for (_, c) in lock.iter() { + data.push(c.lock().await.remote_con_addr.clone()); + } + data + }; + if b2a_report_channel_s.send(data).is_err() { + warn!("couldn't report back connect_addrs"); + }; + }, + None => break, + } } trace!("Stop report_channels_mgr"); self.shutdown_barrier @@ -658,6 +674,7 @@ impl BParticipant { s2b_shutdown_bparticipant_r: oneshot::Receiver, b2b_close_send_protocol_s: async_channel::Sender, b2b_force_close_recv_protocol_s: async_channel::Sender, + b2b_close_report_channel_s: oneshot::Sender<()>, ) { let wait_for_manager = || async { let mut sleep = 0.01f64; @@ -699,6 +716,13 @@ impl BParticipant { } } drop(lock); + trace!("close report_channel_mgr"); + if let Err(e) = b2b_close_report_channel_s.send(()) { + debug!( + ?e, + "report_channel_mgr could be closed if Participant was dropped already here" + ); + } trace!("wait for other managers"); let timeout = tokio::time::sleep( From b5d0ee22e4d48133491425bf1d5e50230b8c1001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 20 Jun 2022 09:47:46 +0200 Subject: [PATCH 3/4] deactivate some features again and only keep the internal code for now to reuse it in automatic reconnect code --- network/src/api.rs | 36 +---------- network/src/lib.rs | 1 - network/src/participant.rs | 112 +---------------------------------- network/src/scheduler.rs | 2 - network/tests/integration.rs | 20 +------ 5 files changed, 3 insertions(+), 168 deletions(-) diff --git a/network/src/api.rs b/network/src/api.rs index dd31c482ca..dc028dc797 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -1,7 +1,7 @@ use crate::{ channel::ProtocolsError, message::{partial_eq_bincode, Message}, - participant::{A2bReportChannel, A2bStreamOpen, S2bShutdownBparticipant}, + participant::{A2bStreamOpen, S2bShutdownBparticipant}, scheduler::{A2sConnect, Scheduler}, }; use bytes::Bytes; @@ -61,7 +61,6 @@ pub struct Participant { remote_pid: Pid, a2b_open_stream_s: Mutex>, b2a_stream_opened_r: Mutex>, - a2b_report_channel_s: Mutex>, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: A2sDisconnect, } @@ -521,7 +520,6 @@ impl Participant { remote_pid: Pid, a2b_open_stream_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, - a2b_report_channel_s: mpsc::UnboundedSender, b2a_bandwidth_stats_r: watch::Receiver, a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, ) -> Self { @@ -530,7 +528,6 @@ impl Participant { remote_pid, a2b_open_stream_s: Mutex::new(a2b_open_stream_s), b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r), - a2b_report_channel_s: Mutex::new(a2b_report_channel_s), b2a_bandwidth_stats_r, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), } @@ -755,37 +752,6 @@ impl Participant { } } - /// Returns a list of [`ConnectAddr`] that can be used to connect to the - /// respective remote. This only reports the current state of the - /// Participant at the point of calling. Also there is no guarantee that - /// the remote is listening on this address. Note: Due to timing - /// problems even if you call this repeatedly you might miss some addr that - /// got connected and disconnected quickly, though this is more of a - /// theoretical problem. - /// - /// [`ConnectAddr`]: ConnectAddr - pub async fn report_current_connect_addr(&self) -> Result, ParticipantError> { - let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::>(); - match self - .a2b_report_channel_s - .lock() - .await - .send(p2a_return_report_s) - { - Ok(()) => match p2a_return_report_r.await { - Ok(list) => Ok(list), - Err(_) => { - debug!("p2a_return_report_r failed, closing participant"); - Err(ParticipantError::ParticipantDisconnected) - }, - }, - Err(e) => { - debug!(?e, "bParticipant is already closed, notifying"); - Err(ParticipantError::ParticipantDisconnected) - }, - } - } - /// Returns the current approximation on the maximum bandwidth available. /// This WILL fluctuate based on the amount/size of send messages. pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() } diff --git a/network/src/lib.rs b/network/src/lib.rs index df35f28a1c..742ab23eb7 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -2,7 +2,6 @@ #![cfg_attr(test, deny(rust_2018_idioms))] #![cfg_attr(test, deny(warnings))] #![deny(clippy::clone_on_ref_ptr)] -#![feature(assert_matches)] //! Crate to handle high level networking of messages with different //! requirements and priorities over a number of protocols diff --git a/network/src/participant.rs b/network/src/participant.rs index 54c3b1f1dc..9564f55a81 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -28,7 +28,6 @@ use tracing::*; pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender); pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, ConnectAddr, oneshot::Sender<()>); -pub(crate) type A2bReportChannel = oneshot::Sender>; pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender>); pub(crate) type B2sPrioStatistic = (Pid, u64, u64); @@ -55,7 +54,6 @@ struct ControlChannels { a2b_open_stream_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, s2b_create_channel_r: mpsc::UnboundedReceiver, - a2b_report_channel_r: mpsc::UnboundedReceiver, b2a_bandwidth_stats_s: watch::Sender, s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } @@ -84,7 +82,6 @@ impl BParticipant { // We use integer instead of Barrier to not block mgr from freeing at the end const BARR_CHANNEL: i32 = 1; const BARR_RECV: i32 = 4; - const BARR_REPORT: i32 = 8; const BARR_SEND: i32 = 2; const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS); const TICK_TIME_MS: u64 = 5; @@ -99,7 +96,6 @@ impl BParticipant { mpsc::UnboundedSender, mpsc::UnboundedReceiver, mpsc::UnboundedSender, - mpsc::UnboundedSender, oneshot::Sender, watch::Receiver, ) { @@ -107,15 +103,12 @@ impl BParticipant { let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel(); - let (a2b_report_channel_s, a2b_report_channel_r) = - mpsc::unbounded_channel::(); let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::(0.0); let run_channels = Some(ControlChannels { a2b_open_stream_r, b2a_stream_opened_s, s2b_create_channel_r, - a2b_report_channel_r, b2a_bandwidth_stats_s, s2b_shutdown_bparticipant_r, }); @@ -129,7 +122,7 @@ impl BParticipant { channels: Arc::new(RwLock::new(HashMap::new())), streams: RwLock::new(HashMap::new()), shutdown_barrier: AtomicI32::new( - Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV + Self::BARR_REPORT, + Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV, ), run_channels, metrics, @@ -138,7 +131,6 @@ impl BParticipant { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) @@ -153,7 +145,6 @@ impl BParticipant { async_channel::unbounded::(); let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) = async_channel::unbounded::(); - let (b2b_close_report_channel_s, b2b_close_report_channel_r) = oneshot::channel(); let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) = crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>(); let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) = @@ -195,15 +186,10 @@ impl BParticipant { b2b_add_send_protocol_s, b2b_add_recv_protocol_s, ), - self.report_channels_mgr( - run_channels.a2b_report_channel_r, - b2b_close_report_channel_r - ), self.participant_shutdown_mgr( run_channels.s2b_shutdown_bparticipant_r, b2b_close_send_protocol_s.clone(), b2b_force_close_recv_protocol_s, - b2b_close_report_channel_s, ), ); } @@ -614,39 +600,6 @@ impl BParticipant { .fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst); } - async fn report_channels_mgr( - &self, - mut a2b_report_channel_r: mpsc::UnboundedReceiver, - b2b_close_report_channel_r: oneshot::Receiver<()>, - ) { - let mut b2b_close_report_channel_r = b2b_close_report_channel_r.fuse(); - loop { - let report = select!( - n = a2b_report_channel_r.recv().fuse() => n, - _ = &mut b2b_close_report_channel_r => break, - ); - match report { - Some(b2a_report_channel_s) => { - let data = { - let lock = self.channels.read().await; - let mut data = Vec::new(); - for (_, c) in lock.iter() { - data.push(c.lock().await.remote_con_addr.clone()); - } - data - }; - if b2a_report_channel_s.send(data).is_err() { - warn!("couldn't report back connect_addrs"); - }; - }, - None => break, - } - } - trace!("Stop report_channels_mgr"); - self.shutdown_barrier - .fetch_sub(Self::BARR_REPORT, Ordering::SeqCst); - } - /// sink shutdown: /// Situation AS, AR, BS, BR. A wants to close. /// AS shutdown. @@ -674,7 +627,6 @@ impl BParticipant { s2b_shutdown_bparticipant_r: oneshot::Receiver, b2b_close_send_protocol_s: async_channel::Sender, b2b_force_close_recv_protocol_s: async_channel::Sender, - b2b_close_report_channel_s: oneshot::Sender<()>, ) { let wait_for_manager = || async { let mut sleep = 0.01f64; @@ -716,13 +668,6 @@ impl BParticipant { } } drop(lock); - trace!("close report_channel_mgr"); - if let Err(e) = b2b_close_report_channel_s.send(()) { - debug!( - ?e, - "report_channel_mgr could be closed if Participant was dropped already here" - ); - } trace!("wait for other managers"); let timeout = tokio::time::sleep( @@ -844,7 +789,6 @@ mod tests { mpsc::UnboundedSender, mpsc::UnboundedReceiver, mpsc::UnboundedSender, - mpsc::UnboundedSender, oneshot::Sender, mpsc::UnboundedReceiver, watch::Receiver, @@ -861,7 +805,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) = runtime_clone.block_on(async move { @@ -879,7 +822,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, b2a_bandwidth_stats_r, @@ -913,7 +855,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -927,7 +868,6 @@ mod tests { let before = Instant::now(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -951,7 +891,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -965,7 +904,6 @@ mod tests { let before = Instant::now(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(2), s)) .unwrap(); @@ -990,7 +928,6 @@ mod tests { a2b_open_stream_s, b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -1026,7 +963,6 @@ mod tests { let (s, r) = oneshot::channel(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -1047,7 +983,6 @@ mod tests { a2b_open_stream_s, mut b2a_stream_opened_r, mut s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2s_prio_statistic_r, _b2a_bandwidth_stats_r, @@ -1074,7 +1009,6 @@ mod tests { let (s, r) = oneshot::channel(); runtime.block_on(async { drop(s2b_create_channel_s); - drop(a2b_report_channel_s); s2b_shutdown_bparticipant_s .send((Duration::from_secs(1), s)) .unwrap(); @@ -1087,48 +1021,4 @@ mod tests { drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r)); drop(runtime); } - - #[test] - fn report_conn_addr() { - let ( - runtime, - a2b_open_stream_s, - _b2a_stream_opened_r, - mut s2b_create_channel_s, - a2b_report_channel_s, - s2b_shutdown_bparticipant_s, - b2s_prio_statistic_r, - _b2a_bandwidth_stats_r, - handle, - ) = mock_bparticipant(); - - let _remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s)); - std::thread::sleep(Duration::from_millis(50)); - - let result = runtime.block_on(async { - let (s, r) = oneshot::channel(); - a2b_report_channel_s.send(s).unwrap(); - r.await.unwrap() - }); - - assert_eq!(result.len(), 1); - if !matches!(result[0], ConnectAddr::Mpsc(42)) { - panic!("wrong code"); - } - - let (s, r) = oneshot::channel(); - runtime.block_on(async { - drop(s2b_create_channel_s); - drop(a2b_report_channel_s); - s2b_shutdown_bparticipant_s - .send((Duration::from_secs(1), s)) - .unwrap(); - r.await.unwrap().unwrap(); - }); - - runtime.block_on(handle).unwrap(); - - drop((a2b_open_stream_s, b2s_prio_statistic_r)); - drop(runtime); - } } diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 7ee534b4e7..63a37356c5 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -419,7 +419,6 @@ impl Scheduler { a2b_open_stream_s, b2a_stream_opened_r, s2b_create_channel_s, - a2b_report_channel_s, s2b_shutdown_bparticipant_s, b2a_bandwidth_stats_r, ) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics)); @@ -429,7 +428,6 @@ impl Scheduler { pid, a2b_open_stream_s, b2a_stream_opened_r, - a2b_report_channel_s, b2a_bandwidth_stats_r, participant_channels.a2s_disconnect_s, ); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 863840b94b..84d54d6211 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -1,6 +1,4 @@ -#![feature(assert_matches)] - -use std::{assert_matches::assert_matches, sync::Arc}; +use std::sync::Arc; use tokio::runtime::Runtime; use veloren_network::{NetworkError, StreamError}; mod helper; @@ -309,19 +307,3 @@ fn listen_on_ipv6_doesnt_block_ipv4() { drop((s1_a, s1_b, _n_a, _n_b, _p_a, _p_b)); drop((s1_a2, s1_b2, _n_a2, _n_b2, _p_a2, _p_b2)); //clean teardown } - -#[test] -fn report_ip() { - let (_, _) = helper::setup(false, 0); - let (r, _n_a, p_a, _s1_a, _n_b, p_b, _s1_b) = network_participant_stream(tcp()); - - let list_a = r.block_on(p_a.report_current_connect_addr()).unwrap(); - assert_eq!(list_a.len(), 1); - assert_matches!(list_a[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))); - - let list_b = r.block_on(p_b.report_current_connect_addr()).unwrap(); - assert_eq!(list_b.len(), 1); - assert_matches!(list_b[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))); - assert_matches!((list_a[0].clone() , list_b[0].clone()), (ConnectAddr::Tcp(a), ConnectAddr::Tcp(b)) if a.port() != b.port()); // ports need to be different - drop((_n_a, _n_b, p_a, p_b)); //clean teardown -} From c2b043e524a64296b8f79272e05d7f2da0600267 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sun, 3 Jul 2022 20:51:49 +0200 Subject: [PATCH 4/4] add a comment about QUIC like Isse proposed --- network/src/channel.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/network/src/channel.rs b/network/src/channel.rs index 592780d874..4fd31f1c21 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -309,6 +309,10 @@ impl Protocols { let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics)); match Protocols::new_quic(connection, true, metrics).await { Ok(quic) => { + // TODO: we cannot guess the client hostname in quic server here. + // though we need it for the certificate to be validated, in the future + // this will either go away with new auth, or we have to do something like + // a reverse DNS lookup let connect_addr = ConnectAddr::Quic( addr, quinn::ClientConfig::with_native_roots(),