2020-03-22 13:47:21 +00:00
|
|
|
use crate::{
|
2020-07-11 12:34:01 +00:00
|
|
|
api::{ParticipantError, Stream},
|
2021-01-22 16:09:20 +00:00
|
|
|
channel::{Protocols, RecvProtocols, SendProtocols},
|
2021-02-14 17:45:12 +00:00
|
|
|
metrics::NetworkMetrics,
|
2021-04-09 11:17:38 +00:00
|
|
|
util::DeferredTracer,
|
2020-03-22 13:47:21 +00:00
|
|
|
};
|
2021-02-14 17:45:12 +00:00
|
|
|
use bytes::Bytes;
|
2021-01-15 13:04:32 +00:00
|
|
|
use futures_util::{FutureExt, StreamExt};
|
2021-05-04 13:27:30 +00:00
|
|
|
use hashbrown::HashMap;
|
2021-01-22 16:09:20 +00:00
|
|
|
use network_protocol::{
|
2021-02-14 17:45:12 +00:00
|
|
|
Bandwidth, Cid, Pid, Prio, Promises, ProtocolEvent, RecvProtocol, SendProtocol, Sid,
|
2021-04-09 11:17:38 +00:00
|
|
|
_internal::SortedVec,
|
2021-01-22 16:09:20 +00:00
|
|
|
};
|
2020-03-22 13:47:21 +00:00
|
|
|
use std::{
|
2020-04-08 14:26:42 +00:00
|
|
|
sync::{
|
2021-01-22 16:09:20 +00:00
|
|
|
atomic::{AtomicBool, AtomicI32, Ordering},
|
2020-05-22 14:00:08 +00:00
|
|
|
Arc,
|
2020-04-08 14:26:42 +00:00
|
|
|
},
|
2020-05-27 15:58:57 +00:00
|
|
|
time::{Duration, Instant},
|
2020-03-22 13:47:21 +00:00
|
|
|
};
|
2021-01-15 13:04:32 +00:00
|
|
|
use tokio::{
|
|
|
|
select,
|
2021-03-25 17:28:50 +00:00
|
|
|
sync::{mpsc, oneshot, watch, Mutex, RwLock},
|
2021-01-22 16:09:20 +00:00
|
|
|
task::JoinHandle,
|
2021-01-15 13:04:32 +00:00
|
|
|
};
|
|
|
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
2020-03-22 13:47:21 +00:00
|
|
|
use tracing::*;
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
pub(crate) type A2bStreamOpen = (Prio, Promises, Bandwidth, oneshot::Sender<Stream>);
|
|
|
|
pub(crate) type S2bCreateChannel = (Cid, Sid, Protocols, oneshot::Sender<()>);
|
|
|
|
pub(crate) type S2bShutdownBparticipant = (Duration, oneshot::Sender<Result<(), ParticipantError>>);
|
2020-07-14 22:18:04 +00:00
|
|
|
pub(crate) type B2sPrioStatistic = (Pid, u64, u64);
|
|
|
|
|
2020-05-15 12:29:17 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
struct ChannelInfo {
|
|
|
|
cid: Cid,
|
2020-05-27 15:58:57 +00:00
|
|
|
cid_string: String, //optimisationmetrics
|
2020-05-15 12:29:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct StreamInfo {
|
|
|
|
prio: Prio,
|
|
|
|
promises: Promises,
|
2020-07-16 19:39:33 +00:00
|
|
|
send_closed: Arc<AtomicBool>,
|
2021-02-14 17:45:12 +00:00
|
|
|
b2a_msg_recv_s: Mutex<async_channel::Sender<Bytes>>,
|
2020-05-15 12:29:17 +00:00
|
|
|
}
|
|
|
|
|
2020-03-22 13:47:21 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
struct ControlChannels {
|
2021-01-22 16:09:20 +00:00
|
|
|
a2b_open_stream_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
|
2020-05-15 12:29:17 +00:00
|
|
|
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
|
2020-07-14 22:18:04 +00:00
|
|
|
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
|
2021-03-25 17:28:50 +00:00
|
|
|
b2a_bandwidth_stats_s: watch::Sender<f32>,
|
2020-07-14 22:18:04 +00:00
|
|
|
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>, /* own */
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
|
|
|
|
2021-02-18 18:33:20 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
struct OpenStreamInfo {
|
|
|
|
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
|
|
|
|
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
|
|
|
|
}
|
|
|
|
|
2020-03-22 13:47:21 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct BParticipant {
|
2021-02-10 10:37:42 +00:00
|
|
|
local_pid: Pid, //tracing
|
2020-03-22 13:47:21 +00:00
|
|
|
remote_pid: Pid,
|
2020-05-24 23:17:03 +00:00
|
|
|
remote_pid_string: String, //optimisation
|
2020-03-22 13:47:21 +00:00
|
|
|
offset_sid: Sid,
|
2020-08-23 19:43:17 +00:00
|
|
|
channels: Arc<RwLock<HashMap<Cid, Mutex<ChannelInfo>>>>,
|
2020-05-15 12:29:17 +00:00
|
|
|
streams: RwLock<HashMap<Sid, StreamInfo>>,
|
2020-03-22 13:47:21 +00:00
|
|
|
run_channels: Option<ControlChannels>,
|
2021-01-22 16:09:20 +00:00
|
|
|
shutdown_barrier: AtomicI32,
|
2020-04-24 10:56:04 +00:00
|
|
|
metrics: Arc<NetworkMetrics>,
|
2021-02-18 18:33:20 +00:00
|
|
|
open_stream_channels: Arc<Mutex<Option<OpenStreamInfo>>>,
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl BParticipant {
|
2021-01-22 16:09:20 +00:00
|
|
|
// We use integer instead of Barrier to not block mgr from freeing at the end
|
|
|
|
const BARR_CHANNEL: i32 = 1;
|
|
|
|
const BARR_RECV: i32 = 4;
|
|
|
|
const BARR_SEND: i32 = 2;
|
2021-01-15 13:04:32 +00:00
|
|
|
const TICK_TIME: Duration = Duration::from_millis(Self::TICK_TIME_MS);
|
2021-02-18 00:01:57 +00:00
|
|
|
const TICK_TIME_MS: u64 = 5;
|
2021-01-15 13:04:32 +00:00
|
|
|
|
2020-06-08 09:47:39 +00:00
|
|
|
#[allow(clippy::type_complexity)]
|
2020-03-22 13:47:21 +00:00
|
|
|
pub(crate) fn new(
|
2021-02-10 10:37:42 +00:00
|
|
|
local_pid: Pid,
|
2020-03-22 13:47:21 +00:00
|
|
|
remote_pid: Pid,
|
|
|
|
offset_sid: Sid,
|
2021-02-14 17:45:12 +00:00
|
|
|
metrics: Arc<NetworkMetrics>,
|
2020-03-22 13:47:21 +00:00
|
|
|
) -> (
|
|
|
|
Self,
|
2020-07-14 22:18:04 +00:00
|
|
|
mpsc::UnboundedSender<A2bStreamOpen>,
|
2020-03-22 13:47:21 +00:00
|
|
|
mpsc::UnboundedReceiver<Stream>,
|
2020-07-14 22:18:04 +00:00
|
|
|
mpsc::UnboundedSender<S2bCreateChannel>,
|
|
|
|
oneshot::Sender<S2bShutdownBparticipant>,
|
2021-03-25 17:28:50 +00:00
|
|
|
watch::Receiver<f32>,
|
2020-03-22 13:47:21 +00:00
|
|
|
) {
|
2021-01-22 16:09:20 +00:00
|
|
|
let (a2b_open_stream_s, a2b_open_stream_r) = mpsc::unbounded_channel::<A2bStreamOpen>();
|
2021-01-15 13:04:32 +00:00
|
|
|
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded_channel::<Stream>();
|
2020-05-15 12:29:17 +00:00
|
|
|
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
|
2021-01-15 13:04:32 +00:00
|
|
|
let (s2b_create_channel_s, s2b_create_channel_r) = mpsc::unbounded_channel();
|
2021-03-25 17:28:50 +00:00
|
|
|
let (b2a_bandwidth_stats_s, b2a_bandwidth_stats_r) = watch::channel::<f32>(0.0);
|
2020-03-22 13:47:21 +00:00
|
|
|
|
|
|
|
let run_channels = Some(ControlChannels {
|
2021-01-22 16:09:20 +00:00
|
|
|
a2b_open_stream_r,
|
2020-05-15 12:29:17 +00:00
|
|
|
b2a_stream_opened_s,
|
|
|
|
s2b_create_channel_r,
|
2021-03-25 17:28:50 +00:00
|
|
|
b2a_bandwidth_stats_s,
|
2020-05-15 12:29:17 +00:00
|
|
|
s2b_shutdown_bparticipant_r,
|
2020-03-22 13:47:21 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
(
|
|
|
|
Self {
|
2021-02-10 10:37:42 +00:00
|
|
|
local_pid,
|
2020-03-22 13:47:21 +00:00
|
|
|
remote_pid,
|
2020-05-24 23:17:03 +00:00
|
|
|
remote_pid_string: remote_pid.to_string(),
|
2020-03-22 13:47:21 +00:00
|
|
|
offset_sid,
|
2020-08-18 15:52:19 +00:00
|
|
|
channels: Arc::new(RwLock::new(HashMap::new())),
|
2020-03-22 13:47:21 +00:00
|
|
|
streams: RwLock::new(HashMap::new()),
|
2021-01-22 16:09:20 +00:00
|
|
|
shutdown_barrier: AtomicI32::new(
|
|
|
|
Self::BARR_CHANNEL + Self::BARR_SEND + Self::BARR_RECV,
|
|
|
|
),
|
2020-03-22 13:47:21 +00:00
|
|
|
run_channels,
|
2020-04-24 10:56:04 +00:00
|
|
|
metrics,
|
2021-02-18 18:33:20 +00:00
|
|
|
open_stream_channels: Arc::new(Mutex::new(None)),
|
2020-03-22 13:47:21 +00:00
|
|
|
},
|
2021-01-22 16:09:20 +00:00
|
|
|
a2b_open_stream_s,
|
2020-05-15 12:29:17 +00:00
|
|
|
b2a_stream_opened_r,
|
|
|
|
s2b_create_channel_s,
|
|
|
|
s2b_shutdown_bparticipant_s,
|
2021-03-25 17:28:50 +00:00
|
|
|
b2a_bandwidth_stats_r,
|
2020-03-22 13:47:21 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2020-07-14 22:18:04 +00:00
|
|
|
pub async fn run(mut self, b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>) {
|
2021-01-22 16:09:20 +00:00
|
|
|
let (b2b_add_send_protocol_s, b2b_add_send_protocol_r) =
|
|
|
|
mpsc::unbounded_channel::<(Cid, SendProtocols)>();
|
|
|
|
let (b2b_add_recv_protocol_s, b2b_add_recv_protocol_r) =
|
|
|
|
mpsc::unbounded_channel::<(Cid, RecvProtocols)>();
|
|
|
|
let (b2b_close_send_protocol_s, b2b_close_send_protocol_r) =
|
|
|
|
async_channel::unbounded::<Cid>();
|
|
|
|
let (b2b_force_close_recv_protocol_s, b2b_force_close_recv_protocol_r) =
|
|
|
|
async_channel::unbounded::<Cid>();
|
2021-03-25 15:46:40 +00:00
|
|
|
let (b2b_notify_send_of_recv_open_s, b2b_notify_send_of_recv_open_r) =
|
|
|
|
crossbeam_channel::unbounded::<(Cid, Sid, Prio, Promises, u64)>();
|
|
|
|
let (b2b_notify_send_of_recv_close_s, b2b_notify_send_of_recv_close_r) =
|
|
|
|
crossbeam_channel::unbounded::<(Cid, Sid)>();
|
2021-01-22 16:09:20 +00:00
|
|
|
|
|
|
|
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>();
|
2021-02-18 18:33:20 +00:00
|
|
|
let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>();
|
2020-04-08 14:26:42 +00:00
|
|
|
|
2021-02-18 18:33:20 +00:00
|
|
|
*self.open_stream_channels.lock().await = Some(OpenStreamInfo {
|
|
|
|
a2b_msg_s,
|
|
|
|
a2b_close_stream_s,
|
|
|
|
});
|
2020-03-22 13:47:21 +00:00
|
|
|
let run_channels = self.run_channels.take().unwrap();
|
2021-02-10 10:37:42 +00:00
|
|
|
trace!("start all managers");
|
2021-01-15 13:04:32 +00:00
|
|
|
tokio::join!(
|
2021-01-22 16:09:20 +00:00
|
|
|
self.send_mgr(
|
|
|
|
run_channels.a2b_open_stream_r,
|
|
|
|
a2b_close_stream_r,
|
|
|
|
a2b_msg_r,
|
|
|
|
b2b_add_send_protocol_r,
|
|
|
|
b2b_close_send_protocol_r,
|
2021-03-25 15:46:40 +00:00
|
|
|
b2b_notify_send_of_recv_open_r,
|
|
|
|
b2b_notify_send_of_recv_close_r,
|
2021-01-22 16:09:20 +00:00
|
|
|
b2s_prio_statistic_s,
|
2021-03-25 17:28:50 +00:00
|
|
|
run_channels.b2a_bandwidth_stats_s,
|
2021-02-10 10:37:42 +00:00
|
|
|
)
|
|
|
|
.instrument(tracing::info_span!("send")),
|
2021-01-22 16:09:20 +00:00
|
|
|
self.recv_mgr(
|
2020-05-15 12:29:17 +00:00
|
|
|
run_channels.b2a_stream_opened_s,
|
2021-01-22 16:09:20 +00:00
|
|
|
b2b_add_recv_protocol_r,
|
|
|
|
b2b_force_close_recv_protocol_r,
|
|
|
|
b2b_close_send_protocol_s.clone(),
|
2021-03-25 15:46:40 +00:00
|
|
|
b2b_notify_send_of_recv_open_s,
|
|
|
|
b2b_notify_send_of_recv_close_s,
|
2021-02-10 10:37:42 +00:00
|
|
|
)
|
|
|
|
.instrument(tracing::info_span!("recv")),
|
2021-01-22 16:09:20 +00:00
|
|
|
self.create_channel_mgr(
|
|
|
|
run_channels.s2b_create_channel_r,
|
|
|
|
b2b_add_send_protocol_s,
|
|
|
|
b2b_add_recv_protocol_s,
|
2020-04-08 14:26:42 +00:00
|
|
|
),
|
2020-07-11 12:34:01 +00:00
|
|
|
self.participant_shutdown_mgr(
|
|
|
|
run_channels.s2b_shutdown_bparticipant_r,
|
2021-01-22 16:09:20 +00:00
|
|
|
b2b_close_send_protocol_s.clone(),
|
|
|
|
b2b_force_close_recv_protocol_s,
|
2020-07-11 12:34:01 +00:00
|
|
|
),
|
2020-03-22 13:47:21 +00:00
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2021-03-25 15:27:13 +00:00
|
|
|
fn best_protocol(all: &SortedVec<Cid, SendProtocols>, promises: Promises) -> Option<Cid> {
|
|
|
|
// check for mpsc
|
2021-05-19 23:59:30 +00:00
|
|
|
all.data.iter().find(|(_, p)| matches!(p, SendProtocols::Mpsc(_))).map(|(c, _)| *c).or_else(
|
|
|
|
|| if network_protocol::TcpSendProtocol::<crate::channel::TcpDrain>::supported_promises()
|
|
|
|
.contains(promises)
|
|
|
|
{
|
|
|
|
// check for tcp
|
|
|
|
all.data.iter().find(|(_, p)| matches!(p, SendProtocols::Tcp(_))).map(|(c, _)| *c)
|
|
|
|
} else {
|
|
|
|
None
|
2021-03-25 15:27:13 +00:00
|
|
|
}
|
2021-05-19 23:59:30 +00:00
|
|
|
).or_else(
|
|
|
|
// check for quic, TODO: evaluate to order quic BEFORE tcp once its stable
|
|
|
|
|| if network_protocol::QuicSendProtocol::<crate::channel::QuicDrain>::supported_promises()
|
|
|
|
.contains(promises)
|
|
|
|
{
|
|
|
|
all.data.iter().find(|(_, p)| matches!(p, SendProtocols::Quic(_))).map(|(c, _)| *c)
|
|
|
|
} else {
|
|
|
|
None
|
2021-03-25 15:27:13 +00:00
|
|
|
}
|
2021-05-19 23:59:30 +00:00
|
|
|
).or_else(
|
|
|
|
|| {
|
|
|
|
warn!("couldn't satisfy promises");
|
|
|
|
all.data.first().map(|(c, _)| *c)
|
|
|
|
}
|
|
|
|
)
|
2021-03-25 15:27:13 +00:00
|
|
|
}
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
//TODO: local stream_cid: HashMap<Sid, Cid> to know the respective protocol
|
2021-02-14 17:45:12 +00:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2020-05-22 14:00:08 +00:00
|
|
|
async fn send_mgr(
|
|
|
|
&self,
|
2021-01-22 16:09:20 +00:00
|
|
|
mut a2b_open_stream_r: mpsc::UnboundedReceiver<A2bStreamOpen>,
|
|
|
|
mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
|
2021-02-14 17:45:12 +00:00
|
|
|
a2b_msg_r: crossbeam_channel::Receiver<(Sid, Bytes)>,
|
2021-01-22 16:09:20 +00:00
|
|
|
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, SendProtocols)>,
|
|
|
|
b2b_close_send_protocol_r: async_channel::Receiver<Cid>,
|
2021-03-25 15:46:40 +00:00
|
|
|
b2b_notify_send_of_recv_open_r: crossbeam_channel::Receiver<(
|
|
|
|
Cid,
|
|
|
|
Sid,
|
|
|
|
Prio,
|
|
|
|
Promises,
|
|
|
|
Bandwidth,
|
|
|
|
)>,
|
|
|
|
b2b_notify_send_of_recv_close_r: crossbeam_channel::Receiver<(Cid, Sid)>,
|
2021-01-22 16:09:20 +00:00
|
|
|
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
|
2021-03-25 17:28:50 +00:00
|
|
|
b2a_bandwidth_stats_s: watch::Sender<f32>,
|
2020-05-22 14:00:08 +00:00
|
|
|
) {
|
2021-03-25 15:27:13 +00:00
|
|
|
let mut sorted_send_protocols = SortedVec::<Cid, SendProtocols>::default();
|
|
|
|
let mut sorted_stream_protocols = SortedVec::<Sid, Cid>::default();
|
2021-01-19 08:48:33 +00:00
|
|
|
let mut interval = tokio::time::interval(Self::TICK_TIME);
|
2021-02-14 17:45:12 +00:00
|
|
|
let mut last_instant = Instant::now();
|
2021-01-22 16:09:20 +00:00
|
|
|
let mut stream_ids = self.offset_sid;
|
2021-03-25 17:28:50 +00:00
|
|
|
let mut part_bandwidth = 0.0f32;
|
2021-02-10 10:37:42 +00:00
|
|
|
trace!("workaround, actively wait for first protocol");
|
2021-03-25 15:27:13 +00:00
|
|
|
if let Some((c, p)) = b2b_add_protocol_r.recv().await {
|
|
|
|
sorted_send_protocols.insert(c, p)
|
|
|
|
}
|
2020-05-22 14:00:08 +00:00
|
|
|
loop {
|
2021-02-14 17:45:12 +00:00
|
|
|
let (open, close, _, addp, remp) = select!(
|
|
|
|
Some(n) = a2b_open_stream_r.recv().fuse() => (Some(n), None, None, None, None),
|
|
|
|
Some(n) = a2b_close_stream_r.recv().fuse() => (None, Some(n), None, None, None),
|
|
|
|
_ = interval.tick() => (None, None, Some(()), None, None),
|
|
|
|
Some(n) = b2b_add_protocol_r.recv().fuse() => (None, None, None, Some(n), None),
|
|
|
|
Ok(n) = b2b_close_send_protocol_r.recv().fuse() => (None, None, None, None, Some(n)),
|
2021-01-22 16:09:20 +00:00
|
|
|
);
|
|
|
|
|
2021-03-25 15:27:13 +00:00
|
|
|
if let Some((cid, p)) = addp {
|
2021-02-10 10:37:42 +00:00
|
|
|
debug!(?cid, "add protocol");
|
2021-03-25 15:27:13 +00:00
|
|
|
sorted_send_protocols.insert(cid, p);
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
|
2021-03-25 15:27:13 +00:00
|
|
|
//verify that we have at LEAST 1 channel before continuing
|
|
|
|
if sorted_send_protocols.data.is_empty() {
|
|
|
|
warn!("no channel");
|
|
|
|
tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
//let (cid, active) = sorted_send_protocols.data.iter_mut().next().unwrap();
|
|
|
|
//used for error handling
|
|
|
|
let mut cid = u64::MAX;
|
2021-01-22 16:09:20 +00:00
|
|
|
|
|
|
|
let active_err = async {
|
2021-02-14 17:45:12 +00:00
|
|
|
if let Some((prio, promises, guaranteed_bandwidth, return_s)) = open {
|
2021-01-22 16:09:20 +00:00
|
|
|
let sid = stream_ids;
|
|
|
|
stream_ids += Sid::from(1);
|
2021-03-25 15:27:13 +00:00
|
|
|
cid = Self::best_protocol(&sorted_send_protocols, promises).unwrap();
|
|
|
|
trace!(?sid, ?cid, "open stream");
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
let stream = self
|
2021-02-18 18:33:20 +00:00
|
|
|
.create_stream(sid, prio, promises, guaranteed_bandwidth)
|
2021-01-22 16:09:20 +00:00
|
|
|
.await;
|
|
|
|
|
|
|
|
let event = ProtocolEvent::OpenStream {
|
|
|
|
sid,
|
|
|
|
prio,
|
|
|
|
promises,
|
|
|
|
guaranteed_bandwidth,
|
|
|
|
};
|
|
|
|
|
2021-03-25 15:27:13 +00:00
|
|
|
sorted_stream_protocols.insert(sid, cid);
|
2021-01-22 16:09:20 +00:00
|
|
|
return_s.send(stream).unwrap();
|
2021-03-25 15:27:13 +00:00
|
|
|
sorted_send_protocols
|
|
|
|
.get_mut(&cid)
|
|
|
|
.unwrap()
|
|
|
|
.send(event)
|
|
|
|
.await?;
|
2020-10-14 12:29:39 +00:00
|
|
|
}
|
2020-05-22 14:00:08 +00:00
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
// process recv content first
|
2021-03-25 15:46:40 +00:00
|
|
|
for (cid, sid, prio, promises, guaranteed_bandwidth) in
|
|
|
|
b2b_notify_send_of_recv_open_r.try_iter()
|
|
|
|
{
|
|
|
|
match sorted_send_protocols.get_mut(&cid) {
|
|
|
|
Some(p) => {
|
|
|
|
sorted_stream_protocols.insert(sid, cid);
|
|
|
|
p.notify_from_recv(ProtocolEvent::OpenStream {
|
|
|
|
sid,
|
|
|
|
prio,
|
|
|
|
promises,
|
|
|
|
guaranteed_bandwidth,
|
|
|
|
});
|
2021-03-25 15:27:13 +00:00
|
|
|
},
|
2021-03-25 15:46:40 +00:00
|
|
|
None => warn!(?cid, "couldn't notify create protocol, doesn't exist"),
|
|
|
|
};
|
|
|
|
}
|
2021-02-14 17:45:12 +00:00
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
// get all messages and assign it to a channel
|
|
|
|
for (sid, buffer) in a2b_msg_r.try_iter() {
|
2021-03-25 15:27:13 +00:00
|
|
|
cid = *sorted_stream_protocols.get(&sid).unwrap();
|
|
|
|
let event = ProtocolEvent::Message { data: buffer, sid };
|
|
|
|
sorted_send_protocols
|
|
|
|
.get_mut(&cid)
|
|
|
|
.unwrap()
|
|
|
|
.send(event)
|
|
|
|
.await?;
|
2021-01-22 16:09:20 +00:00
|
|
|
}
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
// process recv content afterwards
|
2021-03-25 15:46:40 +00:00
|
|
|
for (cid, sid) in b2b_notify_send_of_recv_close_r.try_iter() {
|
|
|
|
match sorted_send_protocols.get_mut(&cid) {
|
|
|
|
Some(p) => {
|
|
|
|
let _ = sorted_stream_protocols.delete(&sid);
|
|
|
|
p.notify_from_recv(ProtocolEvent::CloseStream { sid });
|
|
|
|
},
|
|
|
|
None => warn!(?cid, "couldn't notify close protocol, doesn't exist"),
|
|
|
|
};
|
|
|
|
}
|
2021-02-14 17:45:12 +00:00
|
|
|
|
|
|
|
if let Some(sid) = close {
|
2021-02-10 10:37:42 +00:00
|
|
|
trace!(?stream_ids, "delete stream");
|
2021-01-22 16:09:20 +00:00
|
|
|
self.delete_stream(sid).await;
|
|
|
|
// Fire&Forget the protocol will take care to verify that this Frame is delayed
|
|
|
|
// till the last msg was received!
|
2021-03-25 15:46:40 +00:00
|
|
|
if let Some(c) = sorted_stream_protocols.delete(&sid) {
|
|
|
|
cid = c;
|
|
|
|
let event = ProtocolEvent::CloseStream { sid };
|
|
|
|
sorted_send_protocols
|
|
|
|
.get_mut(&c)
|
|
|
|
.unwrap()
|
|
|
|
.send(event)
|
|
|
|
.await?;
|
|
|
|
}
|
2020-08-23 19:43:17 +00:00
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
let send_time = Instant::now();
|
|
|
|
let diff = send_time.duration_since(last_instant);
|
|
|
|
last_instant = send_time;
|
2021-03-25 17:28:50 +00:00
|
|
|
let mut cnt = 0;
|
2021-05-04 13:27:30 +00:00
|
|
|
for (c, p) in sorted_send_protocols.data.iter_mut() {
|
|
|
|
cid = *c;
|
2021-03-25 17:28:50 +00:00
|
|
|
cnt += p.flush(1_000_000_000, diff).await?; //this actually blocks, so we cant set streams while it.
|
2021-03-25 15:27:13 +00:00
|
|
|
}
|
2021-03-25 17:28:50 +00:00
|
|
|
let flush_time = send_time.elapsed().as_secs_f32();
|
|
|
|
part_bandwidth = 0.99 * part_bandwidth + 0.01 * (cnt as f32 / flush_time);
|
|
|
|
self.metrics
|
|
|
|
.participant_bandwidth(&self.remote_pid_string, part_bandwidth);
|
|
|
|
let _ = b2a_bandwidth_stats_s.send(part_bandwidth);
|
2021-01-22 16:09:20 +00:00
|
|
|
let r: Result<(), network_protocol::ProtocolError> = Ok(());
|
|
|
|
r
|
2020-08-23 19:43:17 +00:00
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
.await;
|
|
|
|
if let Err(e) = active_err {
|
2021-02-10 10:37:42 +00:00
|
|
|
info!(?cid, ?e, "protocol failed, shutting down channel");
|
2021-01-22 16:09:20 +00:00
|
|
|
// remote recv will now fail, which will trigger remote send which will trigger
|
|
|
|
// recv
|
2021-02-18 18:33:20 +00:00
|
|
|
trace!("TODO: for now decide to FAIL this participant and not wait for a failover");
|
2021-03-25 15:27:13 +00:00
|
|
|
sorted_send_protocols.delete(&cid).unwrap();
|
2021-02-14 17:45:12 +00:00
|
|
|
self.metrics.channels_disconnected(&self.remote_pid_string);
|
2021-03-25 15:27:13 +00:00
|
|
|
if sorted_send_protocols.data.is_empty() {
|
2021-02-18 18:33:20 +00:00
|
|
|
break;
|
|
|
|
}
|
2021-02-14 17:45:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(cid) = remp {
|
|
|
|
debug!(?cid, "remove protocol");
|
2021-03-25 15:27:13 +00:00
|
|
|
match sorted_send_protocols.delete(&cid) {
|
2021-02-14 17:45:12 +00:00
|
|
|
Some(mut prot) => {
|
|
|
|
self.metrics.channels_disconnected(&self.remote_pid_string);
|
|
|
|
trace!("blocking flush");
|
|
|
|
let _ = prot.flush(u64::MAX, Duration::from_secs(1)).await;
|
|
|
|
trace!("shutdown prot");
|
|
|
|
let _ = prot.send(ProtocolEvent::Shutdown).await;
|
|
|
|
},
|
|
|
|
None => trace!("tried to remove protocol twice"),
|
|
|
|
};
|
2021-03-25 15:27:13 +00:00
|
|
|
if sorted_send_protocols.data.is_empty() {
|
2021-02-14 17:45:12 +00:00
|
|
|
break;
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
}
|
|
|
|
}
|
2021-02-18 18:33:20 +00:00
|
|
|
trace!("stop sending in api!");
|
|
|
|
self.open_stream_channels.lock().await.take();
|
2021-01-22 16:09:20 +00:00
|
|
|
trace!("Stop send_mgr");
|
|
|
|
self.shutdown_barrier
|
2021-04-07 21:17:09 +00:00
|
|
|
.fetch_sub(Self::BARR_SEND, Ordering::SeqCst);
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2021-01-22 16:09:20 +00:00
|
|
|
async fn recv_mgr(
|
2020-03-22 13:47:21 +00:00
|
|
|
&self,
|
2021-01-15 13:04:32 +00:00
|
|
|
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
|
2021-01-22 16:09:20 +00:00
|
|
|
mut b2b_add_protocol_r: mpsc::UnboundedReceiver<(Cid, RecvProtocols)>,
|
|
|
|
b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>,
|
|
|
|
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
|
2021-03-25 15:46:40 +00:00
|
|
|
b2b_notify_send_of_recv_open_r: crossbeam_channel::Sender<(
|
|
|
|
Cid,
|
|
|
|
Sid,
|
|
|
|
Prio,
|
|
|
|
Promises,
|
|
|
|
Bandwidth,
|
|
|
|
)>,
|
|
|
|
b2b_notify_send_of_recv_close_s: crossbeam_channel::Sender<(Cid, Sid)>,
|
2020-03-22 13:47:21 +00:00
|
|
|
) {
|
2021-01-22 16:09:20 +00:00
|
|
|
let mut recv_protocols: HashMap<Cid, JoinHandle<()>> = HashMap::new();
|
|
|
|
// we should be able to directly await futures imo
|
|
|
|
let (hacky_recv_s, mut hacky_recv_r) = mpsc::unbounded_channel();
|
|
|
|
|
|
|
|
let retrigger = |cid: Cid, mut p: RecvProtocols, map: &mut HashMap<_, _>| {
|
|
|
|
let hacky_recv_s = hacky_recv_s.clone();
|
|
|
|
let handle = tokio::spawn(async move {
|
|
|
|
let cid = cid;
|
|
|
|
let r = p.recv().await;
|
|
|
|
let _ = hacky_recv_s.send((cid, r, p)); // ignoring failed
|
|
|
|
});
|
|
|
|
map.insert(cid, handle);
|
|
|
|
};
|
|
|
|
|
|
|
|
let remove_c = |recv_protocols: &mut HashMap<Cid, JoinHandle<()>>, cid: &Cid| {
|
|
|
|
match recv_protocols.remove(&cid) {
|
2021-02-10 10:37:42 +00:00
|
|
|
Some(h) => {
|
|
|
|
h.abort();
|
|
|
|
debug!(?cid, "remove protocol");
|
|
|
|
},
|
2021-01-22 16:09:20 +00:00
|
|
|
None => trace!("tried to remove protocol twice"),
|
2020-08-21 12:01:49 +00:00
|
|
|
};
|
2021-01-22 16:09:20 +00:00
|
|
|
recv_protocols.is_empty()
|
|
|
|
};
|
|
|
|
|
2021-03-22 00:32:47 +00:00
|
|
|
let mut defered_orphan = DeferredTracer::new(tracing::Level::WARN);
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
loop {
|
|
|
|
let (event, addp, remp) = select!(
|
2021-02-14 17:45:12 +00:00
|
|
|
Some(n) = hacky_recv_r.recv().fuse() => (Some(n), None, None),
|
|
|
|
Some(n) = b2b_add_protocol_r.recv().fuse() => (None, Some(n), None),
|
|
|
|
Ok(n) = b2b_force_close_recv_protocol_r.recv().fuse() => (None, None, Some(n)),
|
|
|
|
else => {
|
|
|
|
error!("recv_mgr -> something is seriously wrong!, end recv_mgr");
|
|
|
|
break;
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
);
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
if let Some((cid, p)) = addp {
|
2021-02-10 10:37:42 +00:00
|
|
|
debug!(?cid, "add protocol");
|
2021-01-22 16:09:20 +00:00
|
|
|
retrigger(cid, p, &mut recv_protocols);
|
2021-02-14 17:45:12 +00:00
|
|
|
};
|
|
|
|
if let Some(cid) = remp {
|
2021-01-22 16:09:20 +00:00
|
|
|
// no need to stop the send_mgr here as it has been canceled before
|
|
|
|
if remove_c(&mut recv_protocols, &cid) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
if let Some((cid, r, p)) = event {
|
2021-01-22 16:09:20 +00:00
|
|
|
match r {
|
|
|
|
Ok(ProtocolEvent::OpenStream {
|
2020-10-14 12:29:39 +00:00
|
|
|
sid,
|
2021-01-22 16:09:20 +00:00
|
|
|
prio,
|
|
|
|
promises,
|
|
|
|
guaranteed_bandwidth,
|
|
|
|
}) => {
|
|
|
|
trace!(?sid, "open stream");
|
2021-03-25 15:46:40 +00:00
|
|
|
let _ = b2b_notify_send_of_recv_open_r.send((
|
|
|
|
cid,
|
|
|
|
sid,
|
|
|
|
prio,
|
|
|
|
promises,
|
|
|
|
guaranteed_bandwidth,
|
|
|
|
));
|
2021-02-14 17:45:12 +00:00
|
|
|
// waiting for receiving is not necessary, because the send_mgr will first
|
|
|
|
// process this before process messages!
|
2021-01-22 16:09:20 +00:00
|
|
|
let stream = self
|
2021-02-18 18:33:20 +00:00
|
|
|
.create_stream(sid, prio, promises, guaranteed_bandwidth)
|
2021-01-22 16:09:20 +00:00
|
|
|
.await;
|
|
|
|
b2a_stream_opened_s.send(stream).unwrap();
|
|
|
|
retrigger(cid, p, &mut recv_protocols);
|
|
|
|
},
|
|
|
|
Ok(ProtocolEvent::CloseStream { sid }) => {
|
|
|
|
trace!(?sid, "close stream");
|
2021-03-25 15:46:40 +00:00
|
|
|
let _ = b2b_notify_send_of_recv_close_s.send((cid, sid));
|
2021-01-22 16:09:20 +00:00
|
|
|
self.delete_stream(sid).await;
|
|
|
|
retrigger(cid, p, &mut recv_protocols);
|
|
|
|
},
|
2021-02-18 00:01:57 +00:00
|
|
|
Ok(ProtocolEvent::Message { data, sid }) => {
|
2021-01-22 16:09:20 +00:00
|
|
|
let lock = self.streams.read().await;
|
|
|
|
match lock.get(&sid) {
|
|
|
|
Some(stream) => {
|
2021-02-14 17:45:12 +00:00
|
|
|
let _ = stream.b2a_msg_recv_s.lock().await.send(data).await;
|
2021-01-22 16:09:20 +00:00
|
|
|
},
|
2021-03-22 00:32:47 +00:00
|
|
|
None => defered_orphan.log(sid),
|
2021-01-22 16:09:20 +00:00
|
|
|
};
|
|
|
|
retrigger(cid, p, &mut recv_protocols);
|
|
|
|
},
|
|
|
|
Ok(ProtocolEvent::Shutdown) => {
|
|
|
|
info!(?cid, "shutdown protocol");
|
|
|
|
if let Err(e) = b2b_close_send_protocol_s.send(cid).await {
|
|
|
|
debug!(?e, ?cid, "send_mgr was already closed simultaneously");
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
if remove_c(&mut recv_protocols, &cid) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => {
|
2021-02-10 10:37:42 +00:00
|
|
|
info!(?e, ?cid, "protocol failed, shutting down channel");
|
2021-01-22 16:09:20 +00:00
|
|
|
if let Err(e) = b2b_close_send_protocol_s.send(cid).await {
|
|
|
|
debug!(?e, ?cid, "send_mgr was already closed simultaneously");
|
|
|
|
}
|
|
|
|
if remove_c(&mut recv_protocols, &cid) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
2021-03-22 00:32:47 +00:00
|
|
|
|
|
|
|
if let Some(table) = defered_orphan.print() {
|
|
|
|
for (sid, cnt) in table.iter() {
|
|
|
|
warn!(?sid, ?cnt, "recv messages with orphan stream");
|
|
|
|
}
|
|
|
|
}
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
2021-02-14 17:45:12 +00:00
|
|
|
trace!("receiving no longer possible, closing all streams");
|
|
|
|
for (_, si) in self.streams.write().await.drain() {
|
2021-04-07 21:17:09 +00:00
|
|
|
si.send_closed.store(true, Ordering::SeqCst);
|
2021-02-14 17:45:12 +00:00
|
|
|
self.metrics.streams_closed(&self.remote_pid_string);
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
trace!("Stop recv_mgr");
|
|
|
|
self.shutdown_barrier
|
2021-04-07 21:17:09 +00:00
|
|
|
.fetch_sub(Self::BARR_RECV, Ordering::SeqCst);
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
|
|
|
|
2020-05-15 12:29:17 +00:00
|
|
|
async fn create_channel_mgr(
|
2020-05-04 13:27:58 +00:00
|
|
|
&self,
|
2020-07-14 22:18:04 +00:00
|
|
|
s2b_create_channel_r: mpsc::UnboundedReceiver<S2bCreateChannel>,
|
2021-01-22 16:09:20 +00:00
|
|
|
b2b_add_send_protocol_s: mpsc::UnboundedSender<(Cid, SendProtocols)>,
|
|
|
|
b2b_add_recv_protocol_s: mpsc::UnboundedSender<(Cid, RecvProtocols)>,
|
2020-05-04 13:27:58 +00:00
|
|
|
) {
|
2021-01-15 13:04:32 +00:00
|
|
|
let s2b_create_channel_r = UnboundedReceiverStream::new(s2b_create_channel_r);
|
2020-05-15 12:29:17 +00:00
|
|
|
s2b_create_channel_r
|
2021-01-22 16:09:20 +00:00
|
|
|
.for_each_concurrent(None, |(cid, _, protocol, b2s_create_channel_done_s)| {
|
|
|
|
// This channel is now configured, and we are running it in scope of the
|
|
|
|
// participant.
|
|
|
|
let channels = Arc::clone(&self.channels);
|
|
|
|
let b2b_add_send_protocol_s = b2b_add_send_protocol_s.clone();
|
|
|
|
let b2b_add_recv_protocol_s = b2b_add_recv_protocol_s.clone();
|
|
|
|
async move {
|
|
|
|
let mut lock = channels.write().await;
|
|
|
|
let mut channel_no = lock.len();
|
|
|
|
lock.insert(
|
|
|
|
cid,
|
|
|
|
Mutex::new(ChannelInfo {
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and I couldn't reproduce it locally at first :)
- When in a Handshake a seperate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non handshake data any more and push in into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These dropped Frames are of course a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now ever Protocol would have it's own, and with that every network::Channel had it's own it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simply method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixed a bug in api_send_send_main: I need to store the stream_p, otherwise it's immediately closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
cid,
|
2021-01-22 16:09:20 +00:00
|
|
|
cid_string: cid.to_string(),
|
|
|
|
}),
|
|
|
|
);
|
|
|
|
drop(lock);
|
|
|
|
let (send, recv) = protocol.split();
|
|
|
|
b2b_add_send_protocol_s.send((cid, send)).unwrap();
|
|
|
|
b2b_add_recv_protocol_s.send((cid, recv)).unwrap();
|
|
|
|
b2s_create_channel_done_s.send(()).unwrap();
|
2021-02-14 17:45:12 +00:00
|
|
|
if channel_no > 5 {
|
|
|
|
debug!(?channel_no, "metrics will overwrite channel #5");
|
|
|
|
channel_no = 5;
|
Fixing the DEADLOCK in handshake -> channel creation
- this bug was initially called imbris bug, as it happened on his runners and i couldn't reproduce it locally at first :)
- When in a Handshake a separate mpsc::Channel was created for (Cid, Frame) transport
however the protocol could already catch non-handshake data and push it into this
mpsc::Channel.
Then this channel got dropped and a fresh one was created for the network::Channel.
These dropped Frames are ofc a BUG!
I tried multiple things to solve this:
- dont create a new mpsc::Channel, but instead bind it to the Protocol itself and always use 1.
This would work theoretically, but in bParticipant side we are using 1 mpsc::Channel<(Cid, Frame)>
to handle ALL the network::channel.
If now every Protocol would have its own, and with that every network::Channel had its own, it would no longer work out
Bad Idea...
- using the first method but creating the mpsc::Channel inside the scheduler instead protocol neither works, as the
scheduler doesnt know the remote_pid yet
- i dont want a hack to say the protocol only listen to 2 messages and then stop no matter what
So i switched over to the simple method now:
- Do everything like before with 2 mpsc::Channels
- after the handshake. close the receiver and listen for all remaining (cid, frame) combinations
- when starting the channel, reapply them to the new sender/listener combination
- added tracing
- switched Protocol RwLock to Mutex, as it's only ever 1
- Additionally changed the layout and introduces the c2w_frame_s and w2s_cid_frame_s name schema
- Fixed a bug in scheduler which WOULD cause a DEADLOCK if handshake would fail
- fixed a bug in api_send_send_main, i need to store the stream_p otherwise it's immediately closed and a stream_a.send() isn't guaranteed
- add extra test to verify that a send message is received even if the Stream is already closed
- changed OutGoing to Outgoing
- fixed a bug that `metrics.tick()` was never called
- removed 2 unused nightly features and added `deny_code`
2020-06-03 07:13:00 +00:00
|
|
|
}
|
2021-02-14 17:45:12 +00:00
|
|
|
self.metrics
|
|
|
|
.channels_connected(&self.remote_pid_string, channel_no, cid);
|
2021-01-22 16:09:20 +00:00
|
|
|
}
|
|
|
|
})
|
2020-05-04 13:27:58 +00:00
|
|
|
.await;
|
2020-07-05 22:13:53 +00:00
|
|
|
trace!("Stop create_channel_mgr");
|
2021-01-22 16:09:20 +00:00
|
|
|
self.shutdown_barrier
|
2021-04-07 21:17:09 +00:00
|
|
|
.fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst);
|
2020-05-04 13:27:58 +00:00
|
|
|
}
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
/// sink shutdown:
|
|
|
|
/// Situation AS, AR, BS, BR. A wants to close.
|
|
|
|
/// AS shutdown.
|
|
|
|
/// BR notices shutdown and tries to stops BS. (success)
|
|
|
|
/// BS shutdown
|
|
|
|
/// AR notices shutdown and tries to stop AS. (fails)
|
|
|
|
/// For the case where BS didn't get shutdowned, e.g. by a handing situation
|
|
|
|
/// on the remote, we have a timeout to also force close AR.
|
|
|
|
///
|
|
|
|
/// This fn will:
|
|
|
|
/// - 1. stop api to interact with bparticipant by closing sendmsg and
|
|
|
|
/// openstream
|
|
|
|
/// - 2. stop the send_mgr (it will take care of clearing the
|
|
|
|
/// queue and finish with a Shutdown)
|
|
|
|
/// - (3). force stop recv after 60
|
|
|
|
/// seconds
|
|
|
|
/// - (4). this fn finishes last and afterwards BParticipant
|
|
|
|
/// drops
|
|
|
|
///
|
|
|
|
/// before calling this fn, make sure `s2b_create_channel` is closed!
|
2020-07-11 12:34:01 +00:00
|
|
|
/// If BParticipant kills itself managers stay active till this function is
|
|
|
|
/// called by api to get the result status
|
2020-05-15 12:29:17 +00:00
|
|
|
async fn participant_shutdown_mgr(
|
2020-04-08 14:26:42 +00:00
|
|
|
&self,
|
2020-07-14 22:18:04 +00:00
|
|
|
s2b_shutdown_bparticipant_r: oneshot::Receiver<S2bShutdownBparticipant>,
|
2021-01-22 16:09:20 +00:00
|
|
|
b2b_close_send_protocol_s: async_channel::Sender<Cid>,
|
|
|
|
b2b_force_close_recv_protocol_s: async_channel::Sender<Cid>,
|
2020-04-08 14:26:42 +00:00
|
|
|
) {
|
2021-01-22 16:09:20 +00:00
|
|
|
let wait_for_manager = || async {
|
|
|
|
let mut sleep = 0.01f64;
|
|
|
|
loop {
|
2021-04-07 21:17:09 +00:00
|
|
|
let bytes = self.shutdown_barrier.load(Ordering::SeqCst);
|
2021-01-22 16:09:20 +00:00
|
|
|
if bytes == 0 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
sleep *= 1.4;
|
|
|
|
tokio::time::sleep(Duration::from_secs_f64(sleep)).await;
|
|
|
|
if sleep > 0.2 {
|
|
|
|
trace!(?bytes, "wait for mgr to close");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2021-02-14 17:45:12 +00:00
|
|
|
|
Added non-admin moderators and timed bans.
The security model has been updated to reflect this change (for example,
moderators cannot revert a ban by an administrator). Ban history is
also now recorded in the ban file, and much more information about the
ban is stored (whitelists and administrators also have extra
information).
To support the new information without losing important information,
this commit also introduces a new migration path for editable settings
(both from legacy to the new format, and between versions). Examples
of how to do this correctly, and migrate to new versions of a settings
file, are in the settings/ subdirectory.
As part of this effort, editable settings have been revamped to
guarantee atomic saves (due to the increased amount of information in
each file), some latent bugs in networking were fixed, and server-cli
has been updated to go through StructOpt for both calls through TUI
and argv, greatly simplifying parsing logic.
2021-05-08 18:22:21 +00:00
|
|
|
let awaited = s2b_shutdown_bparticipant_r.await.ok();
|
2021-02-10 10:37:42 +00:00
|
|
|
debug!("participant_shutdown_mgr triggered. Closing all streams for send");
|
2020-08-21 12:01:49 +00:00
|
|
|
{
|
2021-01-22 16:09:20 +00:00
|
|
|
let lock = self.streams.read().await;
|
|
|
|
for si in lock.values() {
|
2021-04-07 21:17:09 +00:00
|
|
|
si.send_closed.store(true, Ordering::SeqCst);
|
2020-08-21 12:01:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
let lock = self.channels.read().await;
|
|
|
|
assert!(
|
|
|
|
!lock.is_empty(),
|
|
|
|
"no channel existed remote_pid={}",
|
|
|
|
self.remote_pid
|
|
|
|
);
|
|
|
|
for cid in lock.keys() {
|
|
|
|
if let Err(e) = b2b_close_send_protocol_s.send(*cid).await {
|
2020-08-18 15:52:19 +00:00
|
|
|
debug!(
|
|
|
|
?e,
|
|
|
|
?cid,
|
2021-01-22 16:09:20 +00:00
|
|
|
"closing send_mgr may fail if we got a recv error simultaneously"
|
2020-08-18 15:52:19 +00:00
|
|
|
);
|
2021-01-22 16:09:20 +00:00
|
|
|
}
|
2020-07-11 12:34:01 +00:00
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
drop(lock);
|
2020-07-11 12:34:01 +00:00
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
trace!("wait for other managers");
|
Added non-admin moderators and timed bans.
The security model has been updated to reflect this change (for example,
moderators cannot revert a ban by an administrator). Ban history is
also now recorded in the ban file, and much more information about the
ban is stored (whitelists and administrators also have extra
information).
To support the new information without losing important information,
this commit also introduces a new migration path for editable settings
(both from legacy to the new format, and between versions). Examples
of how to do this correctly, and migrate to new versions of a settings
file, are in the settings/ subdirectory.
As part of this effort, editable settings have been revamped to
guarantee atomic saves (due to the increased amount of information in
each file), some latent bugs in networking were fixed, and server-cli
has been updated to go through StructOpt for both calls through TUI
and argv, greatly simplifying parsing logic.
2021-05-08 18:22:21 +00:00
|
|
|
let timeout = tokio::time::sleep(
|
|
|
|
awaited
|
|
|
|
.as_ref()
|
|
|
|
.map(|(timeout_time, _)| *timeout_time)
|
|
|
|
.unwrap_or_default(),
|
|
|
|
);
|
2021-01-22 16:09:20 +00:00
|
|
|
let timeout = tokio::select! {
|
|
|
|
_ = wait_for_manager() => false,
|
|
|
|
_ = timeout => true,
|
|
|
|
};
|
|
|
|
if timeout {
|
|
|
|
warn!("timeout triggered: for killing recv");
|
|
|
|
let lock = self.channels.read().await;
|
|
|
|
for cid in lock.keys() {
|
|
|
|
if let Err(e) = b2b_force_close_recv_protocol_s.send(*cid).await {
|
|
|
|
debug!(
|
|
|
|
?e,
|
|
|
|
?cid,
|
|
|
|
"closing recv_mgr may fail if we got a recv error simultaneously"
|
|
|
|
);
|
|
|
|
}
|
2020-07-11 12:34:01 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
trace!("wait again");
|
|
|
|
wait_for_manager().await;
|
2020-07-16 19:39:33 +00:00
|
|
|
|
Added non-admin moderators and timed bans.
The security model has been updated to reflect this change (for example,
moderators cannot revert a ban by an administrator). Ban history is
also now recorded in the ban file, and much more information about the
ban is stored (whitelists and administrators also have extra
information).
To support the new information without losing important information,
this commit also introduces a new migration path for editable settings
(both from legacy to the new format, and between versions). Examples
of how to do this correctly, and migrate to new versions of a settings
file, are in the settings/ subdirectory.
As part of this effort, editable settings have been revamped to
guarantee atomic saves (due to the increased amount of information in
each file), some latent bugs in networking were fixed, and server-cli
has been updated to go through StructOpt for both calls through TUI
and argv, greatly simplifying parsing logic.
2021-05-08 18:22:21 +00:00
|
|
|
if let Some((_, sender)) = awaited {
|
|
|
|
// Don't care whether this send succeeded since if the other end is dropped
|
|
|
|
// there's nothing to synchronize on.
|
|
|
|
let _ = sender.send(Ok(()));
|
2021-04-14 14:17:06 +00:00
|
|
|
}
|
2020-04-08 14:26:42 +00:00
|
|
|
|
2020-07-14 23:34:41 +00:00
|
|
|
#[cfg(feature = "metrics")]
|
2021-01-22 16:09:20 +00:00
|
|
|
self.metrics.participants_disconnected_total.inc();
|
2021-03-25 17:28:50 +00:00
|
|
|
self.metrics.cleanup_participant(&self.remote_pid_string);
|
2021-01-22 16:09:20 +00:00
|
|
|
trace!("Stop participant_shutdown_mgr");
|
2020-10-14 12:29:39 +00:00
|
|
|
}
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
/// Stopping API and participant usage
|
|
|
|
/// Protocol will take care of the order of the frame
|
2021-02-14 17:45:12 +00:00
|
|
|
async fn delete_stream(&self, sid: Sid) {
|
2021-01-22 16:09:20 +00:00
|
|
|
let stream = { self.streams.write().await.remove(&sid) };
|
|
|
|
match stream {
|
|
|
|
Some(si) => {
|
2021-04-07 21:17:09 +00:00
|
|
|
si.send_closed.store(true, Ordering::SeqCst);
|
2021-01-22 16:09:20 +00:00
|
|
|
si.b2a_msg_recv_s.lock().await.close();
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
trace!("Couldn't find the stream, might be simultaneous close from local/remote")
|
|
|
|
},
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
2021-02-14 17:45:12 +00:00
|
|
|
self.metrics.streams_closed(&self.remote_pid_string);
|
2020-03-22 13:47:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn create_stream(
|
|
|
|
&self,
|
|
|
|
sid: Sid,
|
|
|
|
prio: Prio,
|
|
|
|
promises: Promises,
|
2021-01-22 16:09:20 +00:00
|
|
|
guaranteed_bandwidth: Bandwidth,
|
2020-03-22 13:47:21 +00:00
|
|
|
) -> Stream {
|
2021-02-14 17:45:12 +00:00
|
|
|
let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::<Bytes>();
|
2020-07-16 19:39:33 +00:00
|
|
|
let send_closed = Arc::new(AtomicBool::new(false));
|
2020-05-15 12:29:17 +00:00
|
|
|
self.streams.write().await.insert(sid, StreamInfo {
|
|
|
|
prio,
|
|
|
|
promises,
|
2020-09-27 16:20:40 +00:00
|
|
|
send_closed: Arc::clone(&send_closed),
|
2020-08-23 19:43:17 +00:00
|
|
|
b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s),
|
2020-05-15 12:29:17 +00:00
|
|
|
});
|
2021-02-14 17:45:12 +00:00
|
|
|
self.metrics.streams_opened(&self.remote_pid_string);
|
2021-02-18 18:33:20 +00:00
|
|
|
|
|
|
|
let (a2b_msg_s, a2b_close_stream_s) = {
|
|
|
|
let lock = self.open_stream_channels.lock().await;
|
|
|
|
match &*lock {
|
|
|
|
Some(osi) => (osi.a2b_msg_s.clone(), osi.a2b_close_stream_s.clone()),
|
|
|
|
None => {
|
|
|
|
// This Stream will not be able to send. feed it some "Dummy" Channels.
|
|
|
|
debug!(
|
|
|
|
"It seems that a stream was requested to open, while the send_mgr is \
|
|
|
|
already closed"
|
|
|
|
);
|
|
|
|
let (a2b_msg_s, _) = crossbeam_channel::unbounded();
|
|
|
|
let (a2b_close_stream_s, _) = mpsc::unbounded_channel();
|
|
|
|
(a2b_msg_s, a2b_close_stream_s)
|
|
|
|
},
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2020-03-22 13:47:21 +00:00
|
|
|
Stream::new(
|
2021-02-10 10:37:42 +00:00
|
|
|
self.local_pid,
|
2020-03-22 13:47:21 +00:00
|
|
|
self.remote_pid,
|
|
|
|
sid,
|
|
|
|
prio,
|
|
|
|
promises,
|
2021-01-22 16:09:20 +00:00
|
|
|
guaranteed_bandwidth,
|
2020-07-16 19:39:33 +00:00
|
|
|
send_closed,
|
2021-02-18 18:33:20 +00:00
|
|
|
a2b_msg_s,
|
2020-05-15 12:29:17 +00:00
|
|
|
b2a_msg_recv_r,
|
2021-02-18 18:33:20 +00:00
|
|
|
a2b_close_stream_s,
|
2020-03-22 13:47:21 +00:00
|
|
|
)
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
}
|
2020-07-05 18:14:47 +00:00
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
#[cfg(test)]
mod tests {
    //! Tests driving a `BParticipant` through channel creation, stream
    //! opening (from both sides) and shutdown, using in-memory mpsc
    //! protocols instead of real network connections.
    use super::*;
    use network_protocol::{ProtocolMetricCache, ProtocolMetrics};
    use tokio::{
        runtime::Runtime,
        sync::{mpsc, oneshot},
        task::JoinHandle,
    };

    /// Builds a `BParticipant` on a fresh tokio runtime and spawns its `run`
    /// future on that runtime.
    ///
    /// Returns the runtime, the handles handed out by `BParticipant::new`
    /// (open-stream sender, opened-stream receiver, create-channel sender,
    /// shutdown sender, bandwidth-stats receiver), the receiver of the prio
    /// statistics channel passed to `run`, and the join handle of the spawned
    /// `run` task.
    #[allow(clippy::type_complexity)]
    fn mock_bparticipant() -> (
        Arc<Runtime>,
        mpsc::UnboundedSender<A2bStreamOpen>,
        mpsc::UnboundedReceiver<Stream>,
        mpsc::UnboundedSender<S2bCreateChannel>,
        oneshot::Sender<S2bShutdownBparticipant>,
        mpsc::UnboundedReceiver<B2sPrioStatistic>,
        watch::Receiver<f32>,
        JoinHandle<()>,
    ) {
        let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
        let runtime_clone = Arc::clone(&runtime);

        let (b2s_prio_statistic_s, b2s_prio_statistic_r) =
            mpsc::unbounded_channel::<B2sPrioStatistic>();

        // The participant is constructed inside the runtime context.
        let (
            bparticipant,
            a2b_open_stream_s,
            b2a_stream_opened_r,
            s2b_create_channel_s,
            s2b_shutdown_bparticipant_s,
            b2a_bandwidth_stats_r,
        ) = runtime_clone.block_on(async move {
            let local_pid = Pid::fake(0);
            let remote_pid = Pid::fake(1);
            // The `create_stream` test expects a locally opened stream to
            // carry exactly this sid.
            let sid = Sid::new(1000);
            let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap());

            BParticipant::new(local_pid, remote_pid, sid, Arc::clone(&metrics))
        });

        let handle = runtime_clone.spawn(bparticipant.run(b2s_prio_statistic_s));
        (
            runtime_clone,
            a2b_open_stream_s,
            b2a_stream_opened_r,
            s2b_create_channel_s,
            s2b_shutdown_bparticipant_s,
            b2s_prio_statistic_r,
            b2a_bandwidth_stats_r,
            handle,
        )
    }

    /// Creates a pair of cross-wired mpsc protocols for channel `cid`.
    ///
    /// One end (`s1`/`r2`) is handed to the participant through
    /// `create_channel`; once the participant confirms the channel creation,
    /// the opposite end (`s2`/`r1`) is returned to act as the "remote".
    /// `_runtime` is unused.
    async fn mock_mpsc(
        cid: Cid,
        _runtime: &Arc<Runtime>,
        create_channel: &mut mpsc::UnboundedSender<S2bCreateChannel>,
    ) -> Protocols {
        let (s1, r1) = mpsc::channel(100);
        let (s2, r2) = mpsc::channel(100);
        let met = Arc::new(ProtocolMetrics::new().unwrap());
        let metrics = ProtocolMetricCache::new(&cid.to_string(), Arc::clone(&met));
        let p1 = Protocols::new_mpsc(s1, r2, metrics);
        let (complete_s, complete_r) = oneshot::channel();
        // NOTE(review): the meaning of `Sid::new(0)` here is not visible in
        // this module - presumably the remote's sid offset; confirm.
        create_channel
            .send((cid, Sid::new(0), p1, complete_s))
            .unwrap();
        // Wait until the participant reports the channel as created.
        complete_r.await.unwrap();
        let metrics = ProtocolMetricCache::new(&cid.to_string(), met);
        Protocols::new_mpsc(s2, r1, metrics)
    }

    /// Shutdown with a remote that never closes: `_remote` is kept alive for
    /// the whole test, so the shutdown is expected to complete only after the
    /// 1 second timeout passed with the shutdown request.
    #[test]
    fn close_bparticipant_by_timeout_during_close() {
        let (
            runtime,
            a2b_open_stream_s,
            b2a_stream_opened_r,
            mut s2b_create_channel_s,
            s2b_shutdown_bparticipant_s,
            b2s_prio_statistic_r,
            _b2a_bandwidth_stats_r,
            handle,
        ) = mock_bparticipant();

        let _remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s));
        std::thread::sleep(Duration::from_millis(50));

        let (s, r) = oneshot::channel();
        let before = Instant::now();
        runtime.block_on(async {
            // The create-channel sender is closed before requesting shutdown.
            drop(s2b_create_channel_s);
            s2b_shutdown_bparticipant_s
                .send((Duration::from_secs(1), s))
                .unwrap();
            r.await.unwrap().unwrap();
        });
        assert!(
            before.elapsed() > Duration::from_millis(900),
            "timeout wasn't triggered"
        );

        // The participant's `run` task must have finished without panicking.
        runtime.block_on(handle).unwrap();

        drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r));
        drop(runtime);
    }

    /// Shutdown with a cooperating remote: the remote end is dropped right
    /// after the shutdown request, so the shutdown is expected to complete
    /// before the 2 second timeout.
    #[test]
    fn close_bparticipant_cleanly() {
        let (
            runtime,
            a2b_open_stream_s,
            b2a_stream_opened_r,
            mut s2b_create_channel_s,
            s2b_shutdown_bparticipant_s,
            b2s_prio_statistic_r,
            _b2a_bandwidth_stats_r,
            handle,
        ) = mock_bparticipant();

        let remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s));
        std::thread::sleep(Duration::from_millis(50));

        let (s, r) = oneshot::channel();
        let before = Instant::now();
        runtime.block_on(async {
            drop(s2b_create_channel_s);
            s2b_shutdown_bparticipant_s
                .send((Duration::from_secs(2), s))
                .unwrap();
            drop(remote); // remote needs to be dropped as soon as local.sender is closed
            r.await.unwrap().unwrap();
        });
        assert!(
            before.elapsed() < Duration::from_millis(1900),
            "timeout was triggered"
        );

        runtime.block_on(handle).unwrap();

        drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r));
        drop(runtime);
    }

    /// Opening a stream from the local API side: the remote end must receive
    /// a matching `ProtocolEvent::OpenStream`.
    #[test]
    fn create_stream() {
        let (
            runtime,
            a2b_open_stream_s,
            b2a_stream_opened_r,
            mut s2b_create_channel_s,
            s2b_shutdown_bparticipant_s,
            b2s_prio_statistic_r,
            _b2a_bandwidth_stats_r,
            handle,
        ) = mock_bparticipant();

        let remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s));
        std::thread::sleep(Duration::from_millis(50));

        // Request a stream through the local API channel.
        let (rs, mut rr) = remote.split();
        // `_stream_receiver` is bound (not `_`) so the oneshot stays open
        // while the participant answers the request.
        let (stream_sender, _stream_receiver) = oneshot::channel();
        a2b_open_stream_s
            .send((7u8, Promises::ENCRYPTED, 1_000_000, stream_sender))
            .unwrap();

        // The remote must see the request with the same parameters.
        let stream_event = runtime.block_on(rr.recv()).unwrap();
        match stream_event {
            ProtocolEvent::OpenStream {
                sid,
                prio,
                promises,
                guaranteed_bandwidth,
            } => {
                assert_eq!(sid, Sid::new(1000));
                assert_eq!(prio, 7u8);
                assert_eq!(promises, Promises::ENCRYPTED);
                assert_eq!(guaranteed_bandwidth, 1_000_000);
            },
            _ => panic!("wrong event"),
        };

        let (s, r) = oneshot::channel();
        runtime.block_on(async {
            drop(s2b_create_channel_s);
            s2b_shutdown_bparticipant_s
                .send((Duration::from_secs(1), s))
                .unwrap();
            // Drop both remote halves right after requesting the shutdown.
            drop((rs, rr));
            r.await.unwrap().unwrap();
        });

        runtime.block_on(handle).unwrap();

        drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r));
        drop(runtime);
    }

    /// Opening a stream from the remote side: the local API must be handed a
    /// `Stream` with the requested promises.
    #[test]
    fn created_stream() {
        let (
            runtime,
            a2b_open_stream_s,
            mut b2a_stream_opened_r,
            mut s2b_create_channel_s,
            s2b_shutdown_bparticipant_s,
            b2s_prio_statistic_r,
            _b2a_bandwidth_stats_r,
            handle,
        ) = mock_bparticipant();

        let remote = runtime.block_on(mock_mpsc(0, &runtime, &mut s2b_create_channel_s));
        std::thread::sleep(Duration::from_millis(50));

        // The remote announces a new stream.
        let (mut rs, rr) = remote.split();
        runtime
            .block_on(rs.send(ProtocolEvent::OpenStream {
                sid: Sid::new(1000),
                prio: 9u8,
                promises: Promises::ORDERED,
                guaranteed_bandwidth: 1_000_000,
            }))
            .unwrap();

        // The participant must surface it on the opened-stream channel.
        let stream = runtime.block_on(b2a_stream_opened_r.recv()).unwrap();
        assert_eq!(stream.params().promises, Promises::ORDERED);

        let (s, r) = oneshot::channel();
        runtime.block_on(async {
            drop(s2b_create_channel_s);
            s2b_shutdown_bparticipant_s
                .send((Duration::from_secs(1), s))
                .unwrap();
            // Drop both remote halves right after requesting the shutdown.
            drop((rs, rr));
            r.await.unwrap().unwrap();
        });

        runtime.block_on(handle).unwrap();

        drop((a2b_open_stream_s, b2a_stream_opened_r, b2s_prio_statistic_r));
        drop(runtime);
    }
}
|