fix Participant shutdown

- We had the problem that Participants couldn't shut themselves down; only the scheduler, controlled by the api, could do it.
  Self-shutdown is needed e.g. to handle the Shutdown frame.
- My initial solution did a full shutdown, which was a problem when a 2nd shutdown was requested in parallel: there was no way to get the error back.
- The new solution only deactivates the Participant and its Streams. Everything still functions correctly until the api closes the participant and calls the scheduler, which then calls the bparticipant again (see the sketch after this list).
- I experimented with a Mutex<oneshot> or two and a `select`, but that didn't prove to work well.
- Also adjusted the error values: they are now either Disconnected on a graceful shutdown or ProtocolFailed when some msg couldn't be delivered.
  (Note: the latter might not be returned 100% correctly yet.)
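A minimal, self-contained sketch of the shared closed-state idea described above. It uses plain `std::sync` instead of the async-std/futures types from the diff, and `ApiParticipant`, `ClosedState` and `open_stream` are made-up names for illustration only:

```rust
use std::sync::{Arc, RwLock};

// Simplified stand-in for the error enum introduced below.
#[derive(Debug, Clone, PartialEq)]
enum ParticipantError {
    ParticipantDisconnected,
    #[allow(dead_code)]
    ProtocolFailedUnrecoverable,
}

// Both the api-facing Participant and the internal BParticipant hold a clone of
// this handle. BParticipant writes an Err(..) into it when it deactivates itself
// (e.g. after receiving a Shutdown frame); every api call checks it first.
type ClosedState = Arc<RwLock<Result<(), ParticipantError>>>;

struct ApiParticipant {
    closed: ClosedState,
}

impl ApiParticipant {
    // Mirrors the `self.closed.read().await.clone()?` pattern from the diff,
    // with blocking std locks so the sketch needs no extra dependencies.
    fn open_stream(&self) -> Result<(), ParticipantError> {
        self.closed.read().unwrap().clone()?;
        // ... actually open the stream here ...
        Ok(())
    }
}

fn main() {
    let closed: ClosedState = Arc::new(RwLock::new(Ok(())));
    let participant = ApiParticipant { closed: Arc::clone(&closed) };

    assert!(participant.open_stream().is_ok());

    // The background participant deactivates itself, e.g. on a remote Shutdown
    // frame; the api side keeps working and reports the stored error from now on.
    *closed.write().unwrap() = Err(ParticipantError::ParticipantDisconnected);

    assert_eq!(
        participant.open_stream(),
        Err(ParticipantError::ParticipantDisconnected)
    );
}
```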
Marcel Märtens 2020-07-11 14:34:01 +02:00
parent df45d35c0e
commit 187ec42aa2
4 changed files with 114 additions and 135 deletions

@ -53,7 +53,7 @@ pub struct Participant {
remote_pid: Pid,
a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
closed: AtomicBool,
closed: Arc<RwLock<Result<(), ParticipantError>>>,
a2s_disconnect_s: Arc<Mutex<Option<ParticipantCloseChannel>>>,
}
@ -92,11 +92,15 @@ pub enum NetworkError {
}
/// Error type thrown by [`Participants`](Participant) methods
#[derive(Debug)]
#[derive(Debug, PartialEq, Clone)]
pub enum ParticipantError {
///Participant was closed too early to complete the action fully
ParticipantClosed,
GracefulDisconnectFailed(std::io::Error),
///Participant was closed by remote side
ParticipantDisconnected,
///Underlying protocol failed and wasn't able to recover; expect some data
/// loss, unfortunately. There is no method to get the exact messages
/// that failed. This is also returned when the local side tries to do
/// something while the remote side gracefully disconnects
ProtocolFailedUnrecoverable,
}
/// Error type thrown by [`Streams`](Stream) methods
@ -389,13 +393,14 @@ impl Participant {
a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
closed: Arc<RwLock<Result<(), ParticipantError>>>,
) -> Self {
Self {
local_pid,
remote_pid,
a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
closed: AtomicBool::new(false),
closed,
a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
}
}
@ -444,20 +449,12 @@ impl Participant {
//use this lock for now to make sure that only one open at a time is made,
// TODO: not sure if we can paralise that, check in future
let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await;
if self.closed.load(Ordering::Relaxed) {
warn!(?self.remote_pid, "participant is closed but another open is tried on it");
return Err(ParticipantError::ParticipantClosed);
}
self.closed.read().await.clone()?;
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
if a2b_steam_open_s
a2b_steam_open_s
.send((prio, promises, p2a_return_stream_s))
.await
.is_err()
{
debug!(?self.remote_pid, "stream_open_sender failed, closing participant");
self.closed.store(true, Ordering::Relaxed);
return Err(ParticipantError::ParticipantClosed);
}
.unwrap();
match p2a_return_stream_r.await {
Ok(stream) => {
let sid = stream.sid;
@ -466,8 +463,8 @@ impl Participant {
},
Err(_) => {
debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
self.closed.store(true, Ordering::Relaxed);
Err(ParticipantError::ParticipantClosed)
*self.closed.write().await = Err(ParticipantError::ProtocolFailedUnrecoverable);
Err(ParticipantError::ProtocolFailedUnrecoverable)
},
}
}
@ -509,10 +506,7 @@ impl Participant {
//use this lock for now to make sure that only one open at a time is made,
// TODO: not sure if we can paralise that, check in future
let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
if self.closed.load(Ordering::Relaxed) {
warn!(?self.remote_pid, "Participant is closed but another open is tried on it");
return Err(ParticipantError::ParticipantClosed);
}
self.closed.read().await.clone()?;
match stream_opened_receiver.next().await {
Some(stream) => {
let sid = stream.sid;
@ -521,8 +515,8 @@ impl Participant {
},
None => {
debug!(?self.remote_pid, "stream_opened_receiver failed, closing participant");
self.closed.store(true, Ordering::Relaxed);
Err(ParticipantError::ParticipantClosed)
*self.closed.write().await = Err(ParticipantError::ProtocolFailedUnrecoverable);
Err(ParticipantError::ProtocolFailedUnrecoverable)
},
}
}
@ -557,7 +551,7 @@ impl Participant {
/// network
/// .listen(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap()))
/// .await?;
/// # remote.connect(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
/// # let keep_alive = remote.connect(ProtocolAddr::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// participant.disconnect().await?;
@ -574,9 +568,13 @@ impl Participant {
// Remove, Close and try_unwrap error when unwrap fails!
let pid = self.remote_pid;
debug!(?pid, "Closing participant from network");
self.closed.store(true, Ordering::Relaxed);
//Streams will be closed by BParticipant
{
let mut lock = self.closed.write().await;
lock.clone()?;
*lock = Err(ParticipantError::ParticipantDisconnected);
}
//Streams will be closed by BParticipant
match self.a2s_disconnect_s.lock().await.take() {
Some(mut a2s_disconnect_s) => {
let (finished_sender, finished_receiver) = oneshot::channel();
@ -597,7 +595,7 @@ impl Participant {
"Error occured during shutdown of participant and is propagated to \
User"
);
Err(ParticipantError::GracefulDisconnectFailed(e))
Err(ParticipantError::ProtocolFailedUnrecoverable)
},
Err(e) => {
//this is a bug. but as i am Participant i can't destroy the network
@ -607,7 +605,7 @@ impl Participant {
"Failed to get a message back from the scheduler, seems like the \
network is already closed"
);
Err(ParticipantError::ParticipantClosed)
Err(ParticipantError::ProtocolFailedUnrecoverable)
},
}
},
@ -616,7 +614,7 @@ impl Participant {
"seems like you are trying to disconnecting a participant after the network \
was already dropped. It was already dropped with the network!"
);
Err(ParticipantError::ParticipantClosed)
Err(ParticipantError::ParticipantDisconnected)
},
}
}
@ -907,19 +905,16 @@ impl Drop for Participant {
.send((self.remote_pid, finished_sender))
.await
.expect("Something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(Err(e)) => error!(
match finished_receiver
.await
.expect("Something is wrong in internal scheduler/participant coding")
{
Err(e) => error!(
?pid,
?e,
"Error while dropping the participant, couldn't send all outgoing \
messages, dropping remaining"
),
Err(e) => warn!(
?e,
"//TODO i dont know why the finish doesnt work, i normally would \
expect to have sended a return message from the participant... \
ignoring to not caue a panic for now, please fix me"
),
_ => (),
};
});
@ -953,15 +948,10 @@ impl Drop for Stream {
impl std::fmt::Debug for Participant {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = if self.closed.load(Ordering::Relaxed) {
"[CLOSED]"
} else {
"[OPEN]"
};
write!(
f,
"Participant {{{} local_pid: {:?}, remote_pid: {:?} }}",
status, &self.local_pid, &self.remote_pid,
"Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
&self.local_pid, &self.remote_pid,
)
}
}
@ -970,10 +960,6 @@ impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
fn from(_err: crossbeam_channel::SendError<T>) -> Self { StreamError::StreamClosed }
}
impl<T> From<crossbeam_channel::SendError<T>> for ParticipantError {
fn from(_err: crossbeam_channel::SendError<T>) -> Self { ParticipantError::ParticipantClosed }
}
impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
}
@ -982,26 +968,14 @@ impl From<std::option::NoneError> for StreamError {
fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
}
impl From<std::option::NoneError> for ParticipantError {
fn from(_err: std::option::NoneError) -> Self { ParticipantError::ParticipantClosed }
}
impl From<std::option::NoneError> for NetworkError {
fn from(_err: std::option::NoneError) -> Self { NetworkError::NetworkClosed }
}
impl From<mpsc::SendError> for ParticipantError {
fn from(_err: mpsc::SendError) -> Self { ParticipantError::ParticipantClosed }
}
impl From<mpsc::SendError> for NetworkError {
fn from(_err: mpsc::SendError) -> Self { NetworkError::NetworkClosed }
}
impl From<oneshot::Canceled> for ParticipantError {
fn from(_err: oneshot::Canceled) -> Self { ParticipantError::ParticipantClosed }
}
impl From<oneshot::Canceled> for NetworkError {
fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed }
}
@ -1024,9 +998,9 @@ impl core::fmt::Display for StreamError {
impl core::fmt::Display for ParticipantError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
ParticipantError::ParticipantClosed => write!(f, "Participant closed"),
ParticipantError::GracefulDisconnectFailed(_) => {
write!(f, "Graceful disconnect failed")
ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
ParticipantError::ProtocolFailedUnrecoverable => {
write!(f, "underlying protocol failed unrecoverable")
},
}
}
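Since the commit message warns that the error values might not be returned 100% correctly yet, callers probably want to treat both variants as terminal and only differ in how they report them. A hedged sketch of such caller-side handling; the enum is repeated so the snippet compiles on its own, and `handle_disconnect` is an illustrative name, not part of this diff:

```rust
// Repeated from the diff above so this sketch is self-contained.
#[derive(Debug, Clone, PartialEq)]
enum ParticipantError {
    ParticipantDisconnected,
    ProtocolFailedUnrecoverable,
}

// Hypothetical caller-side handling of a disconnect result.
fn handle_disconnect(result: Result<(), ParticipantError>) {
    match result {
        // Graceful shutdown (local or remote): nothing was lost.
        Err(ParticipantError::ParticipantDisconnected) => {
            println!("participant disconnected cleanly")
        },
        // Unrecoverable protocol failure: queued messages may be lost and there
        // is no way to find out which ones, so log it and move on.
        Err(ParticipantError::ProtocolFailedUnrecoverable) => {
            eprintln!("protocol failed, expect some message loss")
        },
        Ok(()) => println!("disconnect finished without error"),
    }
}

fn main() {
    handle_disconnect(Err(ParticipantError::ParticipantDisconnected));
    handle_disconnect(Err(ParticipantError::ProtocolFailedUnrecoverable));
    handle_disconnect(Ok(()));
}
```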

@ -1,5 +1,5 @@
use crate::{
api::Stream,
api::{ParticipantError, Stream},
channel::Channel,
message::{IncomingMessage, MessageBuffer, OutgoingMessage},
metrics::{NetworkMetrics, PidCidFrameCache},
@ -11,7 +11,6 @@ use async_std::sync::RwLock;
use futures::{
channel::{mpsc, oneshot},
future::FutureExt,
lock::Mutex,
select,
sink::SinkExt,
stream::StreamExt,
@ -54,12 +53,6 @@ struct ControlChannels {
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
}
//this is needed in case of a shutdown
struct BParticipantShutdown {
b2b_prios_flushed_r: oneshot::Receiver<()>,
mgr_to_shutdown: Vec<oneshot::Sender<()>>,
}
#[derive(Debug)]
pub struct BParticipant {
remote_pid: Pid,
@ -67,10 +60,10 @@ pub struct BParticipant {
offset_sid: Sid,
channels: Arc<RwLock<Vec<ChannelInfo>>>,
streams: RwLock<HashMap<Sid, StreamInfo>>,
api_participant_closed: Arc<RwLock<Result<(), ParticipantError>>>,
running_mgr: AtomicUsize,
run_channels: Option<ControlChannels>,
metrics: Arc<NetworkMetrics>,
shutdown_info: Mutex<Option<BParticipantShutdown>>,
no_channel_error_info: RwLock<(Instant, u64)>,
}
@ -86,6 +79,7 @@ impl BParticipant {
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<(Cid, Sid, Protocols, Vec<(Cid, Frame)>, oneshot::Sender<()>)>,
oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>,
Arc<RwLock<Result<(), ParticipantError>>>,
) {
let (a2b_steam_open_s, a2b_steam_open_r) =
mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
@ -103,6 +97,8 @@ impl BParticipant {
s2b_shutdown_bparticipant_r,
});
let api_participant_closed = Arc::new(RwLock::new(Ok(())));
(
Self {
remote_pid,
@ -110,16 +106,17 @@ impl BParticipant {
offset_sid,
channels: Arc::new(RwLock::new(vec![])),
streams: RwLock::new(HashMap::new()),
api_participant_closed: api_participant_closed.clone(),
running_mgr: AtomicUsize::new(0),
run_channels,
metrics,
no_channel_error_info: RwLock::new((Instant::now(), 0)),
shutdown_info: Mutex::new(None),
},
a2b_steam_open_s,
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
api_participant_closed,
)
}
@ -134,14 +131,6 @@ impl BParticipant {
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) =
PrioManager::new(self.metrics.clone(), self.remote_pid_string.clone());
*self.shutdown_info.lock().await = Some(BParticipantShutdown {
b2b_prios_flushed_r,
mgr_to_shutdown: vec![
shutdown_send_mgr_sender,
shutdown_open_mgr_sender,
shutdown_stream_close_mgr_sender,
],
});
let run_channels = self.run_channels.take().unwrap();
futures::join!(
@ -169,7 +158,15 @@ impl BParticipant {
shutdown_stream_close_mgr_receiver,
b2p_notify_empty_stream_s,
),
self.participant_shutdown_mgr(run_channels.s2b_shutdown_bparticipant_r,),
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
b2b_prios_flushed_r,
vec![
shutdown_send_mgr_sender,
shutdown_open_mgr_sender,
shutdown_stream_close_mgr_sender,
],
),
);
}
@ -190,7 +187,6 @@ impl BParticipant {
trace!("Start send_mgr");
let mut send_cache =
PidCidFrameCache::new(self.metrics.frames_out_total.clone(), self.remote_pid);
//while !self.closed.load(Ordering::Relaxed) {
loop {
let mut frames = VecDeque::new();
prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
@ -258,7 +254,8 @@ impl BParticipant {
will be closed, but not if we do channel-takeover"
);
//TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
self.close_participant(2).await;
self.close_api(ParticipantError::ProtocolFailedUnrecoverable)
.await;
false
} else {
true
@ -393,7 +390,8 @@ impl BParticipant {
},
Frame::Shutdown => {
debug!("Shutdown received from remote side");
self.close_participant(2).await;
self.close_api(ParticipantError::ParticipantDisconnected)
.await;
},
f => unreachable!("Frame should never reache participant!: {:?}", f),
}
@ -510,14 +508,59 @@ impl BParticipant {
/// wait for everything to go right! Then return 1. Shutting down
/// Streams for API and End user! 2. Wait for all "prio queued" Messages
/// to be send. 3. Send Stream
/// If BParticipant kills itself, the managers stay active until this function
/// is called by the api to get the result status
async fn participant_shutdown_mgr(
&self,
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
b2b_prios_flushed_r: oneshot::Receiver<()>,
mut mgr_to_shutdown: Vec<oneshot::Sender<()>>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("Start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap();
self.close_participant(1).await;
//TODO: isn't ParticipantDisconnected useless here, as the api is waiting for a
// callback right now?
self.close_api(ParticipantError::ParticipantDisconnected)
.await;
debug!("Closing all managers");
for sender in mgr_to_shutdown.drain(..) {
if let Err(e) = sender.send(()) {
warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
};
}
b2b_prios_flushed_r.await.unwrap();
debug!("Closing all channels, after flushed prios");
for ci in self.channels.write().await.drain(..) {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
};
}
//Wait for other bparticipants mgr to close via AtomicUsize
const SLEEP_TIME: Duration = Duration::from_millis(5);
const ALLOWED_MANAGER: usize = 1;
async_std::task::sleep(SLEEP_TIME).await;
let mut i: u32 = 1;
while self.running_mgr.load(Ordering::Relaxed) > ALLOWED_MANAGER {
i += 1;
if i.rem_euclid(10) == 1 {
trace!(
?ALLOWED_MANAGER,
"Waiting for bparticipant mgr to shut down, remaining {}",
self.running_mgr.load(Ordering::Relaxed) - ALLOWED_MANAGER
);
}
async_std::task::sleep(SLEEP_TIME * i).await;
}
trace!("All BParticipant mgr (except me) are shut down now");
self.metrics.participants_disconnected_total.inc();
debug!("BParticipant close done");
sender.send(Ok(())).unwrap();
trace!("Stop participant_shutdown_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
@ -613,56 +656,13 @@ impl BParticipant {
)
}
/// this will gracefully shut down the bparticipant
/// allowed_managers: the number of open managers to sleep on. Must be 1 for
/// shutdown_mgr and 2 if it comes from a send error.
async fn close_participant(&self, allowed_managers: usize) {
trace!("Participant shutdown triggered");
let mut info = match self.shutdown_info.lock().await.take() {
Some(info) => info,
None => {
//This can happen if >=2 different async fn found out the protocol got dropped
// but they haven't shut down so far
debug!("Close of participant seemed to be called twice, ignoring the 2nd close");
return;
},
};
debug!("Closing all managers");
for sender in info.mgr_to_shutdown.drain(..) {
if let Err(e) = sender.send(()) {
warn!(?e, "Manager seems to be closed already, weird, maybe a bug");
};
}
/// close streams and set err
async fn close_api(&self, err: ParticipantError) {
*self.api_participant_closed.write().await = Err(err);
debug!("Closing all streams");
for (sid, si) in self.streams.write().await.drain() {
trace!(?sid, "Shutting down Stream");
si.closed.store(true, Ordering::Relaxed);
}
debug!("Waiting for prios to be flushed");
info.b2b_prios_flushed_r.await.unwrap();
debug!("Closing all channels");
for ci in self.channels.write().await.drain(..) {
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "Seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
};
}
//Wait for other bparticipants mgr to close via AtomicUsize
const SLEEP_TIME: Duration = Duration::from_millis(5);
async_std::task::sleep(SLEEP_TIME).await;
let mut i: u32 = 1;
while self.running_mgr.load(Ordering::Relaxed) > allowed_managers {
i += 1;
if i.rem_euclid(10) == 1 {
trace!(
?allowed_managers,
"Waiting for bparticipant mgr to shut down, remaining {}",
self.running_mgr.load(Ordering::Relaxed) - allowed_managers
);
}
async_std::task::sleep(SLEEP_TIME * i).await;
}
trace!("All BParticipant mgr (except me) are shut down now");
self.metrics.participants_disconnected_total.inc();
debug!("BParticipant close done");
}
}

@ -516,6 +516,7 @@ impl Scheduler {
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
api_participant_closed,
) = BParticipant::new(pid, sid, metrics.clone());
let participant = Participant::new(
@ -524,6 +525,7 @@ impl Scheduler {
a2b_steam_open_s,
b2a_stream_opened_r,
participant_channels.a2s_disconnect_s,
api_participant_closed,
);
metrics.participants_connected_total.inc();

@ -22,7 +22,10 @@ fn close_participant() {
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(p1_a.disconnect()).unwrap();
block_on(p1_b.disconnect()).unwrap();
//We don't know if the disconnect is done YET, so the next command will either
// return already-closed or fail a graceful close, as it will discover that the
// remote side closed just now
assert!(block_on(p1_b.disconnect()).is_err());
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(