use type to reduce complexity

This commit is contained in:
Marcel Märtens 2020-07-15 00:18:04 +02:00
parent 20af325165
commit 58cb98deaa
3 changed files with 62 additions and 79 deletions

View File

@ -4,6 +4,7 @@
//! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run)
use crate::{
message::{self, partial_eq_bincode, IncomingMessage, MessageBuffer, OutgoingMessage},
participant::{A2bStreamOpen, S2bShutdownBparticipant},
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Sid},
};
@ -30,8 +31,7 @@ use std::{
use tracing::*;
use tracing_futures::Instrument;
type ParticipantCloseChannel =
mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>;
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;
/// Represents a Tcp or Udp or Mpsc address
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
@ -51,10 +51,10 @@ pub enum ProtocolAddr {
pub struct Participant {
local_pid: Pid,
remote_pid: Pid,
a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
a2b_stream_open_s: RwLock<mpsc::UnboundedSender<A2bStreamOpen>>,
b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
closed: Arc<RwLock<Result<(), ParticipantError>>>,
a2s_disconnect_s: Arc<Mutex<Option<ParticipantCloseChannel>>>,
a2s_disconnect_s: A2sDisconnect,
}
/// `Streams` represents a channel to send `n` messages with a certain priority
@ -147,8 +147,7 @@ pub enum StreamError {
/// [`connected`]: Network::connected
pub struct Network {
local_pid: Pid,
participant_disconnect_sender:
RwLock<HashMap<Pid, Arc<Mutex<Option<ParticipantCloseChannel>>>>>,
participant_disconnect_sender: RwLock<HashMap<Pid, A2sDisconnect>>,
listen_sender:
RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<async_std::io::Result<()>>)>>,
connect_sender:
@ -390,15 +389,15 @@ impl Participant {
pub(crate) fn new(
local_pid: Pid,
remote_pid: Pid,
a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
a2b_stream_open_s: mpsc::UnboundedSender<A2bStreamOpen>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
closed: Arc<RwLock<Result<(), ParticipantError>>>,
) -> Self {
Self {
local_pid,
remote_pid,
a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
a2b_stream_open_s: RwLock::new(a2b_stream_open_s),
b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
closed,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
@ -448,10 +447,10 @@ impl Participant {
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
//use this lock for now to make sure that only one open at a time is made,
// TODO: not sure if we can paralise that, check in future
let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await;
let mut a2b_stream_open_s = self.a2b_stream_open_s.write().await;
self.closed.read().await.clone()?;
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
a2b_steam_open_s
a2b_stream_open_s
.send((prio, promises, p2a_return_stream_s))
.await
.unwrap();

View File

@ -25,6 +25,11 @@ use std::{
};
use tracing::*;
pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender<Stream>);
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>);
pub(crate) type S2bShutdownBparticipant = oneshot::Sender<async_std::io::Result<()>>;
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
#[derive(Debug)]
struct ChannelInfo {
cid: Cid,
@ -42,15 +47,13 @@ struct StreamInfo {
}
#[derive(Debug)]
#[allow(clippy::type_complexity)]
struct ControlChannels {
a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
a2b_stream_open_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
s2b_create_channel_r:
mpsc::UnboundedReceiver<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
}
#[derive(Debug)]
@ -75,21 +78,20 @@ impl BParticipant {
metrics: Arc<NetworkMetrics>,
) -> (
Self,
mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
mpsc::UnboundedSender<A2bStreamOpen>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>,
oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>,
mpsc::UnboundedSender<S2bCreateChannel>,
oneshot::Sender<S2bShutdownBparticipant>,
Arc<RwLock<Result<(), ParticipantError>>>,
) {
let (a2b_steam_open_s, a2b_steam_open_r) =
mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
let (a2b_steam_open_s, a2b_stream_open_r) = mpsc::unbounded::<A2bStreamOpen>();
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::<Stream>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded();
let run_channels = Some(ControlChannels {
a2b_steam_open_r,
a2b_stream_open_r,
b2a_stream_opened_s,
s2b_create_channel_r,
a2b_close_stream_r,
@ -120,7 +122,7 @@ impl BParticipant {
)
}
pub async fn run(mut self, b2s_prio_statistic_s: mpsc::UnboundedSender<(Pid, u64, u64)>) {
pub async fn run(mut self, b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>) {
//those managers that listen on api::Participant need an additional oneshot for
// shutdown scenario, those handled by scheduler will be closed by it.
let (shutdown_send_mgr_sender, shutdown_send_mgr_receiver) = oneshot::channel();
@ -135,7 +137,7 @@ impl BParticipant {
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.open_mgr(
run_channels.a2b_steam_open_r,
run_channels.a2b_stream_open_r,
run_channels.a2b_close_stream_s.clone(),
a2p_msg_s.clone(),
shutdown_open_mgr_receiver,
@ -175,7 +177,7 @@ impl BParticipant {
mut prios: PrioManager,
mut shutdown_send_mgr_receiver: oneshot::Receiver<()>,
b2b_prios_flushed_s: oneshot::Sender<()>,
mut b2s_prio_statistic_s: mpsc::UnboundedSender<(Pid, u64, u64)>,
mut b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
// make it configureable or switch to await E.g. Prio 0 = await, prio 50
@ -407,16 +409,9 @@ impl BParticipant {
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
#[allow(clippy::type_complexity)]
async fn create_channel_mgr(
&self,
s2b_create_channel_r: mpsc::UnboundedReceiver<(
Cid,
Sid,
Protocols,
Vec<(Cid, Frame)>,
oneshot::Sender<()>,
)>,
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
@ -461,7 +456,7 @@ impl BParticipant {
async fn open_mgr(
&self,
mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
mut a2b_stream_open_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>,
shutdown_open_mgr_receiver: oneshot::Receiver<()>,
@ -474,7 +469,7 @@ impl BParticipant {
let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
//from api or shutdown signal
while let Some((prio, promises, p2a_return_stream)) = select! {
next = a2b_steam_open_r.next().fuse() => next,
next = a2b_stream_open_r.next().fuse() => next,
_ = shutdown_open_mgr_receiver => None,
} {
debug!(?prio, ?promises, "Got request to open a new steam");
@ -512,7 +507,7 @@ impl BParticipant {
/// called by api to get the result status
async fn participant_shutdown_mgr(
&self,
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>,
b2b_prios_flushed_r: oneshot::Receiver<()>,
mut mgr_to_shutdown: Vec<oneshot::Sender<()>>,
) {

View File

@ -2,9 +2,9 @@ use crate::{
api::{Participant, ProtocolAddr},
channel::Handshake,
metrics::NetworkMetrics,
participant::BParticipant,
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel},
protocols::{Protocols, TcpProtocol, UdpProtocol},
types::{Cid, Frame, Pid, Sid},
types::Pid,
};
use async_std::{
io, net,
@ -30,16 +30,6 @@ use std::{
use tracing::*;
use tracing_futures::Instrument;
#[derive(Debug)]
#[allow(clippy::type_complexity)]
struct ParticipantInfo {
secret: u128,
s2b_create_channel_s:
mpsc::UnboundedSender<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>,
s2b_shutdown_bparticipant_s:
Option<oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>>,
}
/// Naming of Channels `x2x`
/// - a: api
/// - s: scheduler
@ -48,21 +38,33 @@ struct ParticipantInfo {
/// - r: protocol
/// - w: wire
/// - c: channel/handshake
#[derive(Debug)]
struct ParticipantInfo {
secret: u128,
s2b_create_channel_s: mpsc::UnboundedSender<S2bCreateChannel>,
s2b_shutdown_bparticipant_s:
Option<oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>>,
}
type A2sListen = (ProtocolAddr, oneshot::Sender<io::Result<()>>);
type A2sConnect = (ProtocolAddr, oneshot::Sender<io::Result<Participant>>);
type A2sDisconnect = (Pid, oneshot::Sender<async_std::io::Result<()>>);
#[derive(Debug)]
struct ControlChannels {
a2s_listen_r: mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>,
a2s_connect_r:
mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>,
a2s_listen_r: mpsc::UnboundedReceiver<A2sListen>,
a2s_connect_r: mpsc::UnboundedReceiver<A2sConnect>,
a2s_scheduler_shutdown_r: oneshot::Receiver<()>,
a2s_disconnect_r: mpsc::UnboundedReceiver<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
b2s_prio_statistic_r: mpsc::UnboundedReceiver<(Pid, u64, u64)>,
a2s_disconnect_r: mpsc::UnboundedReceiver<A2sDisconnect>,
b2s_prio_statistic_r: mpsc::UnboundedReceiver<B2sPrioStatistic>,
}
#[derive(Debug, Clone)]
struct ParticipantChannels {
s2a_connected_s: mpsc::UnboundedSender<Participant>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
b2s_prio_statistic_s: mpsc::UnboundedSender<(Pid, u64, u64)>,
a2s_disconnect_s: mpsc::UnboundedSender<A2sDisconnect>,
b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
}
#[derive(Debug)]
@ -80,26 +82,22 @@ pub struct Scheduler {
}
impl Scheduler {
#[allow(clippy::type_complexity)]
pub fn new(
local_pid: Pid,
registry: Option<&Registry>,
) -> (
Self,
mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>,
mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>,
mpsc::UnboundedSender<A2sListen>,
mpsc::UnboundedSender<A2sConnect>,
mpsc::UnboundedReceiver<Participant>,
oneshot::Sender<()>,
) {
let (a2s_listen_s, a2s_listen_r) =
mpsc::unbounded::<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>();
let (a2s_connect_s, a2s_connect_r) =
mpsc::unbounded::<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>();
let (a2s_listen_s, a2s_listen_r) = mpsc::unbounded::<A2sListen>();
let (a2s_connect_s, a2s_connect_r) = mpsc::unbounded::<A2sConnect>();
let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::<Participant>();
let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>();
let (a2s_disconnect_s, a2s_disconnect_r) =
mpsc::unbounded::<(Pid, oneshot::Sender<async_std::io::Result<()>>)>();
let (b2s_prio_statistic_s, b2s_prio_statistic_r) = mpsc::unbounded::<(Pid, u64, u64)>();
let (a2s_disconnect_s, a2s_disconnect_r) = mpsc::unbounded::<A2sDisconnect>();
let (b2s_prio_statistic_s, b2s_prio_statistic_r) = mpsc::unbounded::<B2sPrioStatistic>();
let run_channels = Some(ControlChannels {
a2s_listen_r,
@ -155,10 +153,7 @@ impl Scheduler {
);
}
async fn listen_mgr(
&self,
a2s_listen_r: mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>,
) {
async fn listen_mgr(&self, a2s_listen_r: mpsc::UnboundedReceiver<A2sListen>) {
trace!("Start listen_mgr");
a2s_listen_r
.for_each_concurrent(None, |(address, s2a_listen_result_s)| {
@ -253,13 +248,7 @@ impl Scheduler {
trace!("Stop connect_mgr");
}
async fn disconnect_mgr(
&self,
mut a2s_disconnect_r: mpsc::UnboundedReceiver<(
Pid,
oneshot::Sender<async_std::io::Result<()>>,
)>,
) {
async fn disconnect_mgr(&self, mut a2s_disconnect_r: mpsc::UnboundedReceiver<A2sDisconnect>) {
trace!("Start disconnect_mgr");
while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await {
//Closing Participants is done the following way:
@ -289,7 +278,7 @@ impl Scheduler {
async fn prio_adj_mgr(
&self,
mut b2s_prio_statistic_r: mpsc::UnboundedReceiver<(Pid, u64, u64)>,
mut b2s_prio_statistic_r: mpsc::UnboundedReceiver<B2sPrioStatistic>,
) {
trace!("Start prio_adj_mgr");
while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.next().await {
@ -512,7 +501,7 @@ impl Scheduler {
debug!(?cid, "New participant connected via a channel");
let (
bparticipant,
a2b_steam_open_s,
a2b_stream_open_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
@ -522,7 +511,7 @@ impl Scheduler {
let participant = Participant::new(
local_pid,
pid,
a2b_steam_open_s,
a2b_stream_open_s,
b2a_stream_opened_r,
participant_channels.a2s_disconnect_s,
api_participant_closed,