Switch API to return Participant rather than Arc<Participant>

- API behavior changed!
 - the `Network` no longer holds a copy of the `Participant`; if the value returned by `connect` (previously `Arc<Participant>`, now `Participant`) is dropped, `Participant::Drop` is triggered (see the sketch after this list)
 - a `Participant` can be closed asynchronously via `Participant::disconnect()`; there is no need to know the `Network` at this point anymore
 - `Network::Drop` will check for and drop any Participants that have not yet been disconnected
 - Participants can be compared via `PartialEq`; if they compare equal, they point to the same endpoint (the comparison checks `remote_pid`)
   - Note: multiple Participants are only supported in theory; they won't work yet
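A minimal sketch of the resulting usage, assuming the `veloren_network` API as shown in the doc examples further down in this diff (address and port are placeholders):

```rust
use futures::executor::block_on;
use veloren_network::{Address, Network, Pid};

fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let (network, f) = Network::new(Pid::new(), None);
    std::thread::spawn(f);
    block_on(async {
        // `connect` now returns a plain `Participant` owned by the caller;
        // the `Network` keeps only an internal disconnect channel, no copy.
        let participant = network
            .connect(Address::Tcp("127.0.0.1:2030".parse().unwrap()))
            .await?;
        // Close it asynchronously; `Network::disconnect` no longer exists.
        participant.disconnect().await?;
        // Alternatively, simply dropping `participant` would trigger
        // `Participant::Drop`, which blocks until the shutdown completes.
        Ok(())
    })
}
```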

Additionally:
 - fix some `debug!` statements
 - veloren-client will now drop the participant gracefully on shutdown (see the sketch after this list)
 - downgrade the `error!` to a `debug!` when BParticipant shutdown is called twice, as this is to be expected in an async runtime
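As a rough sketch, the graceful shutdown in veloren-client's `Drop` impl now works like this (the `Client` struct here is a hypothetical, stripped-down stand-in; the field names and the `warn!` message follow the diff below):

```rust
use futures::executor::block_on;
use tracing::warn;
use veloren_network::{Network, Participant};

/// Hypothetical, minimal stand-in for veloren-client's `Client`.
struct Client {
    _network: Network,
    participant: Option<Participant>,
}

impl Drop for Client {
    fn drop(&mut self) {
        // Take the `Participant` out of its `Option` so we own it, then
        // disconnect gracefully; `block_on` is needed because `Drop` is sync.
        if let Err(e) = block_on(self.participant.take().unwrap().disconnect()) {
            warn!(?e, "error when disconnecting, couldn't send all data");
        }
    }
}
```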
Marcel Märtens 2020-07-09 09:58:21 +02:00
parent 9ae1d8474f
commit 041349be48
17 changed files with 384 additions and 231 deletions

View File

@@ -75,7 +75,7 @@ pub struct Client {
pub active_character_id: Option<i32>,
_network: Network,
_participant: Arc<Participant>,
participant: Option<Participant>,
singleton_stream: Stream,
last_server_ping: f64,
@@ -200,7 +200,7 @@ impl Client {
active_character_id: None,
_network: network,
_participant: participant,
participant: Some(participant),
singleton_stream: stream,
last_server_ping: 0.0,
@@ -1200,5 +1200,8 @@ impl Drop for Client {
already closed?",
);
}
if let Err(e) = block_on(self.participant.take().unwrap().disconnect()) {
warn!(?e, "error when disconnecting, couldn't send all data");
}
}
}

View File

@@ -65,7 +65,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf"
dependencies = [
"byteorder",
"byteorder 1.3.4",
"serde",
]
@@ -75,6 +75,12 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "byteorder"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
[[package]]
name = "byteorder"
version = "1.3.4"
@@ -345,6 +351,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "lz4-compress"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91"
dependencies = [
"byteorder 0.5.3",
"quick-error",
]
[[package]]
name = "matchers"
version = "0.0.1"
@@ -538,15 +554,15 @@ dependencies = [
[[package]]
name = "prometheus"
version = "0.7.0"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1"
checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"quick-error",
"spin",
"thiserror",
]
[[package]]
@@ -629,7 +645,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
"byteorder 1.3.4",
"regex-syntax",
]
@@ -712,6 +728,26 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.0.1"
@@ -798,6 +834,7 @@ dependencies = [
"crossbeam-channel",
"futures",
"lazy_static",
"lz4-compress",
"prometheus",
"rand",
"serde",

View File

@@ -4,10 +4,11 @@
//! (cd network/examples/chat && RUST_BACKTRACE=1 cargo run --release -- --trace=info --port 15006 --mode=client)
//! ```
use async_std::io;
use async_std::sync::RwLock;
use clap::{App, Arg};
use futures::executor::{block_on, ThreadPool};
use network::{Address, Network, Participant, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
use std::{sync::Arc, thread, time::Duration};
use std::{sync::Arc, thread, time::Duration, collections::HashMap};
use tracing::*;
use tracing_subscriber::EnvFilter;
@@ -103,17 +104,19 @@ fn server(address: Address) {
let server = Arc::new(server);
std::thread::spawn(f);
let pool = ThreadPool::new().unwrap();
let participants = Arc::new(RwLock::new(HashMap::new()));
block_on(async {
server.listen(address).await.unwrap();
loop {
let p1 = server.connected().await.unwrap();
let p1 = Arc::new(server.connected().await.unwrap());
let server1 = server.clone();
pool.spawn_ok(client_connection(server1, p1));
participants.write().await.insert(p1.remote_pid(), p1.clone());
pool.spawn_ok(client_connection(server1, p1, participants.clone()));
}
});
}
async fn client_connection(network: Arc<Network>, participant: Arc<Participant>) {
async fn client_connection(_network: Arc<Network>, participant: Arc<Participant>, participants: Arc<RwLock<HashMap<Pid, Arc<Participant>>>>) {
let mut s1 = participant.opened().await.unwrap();
let username = s1.recv::<String>().await.unwrap();
println!("[{}] connected", username);
@@ -124,14 +127,12 @@ async fn client_connection(network: Arc<Network>, participant: Arc<Participant>)
},
Ok(msg) => {
println!("[{}]: {}", username, msg);
let mut parts = network.participants().await;
for (_, p) in parts.drain() {
for (_, p) in participants.read().await.iter() {
match p
.open(32, PROMISES_ORDERED | PROMISES_CONSISTENCY)
.await {
Err(_) => {
//probably disconnected, remove it
network.disconnect(p).await.unwrap();
info!("error talking to client, //TODO drop it")
},
Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(),
};
@@ -180,7 +181,7 @@ fn client(address: Address) {
// For receiving, I open and close a stream per message. This could be done
// more simply, but it lets me be quite lazy on the server side and just get
// a list of all participants and send to them...
async fn read_messages(participant: Arc<Participant>) {
async fn read_messages(participant: Participant) {
while let Ok(mut s) = participant.opened().await {
let (username, message) = s.recv::<(String, String)>().await.unwrap();
println!("[{}]: {}", username, message);

View File

@@ -83,7 +83,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf"
dependencies = [
"byteorder",
"byteorder 1.3.4",
"serde",
]
@@ -104,6 +104,12 @@ dependencies = [
"constant_time_eq",
]
[[package]]
name = "byteorder"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
[[package]]
name = "byteorder"
version = "1.3.4"
@@ -418,6 +424,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "lz4-compress"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91"
dependencies = [
"byteorder 0.5.3",
"quick-error",
]
[[package]]
name = "matchers"
version = "0.0.1"
@@ -597,15 +613,15 @@ dependencies = [
[[package]]
name = "prometheus"
version = "0.7.0"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1"
checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"quick-error",
"spin",
"thiserror",
]
[[package]]
@@ -699,7 +715,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
"byteorder 1.3.4",
"regex-syntax",
]
@@ -803,6 +819,26 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.0.1"
@@ -889,6 +925,7 @@ dependencies = [
"crossbeam-channel",
"futures",
"lazy_static",
"lz4-compress",
"prometheus",
"rand",
"serde",

View File

@@ -6,7 +6,7 @@ use network::{Address, Participant, Stream};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
#[derive(Debug)]
pub enum LocalCommand {
@@ -34,7 +34,7 @@ pub struct FileInfo {
pub struct RemoteInfo {
infos: HashMap<u32, FileInfo>,
_participant: Arc<Participant>,
_participant: Participant,
pub cmd_out: Stream,
pub file_out: Stream,
}
@@ -68,7 +68,7 @@ impl FileInfo {
}
impl RemoteInfo {
pub fn new(cmd_out: Stream, file_out: Stream, participant: Arc<Participant>) -> Self {
pub fn new(cmd_out: Stream, file_out: Stream, participant: Participant) -> Self {
Self {
infos: HashMap::new(),
_participant: participant,

View File

@@ -64,9 +64,6 @@ impl Server {
},
LocalCommand::Disconnect => {
self.remotes.write().await.clear();
for (_, p) in self.network.participants().await.drain() {
self.network.disconnect(p).await.unwrap();
}
println!("Disconnecting all connections");
return;
},
@@ -126,7 +123,7 @@ impl Server {
trace!("Stop connect_manager");
}
async fn loop_participant(&self, p: Arc<Participant>) {
async fn loop_participant(&self, p: Participant) {
if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = (
p.open(15, PROMISES_CONSISTENCY | PROMISES_ORDERED).await,
p.open(40, PROMISES_CONSISTENCY).await,

View File

@@ -71,7 +71,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf"
dependencies = [
"byteorder",
"byteorder 1.3.4",
"serde",
]
@@ -81,6 +81,12 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "byteorder"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
[[package]]
name = "byteorder"
version = "1.3.4"
@@ -372,6 +378,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "lz4-compress"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91"
dependencies = [
"byteorder 0.5.3",
"quick-error",
]
[[package]]
name = "matchers"
version = "0.0.1"
@@ -578,16 +594,16 @@ dependencies = [
[[package]]
name = "prometheus"
version = "0.7.0"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1"
checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"protobuf",
"quick-error",
"spin",
"thiserror",
]
[[package]]
@@ -670,7 +686,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
dependencies = [
"byteorder",
"byteorder 1.3.4",
"regex-syntax",
]
@@ -753,6 +769,26 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.0.1"
@@ -880,6 +916,7 @@ dependencies = [
"crossbeam-channel",
"futures",
"lazy_static",
"lz4-compress",
"prometheus",
"rand",
"serde",

View File

@@ -181,7 +181,7 @@ fn client(address: Address) {
drop(s1);
std::thread::sleep(std::time::Duration::from_millis(5000));
info!("Closing participant");
block_on(client.disconnect(p1)).unwrap();
block_on(p1.disconnect()).unwrap();
std::thread::sleep(std::time::Duration::from_millis(25000));
info!("DROPPING! client");
drop(client);

View File

@@ -26,6 +26,9 @@ use std::{
use tracing::*;
use tracing_futures::Instrument;
type ParticipantCloseChannel =
mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>;
/// Represents a Tcp or Udp or Mpsc address
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum Address {
@@ -47,8 +50,9 @@ pub struct Participant {
a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
closed: AtomicBool,
a2s_disconnect_s:
Option<mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>>,
//We need a std::Mutex here, as the async Mutex would require a block in `Drop`, which can `panic!`
//It's only okay because `disconnect` is the only `fn` accessing it and it consumes self!
a2s_disconnect_s: Arc<std::sync::Mutex<Option<ParticipantCloseChannel>>>,
}
/// `Streams` represents a channel to send `n` messages with a certain priority
@@ -83,13 +87,14 @@ pub enum NetworkError {
NetworkClosed,
ListenFailed(std::io::Error),
ConnectFailed(std::io::Error),
GracefulDisconnectFailed(std::io::Error),
}
/// Error type thrown by [`Participants`](Participant) methods
#[derive(Debug, PartialEq)]
#[derive(Debug)]
pub enum ParticipantError {
///Participant was closed too early to complete the action fully
ParticipantClosed,
GracefulDisconnectFailed(std::io::Error),
}
/// Error type thrown by [`Streams`](Stream) methods
@@ -105,10 +110,9 @@ pub enum StreamError {
/// Application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
///
/// The `Network` has methods to [`connect`] and [`disconnect`] to other
/// [`Participants`] via their [`Address`]. All [`Participants`] will be stored
/// in the Network until explicitly disconnected, which is the only way to close
/// the sockets.
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
/// via their [`Address`], or [`listen`] passively for [`connected`]
/// [`Participants`].
///
/// # Examples
/// ```rust
@@ -133,10 +137,12 @@ pub enum StreamError {
///
/// [`Participants`]: crate::api::Participant
/// [`connect`]: Network::connect
/// [`disconnect`]: Network::disconnect
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
pub struct Network {
local_pid: Pid,
participants: RwLock<HashMap<Pid, Arc<Participant>>>,
participant_disconnect_sender:
RwLock<HashMap<Pid, Arc<std::sync::Mutex<Option<ParticipantCloseChannel>>>>>,
listen_sender:
RwLock<mpsc::UnboundedSender<(Address, oneshot::Sender<async_std::io::Result<()>>)>>,
connect_sender:
@@ -204,7 +210,7 @@ impl Network {
(
Self {
local_pid: participant_id,
participants: RwLock::new(HashMap::new()),
participant_disconnect_sender: RwLock::new(HashMap::new()),
listen_sender: RwLock::new(listen_sender),
connect_sender: RwLock::new(connect_sender),
connected_receiver: RwLock::new(connected_receiver),
@@ -293,7 +299,7 @@ impl Network {
/// let p2 = network
/// .connect(Address::Udp("127.0.0.1:2011".parse().unwrap()))
/// .await?;
/// assert!(std::sync::Arc::ptr_eq(&p1, &p2));
/// assert_eq!(&p1, &p2);
/// # Ok(())
/// })
/// # }
@@ -307,7 +313,7 @@ impl Network {
///
/// [`Streams`]: crate::api::Stream
/// [`Addresses`]: crate::api::Address
pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
pub async fn connect(&self, address: Address) -> Result<Participant, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
debug!(?address, "Connect to address");
self.connect_sender
@@ -324,11 +330,10 @@ impl Network {
?pid,
"Received Participant id from remote and return to user"
);
let participant = Arc::new(participant);
self.participants
self.participant_disconnect_sender
.write()
.await
.insert(participant.remote_pid, participant.clone());
.insert(pid, participant.a2s_disconnect_s.clone());
Ok(participant)
}
@@ -365,129 +370,14 @@
///
/// [`Streams`]: crate::api::Stream
/// [`listen`]: crate::api::Network::listen
pub async fn connected(&self) -> Result<Arc<Participant>, NetworkError> {
pub async fn connected(&self) -> Result<Participant, NetworkError> {
let participant = self.connected_receiver.write().await.next().await?;
let participant = Arc::new(participant);
self.participants
self.participant_disconnect_sender
.write()
.await
.insert(participant.remote_pid, participant.clone());
.insert(participant.remote_pid, participant.a2s_disconnect_s.clone());
Ok(participant)
}
/// Disconnects a [`Participant`], for which you move the last existing
/// [`Arc<Participant>`]. As the [`Network`] also holds an [`Arc`] to the
/// [`Participant`], you need to provide the last [`Arc<Participant>`] and
/// are not allowed to keep others; if you do, the [`Participant`]
/// can't be disconnected properly. If you no longer have the respective
/// [`Participant`], try using the [`participants`] method to get it.
///
/// This function will wait for all [`Streams`] to properly close, including
/// all messages to be sent before closing. If an error occurs with one of
/// the messages, it is propagated to the caller.
/// Except if the remote side already dropped the [`Participant`]
/// simultaneously, then messages won't be sent
///
/// There is NO `disconnected` function in `Network`; if a [`Participant`]
/// is no longer reachable (e.g. because the network cable was unplugged), the
/// [`Participant`] will fail all actions, but needs to be manually
/// disconnected, using this function.
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use veloren_network::{Address, Network, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on TCP port `2030`, print the Pid of connecting participants, and close the connection.
/// let (network, f) = Network::new(Pid::new(), None);
/// std::thread::spawn(f);
/// # let (remote, fr) = Network::new(Pid::new(), None);
/// # std::thread::spawn(fr);
/// block_on(async {
/// network
/// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap()))
/// .await?;
/// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// network.disconnect(participant).await?;
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Arc<Participant>`]: crate::api::Participant
/// [`Streams`]: crate::api::Stream
/// [`participants`]: Network::participants
/// [`Arc`]: std::sync::Arc
pub async fn disconnect(&self, participant: Arc<Participant>) -> Result<(), NetworkError> {
// Remove, Close and try_unwrap error when unwrap fails!
let pid = participant.remote_pid;
debug!(?pid, "Removing participant from network");
self.participants.write().await.remove(&pid)?;
participant.closed.store(true, Ordering::Relaxed);
match Arc::try_unwrap(participant) {
Err(_) => {
warn!(
"You are disconnecting and still keeping a reference to this participant, \
this is a bad idea. Participant will only be dropped when you drop your last \
reference"
);
Ok(())
},
Ok(mut participant) => {
trace!("Waiting now for participant to close");
let (finished_sender, finished_receiver) = oneshot::channel();
// We are deleting here asynchronously before Drop is called, because this
// can be done natively async, while Drop needs a block! Drop will recognise
// that it has already been deleted here and won't try a double delete.
participant
.a2s_disconnect_s
.take()
.unwrap()
.send((pid, finished_sender))
.await
.expect("Something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(Ok(())) => {
trace!(?pid, "Participant is now closed");
Ok(())
},
Ok(Err(e)) => {
trace!(
?e,
"Error occured during shutdown of participant and is propagated to \
User"
);
Err(NetworkError::GracefulDisconnectFailed(e))
},
Err(e) => {
error!(
?pid,
?e,
"Failed to get a message back from the scheduler, closing the network"
);
Err(NetworkError::NetworkClosed)
},
}
},
}
}
/// returns a copy of all currently connected [`Participants`],
/// including ones which can't send data anymore as the underlying sockets
/// are closed already but haven't been [`disconnected`] yet.
///
/// [`Participants`]: crate::api::Participant
/// [`disconnected`]: Network::disconnect
pub async fn participants(&self) -> HashMap<Pid, Arc<Participant>> {
self.participants.read().await.clone()
}
}
impl Participant {
@@ -504,7 +394,7 @@ impl Participant {
a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
closed: AtomicBool::new(false),
a2s_disconnect_s: Some(a2s_disconnect_s),
a2s_disconnect_s: Arc::new(std::sync::Mutex::new(Some(a2s_disconnect_s))),
}
}
@@ -635,6 +525,100 @@ impl Participant {
}
}
/// Disconnects a `Participant` in an async way.
/// Use this rather than `Participant::Drop` if you want to close multiple
/// `Participants`.
///
/// This function will wait for all [`Streams`] to properly close, including
/// all messages to be sent before closing. If an error occurs with one of
/// the messages, it is propagated to the caller.
/// Except if the remote side already dropped the `Participant`
/// simultaneously, then messages won't be sent
///
/// There is NO `disconnected` function in `Participant`; if a `Participant`
/// is no longer reachable (e.g. because the network cable was unplugged), the
/// `Participant` will fail all actions, but needs to be manually
/// disconnected, using this function.
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use veloren_network::{Address, Network, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on TCP port `2030`, print the Pid of connecting participants, and close the connection.
/// let (network, f) = Network::new(Pid::new(), None);
/// std::thread::spawn(f);
/// # let (remote, fr) = Network::new(Pid::new(), None);
/// # std::thread::spawn(fr);
/// block_on(async {
/// network
/// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap()))
/// .await?;
/// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// participant.disconnect().await?;
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
pub async fn disconnect(self) -> Result<(), ParticipantError> {
// Remove, Close and try_unwrap error when unwrap fails!
let pid = self.remote_pid;
debug!(?pid, "Closing participant from network");
self.closed.store(true, Ordering::Relaxed);
//Streams will be closed by BParticipant
match self.a2s_disconnect_s.lock().unwrap().take() {
Some(mut a2s_disconnect_s) => {
let (finished_sender, finished_receiver) = oneshot::channel();
// Participant is connecting to Scheduler here, not as usual
// Participant<->BParticipant
a2s_disconnect_s
.send((pid, finished_sender))
.await
.expect("Something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(Ok(())) => {
trace!(?pid, "Participant is now closed");
Ok(())
},
Ok(Err(e)) => {
trace!(
?e,
"Error occured during shutdown of participant and is propagated to \
User"
);
Err(ParticipantError::GracefulDisconnectFailed(e))
},
Err(e) => {
//this is a bug, but as a Participant we can't destroy the network
error!(
?pid,
?e,
"Failed to get a message back from the scheduler, seems like the \
network is already closed"
);
Err(ParticipantError::ParticipantClosed)
},
}
},
None => {
warn!(
"seems like you are trying to disconnecting a participant after the network \
was already dropped. It was already dropped with the network!"
);
Err(ParticipantError::ParticipantClosed)
},
}
}
/// Returns the remote [`Pid`]
pub fn remote_pid(&self) -> Pid { self.remote_pid }
}
@@ -837,38 +821,68 @@ impl Stream {
}
}
///
impl core::cmp::PartialEq for Participant {
fn eq(&self, other: &Self) -> bool {
//don't check local_pid, 2 Participant from different network should match if
// they are the "same"
self.remote_pid == other.remote_pid
}
}
impl Drop for Network {
fn drop(&mut self) {
let pid = self.local_pid;
debug!(?pid, "Shutting down Network");
debug!(
trace!(
?pid,
"Shutting down Participants of Network, while we still have metrics"
);
let mut finished_receiver_list = vec![];
task::block_on(async {
// we need to carefully shut down here! as otherwise we might call
// Participant::Drop with a2s_disconnect_s here which would open
// another task::block, which would panic! Also I can't `.write` on
// `self.participants` as the `disconnect` fn needs it.
let mut participant_clone = self.participants().await;
for (_, p) in participant_clone.drain() {
if let Err(e) = self.disconnect(p).await {
error!(
?e,
"Error while dropping network, the error occured when dropping a \
participant but can't be notified to the user any more"
);
// another task::block, which would panic!
for (remote_pid, a2s_disconnect_s) in
self.participant_disconnect_sender.write().await.drain()
{
match a2s_disconnect_s.lock().unwrap().take() {
Some(mut a2s_disconnect_s) => {
trace!(?remote_pid, "Participants will be closed");
let (finished_sender, finished_receiver) = oneshot::channel();
finished_receiver_list.push((remote_pid, finished_receiver));
a2s_disconnect_s
.send((remote_pid, finished_sender))
.await
.expect(
"Scheduler is closed, but nobody other should be able to close it",
);
},
None => trace!(?remote_pid, "Participant already disconnected gracefully"),
}
}
//after requesting the close, wait for all of them to finish
for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
match finished_receiver.await {
Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
Err(e) => warn!(
?remote_pid,
?e,
"Failed to get a message back from the scheduler, seems like the network \
is already closed"
),
}
}
self.participants.write().await.clear();
});
debug!(?pid, "Shutting down Scheduler");
trace!(?pid, "Participants have shut down!");
trace!(?pid, "Shutting down Scheduler");
self.shutdown_sender
.take()
.unwrap()
.send(())
.expect("Scheduler is closed, but nobody other should be able to close it");
debug!(?pid, "Participants have shut down!");
debug!(?pid, "Network has shut down");
}
}
@@ -878,16 +892,14 @@ impl Drop for Participant {
// participant from network
let pid = self.remote_pid;
debug!(?pid, "Shutting down Participant");
match self.a2s_disconnect_s.take() {
None => debug!(
match self.a2s_disconnect_s.lock().unwrap().take() {
None => trace!(
?pid,
"Participant has been shutdown cleanly, no further waiting is requiered!"
),
Some(mut a2s_disconnect_s) => {
debug!(
?pid,
"Unclean shutdown detected, active waiting for client to be disconnected"
);
debug!(?pid, "Disconnect from Scheduler");
task::block_on(async {
let (finished_sender, finished_receiver) = oneshot::channel();
a2s_disconnect_s
@@ -912,7 +924,7 @@ impl Drop for Participant {
});
},
}
debug!(?pid, "Network dropped");
debug!(?pid, "Participant dropped");
}
}
@@ -932,7 +944,7 @@ impl Drop for Stream {
} else {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "Drop not needed");
trace!(?pid, ?sid, "Stream Drop not needed");
}
}
}
@@ -1012,6 +1024,9 @@ impl core::fmt::Display for ParticipantError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
ParticipantError::ParticipantClosed => write!(f, "Participant closed"),
ParticipantError::GracefulDisconnectFailed(_) => {
write!(f, "Graceful disconnect failed")
},
}
}
}
@@ -1022,7 +1037,6 @@ impl core::fmt::Display for NetworkError {
NetworkError::NetworkClosed => write!(f, "Network closed"),
NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
NetworkError::GracefulDisconnectFailed(_) => write!(f, "Graceful disconnect failed"),
}
}
}

View File

@@ -16,7 +16,7 @@
//!
//! To connect to another application, you must know its [`Address`]. One side
//! will call [`connect`], the other [`connected`]. If successful, both
//! applications will now get a [`Arc<Participant>`].
//! applications will now get a [`Participant`].
//!
//! This [`Participant`] represents the connection between those 2 applications
//! over the respective [`Address`] and, with it, the chosen network protocol.
@@ -86,7 +86,6 @@
//! [`Networks`]: crate::api::Network
//! [`connect`]: crate::api::Network::connect
//! [`connected`]: crate::api::Network::connected
//! [`Arc<Participant>`]: crate::api::Participant
//! [`Participant`]: crate::api::Participant
//! [`Participants`]: crate::api::Participant
//! [`open`]: crate::api::Participant::open

View File

@@ -252,8 +252,8 @@ impl BParticipant {
longer work in the first place"
);
};
//TODO
warn!(
//TODO FIXME tags: takeover channel multiple
info!(
"FIXME: the frame is actually drop. which is fine for now as the participant \
will be closed, but not if we do channel-takeover"
);
@@ -621,10 +621,9 @@ impl BParticipant {
let mut info = match self.shutdown_info.lock().await.take() {
Some(info) => info,
None => {
error!(
"Close of participant seemed to be called twice, that's bad, ignoring the 2nd \
close"
);
//This can happen if >=2 different async fn found out the protocol got dropped
// but they haven't shut down so far
debug!("Close of participant seemed to be called twice, ignoring the 2nd close");
return;
},
};

View File

@@ -328,6 +328,7 @@ impl Scheduler {
);
};
}
debug!("Scheduler shut down gracefully");
//removing the possibility to create new participants, needed to close down
// some mgr:
self.participant_channels.lock().await.take();

View File

@@ -19,10 +19,10 @@ fn close_network() {
#[test]
fn close_participant() {
let (_, _) = helper::setup(false, 0);
let (n_a, p1_a, mut s1_a, n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp()));
block_on(n_a.disconnect(p1_a)).unwrap();
block_on(n_b.disconnect(p1_b)).unwrap();
block_on(p1_a.disconnect()).unwrap();
block_on(p1_b.disconnect()).unwrap();
assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed));
assert_eq!(
@@ -66,7 +66,7 @@ fn close_streams_in_block_on() {
#[test]
fn stream_simple_3msg_then_close() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send(1u8).unwrap();
s1_a.send(42).unwrap();
@@ -83,7 +83,7 @@ fn stream_simple_3msg_then_close() {
fn stream_send_first_then_receive() {
// recv should still be possible even if stream got closed if they are in queue
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send(1u8).unwrap();
s1_a.send(42).unwrap();
@@ -99,7 +99,7 @@ fn stream_send_first_then_receive() {
#[test]
fn stream_send_1_then_close_stream() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send("this message must be received, even if stream is closed already!")
.unwrap();
drop(s1_a);
@@ -112,7 +112,7 @@ fn stream_send_1_then_close_stream() {
#[test]
fn stream_send_100000_then_close_stream() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@@ -130,7 +130,7 @@ fn stream_send_100000_then_close_stream() {
#[test]
fn stream_send_100000_then_close_stream_remote() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, _s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@@ -142,7 +142,7 @@ fn stream_send_100000_then_close_stream_remote() {
#[test]
fn stream_send_100000_then_close_stream_remote2() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, _s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@@ -155,7 +155,7 @@ fn stream_send_100000_then_close_stream_remote2() {
#[test]
fn stream_send_100000_then_close_stream_remote3() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, _s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@@ -164,3 +164,41 @@ fn stream_send_100000_then_close_stream_remote3() {
drop(s1_a);
//no receiving
}
#[test]
fn close_part_then_network() {
let (_, _) = helper::setup(false, 0);
let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
drop(n_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
}
#[test]
fn close_network_then_part() {
let (_, _) = helper::setup(false, 0);
let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(n_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
drop(p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
}
#[test]
fn close_network_then_disconnect_part() {
let (_, _) = helper::setup(false, 0);
let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp()));
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(n_a);
assert!(block_on(p_a.disconnect()).is_err());
std::thread::sleep(std::time::Duration::from_millis(1000));
}

View File

@@ -1,10 +1,7 @@
use lazy_static::*;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
},
sync::atomic::{AtomicU16, Ordering},
thread,
time::Duration,
};
@@ -51,14 +48,7 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) {
#[allow(dead_code)]
pub async fn network_participant_stream(
addr: Address,
) -> (
Network,
Arc<Participant>,
Stream,
Network,
Arc<Participant>,
Stream,
) {
) -> (Network, Participant, Stream, Network, Participant, Stream) {
let (n_a, f_a) = Network::new(Pid::fake(1), None);
std::thread::spawn(f_a);
let (n_b, f_b) = Network::new(Pid::fake(2), None);

View File

@@ -17,7 +17,7 @@ fn network_20s() {
#[test]
fn stream_simple() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send("Hello World").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
@@ -26,7 +26,7 @@ fn stream_simple() {
#[test]
fn stream_simple_3msg() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap();
@@ -39,7 +39,7 @@ fn stream_simple_3msg() {
#[test]
fn stream_simple_udp() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(udp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp()));
s1_a.send("Hello World").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string()));
@@ -48,7 +48,7 @@ fn stream_simple_udp() {
#[test]
fn stream_simple_udp_3msg() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(udp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp()));
s1_a.send("Hello World").unwrap();
s1_a.send(1337).unwrap();
@@ -79,7 +79,7 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
let p2 = network
.connect(Address::Udp("127.0.0.1:2001".parse().unwrap()))
.await?;
assert!(std::sync::Arc::ptr_eq(&p1, &p2));
assert_eq!(&p1, &p2);
Ok(())
})
}
@@ -174,7 +174,7 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
#[test]
fn wrong_parse() {
let (_, _) = helper::setup(false, 0);
let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp()));
let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp()));
s1_a.send(1337).unwrap();
match block_on(s1_b.recv::<String>()) {

View File

@@ -3,12 +3,12 @@ use hashbrown::HashSet;
use network::{Participant, Stream};
use specs::{Component, FlaggedStorage};
use specs_idvs::IdvStorage;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use vek::*;
pub struct Client {
pub client_state: ClientState,
pub participant: Mutex<Option<Arc<Participant>>>,
pub participant: Mutex<Option<Participant>>,
pub singleton_stream: Stream,
pub last_ping: f64,
pub login_msg_sent: bool,

View File

@@ -50,7 +50,7 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event
if let Some(client) = server.state().read_storage::<Client>().get(entity) {
trace!("closing participant of client");
let participant = client.participant.lock().unwrap().take().unwrap();
if let Err(e) = block_on(server.network.disconnect(participant)) {
if let Err(e) = block_on(participant.disconnect()) {
debug!(
?e,
"Error when disconnecting client, maybe the pipe already broke"