Merge branch 'xMAC94x/net-improve' into 'master'

defer some trace, so that we wont spam the log.

See merge request veloren/veloren!1966
This commit is contained in:
Marcel 2021-03-22 10:31:56 +00:00
commit c51a2b10a0
3 changed files with 57 additions and 1 deletions

View File

@ -104,6 +104,7 @@ mod message;
mod metrics;
mod participant;
mod scheduler;
mod trace;
pub use api::{
Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr,

View File

@ -2,6 +2,7 @@ use crate::{
api::{ParticipantError, Stream},
channel::{Protocols, RecvProtocols, SendProtocols},
metrics::NetworkMetrics,
trace::DeferredTracer,
};
use bytes::Bytes;
use futures_util::{FutureExt, StreamExt};
@ -356,6 +357,8 @@ impl BParticipant {
recv_protocols.is_empty()
};
let mut defered_orphan = DeferredTracer::new(tracing::Level::WARN);
loop {
let (event, addp, remp) = select!(
Some(n) = hacky_recv_r.recv().fuse() => (Some(n), None, None),
@ -408,7 +411,7 @@ impl BParticipant {
Some(stream) => {
let _ = stream.b2a_msg_recv_s.lock().await.send(data).await;
},
None => warn!("recv a msg with orphan stream"),
None => defered_orphan.log(sid),
};
retrigger(cid, p, &mut recv_protocols);
},
@ -432,6 +435,12 @@ impl BParticipant {
},
}
}
if let Some(table) = defered_orphan.print() {
for (sid, cnt) in table.iter() {
warn!(?sid, ?cnt, "recv messages with orphan stream");
}
}
}
trace!("receiving no longer possible, closing all streams");
for (_, si) in self.streams.write().await.drain() {

46
network/src/trace.rs Normal file
View File

@ -0,0 +1,46 @@
use core::hash::Hash;
use std::{collections::HashMap, time::Instant};
use tracing::Level;
/// used to collect multiple traces and not spam the console
pub(crate) struct DeferredTracer<T: Eq + Hash> {
level: Level,
items: HashMap<T, u64>,
last: Instant,
last_cnt: u32,
}
impl<T: Eq + Hash> DeferredTracer<T> {
pub(crate) fn new(level: Level) -> Self {
Self {
level,
items: HashMap::new(),
last: Instant::now(),
last_cnt: 0,
}
}
pub(crate) fn log(&mut self, t: T) {
if tracing::level_enabled!(self.level) {
*self.items.entry(t).or_default() += 1;
self.last = Instant::now();
self.last_cnt += 1;
} else {
}
}
pub(crate) fn print(&mut self) -> Option<HashMap<T, u64>> {
const MAX_LOGS: u32 = 10_000;
const MAX_SECS: u64 = 1;
if tracing::level_enabled!(self.level)
&& (self.last_cnt > MAX_LOGS || self.last.elapsed().as_secs() >= MAX_SECS)
{
if self.last_cnt > MAX_LOGS {
tracing::debug!("this seems to be logged continuesly");
}
Some(std::mem::take(&mut self.items))
} else {
None
}
}
}