deactivate some features again and only keep the internal code for now to reuse it in automatic reconnect code

This commit is contained in:
Marcel Märtens 2022-06-20 09:47:46 +02:00
parent f3e4f022cb
commit b5d0ee22e4
5 changed files with 3 additions and 168 deletions

View File

@ -1,7 +1,7 @@
use crate::{
channel::ProtocolsError,
message::{partial_eq_bincode, Message},
participant::{A2bReportChannel, A2bStreamOpen, S2bShutdownBparticipant},
participant::{A2bStreamOpen, S2bShutdownBparticipant},
scheduler::{A2sConnect, Scheduler},
};
use bytes::Bytes;
@ -61,7 +61,6 @@ pub struct Participant {
remote_pid: Pid,
a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
a2b_report_channel_s: Mutex<mpsc::UnboundedSender<A2bReportChannel>>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: A2sDisconnect,
}
@ -521,7 +520,6 @@ impl Participant {
remote_pid: Pid,
a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2b_report_channel_s: mpsc::UnboundedSender<A2bReportChannel>,
b2a_bandwidth_stats_r: watch::Receiver<f32>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
) -> Self {
@ -530,7 +528,6 @@ impl Participant {
remote_pid,
a2b_open_stream_s: Mutex::new(a2b_open_stream_s),
b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
a2b_report_channel_s: Mutex::new(a2b_report_channel_s),
b2a_bandwidth_stats_r,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
}
@ -755,37 +752,6 @@ impl Participant {
}
}
/// Returns a list of [`ConnectAddr`] that can be used to connect to the
/// respective remote. This only reports the current state of the
/// Participant at the point of calling. Also there is no guarantee that
/// the remote is listening on this address. Note: Due to timing
/// problems even if you call this repeatedly you might miss some addr that
/// got connected and disconnected quickly, though this is more of a
/// theoretical problem.
///
/// [`ConnectAddr`]: ConnectAddr
pub async fn report_current_connect_addr(&self) -> Result<Vec<ConnectAddr>, ParticipantError> {
let (p2a_return_report_s, p2a_return_report_r) = oneshot::channel::<Vec<ConnectAddr>>();
match self
.a2b_report_channel_s
.lock()
.await
.send(p2a_return_report_s)
{
Ok(()) => match p2a_return_report_r.await {
Ok(list) => Ok(list),
Err(_) => {
debug!("p2a_return_report_r failed, closing participant");
Err(ParticipantError::ParticipantDisconnected)
},
},
Err(e) => {
debug!(?e, "bParticipant is already closed, notifying");
Err(ParticipantError::ParticipantDisconnected)
},
}
}
/// Returns the current approximation on the maximum bandwidth available.
/// This WILL fluctuate based on the amount/size of send messages.
pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }

View File

@ -2,7 +2,6 @@
#![cfg_attr(test, deny(rust_2018_idioms))]
#![cfg_attr(test, deny(warnings))]
#![deny(clippy::clone_on_ref_ptr)]
#![feature(assert_matches)]
//! Crate to handle high level networking of messages with different
//! requirements and priorities over a number of protocols

View File

@ -28,7 +28,6 @@ use tracing::*;
pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender<Stream>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, ConnectAddr, oneshot::Sender<()>);
pub(crate) type A2bReportChannel = oneshot::Sender<Vec<ConnectAddr>>;
pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender<Result<(), ParticipantError>>);
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
@ -55,7 +54,6 @@ struct ControlChannels {
a2b_open_stream_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
a2b_report_channel_r: mpsc::UnboundedReceiver<A2bReportChannel>,
b2a_bandwidth_stats_s: watch::Sender<f32>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
}
@ -84,7 +82,6 @@ impl BParticipant {
// We use integer instead of Barrier to not block mgr from freeing at the end
const BARR_CHANNEL: i32 = 1;
const BARR_RECV: i32 = 4;
const BARR_REPORT: i32 = 8;
const BARR_SEND: i32 = 2;
const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS);
const TICK_TIME_MS: u64 = 5;
@ -99,7 +96,6 @@ impl BParticipant {
mpsc::UnboundedSender<A2bStreamOpen>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<S2bCreateChannel>,
mpsc::UnboundedSender<A2bReportChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
watch::Receiver<f32>,
) {
@ -107,15 +103,12 @@ impl BParticipant {
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::<Stream>();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel();
let (a2b_report_channel_s, a2b_report_channel_r) =
mpsc::unbounded_channel::<A2bReportChannel>();
let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::<f32>(0.0);
let run_channels = Some(ControlChannels {
a2b_open_stream_r,
b2a_stream_opened_s,
s2b_create_channel_r,
a2b_report_channel_r,
b2a_bandwidth_stats_s,
s2b_shutdown_bparticipant_r,
});
@ -129,7 +122,7 @@ impl BParticipant {
channels: Arc::new(RwLock::new(HashMap::new())),
streams: RwLock::new(HashMap::new()),
shutdown_barrier: AtomicI32::new(
Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV + Self::BARR_REPORT,
Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV,
),
run_channels,
metrics,
@ -138,7 +131,6 @@ impl BParticipant {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
)
@ -153,7 +145,6 @@ impl BParticipant {
async_channel::unbounded::<Cid>();
let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
async_channel::unbounded::<Cid>();
let (b2b_close_report_channel_s, b2b_close_report_channel_r) = oneshot::channel();
let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) =
crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>();
let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) =
@ -195,15 +186,10 @@ impl BParticipant {
b2b_add_send_protocol_s,
b2b_add_recv_protocol_s,
),
self.report_channels_mgr(
run_channels.a2b_report_channel_r,
b2b_close_report_channel_r
),
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
b2b_close_send_protocol_s.clone(),
b2b_force_close_recv_protocol_s,
b2b_close_report_channel_s,
),
);
}
@ -614,39 +600,6 @@ impl BParticipant {
.fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst);
}
async fn report_channels_mgr(
&self,
mut a2b_report_channel_r: mpsc::UnboundedReceiver<A2bReportChannel>,
b2b_close_report_channel_r: oneshot::Receiver<()>,
) {
let mut b2b_close_report_channel_r = b2b_close_report_channel_r.fuse();
loop {
let report = select!(
n = a2b_report_channel_r.recv().fuse() => n,
_ = &mut b2b_close_report_channel_r => break,
);
match report {
Some(b2a_report_channel_s) => {
let data = {
let lock = self.channels.read().await;
let mut data = Vec::new();
for (_, c) in lock.iter() {
data.push(c.lock().await.remote_con_addr.clone());
}
data
};
if b2a_report_channel_s.send(data).is_err() {
warn!("couldn't report back connect_addrs");
};
},
None => break,
}
}
trace!("Stop report_channels_mgr");
self.shutdown_barrier
.fetch_sub(Self::BARR_REPORT, Ordering::SeqCst);
}
/// sink shutdown:
/// Situation AS, AR, BS, BR. A wants to close.
/// AS shutdown.
@ -674,7 +627,6 @@ impl BParticipant {
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>,
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
b2b_force_close_recv_protocol_s: async_channel::Sender<Cid>,
b2b_close_report_channel_s: oneshot::Sender<()>,
) {
let wait_for_manager = || async {
let mut sleep = 0.01f64;
@ -716,13 +668,6 @@ impl BParticipant {
}
}
drop(lock);
trace!("close report_channel_mgr");
if let Err(e) = b2b_close_report_channel_s.send(()) {
debug!(
?e,
"report_channel_mgr could be closed if Participant was dropped already here"
);
}
trace!("wait for other managers");
let timeout = tokio::time::sleep(
@ -844,7 +789,6 @@ mod tests {
mpsc::UnboundedSender<A2bStreamOpen>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<S2bCreateChannel>,
mpsc::UnboundedSender<A2bReportChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
mpsc::UnboundedReceiver<B2sPrioStatistic>,
watch::Receiver<f32>,
@ -861,7 +805,6 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = runtime_clone.block_on(async move {
@ -879,7 +822,6 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
b2a_bandwidth_stats_r,
@ -913,7 +855,6 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -927,7 +868,6 @@ mod tests {
let before = Instant::now();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
@ -951,7 +891,6 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -965,7 +904,6 @@ mod tests {
let before = Instant::now();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(2), s))
.unwrap();
@ -990,7 +928,6 @@ mod tests {
a2b_open_stream_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -1026,7 +963,6 @@ mod tests {
let (s, r) = oneshot::channel();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
@ -1047,7 +983,6 @@ mod tests {
a2b_open_stream_s,
mut b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
@ -1074,7 +1009,6 @@ mod tests {
let (s, r) = oneshot::channel();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
@ -1087,48 +1021,4 @@ mod tests {
drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r));
drop(runtime);
}
#[test]
fn report_conn_addr() {
let (
runtime,
a2b_open_stream_s,
_b2a_stream_opened_r,
mut s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2s_prio_statistic_r,
_b2a_bandwidth_stats_r,
handle,
) = mock_bparticipant();
let _remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s));
std::thread::sleep(Duration::from_millis(50));
let result = runtime.block_on(async {
let (s, r) = oneshot::channel();
a2b_report_channel_s.send(s).unwrap();
r.await.unwrap()
});
assert_eq!(result.len(), 1);
if !matches!(result[0], ConnectAddr::Mpsc(42)) {
panic!("wrong code");
}
let (s, r) = oneshot::channel();
runtime.block_on(async {
drop(s2b_create_channel_s);
drop(a2b_report_channel_s);
s2b_shutdown_bparticipant_s
.send((Duration::from_secs(1), s))
.unwrap();
r.await.unwrap().unwrap();
});
runtime.block_on(handle).unwrap();
drop((a2b_open_stream_s, b2s_prio_statistic_r));
drop(runtime);
}
}

View File

@ -419,7 +419,6 @@ impl Scheduler {
a2b_open_stream_s,
b2a_stream_opened_r,
s2b_create_channel_s,
a2b_report_channel_s,
s2b_shutdown_bparticipant_s,
b2a_bandwidth_stats_r,
) = BParticipant::new(local_pid, pid, sid, Arc::clone(&metrics));
@ -429,7 +428,6 @@ impl Scheduler {
pid,
a2b_open_stream_s,
b2a_stream_opened_r,
a2b_report_channel_s,
b2a_bandwidth_stats_r,
participant_channels.a2s_disconnect_s,
);

View File

@ -1,6 +1,4 @@
#![feature(assert_matches)]
use std::{assert_matches::assert_matches, sync::Arc};
use std::sync::Arc;
use tokio::runtime::Runtime;
use veloren_network::{NetworkError, StreamError};
mod helper;
@ -309,19 +307,3 @@ fn listen_on_ipv6_doesnt_block_ipv4() {
drop((s1_a, s1_b, _n_a, _n_b, _p_a, _p_b));
drop((s1_a2, s1_b2, _n_a2, _n_b2, _p_a2, _p_b2)); //clean teardown
}
#[test]
fn report_ip() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, _s1_a, _n_b, p_b, _s1_b) = network_participant_stream(tcp());
let list_a = r.block_on(p_a.report_current_connect_addr()).unwrap();
assert_eq!(list_a.len(), 1);
assert_matches!(list_a[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)));
let list_b = r.block_on(p_b.report_current_connect_addr()).unwrap();
assert_eq!(list_b.len(), 1);
assert_matches!(list_b[0], ConnectAddr::Tcp(ip) if ip.ip() == std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)));
assert_matches!((list_a[0].clone() , list_b[0].clone()), (ConnectAddr::Tcp(a), ConnectAddr::Tcp(b)) if a.port() != b.port()); // ports need to be different
drop((_n_a, _n_b, p_a, p_b)); //clean teardown
}