diff --git a/network/src/api.rs b/network/src/api.rs
index 1b349f3248..ef6fb113db 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -167,7 +167,10 @@ impl Network {
     /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
     ///   don't want to reuse a Pid for 2 `Networks`
     /// * `runtime` - provide a tokio::Runtime, it's used to internally spawn
-    ///   tasks. It is necessary to clean up in the non-async `Drop`.
+    ///   tasks. It is necessary to clean up in the non-async `Drop`. **All**
+    ///   network-related components **must** be dropped before the runtime is
+    ///   stopped. Dropping the runtime while a shutdown is still in progress
+    ///   leaves the network in a bad state which might cause a panic!
     ///
     /// # Result
     /// * `Self` - returns a `Network` which can be `Send` to multiple areas of
@@ -245,11 +248,10 @@ impl Network {
         );
         runtime.spawn(async move {
             trace!(?p, "Starting scheduler in own thread");
-            let _handle = tokio::spawn(
-                scheduler
-                    .run()
-                    .instrument(tracing::info_span!("scheduler", ?p)),
-            );
+            scheduler
+                .run()
+                .instrument(tracing::info_span!("scheduler", ?p))
+                .await;
             trace!(?p, "Stopping scheduler and his own thread");
         });
         Self {
@@ -985,11 +987,7 @@ impl Drop for Network {
         });
         trace!(?pid, "Participants have shut down!");
         trace!(?pid, "Shutting down Scheduler");
-        self.shutdown_sender
-            .take()
-            .unwrap()
-            .send(())
-            .expect("Scheduler is closed, but nobody other should be able to close it");
+        self.shutdown_sender.take().unwrap().send(()).expect("Scheduler is closed, but nobody other should be able to close it");
         debug!(?pid, "Network has shut down");
     }
 }
@@ -1001,7 +999,12 @@ impl Drop for Participant {
         let pid = self.remote_pid;
         debug!(?pid, "Shutting down Participant");
 
-        match self.runtime.block_on(self.a2s_disconnect_s.lock()).take() {
+        match self
+            .a2s_disconnect_s
+            .try_lock()
+            .expect("Participant in use while being dropped")
+            .take()
+        {
             None => trace!(
                 ?pid,
                 "Participant has been shutdown cleanly, no further waiting is required!"
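The doc addition above implies a strict teardown order for consumers: every `Network`, `Participant` and `Stream` has to be dropped before the tokio `Runtime` that was handed to `Network::new` is shut down. Below is a minimal sketch of that order, not part of the patch, assuming the crate is used as `veloren_network` in the same way as the tests further down; the listen address and the 300s timeout are arbitrary illustrative values.

    use std::sync::Arc;
    use tokio::runtime::Runtime;
    use veloren_network::{Network, Pid, ProtocolAddr};

    fn main() {
        let runtime = Arc::new(Runtime::new().unwrap());
        let network = Network::new(Pid::new(), Arc::clone(&runtime));

        // ... listen/connect, open streams, exchange messages ...
        runtime
            .block_on(network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())))
            .unwrap();

        // Drop all network-related components first; their non-async `Drop`
        // still schedules shutdown work on the runtime.
        drop(network);

        // Only afterwards tear the runtime itself down.
        let runtime = Arc::try_unwrap(runtime).expect("something still holds the runtime");
        runtime.shutdown_timeout(std::time::Duration::from_secs(300));
    }
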
diff --git a/network/src/participant.rs b/network/src/participant.rs
index 764f407cdf..6986a70e8f 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -204,6 +204,7 @@ impl BParticipant {
         // wait for more messages
         self.running_mgr.fetch_add(1, Ordering::Relaxed);
         let mut b2b_prios_flushed_s = None; //closing up
+        let mut interval = tokio::time::interval(Self::TICK_TIME);
         trace!("Start send_mgr");
         #[cfg(feature = "metrics")]
         let mut send_cache = MultiCidFrameCache::new(self.metrics.frames_out_total.clone());
@@ -225,7 +226,7 @@ impl BParticipant {
             b2s_prio_statistic_s
                 .send((self.remote_pid, len as u64, /* */ 0))
                 .unwrap();
-            tokio::time::sleep(Self::TICK_TIME).await;
+            interval.tick().await;
             i += 1;
             if i.rem_euclid(1000) == 0 {
                 trace!("Did 1000 ticks");
diff --git a/network/src/protocols.rs b/network/src/protocols.rs
index b92ef27eee..a18c1e1cbd 100644
--- a/network/src/protocols.rs
+++ b/network/src/protocols.rs
@@ -172,20 +172,20 @@ impl TcpProtocol {
             match Self::read_frame(&mut *read_stream, &mut end_r).await {
                 Ok(frame) => {
                     #[cfg(feature = "metrics")]
                     {
                         metrics_cache.with_label_values(&frame).inc();
                         if let Frame::Data {
                             mid: _,
                             start: _,
                             ref data,
                         } = frame
                         {
                             throughput_cache.inc_by(data.len() as u64);
                         }
                     }
-                    w2c_cid_frame_s
-                        .send((cid, Ok(frame)))
-                        .expect("Channel or Participant seems no longer to exist");
+                    if let Err(e) = w2c_cid_frame_s.send((cid, Ok(frame))) {
+                        warn!(?e, "Channel or Participant seems no longer to exist");
+                    }
                 },
                 Err(e_option) => {
                     if let Some(e) = e_option {
@@ -193,9 +193,9 @@ impl TcpProtocol {
                         //w2c_cid_frame_s is shared, dropping it wouldn't notify the receiver as
                         // every channel is holding a sender! thats why we
                         // need a explicit STOP here
-                        w2c_cid_frame_s
-                            .send((cid, Err(())))
-                            .expect("Channel or Participant seems no longer to exist");
+                        if let Err(e) = w2c_cid_frame_s.send((cid, Err(()))) {
+                            warn!(?e, "Channel or Participant seems no longer to exist");
+                        }
                     }
                     //None is clean shutdown
                     break;
diff --git a/network/tests/closing.rs b/network/tests/closing.rs
index e3abb25533..b0e8e180a9 100644
--- a/network/tests/closing.rs
+++ b/network/tests/closing.rs
@@ -66,9 +66,8 @@ fn close_stream() {
     );
 }
 
-///THIS is actually a bug which currently luckily doesn't trigger, but with new
-/// async-std WE must make sure, if a stream is `drop`ed inside a `block_on`,
-/// that no panic is thrown.
+///WE must NOT create runtimes inside a Runtime; this check needs to verify
+/// that we don't panic there
 #[test]
 fn close_streams_in_block_on() {
     let (_, _) = helper::setup(false, 0);
@@ -81,6 +80,7 @@ fn close_streams_in_block_on() {
         assert_eq!(s1_b.recv().await, Ok("ping".to_string()));
         drop(s1_a);
     });
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -157,6 +157,7 @@ fn stream_send_100000_then_close_stream_remote() {
     drop(s1_a);
     drop(_s1_b);
     //no receiving
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -170,6 +171,7 @@ fn stream_send_100000_then_close_stream_remote2() {
     std::thread::sleep(std::time::Duration::from_millis(1000));
     drop(s1_a);
     //no receiving
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -183,6 +185,7 @@ fn stream_send_100000_then_close_stream_remote3() {
     std::thread::sleep(std::time::Duration::from_millis(1000));
     drop(s1_a);
     //no receiving
+    drop((_n_a, _p_a, _n_b, _p_b)); //clean teardown
 }
 
 #[test]
@@ -233,6 +236,7 @@ fn opened_stream_before_remote_part_is_closed() {
     drop(p_a);
     std::thread::sleep(std::time::Duration::from_millis(1000));
     assert_eq!(r.block_on(s2_b.recv()), Ok("HelloWorld".to_string()));
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -249,6 +253,7 @@ fn opened_stream_after_remote_part_is_closed() {
         r.block_on(p_b.opened()).unwrap_err(),
         ParticipantError::ParticipantDisconnected
     );
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -265,6 +270,7 @@ fn open_stream_after_remote_part_is_closed() {
         r.block_on(p_b.open(20, Promises::empty())).unwrap_err(),
         ParticipantError::ParticipantDisconnected
     );
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -272,11 +278,11 @@ fn failed_stream_open_after_remote_part_is_closed() {
     let (_, _) = helper::setup(false, 0);
     let (r, _n_a, p_a, _, _n_b, p_b, _) = network_participant_stream(tcp());
     drop(p_a);
-    std::thread::sleep(std::time::Duration::from_millis(1000));
     assert_eq!(
         r.block_on(p_b.opened()).unwrap_err(),
         ParticipantError::ParticipantDisconnected
     );
+    drop((_n_a, _n_b, p_b)); //clean teardown
 }
 
 #[test]
@@ -337,6 +343,9 @@ fn close_network_scheduler_completely() {
     drop(n_a);
     drop(n_b);
     std::thread::sleep(std::time::Duration::from_millis(1000));
+
+    drop(p_b);
+    drop(p_a);
     let runtime = Arc::try_unwrap(r).expect("runtime is not alone, there still exist a reference");
     runtime.shutdown_timeout(std::time::Duration::from_secs(300));
 }
diff --git a/network/tests/integration.rs b/network/tests/integration.rs
index b83f50b570..b78619d65d 100644
--- a/network/tests/integration.rs
+++ b/network/tests/integration.rs
@@ -21,6 +21,7 @@ fn stream_simple() {
 
     s1_a.send("Hello World").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -31,6 +32,7 @@ fn stream_try_recv() {
     s1_a.send(4242u32).unwrap();
     std::thread::sleep(std::time::Duration::from_secs(1));
     assert_eq!(s1_b.try_recv(), Ok(Some(4242u32)));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -44,6 +46,7 @@ fn stream_simple_3msg() {
     assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
     s1_a.send("3rdMessage").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -53,6 +56,7 @@ fn stream_simple_udp() {
 
     s1_a.send("Hello World").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("Hello World".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -66,6 +70,7 @@ fn stream_simple_udp_3msg() {
     assert_eq!(r.block_on(s1_b.recv()), Ok(1337));
     s1_a.send("3rdMessage").unwrap();
     assert_eq!(r.block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -76,6 +81,8 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Error>> {
@@ ... @@ fn ... -> std::result::Result<(), Box<dyn std::error::Error>> {
         ... => (),
         _ => panic!(),
     };
+    drop((network, network2)); //clean teardown
     Ok(())
 }
 
@@ -132,6 +140,8 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     let network = Network::new(Pid::new(), Arc::clone(&r));
     let remote = Network::new(Pid::new(), Arc::clone(&r));
     r.block_on(async {
+        let network = network;
+        let remote = remote;
         network
             .listen(ProtocolAddr::Tcp("127.0.0.1:1200".parse().unwrap()))
             .await?;
@@ -159,6 +169,8 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     let network = Network::new(Pid::new(), Arc::clone(&r));
     let remote = Network::new(Pid::new(), Arc::clone(&r));
     r.block_on(async {
+        let network = network;
+        let remote = remote;
         network
             .listen(ProtocolAddr::Tcp("127.0.0.1:1220".parse().unwrap()))
             .await?;
@@ -187,6 +199,7 @@ fn wrong_parse() {
         Err(StreamError::Deserialize(_)) => (),
         _ => panic!("this should fail, but it doesnt!"),
     }
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
 
 #[test]
@@ -204,4 +217,5 @@ fn multiple_try_recv() {
     drop(s1_a);
     std::thread::sleep(std::time::Duration::from_secs(1));
     assert_eq!(s1_b.try_recv::<String>(), Err(StreamError::StreamClosed));
+    drop((_n_a, _n_b, _p_a, _p_b)); //clean teardown
 }
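
The network/src/participant.rs hunk above swaps a per-iteration `tokio::time::sleep(Self::TICK_TIME)` for a `tokio::time::interval`. An `Interval` aims at a fixed cadence, so the time spent flushing frames no longer pushes the next tick back, whereas sleeping after the work stretches every iteration to TICK_TIME plus the work itself. A minimal standalone sketch of the pattern, not taken from the patch (the 10ms tick value and the loop body are illustrative):

    use std::time::Duration;

    const TICK_TIME: Duration = Duration::from_millis(10);

    #[tokio::main]
    async fn main() {
        let mut interval = tokio::time::interval(TICK_TIME);
        for i in 0u64..5 {
            // Completes immediately on the first call, then roughly once per
            // TICK_TIME as long as the loop body stays below TICK_TIME.
            interval.tick().await;
            // ... flush prios / send frames here ...
            println!("tick {}", i);
        }
    }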