From 13bf418794a282082a6df1eb03251c6c96b1fc6a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?= <marcel.cochem@googlemail.com>
Date: Tue, 19 Jan 2021 09:48:33 +0100
Subject: [PATCH] fix most unit tests (not all) by a) dropping
 network/participant BEFORE runtime and b) turning an expect into a warn!
 in the protocol

---
 network/src/api.rs           | 27 +++++++++++++++------------
 network/src/participant.rs   |  3 ++-
 network/src/protocols.rs     | 28 ++++++++++++++--------------
 network/tests/closing.rs     | 17 +++++++++++++----
 network/tests/integration.rs | 14 ++++++++++++++
 5 files changed, 58 insertions(+), 31 deletions(-)

diff --git a/network/src/api.rs b/network/src/api.rs
index 1b349f3248..ef6fb113db 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -167,7 +167,10 @@ impl Network {
     /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
     ///   don't want to reuse a Pid for 2 `Networks`
     /// * `runtime` - provide a tokio::Runtime, it's used to internally spawn
-    ///   tasks. It is necessary to clean up in the non-async `Drop`.
+    ///   tasks. It is necessary to clean up in the non-async `Drop`. **All**
+    ///   network-related components **must** be dropped before the runtime is
+    ///   stopped. Dropping the runtime while a shutdown is still in progress
+    ///   leaves the network in a bad state which might cause a panic!
     ///
     /// # Result
     /// * `Self` - returns a `Network` which can be `Send` to multiple areas of
@@ -245,11 +248,10 @@ impl Network {
             );
         runtime.spawn(async move {
             trace!(?p, "Starting scheduler in own thread");
-            let _handle = tokio::spawn(
-                scheduler
-                    .run()
-                    .instrument(tracing::info_span!("scheduler", ?p)),
-            );
+            scheduler
+                .run()
+                .instrument(tracing::info_span!("scheduler", ?p))
+                .await;
             trace!(?p, "Stopping scheduler and his own thread");
         });
         Self {
@@ -985,11 +987,7 @@ impl Drop for Network {
         });
         trace!(?pid, "Participants have shut down!");
         trace!(?pid, "Shutting down Scheduler");
-        self.shutdown_sender
-            .take()
-            .unwrap()
-            .send(())
-            .expect("Scheduler is closed, but nobody other should be able to close it");
+        self.shutdown_sender.take().unwrap().send(()).expect("Scheduler is closed, but nobody other should be able to close it");
         debug!(?pid, "Network has shut down");
     }
 }
@@ -1001,7 +999,12 @@ impl Drop for Participant {
         let pid = self.remote_pid;
         debug!(?pid, "Shutting down Participant");
 
-        match self.runtime.block_on(self.a2s_disconnect_s.lock()).take() {
+        match self
+            .a2s_disconnect_s
+            .try_lock()
+            .expect("Participant in use while beeing dropped")
+            .take()
+        {
             None => trace!(
                 ?pid,
                 "Participant has been shutdown cleanly, no further waiting is required!"
diff --git a/network/src/participant.rs b/network/src/participant.rs
index 764f407cdf..6986a70e8f 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -204,6 +204,7 @@ impl BParticipant {
         // wait for more messages
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         let mut b2b_prios_flushed_s = None; //closing up
+        let mut interval = tokio::time::interval(Self::TICK_TIME);
         trace!("Start send_mgr");
         #[cfg(feature = "metrics")]
         let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
@@ -225,7 +226,7 @@ impl BParticipant {
             b2s_prio_statistic_s
                 .send((self.remote_pid, len as u64, /*  */ 0))
                 .unwrap();
-            tokio::time::sleep(Self::TICK_TIME).await;
+            interval.tick().await;
             i += 1;
             if i.rem_euclid(1000) == 0 {
                 trace!("Did 1000 ticks");
diff --git a/network/src/protocols.rs b/network/src/protocols.rs
index b92ef27eee..a18c1e1cbd 100644
--- a/network/src/protocols.rs
+++ b/network/src/protocols.rs
@@ -172,20 +172,20 @@ impl TcpProtocol {
             match Self::read_frame(&mut *read_stream, &mut end_r).await {
                 Ok(frame) => {
                     #[cfg(feature = "metrics")]
-                    {
-                        metrics_cache.with_label_values(&frame).inc();
-                        if let Frame::Data {
-                            mid: _,
-                            start: _,
-                            ref data,
-                        } = frame
                         {
-                            throughput_cache.inc_by(data.len() as u64);
+                            metrics_cache.with_label_values(&frame).inc();
+                            if let Frame::Data {
+                                mid: _,
+                                start: _,
+                                ref data,
+                            } = frame
+                            {
+                                throughput_cache.inc_by(data.len() as u64);
+                            }
                         }
+                    if let Err(e) = w2c_cid_frame_s.send((cid, Ok(frame))) {
+                        warn!(?e, "Channel or Participant seems no longer to exist");
                     }
-                    w2c_cid_frame_s
-                        .send((cid, Ok(frame)))
-                        .expect("Channel or Participant seems no longer to exist");
                 },
                 Err(e_option) => {
                     if let Some(e) = e_option {
@@ -193,9 +193,9 @@ impl TcpProtocol {
                         //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as
                         // every channel is holding a sender! thats why Ne
                         // need a explicit STOP here
-                        w2c_cid_frame_s
-                            .send((cid, Err(())))
-                            .expect("Channel or Participant seems no longer to exist");
+                        if let Err(e) = w2c_cid_frame_s.send((cid, Err(()))) {
+                            warn!(?e, "Channel or Participant seems no longer to exist");
+                        }
                     }
                     //None is clean shutdown
                     break;
diff --git a/network/tests/closing.rs b/network/tests/closing.rs
index e3abb25533..b0e8e180a9 100644
--- a/network/tests/closing.rs
+++ b/network/tests/closing.rs
@@ -66,9 +66,8 @@ fn close_stream() {
     );
 }
 
-///THIS is actually a bug which currently luckily doesn't trigger, but with new
-/// async-std WE must make sure, if a stream is `drop`ed inside a `block_on`,
-/// that no panic is thrown.
+///WE must NOT create runtimes inside a Runtime; this check needs to verify
+/// that we don't panic there.
 #[test]
 fn close_streams_in_block_on() {
     let (_, _) = helper::setup(false, 0);
@@ -81,6 +80,7 @@ fn close_streams_in_block_on() {
         assert_eq!(s1_b.recv().await, Ok("ping".to_string()));
         drop(s1_a);
     });
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -157,6 +157,7 @@ fn stream_send_100000_then_close_stream_remote() {
     drop(s1_a);
     drop(_s1_b);
     //no receiving
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -170,6 +171,7 @@ fn stream_send_100000_then_close_stream_remote2() {
     std::thread::sleep(std::time::Duration::from_millis(1000));
     drop(s1_a);
     //no receiving
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -183,6 +185,7 @@ fn stream_send_100000_then_close_stream_remote3() {
     std::thread::sleep(std::time::Duration::from_millis(1000));
     drop(s1_a);
     //no receiving
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -233,6 +236,7 @@ fn opened_stream_before_remote_part_is_closed() {
     drop(p_a);
     std::thread::sleep(std::time::Duration::from_millis(1000));
     assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -249,6 +253,7 @@ fn opened_stream_after_remote_part_is_closed() {
         r.block_on(p_b.opened()).unwrap_err(),
         ParticipantError::ParticipantDisconnected
     );
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -265,6 +270,7 @@ fn open_stream_after_remote_part_is_closed() {
         r.block_on(p_b.open(20, Promises::empty())).unwrap_err(),
         ParticipantError::ParticipantDisconnected
     );
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -272,11 +278,11 @@ fn failed_stream_open_after_remote_part_is_closed() {
     let (_, _) = helper::setup(false, 0);
     let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
     drop(p_a);
-    std::thread::sleep(std::time::Duration::from_millis(1000));
     assert_eq!(
         r.block_on(p_b.opened()).unwrap_err(),
         ParticipantError::ParticipantDisconnected
     );
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -337,6 +343,9 @@ fn close_network_scheduler_completely() {
     drop(n_a);
     drop(n_b);
     std::thread::sleep(std::time::Duration::from_millis(1000));
+
+    drop(p_b);
+    drop(p_a);
     let runtime = Arc::try_unwrap(r).expect("runtime is not alone, there still exist a reference");
     runtime.shutdown_timeout(std::time::Duration::from_secs(300));
 }
diff --git a/network/tests/integration.rs b/network/tests/integration.rs
index b83f50b570..b78619d65d 100644
--- a/network/tests/integration.rs
+++ b/network/tests/integration.rs
@@ -21,6 +21,7 @@ fn stream_simple() {
 
     s1_a.send("Hello World").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -31,6 +32,7 @@ fn stream_try_recv() {
     s1_a.send(4242u32).unwrap();
     std::thread::sleep(std::time::Duration::from_secs(1));
     assert_eq!(s1_b.try_recv(), Ok(Some(4242u32)));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -44,6 +46,7 @@ fn stream_simple_3msg() {
     assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
     s1_a.send("3rdMessage").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -53,6 +56,7 @@ fn stream_simple_udp() {
 
     s1_a.send("Hello World").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -66,6 +70,7 @@ fn stream_simple_udp_3msg() {
     assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
     s1_a.send("3rdMessage").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -76,6 +81,8 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
     let network = Network::new(Pid::new(), Arc::clone(&r));
     let remote = Network::new(Pid::new(), Arc::clone(&r));
     r.block_on(async {
+        let network = network;
+        let remote = remote;
         remote
             .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
             .await?;
@@ -115,6 +122,7 @@ fn failed_listen_on_used_ports() -> std::result::Result<(), Box<dyn std::error::
         Err(NetworkError::ListenFailed(e)) if e.kind() == ErrorKind::AddrInUse => (),
         _ => panic!(),
     };
+    drop((network, network2)); //clean teardown
     Ok(())
 }
 
@@ -132,6 +140,8 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
     let network = Network::new(Pid::new(), Arc::clone(&r));
     let remote = Network::new(Pid::new(), Arc::clone(&r));
     r.block_on(async {
+        let network = network;
+        let remote = remote;
         network
             .listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
             .await?;
@@ -159,6 +169,8 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
     let network = Network::new(Pid::new(), Arc::clone(&r));
     let remote = Network::new(Pid::new(), Arc::clone(&r));
     r.block_on(async {
+        let network = network;
+        let remote = remote;
         network
             .listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
             .await?;
@@ -187,6 +199,7 @@ fn wrong_parse() {
         Err(StreamError::Deserialize(_)) => (),
         _ => panic!("this should fail, but it doesnt!"),
     }
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -204,4 +217,5 @@ fn multiple_try_recv() {
     drop(s1_a);
     std::thread::sleep(std::time::Duration::from_secs(1));
     assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }