work on getting quic in the network

This commit is contained in:
Marcel Märtens 2021-04-11 23:37:48 +02:00
parent 383482a36e
commit 9f0aceba4c
5 changed files with 90 additions and 11 deletions

View File

@ -11,7 +11,7 @@ metrics = ["prometheus", "network-protocol/metrics"]
compression = ["lz-fear"]
quic = ["quinn"]
default = ["metrics","compression","quinn"]
default = ["metrics","compression","quic"]
[dependencies]

View File

@ -28,8 +28,8 @@ pub enum QuicDataFormatStream {
}
pub struct QuicDataFormat {
stream: QuicDataFormatStream,
data: BytesMut,
pub stream: QuicDataFormatStream,
pub data: BytesMut,
}
impl QuicDataFormat {

View File

@ -33,6 +33,8 @@ type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBpa
pub enum ProtocolAddr {
Tcp(SocketAddr),
Udp(SocketAddr),
#[cfg(feature = "quic")]
Quic(SocketAddr, quinn::ServerConfig),
Mpsc(u64),
}

View File

@ -1,6 +1,7 @@
use async_trait::async_trait;
use bytes::BytesMut;
use network_protocol::{
QuicDataFormat, QuicDataFormatStream, QuicSendProtocol, QuicRecvProtocol,
Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid,
ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol,
TcpSendProtocol, UnreliableDrain, UnreliableSink,
@ -70,6 +71,31 @@ impl Protocols {
Protocols::Mpsc((sp, rp))
}
#[cfg(feature = "quic")]
pub(crate) async fn new_quic(
connection: quinn::NewConnection,
cid: Cid,
metrics: Arc<ProtocolMetrics>,
) -> Result<Self, quinn::ConnectionError> {
let metrics = ProtocolMetricCache::new(&cid.to_string(), metrics);
let (sendstream, recvstream) = connection.connection.open_bi().await?;
let sp = QuicSendProtocol::new(QuicDrain {
con: connection.connection.clone(),
main: sendstream,
reliables: vec!(),
}, metrics.clone());
let rp = QuicRecvProtocol::new(QuicSink {
con: connection.connection,
main: recvstream,
reliables: vec!(),
buffer: BytesMut::new(),
}, metrics);
Ok(Protocols::Quic((sp, rp)))
}
pub(crate) fn split(self) -> (SendProtocols, RecvProtocols) {
match self {
Protocols::Tcp((s, r)) => (SendProtocols::Tcp(s), RecvProtocols::Tcp(r)),
@ -219,21 +245,29 @@ impl UnreliableSink for MpscSink {
//// QUIC
#[derive(Debug)]
pub struct QuicDrain {
half: OwnedWriteHalf,
con: quinn::Connection,
main: quinn::SendStream,
reliables: Vec<quinn::SendStream>,
}
#[derive(Debug)]
pub struct QuicSink {
half: OwnedReadHalf,
con: quinn::Connection,
main: quinn::RecvStream,
reliables: Vec<quinn::RecvStream>,
buffer: BytesMut,
}
#[async_trait]
impl UnreliableDrain for QuicDrain {
type DataFormat = BytesMut;
type DataFormat = QuicDataFormat;
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
match self.half.write_all(&data).await {
match match data.stream {
QuicDataFormatStream::Main => self.main.write_all(&data.data),
QuicDataFormatStream::Unreliable => unimplemented!(),
QuicDataFormatStream::Reliable(id) => self.reliables.get_mut(id as usize).ok_or(ProtocolError::Closed)?.write_all(&data.data),
}.await {
Ok(()) => Ok(()),
Err(_) => Err(ProtocolError::Closed),
}
@ -242,13 +276,15 @@ impl UnreliableDrain for QuicDrain {
#[async_trait]
impl UnreliableSink for QuicSink {
type DataFormat = BytesMut;
type DataFormat = QuicDataFormat;
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
self.buffer.resize(1500, 0u8);
match self.half.read(&mut self.buffer).await {
Ok(0) => Err(ProtocolError::Closed),
Ok(n) => Ok(self.buffer.split_to(n)),
//TODO improve
match self.main.read(&mut self.buffer).await {
Ok(Some(0)) => Err(ProtocolError::Closed),
Ok(Some(n)) => Ok(QuicDataFormat{stream: QuicDataFormatStream::Main, data: self.buffer.split_to(n)}),
Ok(None) => Err(ProtocolError::Closed),
Err(_) => Err(ProtocolError::Closed),
}
}

View File

@ -431,6 +431,47 @@ impl Scheduler {
.await;
}
},
#[cfg(feature = "quic")]
ProtocolAddr::Quic(addr, server_config) => {
let mut endpoint = quinn::Endpoint::builder();
endpoint.listen(server_config);
let (endpoint, mut listener) = match endpoint.bind(&addr) {
Ok((endpoint, listener)) => {
s2a_listen_result_s.send(Ok(())).unwrap();
(endpoint, listener)
},
Err(quinn::EndpointError::Socket(e)) => {
info!(
?addr,
?e,
"Quic bind error during listener startup"
);
s2a_listen_result_s.send(Err(e)).unwrap();
return;
}
};
trace!(?addr, "Listener bound");
let mut end_receiver = s2s_stop_listening_r.fuse();
while let Some(Some(connecting)) = select! {
next = listener.next().fuse() => Some(next),
_ = &mut end_receiver => None,
} {
let remote_addr = connecting.remote_address();
let connection = match connecting.await {
Ok(c) => c,
Err(e) => {
debug!(?e, ?remote_addr, "skipping connection attempt");
continue;
},
};
#[cfg(feature = "metrics")]
mcache.inc();
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
info!(?remote_addr, ?cid, "Accepting Quic from");
self.init_protocol(Protocols::new_quic(connection, cid, Arc::clone(&self.protocol_metrics)), cid, None, true)
.await;
}
},
ProtocolAddr::Mpsc(addr) => {
let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel();
MPSC_POOL.lock().await.insert(addr, mpsc_s);