diff --git a/client/src/lib.rs b/client/src/lib.rs index e83842c6a7..b9b1de4210 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -189,7 +189,7 @@ impl Client { view_distance: Option, runtime: Arc, ) -> Result { - let network = Network::new(Pid::new(), Arc::clone(&runtime)); + let network = Network::new(Pid::new(), &runtime); let participant = match addr { ConnectionArgs::IpAndPort(addrs) => { diff --git a/network/benches/speed.rs b/network/benches/speed.rs index 76828ee523..6216d4a7a4 100644 --- a/network/benches/speed.rs +++ b/network/benches/speed.rs @@ -117,7 +117,7 @@ criterion_main!(benches); pub fn network_participant_stream( addr: ProtocolAddr, ) -> ( - Arc, + Runtime, Network, Participant, Stream, @@ -125,10 +125,10 @@ pub fn network_participant_stream( Participant, Stream, ) { - let runtime = Arc::new(Runtime::new().unwrap()); + let runtime = Runtime::new().unwrap(); let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { - let n_a = Network::new(Pid::fake(0), Arc::clone(&runtime)); - let n_b = Network::new(Pid::fake(1), Arc::clone(&runtime)); + let n_a = Network::new(Pid::fake(0), &runtime); + let n_b = Network::new(Pid::fake(1), &runtime); n_a.listen(addr.clone()).await.unwrap(); let p1_b = n_b.connect(addr).await.unwrap(); diff --git a/network/examples/chat.rs b/network/examples/chat.rs index 9fb0c60eee..8746479f73 100644 --- a/network/examples/chat.rs +++ b/network/examples/chat.rs @@ -100,7 +100,7 @@ fn main() { fn server(address: ProtocolAddr) { let r = Arc::new(Runtime::new().unwrap()); - let server = Network::new(Pid::new(), Arc::clone(&r)); + let server = Network::new(Pid::new(), &r); let server = Arc::new(server); let participants = Arc::new(RwLock::new(Vec::new())); r.block_on(async { @@ -146,7 +146,7 @@ async fn client_connection( fn client(address: ProtocolAddr) { let r = Arc::new(Runtime::new().unwrap()); - let client = Network::new(Pid::new(), Arc::clone(&r)); + let client = Network::new(Pid::new(), &r); r.block_on(async { let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1 diff --git a/network/examples/fileshare/server.rs b/network/examples/fileshare/server.rs index ba3e666c5e..252ebdf32c 100644 --- a/network/examples/fileshare/server.rs +++ b/network/examples/fileshare/server.rs @@ -27,7 +27,7 @@ impl Server { pub fn new(runtime: Arc) -> (Self, mpsc::UnboundedSender) { let (command_sender, command_receiver) = mpsc::unbounded_channel(); - let network = Network::new(Pid::new(), runtime); + let network = Network::new(Pid::new(), &runtime); let run_channels = Some(ControlChannels { command_receiver }); ( diff --git a/network/examples/network-speed/main.rs b/network/examples/network-speed/main.rs index 2bab12af5e..d67e1a6c4b 100644 --- a/network/examples/network-speed/main.rs +++ b/network/examples/network-speed/main.rs @@ -123,7 +123,7 @@ fn main() { fn server(address: ProtocolAddr, runtime: Arc) { let registry = Arc::new(Registry::new()); - let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), ®istry); + let server = Network::new_with_registry(Pid::new(), &runtime, ®istry); runtime.spawn(Server::run( Arc::clone(®istry), SocketAddr::from(([0; 4], 59112)), @@ -155,7 +155,7 @@ fn server(address: ProtocolAddr, runtime: Arc) { fn client(address: ProtocolAddr, runtime: Arc) { let registry = Arc::new(Registry::new()); - let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), ®istry); + let client = Network::new_with_registry(Pid::new(), &runtime, ®istry); runtime.spawn(Server::run( Arc::clone(®istry), SocketAddr::from(([0; 4], 59111)), diff --git a/network/src/api.rs b/network/src/api.rs index 1a718ffc2f..0d97725a3a 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -129,39 +129,43 @@ pub enum StreamError { /// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`] /// [`Participants`]. /// +/// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before +/// the Network. +/// /// # Examples /// ```rust -/// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application -/// let runtime = Arc::new(Runtime::new().unwrap()); -/// let network = Network::new(Pid::new(), Arc::clone(&runtime)); +/// let runtime = Runtime::new().unwrap(); +/// let network = Network::new(Pid::new(), &runtime); /// runtime.block_on(async{ /// # //setup pseudo database! -/// # let database = Network::new(Pid::new(), Arc::clone(&runtime)); +/// # let database = Network::new(Pid::new(), &runtime); /// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?; /// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?; +/// drop(network); +/// # drop(database); /// # Ok(()) /// }) /// # } /// ``` /// /// [`Participants`]: crate::api::Participant +/// [`Runtime`]: tokio::runtime::Runtime /// [`connect`]: Network::connect /// [`listen`]: Network::listen /// [`connected`]: Network::connected pub struct Network { local_pid: Pid, - runtime: Arc, - participant_disconnect_sender: Mutex>, + participant_disconnect_sender: Arc>>, listen_sender: Mutex>)>>, connect_sender: Mutex>, connected_receiver: Mutex>, - shutdown_sender: Option>, + shutdown_network_s: Option>>, } impl Network { @@ -170,7 +174,7 @@ impl Network { /// # Arguments /// * `participant_id` - provide it by calling [`Pid::new()`], usually you /// don't want to reuse a Pid for 2 `Networks` - /// * `runtime` - provide a tokio::Runtime, it's used to internally spawn + /// * `runtime` - provide a [`Runtime`], it's used to internally spawn /// tasks. It is necessary to clean up in the non-async `Drop`. **All** /// network related components **must** be dropped before the runtime is /// stopped. dropping the runtime while a shutdown is still in progress @@ -183,12 +187,11 @@ impl Network { /// /// # Examples /// ```rust - /// use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// let runtime = Runtime::new().unwrap(); - /// let network = Network::new(Pid::new(), Arc::new(runtime)); + /// let network = Network::new(Pid::new(), &runtime); /// ``` /// /// Usually you only create a single `Network` for an application, @@ -197,7 +200,8 @@ impl Network { /// creating more. /// /// [`Pid::new()`]: network_protocol::Pid::new - pub fn new(participant_id: Pid, runtime: Arc) -> Self { + /// [`Runtime`]: tokio::runtime::Runtime + pub fn new(participant_id: Pid, runtime: &Runtime) -> Self { Self::internal_new( participant_id, runtime, @@ -215,28 +219,23 @@ impl Network { /// /// # Examples /// ```rust - /// # use std::sync::Arc; /// use prometheus::Registry; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// let runtime = Runtime::new().unwrap(); /// let registry = Registry::new(); - /// let network = Network::new_with_registry(Pid::new(), Arc::new(runtime), ®istry); + /// let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); /// ``` /// [`new`]: crate::api::Network::new #[cfg(feature = "metrics")] - pub fn new_with_registry( - participant_id: Pid, - runtime: Arc, - registry: &Registry, - ) -> Self { + pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self { Self::internal_new(participant_id, runtime, Some(registry)) } fn internal_new( participant_id: Pid, - runtime: Arc, + runtime: &Runtime, #[cfg(feature = "metrics")] registry: Option<&Registry>, ) -> Self { let p = participant_id; @@ -248,6 +247,15 @@ impl Network { #[cfg(feature = "metrics")] registry, ); + let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new())); + let (shutdown_network_s, shutdown_network_r) = oneshot::channel(); + let f = Self::shutdown_mgr( + p, + shutdown_network_r, + Arc::clone(&participant_disconnect_sender), + shutdown_sender, + ); + runtime.spawn(f); runtime.spawn( async move { trace!("Starting scheduler in own thread"); @@ -258,12 +266,11 @@ impl Network { ); Self { local_pid: participant_id, - runtime, - participant_disconnect_sender: Mutex::new(HashMap::new()), + participant_disconnect_sender, listen_sender: Mutex::new(listen_sender), connect_sender: Mutex::new(connect_sender), connected_receiver: Mutex::new(connected_receiver), - shutdown_sender: Some(shutdown_sender), + shutdown_network_s: Some(shutdown_network_s), } } @@ -276,14 +283,13 @@ impl Network { /// /// # Examples /// ```ignore - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network /// .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap())) @@ -291,6 +297,7 @@ impl Network { /// network /// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap())) /// .await?; + /// drop(network); /// # Ok(()) /// }) /// # } @@ -318,15 +325,14 @@ impl Network { /// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g. /// can't connect, or invalid Handshake) # Examples /// ```ignore - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?; /// # remote.listen(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?; @@ -341,7 +347,10 @@ impl Network { /// .await?; /// assert_eq!(&p1, &p2); /// # Ok(()) - /// }) + /// })?; + /// drop(network); + /// # drop(remote); + /// # Ok(()) /// # } /// ``` /// Usually the `Network` guarantees that a operation on a [`Participant`] @@ -382,15 +391,14 @@ impl Network { /// /// # Examples /// ```rust - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2020` TCP and opens returns their Pid - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network /// .listen(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap())) @@ -401,6 +409,8 @@ impl Network { /// # //skip test here as it would be a endless loop /// # break; /// } + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -417,6 +427,58 @@ impl Network { ); Ok(participant) } + + /// Use a mgr to handle shutdown smoothly and not in `Drop` + #[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))] + async fn shutdown_mgr( + local_pid: Pid, + shutdown_network_r: oneshot::Receiver>, + participant_disconnect_sender: Arc>>, + shutdown_scheduler_s: oneshot::Sender<()>, + ) { + trace!("waiting for shutdown triggerNetwork"); + let return_s = shutdown_network_r.await; + trace!("Shutting down Participants of Network"); + let mut finished_receiver_list = vec![]; + + for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() { + match a2s_disconnect_s.lock().await.take() { + Some(a2s_disconnect_s) => { + trace!(?remote_pid, "Participants will be closed"); + let (finished_sender, finished_receiver) = oneshot::channel(); + finished_receiver_list.push((remote_pid, finished_receiver)); + a2s_disconnect_s + .send((remote_pid, (Duration::from_secs(120), finished_sender))) + .expect("Scheduler is closed, but nobody other should be able to close it"); + }, + None => trace!(?remote_pid, "Participant already disconnected gracefully"), + } + } + //wait after close is requested for all + for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) { + match finished_receiver.await { + Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"), + Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"), + Err(e) => warn!( + ?remote_pid, + ?e, + "Failed to get a message back from the scheduler, seems like the network is \ + already closed" + ), + } + } + + trace!("Participants have shut down - next: Scheduler"); + shutdown_scheduler_s + .send(()) + .expect("Scheduler is closed, but nobody other should be able to close it"); + if let Ok(return_s) = return_s { + if return_s.send(()).is_err() { + warn!("Network::drop stoped after a timeout and didn't wait for our shutdown"); + }; + } + debug!("Network has shut down"); + } } impl Participant { @@ -456,15 +518,14 @@ impl Participant { /// /// # Examples /// ```rust - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, Promises, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2100 and open a stream - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?; /// let p1 = network @@ -473,6 +534,8 @@ impl Participant { /// let _s1 = p1 /// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000) /// .await?; + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -522,22 +585,23 @@ impl Participant { /// /// # Examples /// ```rust - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr, Promises}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, connect on port 2110 and wait for the other side to open a stream /// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream. - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?; /// # let p2 = remote.connected().await?; /// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?; /// let _s1 = p1.opened().await?; + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -578,16 +642,15 @@ impl Participant { /// /// # Examples /// ```rust - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, Pid, ProtocolAddr}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); - /// runtime.block_on(async { + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); + /// let err = runtime.block_on(async { /// network /// .listen(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap())) /// .await?; @@ -599,7 +662,10 @@ impl Participant { /// # break; /// } /// # Ok(()) - /// }) + /// }); + /// drop(network); + /// # drop(remote); + /// # err /// # } /// ``` /// @@ -708,15 +774,14 @@ impl Stream { /// # Example /// ``` /// # use veloren_network::Promises; - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World` - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?; @@ -726,6 +791,8 @@ impl Stream { /// let mut stream_a = participant_a.opened().await?; /// //Send Message /// stream_a.send("Hello World")?; + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -748,16 +815,15 @@ impl Stream { /// # Example /// ```rust /// # use veloren_network::Promises; - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use bincode; /// use veloren_network::{Network, ProtocolAddr, Pid, Message}; /// /// # fn main() -> std::result::Result<(), Box> { - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote1 = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote2 = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote1 = Network::new(Pid::new(), &runtime); + /// # let remote2 = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; /// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?; @@ -775,6 +841,9 @@ impl Stream { /// //Send same Message to multiple Streams /// stream_a.send_raw(&msg); /// stream_b.send_raw(&msg); + /// drop(network); + /// # drop(remote1); + /// # drop(remote2); /// # Ok(()) /// }) /// # } @@ -806,15 +875,14 @@ impl Stream { /// # Example /// ``` /// # use veloren_network::Promises; - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?; @@ -824,6 +892,8 @@ impl Stream { /// let mut stream_a = participant_a.opened().await?; /// //Recv Message /// println!("{}", stream_a.recv::().await?); + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -839,15 +909,14 @@ impl Stream { /// # Example /// ``` /// # use veloren_network::Promises; - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?; @@ -859,6 +928,8 @@ impl Stream { /// let msg = stream_a.recv_raw().await?; /// //Resend Message, without deserializing /// stream_a.send_raw(&msg)?; + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -894,15 +965,14 @@ impl Stream { /// # Example /// ``` /// # use veloren_network::Promises; - /// # use std::sync::Arc; /// use tokio::runtime::Runtime; /// use veloren_network::{Network, ProtocolAddr, Pid}; /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it - /// let runtime = Arc::new(Runtime::new().unwrap()); - /// let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// let runtime = Runtime::new().unwrap(); + /// let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// runtime.block_on(async { /// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?; @@ -913,6 +983,8 @@ impl Stream { /// let mut stream_a = participant_a.opened().await?; /// //Try Recv Message /// println!("{:?}", stream_a.try_recv::()?); + /// drop(network); + /// # drop(remote); /// # Ok(()) /// }) /// # } @@ -952,60 +1024,61 @@ impl core::cmp::PartialEq for Participant { } } +fn actively_wait(mut finished_receiver: oneshot::Receiver, f: F) +where + F: FnOnce(T) + Send + 'static, + T: Send + 'static, +{ + const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding"; + + if let Ok(handle) = tokio::runtime::Handle::try_current() { + // When in Async Context WE MUST NOT SYNC BLOCK (as a deadlock might occur as + // other is queued behind). And we CANNOT join our Future_Handle + trace!("async context detected, defer shutdown"); + handle.spawn(async move { + match finished_receiver.await { + Ok(data) => f(data), + Err(e) => panic!("{}: {}", CHANNEL_ERR, e), + } + }); + } else { + let mut cnt = 0; + loop { + use tokio::sync::oneshot::error::TryRecvError; + match finished_receiver.try_recv() { + Ok(data) => { + f(data); + break; + }, + Err(TryRecvError::Closed) => panic!(CHANNEL_ERR), + Err(TryRecvError::Empty) => { + trace!("activly sleeping"); + cnt += 1; + if cnt > 120 { + error!("Timeout waiting for shutdown, dropping"); + break; + } + std::thread::sleep(Duration::from_millis(100) * cnt); + }, + } + } + }; +} + impl Drop for Network { #[instrument(name="network", skip(self), fields(p = %self.local_pid))] fn drop(&mut self) { - debug!("Shutting down Network"); - trace!("Shutting down Participants of Network, while we still have metrics"); - let mut finished_receiver_list = vec![]; - - if tokio::runtime::Handle::try_current().is_ok() { - error!("we have a runtime but we mustn't, DROP NETWORK from async runtime is illegal") - } - - tokio::task::block_in_place(|| { - self.runtime.block_on(async { - for (remote_pid, a2s_disconnect_s) in - self.participant_disconnect_sender.lock().await.drain() - { - match a2s_disconnect_s.lock().await.take() { - Some(a2s_disconnect_s) => { - trace!(?remote_pid, "Participants will be closed"); - let (finished_sender, finished_receiver) = oneshot::channel(); - finished_receiver_list.push((remote_pid, finished_receiver)); - a2s_disconnect_s - .send((remote_pid, (Duration::from_secs(120), finished_sender))) - .expect( - "Scheduler is closed, but nobody other should be able to \ - close it", - ); - }, - None => trace!(?remote_pid, "Participant already disconnected gracefully"), - } - } - //wait after close is requested for all - for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) { - match finished_receiver.await { - Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"), - Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"), - Err(e) => warn!( - ?remote_pid, - ?e, - "Failed to get a message back from the scheduler, seems like the \ - network is already closed" - ), - } - } - }); - }); - trace!("Participants have shut down!"); - trace!("Shutting down Scheduler"); - self.shutdown_sender + trace!("Dropping Network"); + let (finished_sender, finished_receiver) = oneshot::channel(); + match self + .shutdown_network_s .take() .unwrap() - .send(()) - .expect("Scheduler is closed, but nobody other should be able to close it"); - debug!("Network has shut down"); + .send(finished_sender) + { + Err(e) => warn!(?e, "Runtime seems to be dropped already"), + Ok(()) => actively_wait(finished_receiver, |()| info!("Network dropped gracefully")), + }; } } @@ -1015,59 +1088,31 @@ impl Drop for Participant { fn drop(&mut self) { const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \ outgoing messages, dropping remaining"; - const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding"; - use tokio::sync::oneshot::error::TryRecvError; + const SCHEDULER_ERR: &str = + "Something is wrong in internal scheduler coding or you dropped the runtime to early"; // ignore closed, as we need to send it even though we disconnected the // participant from network debug!("Shutting down Participant"); - match self - .a2s_disconnect_s - .try_lock() - .expect("Participant in use while beeing dropped") - .take() - { - None => info!("Participant already has been shutdown gracefully"), - Some(a2s_disconnect_s) => { - debug!("Disconnect from Scheduler"); - let (finished_sender, mut finished_receiver) = oneshot::channel(); - a2s_disconnect_s - .send((self.remote_pid, (Duration::from_secs(120), finished_sender))) - .expect("Something is wrong in internal scheduler coding"); - if let Ok(handle) = tokio::runtime::Handle::try_current() { - trace!("Participant drop Async"); - handle.spawn(async move { - match finished_receiver.await { - Ok(Ok(())) => info!("Participant dropped gracefully"), - Ok(Err(e)) => error!(?e, SHUTDOWN_ERR), - Err(e) => panic!("{}: {}", CHANNEL_ERR, e), - } - }); - } else { - let mut cnt = 0; - loop { - match finished_receiver.try_recv() { - Ok(Ok(())) => { - info!("Participant dropped gracefully"); - break; - }, - Ok(Err(e)) => { - error!(?e, SHUTDOWN_ERR); - break; - }, - Err(TryRecvError::Closed) => panic!(CHANNEL_ERR), - Err(TryRecvError::Empty) => { - trace!("activly sleeping"); - cnt += 1; - if cnt > 120 { - error!("Timeout waiting for participant shutdown, droping"); - break; - } - std::thread::sleep(Duration::from_millis(100) * cnt); - }, - } + match self.a2s_disconnect_s.try_lock() { + Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"), + Ok(mut s) => match s.take() { + None => info!("Participant already has been shutdown gracefully"), + Some(a2s_disconnect_s) => { + debug!("Disconnect from Scheduler"); + let (finished_sender, finished_receiver) = oneshot::channel(); + match a2s_disconnect_s + .send((self.remote_pid, (Duration::from_secs(120), finished_sender))) + { + Err(e) => warn!(?e, SCHEDULER_ERR), + Ok(()) => { + actively_wait(finished_receiver, |d| match d { + Ok(()) => info!("Participant dropped gracefully"), + Err(e) => error!(?e, SHUTDOWN_ERR), + }); + }, } - }; + }, }, } } diff --git a/network/src/lib.rs b/network/src/lib.rs index e5c1545f32..e4f29f0687 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -44,7 +44,7 @@ //! use veloren_network::{Network, Pid, Promises, ProtocolAddr}; //! //! // Client -//! async fn client(runtime: Arc) -> std::result::Result<(), Box> { +//! async fn client(runtime: &Runtime) -> std::result::Result<(), Box> { //! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen` //! let client_network = Network::new(Pid::new(), runtime); //! let server = client_network @@ -58,7 +58,7 @@ //! } //! //! // Server -//! async fn server(runtime: Arc) -> std::result::Result<(), Box> { +//! async fn server(runtime: &Runtime) -> std::result::Result<(), Box> { //! let server_network = Network::new(Pid::new(), runtime); //! server_network //! .listen(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap())) @@ -72,10 +72,9 @@ //! } //! //! fn main() -> std::result::Result<(), Box> { -//! let runtime = Arc::new(Runtime::new().unwrap()); +//! let runtime = Runtime::new().unwrap(); //! runtime.block_on(async { -//! let (result_c, result_s) = -//! join!(client(Arc::clone(&runtime)), server(Arc::clone(&runtime)),); +//! let (result_c, result_s) = join!(client(&runtime), server(&runtime),); //! result_c?; //! result_s?; //! Ok(()) diff --git a/network/src/message.rs b/network/src/message.rs index 8da46a4fb2..516fe06ddd 100644 --- a/network/src/message.rs +++ b/network/src/message.rs @@ -77,9 +77,9 @@ impl Message { /// /// # fn main() -> std::result::Result<(), Box> { /// // Create a Network, listen on Port `2300` and wait for a Stream to be opened, then listen on it - /// # let runtime = Arc::new(Runtime::new().unwrap()); - /// # let network = Network::new(Pid::new(), Arc::clone(&runtime)); - /// # let remote = Network::new(Pid::new(), Arc::clone(&runtime)); + /// # let runtime = Runtime::new().unwrap(); + /// # let network = Network::new(Pid::new(), &runtime); + /// # let remote = Network::new(Pid::new(), &runtime); /// # runtime.block_on(async { /// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; /// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?; @@ -90,6 +90,8 @@ impl Message { /// //Recv Message /// let msg = stream_a.recv_raw().await?; /// println!("Msg is {}", msg.deserialize::()?); + /// drop(network); + /// # drop(remote); /// # Ok(()) /// # }) /// # } diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 77baa45c99..f711287d38 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -629,10 +629,13 @@ impl Scheduler { pid_oneshot.send(Ok(participant)).unwrap(); } else { // no one is waiting on this Participant, return in to Network - participant_channels + if participant_channels .s2a_connected_s .send(participant) - .unwrap(); + .is_err() + { + warn!("seems like Network already got closed"); + }; } } else { let pi = &participants[&pid]; diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 76c684a5e4..7d6a2cb0ee 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -150,7 +150,7 @@ fn stream_send_100000_then_close_stream() { #[test] fn stream_send_100000_then_close_stream_remote() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -163,7 +163,7 @@ fn stream_send_100000_then_close_stream_remote() { #[test] fn stream_send_100000_then_close_stream_remote2() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -177,7 +177,7 @@ fn stream_send_100000_then_close_stream_remote2() { #[test] fn stream_send_100000_then_close_stream_remote3() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -191,7 +191,7 @@ fn stream_send_100000_then_close_stream_remote3() { #[test] fn close_part_then_network() { let (_, _) = helper::setup(false, 0); - let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -204,7 +204,7 @@ fn close_part_then_network() { #[test] fn close_network_then_part() { let (_, _) = helper::setup(false, 0); - let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); for _ in 0..1000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -224,6 +224,61 @@ fn close_network_then_disconnect_part() { drop(n_a); assert!(r.block_on(p_a.disconnect()).is_err()); std::thread::sleep(std::time::Duration::from_millis(1000)); + drop((_n_b, _p_b)); //clean teardown +} + +#[test] +fn close_runtime_then_network() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + for _ in 0..100 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + drop(r); + drop(_n_a); + std::thread::sleep(std::time::Duration::from_millis(1000)); + drop(_p_b); +} + +#[test] +fn close_runtime_then_part() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + for _ in 0..100 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + drop(r); + drop(_p_a); + std::thread::sleep(std::time::Duration::from_millis(1000)); + drop(_p_b); + drop(_n_a); +} + +#[test] +fn close_network_from_async() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + for _ in 0..100 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + r.block_on(async move { + drop(_n_a); + }); + drop(_p_b); +} + +#[test] +fn close_part_from_async() { + let (_, _) = helper::setup(false, 0); + let (r, _n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp()); + for _ in 0..100 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + r.block_on(async move { + p_a.disconnect().await.unwrap(); + drop(_p_b); + }); + drop(_n_a); } #[test] @@ -289,8 +344,8 @@ fn failed_stream_open_after_remote_part_is_closed() { fn open_participant_before_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let n_a = Network::new(Pid::fake(0), Arc::clone(&r)); - let n_b = Network::new(Pid::fake(1), Arc::clone(&r)); + let n_a = Network::new(Pid::fake(0), &r); + let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); r.block_on(n_a.listen(addr.clone())).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap(); @@ -309,8 +364,8 @@ fn open_participant_before_remote_part_is_closed() { fn open_participant_after_remote_part_is_closed() { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let n_a = Network::new(Pid::fake(0), Arc::clone(&r)); - let n_b = Network::new(Pid::fake(1), Arc::clone(&r)); + let n_a = Network::new(Pid::fake(0), &r); + let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); r.block_on(n_a.listen(addr.clone())).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap(); @@ -329,8 +384,8 @@ fn open_participant_after_remote_part_is_closed() { fn close_network_scheduler_completely() { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let n_a = Network::new(Pid::fake(0), Arc::clone(&r)); - let n_b = Network::new(Pid::fake(1), Arc::clone(&r)); + let n_a = Network::new(Pid::fake(0), &r); + let n_b = Network::new(Pid::fake(1), &r); let addr = tcp(); r.block_on(n_a.listen(addr.clone())).unwrap(); let p_b = r.block_on(n_b.connect(addr)).unwrap(); @@ -353,7 +408,7 @@ fn close_network_scheduler_completely() { #[test] fn dont_panic_on_multiply_recv_after_close() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); @@ -368,7 +423,7 @@ fn dont_panic_on_multiply_recv_after_close() { #[test] fn dont_panic_on_recv_send_after_close() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); @@ -381,7 +436,7 @@ fn dont_panic_on_recv_send_after_close() { #[test] fn dont_panic_on_multiple_send_after_close() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(11u32).unwrap(); drop(s1_a); diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 46def7020d..68d5cebd87 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -59,8 +59,8 @@ pub fn network_participant_stream( ) { let runtime = Arc::new(Runtime::new().unwrap()); let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async { - let n_a = Network::new(Pid::fake(0), Arc::clone(&runtime)); - let n_b = Network::new(Pid::fake(1), Arc::clone(&runtime)); + let n_a = Network::new(Pid::fake(0), &runtime); + let n_b = Network::new(Pid::fake(1), &runtime); n_a.listen(addr.clone()).await.unwrap(); let p1_b = n_b.connect(addr).await.unwrap(); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 56d90cb4cc..93534ac082 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -27,7 +27,7 @@ fn stream_simple() { #[test] fn stream_try_recv() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send(4242u32).unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); @@ -104,8 +104,8 @@ fn stream_simple_udp_3msg() { fn tcp_and_udp_2_connections() -> std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let network = Network::new(Pid::new(), Arc::clone(&r)); - let remote = Network::new(Pid::new(), Arc::clone(&r)); + let network = Network::new(Pid::new(), &r); + let remote = Network::new(Pid::new(), &r); r.block_on(async { let network = network; let remote = remote; @@ -131,14 +131,14 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box> { let (_, _) = helper::setup(false, 0); let r = Arc::new(Runtime::new().unwrap()); - let network = Network::new(Pid::new(), Arc::clone(&r)); + let network = Network::new(Pid::new(), &r); let udp1 = udp(); let tcp1 = tcp(); r.block_on(network.listen(udp1.clone()))?; r.block_on(network.listen(tcp1.clone()))?; std::thread::sleep(std::time::Duration::from_millis(200)); - let network2 = Network::new(Pid::new(), Arc::clone(&r)); + let network2 = Network::new(Pid::new(), &r); let e1 = r.block_on(network2.listen(udp1)); let e2 = r.block_on(network2.listen(tcp1)); match e1 { @@ -164,8 +164,8 @@ fn api_stream_send_main() -> std::result::Result<(), Box> // Create a Network, listen on Port `1200` and wait for a Stream to be opened, // then answer `Hello World` let r = Arc::new(Runtime::new().unwrap()); - let network = Network::new(Pid::new(), Arc::clone(&r)); - let remote = Network::new(Pid::new(), Arc::clone(&r)); + let network = Network::new(Pid::new(), &r); + let remote = Network::new(Pid::new(), &r); r.block_on(async { let network = network; let remote = remote; @@ -193,8 +193,8 @@ fn api_stream_recv_main() -> std::result::Result<(), Box> // Create a Network, listen on Port `1220` and wait for a Stream to be opened, // then listen on it let r = Arc::new(Runtime::new().unwrap()); - let network = Network::new(Pid::new(), Arc::clone(&r)); - let remote = Network::new(Pid::new(), Arc::clone(&r)); + let network = Network::new(Pid::new(), &r); + let remote = Network::new(Pid::new(), &r); r.block_on(async { let network = network; let remote = remote; @@ -232,7 +232,7 @@ fn wrong_parse() { #[test] fn multiple_try_recv() { let (_, _) = helper::setup(false, 0); - let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); + let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp()); s1_a.send("asd").unwrap(); s1_a.send(11u32).unwrap(); diff --git a/server/src/lib.rs b/server/src/lib.rs index 82c69f5f7d..51f31a15cf 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -370,7 +370,7 @@ impl Server { registry_state(®istry).expect("failed to register state metrics"); registry_physics(®istry).expect("failed to register state metrics"); - let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), ®istry); + let network = Network::new_with_registry(Pid::new(), &runtime, ®istry); let metrics_shutdown = Arc::new(Notify::new()); let metrics_shutdown_clone = Arc::clone(&metrics_shutdown); let addr = settings.metrics_address;