Add a new/unstable functionality report_channel.

This will ask the bparticipant for a list of all channels and their respective connection arguments.
With that, one could probably reach the remote side.
The data is gathered by the scheduler (or by the channel, for the listener code).
It requires some read locks, so we shouldn't abuse that function call.

In bparticipant we have a new manager that also properly shuts down, as the Participant holds the sender to the respective receiver.
The sender is always dropped: inside the Mutex via disconnect, and outside via Drop (we need 2 Options, as otherwise we would implicitly create a runtime inside an async context o.O).
(Also, I didn't like the alternative of just overwriting the sender with a fake one; I want a proper Option that can be taken.)

The code might also come in handy in the future when we implement an auto-reconnect feature in the bparticipant.
This commit is contained in:
Marcel Märtens 2022-06-20 00:11:49 +02:00
parent 5284e9ec94
commit 5b63035506
6 changed files with 207 additions and 44 deletions

View File

@ -1,7 +1,7 @@
use crate::{
channel::ProtocolsError,
message::{partial_eq_bincode, Message},
participant::{A2bStreamOpen, S2bShutdownBparticipant},
participant::{A2bReportChannel, A2bStreamOpen, S2bShutdownBparticipant},
scheduler::{A2sConnect, Scheduler},
};
use bytes::Bytes;
@ -61,6 +61,7 @@ pub struct Participant {
remote_pid: Pid,
a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
a2b_report_channel_s: Option<Mutex<Option<mpsc::UnboundedSender<A2bReportChannel>>>>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: A2sDisconnect,
}
@ -520,6 +521,7 @@ impl Participant {
remote_pid: Pid,
a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2b_report_channel_s: mpsc::UnboundedSender<A2bReportChannel>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
) -> Self {
@ -528,6 +530,7 @@ impl Participant {
remote_pid,
a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
a2b_report_channel_s: Some(Mutex::new(Some(a2b_report_channel_s))),
b2a_bandwidth_stats_r,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
}
@ -709,6 +712,9 @@ impl Participant {
pub async fn disconnect(self) -> Result<(), ParticipantError> {
// Remove, Close and try_unwrap error when unwrap fails!
debug!("Closing participant from network");
if let Some(s) = &self.a2b_report_channel_s {
let _ = s.lock().await.take(); // drop so that bparticipant can close
}
//Streams will be closed by BParticipant
match self.a2s_disconnect_s.lock().await.take() {
@ -752,6 +758,38 @@ impl Participant {
}
}
/// Asks the bparticipant for the [`ConnectAddr`] of every channel it
/// currently holds and returns them as a list; these addresses can be
/// used to connect to the respective remote.
///
/// The list is only a snapshot taken at the time of the call, and nothing
/// guarantees that the remote is actually listening on any of the reported
/// addresses. Note: even when polling repeatedly, an address that got
/// connected and disconnected quickly in between two calls can be missed,
/// though this is more of a theoretical problem.
///
/// # Errors
/// Returns [`ParticipantError::ParticipantDisconnected`] when the report
/// sender was already taken (by `disconnect` or `Drop`), when the request
/// cannot be sent, or when the reply channel is closed without an answer.
///
/// [`ConnectAddr`]: ConnectAddr
pub async fn report_current_connect_addr(&self) -> Result<Vec<ConnectAddr>, ParticipantError> {
    let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::<Vec<ConnectAddr>>();
    // Shared exit for both "sender is gone" cases below.
    let already_closed = || -> Result<Vec<ConnectAddr>, ParticipantError> {
        debug!("Participant is already closed, notifying");
        Err(ParticipantError::ParticipantDisconnected)
    };
    // The outer Option is emptied in `Drop`, the inner one in `disconnect`.
    let sender_guard = match &self.a2b_report_channel_s {
        Some(slot) => slot.lock().await,
        None => return already_closed(),
    };
    let report_sender = match &*sender_guard {
        Some(sender) => sender,
        None => return already_closed(),
    };
    if let Err(e) = report_sender.send(p2a_return_report_s) {
        debug!(?e, "bParticipant is already closed, notifying");
        return Err(ParticipantError::ParticipantDisconnected);
    }
    // `sender_guard` intentionally stays alive until the answer arrived.
    p2a_return_report_r.await.map_err(|_| {
        debug!("p2a_return_report_r failed, closing participant");
        ParticipantError::ParticipantDisconnected
    })
}
/// Returns the current approximation on the maximum bandwidth available.
/// This WILL fluctuate based on the amount/size of send messages.
pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }
@ -1148,6 +1186,7 @@ impl Drop for Participant {
// ignore closed, as we need to send it even though we disconnected the
// participant from network
debug!("Shutting down Participant");
let _ = self.a2b_report_channel_s.take(); // drop so that bparticipant can close
match self.a2s_disconnect_s.try_lock() {
Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"),

View File

@ -1,4 +1,4 @@
use crate::api::NetworkConnectError;
use crate::api::{ConnectAddr, NetworkConnectError};
use async_trait::async_trait;
use bytes::BytesMut;
use futures_util::FutureExt;
@ -65,6 +65,7 @@ pub(crate) type C2cMpscConnect = (
mpsc::Sender<MpscMsg>,
oneshot::Sender<mpsc::Sender<MpscMsg>>,
);
pub(crate) type C2sProtocol = (Protocols, ConnectAddr, Cid);
impl Protocols {
const MPSC_CHANNEL_BOUND: usize = 1000;
@ -92,7 +93,7 @@ impl Protocols {
cids: Arc<AtomicU64>,
metrics: Arc<ProtocolMetrics>,
s2s_stop_listening_r: oneshot::Receiver<()>,
c2s_protocol_s: mpsc::UnboundedSender<(Self, Cid)>,
c2s_protocol_s: mpsc::UnboundedSender<C2sProtocol>,
) -> std::io::Result<()> {
use socket2::{Domain, Socket, Type};
let domain = Domain::for_address(addr);
@ -132,7 +133,11 @@ impl Protocols {
let cid = cids.fetch_add(1, Ordering::Relaxed);
info!(?remote_addr, ?cid, "Accepting Tcp from");
let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics));
let _ = c2s_protocol_s.send((Self::new_tcp(stream, metrics.clone()), cid));
let _ = c2s_protocol_s.send((
Self::new_tcp(stream, metrics.clone()),
ConnectAddr::Tcp(remote_addr),
cid,
));
}
});
Ok(())
@ -192,7 +197,7 @@ impl Protocols {
cids: Arc<AtomicU64>,
metrics: Arc<ProtocolMetrics>,
s2s_stop_listening_r: oneshot::Receiver<()>,
c2s_protocol_s: mpsc::UnboundedSender<(Self, Cid)>,
c2s_protocol_s: mpsc::UnboundedSender<C2sProtocol>,
) -> io::Result<()> {
let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel();
MPSC_POOL.lock().await.insert(addr, mpsc_s);
@ -214,6 +219,7 @@ impl Protocols {
let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics));
let _ = c2s_protocol_s.send((
Self::new_mpsc(local_to_remote_s, remote_to_local_r, metrics.clone()),
ConnectAddr::Mpsc(addr),
cid,
));
}
@ -276,7 +282,7 @@ impl Protocols {
cids: Arc<AtomicU64>,
metrics: Arc<ProtocolMetrics>,
s2s_stop_listening_r: oneshot::Receiver<()>,
c2s_protocol_s: mpsc::UnboundedSender<(Self, Cid)>,
c2s_protocol_s: mpsc::UnboundedSender<C2sProtocol>,
) -> io::Result<()> {
let (_endpoint, mut listener) = match quinn::Endpoint::server(server_config, addr) {
Ok(v) => v,
@ -303,7 +309,12 @@ impl Protocols {
let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&metrics));
match Protocols::new_quic(connection, true, metrics).await {
Ok(quic) => {
let _ = c2s_protocol_s.send((quic, cid));
let connect_addr = ConnectAddr::Quic(
addr,
quinn::ClientConfig::with_native_roots(),
"TODO_remote_hostname".to_string(),
);
let _ = c2s_protocol_s.send((quic, connect_addr, cid));
},
Err(e) => {
trace!(?e, "failed to start quic");

View File

@ -2,6 +2,7 @@
#![cfg_attr(test, deny(rust_2018_idioms))]
#![cfg_attr(test, deny(warnings))]
#![deny(clippy::clone_on_ref_ptr)]
#![feature(assert_matches)]
//! Crate to handle high level networking of messages with different
//! requirements and priorities over a number of protocols

View File

@ -1,5 +1,5 @@
use crate::{
api::{ParticipantError, Stream},
api::{ConnectAddr, ParticipantError, Stream},
channel::{Protocols, ProtocolsError, RecvProtocols, SendProtocols},
metrics::NetworkMetrics,
util::DeferredTracer,
@ -27,7 +27,8 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender<Stream>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, oneshot::Sender<()>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, ConnectAddr, oneshot::Sender<()>);
pub(crate) type A2bReportChannel = oneshot::Sender<Vec<ConnectAddr>>;
pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender<Result<(), ParticipantError>>);
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
@ -36,6 +37,7 @@ pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
// Per-channel bookkeeping that the BParticipant stores in its `channels` map.
struct ChannelInfo {
// id of the channel
cid: Cid,
// `cid` rendered via `to_string()`; presumably cached so metric labels need
// no re-formatting — TODO confirm
cid_string: String, //optimisationmetrics
// address handed over together with the channel on creation; this is the
// value collected by `report_channels_mgr`
remote_con_addr: ConnectAddr,
}
#[derive(Debug)]
@ -53,6 +55,7 @@ struct ControlChannels {
a2b_open_stream_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
a2b_report_channel_r: mpsc::UnboundedReceiver<A2bReportChannel>,
b2a_bandwidth_stats_s: watch::Sender<f32>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
}
@ -81,6 +84,7 @@ impl BParticipant {
// We use integer instead of Barrier to not block mgr from freeing at the end
const BARR_CHANNEL: i32 = 1;
const BARR_RECV: i32 = 4;
const BARR_REPORT: i32 = 8;
const BARR_SEND: i32 = 2;
const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS);
const TICK_TIME_MS: u64 = 5;
@ -95,6 +99,7 @@ impl BParticipant {
mpsc::UnboundedSender<A2bStreamOpen>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<S2bCreateChannel>,
mpsc::UnboundedSender<A2bReportChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
watch::Receiver<f32>,
) {
@ -102,12 +107,15 @@ impl BParticipant {
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::<Stream>();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel();
let (a2b_report_channel_s, a2b_report_channel_r) =
mpsc::unbounded_channel::<A2bReportChannel>();
let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::<f32>(0.0);
let run_channels = Some(ControlChannels {
a2b_open_stream_r,
b2a_stream_opened_s,
s2b_create_channel_r,
a2b_report_channel_r,
b2a_bandwidth_stats_s,
s2b_shutdown_bparticipant_r,
});
@ -121,7 +129,7 @@ impl BParticipant {
channels: Arc::new(RwLock::new(HashMap::new())),
streams: RwLock::new(HashMap::new()),
shutdown_barrier: AtomicI32::new(
Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV,
Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV + Self::BARR_REPORT,
),
run_channels,
metrics,
@ -130,6 +138,7 @@ impl BParticipant {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
)
@ -185,6 +194,7 @@ impl BParticipant {
b2b_add_send_protocol_s,
b2b_add_recv_protocol_s,
),
self.report_channels_mgr(run_channels.a2b_report_channel_r),
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
b2b_close_send_protocol_s.clone(),
@ -560,7 +570,9 @@ impl BParticipant {
) {
let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r);
s2b_create_channel_r
.for_each_concurrent(None, |(cid, _, protocol, b2s_create_channel_done_s)| {
.for_each_concurrent(
None,
|(cid, _, protocol, remote_con_addr, b2s_create_channel_done_s)| {
// This channel is now configured, and we are running it in scope of the
// participant.
let channels = Arc::clone(&self.channels);
@ -574,6 +586,7 @@ impl BParticipant {
Mutex::new(ChannelInfo {
cid,
cid_string: cid.to_string(),
remote_con_addr,
}),
);
drop(lock);
@ -588,13 +601,36 @@ impl BParticipant {
self.metrics
.channels_connected(&self.remote_pid_string, channel_no, cid);
}
})
},
)
.await;
trace!("Stop create_channel_mgr");
self.shutdown_barrier
.fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst);
}
/// Manager that serves report requests: for every reply sender received on
/// `a2b_report_channel_r` it collects the `remote_con_addr` of all entries
/// in `self.channels` and sends the resulting list back.
///
/// Runs until every sender of `a2b_report_channel_r` is dropped, then
/// subtracts `BARR_REPORT` from the shutdown barrier.
async fn report_channels_mgr(
    &self,
    mut a2b_report_channel_r: mpsc::UnboundedReceiver<A2bReportChannel>,
) {
    loop {
        let reply_s = match a2b_report_channel_r.recv().await {
            Some(reply_s) => reply_s,
            // `None` means the channel is closed, nothing left to serve
            None => break,
        };
        let mut connect_addrs = Vec::new();
        {
            let channels = self.channels.read().await;
            for channel in channels.values() {
                connect_addrs.push(channel.lock().await.remote_con_addr.clone());
            }
        } // read lock on the channel map ends here, before answering
        if reply_s.send(connect_addrs).is_err() {
            warn!("couldn't report back connect_addrs");
        }
    }
    trace!("Stop report_channels_mgr");
    self.shutdown_barrier
        .fetch_sub(Self::BARR_REPORT, Ordering::SeqCst);
}
/// sink shutdown:
/// Situation AS, AR, BS, BR. A wants to close.
/// AS shutdown.
@ -784,6 +820,7 @@ mod tests {
mpsc::UnboundedSender<A2bStreamOpen>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<S2bCreateChannel>,
mpsc::UnboundedSender<A2bReportChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
mpsc::UnboundedReceiver<B2sPrioStatistic>,
watch::Receiver<f32>,
@ -800,6 +837,7 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = runtime_clone.block_on(async move {
@ -817,6 +855,7 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
b2a_bandwidth_stats_r,
@ -836,7 +875,7 @@ mod tests {
let p1 = Protocols::new_mpsc(s1, r2, metrics);
let (complete_s, complete_r) = oneshot::channel();
create_channel
.send((cid, Sid::new(0), p1, complete_s))
.send((cid, Sid::new(0), p1, ConnectAddr::Mpsc(42), complete_s))
.unwrap();
complete_r.await.unwrap();
let metrics = ProtocolMetricCache::new(&cid.to_string(), met);
@ -850,6 +889,7 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -863,6 +903,7 @@ mod tests {
let before = Instant::now();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
@ -886,6 +927,7 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -899,6 +941,7 @@ mod tests {
let before = Instant::now();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(2), s))
.unwrap();
@ -923,6 +966,7 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -958,6 +1002,7 @@ mod tests {
let (s, r) = oneshot::channel();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
@ -978,6 +1023,7 @@ mod tests {
a2b_open_stream_s,
mut b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -1004,6 +1050,7 @@ mod tests {
let (s, r) = oneshot::channel();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
@ -1016,4 +1063,48 @@ mod tests {
drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r));
drop(runtime);
}
/// A bparticipant with a single mocked mpsc channel must report exactly
/// one address, `ConnectAddr::Mpsc(42)`, and afterwards shut down cleanly.
#[test]
fn report_conn_addr() {
    let (
        runtime,
        a2b_open_stream_s,
        _b2a_stream_opened_r,
        mut s2b_create_channel_s,
        a2b_report_channel_s,
        s2b_shutdown_bparticipant_s,
        b2s_prio_statistic_r,
        _b2a_bandwidth_stats_r,
        handle,
    ) = mock_bparticipant();

    let _remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s));
    std::thread::sleep(Duration::from_millis(50));

    // request a report and wait for the answer
    let (report_s, report_r) = oneshot::channel();
    a2b_report_channel_s.send(report_s).unwrap();
    let result = runtime.block_on(report_r).unwrap();

    assert_eq!(result.len(), 1);
    assert!(matches!(result[0], ConnectAddr::Mpsc(42)), "wrong code");

    // drop all senders the managers wait on, then request the shutdown
    let (shutdown_done_s, shutdown_done_r) = oneshot::channel();
    runtime.block_on(async {
        drop(s2b_create_channel_s);
        drop(a2b_report_channel_s);
        s2b_shutdown_bparticipant_s
            .send((Duration::from_secs(1), shutdown_done_s))
            .unwrap();
        shutdown_done_r.await.unwrap().unwrap();
    });

    runtime.block_on(handle).unwrap();

    drop((a2b_open_stream_s, b2s_prio_statistic_r));
    drop(runtime);
}
}

View File

@ -223,8 +223,8 @@ impl Scheduler {
};
let _ = s2a_listen_result_s.send(res);
while let Some((prot, cid)) = c2s_protocol_r.recv().await {
self.init_protocol(prot, cid, None, true).await;
while let Some((prot, con_addr, cid)) = c2s_protocol_r.recv().await {
self.init_protocol(prot, con_addr, cid, None, true).await;
}
}
})
@ -239,7 +239,7 @@ impl Scheduler {
let metrics =
ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&self.protocol_metrics));
self.metrics.connect_request(&addr);
let protocol = match addr {
let protocol = match addr.clone() {
ConnectAddr::Tcp(addr) => Protocols::with_tcp_connect(addr, metrics).await,
#[cfg(feature = "quic")]
ConnectAddr::Quic(addr, ref config, name) => {
@ -255,7 +255,7 @@ impl Scheduler {
continue;
},
};
self.init_protocol(protocol, cid, Some(pid_sender), false)
self.init_protocol(protocol, addr, cid, Some(pid_sender), false)
.await;
}
trace!("Stop connect_mgr");
@ -375,6 +375,7 @@ impl Scheduler {
async fn init_protocol(
&self,
mut protocol: Protocols,
con_addr: ConnectAddr, //address necessary to connect to the remote
cid: Cid,
s2a_return_pid_s: Option<oneshot::Sender<Result<Participant, NetworkConnectError>>>,
send_handshake: bool,
@ -418,6 +419,7 @@ impl Scheduler {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics));
@ -427,6 +429,7 @@ impl Scheduler {
pid,
a2b_open_stream_s,
b2a_stream_opened_r,
a2b_report_channel_s,
b2a_bandwidth_stats_r,
participant_channels.a2s_disconnect_s,
);
@ -451,7 +454,7 @@ impl Scheduler {
oneshot::channel();
//From now on wire connects directly with bparticipant!
s2b_create_channel_s
.send((cid, sid, protocol, b2s_create_channel_done_s))
.send((cid, sid, protocol, con_addr, b2s_create_channel_done_s))
.unwrap();
b2s_create_channel_done_r.await.unwrap();
if let Some(pid_oneshot) = s2a_return_pid_s {

View File

@ -1,4 +1,6 @@
use std::sync::Arc;
#![feature(assert_matches)]
use std::{assert_matches::assert_matches, sync::Arc};
use tokio::runtime::Runtime;
use veloren_network::{NetworkError, StreamError};
mod helper;
@ -307,3 +309,19 @@ fn listen_on_ipv6_doesnt_block_ipv4() {
drop((s1_a, s1_b, _n_a, _n_b, _p_a, _p_b));
drop((s1_a2, s1_b2, _n_a2, _n_b2, _p_a2, _p_b2)); //clean teardown
}
/// Both ends of a tcp connection must report exactly one `ConnectAddr::Tcp`
/// on 127.0.0.1, and the two reported ports must differ.
#[test]
fn report_ip() {
    let (_, _) = helper::setup(false, 0);
    let (r, _n_a, p_a, _s1_a, _n_b, p_b, _s1_b) = network_participant_stream(tcp());
    let localhost = std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST);

    let list_a = r.block_on(p_a.report_current_connect_addr()).unwrap();
    assert_eq!(list_a.len(), 1);
    assert_matches!(list_a[0], ConnectAddr::Tcp(ip) if ip.ip() == localhost);

    let list_b = r.block_on(p_b.report_current_connect_addr()).unwrap();
    assert_eq!(list_b.len(), 1);
    assert_matches!(list_b[0], ConnectAddr::Tcp(ip) if ip.ip() == localhost);

    // ports need to be different
    assert_matches!(
        (&list_a[0], &list_b[0]),
        (ConnectAddr::Tcp(a), ConnectAddr::Tcp(b)) if a.port() != b.port()
    );

    drop((_n_a, _n_b, p_a, p_b)); //clean teardown
}