apparently span doesnt work for async, so i replaced it by an instrument version

This commit is contained in:
Marcel Märtens 2020-10-14 17:54:01 +02:00
parent 7995a0e124
commit ccd93ee876

View File

@ -762,53 +762,56 @@ impl BParticipant {
from_remote: bool, from_remote: bool,
#[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache, #[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache,
) { ) {
let span = span!(Level::INFO, "delete_stream", ?sid, ?from_remote);
let _enter = span.enter();
//This needs to first stop clients from sending any more. //This needs to first stop clients from sending any more.
//Then it will wait for all pending messages (in prio) to be send to the //Then it will wait for all pending messages (in prio) to be send to the
// protocol After this happened the stream is closed // protocol After this happened the stream is closed
//Only after all messages are send to the protocol, we can send the CloseStream //Only after all messages are send to the protocol, we can send the CloseStream
// frame! If we would send it before, all followup messages couldn't // frame! If we would send it before, all followup messages couldn't
// be handled at the remote side. // be handled at the remote side.
trace!("Stopping api to use this stream"); async {
match self.streams.read().await.get(&sid) { trace!("Stopping api to use this stream");
Some(si) => { match self.streams.read().await.get(&sid) {
si.send_closed.store(true, Ordering::Relaxed); Some(si) => {
si.b2a_msg_recv_s.lock().await.close_channel(); si.send_closed.store(true, Ordering::Relaxed);
}, si.b2a_msg_recv_s.lock().await.close_channel();
None => { },
trace!("Couldn't find the stream, might be simultaneous close from local/remote") None => trace!(
}, "Couldn't find the stream, might be simultaneous close from local/remote"
} ),
}
if !from_remote {
trace!("Wait for stream to be flushed"); if !from_remote {
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel(); trace!("Wait for stream to be flushed");
b2p_notify_empty_stream_s let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) =
.expect("needs to be set when from_remote is false") oneshot::channel();
.send((sid, s2b_stream_finished_closed_s)) b2p_notify_empty_stream_s
.unwrap(); .expect("needs to be set when from_remote is false")
s2b_stream_finished_closed_r.await.unwrap(); .send((sid, s2b_stream_finished_closed_s))
.unwrap();
trace!("Stream was successfully flushed"); s2b_stream_finished_closed_r.await.unwrap();
}
trace!("Stream was successfully flushed");
#[cfg(feature = "metrics")] }
self.metrics
.streams_closed_total #[cfg(feature = "metrics")]
.with_label_values(&[&self.remote_pid_string]) self.metrics
.inc(); .streams_closed_total
//only now remove the Stream, that means we can still recv on it. .with_label_values(&[&self.remote_pid_string])
self.streams.write().await.remove(&sid); .inc();
//only now remove the Stream, that means we can still recv on it.
if !from_remote { self.streams.write().await.remove(&sid);
self.send_frame(
Frame::CloseStream { sid }, if !from_remote {
#[cfg(feature = "metrics")] self.send_frame(
frames_out_total_cache, Frame::CloseStream { sid },
) #[cfg(feature = "metrics")]
.await; frames_out_total_cache,
)
.await;
}
} }
.instrument(tracing::info_span!("close", ?sid, ?from_remote))
.await;
} }
async fn create_stream( async fn create_stream(