extend network with better Error codes for Network

This commit is contained in:
Marcel Märtens 2020-07-04 12:17:33 +02:00
parent cbfd398035
commit e7195b57ad
3 changed files with 43 additions and 15 deletions

View File

@ -82,6 +82,8 @@ pub struct Stream {
pub enum NetworkError {
NetworkClosed,
ListenFailed(std::io::Error),
ConnectFailed(std::io::Error),
GracefulDisconnectFailed(std::io::Error),
}
/// Error type thrown by [`Participants`](Participant) methods
@ -313,7 +315,10 @@ impl Network {
.await
.send((address, pid_sender))
.await?;
let participant = pid_receiver.await??;
let participant = match pid_receiver.await? {
Ok(p) => p,
Err(e) => return Err(NetworkError::ConnectFailed(e)),
};
let pid = participant.remote_pid;
debug!(
?pid,
@ -433,6 +438,7 @@ impl Network {
this is a bad idea. Participant will only be dropped when you drop your last \
reference"
);
Ok(())
},
Ok(mut participant) => {
trace!("waiting now for participant to close");
@ -447,13 +453,30 @@ impl Network {
.send((pid, finished_sender))
.await
.expect("something is wrong in internal scheduler coding");
let res = finished_receiver.await.unwrap();
trace!("participant is now closed");
res?;
match finished_receiver.await {
Ok(Ok(())) => {
trace!(?pid, "Participant is now closed");
Ok(())
},
Ok(Err(e)) => {
trace!(
?e,
"Error occured during shutdown of participant and is propagated to \
User"
);
Err(NetworkError::GracefulDisconnectFailed(e))
},
Err(e) => {
error!(
?pid,
?e,
"Failed to get a message back from the scheduler, closing the network"
);
Err(NetworkError::NetworkClosed)
},
}
},
};
Ok(())
}
}
/// returns a copy of all current connected [`Participants`],
@ -942,10 +965,6 @@ impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
}
impl From<async_std::io::Error> for NetworkError {
fn from(err: async_std::io::Error) -> Self { NetworkError::ListenFailed(err) }
}
impl From<std::option::NoneError> for StreamError {
fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
}
@ -1002,6 +1021,8 @@ impl core::fmt::Display for NetworkError {
match self {
NetworkError::NetworkClosed => write!(f, "network closed"),
NetworkError::ListenFailed(_) => write!(f, "listening failed"),
NetworkError::ConnectFailed(_) => write!(f, "connecting failed"),
NetworkError::GracefulDisconnectFailed(_) => write!(f, "graceful disconnect failed"),
}
}
}

View File

@ -71,7 +71,10 @@ impl TcpProtocol {
"closing tcp protocol due to read error, sending close frame to gracefully \
shutdown"
);
w2c_cid_frame_s.send((cid, Frame::Shutdown)).await.unwrap();
w2c_cid_frame_s
.send((cid, Frame::Shutdown))
.await
.expect("Channel or Participant seems no longer to exist to be Shutdown");
}
}
@ -201,7 +204,10 @@ impl TcpProtocol {
},
};
metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, frame)).await.unwrap();
w2c_cid_frame_s
.send((cid, frame))
.await
.expect("Channel or Participant seems no longer to exist");
}
trace!("shutting down tcp read()");
}

View File

@ -100,8 +100,9 @@ impl ClientInit {
},
Err(err) => {
match err {
ClientError::NetworkErr(NetworkError::ListenFailed(..)) => {
},
ClientError::NetworkErr(NetworkError::ConnectFailed(
..,
)) => {},
// Non-connection error, stop attempts
err => {
last_err = Some(Error::ClientError(err));