From d40261e38e1a0961384678aee25f3fdfbd1ded0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= Date: Sun, 11 Apr 2021 23:37:48 +0200 Subject: [PATCH] work on getting quic in the network --- network/Cargo.toml | 2 +- network/protocol/src/quic.rs | 4 +-- network/src/api.rs | 2 ++ network/src/channel.rs | 52 ++++++++++++++++++++++++++++++------ network/src/scheduler.rs | 41 ++++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 11 deletions(-) diff --git a/network/Cargo.toml b/network/Cargo.toml index a51119a16a..7f854f68a9 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -11,7 +11,7 @@ metrics = ["prometheus", "network-protocol/metrics"] compression = ["lz-fear"] quic = ["quinn"] -default = ["metrics","compression","quinn"] +default = ["metrics","compression","quic"] [dependencies] diff --git a/network/protocol/src/quic.rs b/network/protocol/src/quic.rs index d2be37c010..b4af04a193 100644 --- a/network/protocol/src/quic.rs +++ b/network/protocol/src/quic.rs @@ -28,8 +28,8 @@ pub enum QuicDataFormatStream { } pub struct QuicDataFormat { - stream: QuicDataFormatStream, - data: BytesMut, + pub stream: QuicDataFormatStream, + pub data: BytesMut, } impl QuicDataFormat { diff --git a/network/src/api.rs b/network/src/api.rs index e04094d6ce..d38318aa57 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -33,6 +33,8 @@ type A2sDisconnect = Arc, + ) -> Result { + let metrics = ProtocolMetricCache::new(&cid.to_string(), metrics); + + let (sendstream, recvstream) = connection.connection.open_bi().await?; + + + let sp = QuicSendProtocol::new(QuicDrain { + con: connection.connection.clone(), + main: sendstream, + reliables: vec!(), + }, metrics.clone()); + let rp = QuicRecvProtocol::new(QuicSink { + con: connection.connection, + main: recvstream, + reliables: vec!(), + buffer: BytesMut::new(), + }, metrics); + Ok(Protocols::Quic((sp, rp))) + } + pub(crate) fn split(self) -> (SendProtocols, RecvProtocols) { match self { Protocols::Tcp((s, r)) => (SendProtocols::Tcp(s), RecvProtocols::Tcp(r)), @@ -219,21 +245,29 @@ impl UnreliableSink for MpscSink { //// QUIC #[derive(Debug)] pub struct QuicDrain { - half: OwnedWriteHalf, + con: quinn::Connection, + main: quinn::SendStream, + reliables: Vec, } #[derive(Debug)] pub struct QuicSink { - half: OwnedReadHalf, + con: quinn::Connection, + main: quinn::RecvStream, + reliables: Vec, buffer: BytesMut, } #[async_trait] impl UnreliableDrain for QuicDrain { - type DataFormat = BytesMut; + type DataFormat = QuicDataFormat; async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> { - match self.half.write_all(&data).await { + match match data.stream { + QuicDataFormatStream::Main => self.main.write_all(&data.data), + QuicDataFormatStream::Unreliable => unimplemented!(), + QuicDataFormatStream::Reliable(id) => self.reliables.get_mut(id as usize).ok_or(ProtocolError::Closed)?.write_all(&data.data), + }.await { Ok(()) => Ok(()), Err(_) => Err(ProtocolError::Closed), } @@ -242,13 +276,15 @@ impl UnreliableDrain for QuicDrain { #[async_trait] impl UnreliableSink for QuicSink { - type DataFormat = BytesMut; + type DataFormat = QuicDataFormat; async fn recv(&mut self) -> Result { self.buffer.resize(1500, 0u8); - match self.half.read(&mut self.buffer).await { - Ok(0) => Err(ProtocolError::Closed), - Ok(n) => Ok(self.buffer.split_to(n)), + //TODO improve + match self.main.read(&mut self.buffer).await { + Ok(Some(0)) => Err(ProtocolError::Closed), + Ok(Some(n)) => Ok(QuicDataFormat{stream: QuicDataFormatStream::Main, data: self.buffer.split_to(n)}), + Ok(None) => Err(ProtocolError::Closed), Err(_) => Err(ProtocolError::Closed), } } diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 527ea6f5fe..11a2a0f774 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -431,6 +431,47 @@ impl Scheduler { .await; } }, + #[cfg(feature = "quic")] + ProtocolAddr::Quic(addr, server_config) => { + let mut endpoint = quinn::Endpoint::builder(); + endpoint.listen(server_config); + let (endpoint, mut listener) = match endpoint.bind(&addr) { + Ok((endpoint, listener)) => { + s2a_listen_result_s.send(Ok(())).unwrap(); + (endpoint, listener) + }, + Err(quinn::EndpointError::Socket(e)) => { + info!( + ?addr, + ?e, + "Quic bind error during listener startup" + ); + s2a_listen_result_s.send(Err(e)).unwrap(); + return; + } + }; + trace!(?addr, "Listener bound"); + let mut end_receiver = s2s_stop_listening_r.fuse(); + while let Some(Some(connecting)) = select! { + next = listener.next().fuse() => Some(next), + _ = &mut end_receiver => None, + } { + let remote_addr = connecting.remote_address(); + let connection = match connecting.await { + Ok(c) => c, + Err(e) => { + debug!(?e, ?remote_addr, "skipping connection attempt"); + continue; + }, + }; + #[cfg(feature = "metrics")] + mcache.inc(); + let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); + info!(?remote_addr, ?cid, "Accepting Quic from"); + self.init_protocol(Protocols::new_quic(connection, cid, Arc::clone(&self.protocol_metrics)), cid, None, true) + .await; + } + }, ProtocolAddr::Mpsc(addr) => { let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel(); MPSC_POOL.lock().await.insert(addr, mpsc_s);