move prios from scheduler to participant in oder to fixing closing of stream/participant

however i need to coordinate the prio adjustments in scheduler from now on, so that ParticipantA doesn't get all the network bandwith and ParticipantB nothing
This commit is contained in:
Marcel Märtens 2020-05-22 16:00:08 +02:00
parent bd69b2ae28
commit 8b839afcae
5 changed files with 130 additions and 173 deletions

View File

@ -14,7 +14,7 @@ use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
};
@ -71,7 +71,7 @@ pub struct Stream {
mid: Mid,
prio: Prio,
promises: Promises,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<InCommingMessage>,
closed: Arc<AtomicBool>,
a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
@ -530,7 +530,7 @@ impl Stream {
sid: Sid,
prio: Prio,
promises: Promises,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<InCommingMessage>,
closed: Arc<AtomicBool>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
@ -584,6 +584,7 @@ impl Stream {
/// [`send_raw`]: Stream::send_raw
/// [`recv`]: Stream::recv
/// [`Serialized`]: Serialize
#[inline]
pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
self.send_raw(Arc::new(message::serialize(&msg)))
}
@ -624,13 +625,12 @@ impl Stream {
return Err(StreamError::StreamClosed);
}
//debug!(?messagebuffer, "sending a message");
self.a2b_msg_s
.send((self.prio, self.pid, self.sid, OutGoingMessage {
buffer: messagebuffer,
cursor: 0,
mid: self.mid,
sid: self.sid,
}))?;
self.a2b_msg_s.send((self.prio, self.sid, OutGoingMessage {
buffer: messagebuffer,
cursor: 0,
mid: self.mid,
sid: self.sid,
}))?;
self.mid += 1;
Ok(())
}
@ -643,6 +643,7 @@ impl Stream {
///
/// A [`StreamError`] will be returned in the error case, e.g. when the
/// `Stream` got closed already.
#[inline]
pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
Ok(message::deserialize(self.recv_raw().await?))
}

View File

@ -18,7 +18,6 @@ use tracing::*;
pub(crate) struct Channel {
cid: Cid,
metrics: Arc<NetworkMetrics>,
remote_pid: Pid,
to_wire_receiver: Option<mpsc::UnboundedReceiver<Frame>>,
read_stop_receiver: Option<oneshot::Receiver<()>>,
@ -28,14 +27,12 @@ impl Channel {
pub fn new(
cid: u64,
remote_pid: Pid,
metrics: Arc<NetworkMetrics>,
) -> (Self, mpsc::UnboundedSender<Frame>, oneshot::Sender<()>) {
let (to_wire_sender, to_wire_receiver) = mpsc::unbounded::<Frame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
(
Self {
cid,
metrics,
remote_pid,
to_wire_receiver: Some(to_wire_receiver),
read_stop_receiver: Some(read_stop_receiver),

View File

@ -3,6 +3,7 @@ use crate::{
channel::Channel,
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
metrics::NetworkMetrics,
prios::PrioManager,
protocols::Protocols,
types::{Cid, Frame, Pid, Prio, Promises, Sid},
};
@ -15,10 +16,10 @@ use futures::{
stream::StreamExt,
};
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
use tracing::*;
@ -45,9 +46,6 @@ struct ControlChannels {
s2b_create_channel_r: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api stream
p2b_notify_empty_stream_s: Arc<Mutex<std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>>>,
s2b_frame_r: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
}
@ -67,21 +65,17 @@ impl BParticipant {
remote_pid: Pid,
offset_sid: Sid,
metrics: Arc<NetworkMetrics>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>,
) -> (
Self,
mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>,
) {
let (a2b_steam_open_s, a2b_steam_open_r) =
mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::<Stream>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded();
let (s2b_frame_s, s2b_frame_r) = mpsc::unbounded::<(Pid, Sid, Frame)>();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) =
mpsc::unbounded::<(Cid, Sid, Protocols, oneshot::Sender<()>)>();
@ -92,9 +86,6 @@ impl BParticipant {
s2b_create_channel_r,
a2b_close_stream_r,
a2b_close_stream_s,
a2p_msg_s: Arc::new(Mutex::new(a2p_msg_s)),
p2b_notify_empty_stream_s: Arc::new(Mutex::new(p2b_notify_empty_stream_s)),
s2b_frame_r,
s2b_shutdown_bparticipant_r,
});
@ -111,7 +102,6 @@ impl BParticipant {
a2b_steam_open_s,
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_frame_s,
s2b_shutdown_bparticipant_s,
)
}
@ -119,39 +109,86 @@ impl BParticipant {
pub async fn run(mut self) {
//those managers that listen on api::Participant need an additional oneshot for
// shutdown scenario, those handled by scheduler will be closed by it.
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (shutdown_send_mgr_sender, shutdown_send_mgr_receiver) = oneshot::channel();
let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) =
oneshot::channel();
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (b2b_prios_flushed_s, b2b_prios_flushed_r) = oneshot::channel();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
let (prios, a2p_msg_s, p2b_notify_empty_stream_s) = PrioManager::new();
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.open_mgr(
run_channels.a2b_steam_open_r,
run_channels.a2b_close_stream_s.clone(),
run_channels.a2p_msg_s.clone(),
a2p_msg_s.clone(),
shutdown_open_mgr_receiver,
),
self.handle_frames_mgr(
w2b_frames_r,
run_channels.b2a_stream_opened_s,
run_channels.a2b_close_stream_s,
run_channels.a2p_msg_s.clone(),
a2p_msg_s.clone(),
),
self.create_channel_mgr(run_channels.s2b_create_channel_r, w2b_frames_s,),
self.send_mgr(run_channels.s2b_frame_r),
self.send_mgr(prios, shutdown_send_mgr_receiver, b2b_prios_flushed_s),
self.stream_close_mgr(
run_channels.a2b_close_stream_r,
shutdown_stream_close_mgr_receiver,
run_channels.p2b_notify_empty_stream_s,
p2b_notify_empty_stream_s,
),
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
vec!(shutdown_open_mgr_sender, shutdown_stream_close_mgr_sender)
b2b_prios_flushed_r,
vec!(
shutdown_send_mgr_sender,
shutdown_open_mgr_sender,
shutdown_stream_close_mgr_sender
)
),
);
}
async fn send_mgr(
&self,
mut prios: PrioManager,
mut shutdown_send_mgr_receiver: oneshot::Receiver<()>,
b2b_prios_flushed_s: oneshot::Sender<()>,
) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
// make it configureable or switch to await E.g. Prio 0 = await, prio 50
// wait for more messages
const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(10);
const FRAMES_PER_TICK: usize = 10005;
self.running_mgr.fetch_add(1, Ordering::Relaxed);
let mut closing_up = false;
trace!("start send_mgr");
//while !self.closed.load(Ordering::Relaxed) {
loop {
let mut frames = VecDeque::new();
prios.fill_frames(FRAMES_PER_TICK, &mut frames).await;
let len = frames.len();
if len > 0 {
trace!("tick {}", len);
}
for (_, frame) in frames {
self.send_frame(frame).await;
}
async_std::task::sleep(TICK_TIME).await;
//shutdown after all msg are send!
if !closing_up && shutdown_send_mgr_receiver.try_recv().unwrap().is_some() {
closing_up = true;
}
if closing_up && (len == 0) {
break;
}
}
trace!("stop send_mgr");
b2b_prios_flushed_s.send(()).unwrap();
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn send_frame(&self, frame: Frame) {
// find out ideal channel here
//TODO: just take first
@ -175,11 +212,10 @@ impl BParticipant {
mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start handle_frames");
let a2p_msg_s = { a2p_msg_s.lock().unwrap().clone() };
trace!("start handle_frames_mgr");
let mut messages = HashMap::new();
let pid_string = &self.remote_pid.to_string();
while let Some((cid, frame)) = w2b_frames_r.next().await {
@ -255,7 +291,7 @@ impl BParticipant {
_ => unreachable!("never reaches frame!"),
}
}
trace!("stop handle_frames");
trace!("stop handle_frames_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
@ -267,14 +303,14 @@ impl BParticipant {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start create_channel_mgr");
s2b_create_channel_r
.for_each_concurrent(None, |(cid, sid, protocol, b2s_create_channel_done_s)| {
.for_each_concurrent(None, |(cid, _, protocol, b2s_create_channel_done_s)| {
// This channel is now configured, and we are running it in scope of the
// participant.
let w2b_frames_s = w2b_frames_s.clone();
let channels = self.channels.clone();
async move {
let (channel, b2w_frame_s, b2r_read_shutdown) =
Channel::new(cid, self.remote_pid, self.metrics.clone());
Channel::new(cid, self.remote_pid);
channels.write().await.push(ChannelInfo {
cid,
b2w_frame_s,
@ -299,29 +335,15 @@ impl BParticipant {
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn send_mgr(&self, mut s2b_frame_r: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start send_mgr");
while let Some((_, sid, frame)) = s2b_frame_r.next().await {
self.send_frame(frame).await;
}
trace!("stop send_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn open_mgr(
&self,
mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>,
shutdown_open_mgr_receiver: oneshot::Receiver<()>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start open_mgr");
let send_outgoing = {
//fighting the borrow checker ;)
a2p_msg_s.lock().unwrap().clone()
};
let mut stream_ids = self.offset_sid;
let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
//from api or shutdown signal
@ -330,10 +352,10 @@ impl BParticipant {
_ = shutdown_open_mgr_receiver => None,
} {
debug!(?prio, ?promises, "got request to open a new steam");
let send_outgoing = send_outgoing.clone();
let a2p_msg_s = a2p_msg_s.clone();
let sid = stream_ids;
let stream = self
.create_stream(sid, prio, promises, send_outgoing, &a2b_close_stream_s)
.create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s)
.await;
self.send_frame(Frame::OpenStream {
sid,
@ -355,6 +377,7 @@ impl BParticipant {
async fn participant_shutdown_mgr(
&self,
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
b2b_prios_flushed_r: oneshot::Receiver<()>,
mut to_shutdown: Vec<oneshot::Sender<()>>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
@ -367,11 +390,12 @@ impl BParticipant {
};
}
debug!("closing all streams");
let mut streams = self.streams.write().await;
for (sid, si) in streams.drain() {
for (sid, si) in self.streams.write().await.drain() {
trace!(?sid, "shutting down Stream");
si.closed.store(true, Ordering::Relaxed);
}
debug!("waiting for prios to be flushed");
b2b_prios_flushed_r.await.unwrap();
debug!("closing all channels");
for ci in self.channels.write().await.drain(..) {
ci.b2r_read_shutdown.send(()).unwrap();
@ -401,9 +425,7 @@ impl BParticipant {
&self,
mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
mut p2b_notify_empty_stream_s: Arc<
Mutex<std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>>,
>,
p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Sid, oneshot::Sender<()>)>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start stream_close_mgr");
@ -436,9 +458,7 @@ impl BParticipant {
trace!(?sid, "wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
p2b_notify_empty_stream_s
.lock()
.unwrap()
.send((self.remote_pid, sid, s2b_stream_finished_closed_s))
.send((sid, s2b_stream_finished_closed_s))
.unwrap();
s2b_stream_finished_closed_r.await.unwrap();
@ -460,7 +480,7 @@ impl BParticipant {
sid: Sid,
prio: Prio,
promises: Promises,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Sid, OutGoingMessage)>,
a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
) -> Stream {
let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::<InCommingMessage>();

View File

@ -7,7 +7,7 @@
use crate::{
message::OutGoingMessage,
types::{Frame, Pid, Prio, Sid},
types::{Frame, Prio, Sid},
};
use futures::channel::oneshot;
use std::{
@ -26,11 +26,11 @@ struct PidSidInfo {
pub(crate) struct PrioManager {
points: [u32; PRIO_MAX],
messages: [VecDeque<(Pid, Sid, OutGoingMessage)>; PRIO_MAX],
messages_rx: Receiver<(Prio, Pid, Sid, OutGoingMessage)>,
pid_sid_owned: HashMap<(Pid, Sid), PidSidInfo>,
messages: [VecDeque<(Sid, OutGoingMessage)>; PRIO_MAX],
messages_rx: Receiver<(Prio, Sid, OutGoingMessage)>,
sid_owned: HashMap<Sid, PidSidInfo>,
//you can register to be notified if a pid_sid combination is flushed completly here
pid_sid_flushed_rx: Receiver<(Pid, Sid, oneshot::Sender<()>)>,
sid_flushed_rx: Receiver<(Sid, oneshot::Sender<()>)>,
queued: HashSet<u8>,
}
@ -40,6 +40,12 @@ Der Priomanager hört auf gekillte PID, SIDs, und entweder returned sofort wenn
Evtl sollten wir auch den prioManger auf mehr Async umstellen. auch wenn der TICK selber syncron ist. mal schaun.
*/
/*
ERROR, okay wie hauen alles komplett um, PRIOS wird ein teildes BPARTICIPANT
Der BPARTICIPANT bekommt vom Scheduler seine throughput werte, und berichtet zurück
PRIOS wird ASYNC!
*/
impl PrioManager {
const FRAME_DATA_SIZE: u64 = 1400;
const PRIOS: [u32; PRIO_MAX] = [
@ -52,12 +58,12 @@ impl PrioManager {
pub fn new() -> (
Self,
Sender<(Prio, Pid, Sid, OutGoingMessage)>,
Sender<(Pid, Sid, oneshot::Sender<()>)>,
Sender<(Prio, Sid, OutGoingMessage)>,
Sender<(Sid, oneshot::Sender<()>)>,
) {
// (a2p_msg_s, a2p_msg_r)
let (messages_tx, messages_rx) = channel();
let (pid_sid_flushed_tx, pid_sid_flushed_rx) = channel();
let (sid_flushed_tx, sid_flushed_rx) = channel();
(
Self {
points: [0; PRIO_MAX],
@ -129,11 +135,11 @@ impl PrioManager {
],
messages_rx,
queued: HashSet::new(), //TODO: optimize with u64 and 64 bits
pid_sid_flushed_rx,
pid_sid_owned: HashMap::new(),
sid_flushed_rx,
sid_owned: HashMap::new(),
},
messages_tx,
pid_sid_flushed_tx,
sid_flushed_tx,
)
}
@ -141,31 +147,31 @@ impl PrioManager {
// Check Range
let mut times = 0;
let mut closed = 0;
for (prio, pid, sid, msg) in self.messages_rx.try_iter() {
for (prio, sid, msg) in self.messages_rx.try_iter() {
debug_assert!(prio as usize <= PRIO_MAX);
times += 1;
//trace!(?prio, ?sid, ?pid, "tick");
//trace!(?prio, ?sid, "tick");
self.queued.insert(prio);
self.messages[prio as usize].push_back((pid, sid, msg));
if let Some(cnt) = self.pid_sid_owned.get_mut(&(pid, sid)) {
self.messages[prio as usize].push_back((sid, msg));
if let Some(cnt) = self.sid_owned.get_mut(&sid) {
cnt.len += 1;
} else {
self.pid_sid_owned.insert((pid, sid), PidSidInfo {
self.sid_owned.insert(sid, PidSidInfo {
len: 1,
empty_notify: None,
});
}
}
//this must be AFTER messages
for (pid, sid, return_sender) in self.pid_sid_flushed_rx.try_iter() {
for (sid, return_sender) in self.sid_flushed_rx.try_iter() {
closed += 1;
if let Some(cnt) = self.pid_sid_owned.get_mut(&(pid, sid)) {
if let Some(cnt) = self.sid_owned.get_mut(&sid) {
// register sender
cnt.empty_notify = Some(return_sender);
} else {
// return immediately
futures::executor::block_on(async {
return_sender.send(());
return_sender.send(()).unwrap();
});
}
}
@ -193,9 +199,8 @@ impl PrioManager {
}
/// returns if msg is empty
fn tick_msg<E: Extend<(Pid, Sid, Frame)>>(
fn tick_msg<E: Extend<(Sid, Frame)>>(
msg: &mut OutGoingMessage,
msg_pid: Pid,
msg_sid: Sid,
frames: &mut E,
) -> bool {
@ -205,13 +210,13 @@ impl PrioManager {
);
if to_send > 0 {
if msg.cursor == 0 {
frames.extend(std::iter::once((msg_pid, msg_sid, Frame::DataHeader {
frames.extend(std::iter::once((msg_sid, Frame::DataHeader {
mid: msg.mid,
sid: msg.sid,
length: msg.buffer.data.len() as u64,
})));
}
frames.extend(std::iter::once((msg_pid, msg_sid, Frame::Data {
frames.extend(std::iter::once((msg_sid, Frame::Data {
mid: msg.mid,
start: msg.cursor,
data: msg.buffer.data[msg.cursor as usize..(msg.cursor + to_send) as usize]
@ -231,7 +236,7 @@ impl PrioManager {
/// high prio messages!
/// - if no_of_frames is too low you wont saturate your Socket fully, thus
/// have a lower bandwidth as possible
pub fn fill_frames<E: Extend<(Pid, Sid, Frame)>>(
pub async fn fill_frames<E: Extend<(Sid, Frame)>>(
&mut self,
no_of_frames: usize,
frames: &mut E,
@ -246,29 +251,26 @@ impl PrioManager {
// => messages with same prio get a fair chance :)
//TODO: evalaute not poping every time
match self.messages[prio as usize].pop_front() {
Some((pid, sid, mut msg)) => {
if Self::tick_msg(&mut msg, pid, sid, frames) {
Some((sid, mut msg)) => {
if Self::tick_msg(&mut msg, sid, frames) {
//debug!(?m.mid, "finish message");
//check if prio is empty
if self.messages[prio as usize].is_empty() {
self.queued.remove(&prio);
}
//decrease pid_sid counter by 1 again
let cnt = self.pid_sid_owned.get_mut(&(pid, sid)).expect(
let cnt = self.sid_owned.get_mut(&sid).expect(
"the pid_sid_owned counter works wrong, more pid,sid removed \
than inserted",
);
cnt.len -= 1;
if cnt.len == 0 {
let cnt = self.pid_sid_owned.remove(&(pid, sid)).unwrap();
cnt.empty_notify.map(|empty_notify| {
futures::executor::block_on(async {
empty_notify.send(());
})
});
let cnt = self.sid_owned.remove(&sid).unwrap();
cnt.empty_notify
.map(|empty_notify| empty_notify.send(()).unwrap());
}
} else {
self.messages[prio as usize].push_back((pid, sid, msg));
self.messages[prio as usize].push_back((sid, msg));
//trace!(?m.mid, "repush message");
}
},
@ -284,12 +286,6 @@ impl PrioManager {
}
}
}
/// if you want to make sure to empty the prio of a single pid and sid, use
/// this
pub(crate) fn contains_pid_sid(&self, pid: Pid, sid: Sid) -> bool {
self.pid_sid_owned.contains_key(&(pid, sid))
}
}
impl std::fmt::Debug for PrioManager {
@ -315,9 +311,9 @@ mod tests {
const SIZE: u64 = PrioManager::FRAME_DATA_SIZE;
const USIZE: usize = PrioManager::FRAME_DATA_SIZE as usize;
fn mock_out(prio: Prio, sid: u64) -> (Prio, Pid, Sid, OutGoingMessage) {
fn mock_out(prio: Prio, sid: u64) -> (Prio, Sid, OutGoingMessage) {
let sid = Sid::new(sid);
(prio, Pid::fake(0), sid, OutGoingMessage {
(prio, sid, OutGoingMessage {
buffer: Arc::new(MessageBuffer {
data: vec![48, 49, 50],
}),
@ -327,12 +323,12 @@ mod tests {
})
}
fn mock_out_large(prio: Prio, sid: u64) -> (Prio, Pid, Sid, OutGoingMessage) {
fn mock_out_large(prio: Prio, sid: u64) -> (Prio, Sid, OutGoingMessage) {
let sid = Sid::new(sid);
let mut data = vec![48; USIZE];
data.append(&mut vec![49; USIZE]);
data.append(&mut vec![50; 20]);
(prio, Pid::fake(0), sid, OutGoingMessage {
(prio, sid, OutGoingMessage {
buffer: Arc::new(MessageBuffer { data }),
cursor: 0,
mid: 1,
@ -340,7 +336,7 @@ mod tests {
})
}
fn assert_header(frames: &mut VecDeque<(Pid, Sid, Frame)>, f_sid: u64, f_length: u64) {
fn assert_header(frames: &mut VecDeque<(Sid, Frame)>, f_sid: u64, f_length: u64) {
let frame = frames
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!")
@ -354,7 +350,7 @@ mod tests {
}
}
fn assert_data(frames: &mut VecDeque<(Pid, Sid, Frame)>, f_start: u64, f_data: Vec<u8>) {
fn assert_data(frames: &mut VecDeque<(Sid, Frame)>, f_start: u64, f_data: Vec<u8>) {
let frame = frames
.pop_front()
.expect("frames vecdeque doesn't contain enough frames!")

View File

@ -1,12 +1,10 @@
use crate::{
api::{Address, Participant},
channel::Handshake,
message::OutGoingMessage,
metrics::NetworkMetrics,
participant::BParticipant,
prios::PrioManager,
protocols::{Protocols, TcpProtocol, UdpProtocol},
types::{Cid, Frame, Pid, Prio, Sid},
types::{Cid, Pid, Prio, Sid},
};
use async_std::{
io, net,
@ -22,7 +20,7 @@ use futures::{
};
use prometheus::Registry;
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::HashMap,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@ -34,7 +32,6 @@ use tracing_futures::Instrument;
#[derive(Debug)]
struct ParticipantInfo {
s2b_create_channel_s: mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
s2b_frame_s: mpsc::UnboundedSender<(Pid, Sid, Frame)>,
s2b_shutdown_bparticipant_s:
Option<oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>>,
}
@ -58,8 +55,6 @@ struct ControlChannels {
struct ParticipantChannels {
s2a_connected_s: mpsc::UnboundedSender<Participant>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>,
}
#[derive(Debug)]
@ -72,7 +67,6 @@ pub struct Scheduler {
participants: Arc<RwLock<HashMap<Pid, ParticipantInfo>>>,
channel_ids: Arc<AtomicU64>,
channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
prios: Arc<Mutex<PrioManager>>,
metrics: Arc<NetworkMetrics>,
}
@ -93,7 +87,6 @@ impl Scheduler {
mpsc::unbounded::<(Address, oneshot::Sender<io::Result<Participant>>)>();
let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::<Participant>();
let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>();
let (prios, a2p_msg_s, p2b_notify_empty_stream_s) = PrioManager::new();
let (a2s_disconnect_s, a2s_disconnect_r) =
mpsc::unbounded::<(Pid, oneshot::Sender<async_std::io::Result<()>>)>();
@ -107,8 +100,6 @@ impl Scheduler {
let participant_channels = ParticipantChannels {
s2a_connected_s,
a2s_disconnect_s,
a2p_msg_s,
p2b_notify_empty_stream_s,
};
let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap());
@ -126,7 +117,6 @@ impl Scheduler {
participants: Arc::new(RwLock::new(HashMap::new())),
channel_ids: Arc::new(AtomicU64::new(0)),
channel_listener: RwLock::new(HashMap::new()),
prios: Arc::new(Mutex::new(prios)),
metrics,
},
a2s_listen_s,
@ -143,7 +133,6 @@ impl Scheduler {
self.listen_mgr(run_channels.a2s_listen_r),
self.connect_mgr(run_channels.a2s_connect_r),
self.disconnect_mgr(run_channels.a2s_disconnect_r),
self.send_outgoing_mgr(),
self.scheduler_shutdown_mgr(run_channels.a2s_scheduler_shutdown_r),
);
}
@ -259,7 +248,7 @@ impl Scheduler {
// 3. Participant will try to access the BParticipant senders and receivers with
// their next api action, it will fail and be closed then.
let (finished_sender, finished_receiver) = oneshot::channel();
if let Some(pi) = self.participants.write().await.get_mut(&pid) {
if let Some(mut pi) = self.participants.write().await.remove(&pid) {
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
@ -267,49 +256,11 @@ impl Scheduler {
.unwrap();
}
let e = finished_receiver.await.unwrap();
//only remove after flush!
self.participants.write().await.remove(&pid).unwrap();
return_once_successfull_shutdown.send(e);
return_once_successfull_shutdown.send(e).unwrap();
}
trace!("stop disconnect_mgr");
}
async fn send_outgoing_mgr(&self) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
// make it configureable or switch to await E.g. Prio 0 = await, prio 50
// wait for more messages
const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(10);
const FRAMES_PER_TICK: usize = 1000005;
trace!("start send_outgoing_mgr");
while !self.closed.load(Ordering::Relaxed) {
let mut frames = VecDeque::new();
self.prios
.lock()
.await
.fill_frames(FRAMES_PER_TICK, &mut frames);
if frames.len() > 0 {
trace!("tick {}", frames.len());
}
let mut already_traced = HashSet::new();
for (pid, sid, frame) in frames {
if let Some(pi) = self.participants.write().await.get_mut(&pid) {
pi.s2b_frame_s.send((pid, sid, frame)).await.unwrap();
} else {
if !already_traced.contains(&(pid, sid)) {
error!(
?pid,
?sid,
"dropping frames, as participant no longer exists!"
);
already_traced.insert((pid, sid));
}
}
}
async_std::task::sleep(TICK_TIME).await;
}
trace!("stop send_outgoing_mgr");
}
async fn scheduler_shutdown_mgr(&self, a2s_scheduler_shutdown_r: oneshot::Receiver<()>) {
trace!("start scheduler_shutdown_mgr");
a2s_scheduler_shutdown_r.await.unwrap();
@ -503,15 +454,8 @@ impl Scheduler {
a2b_steam_open_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_frame_s,
s2b_shutdown_bparticipant_s,
) = BParticipant::new(
pid,
sid,
metrics.clone(),
participant_channels.a2p_msg_s,
participant_channels.p2b_notify_empty_stream_s,
);
) = BParticipant::new(pid, sid, metrics.clone());
let participant = Participant::new(
local_pid,
@ -524,7 +468,6 @@ impl Scheduler {
metrics.participants_connected_total.inc();
participants.insert(pid, ParticipantInfo {
s2b_create_channel_s: s2b_create_channel_s.clone(),
s2b_frame_s,
s2b_shutdown_bparticipant_s: Some(s2b_shutdown_bparticipant_s),
});
pool.spawn_ok(