diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index b7792f811f..a8a0a0a889 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -334,6 +334,13 @@ impl Scheduler { ); }; } + debug!("shutting down protocol listeners"); + for (addr, end_channel_sender) in self.channel_listener.write().await.drain() { + trace!(?addr, "stopping listen on protocol"); + if let Err(e) = end_channel_sender.send(()) { + warn!(?addr, ?e, "listener crashed/disconnected already"); + } + } debug!("Scheduler shut down gracefully"); //removing the possibility to create new participants, needed to close down // some mgr: diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 9c26a3ac43..0a9a51ed93 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -322,3 +322,25 @@ fn open_participant_after_remote_part_is_closed() { let mut s1_a = block_on(p_a.opened()).unwrap(); assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); } + +#[test] +fn close_network_scheduler_completely() { + let (n_a, f) = Network::new(Pid::fake(0)); + let ha = std::thread::spawn(f); + let (n_b, f) = Network::new(Pid::fake(1)); + let hb = std::thread::spawn(f); + let addr = tcp(); + block_on(n_a.listen(addr.clone())).unwrap(); + let p_b = block_on(n_b.connect(addr)).unwrap(); + let mut s1_b = block_on(p_b.open(10, PROMISES_NONE)).unwrap(); + s1_b.send("HelloWorld").unwrap(); + + let p_a = block_on(n_a.connected()).unwrap(); + let mut s1_a = block_on(p_a.opened()).unwrap(); + assert_eq!(block_on(s1_a.recv()), Ok("HelloWorld".to_string())); + drop(n_a); + drop(n_b); + std::thread::sleep(std::time::Duration::from_millis(1000)); + ha.join().unwrap(); + hb.join().unwrap(); +} diff --git a/network/tests/helper.rs b/network/tests/helper.rs index 91987cbbc5..02edf8bbff 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -10,10 +10,7 @@ use tracing_subscriber::EnvFilter; use veloren_network::{Network, Participant, Pid, ProtocolAddr, Stream, PROMISES_NONE}; #[allow(dead_code)] -pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { - if tracing { - sleep += 1000 - } +pub fn setup(tracing: bool, sleep: u64) -> (u64, u64) { if sleep > 0 { thread::sleep(Duration::from_millis(sleep)); }