From 9084ac48f12499bad62b07ed498f835451cd8a71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Mon, 22 Mar 2021 01:32:47 +0100 Subject: [PATCH] defer some trace, so that we wont spam the log. --- network/src/lib.rs | 1 + network/src/participant.rs | 11 ++++++++- network/src/trace.rs | 46 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 network/src/trace.rs diff --git a/network/src/lib.rs b/network/src/lib.rs index e4f29f0687..b79266e6d0 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -104,6 +104,7 @@ mod message; mod metrics; mod participant; mod scheduler; +mod trace; pub use api::{ Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, diff --git a/network/src/participant.rs b/network/src/participant.rs index 170c80c2ce..c0276a4ad2 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -2,6 +2,7 @@ use crate::{ api::{ParticipantError, Stream}, channel::{Protocols, RecvProtocols, SendProtocols}, metrics::NetworkMetrics, + trace::DeferredTracer, }; use bytes::Bytes; use futures_util::{FutureExt, StreamExt}; @@ -356,6 +357,8 @@ impl BParticipant { recv_protocols.is_empty() }; + let mut defered_orphan = DeferredTracer::new(tracing::Level::WARN); + loop { let (event, addp, remp) = select!( Some(n) = hacky_recv_r.recv().fuse() => (Some(n), None, None), @@ -408,7 +411,7 @@ impl BParticipant { Some(stream) => { let _ = stream.b2a_msg_recv_s.lock().await.send(data).await; }, - None => warn!("recv a msg with orphan stream"), + None => defered_orphan.log(sid), }; retrigger(cid, p, &mut recv_protocols); }, @@ -432,6 +435,12 @@ impl BParticipant { }, } } + + if let Some(table) = defered_orphan.print() { + for (sid, cnt) in table.iter() { + warn!(?sid, ?cnt, "recv messages with orphan stream"); + } + } } trace!("receiving no longer possible, closing all streams"); for (_, si) in self.streams.write().await.drain() { diff --git a/network/src/trace.rs b/network/src/trace.rs new file mode 100644 index 0000000000..640d65ee55 --- /dev/null +++ b/network/src/trace.rs @@ -0,0 +1,46 @@ +use core::hash::Hash; +use std::{collections::HashMap, time::Instant}; +use tracing::Level; + +/// used to collect multiple traces and not spam the console +pub(crate) struct DeferredTracer { + level: Level, + items: HashMap, + last: Instant, + last_cnt: u32, +} + +impl DeferredTracer { + pub(crate) fn new(level: Level) -> Self { + Self { + level, + items: HashMap::new(), + last: Instant::now(), + last_cnt: 0, + } + } + + pub(crate) fn log(&mut self, t: T) { + if tracing::level_enabled!(self.level) { + *self.items.entry(t).or_default() += 1; + self.last = Instant::now(); + self.last_cnt += 1; + } else { + } + } + + pub(crate) fn print(&mut self) -> Option> { + const MAX_LOGS: u32 = 10_000; + const MAX_SECS: u64 = 1; + if tracing::level_enabled!(self.level) + && (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS) + { + if self.last_cnt > MAX_LOGS { + tracing::debug!("this seems to be logged continuesly"); + } + Some(std::mem::take(&mut self.items)) + } else { + None + } + } +}