diff --git a/Cargo.lock b/Cargo.lock index 5403842ba5..dc612eb409 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3631,6 +3631,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb" +dependencies = [ + "base64", + "once_cell", + "regex", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -4061,6 +4072,18 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rcgen" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e80a701a04edd9cab874a3d59323bebe24c9a92dd602088c78da83732066d1b" +dependencies = [ + "chrono", + "pem", + "ring", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -5628,6 +5651,7 @@ dependencies = [ "prometheus-hyper", "quinn", "rand 0.8.3", + "rcgen", "serde", "shellexpand", "tokio", @@ -6638,3 +6662,12 @@ name = "xml-rs" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" + +[[package]] +name = "yasna" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de7bff972b4f2a06c85f6d8454b09df153af7e3a4ec2aac81db1b105b684ddb" +dependencies = [ + "chrono", +] diff --git a/client/src/lib.rs b/client/src/lib.rs index 51c577aaeb..35511879a4 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -61,7 +61,7 @@ use comp::BuffKind; use futures_util::FutureExt; use hashbrown::{HashMap, HashSet}; use image::DynamicImage; -use network::{Network, Participant, Pid, ProtocolAddr, Stream}; +use network::{ConnectAddr, Network, Participant, Pid, Stream}; use num::traits::FloatConst; use rayon::prelude::*; use specs::Component; @@ -217,7 +217,7 @@ impl Client { // Try to connect to all IP's and return the first that works let mut participant = None; for addr in addrs { - match network.connect(ProtocolAddr::Tcp(addr)).await { + match network.connect(ConnectAddr::Tcp(addr)).await { Ok(p) => { participant = Some(Ok(p)); break; @@ -228,7 +228,7 @@ impl Client { participant .unwrap_or_else(|| Err(Error::Other("No Ip Addr provided".to_string())))? }, - ConnectionArgs::Mpsc(id) => network.connect(ProtocolAddr::Mpsc(id)).await?, + ConnectionArgs::Mpsc(id) => network.connect(ConnectAddr::Mpsc(id)).await?, }; let stream = participant.opened().await?; diff --git a/network/Cargo.toml b/network/Cargo.toml index 7f854f68a9..bcb509aea1 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -52,6 +52,8 @@ shellexpand = "2.0.0" serde = { version = "1.0", features = ["derive"] } prometheus-hyper = "0.1.2" criterion = { version = "0.3.4", features = ["default", "async_tokio"] } +#quic +rcgen = { version = "0.8.10"} [[bench]] name = "speed" diff --git a/network/benches/speed.rs b/network/benches/speed.rs index b110d308ae..d7f0f2b63c 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -1,7 +1,9 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use std::{net::SocketAddr, sync::Arc}; use tokio::{runtime::Runtime, sync::Mutex}; -use veloren_network::{Message, Network, Participant, Pid, Promises, ProtocolAddr, Stream}; +use veloren_network::{ + ConnectAddr, ListenAddr, Message, Network, Participant, Pid, Promises, Stream, +}; fn serialize(data: &[u8], stream: &Stream) { let _ = Message::serialize(data, stream.params()); } @@ -30,7 +32,7 @@ fn criterion_util(c: &mut Criterion) { c.significance_level(0.1).sample_size(100); let (r, _n_a, p_a, s1_a, _n_b, _p_b, _s1_b) = - network_participant_stream(ProtocolAddr::Mpsc(5000)); + network_participant_stream((ListenAddr::Mpsc(5000), ConnectAddr::Mpsc(5000))); let s2_a = r.block_on(p_a.open(4, Promises::COMPRESSED, 0)).unwrap(); c.throughput(Throughput::Bytes(1000)) @@ -50,7 +52,7 @@ fn criterion_mpsc(c: &mut Criterion) { c.significance_level(0.1).sample_size(10); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = - network_participant_stream(ProtocolAddr::Mpsc(5000)); + network_participant_stream((ListenAddr::Mpsc(5000), ConnectAddr::Mpsc(5000))); let s1_a = Arc::new(Mutex::new(s1_a)); let s1_b = Arc::new(Mutex::new(s1_b)); @@ -82,8 +84,9 @@ fn criterion_tcp(c: &mut Criterion) { let mut c = c.benchmark_group("net_tcp"); c.significance_level(0.1).sample_size(10); + let socket_addr = SocketAddr::from(([127, 0, 0, 1], 5000)); let (_r, _n_a, _p_a, s1_a, _n_b, _p_b, s1_b) = - network_participant_stream(ProtocolAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], 5000)))); + network_participant_stream((ListenAddr::Tcp(socket_addr), ConnectAddr::Tcp(socket_addr))); let s1_a = Arc::new(Mutex::new(s1_a)); let s1_b = Arc::new(Mutex::new(s1_b)); @@ -115,7 +118,7 @@ criterion_group!(benches, criterion_util, criterion_mpsc, criterion_tcp); criterion_main!(benches); pub fn network_participant_stream( - addr: ProtocolAddr, + addr: (ListenAddr, ConnectAddr), ) -> ( Runtime, Network, @@ -130,8 +133,8 @@ pub fn network_participant_stream( let n_a = Network::new(Pid::fake(0), &runtime); let n_b = Network::new(Pid::fake(1), &runtime); - n_a.listen(addr.clone()).await.unwrap(); - let p1_b = n_b.connect(addr).await.unwrap(); + n_a.listen(addr.0).await.unwrap(); + let p1_b = n_b.connect(addr.1).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap(); diff --git a/network/examples/chat.rs b/network/examples/chat.rs index 8746479f73..2dc1e56e78 100644 --- a/network/examples/chat.rs +++ b/network/examples/chat.rs @@ -8,7 +8,7 @@ use std::{sync::Arc, thread, time::Duration}; use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::RwLock}; use tracing::*; use tracing_subscriber::EnvFilter; -use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr}; +use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises}; ///This example contains a simple chatserver, that allows to send messages /// between participants, it's neither pretty nor perfect, but it should show @@ -75,21 +75,27 @@ fn main() { let port: u16 = matches.value_of("port").unwrap().parse().unwrap(); let ip: &str = matches.value_of("ip").unwrap(); - let address = match matches.value_of("protocol") { - Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()), - Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()), + let addresses = match matches.value_of("protocol") { + Some("tcp") => ( + ListenAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()), + ConnectAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()), + ), + Some("udp") => ( + ListenAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()), + ConnectAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()), + ), _ => panic!("invalid mode, run --help!"), }; let mut background = None; match matches.value_of("mode") { - Some("server") => server(address), - Some("client") => client(address), + Some("server") => server(addresses.0), + Some("client") => client(addresses.1), Some("both") => { - let address1 = address.clone(); - background = Some(thread::spawn(|| server(address1))); + let s = addresses.0; + background = Some(thread::spawn(|| server(s))); thread::sleep(Duration::from_millis(200)); //start client after server - client(address) + client(addresses.1) }, _ => panic!("invalid mode, run --help!"), }; @@ -98,7 +104,7 @@ fn main() { } } -fn server(address: ProtocolAddr) { +fn server(address: ListenAddr) { let r = Arc::new(Runtime::new().unwrap()); let server = Network::new(Pid::new(), &r); let server = Arc::new(server); @@ -144,7 +150,7 @@ async fn client_connection( println!("[{}] disconnected", username); } -fn client(address: ProtocolAddr) { +fn client(address: ConnectAddr) { let r = Arc::new(Runtime::new().unwrap()); let client = Network::new(Pid::new(), &r); diff --git a/network/examples/fileshare/commands.rs b/network/examples/fileshare/commands.rs index a18c90b38e..9f23ddb6aa 100644 --- a/network/examples/fileshare/commands.rs +++ b/network/examples/fileshare/commands.rs @@ -2,7 +2,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use tokio::fs; -use veloren_network::{Participant, ProtocolAddr, Stream}; +use veloren_network::{ConnectAddr, Participant, Stream}; use std::collections::HashMap; @@ -10,7 +10,7 @@ use std::collections::HashMap; pub enum LocalCommand { Shutdown, Disconnect, - Connect(ProtocolAddr), + Connect(ConnectAddr), List, Serve(FileInfo), Get(u32, Option), diff --git a/network/examples/fileshare/main.rs b/network/examples/fileshare/main.rs index f000f371e0..158b825073 100644 --- a/network/examples/fileshare/main.rs +++ b/network/examples/fileshare/main.rs @@ -9,7 +9,7 @@ use std::{path::PathBuf, sync::Arc, thread, time::Duration}; use tokio::{io, io::AsyncBufReadExt, runtime::Runtime, sync::mpsc}; use tracing::*; use tracing_subscriber::EnvFilter; -use veloren_network::ProtocolAddr; +use veloren_network::{ConnectAddr, ListenAddr}; mod commands; mod server; use commands::{FileInfo, LocalCommand}; @@ -50,7 +50,7 @@ fn main() { .init(); let port: u16 = matches.value_of("port").unwrap().parse().unwrap(); - let address = ProtocolAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap()); + let address = ListenAddr::Tcp(format!("{}:{}", "127.0.0.1", port).parse().unwrap()); let runtime = Arc::new(Runtime::new().unwrap()); let (server, cmd_sender) = Server::new(Arc::clone(&runtime)); @@ -158,12 +158,12 @@ async fn client(cmd_sender: mpsc::UnboundedSender) { .parse() .unwrap(); cmd_sender - .send(LocalCommand::Connect(ProtocolAddr::Tcp(socketaddr))) + .send(LocalCommand::Connect(ConnectAddr::Tcp(socketaddr))) .unwrap(); }, ("t", _) => { cmd_sender - .send(LocalCommand::Connect(ProtocolAddr::Tcp( + .send(LocalCommand::Connect(ConnectAddr::Tcp( "127.0.0.1:1231".parse().unwrap(), ))) .unwrap(); diff --git a/network/examples/fileshare/server.rs b/network/examples/fileshare/server.rs index 252ebdf32c..7a40d5be11 100644 --- a/network/examples/fileshare/server.rs +++ b/network/examples/fileshare/server.rs @@ -8,7 +8,7 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; -use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; +use veloren_network::{ListenAddr, Network, Participant, Pid, Promises, Stream}; #[derive(Debug)] struct ControlChannels { @@ -42,7 +42,7 @@ impl Server { ) } - pub async fn run(mut self, address: ProtocolAddr) { + pub async fn run(mut self, address: ListenAddr) { let run_channels = self.run_channels.take().unwrap(); self.network.listen(address).await.unwrap(); diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index e058aac7a8..e8ccc8f278 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -16,7 +16,7 @@ use std::{ use tokio::runtime::Runtime; use tracing::*; use tracing_subscriber::EnvFilter; -use veloren_network::{Message, Network, Pid, Promises, ProtocolAddr}; +use veloren_network::{ConnectAddr, ListenAddr, Message, Network, Pid, Promises}; #[derive(Serialize, Deserialize, Debug)] enum Msg { @@ -96,23 +96,29 @@ fn main() { let port: u16 = matches.value_of("port").unwrap().parse().unwrap(); let ip: &str = matches.value_of("ip").unwrap(); - let address = match matches.value_of("protocol") { - Some("tcp") => ProtocolAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()), - Some("udp") => ProtocolAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()), - _ => panic!("Invalid mode, run --help!"), + let addresses = match matches.value_of("protocol") { + Some("tcp") => ( + ListenAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()), + ConnectAddr::Tcp(format!("{}:{}", ip, port).parse().unwrap()), + ), + Some("udp") => ( + ListenAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()), + ConnectAddr::Udp(format!("{}:{}", ip, port).parse().unwrap()), + ), + _ => panic!("invalid mode, run --help!"), }; let mut background = None; let runtime = Arc::new(Runtime::new().unwrap()); match matches.value_of("mode") { - Some("server") => server(address, Arc::clone(&runtime)), - Some("client") => client(address, Arc::clone(&runtime)), + Some("server") => server(addresses.0, Arc::clone(&runtime)), + Some("client") => client(addresses.1, Arc::clone(&runtime)), Some("both") => { - let address1 = address.clone(); + let s = addresses.0; let runtime2 = Arc::clone(&runtime); - background = Some(thread::spawn(|| server(address1, runtime2))); + background = Some(thread::spawn(|| server(s, runtime2))); thread::sleep(Duration::from_millis(200)); //start client after server - client(address, Arc::clone(&runtime)); + client(addresses.1, Arc::clone(&runtime)); }, _ => panic!("Invalid mode, run --help!"), }; @@ -121,7 +127,7 @@ fn main() { } } -fn server(address: ProtocolAddr, runtime: Arc) { +fn server(address: ListenAddr, runtime: Arc) { let registry = Arc::new(Registry::new()); let server = Network::new_with_registry(Pid::new(), &runtime, ®istry); runtime.spawn(Server::run( @@ -153,7 +159,7 @@ fn server(address: ProtocolAddr, runtime: Arc) { } } -fn client(address: ProtocolAddr, runtime: Arc) { +fn client(address: ConnectAddr, runtime: Arc) { let registry = Arc::new(Registry::new()); let client = Network::new_with_registry(Pid::new(), &runtime, ®istry); runtime.spawn(Server::run( diff --git a/network/protocol/src/quic.rs b/network/protocol/src/quic.rs index b4af04a193..e656fdf5a1 100644 --- a/network/protocol/src/quic.rs +++ b/network/protocol/src/quic.rs @@ -285,9 +285,11 @@ where } } for (id, (_, buffer)) in self.reliable_buffers.data.iter_mut().enumerate() { - self.drain - .send(QuicDataFormat::with_reliable(buffer, id as u64)) - .await?; + if !buffer.is_empty() { + self.drain + .send(QuicDataFormat::with_reliable(buffer, id as u64)) + .await?; + } } self.metrics .sdata_frames_b(data_frames, data_bandwidth as u64); @@ -340,43 +342,41 @@ where { async fn recv(&mut self) -> Result { 'outer: loop { - loop { - match ITFrame::read_frame(&mut self.main_buffer) { - Ok(Some(frame)) => { - #[cfg(feature = "trace_pedantic")] - trace!(?frame, "recv"); - match frame { - ITFrame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown), - ITFrame::OpenStream { + match ITFrame::read_frame(&mut self.main_buffer) { + Ok(Some(frame)) => { + #[cfg(feature = "trace_pedantic")] + trace!(?frame, "recv"); + match frame { + ITFrame::Shutdown => break 'outer Ok(ProtocolEvent::Shutdown), + ITFrame::OpenStream { + sid, + prio, + promises, + guaranteed_bandwidth, + } => { + if promises.contains(Promises::ORDERED) + || promises.contains(Promises::CONSISTENCY) + || promises.contains(Promises::GUARANTEED_DELIVERY) + { + self.reliable_buffers.insert(sid, BytesMut::new()); + } + break 'outer Ok(ProtocolEvent::OpenStream { sid, - prio, + prio: prio.min(crate::types::HIGHEST_PRIO), promises, guaranteed_bandwidth, - } => { - if promises.contains(Promises::ORDERED) - || promises.contains(Promises::CONSISTENCY) - || promises.contains(Promises::GUARANTEED_DELIVERY) - { - self.reliable_buffers.insert(sid, BytesMut::new()); - } - break 'outer Ok(ProtocolEvent::OpenStream { - sid, - prio: prio.min(crate::types::HIGHEST_PRIO), - promises, - guaranteed_bandwidth, - }); - }, - ITFrame::CloseStream { sid } => { - //FIXME: defer close! - //let _ = self.reliable_buffers.delete(sid); // if it was reliable - break 'outer Ok(ProtocolEvent::CloseStream { sid }); - }, - _ => break 'outer Err(ProtocolError::Violated), - }; - }, - Ok(None) => break, //inner => read more data - Err(()) => return Err(ProtocolError::Violated), - } + }); + }, + ITFrame::CloseStream { sid } => { + //FIXME: defer close! + //let _ = self.reliable_buffers.delete(sid); // if it was reliable + break 'outer Ok(ProtocolEvent::CloseStream { sid }); + }, + _ => break 'outer Err(ProtocolError::Violated), + }; + }, + Ok(None) => {}, + Err(()) => return Err(ProtocolError::Violated), } // try to order pending @@ -401,6 +401,7 @@ where Ok(None) => false, } }); + if pending_violated { break 'outer Err(ProtocolError::Violated); } @@ -435,10 +436,10 @@ where None => { if reliable { info!( - ?mid, - "protocol violation by remote side: send Data before \ - Header" - ); + ?mid, + "protocol violation by remote side: send Data \ + before Header" + ); break 'outer Err(ProtocolError::Violated); } else { //TODO: cleanup old messages from time to time diff --git a/network/src/api.rs b/network/src/api.rs index d38318aa57..ad95dd3419 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -28,9 +28,19 @@ use tracing::*; type A2sDisconnect = Arc>>>; -/// Represents a Tcp or Udp or Mpsc address -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub enum ProtocolAddr { +/// Represents a Tcp, Quic, Udp or Mpsc connection address +#[derive(Clone, Debug)] +pub enum ConnectAddr { + Tcp(SocketAddr), + Udp(SocketAddr), + #[cfg(feature = "quic")] + Quic(SocketAddr, quinn::ClientConfig, String), + Mpsc(u64), +} + +/// Represents a Tcp, Quic, Udp or Mpsc listen address +#[derive(Clone, Debug)] +pub enum ListenAddr { Tcp(SocketAddr), Udp(SocketAddr), #[cfg(feature = "quic")] @@ -135,8 +145,8 @@ pub struct StreamParams { /// [`Arc`](std::sync::Arc) as all commands have internal mutability. /// /// The `Network` has methods to [`connect`] to other [`Participants`] actively -/// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`] -/// [`Participants`]. +/// via their [`ProtocolConnectAddr`], or [`listen`] passively for [`connected`] +/// [`Participants`] via [`ProtocolListenAddr`]. /// /// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before /// the Network. @@ -144,7 +154,7 @@ pub struct StreamParams { /// # Examples /// ```rust /// use tokio::runtime::Runtime; -/// use veloren_network::{Network, ProtocolAddr, Pid}; +/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application @@ -153,9 +163,9 @@ pub struct StreamParams { /// runtime.block_on(async{ /// # //setup pseudo database! /// # let database = Network::new(Pid::new(), &runtime); -/// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; -/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?; -/// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; +/// # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; +/// network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?; +/// let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// drop(network); /// # drop(database); /// # Ok(()) @@ -171,7 +181,7 @@ pub struct StreamParams { pub struct Network { local_pid: Pid, participant_disconnect_sender: Arc>>, - listen_sender: Mutex>)>>, + listen_sender: Mutex>)>>, connect_sender: Mutex>, connected_receiver: Mutex>, shutdown_network_s: Option>>, @@ -197,7 +207,7 @@ impl Network { /// # Examples /// ```rust /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// use veloren_network::{Network, Pid}; /// /// let runtime = Runtime::new().unwrap(); /// let network = Network::new(Pid::new(), &runtime); @@ -230,7 +240,7 @@ impl Network { /// ```rust /// use prometheus::Registry; /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// use veloren_network::{Network, Pid}; /// /// let runtime = Runtime::new().unwrap(); /// let registry = Registry::new(); @@ -283,7 +293,7 @@ impl Network { } } - /// starts listening on an [`ProtocolAddr`]. + /// starts listening on an [`ProtocolListenAddr`]. /// When the method returns the `Network` is ready to listen for incoming /// connections OR has returned a [`NetworkError`] (e.g. port already used). /// You can call [`connected`] to asynchrony wait for a [`Participant`] to @@ -293,7 +303,7 @@ impl Network { /// # Examples /// ```ignore /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// use veloren_network::{Network, Pid, ProtocolListenAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally @@ -301,10 +311,10 @@ impl Network { /// let network = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network - /// .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap())) + /// .listen(ProtocolListenAddr::Tcp("127.0.0.1:2000".parse().unwrap())) /// .await?; /// network - /// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap())) + /// .listen(ProtocolListenAddr::Udp("127.0.0.1:2001".parse().unwrap())) /// .await?; /// drop(network); /// # Ok(()) @@ -314,7 +324,7 @@ impl Network { /// /// [`connected`]: Network::connected #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] - pub async fn listen(&self, address: ProtocolAddr) -> Result<(), NetworkError> { + pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> { let (s2a_result_s, s2a_result_r) = oneshot::channel::>(); debug!(?address, "listening on address"); self.listen_sender @@ -329,13 +339,13 @@ impl Network { } } - /// starts connection to an [`ProtocolAddr`]. + /// starts connection to an [`ProtocolConnectAddr`]. /// When the method returns the Network either returns a [`Participant`] /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g. /// can't connect, or invalid Handshake) # Examples /// ```ignore /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// use veloren_network::{Network, Pid, ProtocolListenAddr, ProtocolConnectAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above @@ -343,16 +353,16 @@ impl Network { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?; - /// # remote.listen(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?; + /// # remote.listen(ProtocolListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?; + /// # remote.listen(ProtocolListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?; /// let p1 = network - /// .connect(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())) + /// .connect(ProtocolConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap())) /// .await?; /// # //this doesn't work yet, so skip the test /// # //TODO fixme! /// # return Ok(()); /// let p2 = network - /// .connect(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())) + /// .connect(ProtocolConnectAddr::Udp("127.0.0.1:2011".parse().unwrap())) /// .await?; /// assert_eq!(&p1, &p2); /// # Ok(()) @@ -364,15 +374,15 @@ impl Network { /// ``` /// Usually the `Network` guarantees that a operation on a [`Participant`] /// succeeds, e.g. by automatic retrying unless it fails completely e.g. by - /// disconnecting from the remote. If 2 [`ProtocolAddres`] you `connect` to - /// belongs to the same [`Participant`], you get the same [`Participant`] as - /// a result. This is useful e.g. by connecting to the same - /// [`Participant`] via multiple Protocols. + /// disconnecting from the remote. If 2 [`ProtocolConnectAddres`] you + /// `connect` to belongs to the same [`Participant`], you get the same + /// [`Participant`] as a result. This is useful e.g. by connecting to + /// the same [`Participant`] via multiple Protocols. /// /// [`Streams`]: crate::api::Stream - /// [`ProtocolAddres`]: crate::api::ProtocolAddr + /// [`ProtocolConnectAddres`]: crate::api::ProtocolConnectAddr #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))] - pub async fn connect(&self, address: ProtocolAddr) -> Result { + pub async fn connect(&self, address: ConnectAddr) -> Result { let (pid_sender, pid_receiver) = oneshot::channel::>(); debug!(?address, "Connect to address"); @@ -393,15 +403,15 @@ impl Network { Ok(participant) } - /// returns a [`Participant`] created from a [`ProtocolAddr`] you called - /// [`listen`] on before. This function will either return a working - /// [`Participant`] ready to open [`Streams`] on OR has returned a - /// [`NetworkError`] (e.g. Network got closed) + /// returns a [`Participant`] created from a [`ProtocolListenAddr`] you + /// called [`listen`] on before. This function will either return a + /// working [`Participant`] ready to open [`Streams`] on OR has returned + /// a [`NetworkError`] (e.g. Network got closed) /// /// # Examples /// ```rust /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2020` TCP and opens returns their Pid @@ -410,9 +420,9 @@ impl Network { /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network - /// .listen(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap())) + /// .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap())) /// .await?; - /// # remote.connect(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?; + /// # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?; /// while let Ok(participant) = network.connected().await { /// println!("Participant connected: {}", participant.remote_pid()); /// # //skip test here as it would be a endless loop @@ -530,7 +540,7 @@ impl Participant { /// # Examples /// ```rust /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, Promises, ProtocolAddr}; + /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream @@ -538,9 +548,9 @@ impl Participant { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?; + /// # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?; /// let p1 = network - /// .connect(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())) + /// .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap())) /// .await?; /// let _s1 = p1 /// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000) @@ -597,7 +607,7 @@ impl Participant { /// # Examples /// ```rust /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr, Promises}; + /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2110 and wait for the other side to open a stream @@ -606,8 +616,8 @@ impl Participant { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; - /// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; + /// # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; + /// let p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # let p2 = remote.connected().await?; /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let _s1 = p1.opened().await?; @@ -654,7 +664,7 @@ impl Participant { /// # Examples /// ```rust /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, Pid, ProtocolAddr}; + /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. @@ -663,9 +673,9 @@ impl Participant { /// # let remote = Network::new(Pid::new(), &runtime); /// let err = runtime.block_on(async { /// network - /// .listen(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())) + /// .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap())) /// .await?; - /// # let keep_alive = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?; + /// # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?; /// while let Ok(participant) = network.connected().await { /// println!("Participant connected: {}", participant.remote_pid()); /// participant.disconnect().await?; @@ -790,7 +800,7 @@ impl Stream { /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, ProtocolAddr, Pid}; + /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World` @@ -798,8 +808,8 @@ impl Stream { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; - /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; + /// network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; + /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # // keep it alive /// # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let participant_a = network.connected().await?; @@ -832,7 +842,7 @@ impl Stream { /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; /// use bincode; - /// use veloren_network::{Network, ProtocolAddr, Pid, Message}; + /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message}; /// /// # fn main() -> std::result::Result<(), Box> { /// let runtime = Runtime::new().unwrap(); @@ -840,9 +850,9 @@ impl Stream { /// # let remote1 = Network::new(Pid::new(), &runtime); /// # let remote2 = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; - /// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; - /// # let remote2_p = remote2.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; + /// network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; + /// # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; + /// # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid()); /// # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; @@ -891,7 +901,7 @@ impl Stream { /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, ProtocolAddr, Pid}; + /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it @@ -899,8 +909,8 @@ impl Stream { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; - /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; + /// network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; + /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; @@ -925,7 +935,7 @@ impl Stream { /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, ProtocolAddr, Pid}; + /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it @@ -933,8 +943,8 @@ impl Stream { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; - /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; + /// network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; + /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// let participant_a = network.connected().await?; @@ -981,7 +991,7 @@ impl Stream { /// ``` /// # use veloren_network::Promises; /// use tokio::runtime::Runtime; - /// use veloren_network::{Network, ProtocolAddr, Pid}; + /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it @@ -989,8 +999,8 @@ impl Stream { /// let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { - /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; - /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; + /// network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; + /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// # std::thread::sleep(std::time::Duration::from_secs(1)); diff --git a/network/src/channel.rs b/network/src/channel.rs index 85bf824134..9866d88da9 100644 --- a/network/src/channel.rs +++ b/network/src/channel.rs @@ -1,19 +1,20 @@ use async_trait::async_trait; use bytes::BytesMut; use network_protocol::{ - QuicDataFormat, QuicDataFormatStream, QuicSendProtocol, QuicRecvProtocol, Bandwidth, Cid, InitProtocolError, MpscMsg, MpscRecvProtocol, MpscSendProtocol, Pid, - ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, Sid, TcpRecvProtocol, + ProtocolError, ProtocolEvent, ProtocolMetricCache, ProtocolMetrics, QuicDataFormat, + QuicDataFormatStream, QuicRecvProtocol, QuicSendProtocol, Sid, TcpRecvProtocol, TcpSendProtocol, UnreliableDrain, UnreliableSink, }; -#[cfg(feature = "quic")] use quinn::*; use std::{sync::Arc, time::Duration}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::tcp::{OwnedReadHalf, OwnedWriteHalf}, sync::mpsc, }; +use tokio_stream::StreamExt; +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub(crate) enum Protocols { Tcp((TcpSendProtocol, TcpRecvProtocol)), @@ -35,7 +36,7 @@ pub(crate) enum RecvProtocols { Tcp(TcpRecvProtocol), Mpsc(MpscRecvProtocol), #[cfg(feature = "quic")] - Quic(QuicSendProtocol), + Quic(QuicRecvProtocol), } impl Protocols { @@ -73,26 +74,39 @@ impl Protocols { #[cfg(feature = "quic")] pub(crate) async fn new_quic( - connection: quinn::NewConnection, + mut connection: quinn::NewConnection, + listen: bool, cid: Cid, metrics: Arc, ) -> Result { let metrics = ProtocolMetricCache::new(&cid.to_string(), metrics); - let (sendstream, recvstream) = connection.connection.open_bi().await?; - - - let sp = QuicSendProtocol::new(QuicDrain { - con: connection.connection.clone(), - main: sendstream, - reliables: vec!(), - }, metrics.clone()); - let rp = QuicRecvProtocol::new(QuicSink { - con: connection.connection, - main: recvstream, - reliables: vec!(), - buffer: BytesMut::new(), - }, metrics); + let (sendstream, recvstream) = if listen { + connection.connection.open_bi().await? + } else { + connection.bi_streams.next().await.expect("none").expect("dasdasd") + }; + let (streams_s,streams_r) = mpsc::unbounded_channel(); + let streams_s_clone = streams_s.clone(); + let sp = QuicSendProtocol::new( + QuicDrain { + con: connection.connection.clone(), + main: sendstream, + reliables: std::collections::HashMap::new(), + streams_s: streams_s_clone, + }, + metrics.clone(), + ); + spawn_new(recvstream, None, &streams_s); + let rp = QuicRecvProtocol::new( + QuicSink { + con: connection.connection, + bi: connection.bi_streams, + streams_r, + streams_s, + }, + metrics, + ); Ok(Protocols::Quic((sp, rp))) } @@ -243,50 +257,128 @@ impl UnreliableSink for MpscSink { /////////////////////////////////////// //// QUIC +#[cfg(feature = "quic")] +type QuicStream = (BytesMut, Result, quinn::ReadError>, quinn::RecvStream, Option); + +#[cfg(feature = "quic")] #[derive(Debug)] pub struct QuicDrain { con: quinn::Connection, main: quinn::SendStream, - reliables: Vec, + reliables: std::collections::HashMap, + streams_s: mpsc::UnboundedSender, } +#[cfg(feature = "quic")] #[derive(Debug)] pub struct QuicSink { con: quinn::Connection, - main: quinn::RecvStream, - reliables: Vec, - buffer: BytesMut, + bi: quinn::IncomingBiStreams, + streams_r: mpsc::UnboundedReceiver, + streams_s: mpsc::UnboundedSender, } +#[cfg(feature = "quic")] +fn spawn_new(mut recvstream: quinn::RecvStream, id: Option, streams_s: &mpsc::UnboundedSender) { + let streams_s_clone = streams_s.clone(); + tokio::spawn(async move { + let mut buffer = BytesMut::new(); + buffer.resize(1500, 0u8); + let r = recvstream.read(&mut buffer).await; + let _ = streams_s_clone.send((buffer, r, recvstream, id)); + }); +} + +#[cfg(feature = "quic")] #[async_trait] impl UnreliableDrain for QuicDrain { type DataFormat = QuicDataFormat; async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> { match match data.stream { - QuicDataFormatStream::Main => self.main.write_all(&data.data), + QuicDataFormatStream::Main => { + self.main.write_all(&data.data).await + }, QuicDataFormatStream::Unreliable => unimplemented!(), - QuicDataFormatStream::Reliable(id) => self.reliables.get_mut(id as usize).ok_or(ProtocolError::Closed)?.write_all(&data.data), - }.await { + QuicDataFormatStream::Reliable(id) => { + use std::collections::hash_map::Entry; + match self.reliables.entry(id) { + Entry::Occupied(mut occupied) => { + occupied.get_mut().write_all(&data.data).await + }, + Entry::Vacant(vacant) => { + match self.con.open_bi().await { + Ok((sendstream, recvstream)) => { + let id = Some(0); //TODO FIXME + spawn_new(recvstream, id, &self.streams_s); + vacant.insert(sendstream).write_all(&data.data).await + }, + Err(_) => return Err(ProtocolError::Closed), + } + }, + } + }, + } + { Ok(()) => Ok(()), Err(_) => Err(ProtocolError::Closed), } } } +#[cfg(feature = "quic")] #[async_trait] impl UnreliableSink for QuicSink { type DataFormat = QuicDataFormat; async fn recv(&mut self) -> Result { - self.buffer.resize(1500, 0u8); - //TODO improve - match self.main.read(&mut self.buffer).await { + let (mut buffer, result, mut recvstream, id) = loop { + use futures_util::FutureExt; + // first handle all bi streams! + let (a, b) = tokio::select! { + biased; + Some(n) = self.bi.next().fuse() => (Some(n), None), + Some(n) = self.streams_r.recv().fuse() => (None, Some(n)), + }; + + if let Some(remote_stream) = a { + match remote_stream { + Ok((sendstream, recvstream)) => { + //FIXME TODO + let id = Some(0); // get real ID + drop(sendstream); // not drop it! + spawn_new(recvstream, id, &self.streams_s); + }, + Err(_) => return Err(ProtocolError::Closed), + } + } + + if let Some(data) = b { + break data; + } + }; + + let r = match result { Ok(Some(0)) => Err(ProtocolError::Closed), - Ok(Some(n)) => Ok(QuicDataFormat{stream: QuicDataFormatStream::Main, data: self.buffer.split_to(n)}), + Ok(Some(n)) => Ok(QuicDataFormat { + stream: match id { + Some(id) => QuicDataFormatStream::Reliable(id), + None => QuicDataFormatStream::Main, + }, + data: buffer.split_to(n), + }), Ok(None) => Err(ProtocolError::Closed), Err(_) => Err(ProtocolError::Closed), - } + }?; + + + let streams_s_clone = self.streams_s.clone(); + tokio::spawn(async move { + buffer.resize(1500, 0u8); + let r = recvstream.read(&mut buffer).await; + let _ = streams_s_clone.send((buffer, r, recvstream, id)); + }); + Ok(r) } } diff --git a/network/src/lib.rs b/network/src/lib.rs index 448b50f41c..70b68fbafb 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -13,14 +13,14 @@ //! Say you have an application that wants to communicate with other application //! over a Network or on the same computer. Now each application instances the //! struct [`Network`] once with a new [`Pid`]. The Pid is necessary to identify -//! other [`Networks`] over the network protocols (e.g. TCP, UDP) +//! other [`Networks`] over the network protocols (e.g. TCP, UDP, QUIC, MPSC) //! -//! To connect to another application, you must know it's [`ProtocolAddr`]. One +//! To connect to another application, you must know it's [`ConnectAddr`]. One //! side will call [`connect`], the other [`connected`]. If successful both //! applications will now get a [`Participant`]. //! //! This [`Participant`] represents the connection between those 2 applications. -//! over the respective [`ProtocolAddr`] and with it the chosen network +//! over the respective [`ConnectAddr`] and with it the chosen network //! protocol. However messages can't be send directly via [`Participants`], //! instead you must open a [`Stream`] on it. Like above, one side has to call //! [`open`], the other [`opened`]. [`Streams`] can have a different priority @@ -41,14 +41,14 @@ //! ```rust //! use std::sync::Arc; //! use tokio::{join, runtime::Runtime, time::sleep}; -//! use veloren_network::{Network, Pid, Promises, ProtocolAddr}; +//! use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises}; //! //! // Client //! async fn client(runtime: &Runtime) -> std::result::Result<(), Box> { //! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen` //! let client_network = Network::new(Pid::new(), runtime); //! let server = client_network -//! .connect(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) +//! .connect(ConnectAddr::Tcp("127.0.0.1:12345".parse().unwrap())) //! .await?; //! let mut stream = server //! .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) @@ -61,7 +61,7 @@ //! async fn server(runtime: &Runtime) -> std::result::Result<(), Box> { //! let server_network = Network::new(Pid::new(), runtime); //! server_network -//! .listen(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) +//! .listen(ListenAddr::Tcp("127.0.0.1:12345".parse().unwrap())) //! .await?; //! let client = server_network.connected().await?; //! let mut stream = client.opened().await?; @@ -95,7 +95,8 @@ //! [`send`]: crate::api::Stream::send //! [`recv`]: crate::api::Stream::recv //! [`Pid`]: network_protocol::Pid -//! [`ProtocolAddr`]: crate::api::ProtocolAddr +//! [`ListenAddr`]: crate::api::ListenAddr +//! [`ConnectAddr`]: crate::api::ConnectAddr //! [`Promises`]: network_protocol::Promises mod api; @@ -107,8 +108,8 @@ mod scheduler; mod util; pub use api::{ - Network, NetworkConnectError, NetworkError, Participant, ParticipantError, ProtocolAddr, - Stream, StreamError, StreamParams, + ConnectAddr, ListenAddr, Network, NetworkConnectError, NetworkError, Participant, + ParticipantError, Stream, StreamError, StreamParams, }; pub use message::Message; pub use network_protocol::{InitProtocolError, Pid, Promises}; diff --git a/network/src/message.rs b/network/src/message.rs index bc81e25802..5c0029cf16 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -70,7 +70,7 @@ impl Message { /// /// # Example /// ``` - /// # use veloren_network::{Network, ProtocolAddr, Pid}; + /// # use veloren_network::{Network, ListenAddr, ConnectAddr, Pid}; /// # use veloren_network::Promises; /// # use tokio::runtime::Runtime; /// # use std::sync::Arc; @@ -81,8 +81,8 @@ impl Message { /// # let network = Network::new(Pid::new(), &runtime); /// # let remote = Network::new(Pid::new(), &runtime); /// # runtime.block_on(async { - /// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; - /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; + /// # network.listen(ListenAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; + /// # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; /// # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// # stream_p.send("Hello World"); /// # let participant_a = network.connected().await?; diff --git a/network/src/metrics.rs b/network/src/metrics.rs index c46fe16bda..d532347140 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,8 +1,29 @@ -use crate::api::ProtocolAddr; +use crate::api::{ConnectAddr, ListenAddr}; use network_protocol::{Cid, Pid}; #[cfg(feature = "metrics")] use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; -use std::error::Error; +use std::{error::Error, net::SocketAddr}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub(crate) enum ProtocolInfo { + Tcp(SocketAddr), + Udp(SocketAddr), + #[cfg(feature = "quic")] + Quic(SocketAddr), + Mpsc(u64), +} + +impl From for ProtocolInfo { + fn from(other: ListenAddr) -> ProtocolInfo { + match other { + ListenAddr::Tcp(s) => ProtocolInfo::Tcp(s), + ListenAddr::Udp(s) => ProtocolInfo::Udp(s), + #[cfg(feature = "quic")] + ListenAddr::Quic(s, _) => ProtocolInfo::Quic(s), + ListenAddr::Mpsc(s) => ProtocolInfo::Mpsc(s), + } + } +} /// 1:1 relation between NetworkMetrics and Network #[cfg(feature = "metrics")] @@ -154,9 +175,9 @@ impl NetworkMetrics { Ok(()) } - pub(crate) fn connect_requests_cache(&self, protocol: &ProtocolAddr) -> prometheus::IntCounter { + pub(crate) fn connect_requests_cache(&self, protocol: &ListenAddr) -> prometheus::IntCounter { self.incoming_connections_total - .with_label_values(&[protocol_name(protocol)]) + .with_label_values(&[protocollisten_name(protocol)]) } pub(crate) fn channels_connected(&self, remote_p: &str, no: usize, cid: Cid) { @@ -192,15 +213,15 @@ impl NetworkMetrics { .inc(); } - pub(crate) fn listen_request(&self, protocol: &ProtocolAddr) { + pub(crate) fn listen_request(&self, protocol: &ListenAddr) { self.listen_requests_total - .with_label_values(&[protocol_name(protocol)]) + .with_label_values(&[protocollisten_name(protocol)]) .inc(); } - pub(crate) fn connect_request(&self, protocol: &ProtocolAddr) { + pub(crate) fn connect_request(&self, protocol: &ConnectAddr) { self.connect_requests_total - .with_label_values(&[protocol_name(protocol)]) + .with_label_values(&[protocolconnect_name(protocol)]) .inc(); } @@ -225,11 +246,22 @@ impl NetworkMetrics { } #[cfg(feature = "metrics")] -fn protocol_name(protocol: &ProtocolAddr) -> &str { +fn protocolconnect_name(protocol: &ConnectAddr) -> &str { match protocol { - ProtocolAddr::Tcp(_) => "tcp", - ProtocolAddr::Udp(_) => "udp", - ProtocolAddr::Mpsc(_) => "mpsc", + ConnectAddr::Tcp(_) => "tcp", + ConnectAddr::Udp(_) => "udp", + ConnectAddr::Mpsc(_) => "mpsc", + ConnectAddr::Quic(_, _, _) => "quic", + } +} + +#[cfg(feature = "metrics")] +fn protocollisten_name(protocol: &ListenAddr) -> &str { + match protocol { + ListenAddr::Tcp(_) => "tcp", + ListenAddr::Udp(_) => "udp", + ListenAddr::Mpsc(_) => "mpsc", + ListenAddr::Quic(_, _) => "quic", } } @@ -247,9 +279,9 @@ impl NetworkMetrics { pub(crate) fn streams_closed(&self, _remote_p: &str) {} - pub(crate) fn listen_request(&self, _protocol: &ProtocolAddr) {} + pub(crate) fn listen_request(&self, _protocol: &ListenAddr) {} - pub(crate) fn connect_request(&self, _protocol: &ProtocolAddr) {} + pub(crate) fn connect_request(&self, _protocol: &ConnectAddr) {} pub(crate) fn cleanup_participant(&self, _remote_p: &str) {} } diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 11a2a0f774..475e34371f 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -1,7 +1,7 @@ use crate::{ - api::{NetworkConnectError, Participant, ProtocolAddr}, + api::{ConnectAddr, ListenAddr, NetworkConnectError, Participant}, channel::Protocols, - metrics::NetworkMetrics, + metrics::{NetworkMetrics, ProtocolInfo}, participant::{B2sPrioStatistic, BParticipant, S2bCreateChannel, S2bShutdownBparticipant}, }; use futures_util::{FutureExt, StreamExt}; @@ -46,9 +46,9 @@ struct ParticipantInfo { s2b_shutdown_bparticipant_s: Option>, } -type A2sListen = (ProtocolAddr, oneshot::Sender>); +type A2sListen = (ListenAddr, oneshot::Sender>); pub(crate) type A2sConnect = ( - ProtocolAddr, + ConnectAddr, oneshot::Sender>, ); type A2sDisconnect = (Pid, S2bShutdownBparticipant); @@ -82,7 +82,7 @@ pub struct Scheduler { participant_channels: Arc>>, participants: Arc>>, channel_ids: Arc, - channel_listener: Mutex>>, + channel_listener: Mutex>>, metrics: Arc, protocol_metrics: Arc, } @@ -182,7 +182,7 @@ impl Scheduler { self.channel_listener .lock() .await - .insert(address.clone(), end_sender); + .insert(address.clone().into(), end_sender); self.channel_creator(address, end_receiver, s2a_listen_result_s) .await; } @@ -198,7 +198,7 @@ impl Scheduler { let metrics = Arc::clone(&self.protocol_metrics); self.metrics.connect_request(&addr); let (protocol, handshake) = match addr { - ProtocolAddr::Tcp(addr) => { + ConnectAddr::Tcp(addr) => { let stream = match net::TcpStream::connect(addr).await { Ok(stream) => stream, Err(e) => { @@ -209,7 +209,21 @@ impl Scheduler { info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); (Protocols::new_tcp(stream, cid, metrics), false) }, - ProtocolAddr::Mpsc(addr) => { + #[cfg(feature = "quic")] + ConnectAddr::Quic(addr, ref config, name) => { + let config = config.clone(); + let endpoint = quinn::Endpoint::builder(); + let (endpoint, _) = endpoint.bind(&"[::]:0".parse().unwrap()).expect("FIXME"); + + let connecting = endpoint.connect_with(config, &addr, &name).expect("FIXME"); + let connection = connecting.await.expect("FIXME"); + ( + Protocols::new_quic(connection, false, cid, metrics).await.unwrap(), + false, + ) + //pid_sender.send(Ok(())).unwrap(); + }, + ConnectAddr::Mpsc(addr) => { let mpsc_s = match MPSC_POOL.lock().await.get(&addr) { Some(s) => s.clone(), None => { @@ -236,7 +250,7 @@ impl Scheduler { ) }, /* */ - //ProtocolAddr::Udp(addr) => { + //ProtocolConnectAddr::Udp(addr) => { //#[cfg(feature = "metrics")] //self.metrics //.connect_requests_total @@ -386,7 +400,7 @@ impl Scheduler { async fn channel_creator( &self, - addr: ProtocolAddr, + addr: ListenAddr, s2s_stop_listening_r: oneshot::Receiver<()>, s2a_listen_result_s: oneshot::Sender>, ) { @@ -394,7 +408,7 @@ impl Scheduler { #[cfg(feature = "metrics")] let mcache = self.metrics.connect_requests_cache(&addr); match addr { - ProtocolAddr::Tcp(addr) => { + ListenAddr::Tcp(addr) => { let listener = match net::TcpListener::bind(addr).await { Ok(listener) => { s2a_listen_result_s.send(Ok(())).unwrap(); @@ -432,10 +446,10 @@ impl Scheduler { } }, #[cfg(feature = "quic")] - ProtocolAddr::Quic(addr, server_config) => { + ListenAddr::Quic(addr, ref server_config) => { let mut endpoint = quinn::Endpoint::builder(); - endpoint.listen(server_config); - let (endpoint, mut listener) = match endpoint.bind(&addr) { + endpoint.listen(server_config.clone()); + let (_endpoint, mut listener) = match endpoint.bind(&addr) { Ok((endpoint, listener)) => { s2a_listen_result_s.send(Ok(())).unwrap(); (endpoint, listener) @@ -468,11 +482,18 @@ impl Scheduler { mcache.inc(); let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); info!(?remote_addr, ?cid, "Accepting Quic from"); - self.init_protocol(Protocols::new_quic(connection, cid, Arc::clone(&self.protocol_metrics)), cid, None, true) + let quic = match Protocols::new_quic(connection, true, cid, Arc::clone(&self.protocol_metrics)).await { + Ok(quic) => quic, + Err(e) => { + trace!(?e, "failed to start quic"); + continue; + } + }; + self.init_protocol(quic, cid, None, true) .await; } }, - ProtocolAddr::Mpsc(addr) => { + ListenAddr::Mpsc(addr) => { let (mpsc_s, mut mpsc_r) = mpsc::unbounded_channel(); MPSC_POOL.lock().await.insert(addr, mpsc_s); s2a_listen_result_s.send(Ok(())).unwrap(); @@ -494,7 +515,7 @@ impl Scheduler { } warn!("MpscStream Failed, stopping"); },/* - ProtocolAddr::Udp(addr) => { + ProtocolListenAddr::Udp(addr) => { let socket = match net::UdpSocket::bind(addr).await { Ok(socket) => { s2a_listen_result_s.send(Ok(())).unwrap(); diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 7d6a2cb0ee..100e84e544 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -347,8 +347,8 @@ fn open_participant_before_remote_part_is_closed() { let n_a = Network::new(Pid::fake(0), &r); let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); - r.block_on(n_a.listen(addr.clone())).unwrap(); - let p_b = r.block_on(n_b.connect(addr)).unwrap(); + r.block_on(n_a.listen(addr.0)).unwrap(); + let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); let p_a = r.block_on(n_a.connected()).unwrap(); @@ -367,8 +367,8 @@ fn open_participant_after_remote_part_is_closed() { let n_a = Network::new(Pid::fake(0), &r); let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); - r.block_on(n_a.listen(addr.clone())).unwrap(); - let p_b = r.block_on(n_b.connect(addr)).unwrap(); + r.block_on(n_a.listen(addr.0)).unwrap(); + let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); drop(s1_b); @@ -387,8 +387,8 @@ fn close_network_scheduler_completely() { let n_a = Network::new(Pid::fake(0), &r); let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); - r.block_on(n_a.listen(addr.clone())).unwrap(); - let p_b = r.block_on(n_b.connect(addr)).unwrap(); + r.block_on(n_a.listen(addr.0)).unwrap(); + let p_b = r.block_on(n_b.connect(addr.1)).unwrap(); let mut s1_b = r.block_on(p_b.open(4, Promises::empty(), 0)).unwrap(); s1_b.send("HelloWorld").unwrap(); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 68d5cebd87..9e78928f55 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -11,7 +11,7 @@ use std::{ use tokio::runtime::Runtime; use tracing::*; use tracing_subscriber::EnvFilter; -use veloren_network::{Network, Participant, Pid, Promises, ProtocolAddr, Stream}; +use veloren_network::{ConnectAddr, ListenAddr, Network, Participant, Pid, Promises, Stream}; #[allow(dead_code)] pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) { @@ -47,7 +47,7 @@ pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) { #[allow(dead_code)] pub fn network_participant_stream( - addr: ProtocolAddr, + addr: (ListenAddr, ConnectAddr), ) -> ( Arc, Network, @@ -62,11 +62,11 @@ pub fn network_participant_stream( let n_a = Network::new(Pid::fake(0), &runtime); let n_b = Network::new(Pid::fake(1), &runtime); - n_a.listen(addr.clone()).await.unwrap(); - let p1_b = n_b.connect(addr).await.unwrap(); + n_a.listen(addr.0).await.unwrap(); + let p1_b = n_b.connect(addr.1).await.unwrap(); let p1_a = n_a.connected().await.unwrap(); - let s1_a = p1_a.open(4, Promises::empty(), 0).await.unwrap(); + let s1_a = p1_a.open(4, Promises::ORDERED, 0).await.unwrap(); let s1_b = p1_b.opened().await.unwrap(); (n_a, p1_a, s1_a, n_b, p1_b, s1_b) @@ -75,28 +75,76 @@ pub fn network_participant_stream( } #[allow(dead_code)] -pub fn tcp() -> ProtocolAddr { +pub fn tcp() -> (ListenAddr, ConnectAddr) { lazy_static! { static ref PORTS: AtomicU16 = AtomicU16::new(5000); } let port = PORTS.fetch_add(1, Ordering::Relaxed); - ProtocolAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port))) + ( + ListenAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port))), + ConnectAddr::Tcp(SocketAddr::from(([127, 0, 0, 1], port))), + ) +} + +lazy_static! { + static ref UDP_PORTS: AtomicU16 = AtomicU16::new(5000); } #[allow(dead_code)] -pub fn udp() -> ProtocolAddr { - lazy_static! { - static ref PORTS: AtomicU16 = AtomicU16::new(5000); - } - let port = PORTS.fetch_add(1, Ordering::Relaxed); - ProtocolAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port))) +pub fn quic() -> (ListenAddr, ConnectAddr) { + const LOCALHOST: &str = "localhost"; + let port = UDP_PORTS.fetch_add(1, Ordering::Relaxed); + + let transport_config = quinn::TransportConfig::default(); + let mut server_config = quinn::ServerConfig::default(); + server_config.transport = Arc::new(transport_config); + let mut server_config = quinn::ServerConfigBuilder::new(server_config); + server_config.protocols(&[b"veloren"]); + + trace!("generating self-signed certificate"); + let cert = rcgen::generate_simple_self_signed(vec![LOCALHOST.into()]).unwrap(); + let key = cert.serialize_private_key_der(); + let cert = cert.serialize_der().unwrap(); + + let key = quinn::PrivateKey::from_der(&key).expect("private key failed"); + let cert = quinn::Certificate::from_der(&cert).expect("cert failed"); + server_config + .certificate(quinn::CertificateChain::from_certs(vec![cert.clone()]), key) + .expect("set cert failed"); + + let server_config = server_config.build(); + + let mut client_config = quinn::ClientConfigBuilder::default(); + client_config.protocols(&[b"veloren"]); + client_config + .add_certificate_authority(cert) + .expect("adding certificate failed"); + + let client_config = client_config.build(); + ( + ListenAddr::Quic(SocketAddr::from(([127, 0, 0, 1], port)), server_config), + ConnectAddr::Quic( + SocketAddr::from(([127, 0, 0, 1], port)), + client_config, + LOCALHOST.to_owned(), + ), + ) } #[allow(dead_code)] -pub fn mpsc() -> ProtocolAddr { +pub fn udp() -> (ListenAddr, ConnectAddr) { + let port = UDP_PORTS.fetch_add(1, Ordering::Relaxed); + ( + ListenAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port))), + ConnectAddr::Udp(SocketAddr::from(([127, 0, 0, 1], port))), + ) +} + +#[allow(dead_code)] +pub fn mpsc() -> (ListenAddr, ConnectAddr) { lazy_static! { static ref PORTS: AtomicU64 = AtomicU64::new(5000); } let port = PORTS.fetch_add(1, Ordering::Relaxed); - ProtocolAddr::Mpsc(port) + (ListenAddr::Mpsc(port), ConnectAddr::Mpsc(port)) } diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 93534ac082..e81530b4f0 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use tokio::runtime::Runtime; use veloren_network::{NetworkError, StreamError}; mod helper; -use helper::{mpsc, network_participant_stream, tcp, udp}; +use helper::{mpsc, network_participant_stream, quic, tcp, udp}; use std::io::ErrorKind; -use veloren_network::{Network, Pid, Promises, ProtocolAddr}; +use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises}; #[test] #[ignore] @@ -73,6 +73,30 @@ fn stream_simple_mpsc_3msg() { drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown } +#[test] +fn stream_simple_quic() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); + + s1_a.send("Hello World").unwrap(); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); + drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown +} + +#[test] +fn stream_simple_quic_3msg() { + let (_, _) = helper::setup(true, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(quic()); + + s1_a.send("Hello World").unwrap(); + s1_a.send(1337).unwrap(); + assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string())); + assert_eq!(r.block_on(s1_b.recv()), Ok(1337)); + s1_a.send("3rdMessage").unwrap(); + assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string())); + drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown +} + #[test] #[ignore] fn stream_simple_udp() { @@ -110,16 +134,16 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box (), _ => panic!(), @@ -170,10 +194,10 @@ fn api_stream_send_main() -> std::result::Result<(), Box> let network = network; let remote = remote; network - .listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap())) + .listen(ListenAddr::Tcp("127.0.0.1:1200".parse().unwrap())) .await?; let remote_p = remote - .connect(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap())) + .connect(ConnectAddr::Tcp("127.0.0.1:1200".parse().unwrap())) .await?; // keep it alive let _stream_p = remote_p @@ -199,10 +223,10 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> let network = network; let remote = remote; network - .listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap())) + .listen(ListenAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .await?; let remote_p = remote - .connect(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap())) + .connect(ConnectAddr::Tcp("127.0.0.1:1220".parse().unwrap())) .await?; let mut stream_p = remote_p .open(4, Promises::ORDERED | Promises::CONSISTENCY, 0) diff --git a/server/src/lib.rs b/server/src/lib.rs index 692ae02c0e..910f124a13 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -83,7 +83,7 @@ use common_state::plugin::PluginMgr; use common_state::{BuildAreas, State}; use common_systems::add_local_systems; use metrics::{EcsSystemMetrics, PhysicsMetrics, TickMetrics}; -use network::{Network, Pid, ProtocolAddr}; +use network::{ListenAddr, Network, Pid}; use persistence::{ character_loader::{CharacterLoader, CharacterLoaderResponseKind}, character_updater::CharacterUpdater, @@ -386,8 +386,8 @@ impl Server { ) .await }); - runtime.block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?; - runtime.block_on(network.listen(ProtocolAddr::Mpsc(14004)))?; + runtime.block_on(network.listen(ListenAddr::Tcp(settings.gameserver_address)))?; + runtime.block_on(network.listen(ListenAddr::Mpsc(14004)))?; let connection_handler = ConnectionHandler::new(network, &runtime); // Initiate real-time world simulation