Merge branch 'network-lockless' into 'master'

Network lockless

See merge request veloren/veloren!1153
This commit is contained in:
Marcel 2020-07-05 09:17:29 +00:00
commit 2a7a8b05e6
10 changed files with 59 additions and 30 deletions

View File

@ -51,6 +51,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed unable to use ability; Secondary and ability3 (fire rod) will now automatically wield - Fixed unable to use ability; Secondary and ability3 (fire rod) will now automatically wield
- Gliding is now a toggle that can be triggered from the ground - Gliding is now a toggle that can be triggered from the ground
- Replaced `log` with `tracing` in all crates - Replaced `log` with `tracing` in all crates
- Switch to a new network backend that will allow several improvements in the future
- Connection screen fails after 4 minutes if it can't connect to the server instead of 80 minutes
### Removed ### Removed

View File

@ -795,6 +795,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"bincode", "bincode",
"crossbeam-channel",
"futures", "futures",
"lazy_static", "lazy_static",
"prometheus", "prometheus",

View File

@ -886,6 +886,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"bincode", "bincode",
"crossbeam-channel",
"futures", "futures",
"lazy_static", "lazy_static",
"prometheus", "prometheus",

View File

@ -877,6 +877,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"bincode", "bincode",
"crossbeam-channel",
"futures", "futures",
"lazy_static", "lazy_static",
"prometheus", "prometheus",

View File

@ -60,14 +60,10 @@ pub struct Participant {
/// ///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior /// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`. /// mutability, as multiple threads don't need access to the same `Stream`.
/// [`Sync`] is not supported! In that case multiple `Streams` should be used
/// instead. However it's still possible to [`Send`] `Streams`.
/// ///
/// [`Networks`]: crate::api::Network /// [`Networks`]: crate::api::Network
/// [`open`]: Participant::open /// [`open`]: Participant::open
/// [`opened`]: Participant::opened /// [`opened`]: Participant::opened
/// [`Send`]: std::marker::Send
/// [`Sync`]: std::marker::Sync
#[derive(Debug)] #[derive(Debug)]
pub struct Stream { pub struct Stream {
pid: Pid, pid: Pid,
@ -86,6 +82,8 @@ pub struct Stream {
pub enum NetworkError { pub enum NetworkError {
NetworkClosed, NetworkClosed,
ListenFailed(std::io::Error), ListenFailed(std::io::Error),
ConnectFailed(std::io::Error),
GracefulDisconnectFailed(std::io::Error),
} }
/// Error type thrown by [`Participants`](Participant) methods /// Error type thrown by [`Participants`](Participant) methods
@ -317,7 +315,10 @@ impl Network {
.await .await
.send((address, pid_sender)) .send((address, pid_sender))
.await?; .await?;
let participant = pid_receiver.await??; let participant = match pid_receiver.await? {
Ok(p) => p,
Err(e) => return Err(NetworkError::ConnectFailed(e)),
};
let pid = participant.remote_pid; let pid = participant.remote_pid;
debug!( debug!(
?pid, ?pid,
@ -437,6 +438,7 @@ impl Network {
this is a bad idea. Participant will only be dropped when you drop your last \ this is a bad idea. Participant will only be dropped when you drop your last \
reference" reference"
); );
Ok(())
}, },
Ok(mut participant) => { Ok(mut participant) => {
trace!("waiting now for participant to close"); trace!("waiting now for participant to close");
@ -451,13 +453,30 @@ impl Network {
.send((pid, finished_sender)) .send((pid, finished_sender))
.await .await
.expect("something is wrong in internal scheduler coding"); .expect("something is wrong in internal scheduler coding");
let res = finished_receiver.await.unwrap(); match finished_receiver.await {
trace!("participant is now closed"); Ok(Ok(())) => {
res?; trace!(?pid, "Participant is now closed");
Ok(())
},
Ok(Err(e)) => {
trace!(
?e,
"Error occured during shutdown of participant and is propagated to \
User"
);
Err(NetworkError::GracefulDisconnectFailed(e))
},
Err(e) => {
error!(
?pid,
?e,
"Failed to get a message back from the scheduler, closing the network"
);
Err(NetworkError::NetworkClosed)
},
}
}, },
}; }
Ok(())
} }
/// returns a copy of all current connected [`Participants`], /// returns a copy of all current connected [`Participants`],
@ -946,10 +965,6 @@ impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed } fn from(_err: crossbeam_channel::SendError<T>) -> Self { NetworkError::NetworkClosed }
} }
impl From<async_std::io::Error> for NetworkError {
fn from(err: async_std::io::Error) -> Self { NetworkError::ListenFailed(err) }
}
impl From<std::option::NoneError> for StreamError { impl From<std::option::NoneError> for StreamError {
fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed } fn from(_err: std::option::NoneError) -> Self { StreamError::StreamClosed }
} }
@ -1006,6 +1021,8 @@ impl core::fmt::Display for NetworkError {
match self { match self {
NetworkError::NetworkClosed => write!(f, "network closed"), NetworkError::NetworkClosed => write!(f, "network closed"),
NetworkError::ListenFailed(_) => write!(f, "listening failed"), NetworkError::ListenFailed(_) => write!(f, "listening failed"),
NetworkError::ConnectFailed(_) => write!(f, "connecting failed"),
NetworkError::GracefulDisconnectFailed(_) => write!(f, "graceful disconnect failed"),
} }
} }
} }

View File

@ -71,7 +71,10 @@ impl TcpProtocol {
"closing tcp protocol due to read error, sending close frame to gracefully \ "closing tcp protocol due to read error, sending close frame to gracefully \
shutdown" shutdown"
); );
w2c_cid_frame_s.send((cid, Frame::Shutdown)).await.unwrap(); w2c_cid_frame_s
.send((cid, Frame::Shutdown))
.await
.expect("Channel or Participant seems no longer to exist to be Shutdown");
} }
} }
@ -201,7 +204,10 @@ impl TcpProtocol {
}, },
}; };
metrics_cache.with_label_values(&frame).inc(); metrics_cache.with_label_values(&frame).inc();
w2c_cid_frame_s.send((cid, frame)).await.unwrap(); w2c_cid_frame_s
.send((cid, frame))
.await
.expect("Channel or Participant seems no longer to exist");
} }
trace!("shutting down tcp read()"); trace!("shutting down tcp read()");
} }

View File

@ -7,7 +7,7 @@ use vek::*;
pub struct Client { pub struct Client {
pub client_state: ClientState, pub client_state: ClientState,
pub singleton_stream: std::sync::Mutex<Stream>, pub singleton_stream: Stream,
pub last_ping: f64, pub last_ping: f64,
pub login_msg_sent: bool, pub login_msg_sent: bool,
} }
@ -17,9 +17,7 @@ impl Component for Client {
} }
impl Client { impl Client {
pub fn notify(&mut self, msg: ServerMsg) { pub fn notify(&mut self, msg: ServerMsg) { let _ = self.singleton_stream.send(msg); }
let _ = self.singleton_stream.lock().unwrap().send(msg);
}
pub fn is_registered(&self) -> bool { pub fn is_registered(&self) -> bool {
match self.client_state { match self.client_state {
@ -39,16 +37,12 @@ impl Client {
self.client_state = new_state; self.client_state = new_state;
let _ = self let _ = self
.singleton_stream .singleton_stream
.lock()
.unwrap()
.send(ServerMsg::StateAnswer(Ok(new_state))); .send(ServerMsg::StateAnswer(Ok(new_state)));
} }
pub fn error_state(&mut self, error: RequestStateError) { pub fn error_state(&mut self, error: RequestStateError) {
let _ = self let _ = self
.singleton_stream .singleton_stream
.lock()
.unwrap()
.send(ServerMsg::StateAnswer(Err((error, self.client_state)))); .send(ServerMsg::StateAnswer(Err((error, self.client_state))));
} }
} }

View File

@ -601,7 +601,7 @@ impl Server {
let mut client = Client { let mut client = Client {
client_state: ClientState::Connected, client_state: ClientState::Connected,
singleton_stream: std::sync::Mutex::new(singleton_stream), singleton_stream,
last_ping: self.state.get_time(), last_ping: self.state.get_time(),
login_msg_sent: false, login_msg_sent: false,
}; };

View File

@ -57,7 +57,7 @@ impl Sys {
settings: &Read<'_, ServerSettings>, settings: &Read<'_, ServerSettings>,
) -> Result<(), crate::error::Error> { ) -> Result<(), crate::error::Error> {
loop { loop {
let msg = client.singleton_stream.lock().unwrap().recv().await?; let msg = client.singleton_stream.recv().await?;
*cnt += 1; *cnt += 1;
match msg { match msg {
// Go back to registered state (char selection screen) // Go back to registered state (char selection screen)

View File

@ -12,6 +12,7 @@ use std::{
thread, thread,
time::Duration, time::Duration,
}; };
use tracing::debug;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -70,8 +71,8 @@ impl ClientInit {
let mut last_err = None; let mut last_err = None;
'tries: for _ in 0..960 + 1 { const FOUR_MINUTES_RETRIES: u64 = 48;
// 300 Seconds 'tries: for _ in 0..FOUR_MINUTES_RETRIES {
if cancel2.load(Ordering::Relaxed) { if cancel2.load(Ordering::Relaxed) {
break; break;
} }
@ -100,7 +101,13 @@ impl ClientInit {
}, },
Err(err) => { Err(err) => {
match err { match err {
ClientError::NetworkErr(NetworkError::ListenFailed(..)) => { ClientError::NetworkErr(NetworkError::ConnectFailed(
..,
)) => {
debug!(
"can't reach the server, going to retry in a few \
seconds"
);
}, },
// Non-connection error, stop attempts // Non-connection error, stop attempts
err => { err => {