diff --git a/network/src/api.rs b/network/src/api.rs
index 71ebc3c86b..31dcae775c 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -53,7 +53,7 @@ pub struct Participant {
     remote_pid: Pid,
     a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
     b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
-    closed: AtomicBool,
+    closed: Arc<RwLock<Result<(), ParticipantError>>>,
     a2s_disconnect_s: Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>>>>,
 }
 
@@ -92,11 +92,15 @@
 }
 
 /// Error type thrown by [`Participants`](Participant) methods
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Clone)]
 pub enum ParticipantError {
-    ///Participant was closed too early to complete the action fully
-    ParticipantClosed,
-    GracefulDisconnectFailed(std::io::Error),
+    ///Participant was closed by the remote side
+    ParticipantDisconnected,
+    ///Underlying Protocol failed and wasn't able to recover; expect some data
+    /// loss. Unfortunately, there is no method to get the exact messages
+    /// that failed. This is also returned when the local side tries to do
+    /// something while the remote side gracefully disconnects
+    ProtocolFailedUnrecoverable,
 }
 
 /// Error type thrown by [`Streams`](Stream) methods
@@ -389,13 +393,14 @@ impl Participant {
         a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
         b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
         a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
+        closed: Arc<RwLock<Result<(), ParticipantError>>>,
     ) -> Self {
         Self {
             local_pid,
             remote_pid,
             a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
             b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
-            closed: AtomicBool::new(false),
+            closed,
             a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
         }
     }
@@ -444,20 +449,12 @@ impl Participant {
         //use this lock for now to make sure that only one open at a time is made,
         // TODO: not sure if we can paralise that, check in future
         let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await;
-        if self.closed.load(Ordering::Relaxed) {
-            warn!(?self.remote_pid, "participant is closed but another open is tried on it");
-            return Err(ParticipantError::ParticipantClosed);
-        }
+        self.closed.read().await.clone()?;
         let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
-        if a2b_steam_open_s
+        a2b_steam_open_s
             .send((prio, promises, p2a_return_stream_s))
             .await
-            .is_err()
-        {
-            debug!(?self.remote_pid, "stream_open_sender failed, closing participant");
-            self.closed.store(true, Ordering::Relaxed);
-            return Err(ParticipantError::ParticipantClosed);
-        }
+            .unwrap();
         match p2a_return_stream_r.await {
             Ok(stream) => {
                 let sid = stream.sid;
@@ -466,8 +463,8 @@
             },
             Err(_) => {
                 debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
-                self.closed.store(true, Ordering::Relaxed);
-                Err(ParticipantError::ParticipantClosed)
+                *self.closed.write().await = Err(ParticipantError::ProtocolFailedUnrecoverable);
+                Err(ParticipantError::ProtocolFailedUnrecoverable)
             },
         }
     }
@@ -509,10 +506,7 @@ impl Participant {
         //use this lock for now to make sure that only one open at a time is made,
         // TODO: not sure if we can paralise that, check in future
         let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
-        if self.closed.load(Ordering::Relaxed) {
-            warn!(?self.remote_pid, "Participant is closed but another open is tried on it");
-            return Err(ParticipantError::ParticipantClosed);
-        }
+        self.closed.read().await.clone()?;
         match stream_opened_receiver.next().await {
             Some(stream) => {
                 let sid = stream.sid;
@@ -521,8 +515,8 @@
             },
             None => {
                 debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant");
-                self.closed.store(true, Ordering::Relaxed);
-                Err(ParticipantError::ParticipantClosed)
+                *self.closed.write().await = Err(ParticipantError::ProtocolFailedUnrecoverable);
+                Err(ParticipantError::ProtocolFailedUnrecoverable)
             },
         }
     }
@@ -557,7 +551,7 @@ impl Participant {
     ///     network
     ///         .listen(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap()))
     ///         .await?;
-    ///     # remote.connect(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
+    ///     # let keep_alive = remote.connect(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
     ///     while let Ok(participant) = network.connected().await {
     ///         println!("Participant connected: {}", participant.remote_pid());
     ///         participant.disconnect().await?;
@@ -574,9 +568,13 @@ impl Participant {
         // Remove, Close and try_unwrap error when unwrap fails!
         let pid = self.remote_pid;
        debug!(?pid, "Closing participant from network");
-        self.closed.store(true, Ordering::Relaxed);
-        //Streams will be closed by BParticipant
-
+        {
+            let mut lock = self.closed.write().await;
+            lock.clone()?;
+            *lock = Err(ParticipantError::ParticipantDisconnected);
+        }
+
+        //Streams will be closed by BParticipant
         match self.a2s_disconnect_s.lock().await.take() {
             Some(mut a2s_disconnect_s) => {
                 let (finished_sender, finished_receiver) = oneshot::channel();
@@ -597,7 +595,7 @@ impl Participant {
                             "Error occured during shutdown of participant and is propagated to \
                              User"
                         );
-                        Err(ParticipantError::GracefulDisconnectFailed(e))
+                        Err(ParticipantError::ProtocolFailedUnrecoverable)
                     },
                     Err(e) => {
                         //this is a bug. but as i am Participant i can't destroy the network
@@ -607,7 +605,7 @@ impl Participant {
                            "Failed to get a message back from the scheduler, seems like the \
                             network is already closed"
                        );
-                        Err(ParticipantError::ParticipantClosed)
+                        Err(ParticipantError::ProtocolFailedUnrecoverable)
                    },
                }
            },
@@ -616,7 +614,7 @@
                    "seems like you are trying to disconnecting a participant after the network \
                     was already dropped. It was already dropped with the network!"
                );
-                Err(ParticipantError::ParticipantClosed)
+                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }
@@ -907,19 +905,16 @@ impl Drop for Participant {
                    .send((self.remote_pid, finished_sender))
                    .await
                    .expect("Something is wrong in internal scheduler coding");
-                match finished_receiver.await {
-                    Ok(Err(e)) => error!(
+                match finished_receiver
+                    .await
+                    .expect("Something is wrong in internal scheduler/participant coding")
+                {
+                    Err(e) => error!(
                        ?pid,
                        ?e,
                        "Error while dropping the participant, couldn't send all outgoing \
                         messages, dropping remaining"
                    ),
-                    Err(e) => warn!(
-                        ?e,
-                        "//TODO i dont know why the finish doesnt work, i normally would \
-                         expect to have sended a return message from the participant... \
-                         ignoring to not caue a panic for now, please fix me"
-                    ),
                    _ => (),
                };
            });
@@ -953,15 +948,10 @@ impl Drop for Stream {
 impl std::fmt::Debug for Participant {
     #[inline]
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let status = if self.closed.load(Ordering::Relaxed) {
-            "[CLOSED]"
-        } else {
-            "[OPEN]"
-        };
         write!(
             f,
-            "Participant {{{} local_pid: {:?}, remote_pid: {:?} }}",
-            status, &self.local_pid, &self.remote_pid,
+            "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
+            &self.local_pid, &self.remote_pid,
         )
     }
 }
@@ -970,10 +960,6 @@ impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
     fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
 }
 
-impl<T> From<crossbeam_channel::SendError<T>> for ParticipantError {
-    fn from(_err: crossbeam_channel::SendError<T>) -> Self { ParticipantError::ParticipantClosed }
-}
-
 impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
     fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
 }
@@ -982,26 +968,14 @@ impl From<std::option::NoneError> for StreamError {
     fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
 }
 
-impl From<std::option::NoneError> for ParticipantError {
-    fn from(_err: std::option::NoneError) -> Self { ParticipantError::ParticipantClosed }
-}
-
 impl From<std::option::NoneError> for NetworkError {
     fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed }
 }
 
-impl From<mpsc::SendError> for ParticipantError {
-    fn from(_err: mpsc::SendError) -> Self { ParticipantError::ParticipantClosed }
-}
-
 impl From<mpsc::SendError> for NetworkError {
     fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed }
 }
 
-impl From<oneshot::Canceled> for ParticipantError {
-    fn from(_err: oneshot::Canceled) -> Self { ParticipantError::ParticipantClosed }
-}
-
 impl From<oneshot::Canceled> for NetworkError {
     fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed }
 }
@@ -1024,9 +998,9 @@ impl core::fmt::Display for StreamError {
 impl core::fmt::Display for ParticipantError {
     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
         match self {
-            ParticipantError::ParticipantClosed => write!(f, "Participant closed"),
-            ParticipantError::GracefulDisconnectFailed(_) => {
-                write!(f, "Graceful disconnect failed")
+            ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
+            ParticipantError::ProtocolFailedUnrecoverable => {
+                write!(f, "underlying protocol failed unrecoverable")
             },
         }
     }
diff --git a/network/src/participant.rs b/network/src/participant.rs
index fd6c32d798..08fb58cc7e 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -1,5 +1,5 @@
 use crate::{
-    api::Stream,
+    api::{ParticipantError, Stream},
     channel::Channel,
     message::{IncomingMessage, MessageBuffer, OutgoingMessage},
     metrics::{NetworkMetrics, PidCidFrameCache},
@@ -11,7 +11,6 @@ use async_std::sync::RwLock;
 use futures::{
     channel::{mpsc, oneshot},
     future::FutureExt,
-    lock::Mutex,
     select,
     sink::SinkExt,
     stream::StreamExt,
@@ -54,12 +53,6 @@ struct ControlChannels {
     s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
 }
 
-//this is needed in case of a shutdown
-struct BParticipantShutdown {
-    b2b_prios_flushed_r: oneshot::Receiver<()>,
-    mgr_to_shutdown: Vec<oneshot::Sender<()>>,
-}
-
 #[derive(Debug)]
 pub struct BParticipant {
     remote_pid: Pid,
@@ -67,10 +60,10 @@ pub struct BParticipant {
     offset_sid: Sid,
     channels: Arc<RwLock<Vec<ChannelInfo>>>,
     streams: RwLock<HashMap<Sid, StreamInfo>>,
+    api_participant_closed: Arc<RwLock<Result<(), ParticipantError>>>,
     running_mgr: AtomicUsize,
     run_channels: Option<ControlChannels>,
     metrics: Arc<NetworkMetrics>,
-    shutdown_info: Mutex<Option<BParticipantShutdown>>,
     no_channel_error_info: RwLock<(Instant, u64)>,
 }
 
@@ -86,6 +79,7 @@ impl BParticipant {
         mpsc::UnboundedReceiver<Stream>,
         mpsc::UnboundedSender<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>,
         oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>,
+        Arc<RwLock<Result<(), ParticipantError>>>,
     ) {
         let (a2b_steam_open_s, a2b_steam_open_r) =
             mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
@@ -103,6 +97,8 @@ impl BParticipant {
             s2b_shutdown_bparticipant_r,
         });
 
+        let api_participant_closed = Arc::new(RwLock::new(Ok(())));
+
         (
             Self {
                 remote_pid,
@@ -110,16 +106,17 @@ impl BParticipant {
                 offset_sid,
                 channels: Arc::new(RwLock::new(vec![])),
                 streams: RwLock::new(HashMap::new()),
+                api_participant_closed: api_participant_closed.clone(),
                 running_mgr: AtomicUsize::new(0),
                 run_channels,
                 metrics,
                 no_channel_error_info: RwLock::new((Instant::now(), 0)),
-                shutdown_info: Mutex::new(None),
             },
             a2b_steam_open_s,
             b2a_stream_opened_r,
             s2b_create_channel_s,
             s2b_shutdown_bparticipant_s,
+            api_participant_closed,
         )
     }
 
@@ -134,14 +131,6 @@ impl BParticipant {
         let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
         let (prios, a2p_msg_s, b2p_notify_empty_stream_s) =
             PrioManager::new(self.metrics.clone(), self.remote_pid_string.clone());
-        *self.shutdown_info.lock().await = Some(BParticipantShutdown {
-            b2b_prios_flushed_r,
-            mgr_to_shutdown: vec![
-                shutdown_send_mgr_sender,
-                shutdown_open_mgr_sender,
-                shutdown_stream_close_mgr_sender,
-            ],
-        });
 
         let run_channels = self.run_channels.take().unwrap();
         futures::join!(
@@ -169,7 +158,15 @@ impl BParticipant {
                 shutdown_stream_close_mgr_receiver,
                 b2p_notify_empty_stream_s,
             ),
-            self.participant_shutdown_mgr(run_channels.s2b_shutdown_bparticipant_r,),
+            self.participant_shutdown_mgr(
+                run_channels.s2b_shutdown_bparticipant_r,
+                b2b_prios_flushed_r,
+                vec![
+                    shutdown_send_mgr_sender,
+                    shutdown_open_mgr_sender,
+                    shutdown_stream_close_mgr_sender,
+                ],
+            ),
         );
     }
 
@@ -190,7 +187,6 @@ impl BParticipant {
         trace!("Start send_mgr");
         let mut send_cache =
             PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
-        //while !self.closed.load(Ordering::Relaxed) {
         loop {
             let mut frames = VecDeque::new();
             prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
@@ -258,7 +254,8 @@ impl BParticipant {
                 will be closed, but not if we do channel-takeover"
             );
             //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
-            self.close_participant(2).await;
+            self.close_api(ParticipantError::ProtocolFailedUnrecoverable)
+                .await;
             false
         } else {
             true
@@ -393,7 +390,8 @@ impl BParticipant {
             },
             Frame::Shutdown => {
                 debug!("Shutdown received from remote side");
-                self.close_participant(2).await;
+                self.close_api(ParticipantError::ParticipantDisconnected)
+                    .await;
             },
             f => unreachable!("Frame should never reache participant!: {:?}", f),
         }
@@ -510,14 +508,59 @@ impl BParticipant {
     /// wait for everything to go right! Then return 1. Shutting down
     /// Streams for API and End user! 2. Wait for all "prio queued" Messages
     /// to be send. 3. Send Stream
+    /// If BParticipant kills itself, managers stay active till this function is
+    /// called by the api to get the result status
     async fn participant_shutdown_mgr(
         &self,
         s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
+        b2b_prios_flushed_r: oneshot::Receiver<()>,
+        mut mgr_to_shutdown: Vec<oneshot::Sender<()>>,
     ) {
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("Start participant_shutdown_mgr");
         let sender = s2b_shutdown_bparticipant_r.await.unwrap();
-        self.close_participant(1).await;
+
+        //Todo: isn't ParticipantDisconnected useless, as api is waiting rn for a
+        // callback?
+        self.close_api(ParticipantError::ParticipantDisconnected)
+            .await;
+
+        debug!("Closing all managers");
+        for sender in mgr_to_shutdown.drain(..) {
+            if let Err(e) = sender.send(()) {
+                warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
+            };
+        }
+
+        b2b_prios_flushed_r.await.unwrap();
+        debug!("Closing all channels, after flushed prios");
+        for ci in self.channels.write().await.drain(..) {
+            if let Err(e) = ci.b2r_read_shutdown.send(()) {
+                debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
+            };
+        }
+
+        //Wait for other bparticipants mgr to close via AtomicUsize
+        const SLEEP_TIME: Duration = Duration::from_millis(5);
+        const ALLOWED_MANAGER: usize = 1;
+        async_std::task::sleep(SLEEP_TIME).await;
+        let mut i: u32 = 1;
+        while self.running_mgr.load(Ordering::Relaxed) > ALLOWED_MANAGER {
+            i += 1;
+            if i.rem_euclid(10) == 1 {
+                trace!(
+                    ?ALLOWED_MANAGER,
+                    "Waiting for bparticipant mgr to shut down, remaining {}",
+                    self.running_mgr.load(Ordering::Relaxed) - ALLOWED_MANAGER
+                );
+            }
+            async_std::task::sleep(SLEEP_TIME * i).await;
+        }
+        trace!("All BParticipant mgr (except me) are shut down now");
+
+        self.metrics.participants_disconnected_total.inc();
+        debug!("BParticipant close done");
+
         sender.send(Ok(())).unwrap();
         trace!("Stop participant_shutdown_mgr");
         self.running_mgr.fetch_sub(1, Ordering::Relaxed);
@@ -613,56 +656,13 @@ impl BParticipant {
         )
     }
 
-    /// this will gracefully shut down the bparticipant
-    /// allowed_managers: the number of open managers to sleep on. Must be 1 for
-    /// shutdown_mgr and 2 if it comes from a send error.
-    async fn close_participant(&self, allowed_managers: usize) {
-        trace!("Participant shutdown triggered");
-        let mut info = match self.shutdown_info.lock().await.take() {
-            Some(info) => info,
-            None => {
-                //This can happen if >=2 different async fn found out the protocol got dropped
-                // but they haven't shut down so far
-                debug!("Close of participant seemed to be called twice, ignoring the 2nd close");
-                return;
-            },
-        };
-        debug!("Closing all managers");
-        for sender in info.mgr_to_shutdown.drain(..) {
-            if let Err(e) = sender.send(()) {
-                warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
-            };
-        }
+    /// close streams and set err
+    async fn close_api(&self, err: ParticipantError) {
+        *self.api_participant_closed.write().await = Err(err);
         debug!("Closing all streams");
         for (sid, si) in self.streams.write().await.drain() {
             trace!(?sid, "Shutting down Stream");
             si.closed.store(true, Ordering::Relaxed);
         }
-        debug!("Waiting for prios to be flushed");
-        info.b2b_prios_flushed_r.await.unwrap();
-        debug!("Closing all channels");
-        for ci in self.channels.write().await.drain(..) {
-            if let Err(e) = ci.b2r_read_shutdown.send(()) {
-                debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
-            };
-        }
-        //Wait for other bparticipants mgr to close via AtomicUsize
-        const SLEEP_TIME: Duration = Duration::from_millis(5);
-        async_std::task::sleep(SLEEP_TIME).await;
-        let mut i: u32 = 1;
-        while self.running_mgr.load(Ordering::Relaxed) > allowed_managers {
-            i += 1;
-            if i.rem_euclid(10) == 1 {
-                trace!(
-                    ?allowed_managers,
-                    "Waiting for bparticipant mgr to shut down, remaining {}",
-                    self.running_mgr.load(Ordering::Relaxed) - allowed_managers
-                );
-            }
-            async_std::task::sleep(SLEEP_TIME * i).await;
-        }
-        trace!("All BParticipant mgr (except me) are shut down now");
-        self.metrics.participants_disconnected_total.inc();
-        debug!("BParticipant close done");
     }
 }
diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs
index bd83e3f924..7917266638 100644
--- a/network/src/scheduler.rs
+++ b/network/src/scheduler.rs
@@ -516,6 +516,7 @@ impl Scheduler {
                 b2a_stream_opened_r,
                 mut s2b_create_channel_s,
                 s2b_shutdown_bparticipant_s,
+                api_participant_closed,
             ) = BParticipant::new(pid, sid, metrics.clone());
 
             let participant = Participant::new(
@@ -524,6 +525,7 @@ impl Scheduler {
                 a2b_steam_open_s,
                 b2a_stream_opened_r,
                 participant_channels.a2s_disconnect_s,
+                api_participant_closed,
             );
 
             metrics.participants_connected_total.inc();
diff --git a/network/tests/closing.rs b/network/tests/closing.rs
index 2bd9a92acd..1b5b74fb2a 100644
--- a/network/tests/closing.rs
+++ b/network/tests/closing.rs
@@ -22,7 +22,10 @@ fn close_participant() {
     let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
 
     block_on(p1_a.disconnect()).unwrap();
-    block_on(p1_b.disconnect()).unwrap();
+    //We don't know if the disconnect is done YET, so the next command will either
+    // return already closed or fail a graceful close as it will discover that the
+    // remote side closed right now
+    assert!(block_on(p1_b.disconnect()).is_err());
 
     assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
     assert_eq!(