diff --git a/network/src/api.rs b/network/src/api.rs index d31de577b2..39b47ad1ce 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -4,6 +4,7 @@ //! (cd network/examples/async_recv && RUST_BACKTRACE=1 cargo run) use crate::{ message::{self, partial_eq_bincode, IncomingMessage, MessageBuffer, OutgoingMessage}, + participant::{A2bStreamOpen, S2bShutdownBparticipant}, scheduler::Scheduler, types::{Mid, Pid, Prio, Promises, Sid}, }; @@ -30,8 +31,7 @@ use std::{ use tracing::*; use tracing_futures::Instrument; -type ParticipantCloseChannel = - mpsc::UnboundedSender<(Pid, oneshot::Sender>)>; +type A2sDisconnect = Arc>>>; /// Represents a Tcp or Udp or Mpsc address #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -51,10 +51,10 @@ pub enum ProtocolAddr { pub struct Participant { local_pid: Pid, remote_pid: Pid, - a2b_steam_open_s: RwLock)>>, + a2b_stream_open_s: RwLock>, b2a_stream_opened_r: RwLock>, closed: Arc>>, - a2s_disconnect_s: Arc>>, + a2s_disconnect_s: A2sDisconnect, } /// `Streams` represents a channel to send `n` messages with a certain priority @@ -147,8 +147,7 @@ pub enum StreamError { /// [`connected`]: Network::connected pub struct Network { local_pid: Pid, - participant_disconnect_sender: - RwLock>>>>, + participant_disconnect_sender: RwLock>, listen_sender: RwLock>)>>, connect_sender: @@ -390,15 +389,15 @@ impl Participant { pub(crate) fn new( local_pid: Pid, remote_pid: Pid, - a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender)>, + a2b_stream_open_s: mpsc::UnboundedSender, b2a_stream_opened_r: mpsc::UnboundedReceiver, - a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender>)>, + a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>, closed: Arc>>, ) -> Self { Self { local_pid, remote_pid, - a2b_steam_open_s: RwLock::new(a2b_steam_open_s), + a2b_stream_open_s: RwLock::new(a2b_stream_open_s), b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r), closed, a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))), @@ -448,10 +447,10 @@ impl Participant { pub async fn open(&self, prio: u8, promises: Promises) -> Result { //use this lock for now to make sure that only one open at a time is made, // TODO: not sure if we can paralise that, check in future - let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await; + let mut a2b_stream_open_s = self.a2b_stream_open_s.write().await; self.closed.read().await.clone()?; let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel(); - a2b_steam_open_s + a2b_stream_open_s .send((prio, promises, p2a_return_stream_s)) .await .unwrap(); diff --git a/network/src/participant.rs b/network/src/participant.rs index 08fb58cc7e..901b10b4c7 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -25,6 +25,11 @@ use std::{ }; use tracing::*; +pub(crate) type A2bStreamOpen = (Prio, Promises, oneshot::Sender); +pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>); +pub(crate) type S2bShutdownBparticipant = oneshot::Sender>; +pub(crate) type B2sPrioStatistic = (Pid, u64, u64); + #[derive(Debug)] struct ChannelInfo { cid: Cid, @@ -42,15 +47,13 @@ struct StreamInfo { } #[derive(Debug)] -#[allow(clippy::type_complexity)] struct ControlChannels { - a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender)>, + a2b_stream_open_r: mpsc::UnboundedReceiver, b2a_stream_opened_s: mpsc::UnboundedSender, - s2b_create_channel_r: - mpsc::UnboundedReceiver<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>, + s2b_create_channel_r: mpsc::UnboundedReceiver, a2b_close_stream_r: mpsc::UnboundedReceiver, a2b_close_stream_s: mpsc::UnboundedSender, - s2b_shutdown_bparticipant_r: oneshot::Receiver>>, /* own */ + s2b_shutdown_bparticipant_r: oneshot::Receiver, /* own */ } #[derive(Debug)] @@ -75,21 +78,20 @@ impl BParticipant { metrics: Arc, ) -> ( Self, - mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender)>, + mpsc::UnboundedSender, mpsc::UnboundedReceiver, - mpsc::UnboundedSender<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>, - oneshot::Sender>>, + mpsc::UnboundedSender, + oneshot::Sender, Arc>>, ) { - let (a2b_steam_open_s, a2b_steam_open_r) = - mpsc::unbounded::<(Prio, Promises, oneshot::Sender)>(); + let (a2b_steam_open_s, a2b_stream_open_r) = mpsc::unbounded::(); let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded(); let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel(); let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded(); let run_channels = Some(ControlChannels { - a2b_steam_open_r, + a2b_stream_open_r, b2a_stream_opened_s, s2b_create_channel_r, a2b_close_stream_r, @@ -120,7 +122,7 @@ impl BParticipant { ) } - pub async fn run(mut self, b2s_prio_statistic_s: mpsc::UnboundedSender<(Pid, u64, u64)>) { + pub async fn run(mut self, b2s_prio_statistic_s: mpsc::UnboundedSender) { //those managers that listen on api::Participant need an additional oneshot for // shutdown scenario, those handled by scheduler will be closed by it. let (shutdown_send_mgr_sender, shutdown_send_mgr_receiver) = oneshot::channel(); @@ -135,7 +137,7 @@ impl BParticipant { let run_channels = self.run_channels.take().unwrap(); futures::join!( self.open_mgr( - run_channels.a2b_steam_open_r, + run_channels.a2b_stream_open_r, run_channels.a2b_close_stream_s.clone(), a2p_msg_s.clone(), shutdown_open_mgr_receiver, @@ -175,7 +177,7 @@ impl BParticipant { mut prios: PrioManager, mut shutdown_send_mgr_receiver: oneshot::Receiver<()>, b2b_prios_flushed_s: oneshot::Sender<()>, - mut b2s_prio_statistic_s: mpsc::UnboundedSender<(Pid, u64, u64)>, + mut b2s_prio_statistic_s: mpsc::UnboundedSender, ) { //This time equals the MINIMUM Latency in average, so keep it down and //Todo: // make it configureable or switch to await E.g. Prio 0 = await, prio 50 @@ -407,16 +409,9 @@ impl BParticipant { self.running_mgr.fetch_sub(1, Ordering::Relaxed); } - #[allow(clippy::type_complexity)] async fn create_channel_mgr( &self, - s2b_create_channel_r: mpsc::UnboundedReceiver<( - Cid, - Sid, - Protocols, - Vec<(Cid, Frame)>, - oneshot::Sender<()>, - )>, + s2b_create_channel_r: mpsc::UnboundedReceiver, w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>, ) { self.running_mgr.fetch_add(1, Ordering::Relaxed); @@ -461,7 +456,7 @@ impl BParticipant { async fn open_mgr( &self, - mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender)>, + mut a2b_stream_open_r: mpsc::UnboundedReceiver, a2b_close_stream_s: mpsc::UnboundedSender, a2p_msg_s: crossbeam_channel::Sender<(Prio, Sid, OutgoingMessage)>, shutdown_open_mgr_receiver: oneshot::Receiver<()>, @@ -474,7 +469,7 @@ impl BParticipant { let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse(); //from api or shutdown signal while let Some((prio, promises, p2a_return_stream)) = select! { - next = a2b_steam_open_r.next().fuse() => next, + next = a2b_stream_open_r.next().fuse() => next, _ = shutdown_open_mgr_receiver => None, } { debug!(?prio, ?promises, "Got request to open a new steam"); @@ -512,7 +507,7 @@ impl BParticipant { /// called by api to get the result status async fn participant_shutdown_mgr( &self, - s2b_shutdown_bparticipant_r: oneshot::Receiver>>, + s2b_shutdown_bparticipant_r: oneshot::Receiver, b2b_prios_flushed_r: oneshot::Receiver<()>, mut mgr_to_shutdown: Vec>, ) { diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 7917266638..931fabb2a1 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -2,9 +2,9 @@ use crate::{ api::{Participant, ProtocolAddr}, channel::Handshake, metrics::NetworkMetrics, - participant::BParticipant, + participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel}, protocols::{Protocols, TcpProtocol, UdpProtocol}, - types::{Cid, Frame, Pid, Sid}, + types::Pid, }; use async_std::{ io, net, @@ -30,16 +30,6 @@ use std::{ use tracing::*; use tracing_futures::Instrument; -#[derive(Debug)] -#[allow(clippy::type_complexity)] -struct ParticipantInfo { - secret: u128, - s2b_create_channel_s: - mpsc::UnboundedSender<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>, - s2b_shutdown_bparticipant_s: - Option>>>, -} - /// Naming of Channels `x2x` /// - a: api /// - s: scheduler @@ -48,21 +38,33 @@ struct ParticipantInfo { /// - r: protocol /// - w: wire /// - c: channel/handshake + +#[derive(Debug)] +struct ParticipantInfo { + secret: u128, + s2b_create_channel_s: mpsc::UnboundedSender, + s2b_shutdown_bparticipant_s: + Option>>>, +} + +type A2sListen = (ProtocolAddr, oneshot::Sender>); +type A2sConnect = (ProtocolAddr, oneshot::Sender>); +type A2sDisconnect = (Pid, oneshot::Sender>); + #[derive(Debug)] struct ControlChannels { - a2s_listen_r: mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender>)>, - a2s_connect_r: - mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender>)>, + a2s_listen_r: mpsc::UnboundedReceiver, + a2s_connect_r: mpsc::UnboundedReceiver, a2s_scheduler_shutdown_r: oneshot::Receiver<()>, - a2s_disconnect_r: mpsc::UnboundedReceiver<(Pid, oneshot::Sender>)>, - b2s_prio_statistic_r: mpsc::UnboundedReceiver<(Pid, u64, u64)>, + a2s_disconnect_r: mpsc::UnboundedReceiver, + b2s_prio_statistic_r: mpsc::UnboundedReceiver, } #[derive(Debug, Clone)] struct ParticipantChannels { s2a_connected_s: mpsc::UnboundedSender, - a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender>)>, - b2s_prio_statistic_s: mpsc::UnboundedSender<(Pid, u64, u64)>, + a2s_disconnect_s: mpsc::UnboundedSender, + b2s_prio_statistic_s: mpsc::UnboundedSender, } #[derive(Debug)] @@ -80,26 +82,22 @@ pub struct Scheduler { } impl Scheduler { - #[allow(clippy::type_complexity)] pub fn new( local_pid: Pid, registry: Option<&Registry>, ) -> ( Self, - mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender>)>, - mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender>)>, + mpsc::UnboundedSender, + mpsc::UnboundedSender, mpsc::UnboundedReceiver, oneshot::Sender<()>, ) { - let (a2s_listen_s, a2s_listen_r) = - mpsc::unbounded::<(ProtocolAddr, oneshot::Sender>)>(); - let (a2s_connect_s, a2s_connect_r) = - mpsc::unbounded::<(ProtocolAddr, oneshot::Sender>)>(); + let (a2s_listen_s, a2s_listen_r) = mpsc::unbounded::(); + let (a2s_connect_s, a2s_connect_r) = mpsc::unbounded::(); let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::(); let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>(); - let (a2s_disconnect_s, a2s_disconnect_r) = - mpsc::unbounded::<(Pid, oneshot::Sender>)>(); - let (b2s_prio_statistic_s, b2s_prio_statistic_r) = mpsc::unbounded::<(Pid, u64, u64)>(); + let (a2s_disconnect_s, a2s_disconnect_r) = mpsc::unbounded::(); + let (b2s_prio_statistic_s, b2s_prio_statistic_r) = mpsc::unbounded::(); let run_channels = Some(ControlChannels { a2s_listen_r, @@ -155,10 +153,7 @@ impl Scheduler { ); } - async fn listen_mgr( - &self, - a2s_listen_r: mpsc::UnboundedReceiver<(ProtocolAddr, oneshot::Sender>)>, - ) { + async fn listen_mgr(&self, a2s_listen_r: mpsc::UnboundedReceiver) { trace!("Start listen_mgr"); a2s_listen_r .for_each_concurrent(None, |(address, s2a_listen_result_s)| { @@ -253,13 +248,7 @@ impl Scheduler { trace!("Stop connect_mgr"); } - async fn disconnect_mgr( - &self, - mut a2s_disconnect_r: mpsc::UnboundedReceiver<( - Pid, - oneshot::Sender>, - )>, - ) { + async fn disconnect_mgr(&self, mut a2s_disconnect_r: mpsc::UnboundedReceiver) { trace!("Start disconnect_mgr"); while let Some((pid, return_once_successful_shutdown)) = a2s_disconnect_r.next().await { //Closing Participants is done the following way: @@ -289,7 +278,7 @@ impl Scheduler { async fn prio_adj_mgr( &self, - mut b2s_prio_statistic_r: mpsc::UnboundedReceiver<(Pid, u64, u64)>, + mut b2s_prio_statistic_r: mpsc::UnboundedReceiver, ) { trace!("Start prio_adj_mgr"); while let Some((_pid, _frame_cnt, _unused)) = b2s_prio_statistic_r.next().await { @@ -512,7 +501,7 @@ impl Scheduler { debug!(?cid, "New participant connected via a channel"); let ( bparticipant, - a2b_steam_open_s, + a2b_stream_open_s, b2a_stream_opened_r, mut s2b_create_channel_s, s2b_shutdown_bparticipant_s, @@ -522,7 +511,7 @@ impl Scheduler { let participant = Participant::new( local_pid, pid, - a2b_steam_open_s, + a2b_stream_open_s, b2a_stream_opened_r, participant_channels.a2s_disconnect_s, api_participant_closed,