diff --git a/client/src/lib.rs b/client/src/lib.rs index c6ba6c119d..0644006433 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -75,7 +75,7 @@ pub struct Client { pub active_character_id: Option, _network: Network, - _participant: Arc, + participant: Option, singleton_stream: Stream, last_server_ping: f64, @@ -200,7 +200,7 @@ impl Client { active_character_id: None, _network: network, - _participant: participant, + participant: Some(participant), singleton_stream: stream, last_server_ping: 0.0, @@ -1200,5 +1200,8 @@ impl Drop for Client { already closed?", ); } + if let Err(e) = block_on(self.participant.take().unwrap().disconnect()) { + warn!(?e, "error when disconnecting, couldn't send all data"); + } } } diff --git a/network/examples/chat/Cargo.lock b/network/examples/chat/Cargo.lock index 9c26fd8f98..c5efb50889 100644 --- a/network/examples/chat/Cargo.lock +++ b/network/examples/chat/Cargo.lock @@ -65,7 +65,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" dependencies = [ - "byteorder", + "byteorder 1.3.4", "serde", ] @@ -75,6 +75,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "byteorder" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" + [[package]] name = "byteorder" version = "1.3.4" @@ -345,6 +351,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lz4-compress" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91" +dependencies = [ + "byteorder 0.5.3", + "quick-error", +] + [[package]] name = "matchers" version = "0.0.1" @@ -538,15 +554,15 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.7.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" +checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" dependencies = [ "cfg-if", "fnv", "lazy_static", - "quick-error", "spin", + "thiserror", ] [[package]] @@ -629,7 +645,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ - "byteorder", + "byteorder 1.3.4", "regex-syntax", ] @@ -712,6 +728,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.0.1" @@ -798,6 +834,7 @@ dependencies = [ "crossbeam-channel", "futures", "lazy_static", + "lz4-compress", "prometheus", "rand", "serde", diff --git a/network/examples/chat/src/main.rs b/network/examples/chat/src/main.rs index a3a4fabcab..a497567525 100644 --- a/network/examples/chat/src/main.rs +++ b/network/examples/chat/src/main.rs @@ -4,10 +4,11 @@ //! (cd network/examples/chat && RUST_BACKTRACE=1 cargo run --release -- --trace=info --port 15006 --mode=client) //! ``` use async_std::io; +use async_std::sync::RwLock; use clap::{App, Arg}; use futures::executor::{block_on, ThreadPool}; use network::{Address, Network, Participant, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED}; -use std::{sync::Arc, thread, time::Duration}; +use std::{sync::Arc, thread, time::Duration, collections::HashMap}; use tracing::*; use tracing_subscriber::EnvFilter; @@ -103,17 +104,19 @@ fn server(address: Address) { let server = Arc::new(server); std::thread::spawn(f); let pool = ThreadPool::new().unwrap(); + let participants = Arc::new(RwLock::new(HashMap::new())); block_on(async { server.listen(address).await.unwrap(); loop { - let p1 = server.connected().await.unwrap(); + let p1 = Arc::new(server.connected().await.unwrap()); let server1 = server.clone(); - pool.spawn_ok(client_connection(server1, p1)); + participants.write().await.insert(p1.remote_pid(), p1.clone()); + pool.spawn_ok(client_connection(server1, p1, participants.clone())); } }); } -async fn client_connection(network: Arc, participant: Arc) { +async fn client_connection(_network: Arc, participant: Arc, participants: Arc>>>) { let mut s1 = participant.opened().await.unwrap(); let username = s1.recv::().await.unwrap(); println!("[{}] connected", username); @@ -124,14 +127,12 @@ async fn client_connection(network: Arc, participant: Arc) }, Ok(msg) => { println!("[{}]: {}", username, msg); - let mut parts = network.participants().await; - for (_, p) in parts.drain() { + for (_, p) in participants.read().await.iter() { match p .open(32, PROMISES_ORDERED | PROMISES_CONSISTENCY) .await { Err(_) => { - //probably disconnected, remove it - network.disconnect(p).await.unwrap(); + info!("error talking to client, //TODO drop it") }, Ok(mut s) => s.send((username.clone(), msg.clone())).unwrap(), }; @@ -180,7 +181,7 @@ fn client(address: Address) { // receiving i open and close a stream per message. this can be done easier but // this allows me to be quite lazy on the server side and just get a list of // all participants and send to them... -async fn read_messages(participant: Arc) { +async fn read_messages(participant: Participant) { while let Ok(mut s) = participant.opened().await { let (username, message) = s.recv::<(String, String)>().await.unwrap(); println!("[{}]: {}", username, message); diff --git a/network/examples/fileshare/Cargo.lock b/network/examples/fileshare/Cargo.lock index 0144ec3ac0..24eaae51a1 100644 --- a/network/examples/fileshare/Cargo.lock +++ b/network/examples/fileshare/Cargo.lock @@ -83,7 +83,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" dependencies = [ - "byteorder", + "byteorder 1.3.4", "serde", ] @@ -104,6 +104,12 @@ dependencies = [ "constant_time_eq", ] +[[package]] +name = "byteorder" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" + [[package]] name = "byteorder" version = "1.3.4" @@ -418,6 +424,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lz4-compress" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91" +dependencies = [ + "byteorder 0.5.3", + "quick-error", +] + [[package]] name = "matchers" version = "0.0.1" @@ -597,15 +613,15 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.7.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" +checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" dependencies = [ "cfg-if", "fnv", "lazy_static", - "quick-error", "spin", + "thiserror", ] [[package]] @@ -699,7 +715,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ - "byteorder", + "byteorder 1.3.4", "regex-syntax", ] @@ -803,6 +819,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.0.1" @@ -889,6 +925,7 @@ dependencies = [ "crossbeam-channel", "futures", "lazy_static", + "lz4-compress", "prometheus", "rand", "serde", diff --git a/network/examples/fileshare/src/commands.rs b/network/examples/fileshare/src/commands.rs index 3749f29c53..a603890a22 100644 --- a/network/examples/fileshare/src/commands.rs +++ b/network/examples/fileshare/src/commands.rs @@ -6,7 +6,7 @@ use network::{Address, Participant, Stream}; use rand::Rng; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; #[derive(Debug)] pub enum LocalCommand { @@ -34,7 +34,7 @@ pub struct FileInfo { pub struct RemoteInfo { infos: HashMap, - _participant: Arc, + _participant: Participant, pub cmd_out: Stream, pub file_out: Stream, } @@ -68,7 +68,7 @@ impl FileInfo { } impl RemoteInfo { - pub fn new(cmd_out: Stream, file_out: Stream, participant: Arc) -> Self { + pub fn new(cmd_out: Stream, file_out: Stream, participant: Participant) -> Self { Self { infos: HashMap::new(), _participant: participant, diff --git a/network/examples/fileshare/src/server.rs b/network/examples/fileshare/src/server.rs index c00cecd61d..2dc3835437 100644 --- a/network/examples/fileshare/src/server.rs +++ b/network/examples/fileshare/src/server.rs @@ -64,9 +64,6 @@ impl Server { }, LocalCommand::Disconnect => { self.remotes.write().await.clear(); - for (_, p) in self.network.participants().await.drain() { - self.network.disconnect(p).await.unwrap(); - } println!("Disconnecting all connections"); return; }, @@ -126,7 +123,7 @@ impl Server { trace!("Stop connect_manager"); } - async fn loop_participant(&self, p: Arc) { + async fn loop_participant(&self, p: Participant) { if let (Ok(cmd_out), Ok(file_out), Ok(cmd_in), Ok(file_in)) = ( p.open(15, PROMISES_CONSISTENCY | PROMISES_ORDERED).await, p.open(40, PROMISES_CONSISTENCY).await, diff --git a/network/examples/network-speed/Cargo.lock b/network/examples/network-speed/Cargo.lock index 7cd1b8e982..3a1ba21cb5 100644 --- a/network/examples/network-speed/Cargo.lock +++ b/network/examples/network-speed/Cargo.lock @@ -71,7 +71,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" dependencies = [ - "byteorder", + "byteorder 1.3.4", "serde", ] @@ -81,6 +81,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "byteorder" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" + [[package]] name = "byteorder" version = "1.3.4" @@ -372,6 +378,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lz4-compress" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f966533a922a9bba9e95e594c1fdb3b9bf5fdcdb11e37e51ad84cd76e468b91" +dependencies = [ + "byteorder 0.5.3", + "quick-error", +] + [[package]] name = "matchers" version = "0.0.1" @@ -578,16 +594,16 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.7.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" +checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" dependencies = [ "cfg-if", "fnv", "lazy_static", "protobuf", - "quick-error", "spin", + "thiserror", ] [[package]] @@ -670,7 +686,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ - "byteorder", + "byteorder 1.3.4", "regex-syntax", ] @@ -753,6 +769,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.0.1" @@ -880,6 +916,7 @@ dependencies = [ "crossbeam-channel", "futures", "lazy_static", + "lz4-compress", "prometheus", "rand", "serde", diff --git a/network/examples/network-speed/src/main.rs b/network/examples/network-speed/src/main.rs index f7b8e3a033..f047983394 100644 --- a/network/examples/network-speed/src/main.rs +++ b/network/examples/network-speed/src/main.rs @@ -181,7 +181,7 @@ fn client(address: Address) { drop(s1); std::thread::sleep(std::time::Duration::from_millis(5000)); info!("Closing participant"); - block_on(client.disconnect(p1)).unwrap(); + block_on(p1.disconnect()).unwrap(); std::thread::sleep(std::time::Duration::from_millis(25000)); info!("DROPPING! client"); drop(client); diff --git a/network/src/api.rs b/network/src/api.rs index 8a954c930b..8ef1d0918c 100644 --- a/network/src/api.rs +++ b/network/src/api.rs @@ -26,6 +26,9 @@ use std::{ use tracing::*; use tracing_futures::Instrument; +type ParticipantCloseChannel = + mpsc::UnboundedSender<(Pid, oneshot::Sender>)>; + /// Represents a Tcp or Udp or Mpsc address #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum Address { @@ -47,8 +50,9 @@ pub struct Participant { a2b_steam_open_s: RwLock)>>, b2a_stream_opened_r: RwLock>, closed: AtomicBool, - a2s_disconnect_s: - Option>)>>, + //We need a std::Mutex here, the async Mutex requeres a block in `Drop` which can `panic!` + //It's only okay because `disconnect` is the only `fn` accessing it and it consumes self! + a2s_disconnect_s: Arc>>, } /// `Streams` represents a channel to send `n` messages with a certain priority @@ -83,13 +87,14 @@ pub enum NetworkError { NetworkClosed, ListenFailed(std::io::Error), ConnectFailed(std::io::Error), - GracefulDisconnectFailed(std::io::Error), } /// Error type thrown by [`Participants`](Participant) methods -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub enum ParticipantError { + ///Participant was closed too early to complete the action fully ParticipantClosed, + GracefulDisconnectFailed(std::io::Error), } /// Error type thrown by [`Streams`](Stream) methods @@ -105,10 +110,9 @@ pub enum StreamError { /// Application. You can pass it around multiple threads in an /// [`Arc`](std::sync::Arc) as all commands have internal mutability. /// -/// The `Network` has methods to [`connect`] and [`disconnect`] to other -/// [`Participants`] via their [`Address`]. All [`Participants`] will be stored -/// in the Network until explicitly disconnected, which is the only way to close -/// the sockets. +/// The `Network` has methods to [`connect`] to other [`Participants`] actively +/// via their [`Address`], or [`listen`] passively for [`connected`] +/// [`Participants`]. /// /// # Examples /// ```rust @@ -133,10 +137,12 @@ pub enum StreamError { /// /// [`Participants`]: crate::api::Participant /// [`connect`]: Network::connect -/// [`disconnect`]: Network::disconnect +/// [`listen`]: Network::listen +/// [`connected`]: Network::connected pub struct Network { local_pid: Pid, - participants: RwLock>>, + participant_disconnect_sender: + RwLock>>>>, listen_sender: RwLock>)>>, connect_sender: @@ -204,7 +210,7 @@ impl Network { ( Self { local_pid: participant_id, - participants: RwLock::new(HashMap::new()), + participant_disconnect_sender: RwLock::new(HashMap::new()), listen_sender: RwLock::new(listen_sender), connect_sender: RwLock::new(connect_sender), connected_receiver: RwLock::new(connected_receiver), @@ -293,7 +299,7 @@ impl Network { /// let p2 = network /// .connect(Address::Udp("127.0.0.1:2011".parse().unwrap())) /// .await?; - /// assert!(std::sync::Arc::ptr_eq(&p1, &p2)); + /// assert_eq!(&p1, &p2); /// # Ok(()) /// }) /// # } @@ -307,7 +313,7 @@ impl Network { /// /// [`Streams`]: crate::api::Stream /// [`Addresses`]: crate::api::Address - pub async fn connect(&self, address: Address) -> Result, NetworkError> { + pub async fn connect(&self, address: Address) -> Result { let (pid_sender, pid_receiver) = oneshot::channel::>(); debug!(?address, "Connect to address"); self.connect_sender @@ -324,11 +330,10 @@ impl Network { ?pid, "Received Participant id from remote and return to user" ); - let participant = Arc::new(participant); - self.participants + self.participant_disconnect_sender .write() .await - .insert(participant.remote_pid, participant.clone()); + .insert(pid, participant.a2s_disconnect_s.clone()); Ok(participant) } @@ -365,129 +370,14 @@ impl Network { /// /// [`Streams`]: crate::api::Stream /// [`listen`]: crate::api::Network::listen - pub async fn connected(&self) -> Result, NetworkError> { + pub async fn connected(&self) -> Result { let participant = self.connected_receiver.write().await.next().await?; - let participant = Arc::new(participant); - self.participants + self.participant_disconnect_sender .write() .await - .insert(participant.remote_pid, participant.clone()); + .insert(participant.remote_pid, participant.a2s_disconnect_s.clone()); Ok(participant) } - - /// disconnecting a [`Participant`] where you move the last existing - /// [`Arc`]. As the [`Network`] also holds [`Arc`] to the - /// [`Participant`], you need to provide the last [`Arc`] and - /// are not allowed to keep others. If you do so the [`Participant`] - /// can't be disconnected properly. If you no longer have the respective - /// [`Participant`], try using the [`participants`] method to get it. - /// - /// This function will wait for all [`Streams`] to properly close, including - /// all messages to be send before closing. If an error occurs with one - /// of the messages. - /// Except if the remote side already dropped the [`Participant`] - /// simultaneously, then messages won't be sended - /// - /// There is NO `disconnected` function in `Network`, if a [`Participant`] - /// is no longer reachable (e.g. as the network cable was unplugged) the - /// [`Participant`] will fail all action, but needs to be manually - /// disconected, using this function. - /// - /// # Examples - /// ```rust - /// use futures::executor::block_on; - /// use veloren_network::{Address, Network, Pid}; - /// - /// # fn main() -> std::result::Result<(), Box> { - /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. - /// let (network, f) = Network::new(Pid::new(), None); - /// std::thread::spawn(f); - /// # let (remote, fr) = Network::new(Pid::new(), None); - /// # std::thread::spawn(fr); - /// block_on(async { - /// network - /// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap())) - /// .await?; - /// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?; - /// while let Ok(participant) = network.connected().await { - /// println!("Participant connected: {}", participant.remote_pid()); - /// network.disconnect(participant).await?; - /// # //skip test here as it would be a endless loop - /// # break; - /// } - /// # Ok(()) - /// }) - /// # } - /// ``` - /// - /// [`Arc`]: crate::api::Participant - /// [`Streams`]: crate::api::Stream - /// [`participants`]: Network::participants - /// [`Arc`]: std::sync::Arc - pub async fn disconnect(&self, participant: Arc) -> Result<(), NetworkError> { - // Remove, Close and try_unwrap error when unwrap fails! - let pid = participant.remote_pid; - debug!(?pid, "Removing participant from network"); - self.participants.write().await.remove(&pid)?; - participant.closed.store(true, Ordering::Relaxed); - - match Arc::try_unwrap(participant) { - Err(_) => { - warn!( - "You are disconnecting and still keeping a reference to this participant, \ - this is a bad idea. Participant will only be dropped when you drop your last \ - reference" - ); - Ok(()) - }, - Ok(mut participant) => { - trace!("Waiting now for participant to close"); - let (finished_sender, finished_receiver) = oneshot::channel(); - // we are deleting here asyncly before DROP is called. Because this is done - // nativly async, while drop needs an BLOCK! Drop will recognis - // that it has been delete here and don't try another double delete. - participant - .a2s_disconnect_s - .take() - .unwrap() - .send((pid, finished_sender)) - .await - .expect("Something is wrong in internal scheduler coding"); - match finished_receiver.await { - Ok(Ok(())) => { - trace!(?pid, "Participant is now closed"); - Ok(()) - }, - Ok(Err(e)) => { - trace!( - ?e, - "Error occured during shutdown of participant and is propagated to \ - User" - ); - Err(NetworkError::GracefulDisconnectFailed(e)) - }, - Err(e) => { - error!( - ?pid, - ?e, - "Failed to get a message back from the scheduler, closing the network" - ); - Err(NetworkError::NetworkClosed) - }, - } - }, - } - } - - /// returns a copy of all current connected [`Participants`], - /// including ones, which can't send data anymore as the underlying sockets - /// are closed already but haven't been [`disconnected`] yet. - /// - /// [`Participants`]: crate::api::Participant - /// [`disconnected`]: Network::disconnect - pub async fn participants(&self) -> HashMap> { - self.participants.read().await.clone() - } } impl Participant { @@ -504,7 +394,7 @@ impl Participant { a2b_steam_open_s: RwLock::new(a2b_steam_open_s), b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r), closed: AtomicBool::new(false), - a2s_disconnect_s: Some(a2s_disconnect_s), + a2s_disconnect_s: Arc::new(std::sync::Mutex::new(Some(a2s_disconnect_s))), } } @@ -635,6 +525,100 @@ impl Participant { } } + /// disconnecting a `Participant` in a async way. + /// Use this rather than `Participant::Drop` if you want to close multiple + /// `Participants`. + /// + /// This function will wait for all [`Streams`] to properly close, including + /// all messages to be send before closing. If an error occurs with one + /// of the messages. + /// Except if the remote side already dropped the `Participant` + /// simultaneously, then messages won't be send + /// + /// There is NO `disconnected` function in `Participant`, if a `Participant` + /// is no longer reachable (e.g. as the network cable was unplugged) the + /// `Participant` will fail all action, but needs to be manually + /// disconected, using this function. + /// + /// # Examples + /// ```rust + /// use futures::executor::block_on; + /// use veloren_network::{Address, Network, Pid}; + /// + /// # fn main() -> std::result::Result<(), Box> { + /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection. + /// let (network, f) = Network::new(Pid::new(), None); + /// std::thread::spawn(f); + /// # let (remote, fr) = Network::new(Pid::new(), None); + /// # std::thread::spawn(fr); + /// block_on(async { + /// network + /// .listen(Address::Tcp("0.0.0.0:2030".parse().unwrap())) + /// .await?; + /// # remote.connect(Address::Tcp("0.0.0.0:2030".parse().unwrap())).await?; + /// while let Ok(participant) = network.connected().await { + /// println!("Participant connected: {}", participant.remote_pid()); + /// participant.disconnect().await?; + /// # //skip test here as it would be a endless loop + /// # break; + /// } + /// # Ok(()) + /// }) + /// # } + /// ``` + /// + /// [`Streams`]: crate::api::Stream + pub async fn disconnect(self) -> Result<(), ParticipantError> { + // Remove, Close and try_unwrap error when unwrap fails! + let pid = self.remote_pid; + debug!(?pid, "Closing participant from network"); + self.closed.store(true, Ordering::Relaxed); + //Streams will be closed by BParticipant + + match self.a2s_disconnect_s.lock().unwrap().take() { + Some(mut a2s_disconnect_s) => { + let (finished_sender, finished_receiver) = oneshot::channel(); + // Participant is connecting to Scheduler here, not as usual + // Participant<->BParticipant + a2s_disconnect_s + .send((pid, finished_sender)) + .await + .expect("Something is wrong in internal scheduler coding"); + match finished_receiver.await { + Ok(Ok(())) => { + trace!(?pid, "Participant is now closed"); + Ok(()) + }, + Ok(Err(e)) => { + trace!( + ?e, + "Error occured during shutdown of participant and is propagated to \ + User" + ); + Err(ParticipantError::GracefulDisconnectFailed(e)) + }, + Err(e) => { + //this is a bug. but as i am Participant i can't destroy the network + error!( + ?pid, + ?e, + "Failed to get a message back from the scheduler, seems like the \ + network is already closed" + ); + Err(ParticipantError::ParticipantClosed) + }, + } + }, + None => { + warn!( + "seems like you are trying to disconnecting a participant after the network \ + was already dropped. It was already dropped with the network!" + ); + Err(ParticipantError::ParticipantClosed) + }, + } + } + /// Returns the remote [`Pid`] pub fn remote_pid(&self) -> Pid { self.remote_pid } } @@ -837,38 +821,68 @@ impl Stream { } } +/// +impl core::cmp::PartialEq for Participant { + fn eq(&self, other: &Self) -> bool { + //don't check local_pid, 2 Participant from different network should match if + // they are the "same" + self.remote_pid == other.remote_pid + } +} + impl Drop for Network { fn drop(&mut self) { let pid = self.local_pid; debug!(?pid, "Shutting down Network"); - debug!( + trace!( ?pid, "Shutting down Participants of Network, while we still have metrics" ); + let mut finished_receiver_list = vec![]; task::block_on(async { // we need to carefully shut down here! as otherwise we might call // Participant::Drop with a2s_disconnect_s here which would open - // another task::block, which would panic! also i can't `.write` on - // `self.participants` as the `disconnect` fn needs it. - let mut participant_clone = self.participants().await; - for (_, p) in participant_clone.drain() { - if let Err(e) = self.disconnect(p).await { - error!( - ?e, - "Error while dropping network, the error occured when dropping a \ - participant but can't be notified to the user any more" - ); + // another task::block, which would panic! + for (remote_pid, a2s_disconnect_s) in + self.participant_disconnect_sender.write().await.drain() + { + match a2s_disconnect_s.lock().unwrap().take() { + Some(mut a2s_disconnect_s) => { + trace!(?remote_pid, "Participants will be closed"); + let (finished_sender, finished_receiver) = oneshot::channel(); + finished_receiver_list.push((remote_pid, finished_receiver)); + a2s_disconnect_s + .send((remote_pid, finished_sender)) + .await + .expect( + "Scheduler is closed, but nobody other should be able to close it", + ); + }, + None => trace!(?remote_pid, "Participant already disconnected gracefully"), + } + } + //wait after close is requested for all + for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) { + match finished_receiver.await { + Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"), + Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"), + Err(e) => warn!( + ?remote_pid, + ?e, + "Failed to get a message back from the scheduler, seems like the network \ + is already closed" + ), } } - self.participants.write().await.clear(); }); - debug!(?pid, "Shutting down Scheduler"); + trace!(?pid, "Participants have shut down!"); + trace!(?pid, "Shutting down Scheduler"); self.shutdown_sender .take() .unwrap() .send(()) .expect("Scheduler is closed, but nobody other should be able to close it"); - debug!(?pid, "Participants have shut down!"); + debug!(?pid, "Network has shut down"); } } @@ -878,16 +892,14 @@ impl Drop for Participant { // participant from network let pid = self.remote_pid; debug!(?pid, "Shutting down Participant"); - match self.a2s_disconnect_s.take() { - None => debug!( + + match self.a2s_disconnect_s.lock().unwrap().take() { + None => trace!( ?pid, "Participant has been shutdown cleanly, no further waiting is requiered!" ), Some(mut a2s_disconnect_s) => { - debug!( - ?pid, - "Unclean shutdown detected, active waiting for client to be disconnected" - ); + debug!(?pid, "Disconnect from Scheduler"); task::block_on(async { let (finished_sender, finished_receiver) = oneshot::channel(); a2s_disconnect_s @@ -912,7 +924,7 @@ impl Drop for Participant { }); }, } - debug!(?pid, "Network dropped"); + debug!(?pid, "Participant dropped"); } } @@ -932,7 +944,7 @@ impl Drop for Stream { } else { let sid = self.sid; let pid = self.pid; - debug!(?pid, ?sid, "Drop not needed"); + trace!(?pid, ?sid, "Stream Drop not needed"); } } } @@ -1012,6 +1024,9 @@ impl core::fmt::Display for ParticipantError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { ParticipantError::ParticipantClosed => write!(f, "Participant closed"), + ParticipantError::GracefulDisconnectFailed(_) => { + write!(f, "Graceful disconnect failed") + }, } } } @@ -1022,7 +1037,6 @@ impl core::fmt::Display for NetworkError { NetworkError::NetworkClosed => write!(f, "Network closed"), NetworkError::ListenFailed(_) => write!(f, "Listening failed"), NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"), - NetworkError::GracefulDisconnectFailed(_) => write!(f, "Graceful disconnect failed"), } } } diff --git a/network/src/lib.rs b/network/src/lib.rs index eea927189c..2cbb2bc494 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -16,7 +16,7 @@ //! //! To connect to another application, you must know it's [`Address`]. One side //! will call [`connect`], the other [`connected`]. If successfull both -//! applications will now get a [`Arc`]. +//! applications will now get a [`Participant`]. //! //! This [`Participant`] represents the connection between those 2 applications. //! over the respective [`Address`] and with it the choosen network protocol. @@ -86,7 +86,6 @@ //! [`Networks`]: crate::api::Network //! [`connect`]: crate::api::Network::connect //! [`connected`]: crate::api::Network::connected -//! [`Arc`]: crate::api::Participant //! [`Participant`]: crate::api::Participant //! [`Participants`]: crate::api::Participant //! [`open`]: crate::api::Participant::open diff --git a/network/src/participant.rs b/network/src/participant.rs index 7dac6102bf..fd6c32d798 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -252,8 +252,8 @@ impl BParticipant { longer work in the first place" ); }; - //TODO - warn!( + //TODO FIXME tags: takeover channel multiple + info!( "FIXME: the frame is actually drop. which is fine for now as the participant \ will be closed, but not if we do channel-takeover" ); @@ -621,10 +621,9 @@ impl BParticipant { let mut info = match self.shutdown_info.lock().await.take() { Some(info) => info, None => { - error!( - "Close of participant seemed to be called twice, that's bad, ignoring the 2nd \ - close" - ); + //This can happen if >=2 different async fn found out the protocol got dropped + // but they haven't shut down so far + debug!("Close of participant seemed to be called twice, ignoring the 2nd close"); return; }, }; diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 5264a89244..6fd45c7950 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -328,6 +328,7 @@ impl Scheduler { ); }; } + debug!("Scheduler shut down gracefully"); //removing the possibility to create new participants, needed to close down // some mgr: self.participant_channels.lock().await.take(); diff --git a/network/tests/closing.rs b/network/tests/closing.rs index 25ac05311c..2bd9a92acd 100644 --- a/network/tests/closing.rs +++ b/network/tests/closing.rs @@ -19,10 +19,10 @@ fn close_network() { #[test] fn close_participant() { let (_, _) = helper::setup(false, 0); - let (n_a, p1_a, mut s1_a, n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, p1_a, mut s1_a, _n_b, p1_b, mut s1_b) = block_on(network_participant_stream(tcp())); - block_on(n_a.disconnect(p1_a)).unwrap(); - block_on(n_b.disconnect(p1_b)).unwrap(); + block_on(p1_a.disconnect()).unwrap(); + block_on(p1_b.disconnect()).unwrap(); assert_eq!(s1_a.send("Hello World"), Err(StreamError::StreamClosed)); assert_eq!( @@ -66,7 +66,7 @@ fn close_streams_in_block_on() { #[test] fn stream_simple_3msg_then_close() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send(1u8).unwrap(); s1_a.send(42).unwrap(); @@ -83,7 +83,7 @@ fn stream_simple_3msg_then_close() { fn stream_send_first_then_receive() { // recv should still be possible even if stream got closed if they are in queue let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send(1u8).unwrap(); s1_a.send(42).unwrap(); @@ -99,7 +99,7 @@ fn stream_send_first_then_receive() { #[test] fn stream_send_1_then_close_stream() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send("this message must be received, even if stream is closed already!") .unwrap(); drop(s1_a); @@ -112,7 +112,7 @@ fn stream_send_1_then_close_stream() { #[test] fn stream_send_100000_then_close_stream() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -130,7 +130,7 @@ fn stream_send_100000_then_close_stream() { #[test] fn stream_send_100000_then_close_stream_remote() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, _s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -142,7 +142,7 @@ fn stream_send_100000_then_close_stream_remote() { #[test] fn stream_send_100000_then_close_stream_remote2() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, _s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -155,7 +155,7 @@ fn stream_send_100000_then_close_stream_remote2() { #[test] fn stream_send_100000_then_close_stream_remote3() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, _s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); for _ in 0..100000 { s1_a.send("woop_PARTY_HARD_woop").unwrap(); } @@ -164,3 +164,41 @@ fn stream_send_100000_then_close_stream_remote3() { drop(s1_a); //no receiving } + +#[test] +fn close_part_then_network() { + let (_, _) = helper::setup(false, 0); + let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + for _ in 0..1000 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + drop(p_a); + std::thread::sleep(std::time::Duration::from_millis(1000)); + drop(n_a); + std::thread::sleep(std::time::Duration::from_millis(1000)); +} + +#[test] +fn close_network_then_part() { + let (_, _) = helper::setup(false, 0); + let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + for _ in 0..1000 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + drop(n_a); + std::thread::sleep(std::time::Duration::from_millis(1000)); + drop(p_a); + std::thread::sleep(std::time::Duration::from_millis(1000)); +} + +#[test] +fn close_network_then_disconnect_part() { + let (_, _) = helper::setup(false, 0); + let (n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = block_on(network_participant_stream(tcp())); + for _ in 0..1000 { + s1_a.send("woop_PARTY_HARD_woop").unwrap(); + } + drop(n_a); + assert!(block_on(p_a.disconnect()).is_err()); + std::thread::sleep(std::time::Duration::from_millis(1000)); +} diff --git a/network/tests/helper.rs b/network/tests/helper.rs index f043074e8e..386f6d94dc 100644 --- a/network/tests/helper.rs +++ b/network/tests/helper.rs @@ -1,10 +1,7 @@ use lazy_static::*; use std::{ net::SocketAddr, - sync::{ - atomic::{AtomicU16, Ordering}, - Arc, - }, + sync::atomic::{AtomicU16, Ordering}, thread, time::Duration, }; @@ -51,14 +48,7 @@ pub fn setup(tracing: bool, mut sleep: u64) -> (u64, u64) { #[allow(dead_code)] pub async fn network_participant_stream( addr: Address, -) -> ( - Network, - Arc, - Stream, - Network, - Arc, - Stream, -) { +) -> (Network, Participant, Stream, Network, Participant, Stream) { let (n_a, f_a) = Network::new(Pid::fake(1), None); std::thread::spawn(f_a); let (n_b, f_b) = Network::new(Pid::fake(2), None); diff --git a/network/tests/integration.rs b/network/tests/integration.rs index 6fe0dea42a..e0cd43844f 100644 --- a/network/tests/integration.rs +++ b/network/tests/integration.rs @@ -17,7 +17,7 @@ fn network_20s() { #[test] fn stream_simple() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send("Hello World").unwrap(); assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); @@ -26,7 +26,7 @@ fn stream_simple() { #[test] fn stream_simple_3msg() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); @@ -39,7 +39,7 @@ fn stream_simple_3msg() { #[test] fn stream_simple_udp() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(udp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp())); s1_a.send("Hello World").unwrap(); assert_eq!(block_on(s1_b.recv()), Ok("Hello World".to_string())); @@ -48,7 +48,7 @@ fn stream_simple_udp() { #[test] fn stream_simple_udp_3msg() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(udp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(udp())); s1_a.send("Hello World").unwrap(); s1_a.send(1337).unwrap(); @@ -79,7 +79,7 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box std::result::Result<(), Box> #[test] fn wrong_parse() { let (_, _) = helper::setup(false, 0); - let (_n_a, _, mut s1_a, _n_b, _, mut s1_b) = block_on(network_participant_stream(tcp())); + let (_n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = block_on(network_participant_stream(tcp())); s1_a.send(1337).unwrap(); match block_on(s1_b.recv::()) { diff --git a/server/src/client.rs b/server/src/client.rs index ea3ecce838..345d674381 100644 --- a/server/src/client.rs +++ b/server/src/client.rs @@ -3,12 +3,12 @@ use hashbrown::HashSet; use network::{Participant, Stream}; use specs::{Component, FlaggedStorage}; use specs_idvs::IdvStorage; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use vek::*; pub struct Client { pub client_state: ClientState, - pub participant: Mutex>>, + pub participant: Mutex>, pub singleton_stream: Stream, pub last_ping: f64, pub login_msg_sent: bool, diff --git a/server/src/events/player.rs b/server/src/events/player.rs index 53286ab115..c01afd405e 100644 --- a/server/src/events/player.rs +++ b/server/src/events/player.rs @@ -50,7 +50,7 @@ pub fn handle_client_disconnect(server: &mut Server, entity: EcsEntity) -> Event if let Some(client) = server.state().read_storage::().get(entity) { trace!("closing participant of client"); let participant = client.participant.lock().unwrap().take().unwrap(); - if let Err(e) = block_on(server.network.disconnect(participant)) { + if let Err(e) = block_on(participant.disconnect()) { debug!( ?e, "Error when disconnecting client, maybe the pipe already broke"