apparently span doesnt work for async, so i replaced it by an instrument version

This commit is contained in:
Marcel Märtens 2020-10-14 17:54:01 +02:00
parent 15a96f5812
commit ba1299e670

View File

@ -762,53 +762,56 @@ impl BParticipant {
from_remote: bool,
#[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache,
) {
let span = span!(Level::INFO, "delete_stream", ?sid, ?from_remote);
let _enter = span.enter();
//This needs to first stop clients from sending any more.
//Then it will wait for all pending messages (in prio) to be send to the
// protocol After this happened the stream is closed
//Only after all messages are send to the protocol, we can send the CloseStream
// frame! If we would send it before, all followup messages couldn't
// be handled at the remote side.
trace!("Stopping api to use this stream");
match self.streams.read().await.get(&sid) {
Some(si) => {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.lock().await.close_channel();
},
None => {
trace!("Couldn't find the stream, might be simultaneous close from local/remote")
},
}
if !from_remote {
trace!("Wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
b2p_notify_empty_stream_s
.expect("needs to be set when from_remote is false")
.send((sid, s2b_stream_finished_closed_s))
.unwrap();
s2b_stream_finished_closed_r.await.unwrap();
trace!("Stream was successfully flushed");
}
#[cfg(feature = "metrics")]
self.metrics
.streams_closed_total
.with_label_values(&[&self.remote_pid_string])
.inc();
//only now remove the Stream, that means we can still recv on it.
self.streams.write().await.remove(&sid);
if !from_remote {
self.send_frame(
Frame::CloseStream { sid },
#[cfg(feature = "metrics")]
frames_out_total_cache,
)
.await;
async {
trace!("Stopping api to use this stream");
match self.streams.read().await.get(&sid) {
Some(si) => {
si.send_closed.store(true, Ordering::Relaxed);
si.b2a_msg_recv_s.lock().await.close_channel();
},
None => trace!(
"Couldn't find the stream, might be simultaneous close from local/remote"
),
}
if !from_remote {
trace!("Wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) =
oneshot::channel();
b2p_notify_empty_stream_s
.expect("needs to be set when from_remote is false")
.send((sid, s2b_stream_finished_closed_s))
.unwrap();
s2b_stream_finished_closed_r.await.unwrap();
trace!("Stream was successfully flushed");
}
#[cfg(feature = "metrics")]
self.metrics
.streams_closed_total
.with_label_values(&[&self.remote_pid_string])
.inc();
//only now remove the Stream, that means we can still recv on it.
self.streams.write().await.remove(&sid);
if !from_remote {
self.send_frame(
Frame::CloseStream { sid },
#[cfg(feature = "metrics")]
frames_out_total_cache,
)
.await;
}
}
.instrument(tracing::info_span!("close", ?sid, ?from_remote))
.await;
}
async fn create_stream(