Merge branch 'xMAC94x/mpsc' into 'master'

Change the way Network is dropped.

Closes #46

See merge request veloren/veloren!1846
This commit is contained in:
Marcel 2021-03-03 15:19:44 +00:00
commit ef1ee4c4fa
14 changed files with 319 additions and 217 deletions

View File

@ -189,7 +189,7 @@ impl Client {
view_distance: Option<u32>,
runtime: Arc<Runtime>,
) -> Result<Self, Error> {
let network = Network::new(Pid::new(), Arc::clone(&runtime));
let network = Network::new(Pid::new(), &runtime);
let participant = match addr {
ConnectionArgs::IpAndPort(addrs) => {

View File

@ -117,7 +117,7 @@ criterion_main!(benches);
pub fn network_participant_stream(
addr: ProtocolAddr,
) -> (
Arc<Runtime>,
Runtime,
Network,
Participant,
Stream,
@ -125,10 +125,10 @@ pub fn network_participant_stream(
Participant,
Stream,
) {
let runtime = Arc::new(Runtime::new().unwrap());
let runtime = Runtime::new().unwrap();
let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async {
let n_a = Network::new(Pid::fake(0), Arc::clone(&runtime));
let n_b = Network::new(Pid::fake(1), Arc::clone(&runtime));
let n_a = Network::new(Pid::fake(0), &runtime);
let n_b = Network::new(Pid::fake(1), &runtime);
n_a.listen(addr.clone()).await.unwrap();
let p1_b = n_b.connect(addr).await.unwrap();

View File

@ -100,7 +100,7 @@ fn main() {
fn server(address: ProtocolAddr) {
let r = Arc::new(Runtime::new().unwrap());
let server = Network::new(Pid::new(), Arc::clone(&r));
let server = Network::new(Pid::new(), &r);
let server = Arc::new(server);
let participants = Arc::new(RwLock::new(Vec::new()));
r.block_on(async {
@ -146,7 +146,7 @@ async fn client_connection(
fn client(address: ProtocolAddr) {
let r = Arc::new(Runtime::new().unwrap());
let client = Network::new(Pid::new(), Arc::clone(&r));
let client = Network::new(Pid::new(), &r);
r.block_on(async {
let p1 = client.connect(address.clone()).await.unwrap(); //remote representation of p1

View File

@ -27,7 +27,7 @@ impl Server {
pub fn new(runtime: Arc<Runtime>) -> (Self, mpsc::UnboundedSender<LocalCommand>) {
let (command_sender, command_receiver) = mpsc::unbounded_channel();
let network = Network::new(Pid::new(), runtime);
let network = Network::new(Pid::new(), &runtime);
let run_channels = Some(ControlChannels { command_receiver });
(

View File

@ -123,7 +123,7 @@ fn main() {
fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
let registry = Arc::new(Registry::new());
let server = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
let server = Network::new_with_registry(Pid::new(), &runtime, &registry);
runtime.spawn(Server::run(
Arc::clone(&registry),
SocketAddr::from(([0; 4], 59112)),
@ -155,7 +155,7 @@ fn server(address: ProtocolAddr, runtime: Arc<Runtime>) {
fn client(address: ProtocolAddr, runtime: Arc<Runtime>) {
let registry = Arc::new(Registry::new());
let client = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
let client = Network::new_with_registry(Pid::new(), &runtime, &registry);
runtime.spawn(Server::run(
Arc::clone(&registry),
SocketAddr::from(([0; 4], 59111)),

View File

@ -129,39 +129,43 @@ pub enum StreamError {
/// via their [`ProtocolAddr`], or [`listen`] passively for [`connected`]
/// [`Participants`].
///
/// Too guarantee a clean shutdown, the [`Runtime`] MUST NOT be droped before
/// the Network.
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ProtocolAddr, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async{
/// # //setup pseudo database!
/// # let database = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let database = Network::new(Pid::new(), &runtime);
/// # database.listen(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
/// let database = network.connect(ProtocolAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// drop(network);
/// # drop(database);
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
/// [`Runtime`]: tokio::runtime::Runtime
/// [`connect`]: Network::connect
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
pub struct Network {
local_pid: Pid,
runtime: Arc<Runtime>,
participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>,
participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
listen_sender: Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>>,
connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
shutdown_sender: Option<oneshot::Sender<()>>,
shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
}
impl Network {
@ -170,7 +174,7 @@ impl Network {
/// # Arguments
/// * `participant_id` - provide it by calling [`Pid::new()`], usually you
/// don't want to reuse a Pid for 2 `Networks`
/// * `runtime` - provide a tokio::Runtime, it's used to internally spawn
/// * `runtime` - provide a [`Runtime`], it's used to internally spawn
/// tasks. It is necessary to clean up in the non-async `Drop`. **All**
/// network related components **must** be dropped before the runtime is
/// stopped. dropping the runtime while a shutdown is still in progress
@ -183,12 +187,11 @@ impl Network {
///
/// # Examples
/// ```rust
/// use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), Arc::new(runtime));
/// let network = Network::new(Pid::new(), &runtime);
/// ```
///
/// Usually you only create a single `Network` for an application,
@ -197,7 +200,8 @@ impl Network {
/// creating more.
///
/// [`Pid::new()`]: network_protocol::Pid::new
pub fn new(participant_id: Pid, runtime: Arc<Runtime>) -> Self {
/// [`Runtime`]: tokio::runtime::Runtime
pub fn new(participant_id: Pid, runtime: &Runtime) -> Self {
Self::internal_new(
participant_id,
runtime,
@ -215,28 +219,23 @@ impl Network {
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use prometheus::Registry;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// let runtime = Runtime::new().unwrap();
/// let registry = Registry::new();
/// let network = Network::new_with_registry(Pid::new(), Arc::new(runtime), &registry);
/// let network = Network::new_with_registry(Pid::new(), &runtime, &registry);
/// ```
/// [`new`]: crate::api::Network::new
#[cfg(feature = "metrics")]
pub fn new_with_registry(
participant_id: Pid,
runtime: Arc<Runtime>,
registry: &Registry,
) -> Self {
pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self {
Self::internal_new(participant_id, runtime, Some(registry))
}
fn internal_new(
participant_id: Pid,
runtime: Arc<Runtime>,
runtime: &Runtime,
#[cfg(feature = "metrics")] registry: Option<&Registry>,
) -> Self {
let p = participant_id;
@ -248,6 +247,15 @@ impl Network {
#[cfg(feature = "metrics")]
registry,
);
let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new()));
let (shutdown_network_s, shutdown_network_r) = oneshot::channel();
let f = Self::shutdown_mgr(
p,
shutdown_network_r,
Arc::clone(&participant_disconnect_sender),
shutdown_sender,
);
runtime.spawn(f);
runtime.spawn(
async move {
trace!("Starting scheduler in own thread");
@ -258,12 +266,11 @@ impl Network {
);
Self {
local_pid: participant_id,
runtime,
participant_disconnect_sender: Mutex::new(HashMap::new()),
participant_disconnect_sender,
listen_sender: Mutex::new(listen_sender),
connect_sender: Mutex::new(connect_sender),
connected_receiver: Mutex::new(connected_receiver),
shutdown_sender: Some(shutdown_sender),
shutdown_network_s: Some(shutdown_network_s),
}
}
@ -276,14 +283,13 @@ impl Network {
///
/// # Examples
/// ```ignore
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
@ -291,6 +297,7 @@ impl Network {
/// network
/// .listen(ProtocolAddr::Udp("127.0.0.1:2001".parse().unwrap()))
/// .await?;
/// drop(network);
/// # Ok(())
/// })
/// # }
@ -318,15 +325,14 @@ impl Network {
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
/// can't connect, or invalid Handshake) # Examples
/// ```ignore
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
/// # remote.listen(ProtocolAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
@ -341,7 +347,10 @@ impl Network {
/// .await?;
/// assert_eq!(&p1, &p2);
/// # Ok(())
/// })
/// })?;
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// # }
/// ```
/// Usually the `Network` guarantees that a operation on a [`Participant`]
@ -382,15 +391,14 @@ impl Network {
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2020` TCP and opens returns their Pid
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
@ -401,6 +409,8 @@ impl Network {
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -417,6 +427,58 @@ impl Network {
);
Ok(participant)
}
/// Use a mgr to handle shutdown smoothly and not in `Drop`
#[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))]
async fn shutdown_mgr(
local_pid: Pid,
shutdown_network_r: oneshot::Receiver<oneshot::Sender<()>>,
participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
shutdown_scheduler_s: oneshot::Sender<()>,
) {
trace!("waiting for shutdown triggerNetwork");
let return_s = shutdown_network_r.await;
trace!("Shutting down Participants of Network");
let mut finished_receiver_list = vec![];
for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() {
match a2s_disconnect_s.lock().await.take() {
Some(a2s_disconnect_s) => {
trace!(?remote_pid, "Participants will be closed");
let (finished_sender, finished_receiver) = oneshot::channel();
finished_receiver_list.push((remote_pid, finished_receiver));
a2s_disconnect_s
.send((remote_pid, (Duration::from_secs(120), finished_sender)))
.expect("Scheduler is closed, but nobody other should be able to close it");
},
None => trace!(?remote_pid, "Participant already disconnected gracefully"),
}
}
//wait after close is requested for all
for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
match finished_receiver.await {
Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
Err(e) => warn!(
?remote_pid,
?e,
"Failed to get a message back from the scheduler, seems like the network is \
already closed"
),
}
}
trace!("Participants have shut down - next: Scheduler");
shutdown_scheduler_s
.send(())
.expect("Scheduler is closed, but nobody other should be able to close it");
if let Ok(return_s) = return_s {
if return_s.send(()).is_err() {
warn!("Network::drop stoped after a timeout and didn't wait for our shutdown");
};
}
debug!("Network has shut down");
}
}
impl Participant {
@ -456,15 +518,14 @@ impl Participant {
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, Promises, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2100 and open a stream
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
/// let p1 = network
@ -473,6 +534,8 @@ impl Participant {
/// let _s1 = p1
/// .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
/// .await?;
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -522,22 +585,23 @@ impl Participant {
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr, Promises};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2110 and wait for the other side to open a stream
/// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream.
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// # remote.listen(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// let p1 = network.connect(ProtocolAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
/// # let p2 = remote.connected().await?;
/// # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
/// let _s1 = p1.opened().await?;
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -578,16 +642,15 @@ impl Participant {
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, Pid, ProtocolAddr};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// runtime.block_on(async {
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// let err = runtime.block_on(async {
/// network
/// .listen(ProtocolAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
/// .await?;
@ -599,7 +662,10 @@ impl Participant {
/// # break;
/// }
/// # Ok(())
/// })
/// });
/// drop(network);
/// # drop(remote);
/// # err
/// # }
/// ```
///
@ -708,15 +774,14 @@ impl Stream {
/// # Example
/// ```
/// # use veloren_network::Promises;
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ProtocolAddr, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
@ -726,6 +791,8 @@ impl Stream {
/// let mut stream_a = participant_a.opened().await?;
/// //Send Message
/// stream_a.send("Hello World")?;
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -748,16 +815,15 @@ impl Stream {
/// # Example
/// ```rust
/// # use veloren_network::Promises;
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use bincode;
/// use veloren_network::{Network, ProtocolAddr, Pid, Message};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote1 = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote2 = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote1 = Network::new(Pid::new(), &runtime);
/// # let remote2 = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
/// # let remote1_p = remote1.connect(ProtocolAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
@ -775,6 +841,9 @@ impl Stream {
/// //Send same Message to multiple Streams
/// stream_a.send_raw(&msg);
/// stream_b.send_raw(&msg);
/// drop(network);
/// # drop(remote1);
/// # drop(remote2);
/// # Ok(())
/// })
/// # }
@ -806,15 +875,14 @@ impl Stream {
/// # Example
/// ```
/// # use veloren_network::Promises;
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ProtocolAddr, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
@ -824,6 +892,8 @@ impl Stream {
/// let mut stream_a = participant_a.opened().await?;
/// //Recv Message
/// println!("{}", stream_a.recv::<String>().await?);
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -839,15 +909,14 @@ impl Stream {
/// # Example
/// ```
/// # use veloren_network::Promises;
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ProtocolAddr, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
@ -859,6 +928,8 @@ impl Stream {
/// let msg = stream_a.recv_raw().await?;
/// //Resend Message, without deserializing
/// stream_a.send_raw(&msg)?;
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -894,15 +965,14 @@ impl Stream {
/// # Example
/// ```
/// # use veloren_network::Promises;
/// # use std::sync::Arc;
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ProtocolAddr, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
/// let runtime = Arc::new(Runtime::new().unwrap());
/// let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async {
/// network.listen(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
@ -913,6 +983,8 @@ impl Stream {
/// let mut stream_a = participant_a.opened().await?;
/// //Try Recv Message
/// println!("{:?}", stream_a.try_recv::<String>()?);
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// })
/// # }
@ -952,60 +1024,61 @@ impl core::cmp::PartialEq for Participant {
}
}
fn actively_wait<T, F>(mut finished_receiver: oneshot::Receiver<T>, f: F)
where
F: FnOnce(T) + Send + 'static,
T: Send + 'static,
{
const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";
if let Ok(handle) = tokio::runtime::Handle::try_current() {
// When in Async Context WE MUST NOT SYNC BLOCK (as a deadlock might occur as
// other is queued behind). And we CANNOT join our Future_Handle
trace!("async context detected, defer shutdown");
handle.spawn(async move {
match finished_receiver.await {
Ok(data) => f(data),
Err(e) => panic!("{}: {}", CHANNEL_ERR, e),
}
});
} else {
let mut cnt = 0;
loop {
use tokio::sync::oneshot::error::TryRecvError;
match finished_receiver.try_recv() {
Ok(data) => {
f(data);
break;
},
Err(TryRecvError::Closed) => panic!(CHANNEL_ERR),
Err(TryRecvError::Empty) => {
trace!("activly sleeping");
cnt += 1;
if cnt > 120 {
error!("Timeout waiting for shutdown, dropping");
break;
}
std::thread::sleep(Duration::from_millis(100) * cnt);
},
}
}
};
}
impl Drop for Network {
#[instrument(name="network", skip(self), fields(p = %self.local_pid))]
fn drop(&mut self) {
debug!("Shutting down Network");
trace!("Shutting down Participants of Network, while we still have metrics");
let mut finished_receiver_list = vec![];
if tokio::runtime::Handle::try_current().is_ok() {
error!("we have a runtime but we mustn't, DROP NETWORK from async runtime is illegal")
}
tokio::task::block_in_place(|| {
self.runtime.block_on(async {
for (remote_pid, a2s_disconnect_s) in
self.participant_disconnect_sender.lock().await.drain()
{
match a2s_disconnect_s.lock().await.take() {
Some(a2s_disconnect_s) => {
trace!(?remote_pid, "Participants will be closed");
let (finished_sender, finished_receiver) = oneshot::channel();
finished_receiver_list.push((remote_pid, finished_receiver));
a2s_disconnect_s
.send((remote_pid, (Duration::from_secs(120), finished_sender)))
.expect(
"Scheduler is closed, but nobody other should be able to \
close it",
);
},
None => trace!(?remote_pid, "Participant already disconnected gracefully"),
}
}
//wait after close is requested for all
for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
match finished_receiver.await {
Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
Err(e) => warn!(
?remote_pid,
?e,
"Failed to get a message back from the scheduler, seems like the \
network is already closed"
),
}
}
});
});
trace!("Participants have shut down!");
trace!("Shutting down Scheduler");
self.shutdown_sender
trace!("Dropping Network");
let (finished_sender, finished_receiver) = oneshot::channel();
match self
.shutdown_network_s
.take()
.unwrap()
.send(())
.expect("Scheduler is closed, but nobody other should be able to close it");
debug!("Network has shut down");
.send(finished_sender)
{
Err(e) => warn!(?e, "Runtime seems to be dropped already"),
Ok(()) => actively_wait(finished_receiver, |()| info!("Network dropped gracefully")),
};
}
}
@ -1015,59 +1088,31 @@ impl Drop for Participant {
fn drop(&mut self) {
const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
outgoing messages, dropping remaining";
const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";
use tokio::sync::oneshot::error::TryRecvError;
const SCHEDULER_ERR: &str =
"Something is wrong in internal scheduler coding or you dropped the runtime to early";
// ignore closed, as we need to send it even though we disconnected the
// participant from network
debug!("Shutting down Participant");
match self
.a2s_disconnect_s
.try_lock()
.expect("Participant in use while beeing dropped")
.take()
{
None => info!("Participant already has been shutdown gracefully"),
Some(a2s_disconnect_s) => {
debug!("Disconnect from Scheduler");
let (finished_sender, mut finished_receiver) = oneshot::channel();
a2s_disconnect_s
.send((self.remote_pid, (Duration::from_secs(120), finished_sender)))
.expect("Something is wrong in internal scheduler coding");
if let Ok(handle) = tokio::runtime::Handle::try_current() {
trace!("Participant drop Async");
handle.spawn(async move {
match finished_receiver.await {
Ok(Ok(())) => info!("Participant dropped gracefully"),
Ok(Err(e)) => error!(?e, SHUTDOWN_ERR),
Err(e) => panic!("{}: {}", CHANNEL_ERR, e),
}
});
} else {
let mut cnt = 0;
loop {
match finished_receiver.try_recv() {
Ok(Ok(())) => {
info!("Participant dropped gracefully");
break;
},
Ok(Err(e)) => {
error!(?e, SHUTDOWN_ERR);
break;
},
Err(TryRecvError::Closed) => panic!(CHANNEL_ERR),
Err(TryRecvError::Empty) => {
trace!("activly sleeping");
cnt += 1;
if cnt > 120 {
error!("Timeout waiting for participant shutdown, droping");
break;
}
std::thread::sleep(Duration::from_millis(100) * cnt);
},
}
match self.a2s_disconnect_s.try_lock() {
Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"),
Ok(mut s) => match s.take() {
None => info!("Participant already has been shutdown gracefully"),
Some(a2s_disconnect_s) => {
debug!("Disconnect from Scheduler");
let (finished_sender, finished_receiver) = oneshot::channel();
match a2s_disconnect_s
.send((self.remote_pid, (Duration::from_secs(120), finished_sender)))
{
Err(e) => warn!(?e, SCHEDULER_ERR),
Ok(()) => {
actively_wait(finished_receiver, |d| match d {
Ok(()) => info!("Participant dropped gracefully"),
Err(e) => error!(?e, SHUTDOWN_ERR),
});
},
}
};
},
},
}
}

View File

@ -44,7 +44,7 @@
//! use veloren_network::{Network, Pid, Promises, ProtocolAddr};
//!
//! // Client
//! async fn client(runtime: Arc<Runtime>) -> std::result::Result<(), Box<dyn std::error::Error>> {
//! async fn client(runtime: &Runtime) -> std::result::Result<(), Box<dyn std::error::Error>> {
//! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen`
//! let client_network = Network::new(Pid::new(), runtime);
//! let server = client_network
@ -58,7 +58,7 @@
//! }
//!
//! // Server
//! async fn server(runtime: Arc<Runtime>) -> std::result::Result<(), Box<dyn std::error::Error>> {
//! async fn server(runtime: &Runtime) -> std::result::Result<(), Box<dyn std::error::Error>> {
//! let server_network = Network::new(Pid::new(), runtime);
//! server_network
//! .listen(ProtocolAddr::Tcp("127.0.0.1:12345".parse().unwrap()))
@ -72,10 +72,9 @@
//! }
//!
//! fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
//! let runtime = Arc::new(Runtime::new().unwrap());
//! let runtime = Runtime::new().unwrap();
//! runtime.block_on(async {
//! let (result_c, result_s) =
//! join!(client(Arc::clone(&runtime)), server(Arc::clone(&runtime)),);
//! let (result_c, result_s) = join!(client(&runtime), server(&runtime),);
//! result_c?;
//! result_s?;
//! Ok(())

View File

@ -77,9 +77,9 @@ impl Message {
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2300` and wait for a Stream to be opened, then listen on it
/// # let runtime = Arc::new(Runtime::new().unwrap());
/// # let network = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let remote = Network::new(Pid::new(), Arc::clone(&runtime));
/// # let runtime = Runtime::new().unwrap();
/// # let network = Network::new(Pid::new(), &runtime);
/// # let remote = Network::new(Pid::new(), &runtime);
/// # runtime.block_on(async {
/// # network.listen(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
/// # let remote_p = remote.connect(ProtocolAddr::Tcp("127.0.0.1:2300".parse().unwrap())).await?;
@ -90,6 +90,8 @@ impl Message {
/// //Recv Message
/// let msg = stream_a.recv_raw().await?;
/// println!("Msg is {}", msg.deserialize::<String>()?);
/// drop(network);
/// # drop(remote);
/// # Ok(())
/// # })
/// # }

View File

@ -629,10 +629,13 @@ impl Scheduler {
pid_oneshot.send(Ok(participant)).unwrap();
} else {
// no one is waiting on this Participant, return in to Network
participant_channels
if participant_channels
.s2a_connected_s
.send(participant)
.unwrap();
.is_err()
{
warn!("seems like Network already got closed");
};
}
} else {
let pi = &participants[&pid];

View File

@ -150,7 +150,7 @@ fn stream_send_100000_then_close_stream() {
#[test]
fn stream_send_100000_then_close_stream_remote() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -163,7 +163,7 @@ fn stream_send_100000_then_close_stream_remote() {
#[test]
fn stream_send_100000_then_close_stream_remote2() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -177,7 +177,7 @@ fn stream_send_100000_then_close_stream_remote2() {
#[test]
fn stream_send_100000_then_close_stream_remote3() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -191,7 +191,7 @@ fn stream_send_100000_then_close_stream_remote3() {
#[test]
fn close_part_then_network() {
let (_, _) = helper::setup(false, 0);
let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -204,7 +204,7 @@ fn close_part_then_network() {
#[test]
fn close_network_then_part() {
let (_, _) = helper::setup(false, 0);
let (_, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
let (_r, n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..1000 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
@ -224,6 +224,61 @@ fn close_network_then_disconnect_part() {
drop(n_a);
assert!(r.block_on(p_a.disconnect()).is_err());
std::thread::sleep(std::time::Duration::from_millis(1000));
drop((_n_b, _p_b)); //clean teardown
}
#[test]
fn close_runtime_then_network() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(r);
drop(_n_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
drop(_p_b);
}
#[test]
fn close_runtime_then_part() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
drop(r);
drop(_p_a);
std::thread::sleep(std::time::Duration::from_millis(1000));
drop(_p_b);
drop(_n_a);
}
#[test]
fn close_network_from_async() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, _p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
r.block_on(async move {
drop(_n_a);
});
drop(_p_b);
}
#[test]
fn close_part_from_async() {
let (_, _) = helper::setup(false, 0);
let (r, _n_a, p_a, mut s1_a, _n_b, _p_b, _s1_b) = network_participant_stream(tcp());
for _ in 0..100 {
s1_a.send("woop_PARTY_HARD_woop").unwrap();
}
r.block_on(async move {
p_a.disconnect().await.unwrap();
drop(_p_b);
});
drop(_n_a);
}
#[test]
@ -289,8 +344,8 @@ fn failed_stream_open_after_remote_part_is_closed() {
fn open_participant_before_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), Arc::clone(&r));
let n_b = Network::new(Pid::fake(1), Arc::clone(&r));
let n_a = Network::new(Pid::fake(0), &r);
let n_b = Network::new(Pid::fake(1), &r);
let addr = tcp();
r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap();
@ -309,8 +364,8 @@ fn open_participant_before_remote_part_is_closed() {
fn open_participant_after_remote_part_is_closed() {
let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), Arc::clone(&r));
let n_b = Network::new(Pid::fake(1), Arc::clone(&r));
let n_a = Network::new(Pid::fake(0), &r);
let n_b = Network::new(Pid::fake(1), &r);
let addr = tcp();
r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap();
@ -329,8 +384,8 @@ fn open_participant_after_remote_part_is_closed() {
fn close_network_scheduler_completely() {
let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap());
let n_a = Network::new(Pid::fake(0), Arc::clone(&r));
let n_b = Network::new(Pid::fake(1), Arc::clone(&r));
let n_a = Network::new(Pid::fake(0), &r);
let n_b = Network::new(Pid::fake(1), &r);
let addr = tcp();
r.block_on(n_a.listen(addr.clone())).unwrap();
let p_b = r.block_on(n_b.connect(addr)).unwrap();
@ -353,7 +408,7 @@ fn close_network_scheduler_completely() {
#[test]
fn dont_panic_on_multiply_recv_after_close() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap();
drop(s1_a);
@ -368,7 +423,7 @@ fn dont_panic_on_multiply_recv_after_close() {
#[test]
fn dont_panic_on_recv_send_after_close() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap();
drop(s1_a);
@ -381,7 +436,7 @@ fn dont_panic_on_recv_send_after_close() {
#[test]
fn dont_panic_on_multiple_send_after_close() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(11u32).unwrap();
drop(s1_a);

View File

@ -59,8 +59,8 @@ pub fn network_participant_stream(
) {
let runtime = Arc::new(Runtime::new().unwrap());
let (n_a, p1_a, s1_a, n_b, p1_b, s1_b) = runtime.block_on(async {
let n_a = Network::new(Pid::fake(0), Arc::clone(&runtime));
let n_b = Network::new(Pid::fake(1), Arc::clone(&runtime));
let n_a = Network::new(Pid::fake(0), &runtime);
let n_b = Network::new(Pid::fake(1), &runtime);
n_a.listen(addr.clone()).await.unwrap();
let p1_b = n_b.connect(addr).await.unwrap();

View File

@ -27,7 +27,7 @@ fn stream_simple() {
#[test]
fn stream_try_recv() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send(4242u32).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
@ -104,8 +104,8 @@ fn stream_simple_udp_3msg() {
fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let remote = Network::new(Pid::new(), Arc::clone(&r));
let network = Network::new(Pid::new(), &r);
let remote = Network::new(Pid::new(), &r);
r.block_on(async {
let network = network;
let remote = remote;
@ -131,14 +131,14 @@ fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Er
fn failed_listen_on_used_ports() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_, _) = helper::setup(false, 0);
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let network = Network::new(Pid::new(), &r);
let udp1 = udp();
let tcp1 = tcp();
r.block_on(network.listen(udp1.clone()))?;
r.block_on(network.listen(tcp1.clone()))?;
std::thread::sleep(std::time::Duration::from_millis(200));
let network2 = Network::new(Pid::new(), Arc::clone(&r));
let network2 = Network::new(Pid::new(), &r);
let e1 = r.block_on(network2.listen(udp1));
let e2 = r.block_on(network2.listen(tcp1));
match e1 {
@ -164,8 +164,8 @@ fn api_stream_send_main() -> std::result::Result<(), Box<dyn std::error::Error>>
// Create a Network, listen on Port `1200` and wait for a Stream to be opened,
// then answer `Hello World`
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let remote = Network::new(Pid::new(), Arc::clone(&r));
let network = Network::new(Pid::new(), &r);
let remote = Network::new(Pid::new(), &r);
r.block_on(async {
let network = network;
let remote = remote;
@ -193,8 +193,8 @@ fn api_stream_recv_main() -> std::result::Result<(), Box<dyn std::error::Error>>
// Create a Network, listen on Port `1220` and wait for a Stream to be opened,
// then listen on it
let r = Arc::new(Runtime::new().unwrap());
let network = Network::new(Pid::new(), Arc::clone(&r));
let remote = Network::new(Pid::new(), Arc::clone(&r));
let network = Network::new(Pid::new(), &r);
let remote = Network::new(Pid::new(), &r);
r.block_on(async {
let network = network;
let remote = remote;
@ -232,7 +232,7 @@ fn wrong_parse() {
#[test]
fn multiple_try_recv() {
let (_, _) = helper::setup(false, 0);
let (_, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
let (_r, _n_a, _p_a, mut s1_a, _n_b, _p_b, mut s1_b) = network_participant_stream(tcp());
s1_a.send("asd").unwrap();
s1_a.send(11u32).unwrap();

View File

@ -370,7 +370,7 @@ impl Server {
registry_state(&registry).expect("failed to register state metrics");
registry_physics(&registry).expect("failed to register state metrics");
let network = Network::new_with_registry(Pid::new(), Arc::clone(&runtime), &registry);
let network = Network::new_with_registry(Pid::new(), &runtime, &registry);
let metrics_shutdown = Arc::new(Notify::new());
let metrics_shutdown_clone = Arc::clone(&metrics_shutdown);
let addr = settings.metrics_address;
@ -383,6 +383,7 @@ impl Server {
.await
});
runtime.block_on(network.listen(ProtocolAddr::Tcp(settings.gameserver_address)))?;
runtime.block_on(network.listen(ProtocolAddr::Mpsc(14004)))?;
let connection_handler = ConnectionHandler::new(network, &runtime);
// Initiate real-time world simulation

View File

@ -69,16 +69,13 @@ impl PlayState for MainMenuState {
if let Some(singleplayer) = &global_state.singleplayer {
match singleplayer.receiver.try_recv() {
Ok(Ok(runtime)) => {
let server_settings = singleplayer.settings();
// Attempt login after the server is finished initializing
attempt_login(
&mut global_state.settings,
&mut global_state.info_message,
"singleplayer".to_owned(),
"".to_owned(),
ClientConnArgs::Resolved(ConnectionArgs::IpAndPort(vec![
server_settings.gameserver_address,
])),
ClientConnArgs::Resolved(ConnectionArgs::Mpsc(14004)),
&mut self.client_init,
Some(runtime),
);