fix error handling in networking and switch to hashbrown, fixing #1118

This commit is contained in:
Marcel Märtens 2021-05-04 15:27:30 +02:00
parent 0c61ccfe89
commit df7b65289d
6 changed files with 12 additions and 8 deletions

1
Cargo.lock generated
View File

@ -5654,6 +5654,7 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"futures-core", "futures-core",
"futures-util", "futures-util",
"hashbrown",
"lazy_static", "lazy_static",
"lz-fear", "lz-fear",
"prometheus", "prometheus",

View File

@ -42,6 +42,8 @@ lz-fear = { version = "0.1.1", optional = true }
# async traits # async traits
async-trait = "0.1.42" async-trait = "0.1.42"
bytes = "^1" bytes = "^1"
# faster HashMaps
hashbrown = { version = ">=0.9, <0.12" }
[dev-dependencies] [dev-dependencies]
tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] } tracing-subscriber = { version = "0.2.3", default-features = false, features = ["env-filter", "fmt", "chrono", "ansi", "smallvec"] }

View File

@ -4,6 +4,7 @@ use crate::{
scheduler::{A2sConnect, Scheduler}, scheduler::{A2sConnect, Scheduler},
}; };
use bytes::Bytes; use bytes::Bytes;
use hashbrown::HashMap;
#[cfg(feature = "compression")] #[cfg(feature = "compression")]
use lz_fear::raw::DecodeError; use lz_fear::raw::DecodeError;
use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid}; use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
@ -11,7 +12,6 @@ use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
use prometheus::Registry; use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use std::{ use std::{
collections::HashMap,
net::SocketAddr, net::SocketAddr,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},

View File

@ -4,6 +4,7 @@ use bytes::BytesMut;
use futures_util::FutureExt; use futures_util::FutureExt;
#[cfg(feature = "quic")] #[cfg(feature = "quic")]
use futures_util::StreamExt; use futures_util::StreamExt;
use hashbrown::HashMap;
use network_protocol::{ use network_protocol::{
Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid,
ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol,
@ -12,7 +13,6 @@ use network_protocol::{
#[cfg(feature = "quic")] #[cfg(feature = "quic")]
use network_protocol::{QuicDataFormat, QuicDataFormatStream, QuicRecvProtocol, QuicSendProtocol}; use network_protocol::{QuicDataFormat, QuicDataFormatStream, QuicRecvProtocol, QuicSendProtocol};
use std::{ use std::{
collections::HashMap,
io, io,
net::SocketAddr, net::SocketAddr,
sync::{ sync::{
@ -327,7 +327,7 @@ impl Protocols {
QuicDrain { QuicDrain {
con: connection.connection.clone(), con: connection.connection.clone(),
main: sendstream, main: sendstream,
reliables: std::collections::HashMap::new(), reliables: HashMap::new(),
recvstreams_s: streams_s_clone, recvstreams_s: streams_s_clone,
sendstreams_r, sendstreams_r,
}, },
@ -507,7 +507,7 @@ type QuicStream = (
pub struct QuicDrain { pub struct QuicDrain {
con: quinn::Connection, con: quinn::Connection,
main: quinn::SendStream, main: quinn::SendStream,
reliables: std::collections::HashMap<Sid, quinn::SendStream>, reliables: HashMap<Sid, quinn::SendStream>,
recvstreams_s: mpsc::UnboundedSender<QuicStream>, recvstreams_s: mpsc::UnboundedSender<QuicStream>,
sendstreams_r: mpsc::UnboundedReceiver<quinn::SendStream>, sendstreams_r: mpsc::UnboundedReceiver<quinn::SendStream>,
} }
@ -547,7 +547,7 @@ impl UnreliableDrain for QuicDrain {
QuicDataFormatStream::Main => self.main.write_all(&data.data).await, QuicDataFormatStream::Main => self.main.write_all(&data.data).await,
QuicDataFormatStream::Unreliable => unimplemented!(), QuicDataFormatStream::Unreliable => unimplemented!(),
QuicDataFormatStream::Reliable(sid) => { QuicDataFormatStream::Reliable(sid) => {
use std::collections::hash_map::Entry; use hashbrown::hash_map::Entry;
tracing::trace!(?sid, "Reliable"); tracing::trace!(?sid, "Reliable");
match self.reliables.entry(sid) { match self.reliables.entry(sid) {
Entry::Occupied(mut occupied) => occupied.get_mut().write_all(&data.data).await, Entry::Occupied(mut occupied) => occupied.get_mut().write_all(&data.data).await,

View File

@ -6,12 +6,12 @@ use crate::{
}; };
use bytes::Bytes; use bytes::Bytes;
use futures_util::{FutureExt, StreamExt}; use futures_util::{FutureExt, StreamExt};
use hashbrown::HashMap;
use network_protocol::{ use network_protocol::{
Bandwidth, Cid, Pid, Prio, Promises, ProtocolEvent, RecvProtocol, SendProtocol, Sid, Bandwidth, Cid, Pid, Prio, Promises, ProtocolEvent, RecvProtocol, SendProtocol, Sid,
_internal::SortedVec, _internal::SortedVec,
}; };
use std::{ use std::{
collections::HashMap,
sync::{ sync::{
atomic::{AtomicBool, AtomicI32, Ordering}, atomic::{AtomicBool, AtomicI32, Ordering},
Arc, Arc,
@ -355,7 +355,8 @@ impl BParticipant {
let diff = send_time.duration_since(last_instant); let diff = send_time.duration_since(last_instant);
last_instant = send_time; last_instant = send_time;
let mut cnt = 0; let mut cnt = 0;
for (_, p) in sorted_send_protocols.data.iter_mut() { for (c, p) in sorted_send_protocols.data.iter_mut() {
cid = *c;
cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it. cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it.
} }
let flush_time = send_time.elapsed().as_secs_f32(); let flush_time = send_time.elapsed().as_secs_f32();

View File

@ -5,12 +5,12 @@ use crate::{
participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant}, participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant},
}; };
use futures_util::StreamExt; use futures_util::StreamExt;
use hashbrown::HashMap;
use network_protocol::{Cid, Pid, ProtocolMetricCache, ProtocolMetrics}; use network_protocol::{Cid, Pid, ProtocolMetricCache, ProtocolMetrics};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use prometheus::Registry; use prometheus::Registry;
use rand::Rng; use rand::Rng;
use std::{ use std::{
collections::HashMap,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Arc,