fixing all tests and doc tests including some deadlocks

This commit is contained in:
Marcel Märtens 2020-05-26 15:06:03 +02:00
parent 9550da87b8
commit 6e776e449f
10 changed files with 467 additions and 272 deletions

View File

@ -13,6 +13,7 @@ use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -25,8 +26,8 @@ use uvth::ThreadPool;
/// Represents a Tcp or Udp or Mpsc address
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum Address {
Tcp(std::net::SocketAddr),
Udp(std::net::SocketAddr),
Tcp(SocketAddr),
Udp(SocketAddr),
Mpsc(u64),
}
@ -109,15 +110,22 @@ pub enum StreamError {
///
/// # Examples
/// ```rust
/// use veloren_network::{Network, Pid};
/// use veloren_network::{Network, Address, Pid};
/// use uvth::ThreadPoolBuilder;
/// use futures::executor::block_on;
///
/// // Create a Network, listen on port `12345` to accept connections and connect to port `80` to connect to a (pseudo) database Application
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `12345` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async{
/// # //setup pseudo database!
/// # let database = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # database.listen(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// network.listen(Address::Tcp("127.0.0.1:12345".parse().unwrap())).await?;
/// let database = network.connect(Address::Tcp("127.0.0.1:80".parse().unwrap())).await?;
/// });
/// let database = network.connect(Address::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
@ -150,9 +158,9 @@ impl Network {
/// # Examples
/// ```rust
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::{Network, Pid};
/// use veloren_network::{Address, Network, Pid};
///
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// ```
///
/// Usually you only create a single `Network` for an application, except
@ -194,11 +202,13 @@ impl Network {
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::{Network, Pid};
/// use veloren_network::{Address, Network, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// network
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
@ -206,7 +216,9 @@ impl Network {
/// network
/// .listen(Address::Udp("127.0.0.1:2001".parse().unwrap()))
/// .await?;
/// });
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`connected`]: Network::connected
@ -231,20 +243,30 @@ impl Network {
/// ready to open [`Streams`] on OR has returned a [`NetworkError`] (e.g.
/// can't connect, or invalid Handshake) # Examples
/// ```rust
/// use futures::executor::block_on;
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::{Network, Pid};
/// use veloren_network::{Address, Network, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port `2000` TCP and `2001` UDP like listening above
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// # remote.listen(Address::Tcp("0.0.0.0:2000".parse().unwrap())).await?;
/// # remote.listen(Address::Udp("0.0.0.0:2001".parse().unwrap())).await?;
/// let p1 = network
/// .connect(Address::Tcp("127.0.0.1:2000".parse().unwrap()))
/// .await?;
/// # //this doesn't work yet, so skip the test
/// # //TODO fixme!
/// # return Ok(());
/// let p2 = network
/// .connect(Address::Udp("127.0.0.1:2001".parse().unwrap()))
/// .await?;
/// assert!(p1.ptr_eq(p2));
/// });
/// assert!(std::sync::Arc::ptr_eq(&p1, &p2));
/// # Ok(())
/// })
/// # }
/// ```
/// Usually the `Network` guarantees that a operation on a [`Participant`]
/// succeeds, e.g. by automatic retrying unless it fails completely e.g. by
@ -284,19 +306,27 @@ impl Network {
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::{Network, Pid};
/// use veloren_network::{Address, Network, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2000` TCP and opens returns their Pid
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// network
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
/// .await?;
/// while let Some(participant) = network.connected().await? {
/// # remote.connect(Address::Tcp("0.0.0.0:2000".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// });
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
@ -324,20 +354,28 @@ impl Network {
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::{Network, Pid};
/// use veloren_network::{Address, Network, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2000` TCP and opens returns their Pid and close connection.
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// network
/// .listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
/// .await?;
/// while let Some(participant) = network.connected().await? {
/// # remote.connect(Address::Tcp("0.0.0.0:2000".parse().unwrap())).await?;
/// while let Ok(participant) = network.connected().await {
/// println!("Participant connected: {}", participant.remote_pid());
/// network.disconnect(participant).await?;
/// # //skip test here as it would be a endless loop
/// # break;
/// }
/// });
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Arc<Participant>`]: crate::api::Participant
@ -426,19 +464,23 @@ impl Participant {
///
/// # Examples
/// ```rust
/// use futures::executor::block_on;
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::{Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
/// use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2000 and open a stream
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// # remote.listen(Address::Tcp("0.0.0.0:2000".parse().unwrap())).await?;
/// let p1 = network
/// .connect(Address::Tcp("127.0.0.1:2000".parse().unwrap()))
/// .await?;
/// let _s1 = p1
/// .open(100, PROMISES_ORDERED | PROMISES_CONSISTENCY)
/// .await?;
/// });
/// let _s1 = p1.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
@ -483,16 +525,24 @@ impl Participant {
///
/// # Examples
/// ```rust
/// use veloren_network::{Network, Pid, PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// use veloren_network::{Network, Pid, Address, PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// use uvth::ThreadPoolBuilder;
/// use futures::executor::block_on;
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, connect on port 2000 and wait for the other side to open a stream
/// // Note: It's quite unusal to activly connect, but then wait on a stream to be connected, usually the Appication taking initiative want's to also create the first Stream.
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// # remote.listen(Address::Tcp("0.0.0.0:2000".parse().unwrap())).await?;
/// let p1 = network.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
/// # let p2 = remote.connected().await?;
/// # p2.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// let _s1 = p1.opened().await?;
/// });
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`Streams`]: crate::api::Stream
@ -569,16 +619,26 @@ impl Stream {
///
/// # Example
/// ```rust
/// use veloren_network::{Network, Address, Pid};
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// use uvth::ThreadPoolBuilder;
/// use futures::executor::block_on;
/// use veloren_network::{Network, Pid};
///
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on Port `2000` and wait for a Stream to be opened, then answer `Hello World`
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// let participant_a = network.connected().await;
/// let mut stream_a = participant_a.opened().await;
/// network.listen(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
/// # let remote_p = remote.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
/// # remote_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// let participant_a = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// //Send Message
/// stream_a.send("Hello World");
/// });
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`send_raw`]: Stream::send_raw
@ -596,26 +656,40 @@ impl Stream {
///
/// # Example
/// ```rust
/// use bincode;
/// use veloren_network::{Network, Address, Pid, MessageBuffer};
/// # use veloren_network::{PROMISES_ORDERED, PROMISES_CONSISTENCY};
/// use futures::executor::block_on;
/// use veloren_network::{Network, Pid};
/// use uvth::ThreadPoolBuilder;
/// use bincode;
/// use std::sync::Arc;
///
/// let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote1 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// # let remote2 = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
/// block_on(async {
/// let participant_a = network.connected().await;
/// let participant_b = network.connected().await;
/// let mut stream_a = participant_a.opened().await;
/// let mut stream_b = participant_a.opened().await;
/// network.listen(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
/// # let remote1_p = remote1.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
/// # let remote2_p = remote2.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap())).await?;
/// # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
/// # remote1_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// # remote2_p.open(16, PROMISES_ORDERED | PROMISES_CONSISTENCY).await?;
/// let participant_a = network.connected().await?;
/// let participant_b = network.connected().await?;
/// let mut stream_a = participant_a.opened().await?;
/// let mut stream_b = participant_b.opened().await?;
///
/// //Prepare Message and decode it
/// let msg = "Hello World";
/// let raw_msg = Arc::new(MessageBuffer {
/// let raw_msg = Arc::new(MessageBuffer{
/// data: bincode::serialize(&msg).unwrap(),
/// });
/// //Send same Message to multiple Streams
/// stream_a.send_raw(raw_msg.clone());
/// stream_b.send_raw(raw_msg.clone());
/// });
/// # Ok(())
/// })
/// # }
/// ```
///
/// [`send`]: Stream::send
@ -807,3 +881,32 @@ impl From<oneshot::Canceled> for ParticipantError {
impl From<oneshot::Canceled> for NetworkError {
fn from(_err: oneshot::Canceled) -> Self { NetworkError::NetworkClosed }
}
impl core::fmt::Display for StreamError {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
StreamError::StreamClosed => write!(f, "stream closed"),
}
}
}
impl core::fmt::Display for ParticipantError {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
ParticipantError::ParticipantClosed => write!(f, "participant closed"),
}
}
}
impl core::fmt::Display for NetworkError {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
NetworkError::NetworkClosed => write!(f, "network closed"),
NetworkError::ListenFailed(_) => write!(f, "listening failed"),
}
}
}
impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {}

View File

@ -70,9 +70,11 @@ impl Channel {
}
}
#[derive(Debug)]
pub(crate) struct Handshake {
cid: Cid,
local_pid: Pid,
secret: u128,
init_handshake: bool,
metrics: Arc<NetworkMetrics>,
}
@ -91,18 +93,20 @@ impl Handshake {
pub fn new(
cid: u64,
local_pid: Pid,
secret: u128,
metrics: Arc<NetworkMetrics>,
init_handshake: bool,
) -> Self {
Self {
cid,
local_pid,
secret,
metrics,
init_handshake,
}
}
pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid), ()> {
pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid, u128), ()> {
let (to_wire_sender, to_wire_receiver) = mpsc::unbounded::<Frame>();
let (from_wire_sender, from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
@ -134,7 +138,7 @@ impl Handshake {
mut from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut to_wire_sender: mpsc::UnboundedSender<Frame>,
_read_stop_sender: oneshot::Sender<()>,
) -> Result<(Pid, Sid), ()> {
) -> Result<(Pid, Sid, u128), ()> {
const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \
something went wrong on network layer and connection will be closed";
let mut pid_string = "".to_string();
@ -203,7 +207,7 @@ impl Handshake {
}
debug!("handshake completed");
if self.init_handshake {
self.send_pid(&mut to_wire_sender, &pid_string).await;
self.send_init(&mut to_wire_sender, &pid_string).await;
} else {
self.send_handshake(&mut to_wire_sender).await;
}
@ -238,7 +242,7 @@ impl Handshake {
};
match from_wire_receiver.next().await {
Some((_, Frame::ParticipantId { pid })) => {
Some((_, Frame::Init { pid, secret })) => {
debug!(?pid, "Participant send their ID");
pid_string = pid.to_string();
self.metrics
@ -248,11 +252,11 @@ impl Handshake {
let stream_id_offset = if self.init_handshake {
STREAM_ID_OFFSET1
} else {
self.send_pid(&mut to_wire_sender, &pid_string).await;
self.send_init(&mut to_wire_sender, &pid_string).await;
STREAM_ID_OFFSET2
};
info!(?pid, "this Handshake is now configured!");
return Ok((pid, stream_id_offset));
return Ok((pid, stream_id_offset, secret));
},
Some((_, Frame::Shutdown)) => {
info!("shutdown signal received");
@ -298,14 +302,15 @@ impl Handshake {
.unwrap();
}
async fn send_pid(&self, to_wire_sender: &mut mpsc::UnboundedSender<Frame>, pid_string: &str) {
async fn send_init(&self, to_wire_sender: &mut mpsc::UnboundedSender<Frame>, pid_string: &str) {
self.metrics
.frames_out_total
.with_label_values(&[pid_string, &self.cid.to_string(), "ParticipantId"])
.inc();
to_wire_sender
.send(Frame::ParticipantId {
.send(Frame::Init {
pid: self.local_pid,
secret: self.secret,
})
.await
.unwrap();

View File

@ -35,37 +35,47 @@
//!
//! # Examples
//! ```rust
//! // Client
//! use futures::executor::block_on;
//! use veloren_network::{Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
//! use async_std::task::sleep;
//! use futures::{executor::block_on, join};
//! use uvth::ThreadPoolBuilder;
//! use veloren_network::{Address, Network, Pid, PROMISES_CONSISTENCY, PROMISES_ORDERED};
//!
//! let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
//! block_on(async {
//! let server = network
//! // Client
//! async fn client() -> std::result::Result<(), Box<dyn std::error::Error>> {
//! sleep(std::time::Duration::from_secs(1)).await; // `connect` MUST be after `listen`
//! let client_network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
//! let server = client_network
//! .connect(Address::Tcp("127.0.0.1:12345".parse().unwrap()))
//! .await?;
//! let stream = server
//! let mut stream = server
//! .open(10, PROMISES_ORDERED | PROMISES_CONSISTENCY)
//! .await?;
//! stream.send("Hello World")?;
//! });
//! ```
//! Ok(())
//! }
//!
//! ```rust
//! // Server
//! use futures::executor::block_on;
//! use veloren_network::{Network, Pid};
//!
//! let network = Network::new(Pid::new(), ThreadPoolBuilder::new().build(), None);
//! block_on(async {
//! network
//! async fn server() -> std::result::Result<(), Box<dyn std::error::Error>> {
//! let server_network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
//! server_network
//! .listen(Address::Tcp("127.0.0.1:12345".parse().unwrap()))
//! .await?;
//! let client = network.connected().await?;
//! let stream = server.opened().await?;
//! let client = server_network.connected().await?;
//! let mut stream = client.opened().await?;
//! let msg: String = stream.recv().await?;
//! println!("got message: {}", msg);
//! });
//! assert_eq!(msg, "Hello World");
//! Ok(())
//! }
//!
//! fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
//! block_on(async {
//! let (result_c, result_s) = join!(client(), server(),);
//! result_c?;
//! result_s?;
//! Ok(())
//! })
//! }
//! ```
//!
//! [`Network`]: crate::api::Network

View File

@ -255,7 +255,7 @@ pub(crate) struct PidCidFrameCache<T: MetricVecBuilder> {
pub(crate) struct PidCidFrameCache {
metric: IntCounterVec,
pid: String,
cache: Vec<[GenericCounter<AtomicI64>; 8]>,
cache: Vec<[GenericCounter<AtomicI64>; Frame::FRAMES_LEN as usize]>,
}
impl PidCidFrameCache {
@ -308,7 +308,7 @@ impl PidCidFrameCache {
}
pub(crate) struct CidFrameCache {
cache: [GenericCounter<AtomicI64>; 8],
cache: [GenericCounter<AtomicI64>; Frame::FRAMES_LEN as usize],
}
impl CidFrameCache {

View File

@ -118,7 +118,7 @@ impl BParticipant {
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
let (prios, a2p_msg_s, p2b_notify_empty_stream_s) = PrioManager::new();
let (prios, a2p_msg_s, b2p_notify_empty_stream_s) = PrioManager::new();
let run_channels = self.run_channels.take().unwrap();
futures::join!(
@ -139,7 +139,7 @@ impl BParticipant {
self.stream_close_mgr(
run_channels.a2b_close_stream_r,
shutdown_stream_close_mgr_receiver,
p2b_notify_empty_stream_s,
b2p_notify_empty_stream_s,
),
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
@ -182,12 +182,14 @@ impl BParticipant {
}
async_std::task::sleep(TICK_TIME).await;
//shutdown after all msg are send!
if !closing_up && shutdown_send_mgr_receiver.try_recv().unwrap().is_some() {
closing_up = true;
}
if closing_up && (len == 0) {
break;
}
//this IF below the break IF to give it another chance to close all streams
// closed
if !closing_up && shutdown_send_mgr_receiver.try_recv().unwrap().is_some() {
closing_up = true;
}
}
trace!("stop send_mgr");
b2b_prios_flushed_s.send(()).unwrap();
@ -403,7 +405,9 @@ impl BParticipant {
b2b_prios_flushed_r.await.unwrap();
debug!("closing all channels");
for ci in self.channels.write().await.drain(..) {
ci.b2r_read_shutdown.send(()).unwrap();
if let Err(e) = ci.b2r_read_shutdown.send(()) {
debug!(?e, ?ci.cid, "seems like this read protocol got already dropped by closing the Stream itself, just ignoring the fact");
};
}
//Wait for other bparticipants mgr to close via AtomicUsize
const SLEEP_TIME: std::time::Duration = std::time::Duration::from_millis(5);
@ -430,7 +434,7 @@ impl BParticipant {
&self,
mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Sid, oneshot::Sender<()>)>,
b2p_notify_empty_stream_s: std::sync::mpsc::Sender<(Sid, oneshot::Sender<()>)>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start stream_close_mgr");
@ -464,7 +468,7 @@ impl BParticipant {
trace!(?sid, "wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
p2b_notify_empty_stream_s
b2p_notify_empty_stream_s
.send((sid, s2b_stream_finished_closed_s))
.unwrap();
s2b_stream_finished_closed_r.await.unwrap();

View File

@ -143,7 +143,7 @@ impl PrioManager {
)
}
fn tick(&mut self) {
async fn tick(&mut self) {
// Check Range
let mut times = 0;
let mut closed = 0;
@ -170,9 +170,7 @@ impl PrioManager {
cnt.empty_notify = Some(return_sender);
} else {
// return immediately
futures::executor::block_on(async {
return_sender.send(()).unwrap();
});
return_sender.send(()).unwrap();
}
}
if times > 0 || closed > 0 {
@ -241,7 +239,7 @@ impl PrioManager {
no_of_frames: usize,
frames: &mut E,
) {
self.tick();
self.tick().await;
for _ in 0..no_of_frames {
match self.calc_next_prio() {
Some(prio) => {
@ -304,8 +302,9 @@ mod tests {
use crate::{
message::{MessageBuffer, OutGoingMessage},
prios::*,
types::{Frame, Pid, Prio, Sid},
types::{Frame, Prio, Sid},
};
use futures::executor::block_on;
use std::{collections::VecDeque, sync::Arc};
const SIZE: u64 = PrioManager::FRAME_DATA_SIZE;
@ -340,7 +339,7 @@ mod tests {
let frame = frames
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!")
.2;
.1;
if let Frame::DataHeader { mid, sid, length } = frame {
assert_eq!(mid, 1);
assert_eq!(sid, Sid::new(f_sid));
@ -354,7 +353,7 @@ mod tests {
let frame = frames
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!")
.2;
.1;
if let Frame::Data { mid, start, data } = frame {
assert_eq!(mid, 1);
assert_eq!(start, f_start);
@ -364,20 +363,12 @@ mod tests {
}
}
fn assert_contains(mgr: &PrioManager, sid: u64) {
assert!(mgr.contains_pid_sid(Pid::fake(0), Sid::new(sid)));
}
fn assert_no_contains(mgr: &PrioManager, sid: u64) {
assert!(!mgr.contains_pid_sid(Pid::fake(0), Sid::new(sid)));
}
#[test]
fn single_p16() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(16, 1337)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out(16, 1337)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1337, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -386,17 +377,12 @@ mod tests {
#[test]
fn single_p16_p20() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(16, 1337)).unwrap();
tx.send(mock_out(20, 42)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out(16, 1337)).unwrap();
msg_tx.send(mock_out(20, 42)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
assert_no_contains(&mgr, 1337);
assert_no_contains(&mgr, 42);
assert_no_contains(&mgr, 666);
block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1337, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
assert_header(&mut frames, 42, 3);
@ -406,11 +392,11 @@ mod tests {
#[test]
fn single_p20_p16() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(20, 42)).unwrap();
tx.send(mock_out(16, 1337)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out(20, 42)).unwrap();
msg_tx.send(mock_out(16, 1337)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1337, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -421,22 +407,22 @@ mod tests {
#[test]
fn multiple_p16_p20() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(20, 2)).unwrap();
tx.send(mock_out(16, 1)).unwrap();
tx.send(mock_out(16, 3)).unwrap();
tx.send(mock_out(16, 5)).unwrap();
tx.send(mock_out(20, 4)).unwrap();
tx.send(mock_out(20, 7)).unwrap();
tx.send(mock_out(16, 6)).unwrap();
tx.send(mock_out(20, 10)).unwrap();
tx.send(mock_out(16, 8)).unwrap();
tx.send(mock_out(20, 12)).unwrap();
tx.send(mock_out(16, 9)).unwrap();
tx.send(mock_out(16, 11)).unwrap();
tx.send(mock_out(20, 13)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out(20, 2)).unwrap();
msg_tx.send(mock_out(16, 1)).unwrap();
msg_tx.send(mock_out(16, 3)).unwrap();
msg_tx.send(mock_out(16, 5)).unwrap();
msg_tx.send(mock_out(20, 4)).unwrap();
msg_tx.send(mock_out(20, 7)).unwrap();
msg_tx.send(mock_out(16, 6)).unwrap();
msg_tx.send(mock_out(20, 10)).unwrap();
msg_tx.send(mock_out(16, 8)).unwrap();
msg_tx.send(mock_out(20, 12)).unwrap();
msg_tx.send(mock_out(16, 9)).unwrap();
msg_tx.send(mock_out(16, 11)).unwrap();
msg_tx.send(mock_out(20, 13)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
block_on(mgr.fill_frames(100, &mut frames));
for i in 1..14 {
assert_header(&mut frames, i, 3);
@ -447,34 +433,29 @@ mod tests {
#[test]
fn multiple_fill_frames_p16_p20() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out(20, 2)).unwrap();
tx.send(mock_out(16, 1)).unwrap();
tx.send(mock_out(16, 3)).unwrap();
tx.send(mock_out(16, 5)).unwrap();
tx.send(mock_out(20, 4)).unwrap();
tx.send(mock_out(20, 7)).unwrap();
tx.send(mock_out(16, 6)).unwrap();
tx.send(mock_out(20, 10)).unwrap();
tx.send(mock_out(16, 8)).unwrap();
tx.send(mock_out(20, 12)).unwrap();
tx.send(mock_out(16, 9)).unwrap();
tx.send(mock_out(16, 11)).unwrap();
tx.send(mock_out(20, 13)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out(20, 2)).unwrap();
msg_tx.send(mock_out(16, 1)).unwrap();
msg_tx.send(mock_out(16, 3)).unwrap();
msg_tx.send(mock_out(16, 5)).unwrap();
msg_tx.send(mock_out(20, 4)).unwrap();
msg_tx.send(mock_out(20, 7)).unwrap();
msg_tx.send(mock_out(16, 6)).unwrap();
msg_tx.send(mock_out(20, 10)).unwrap();
msg_tx.send(mock_out(16, 8)).unwrap();
msg_tx.send(mock_out(20, 12)).unwrap();
msg_tx.send(mock_out(16, 9)).unwrap();
msg_tx.send(mock_out(16, 11)).unwrap();
msg_tx.send(mock_out(20, 13)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(3, &mut frames);
assert_no_contains(&mgr, 1);
assert_no_contains(&mgr, 3);
assert_contains(&mgr, 13);
block_on(mgr.fill_frames(3, &mut frames));
for i in 1..4 {
assert_header(&mut frames, i, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
}
assert!(frames.is_empty());
mgr.fill_frames(11, &mut frames);
block_on(mgr.fill_frames(11, &mut frames));
for i in 4..14 {
assert_header(&mut frames, i, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -484,10 +465,10 @@ mod tests {
#[test]
fn single_large_p16() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out_large(16, 1)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out_large(16, 1)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
@ -498,11 +479,11 @@ mod tests {
#[test]
fn multiple_large_p16() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out_large(16, 1)).unwrap();
tx.send(mock_out_large(16, 2)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out_large(16, 1)).unwrap();
msg_tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(100, &mut frames);
block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
@ -517,11 +498,11 @@ mod tests {
#[test]
fn multiple_large_p16_sudden_p0() {
let (mut mgr, tx) = PrioManager::new();
tx.send(mock_out_large(16, 1)).unwrap();
tx.send(mock_out_large(16, 2)).unwrap();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
msg_tx.send(mock_out_large(16, 1)).unwrap();
msg_tx.send(mock_out_large(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(3, &mut frames);
block_on(mgr.fill_frames(3, &mut frames));
assert_header(&mut frames, 1, SIZE * 2 + 20);
assert_data(&mut frames, 0, vec![48; USIZE]);
@ -529,8 +510,8 @@ mod tests {
assert_data(&mut frames, 0, vec![48; USIZE]);
assert_data(&mut frames, SIZE, vec![49; USIZE]);
tx.send(mock_out(0, 3)).unwrap();
mgr.fill_frames(100, &mut frames);
msg_tx.send(mock_out(0, 3)).unwrap();
block_on(mgr.fill_frames(100, &mut frames));
assert_header(&mut frames, 3, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -543,15 +524,15 @@ mod tests {
#[test]
fn single_p20_thousand_p16_at_once() {
let (mut mgr, tx) = PrioManager::new();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
for _ in 0..998 {
tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
}
tx.send(mock_out(20, 1)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(20, 1)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(2000, &mut frames);
block_on(mgr.fill_frames(2000, &mut frames));
assert_header(&mut frames, 2, 3);
assert_data(&mut frames, 0, vec![48, 49, 50]);
@ -565,18 +546,18 @@ mod tests {
#[test]
fn single_p20_thousand_p16_later() {
let (mut mgr, tx) = PrioManager::new();
let (mut mgr, msg_tx, _flush_tx) = PrioManager::new();
for _ in 0..998 {
tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
}
let mut frames = VecDeque::new();
mgr.fill_frames(2000, &mut frames);
block_on(mgr.fill_frames(2000, &mut frames));
//^unimportant frames, gonna be dropped
tx.send(mock_out(20, 1)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(20, 1)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
msg_tx.send(mock_out(16, 2)).unwrap();
let mut frames = VecDeque::new();
mgr.fill_frames(2000, &mut frames);
block_on(mgr.fill_frames(2000, &mut frames));
//important in that test is, that after the first frames got cleared i reset
// the Points even though 998 prio 16 messages have been send at this

View File

@ -21,7 +21,7 @@ use tracing::*;
// detect a invalid client, e.g. sending an empty line would make 10 first char
// const FRAME_RESERVED_1: u8 = 0;
const FRAME_HANDSHAKE: u8 = 1;
const FRAME_PARTICIPANT_ID: u8 = 2;
const FRAME_INIT: u8 = 2;
const FRAME_SHUTDOWN: u8 = 3;
const FRAME_OPEN_STREAM: u8 = 4;
const FRAME_CLOSE_STREAM: u8 = 5;
@ -63,7 +63,7 @@ impl TcpProtocol {
mut from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
end_receiver: oneshot::Receiver<()>,
) {
trace!("starting up tcp write()");
trace!("starting up tcp read()");
let mut metrics_cache = CidFrameCache::new(self.metrics.frames_wire_in_total.clone(), cid);
let mut stream = self.stream.clone();
let mut end_receiver = end_receiver.fuse();
@ -94,11 +94,13 @@ impl TcpProtocol {
],
}
},
FRAME_PARTICIPANT_ID => {
FRAME_INIT => {
let mut bytes = [0u8; 16];
stream.read_exact(&mut bytes).await.unwrap();
let pid = Pid::from_le_bytes(bytes);
Frame::ParticipantId { pid }
stream.read_exact(&mut bytes).await.unwrap();
let secret = u128::from_le_bytes(bytes);
Frame::Init { pid, secret }
},
FRAME_SHUTDOWN => Frame::Shutdown,
FRAME_OPEN_STREAM => {
@ -203,12 +205,10 @@ impl TcpProtocol {
stream.write_all(&version[1].to_le_bytes()).await.unwrap();
stream.write_all(&version[2].to_le_bytes()).await.unwrap();
},
Frame::ParticipantId { pid } => {
stream
.write_all(&FRAME_PARTICIPANT_ID.to_be_bytes())
.await
.unwrap();
Frame::Init { pid, secret } => {
stream.write_all(&FRAME_INIT.to_be_bytes()).await.unwrap();
stream.write_all(&pid.to_le_bytes()).await.unwrap();
stream.write_all(&secret.to_le_bytes()).await.unwrap();
},
Frame::Shutdown => {
stream
@ -315,13 +315,18 @@ impl UdpProtocol {
],
}
},
FRAME_PARTICIPANT_ID => {
FRAME_INIT => {
let pid = Pid::from_le_bytes([
bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14],
bytes[15], bytes[16],
]);
Frame::ParticipantId { pid }
let secret = u128::from_le_bytes([
bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22],
bytes[23], bytes[24], bytes[25], bytes[26], bytes[27], bytes[28],
bytes[29], bytes[30], bytes[31], bytes[32],
]);
Frame::Init { pid, secret }
},
FRAME_SHUTDOWN => Frame::Shutdown,
FRAME_OPEN_STREAM => {
@ -427,8 +432,8 @@ impl UdpProtocol {
buffer[19] = x[3];
20
},
Frame::ParticipantId { pid } => {
let x = FRAME_PARTICIPANT_ID.to_be_bytes();
Frame::Init { pid, secret } => {
let x = FRAME_INIT.to_be_bytes();
buffer[0] = x[0];
let x = pid.to_le_bytes();
buffer[1] = x[0];
@ -447,7 +452,24 @@ impl UdpProtocol {
buffer[14] = x[13];
buffer[15] = x[14];
buffer[16] = x[15];
17
let x = secret.to_le_bytes();
buffer[17] = x[0];
buffer[18] = x[1];
buffer[19] = x[2];
buffer[20] = x[3];
buffer[21] = x[4];
buffer[22] = x[5];
buffer[23] = x[6];
buffer[24] = x[7];
buffer[25] = x[8];
buffer[26] = x[9];
buffer[27] = x[10];
buffer[28] = x[11];
buffer[29] = x[12];
buffer[30] = x[13];
buffer[31] = x[14];
buffer[32] = x[15];
33
},
Frame::Shutdown => {
let x = FRAME_SHUTDOWN.to_be_bytes();

View File

@ -19,6 +19,7 @@ use futures::{
stream::StreamExt,
};
use prometheus::Registry;
use rand::Rng;
use std::{
collections::HashMap,
sync::{
@ -31,6 +32,7 @@ use tracing_futures::Instrument;
#[derive(Debug)]
struct ParticipantInfo {
secret: u128,
s2b_create_channel_s: mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
s2b_shutdown_bparticipant_s:
Option<oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>>,
@ -60,6 +62,7 @@ struct ParticipantChannels {
#[derive(Debug)]
pub struct Scheduler {
local_pid: Pid,
local_secret: u128,
closed: AtomicBool,
pool: Arc<ThreadPool>,
run_channels: Option<ControlChannels>,
@ -107,9 +110,13 @@ impl Scheduler {
metrics.register(registry).unwrap();
}
let mut rng = rand::thread_rng();
let local_secret: u128 = rng.gen();
(
Self {
local_pid,
local_secret,
closed: AtomicBool::new(false),
pool: Arc::new(ThreadPool::new().unwrap()),
run_channels,
@ -248,16 +255,22 @@ impl Scheduler {
// 2. we need to close BParticipant, this will drop its senders and receivers
// 3. Participant will try to access the BParticipant senders and receivers with
// their next api action, it will fail and be closed then.
let (finished_sender, finished_receiver) = oneshot::channel();
trace!(?pid, "got request to close participant");
if let Some(mut pi) = self.participants.write().await.remove(&pid) {
let (finished_sender, finished_receiver) = oneshot::channel();
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send(finished_sender)
.unwrap();
drop(pi);
let e = finished_receiver.await.unwrap();
return_once_successfull_shutdown.send(e).unwrap();
} else {
debug!(?pid, "looks like participant is already dropped");
return_once_successfull_shutdown.send(Ok(())).unwrap();
}
let e = finished_receiver.await.unwrap();
return_once_successfull_shutdown.send(e).unwrap();
trace!(?pid, "closed participant");
}
trace!("stop disconnect_mgr");
}
@ -275,8 +288,7 @@ impl Scheduler {
debug!("shutting down all BParticipants gracefully");
let mut participants = self.participants.write().await;
let mut waitings = vec![];
//close participants but don't remove them from self.participants yet
for (pid, pi) in participants.iter_mut() {
for (pid, mut pi) in participants.drain() {
trace!(?pid, "shutting down BParticipants");
let (finished_sender, finished_receiver) = oneshot::channel();
waitings.push((pid, finished_receiver));
@ -298,8 +310,6 @@ impl Scheduler {
_ => (),
};
}
//remove participants once everything is shut down
participants.clear();
//removing the possibility to create new participants, needed to close down
// some mgr:
self.participant_channels.lock().await.take();
@ -443,77 +453,108 @@ impl Scheduler {
let metrics = self.metrics.clone();
let pool = self.pool.clone();
let local_pid = self.local_pid;
self.pool.spawn_ok(async move {
trace!(?cid, "open channel and be ready for Handshake");
let handshake = Handshake::new(cid, local_pid, metrics.clone(), send_handshake);
match handshake.setup(&protocol).await {
Ok((pid, sid)) => {
trace!(
?cid,
?pid,
"detected that my channel is ready!, activating it :)"
);
let mut participants = participants.write().await;
if !participants.contains_key(&pid) {
debug!(?cid, "new participant connected via a channel");
let (
bparticipant,
a2b_steam_open_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
) = BParticipant::new(pid, sid, metrics.clone());
let participant = Participant::new(
local_pid,
pid,
a2b_steam_open_s,
b2a_stream_opened_r,
participant_channels.a2s_disconnect_s,
let local_secret = self.local_secret;
// this is necessary for UDP to work at all and to remove code duplication
self.pool.spawn_ok(
async move {
trace!(?cid, "open channel and be ready for Handshake");
let handshake = Handshake::new(
cid,
local_pid,
local_secret,
metrics.clone(),
send_handshake,
);
match handshake.setup(&protocol).await {
Ok((pid, sid, secret)) => {
trace!(
?cid,
?pid,
"detected that my channel is ready!, activating it :)"
);
let mut participants = participants.write().await;
if !participants.contains_key(&pid) {
debug!(?cid, "new participant connected via a channel");
let (
bparticipant,
a2b_steam_open_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_shutdown_bparticipant_s,
) = BParticipant::new(pid, sid, metrics.clone());
metrics.participants_connected_total.inc();
participants.insert(pid, ParticipantInfo {
s2b_create_channel_s: s2b_create_channel_s.clone(),
s2b_shutdown_bparticipant_s: Some(s2b_shutdown_bparticipant_s),
});
pool.spawn_ok(
bparticipant
.run()
.instrument(tracing::info_span!("participant", ?pid)),
);
//create a new channel within BParticipant and wait for it to run
let (b2s_create_channel_done_s, b2s_create_channel_done_r) =
oneshot::channel();
s2b_create_channel_s
.send((cid, sid, protocol, b2s_create_channel_done_s))
.await
.unwrap();
b2s_create_channel_done_r.await.unwrap();
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with connect, so give them their PID
pid_oneshot.send(Ok(participant)).unwrap();
} else {
// no one is waiting on this Participant, return it to the Network
participant_channels
.s2a_connected_s
.send(participant)
let participant = Participant::new(
local_pid,
pid,
a2b_steam_open_s,
b2a_stream_opened_r,
participant_channels.a2s_disconnect_s,
);
metrics.participants_connected_total.inc();
participants.insert(pid, ParticipantInfo {
secret,
s2b_create_channel_s: s2b_create_channel_s.clone(),
s2b_shutdown_bparticipant_s: Some(s2b_shutdown_bparticipant_s),
});
pool.spawn_ok(
bparticipant
.run()
.instrument(tracing::info_span!("participant", ?pid)),
);
//create a new channel within BParticipant and wait for it to run
let (b2s_create_channel_done_s, b2s_create_channel_done_r) =
oneshot::channel();
s2b_create_channel_s
.send((cid, sid, protocol, b2s_create_channel_done_s))
.await
.unwrap();
b2s_create_channel_done_r.await.unwrap();
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with connect, so give them their PID
pid_oneshot.send(Ok(participant)).unwrap();
} else {
// no one is waiting on this Participant, return it to the Network
participant_channels
.s2a_connected_s
.send(participant)
.await
.unwrap();
}
} else {
let pi = &participants[&pid];
trace!("2nd+ channel of participant, going to compare security ids");
if pi.secret != secret {
warn!(
?pid,
?secret,
"Detected incompatible Secret!, this is probably an attack!"
);
error!("just dropping here, TODO handle this correctly!");
//TODO
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with connect, so give them their Error
pid_oneshot
.send(Err(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"invalid secret, denying connection",
)))
.unwrap();
}
return;
}
error!(
"ufff i cant answer the pid_oneshot. as i need to create the SAME \
participant. maybe switch to ARC"
);
}
} else {
error!(
"2ND channel of participants opens, but we cannot verify that this is \
not a attack to "
);
//ERROR DEADLOCK AS NO SENDER HERE!
//sender.send(frame_recv_sender).unwrap();
}
//From now on this CHANNEL can receive other frames! move
// directly to participant!
},
Err(()) => {},
//From now on this CHANNEL can receive other frames!
// move directly to participant!
},
Err(()) => {},
}
}
});
.instrument(tracing::trace_span!("")),
); /*WORKAROUND FOR SPAN NOT TO GET LOST*/
}
}

View File

@ -60,8 +60,9 @@ pub(crate) enum Frame {
magic_number: [u8; 7],
version: [u32; 3],
},
ParticipantId {
Init {
pid: Pid,
secret: u128,
},
Shutdown, /* Shuts down this channel gracefully, if all channels are shut down, Participant
* is deleted */
@ -89,10 +90,12 @@ pub(crate) enum Frame {
}
impl Frame {
pub const FRAMES_LEN: u8 = 8;
pub const fn int_to_string(i: u8) -> &'static str {
match i {
0 => "Handshake",
1 => "ParticipantId",
1 => "Init",
2 => "Shutdown",
3 => "OpenStream",
4 => "CloseStream",
@ -109,7 +112,7 @@ impl Frame {
magic_number: _,
version: _,
} => 0,
Frame::ParticipantId { pid: _ } => 1,
Frame::Init { pid: _, secret: _ } => 1,
Frame::Shutdown => 2,
Frame::OpenStream {
sid: _,
@ -140,10 +143,10 @@ impl Pid {
/// # Example
/// ```rust
/// use uvth::ThreadPoolBuilder;
/// use veloren_network::Network;
/// use veloren_network::{Network, Pid};
///
/// let pid = Pid::new();
/// let _network = Network::new(pid, ThreadPoolBuilder::new().build(), None);
/// let _network = Network::new(pid, &ThreadPoolBuilder::new().build(), None);
/// ```
pub fn new() -> Self {
Self {

View File

@ -103,7 +103,7 @@ fn stream_send_first_then_receive() {
s1_a.send(42).unwrap();
s1_a.send("3rdMessage").unwrap();
drop(s1_a);
std::thread::sleep(std::time::Duration::from_millis(2000));
std::thread::sleep(std::time::Duration::from_millis(500));
assert_eq!(block_on(s1_b.recv()), Ok(1u8));
assert_eq!(block_on(s1_b.recv()), Ok(42));
assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
@ -131,3 +131,29 @@ fn stream_simple_udp_3msg() {
s1_a.send("3rdMessage").unwrap();
assert_eq!(block_on(s1_b.recv()), Ok("3rdMessage".to_string()));
}
use uvth::ThreadPoolBuilder;
use veloren_network::{Address, Network, Pid};
#[test]
#[ignore]
fn tcp_and_udp_2_connections() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_, _) = helper::setup(true, 0);
let network = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
let remote = Network::new(Pid::new(), &ThreadPoolBuilder::new().build(), None);
block_on(async {
remote
.listen(Address::Tcp("0.0.0.0:2000".parse().unwrap()))
.await?;
remote
.listen(Address::Udp("0.0.0.0:2001".parse().unwrap()))
.await?;
let p1 = network
.connect(Address::Tcp("127.0.0.1:2000".parse().unwrap()))
.await?;
let p2 = network
.connect(Address::Udp("127.0.0.1:2001".parse().unwrap()))
.await?;
assert!(std::sync::Arc::ptr_eq(&p1, &p2));
Ok(())
})
}