Fixed zombie network worker threads

Former-commit-id: a8e74a9a8e42a0d5e1a79d87a1daf398e6294c73
This commit is contained in:
Joshua Barretto 2019-04-12 17:17:14 +01:00
parent 824fb8a6c0
commit a444826f0b

View File

@ -3,7 +3,7 @@ use std::{
net::{TcpListener, TcpStream, SocketAddr, Shutdown}, net::{TcpListener, TcpStream, SocketAddr, Shutdown},
time::{Instant, Duration}, time::{Instant, Duration},
marker::PhantomData, marker::PhantomData,
sync::mpsc, sync::{mpsc, Arc, atomic::{AtomicBool, Ordering}},
thread, thread,
collections::VecDeque, collections::VecDeque,
convert::TryFrom, convert::TryFrom,
@ -85,9 +85,10 @@ impl<S: PostMsg, R: PostMsg> PostOffice<S, R> {
} }
pub struct PostBox<S: PostMsg, R: PostMsg> { pub struct PostBox<S: PostMsg, R: PostMsg> {
send_tx: mpsc::Sender<Option<S>>, send_tx: mpsc::Sender<S>,
recv_rx: mpsc::Receiver<Result<R, Error>>, recv_rx: mpsc::Receiver<Result<R, Error>>,
worker: Option<thread::JoinHandle<()>>, worker: Option<thread::JoinHandle<()>>,
running: Arc<AtomicBool>,
error: Option<Error>, error: Option<Error>,
} }
@ -99,15 +100,19 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
fn from_stream(stream: TcpStream) -> Result<Self, Error> { fn from_stream(stream: TcpStream) -> Result<Self, Error> {
stream.set_nonblocking(true)?; stream.set_nonblocking(true)?;
let running = Arc::new(AtomicBool::new(true));
let worker_running = running.clone();
let (send_tx, send_rx) = mpsc::channel(); let (send_tx, send_rx) = mpsc::channel();
let (recv_tx, recv_rx) = mpsc::channel(); let (recv_tx, recv_rx) = mpsc::channel();
let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx)); let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running));
Ok(Self { Ok(Self {
send_tx, send_tx,
recv_rx, recv_rx,
worker: Some(worker), worker: Some(worker),
running,
error: None, error: None,
}) })
} }
@ -117,7 +122,7 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
} }
pub fn send_message(&mut self, msg: S) { pub fn send_message(&mut self, msg: S) {
let _ = self.send_tx.send(Some(msg)); let _ = self.send_tx.send(msg);
} }
pub fn next_message(&mut self) -> Option<R> { pub fn next_message(&mut self) -> Option<R> {
@ -159,11 +164,11 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
new.into_iter() new.into_iter()
} }
fn worker(mut stream: TcpStream, send_rx: mpsc::Receiver<Option<S>>, recv_tx: mpsc::Sender<Result<R, Error>>) { fn worker(mut stream: TcpStream, send_rx: mpsc::Receiver<S>, recv_tx: mpsc::Sender<Result<R, Error>>, running: Arc<AtomicBool>) {
let mut outgoing_chunks = VecDeque::new(); let mut outgoing_chunks = VecDeque::new();
let mut incoming_buf = Vec::new(); let mut incoming_buf = Vec::new();
'work: loop { 'work: while running.load(Ordering::Relaxed) {
// Get stream errors // Get stream errors
match stream.take_error() { match stream.take_error() {
Ok(Some(e)) | Err(e) => { Ok(Some(e)) | Err(e) => {
@ -176,7 +181,7 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
// Try getting messages from the send channel // Try getting messages from the send channel
for _ in 0..10 { for _ in 0..10 {
match send_rx.try_recv() { match send_rx.try_recv() {
Ok(Some(send_msg)) => { Ok(send_msg) => {
// Serialize message // Serialize message
let mut msg_bytes = bincode::serialize(&send_msg).unwrap(); let mut msg_bytes = bincode::serialize(&send_msg).unwrap();
@ -194,8 +199,6 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
.map(|chunk| chunk.to_vec()) .map(|chunk| chunk.to_vec())
.for_each(|chunk| outgoing_chunks.push_back(chunk)) .for_each(|chunk| outgoing_chunks.push_back(chunk))
}, },
// Shut down worker
Ok(None) => break 'work,
Err(mpsc::TryRecvError::Empty) => break, Err(mpsc::TryRecvError::Empty) => break,
// Worker error // Worker error
Err(e) => { Err(e) => {
@ -276,9 +279,8 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
impl<S: PostMsg, R: PostMsg> Drop for PostBox<S, R> { impl<S: PostMsg, R: PostMsg> Drop for PostBox<S, R> {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.send_tx.send(None); self.running.store(false, Ordering::Relaxed);
// TODO: Cleanly join! self.worker.take().map(|handle| handle.join());
//self.worker.take().map(|handle| handle.join());
} }
} }