From 0f5afd1f9240b447e2f9cfeaa4697af0a5f1a56a Mon Sep 17 00:00:00 2001 From: Joshua Barretto Date: Fri, 12 Apr 2019 17:17:14 +0100 Subject: [PATCH] Fixed zombie network worker threads Former-commit-id: a8e74a9a8e42a0d5e1a79d87a1daf398e6294c73 --- common/src/net/post2.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/common/src/net/post2.rs b/common/src/net/post2.rs index 62f95078eb..25db57288b 100644 --- a/common/src/net/post2.rs +++ b/common/src/net/post2.rs @@ -3,7 +3,7 @@ use std::{ net::{TcpListener, TcpStream, SocketAddr, Shutdown}, time::{Instant, Duration}, marker::PhantomData, - sync::mpsc, + sync::{mpsc, Arc, atomic::{AtomicBool, Ordering}}, thread, collections::VecDeque, convert::TryFrom, @@ -85,9 +85,10 @@ impl PostOffice { } pub struct PostBox { - send_tx: mpsc::Sender>, + send_tx: mpsc::Sender, recv_rx: mpsc::Receiver>, worker: Option>, + running: Arc, error: Option, } @@ -99,15 +100,19 @@ impl PostBox { fn from_stream(stream: TcpStream) -> Result { stream.set_nonblocking(true)?; + let running = Arc::new(AtomicBool::new(true)); + let worker_running = running.clone(); + let (send_tx, send_rx) = mpsc::channel(); let (recv_tx, recv_rx) = mpsc::channel(); - let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx)); + let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running)); Ok(Self { send_tx, recv_rx, worker: Some(worker), + running, error: None, }) } @@ -117,7 +122,7 @@ impl PostBox { } pub fn send_message(&mut self, msg: S) { - let _ = self.send_tx.send(Some(msg)); + let _ = self.send_tx.send(msg); } pub fn next_message(&mut self) -> Option { @@ -159,11 +164,11 @@ impl PostBox { new.into_iter() } - fn worker(mut stream: TcpStream, send_rx: mpsc::Receiver>, recv_tx: mpsc::Sender>) { + fn worker(mut stream: TcpStream, send_rx: mpsc::Receiver, recv_tx: mpsc::Sender>, running: Arc) { let mut outgoing_chunks = VecDeque::new(); let mut incoming_buf = Vec::new(); - 'work: loop { + 'work: while running.load(Ordering::Relaxed) { // Get stream errors match stream.take_error() { Ok(Some(e)) | Err(e) => { @@ -176,7 +181,7 @@ impl PostBox { // Try getting messages from the send channel for _ in 0..10 { match send_rx.try_recv() { - Ok(Some(send_msg)) => { + Ok(send_msg) => { // Serialize message let mut msg_bytes = bincode::serialize(&send_msg).unwrap(); @@ -194,8 +199,6 @@ impl PostBox { .map(|chunk| chunk.to_vec()) .for_each(|chunk| outgoing_chunks.push_back(chunk)) }, - // Shut down worker - Ok(None) => break 'work, Err(mpsc::TryRecvError::Empty) => break, // Worker error Err(e) => { @@ -276,9 +279,8 @@ impl PostBox { impl Drop for PostBox { fn drop(&mut self) { - let _ = self.send_tx.send(None); - // TODO: Cleanly join! - //self.worker.take().map(|handle| handle.join()); + self.running.store(false, Ordering::Relaxed); + self.worker.take().map(|handle| handle.join()); } }