diff --git a/Cargo.lock b/Cargo.lock index 5600abfcae..96368ecc90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5654,6 +5654,7 @@ dependencies = [ "crossbeam-channel", "futures-core", "futures-util", + "hashbrown", "lazy_static", "lz-fear", "prometheus", diff --git a/network/Cargo.toml b/network/Cargo.toml index bcb509aea1..e6c7a715ef 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -42,6 +42,8 @@ lz-fear = { version = "0.1.1", optional = true } # async traits async-trait = "0.1.42" bytes = "^1" +# faster HashMaps +hashbrown = { version = ">=0.9, <0.12" } [dev-dependencies] tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } diff --git a/network/src/api.rs b/network/src/api.rs index 0da58aa6d5..4dcb919e1f 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -4,6 +4,7 @@ use crate::{ scheduler::{A2sConnect, Scheduler}, }; use bytes::Bytes; +use hashbrown::HashMap; #[cfg(feature = "compression")] use lz_fear::raw::DecodeError; use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid}; @@ -11,7 +12,6 @@ use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid}; use prometheus::Registry; use serde::{de::DeserializeOwned, Serialize}; use std::{ - collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, diff --git a/network/src/channel.rs b/network/src/channel.rs index 03930c03ec..aa5805800b 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -4,6 +4,7 @@ use bytes::BytesMut; use futures_util::FutureExt; #[cfg(feature = "quic")] use futures_util::StreamExt; +use hashbrown::HashMap; use network_protocol::{ Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, @@ -12,7 +13,6 @@ use network_protocol::{ #[cfg(feature = "quic")] use network_protocol::{QuicDataFormat, QuicDataFormatStream, QuicRecvProtocol, QuicSendProtocol}; use std::{ - collections::HashMap, io, net::SocketAddr, sync::{ @@ -327,7 +327,7 @@ impl Protocols { QuicDrain { con: connection.connection.clone(), main: sendstream, - reliables: std::collections::HashMap::new(), + reliables: HashMap::new(), recvstreams_s: streams_s_clone, sendstreams_r, }, @@ -507,7 +507,7 @@ type QuicStream = ( pub struct QuicDrain { con: quinn::Connection, main: quinn::SendStream, - reliables: std::collections::HashMap, + reliables: HashMap, recvstreams_s: mpsc::UnboundedSender, sendstreams_r: mpsc::UnboundedReceiver, } @@ -547,7 +547,7 @@ impl UnreliableDrain for QuicDrain { QuicDataFormatStream::Main => self.main.write_all(&data.data).await, QuicDataFormatStream::Unreliable => unimplemented!(), QuicDataFormatStream::Reliable(sid) => { - use std::collections::hash_map::Entry; + use hashbrown::hash_map::Entry; tracing::trace!(?sid, "Reliable"); match self.reliables.entry(sid) { Entry::Occupied(mut occupied) => occupied.get_mut().write_all(&data.data).await, diff --git a/network/src/participant.rs b/network/src/participant.rs index 2735fd5bdd..5ce2384410 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -6,12 +6,12 @@ use crate::{ }; use bytes::Bytes; use futures_util::{FutureExt, StreamExt}; +use hashbrown::HashMap; use network_protocol::{ Bandwidth, Cid, Pid, Prio, Promises, ProtocolEvent, RecvProtocol, SendProtocol, Sid, _internal::SortedVec, }; use std::{ - collections::HashMap, sync::{ atomic::{AtomicBool, AtomicI32, Ordering}, Arc, @@ -355,7 +355,8 @@ impl BParticipant { let diff = send_time.duration_since(last_instant); last_instant = send_time; let mut cnt = 0; - for (_, p) in sorted_send_protocols.data.iter_mut() { + for (c, p) in sorted_send_protocols.data.iter_mut() { + cid = *c; cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. } let flush_time = send_time.elapsed().as_secs_f32(); diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index a232be440b..95f3e4364a 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -5,12 +5,12 @@ use crate::{ participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant}, }; use futures_util::StreamExt; +use hashbrown::HashMap; use network_protocol::{Cid, Pid, ProtocolMetricCache, ProtocolMetrics}; #[cfg(feature = "metrics")] use prometheus::Registry; use rand::Rng; use std::{ - collections::HashMap, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc,