[Common] Switch mutexes and channels.

This commit is contained in:
Acrimon 2019-08-16 00:19:54 +02:00
parent 593deb828b
commit 42e065d425
4 changed files with 21 additions and 15 deletions

2
Cargo.lock generated
View File

@ -2945,6 +2945,7 @@ name = "veloren-common"
version = "0.3.0"
dependencies = [
"bincode 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"dot_vox 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"find_folder 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2954,6 +2955,7 @@ dependencies = [
"lz4-compress 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.98 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -25,3 +25,5 @@ lazy_static = "1.3.0"
lz4-compress = "0.1.1"
hashbrown = { version = "0.5.0", features = ["serde", "nightly"] }
find_folder = "0.3.0"
parking_lot = "0.9.0"
crossbeam = "0.7.2"

View File

@ -1,5 +1,6 @@
use parking_lot::Mutex;
use specs::Entity as EcsEntity;
use std::{collections::VecDeque, ops::DerefMut, sync::Mutex};
use std::{collections::VecDeque, ops::DerefMut};
use vek::*;
pub enum Event {
@ -21,11 +22,11 @@ impl EventBus {
}
pub fn emit(&self, event: Event) {
self.queue.lock().unwrap().push_front(event);
self.queue.lock().push_front(event);
}
pub fn recv_all(&self) -> impl ExactSizeIterator<Item = Event> {
std::mem::replace(self.queue.lock().unwrap().deref_mut(), VecDeque::new()).into_iter()
std::mem::replace(self.queue.lock().deref_mut(), VecDeque::new()).into_iter()
}
}
@ -42,6 +43,6 @@ impl<'a> Emitter<'a> {
impl<'a> Drop for Emitter<'a> {
fn drop(&mut self) {
self.bus.queue.lock().unwrap().append(&mut self.events);
self.bus.queue.lock().append(&mut self.events);
}
}

View File

@ -1,3 +1,4 @@
use crossbeam::channel;
use log::warn;
use serde::{de::DeserializeOwned, Serialize};
use std::{
@ -8,7 +9,7 @@ use std::{
net::{Shutdown, SocketAddr, TcpListener, TcpStream},
sync::{
atomic::{AtomicBool, Ordering},
mpsc, Arc,
Arc,
},
thread,
time::Duration,
@ -34,8 +35,8 @@ impl From<bincode::Error> for Error {
}
}
impl From<mpsc::TryRecvError> for Error {
fn from(_error: mpsc::TryRecvError) -> Self {
impl From<channel::TryRecvError> for Error {
fn from(_error: channel::TryRecvError) -> Self {
Error::ChannelFailure
}
}
@ -90,8 +91,8 @@ impl<S: PostMsg, R: PostMsg> PostOffice<S, R> {
}
pub struct PostBox<S: PostMsg, R: PostMsg> {
send_tx: mpsc::Sender<S>,
recv_rx: mpsc::Receiver<Result<R, Error>>,
send_tx: channel::Sender<S>,
recv_rx: channel::Receiver<Result<R, Error>>,
worker: Option<thread::JoinHandle<()>>,
running: Arc<AtomicBool>,
error: Option<Error>,
@ -108,8 +109,8 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
let running = Arc::new(AtomicBool::new(true));
let worker_running = running.clone();
let (send_tx, send_rx) = mpsc::channel();
let (recv_tx, recv_rx) = mpsc::channel();
let (send_tx, send_rx) = channel::unbounded();
let (recv_tx, recv_rx) = channel::unbounded();
let worker = thread::spawn(move || Self::worker(stream, send_rx, recv_tx, worker_running));
@ -154,7 +155,7 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
loop {
match self.recv_rx.try_recv() {
Ok(Ok(msg)) => new.push(msg),
Err(mpsc::TryRecvError::Empty) => break,
Err(channel::TryRecvError::Empty) => break,
Err(e) => {
self.error = Some(e.into());
break;
@ -171,8 +172,8 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
fn worker(
mut stream: TcpStream,
send_rx: mpsc::Receiver<S>,
recv_tx: mpsc::Sender<Result<R, Error>>,
send_rx: channel::Receiver<S>,
recv_tx: channel::Sender<Result<R, Error>>,
running: Arc<AtomicBool>,
) {
let mut outgoing_chunks = VecDeque::new();
@ -215,7 +216,7 @@ impl<S: PostMsg, R: PostMsg> PostBox<S, R> {
.map(|chunk| chunk.to_vec())
.for_each(|chunk| outgoing_chunks.push_back(chunk))
}
Err(mpsc::TryRecvError::Empty) => break,
Err(channel::TryRecvError::Empty) => break,
// Worker error
Err(e) => {
let _ = recv_tx.send(Err(e.into()));