diff --git a/network/src/participant.rs b/network/src/participant.rs index 1595659e9b..199e85521c 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -762,53 +762,56 @@ impl BParticipant { from_remote: bool, #[cfg(feature = "metrics")] frames_out_total_cache: &mut MultiCidFrameCache, ) { - let span = span!(Level::INFO, "delete_stream", ?sid, ?from_remote); - let _enter = span.enter(); //This needs to first stop clients from sending any more. //Then it will wait for all pending messages (in prio) to be send to the // protocol After this happened the stream is closed //Only after all messages are send to the protocol, we can send the CloseStream // frame! If we would send it before, all followup messages couldn't // be handled at the remote side. - trace!("Stopping api to use this stream"); - match self.streams.read().await.get(&sid) { - Some(si) => { - si.send_closed.store(true, Ordering::Relaxed); - si.b2a_msg_recv_s.lock().await.close_channel(); - }, - None => { - trace!("Couldn't find the stream, might be simultaneous close from local/remote") - }, - } - - if !from_remote { - trace!("Wait for stream to be flushed"); - let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel(); - b2p_notify_empty_stream_s - .expect("needs to be set when from_remote is false") - .send((sid, s2b_stream_finished_closed_s)) - .unwrap(); - s2b_stream_finished_closed_r.await.unwrap(); - - trace!("Stream was successfully flushed"); - } - - #[cfg(feature = "metrics")] - self.metrics - .streams_closed_total - .with_label_values(&[&self.remote_pid_string]) - .inc(); - //only now remove the Stream, that means we can still recv on it. - self.streams.write().await.remove(&sid); - - if !from_remote { - self.send_frame( - Frame::CloseStream { sid }, - #[cfg(feature = "metrics")] - frames_out_total_cache, - ) - .await; + async { + trace!("Stopping api to use this stream"); + match self.streams.read().await.get(&sid) { + Some(si) => { + si.send_closed.store(true, Ordering::Relaxed); + si.b2a_msg_recv_s.lock().await.close_channel(); + }, + None => trace!( + "Couldn't find the stream, might be simultaneous close from local/remote" + ), + } + + if !from_remote { + trace!("Wait for stream to be flushed"); + let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = + oneshot::channel(); + b2p_notify_empty_stream_s + .expect("needs to be set when from_remote is false") + .send((sid, s2b_stream_finished_closed_s)) + .unwrap(); + s2b_stream_finished_closed_r.await.unwrap(); + + trace!("Stream was successfully flushed"); + } + + #[cfg(feature = "metrics")] + self.metrics + .streams_closed_total + .with_label_values(&[&self.remote_pid_string]) + .inc(); + //only now remove the Stream, that means we can still recv on it. + self.streams.write().await.remove(&sid); + + if !from_remote { + self.send_frame( + Frame::CloseStream { sid }, + #[cfg(feature = "metrics")] + frames_out_total_cache, + ) + .await; + } } + .instrument(tracing::info_span!("close", ?sid, ?from_remote)) + .await; } async fn create_stream(