renamed all Channels to new naming scheme and fixing shutting down bparticipant and scheduler correctly. Introducing structs to keep Info in scheduler.rs and participant.rs

This commit is contained in:
Marcel Märtens 2020-05-15 14:29:17 +02:00
parent 007f5cabaa
commit bd69b2ae28
7 changed files with 586 additions and 413 deletions

View File

@ -75,7 +75,14 @@ fn main() {
.get_matches();
let trace = matches.value_of("trace").unwrap();
let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap())/*
let filter = EnvFilter::from_default_env()
.add_directive(trace.parse().unwrap())
.add_directive("network_speed=debug".parse().unwrap())
.add_directive("veloren_network::participant=trace".parse().unwrap())
.add_directive("veloren_network::protocol=trace".parse().unwrap())
.add_directive("veloren_network::scheduler=trace".parse().unwrap())
.add_directive("veloren_network::api=trace".parse().unwrap())
/*
.add_directive("veloren_network::participant=debug".parse().unwrap()).add_directive("veloren_network::api=debug".parse().unwrap())*/;
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(Level::ERROR)
@ -165,6 +172,13 @@ fn client(address: Address) {
std::thread::sleep(std::time::Duration::from_millis(5000));
break;
}
}
debug!("closing client");
};
drop(s1);
std::thread::sleep(std::time::Duration::from_millis(5000));
info!("closing participant");
block_on(client.disconnect(p1)).unwrap();
std::thread::sleep(std::time::Duration::from_millis(75000));
info!("DROPPING! client");
drop(client);
std::thread::sleep(std::time::Duration::from_millis(75000));
}

View File

@ -9,7 +9,6 @@ use std::{
Arc,
},
thread,
time::Duration,
};
pub struct SimpleMetrics {
@ -49,10 +48,10 @@ impl SimpleMetrics {
//TODO: make this a job
self.handle = Some(thread::spawn(move || {
let server = tiny_http::Server::http(addr).unwrap();
const timeout: std::time::Duration = std::time::Duration::from_secs(1);
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
debug!("starting tiny_http server to serve metrics");
while running2.load(Ordering::Relaxed) {
let request = match server.recv_timeout(timeout) {
let request = match server.recv_timeout(TIMEOUT) {
Ok(Some(rq)) => rq,
Ok(None) => continue,
Err(e) => { println!("error: {}", e); break }
@ -62,7 +61,10 @@ impl SimpleMetrics {
let mut buffer = vec![];
encoder.encode(&mf, &mut buffer).expect("Failed to encoder metrics text.");
let response = tiny_http::Response::from_string(String::from_utf8(buffer).expect("Failed to parse bytes as a string."));
request.respond(response);
match request.respond(response) {
Err(e) => error!(?e, "The metrics HTTP server had encountered and error with answering"),
_ => (),
}
}
debug!("stopping tiny_http server to serve metrics");
}));

View File

@ -1,7 +1,7 @@
use crate::{
message::{self, InCommingMessage, MessageBuffer, OutGoingMessage},
scheduler::Scheduler,
types::{Mid, Pid, Prio, Promises, Requestor::User, Sid},
types::{Mid, Pid, Prio, Promises, Sid},
};
use async_std::{io, sync::RwLock, task};
use futures::{
@ -14,7 +14,7 @@ use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
@ -40,10 +40,11 @@ pub enum Address {
pub struct Participant {
local_pid: Pid,
remote_pid: Pid,
stream_open_sender: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
stream_opened_receiver: RwLock<mpsc::UnboundedReceiver<Stream>>,
a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
closed: AtomicBool,
disconnect_sender: Option<mpsc::UnboundedSender<Pid>>,
a2s_disconnect_s:
Option<mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>>,
}
/// `Streams` represents a channel to send `n` messages with a certain priority
@ -70,10 +71,10 @@ pub struct Stream {
mid: Mid,
prio: Prio,
promises: Promises,
msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
msg_recv_receiver: mpsc::UnboundedReceiver<InCommingMessage>,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<InCommingMessage>,
closed: Arc<AtomicBool>,
shutdown_sender: Option<mpsc::UnboundedSender<Sid>>,
a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}
/// Error type thrown by [`Networks`](Network) methods
@ -162,17 +163,17 @@ impl Network {
/// [`ThreadPool`]: uvth::ThreadPool
pub fn new(participant_id: Pid, thread_pool: &ThreadPool, registry: Option<&Registry>) -> Self {
let p = participant_id;
debug!(?p, ?User, "starting Network");
debug!(?p, "starting Network");
let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
Scheduler::new(participant_id, registry);
thread_pool.execute(move || {
trace!(?p, ?User, "starting sheduler in own thread");
trace!(?p, "starting sheduler in own thread");
let _handle = task::block_on(
scheduler
.run()
.instrument(tracing::info_span!("scheduler", ?p)),
);
trace!(?p, ?User, "stopping sheduler and his own thread");
trace!(?p, "stopping sheduler and his own thread");
});
Self {
local_pid: participant_id,
@ -210,14 +211,14 @@ impl Network {
///
/// [`connected`]: Network::connected
pub async fn listen(&self, address: Address) -> Result<(), NetworkError> {
let (result_sender, result_receiver) = oneshot::channel::<async_std::io::Result<()>>();
debug!(?address, ?User, "listening on address");
let (s2a_result_s, s2a_result_r) = oneshot::channel::<async_std::io::Result<()>>();
debug!(?address, "listening on address");
self.listen_sender
.write()
.await
.send((address, result_sender))
.send((address, s2a_result_s))
.await?;
match result_receiver.await? {
match s2a_result_r.await? {
//waiting guarantees that we either listened sucessfully or get an error like port in
// use
Ok(()) => Ok(()),
@ -256,7 +257,7 @@ impl Network {
/// [`Addresses`]: crate::api::Address
pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
debug!(?address, ?User, "connect to address");
debug!(?address, "connect to address");
self.connect_sender
.write()
.await
@ -266,7 +267,6 @@ impl Network {
let pid = participant.remote_pid;
debug!(
?pid,
?User,
"received Participant id from remote and return to user"
);
let participant = Arc::new(participant);
@ -317,8 +317,10 @@ impl Network {
/// are not allowed to keep others. If you do so the [`Participant`]
/// can't be disconnected properly. If you no longer have the respective
/// [`Participant`], try using the [`participants`] method to get it.
///
/// This function will wait for all [`Streams`] to properly close, including
/// all messages to be send before closing.
/// all messages to be send before closing. If an error occurs with one
/// of the messavb
///
/// # Examples
/// ```rust
@ -349,12 +351,33 @@ impl Network {
self.participants.write().await.remove(&pid)?;
participant.closed.store(true, Ordering::Relaxed);
if Arc::try_unwrap(participant).is_err() {
warn!(
"you are disconnecting and still keeping a reference to this participant, this is \
a bad idea. Participant will only be dropped when you drop your last reference"
);
match Arc::try_unwrap(participant) {
Err(_) => {
warn!(
"you are disconnecting and still keeping a reference to this participant, \
this is a bad idea. Participant will only be dropped when you drop your last \
reference"
);
},
Ok(mut participant) => {
trace!("waiting now for participant to close");
let (finished_sender, finished_receiver) = oneshot::channel();
// we are deleting here asyncly before DROP is called. Because this is done
// nativly async, while drop needs an BLOCK! Drop will recognis
// that it has been delete here and don't try another double delete.
participant
.a2s_disconnect_s
.take()
.unwrap()
.send((pid, finished_sender))
.await
.expect("something is wrong in internal scheduler coding");
let res = finished_receiver.await.unwrap();
trace!("participant is now closed");
res?;
},
};
Ok(())
}
@ -370,17 +393,17 @@ impl Participant {
pub(crate) fn new(
local_pid: Pid,
remote_pid: Pid,
stream_open_sender: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
stream_opened_receiver: mpsc::UnboundedReceiver<Stream>,
disconnect_sender: mpsc::UnboundedSender<Pid>,
a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
) -> Self {
Self {
local_pid,
remote_pid,
stream_open_sender: RwLock::new(stream_open_sender),
stream_opened_receiver: RwLock::new(stream_opened_receiver),
a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
closed: AtomicBool::new(false),
disconnect_sender: Some(disconnect_sender),
a2s_disconnect_s: Some(a2s_disconnect_s),
}
}
@ -422,29 +445,29 @@ impl Participant {
pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
//use this lock for now to make sure that only one open at a time is made,
// TODO: not sure if we can paralise that, check in future
let mut stream_open_sender = self.stream_open_sender.write().await;
let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await;
if self.closed.load(Ordering::Relaxed) {
warn!(?self.remote_pid, "participant is closed but another open is tried on it");
return Err(ParticipantError::ParticipantClosed);
}
let (sid_sender, sid_receiver) = oneshot::channel();
if stream_open_sender
.send((prio, promises, sid_sender))
let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
if a2b_steam_open_s
.send((prio, promises, p2a_return_stream_s))
.await
.is_err()
{
debug!(?self.remote_pid, ?User, "stream_open_sender failed, closing participant");
debug!(?self.remote_pid, "stream_open_sender failed, closing participant");
self.closed.store(true, Ordering::Relaxed);
return Err(ParticipantError::ParticipantClosed);
}
match sid_receiver.await {
match p2a_return_stream_r.await {
Ok(stream) => {
let sid = stream.sid;
debug!(?sid, ?self.remote_pid, ?User, "opened stream");
debug!(?sid, ?self.remote_pid, "opened stream");
Ok(stream)
},
Err(_) => {
debug!(?self.remote_pid, ?User, "sid_receiver failed, closing participant");
debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
self.closed.store(true, Ordering::Relaxed);
Err(ParticipantError::ParticipantClosed)
},
@ -478,7 +501,7 @@ impl Participant {
pub async fn opened(&self) -> Result<Stream, ParticipantError> {
//use this lock for now to make sure that only one open at a time is made,
// TODO: not sure if we can paralise that, check in future
let mut stream_opened_receiver = self.stream_opened_receiver.write().await;
let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
if self.closed.load(Ordering::Relaxed) {
warn!(?self.remote_pid, "participant is closed but another open is tried on it");
return Err(ParticipantError::ParticipantClosed);
@ -507,10 +530,10 @@ impl Stream {
sid: Sid,
prio: Prio,
promises: Promises,
msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
msg_recv_receiver: mpsc::UnboundedReceiver<InCommingMessage>,
a2b_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
b2a_msg_recv_r: mpsc::UnboundedReceiver<InCommingMessage>,
closed: Arc<AtomicBool>,
shutdown_sender: mpsc::UnboundedSender<Sid>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) -> Self {
Self {
pid,
@ -518,10 +541,10 @@ impl Stream {
mid: 0,
prio,
promises,
msg_send_sender,
msg_recv_receiver,
a2b_msg_s,
b2a_msg_recv_r,
closed,
shutdown_sender: Some(shutdown_sender),
a2b_close_stream_s: Some(a2b_close_stream_s),
}
}
@ -600,8 +623,8 @@ impl Stream {
if self.closed.load(Ordering::Relaxed) {
return Err(StreamError::StreamClosed);
}
debug!(?messagebuffer, ?User, "sending a message");
self.msg_send_sender
//debug!(?messagebuffer, "sending a message");
self.a2b_msg_s
.send((self.prio, self.pid, self.sid, OutGoingMessage {
buffer: messagebuffer,
cursor: 0,
@ -632,8 +655,8 @@ impl Stream {
pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
//no need to access self.closed here, as when this stream is closed the Channel
// is closed which will trigger a None
let msg = self.msg_recv_receiver.next().await?;
info!(?msg, ?User, "delivering a message");
let msg = self.b2a_msg_recv_r.next().await?;
info!(?msg, "delivering a message");
Ok(msg.buffer)
}
}
@ -642,11 +665,20 @@ impl Drop for Network {
fn drop(&mut self) {
let pid = self.local_pid;
debug!(?pid, "shutting down Network");
debug!(
?pid,
"shutting down Participants of Network, while we still have metrics"
);
task::block_on(async {
self.participants.write().await.clear();
});
debug!(?pid, "shutting down Scheduler");
self.shutdown_sender
.take()
.unwrap()
.send(())
.expect("scheduler is closed, but nobody other should be able to close it");
debug!(?pid, "participants have shut down!");
}
}
@ -656,14 +688,41 @@ impl Drop for Participant {
// participant from network
let pid = self.remote_pid;
debug!(?pid, "shutting down Participant");
task::block_on(async {
self.disconnect_sender
.take()
.unwrap()
.send(self.remote_pid)
.await
.expect("something is wrong in internal scheduler coding")
});
match self.a2s_disconnect_s.take() {
None => debug!(
?pid,
"Participant has been shutdown cleanly, no further waiting is requiered!"
),
Some(mut a2s_disconnect_s) => {
debug!(
?pid,
"unclean shutdown detected, active waiting for client to be disconnected"
);
task::block_on(async {
let (finished_sender, finished_receiver) = oneshot::channel();
a2s_disconnect_s
.send((self.remote_pid, finished_sender))
.await
.expect("something is wrong in internal scheduler coding");
match finished_receiver.await {
Ok(Err(e)) => error!(
?pid,
?e,
"Error while dropping the participant, couldn't send all outgoing \
messages, dropping remaining"
),
Err(e) => warn!(
?e,
"//TODO i dont know why the finish doesnt work, i normally would \
expect to have sended a return message from the participant... \
ignoring to not caue a panic for now, please fix me"
),
_ => (),
};
});
},
}
debug!(?pid, "network dropped");
}
}
@ -674,12 +733,16 @@ impl Drop for Stream {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "shutting down Stream");
if task::block_on(self.shutdown_sender.take().unwrap().send(self.sid)).is_err() {
if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
warn!(
"Other side got already dropped, probably due to timing, other side will \
handle this gracefully"
);
};
} else {
let sid = self.sid;
let pid = self.pid;
debug!(?pid, ?sid, "not needed");
}
}
}

View File

@ -17,41 +17,47 @@ use futures::{
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
};
use tracing::*;
#[derive(Debug)]
struct ChannelInfo {
cid: Cid,
b2w_frame_s: mpsc::UnboundedSender<Frame>,
b2r_read_shutdown: oneshot::Sender<()>,
}
#[derive(Debug)]
struct StreamInfo {
prio: Prio,
promises: Promises,
b2a_msg_recv_s: mpsc::UnboundedSender<InCommingMessage>,
closed: Arc<AtomicBool>,
}
#[derive(Debug)]
struct ControlChannels {
stream_open_receiver: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
stream_opened_sender: mpsc::UnboundedSender<Stream>,
create_channel_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api
frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler
shutdown_receiver: oneshot::Receiver<()>, //own
stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
s2b_create_channel_r: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api stream
p2b_notify_empty_stream_s: Arc<Mutex<std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>>>,
s2b_frame_r: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
}
#[derive(Debug)]
pub struct BParticipant {
remote_pid: Pid,
offset_sid: Sid,
channels: Arc<RwLock<Vec<(Cid, mpsc::UnboundedSender<Frame>)>>>,
streams: RwLock<
HashMap<
Sid,
(
Prio,
Promises,
mpsc::UnboundedSender<InCommingMessage>,
Arc<AtomicBool>,
),
>,
>,
channels: Arc<RwLock<Vec<ChannelInfo>>>,
streams: RwLock<HashMap<Sid, StreamInfo>>,
running_mgr: AtomicUsize,
run_channels: Option<ControlChannels>,
metrics: Arc<NetworkMetrics>,
}
@ -61,35 +67,35 @@ impl BParticipant {
remote_pid: Pid,
offset_sid: Sid,
metrics: Arc<NetworkMetrics>,
send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>,
) -> (
Self,
mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
oneshot::Sender<()>,
oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>,
) {
let (stream_open_sender, stream_open_receiver) =
let (a2b_steam_open_s, a2b_steam_open_r) =
mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
let (stream_opened_sender, stream_opened_receiver) = mpsc::unbounded::<Stream>();
let (shutdown_api_sender, shutdown_api_receiver) = mpsc::unbounded();
let (frame_send_sender, frame_send_receiver) = mpsc::unbounded::<(Pid, Sid, Frame)>();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (create_channel_sender, create_channel_receiver) =
let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::<Stream>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded();
let (s2b_frame_s, s2b_frame_r) = mpsc::unbounded::<(Pid, Sid, Frame)>();
let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
let (s2b_create_channel_s, s2b_create_channel_r) =
mpsc::unbounded::<(Cid, Sid, Protocols, oneshot::Sender<()>)>();
let run_channels = Some(ControlChannels {
stream_open_receiver,
stream_opened_sender,
create_channel_receiver,
shutdown_api_receiver,
shutdown_api_sender,
send_outgoing: Arc::new(Mutex::new(send_outgoing)),
frame_send_receiver,
shutdown_receiver,
stream_finished_request_sender,
a2b_steam_open_r,
b2a_stream_opened_s,
s2b_create_channel_r,
a2b_close_stream_r,
a2b_close_stream_s,
a2p_msg_s: Arc::new(Mutex::new(a2p_msg_s)),
p2b_notify_empty_stream_s: Arc::new(Mutex::new(p2b_notify_empty_stream_s)),
s2b_frame_r,
s2b_shutdown_bparticipant_r,
});
(
@ -98,55 +104,50 @@ impl BParticipant {
offset_sid,
channels: Arc::new(RwLock::new(vec![])),
streams: RwLock::new(HashMap::new()),
running_mgr: AtomicUsize::new(0),
run_channels,
metrics,
},
stream_open_sender,
stream_opened_receiver,
create_channel_sender,
frame_send_sender,
shutdown_sender,
a2b_steam_open_s,
b2a_stream_opened_r,
s2b_create_channel_s,
s2b_frame_s,
s2b_shutdown_bparticipant_s,
)
}
pub async fn run(mut self) {
//those managers that listen on api::Participant need an additional oneshot for
// shutdown scenario, those handled by scheduler will be closed by it.
let (shutdown_open_manager_sender, shutdown_open_manager_receiver) = oneshot::channel();
let (shutdown_stream_close_manager_sender, shutdown_stream_close_manager_receiver) =
let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) =
oneshot::channel();
let (frame_from_wire_sender, frame_from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>();
let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.open_manager(
run_channels.stream_open_receiver,
run_channels.shutdown_api_sender.clone(),
run_channels.send_outgoing.clone(),
shutdown_open_manager_receiver,
self.open_mgr(
run_channels.a2b_steam_open_r,
run_channels.a2b_close_stream_s.clone(),
run_channels.a2p_msg_s.clone(),
shutdown_open_mgr_receiver,
),
self.handle_frames(
frame_from_wire_receiver,
run_channels.stream_opened_sender,
run_channels.shutdown_api_sender,
run_channels.send_outgoing.clone(),
self.handle_frames_mgr(
w2b_frames_r,
run_channels.b2a_stream_opened_s,
run_channels.a2b_close_stream_s,
run_channels.a2p_msg_s.clone(),
),
self.create_channel_manager(
run_channels.create_channel_receiver,
frame_from_wire_sender,
self.create_channel_mgr(run_channels.s2b_create_channel_r, w2b_frames_s,),
self.send_mgr(run_channels.s2b_frame_r),
self.stream_close_mgr(
run_channels.a2b_close_stream_r,
shutdown_stream_close_mgr_receiver,
run_channels.p2b_notify_empty_stream_s,
),
self.send_manager(run_channels.frame_send_receiver),
self.stream_close_manager(
run_channels.shutdown_api_receiver,
shutdown_stream_close_manager_receiver,
run_channels.stream_finished_request_sender,
),
self.shutdown_manager(
run_channels.shutdown_receiver,
vec!(
shutdown_open_manager_sender,
shutdown_stream_close_manager_sender
)
self.participant_shutdown_mgr(
run_channels.s2b_shutdown_bparticipant_r,
vec!(shutdown_open_mgr_sender, shutdown_stream_close_mgr_sender)
),
);
}
@ -154,35 +155,36 @@ impl BParticipant {
async fn send_frame(&self, frame: Frame) {
// find out ideal channel here
//TODO: just take first
if let Some((cid, channel)) = self.channels.write().await.get_mut(0) {
if let Some(ci) = self.channels.write().await.get_mut(0) {
self.metrics
.frames_out_total
.with_label_values(&[
&self.remote_pid.to_string(),
&cid.to_string(),
&ci.cid.to_string(),
frame.get_string(),
])
.inc();
channel.send(frame).await.unwrap();
ci.b2w_frame_s.send(frame).await.unwrap();
} else {
error!("participant has no channel to communicate on");
}
}
async fn handle_frames(
async fn handle_frames_mgr(
&self,
mut frame_from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut stream_opened_sender: mpsc::UnboundedSender<Stream>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start handle_frames");
let send_outgoing = { send_outgoing.lock().unwrap().clone() };
let a2p_msg_s = { a2p_msg_s.lock().unwrap().clone() };
let mut messages = HashMap::new();
let pid_string = &self.remote_pid.to_string();
while let Some((cid, frame)) = frame_from_wire_receiver.next().await {
while let Some((cid, frame)) = w2b_frames_r.next().await {
let cid_string = cid.to_string();
trace!("handling frame");
//trace!("handling frame");
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, frame.get_string()])
@ -193,11 +195,11 @@ impl BParticipant {
prio,
promises,
} => {
let send_outgoing = send_outgoing.clone();
let a2p_msg_s = a2p_msg_s.clone();
let stream = self
.create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
.create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s)
.await;
stream_opened_sender.send(stream).await.unwrap();
b2a_stream_opened_s.send(stream).await.unwrap();
trace!("opened frame from remote");
},
Frame::CloseStream { sid } => {
@ -206,12 +208,12 @@ impl BParticipant {
// However Stream.send() is not async and their receiver isn't dropped if Steam
// is dropped, so i need a way to notify the Stream that it's send messages will
// be dropped... from remote, notify local
if let Some((_, _, _, closed)) = self.streams.write().await.remove(&sid) {
if let Some(si) = self.streams.write().await.remove(&sid) {
self.metrics
.streams_closed_total
.with_label_values(&[&pid_string])
.inc();
closed.store(true, Ordering::Relaxed);
si.closed.store(true, Ordering::Relaxed);
} else {
error!(
"couldn't find stream to close, either this is a duplicate message, \
@ -241,12 +243,10 @@ impl BParticipant {
false
};
if finished {
debug!(?mid, "finished receiving message");
//debug!(?mid, "finished receiving message");
let imsg = messages.remove(&mid).unwrap();
if let Some((_, _, sender, _)) =
self.streams.write().await.get_mut(&imsg.sid)
{
sender.send(imsg).await.unwrap();
if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) {
si.b2a_msg_recv_s.send(imsg).await.unwrap();
} else {
error!("dropping message as stream no longer seems to exist");
}
@ -256,81 +256,84 @@ impl BParticipant {
}
}
trace!("stop handle_frames");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn create_channel_manager(
async fn create_channel_mgr(
&self,
channels_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
frame_from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
s2b_create_channel_r: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>,
) {
trace!("start channel_manager");
channels_receiver
.for_each_concurrent(None, |(cid, sid, protocol, sender)| {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start create_channel_mgr");
s2b_create_channel_r
.for_each_concurrent(None, |(cid, sid, protocol, b2s_create_channel_done_s)| {
// This channel is now configured, and we are running it in scope of the
// participant.
let frame_from_wire_sender = frame_from_wire_sender.clone();
let w2b_frames_s = w2b_frames_s.clone();
let channels = self.channels.clone();
async move {
let (channel, frame_to_wire_sender, shutdown_sender) =
let (channel, b2w_frame_s, b2r_read_shutdown) =
Channel::new(cid, self.remote_pid, self.metrics.clone());
channels.write().await.push((cid, frame_to_wire_sender));
sender.send(()).unwrap();
channels.write().await.push(ChannelInfo {
cid,
b2w_frame_s,
b2r_read_shutdown,
});
b2s_create_channel_done_s.send(()).unwrap();
self.metrics
.channels_connected_total
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
channel.run(protocol, frame_from_wire_sender).await;
trace!(?cid, "running channel in participant");
channel.run(protocol, w2b_frames_s).await;
self.metrics
.channels_disconnected_total
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
trace!(?cid, "channel got closed");
shutdown_sender.send(()).unwrap();
}
})
.await;
trace!("stop channel_manager");
trace!("stop create_channel_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn send_manager(
&self,
mut frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>,
) {
trace!("start send_manager");
while let Some((_, _, frame)) = frame_send_receiver.next().await {
async fn send_mgr(&self, mut s2b_frame_r: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>) {
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start send_mgr");
while let Some((_, sid, frame)) = s2b_frame_r.next().await {
self.send_frame(frame).await;
}
trace!("stop send_manager");
trace!("stop send_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn open_manager(
async fn open_mgr(
&self,
mut stream_open_receiver: mpsc::UnboundedReceiver<(
Prio,
Promises,
oneshot::Sender<Stream>,
)>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
shutdown_open_manager_receiver: oneshot::Receiver<()>,
mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
shutdown_open_mgr_receiver: oneshot::Receiver<()>,
) {
trace!("start open_manager");
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start open_mgr");
let send_outgoing = {
//fighting the borrow checker ;)
send_outgoing.lock().unwrap().clone()
a2p_msg_s.lock().unwrap().clone()
};
let mut stream_ids = self.offset_sid;
let mut shutdown_open_manager_receiver = shutdown_open_manager_receiver.fuse();
let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
//from api or shutdown signal
while let Some((prio, promises, sender)) = select! {
next = stream_open_receiver.next().fuse() => next,
_ = shutdown_open_manager_receiver => None,
while let Some((prio, promises, p2a_return_stream)) = select! {
next = a2b_steam_open_r.next().fuse() => next,
_ = shutdown_open_mgr_receiver => None,
} {
debug!(?prio, ?promises, "got request to open a new steam");
let send_outgoing = send_outgoing.clone();
let sid = stream_ids;
let stream = self
.create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
.create_stream(sid, prio, promises, send_outgoing, &a2b_close_stream_s)
.await;
self.send_frame(Frame::OpenStream {
sid,
@ -338,78 +341,118 @@ impl BParticipant {
promises,
})
.await;
sender.send(stream).unwrap();
p2a_return_stream.send(stream).unwrap();
stream_ids += Sid::from(1);
}
trace!("stop open_manager");
trace!("stop open_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn shutdown_manager(
/// when activated this function will drop the participant completly and
/// wait for everything to go right! Then return 1. Shutting down
/// Streams for API and End user! 2. Wait for all "prio queued" Messages
/// to be send. 3. Send Stream
async fn participant_shutdown_mgr(
&self,
shutdown_receiver: oneshot::Receiver<()>,
s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
mut to_shutdown: Vec<oneshot::Sender<()>>,
) {
trace!("start shutdown_manager");
shutdown_receiver.await.unwrap();
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start participant_shutdown_mgr");
let sender = s2b_shutdown_bparticipant_r.await.unwrap();
debug!("closing all managers");
for sender in to_shutdown.drain(..) {
if sender.send(()).is_err() {
debug!("manager seems to be closed already, weird, maybe a bug");
warn!("manager seems to be closed already, weird, maybe a bug");
};
}
debug!("closing all streams");
let mut streams = self.streams.write().await;
for (sid, (_, _, _, closing)) in streams.drain() {
for (sid, si) in streams.drain() {
trace!(?sid, "shutting down Stream");
closing.store(true, Ordering::Relaxed);
si.closed.store(true, Ordering::Relaxed);
}
debug!("closing all channels");
for ci in self.channels.write().await.drain(..) {
ci.b2r_read_shutdown.send(()).unwrap();
}
//Wait for other bparticipants mgr to close via AtomicUsize
const SLEEP_TIME: std::time::Duration = std::time::Duration::from_millis(5);
async_std::task::sleep(SLEEP_TIME).await;
let mut i: u32 = 1;
while self.running_mgr.load(Ordering::Relaxed) > 1 {
i += 1;
if i.rem_euclid(10) == 1 {
trace!(
"waiting for bparticipant mgr to shut down, remaining {}",
self.running_mgr.load(Ordering::Relaxed) - 1
);
}
async_std::task::sleep(SLEEP_TIME * i).await;
}
trace!("all bparticipant mgr (except me) are shut down now");
self.metrics.participants_disconnected_total.inc();
trace!("stop shutdown_manager");
sender.send(Ok(())).unwrap();
trace!("stop participant_shutdown_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn stream_close_manager(
async fn stream_close_mgr(
&self,
mut shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>,
shutdown_stream_close_manager_receiver: oneshot::Receiver<()>,
mut stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
mut p2b_notify_empty_stream_s: Arc<
Mutex<std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>>,
>,
) {
trace!("start stream_close_manager");
let mut shutdown_stream_close_manager_receiver =
shutdown_stream_close_manager_receiver.fuse();
self.running_mgr.fetch_add(1, Ordering::Relaxed);
trace!("start stream_close_mgr");
let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();
//from api or shutdown signal
while let Some(sid) = select! {
next = shutdown_api_receiver.next().fuse() => next,
_ = shutdown_stream_close_manager_receiver => None,
next = a2b_close_stream_r.next().fuse() => next,
_ = shutdown_stream_close_mgr_receiver => None,
} {
//TODO: make this concurrent!
//TODO: Performance, closing is slow!
trace!(?sid, "got request from api to close steam");
//TODO: wait here till the last prio was send!
//The error is, that the close msg as a control message is send directly, while
// messages are only send after a next prio tick. This means, we
// close it first, and then send the headers and data packages...
// ofc the other side then no longer finds the respective stream.
//however we need to find out when the last message of a stream is send. it
// would be usefull to get a snapshot here, like, this stream has send out to
// msgid n, while the prio only has send m. then sleep as long as n < m maybe...
debug!("IF YOU SEE THIS, FIND A PROPPER FIX FOR CLOSING STREAMS");
//This needs to first stop clients from sending any more.
//Then it will wait for all pending messages (in prio) to be send to the
// protocol After this happened the stream is closed
//Only after all messages are send to the prococol, we can send the CloseStream
// frame! If we would send it before, all followup messages couldn't
// be handled at the remote side.
let (sender, receiver) = oneshot::channel();
trace!(?sid, "wait for stream to be flushed");
stream_finished_request_sender
.send((self.remote_pid, sid, sender))
trace!(?sid, "stopping api to use this stream");
self.streams
.read()
.await
.get(&sid)
.unwrap()
.closed
.store(true, Ordering::Relaxed);
trace!(?sid, "wait for stream to be flushed");
let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
p2b_notify_empty_stream_s
.lock()
.unwrap()
.send((self.remote_pid, sid, s2b_stream_finished_closed_s))
.unwrap();
receiver.await.unwrap();
s2b_stream_finished_closed_r.await.unwrap();
trace!(?sid, "stream was successfully flushed");
self.metrics
.streams_closed_total
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
//only now remove the Stream, that means we can still recv on it.
self.streams.write().await.remove(&sid);
//from local, notify remote
self.send_frame(Frame::CloseStream { sid }).await;
}
trace!("stop stream_close_manager");
trace!("stop stream_close_mgr");
self.running_mgr.fetch_sub(1, Ordering::Relaxed);
}
async fn create_stream(
@ -417,15 +460,17 @@ impl BParticipant {
sid: Sid,
prio: Prio,
promises: Promises,
send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
shutdown_api_sender: &mpsc::UnboundedSender<Sid>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
) -> Stream {
let (msg_recv_sender, msg_recv_receiver) = mpsc::unbounded::<InCommingMessage>();
let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::<InCommingMessage>();
let closed = Arc::new(AtomicBool::new(false));
self.streams
.write()
.await
.insert(sid, (prio, promises, msg_recv_sender, closed.clone()));
self.streams.write().await.insert(sid, StreamInfo {
prio,
promises,
b2a_msg_recv_s,
closed: closed.clone(),
});
self.metrics
.streams_opened_total
.with_label_values(&[&self.remote_pid.to_string()])
@ -435,10 +480,10 @@ impl BParticipant {
sid,
prio,
promises,
send_outgoing,
msg_recv_receiver,
a2p_msg_s,
b2a_msg_recv_r,
closed.clone(),
shutdown_api_sender.clone(),
a2b_close_stream_s.clone(),
)
}
}

View File

@ -9,6 +9,7 @@ use crate::{
message::OutGoingMessage,
types::{Frame, Pid, Prio, Sid},
};
use futures::channel::oneshot;
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::mpsc::{channel, Receiver, Sender},
@ -18,14 +19,27 @@ use tracing::*;
const PRIO_MAX: usize = 64;
struct PidSidInfo {
len: u64,
empty_notify: Option<oneshot::Sender<()>>,
}
pub(crate) struct PrioManager {
points: [u32; PRIO_MAX],
messages: [VecDeque<(Pid, Sid, OutGoingMessage)>; PRIO_MAX],
messages_rx: Receiver<(Prio, Pid, Sid, OutGoingMessage)>,
pid_sid_owned: HashMap<(Pid, Sid), u64>,
pid_sid_owned: HashMap<(Pid, Sid), PidSidInfo>,
//you can register to be notified if a pid_sid combination is flushed completly here
pid_sid_flushed_rx: Receiver<(Pid, Sid, oneshot::Sender<()>)>,
queued: HashSet<u8>,
}
/*
ERROR Okay ich kann die frames und msg nicht counten, da api auf msg basis zöhlt und BParticipant auf frame basis.
Der Priomanager hört auf gekillte PID, SIDs, und entweder returned sofort wenn keine msg drinn ist, oder schreibt es in id_sid_owned und haut es dann raus
Evtl sollten wir auch den prioManger auf mehr Async umstellen. auch wenn der TICK selber syncron ist. mal schaun.
*/
impl PrioManager {
const FRAME_DATA_SIZE: u64 = 1400;
const PRIOS: [u32; PRIO_MAX] = [
@ -36,8 +50,14 @@ impl PrioManager {
310419, 356578, 409600, 470507, 540470, 620838,
];
pub fn new() -> (Self, Sender<(Prio, Pid, Sid, OutGoingMessage)>) {
pub fn new() -> (
Self,
Sender<(Prio, Pid, Sid, OutGoingMessage)>,
Sender<(Pid, Sid, oneshot::Sender<()>)>,
) {
// (a2p_msg_s, a2p_msg_r)
let (messages_tx, messages_rx) = channel();
let (pid_sid_flushed_tx, pid_sid_flushed_rx) = channel();
(
Self {
points: [0; PRIO_MAX],
@ -109,15 +129,18 @@ impl PrioManager {
],
messages_rx,
queued: HashSet::new(), //TODO: optimize with u64 and 64 bits
pid_sid_flushed_rx,
pid_sid_owned: HashMap::new(),
},
messages_tx,
pid_sid_flushed_tx,
)
}
fn tick(&mut self) {
// Check Range
let mut times = 0;
let mut closed = 0;
for (prio, pid, sid, msg) in self.messages_rx.try_iter() {
debug_assert!(prio as usize <= PRIO_MAX);
times += 1;
@ -125,13 +148,29 @@ impl PrioManager {
self.queued.insert(prio);
self.messages[prio as usize].push_back((pid, sid, msg));
if let Some(cnt) = self.pid_sid_owned.get_mut(&(pid, sid)) {
*cnt += 1;
cnt.len += 1;
} else {
self.pid_sid_owned.insert((pid, sid), 1);
self.pid_sid_owned.insert((pid, sid), PidSidInfo {
len: 1,
empty_notify: None,
});
}
}
if times > 0 {
trace!(?times, "tick");
//this must be AFTER messages
for (pid, sid, return_sender) in self.pid_sid_flushed_rx.try_iter() {
closed += 1;
if let Some(cnt) = self.pid_sid_owned.get_mut(&(pid, sid)) {
// register sender
cnt.empty_notify = Some(return_sender);
} else {
// return immediately
futures::executor::block_on(async {
return_sender.send(());
});
}
}
if times > 0 || closed > 0 {
trace!(?times, ?closed, "tick");
}
}
@ -219,9 +258,14 @@ impl PrioManager {
"the pid_sid_owned counter works wrong, more pid,sid removed \
than inserted",
);
*cnt -= 1;
if *cnt == 0 {
self.pid_sid_owned.remove(&(pid, sid));
cnt.len -= 1;
if cnt.len == 0 {
let cnt = self.pid_sid_owned.remove(&(pid, sid)).unwrap();
cnt.empty_notify.map(|empty_notify| {
futures::executor::block_on(async {
empty_notify.send(());
})
});
}
} else {
self.messages[prio as usize].push_back((pid, sid, msg));

View File

@ -22,7 +22,7 @@ use futures::{
};
use prometheus::Registry;
use std::{
collections::{HashMap, VecDeque},
collections::{HashMap, HashSet, VecDeque},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@ -31,27 +31,35 @@ use std::{
use tracing::*;
use tracing_futures::Instrument;
type ParticipantInfo = (
mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
oneshot::Sender<()>,
);
#[derive(Debug)]
struct ParticipantInfo {
s2b_create_channel_s: mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
s2b_frame_s: mpsc::UnboundedSender<(Pid, Sid, Frame)>,
s2b_shutdown_bparticipant_s:
Option<oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>>,
}
/// Naming of Channels `x2x`
/// - a: api
/// - s: scheduler
/// - b: bparticipant
/// - p: prios
/// - r: protocol
/// - w: wire
#[derive(Debug)]
struct ControlChannels {
listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<Participant>>)>,
shutdown_receiver: oneshot::Receiver<()>,
disconnect_receiver: mpsc::UnboundedReceiver<Pid>,
stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>,
a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
a2s_connect_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<Participant>>)>,
a2s_scheduler_shutdown_r: oneshot::Receiver<()>,
a2s_disconnect_r: mpsc::UnboundedReceiver<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
}
#[derive(Debug, Clone)]
struct ParticipantChannels {
connected_sender: mpsc::UnboundedSender<Participant>,
disconnect_sender: mpsc::UnboundedSender<Pid>,
prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
s2a_connected_s: mpsc::UnboundedSender<Participant>,
a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>,
}
#[derive(Debug)]
@ -60,7 +68,7 @@ pub struct Scheduler {
closed: AtomicBool,
pool: Arc<ThreadPool>,
run_channels: Option<ControlChannels>,
participant_channels: ParticipantChannels,
participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
participants: Arc<RwLock<HashMap<Pid, ParticipantInfo>>>,
channel_ids: Arc<AtomicU64>,
channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
@ -79,29 +87,28 @@ impl Scheduler {
mpsc::UnboundedReceiver<Participant>,
oneshot::Sender<()>,
) {
let (listen_sender, listen_receiver) =
let (a2s_listen_s, a2s_listen_r) =
mpsc::unbounded::<(Address, oneshot::Sender<io::Result<()>>)>();
let (connect_sender, connect_receiver) =
let (a2s_connect_s, a2s_connect_r) =
mpsc::unbounded::<(Address, oneshot::Sender<io::Result<Participant>>)>();
let (connected_sender, connected_receiver) = mpsc::unbounded::<Participant>();
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let (prios, prios_sender) = PrioManager::new();
let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::<Pid>();
let (stream_finished_request_sender, stream_finished_request_receiver) = mpsc::unbounded();
let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::<Participant>();
let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>();
let (prios, a2p_msg_s, p2b_notify_empty_stream_s) = PrioManager::new();
let (a2s_disconnect_s, a2s_disconnect_r) =
mpsc::unbounded::<(Pid, oneshot::Sender<async_std::io::Result<()>>)>();
let run_channels = Some(ControlChannels {
listen_receiver,
connect_receiver,
shutdown_receiver,
disconnect_receiver,
stream_finished_request_receiver,
a2s_listen_r,
a2s_connect_r,
a2s_scheduler_shutdown_r,
a2s_disconnect_r,
});
let participant_channels = ParticipantChannels {
disconnect_sender,
stream_finished_request_sender,
connected_sender,
prios_sender,
s2a_connected_s,
a2s_disconnect_s,
a2p_msg_s,
p2b_notify_empty_stream_s,
};
let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap());
@ -115,17 +122,17 @@ impl Scheduler {
closed: AtomicBool::new(false),
pool: Arc::new(ThreadPool::new().unwrap()),
run_channels,
participant_channels,
participant_channels: Arc::new(Mutex::new(Some(participant_channels))),
participants: Arc::new(RwLock::new(HashMap::new())),
channel_ids: Arc::new(AtomicU64::new(0)),
channel_listener: RwLock::new(HashMap::new()),
prios: Arc::new(Mutex::new(prios)),
metrics,
},
listen_sender,
connect_sender,
connected_receiver,
shutdown_sender,
a2s_listen_s,
a2s_connect_s,
s2a_connected_r,
a2s_scheduler_shutdown_s,
)
}
@ -133,22 +140,21 @@ impl Scheduler {
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.listen_manager(run_channels.listen_receiver),
self.connect_manager(run_channels.connect_receiver),
self.disconnect_manager(run_channels.disconnect_receiver),
self.send_outgoing(),
self.stream_finished_manager(run_channels.stream_finished_request_receiver),
self.shutdown_manager(run_channels.shutdown_receiver),
self.listen_mgr(run_channels.a2s_listen_r),
self.connect_mgr(run_channels.a2s_connect_r),
self.disconnect_mgr(run_channels.a2s_disconnect_r),
self.send_outgoing_mgr(),
self.scheduler_shutdown_mgr(run_channels.a2s_scheduler_shutdown_r),
);
}
async fn listen_manager(
async fn listen_mgr(
&self,
listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
) {
trace!("start listen_manager");
listen_receiver
.for_each_concurrent(None, |(address, result_sender)| {
trace!("start listen_mgr");
a2s_listen_r
.for_each_concurrent(None, |(address, s2a_result_s)| {
let address = address.clone();
async move {
@ -166,23 +172,23 @@ impl Scheduler {
.write()
.await
.insert(address.clone(), end_sender);
self.channel_creator(address, end_receiver, result_sender)
self.channel_creator(address, end_receiver, s2a_result_s)
.await;
}
})
.await;
trace!("stop listen_manager");
trace!("stop listen_mgr");
}
async fn connect_manager(
async fn connect_mgr(
&self,
mut connect_receiver: mpsc::UnboundedReceiver<(
mut a2s_connect_r: mpsc::UnboundedReceiver<(
Address,
oneshot::Sender<io::Result<Participant>>,
)>,
) {
trace!("start connect_manager");
while let Some((addr, pid_sender)) = connect_receiver.next().await {
trace!("start connect_mgr");
while let Some((addr, pid_sender)) = a2s_connect_r.next().await {
let (protocol, handshake) = match addr {
Address::Tcp(addr) => {
self.metrics
@ -235,117 +241,126 @@ impl Scheduler {
self.init_protocol(protocol, Some(pid_sender), handshake)
.await;
}
trace!("stop connect_manager");
trace!("stop connect_mgr");
}
async fn disconnect_manager(&self, mut disconnect_receiver: mpsc::UnboundedReceiver<Pid>) {
trace!("start disconnect_manager");
while let Some(pid) = disconnect_receiver.next().await {
async fn disconnect_mgr(
&self,
mut a2s_disconnect_r: mpsc::UnboundedReceiver<(
Pid,
oneshot::Sender<async_std::io::Result<()>>,
)>,
) {
trace!("start disconnect_mgr");
while let Some((pid, return_once_successfull_shutdown)) = a2s_disconnect_r.next().await {
//Closing Participants is done the following way:
// 1. We drop our senders and receivers
// 2. we need to close BParticipant, this will drop its senderns and receivers
// 3. Participant will try to access the BParticipant senders and receivers with
// their next api action, it will fail and be closed then.
if let Some((_, _, sender)) = self.participants.write().await.remove(&pid) {
sender.send(()).unwrap();
let (finished_sender, finished_receiver) = oneshot::channel();
if let Some(pi) = self.participants.write().await.get_mut(&pid) {
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send(finished_sender)
.unwrap();
}
let e = finished_receiver.await.unwrap();
//only remove after flush!
self.participants.write().await.remove(&pid).unwrap();
return_once_successfull_shutdown.send(e);
}
trace!("stop disconnect_manager");
trace!("stop disconnect_mgr");
}
async fn send_outgoing(&self) {
async fn send_outgoing_mgr(&self) {
//This time equals the MINIMUM Latency in average, so keep it down and //Todo:
// make it configureable or switch to await E.g. Prio 0 = await, prio 50
// wait for more messages
const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(10);
const FRAMES_PER_TICK: usize = 1000000;
trace!("start send_outgoing");
const FRAMES_PER_TICK: usize = 1000005;
trace!("start send_outgoing_mgr");
while !self.closed.load(Ordering::Relaxed) {
let mut frames = VecDeque::new();
self.prios
.lock()
.await
.fill_frames(FRAMES_PER_TICK, &mut frames);
if frames.len() > 0 {
trace!("tick {}", frames.len());
}
let mut already_traced = HashSet::new();
for (pid, sid, frame) in frames {
if let Some((_, sender, _)) = self.participants.write().await.get_mut(&pid) {
sender.send((pid, sid, frame)).await.unwrap();
if let Some(pi) = self.participants.write().await.get_mut(&pid) {
pi.s2b_frame_s.send((pid, sid, frame)).await.unwrap();
} else {
if !already_traced.contains(&(pid, sid)) {
error!(
?pid,
?sid,
"dropping frames, as participant no longer exists!"
);
already_traced.insert((pid, sid));
}
}
}
async_std::task::sleep(TICK_TIME).await;
}
trace!("stop send_outgoing");
trace!("stop send_outgoing_mgr");
}
// requested by participant when stream wants to close from api, checking if no
// more msg is in prio and return
pub(crate) async fn stream_finished_manager(
&self,
stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>,
) {
trace!("start stream_finished_manager");
stream_finished_request_receiver
.for_each_concurrent(None, async move |(pid, sid, sender)| {
//TODO: THERE MUST BE A MORE CLEVER METHOD THAN SPIN LOCKING! LIKE REGISTERING
// DIRECTLY IN PRIO AS A FUTURE WERE PRIO IS WAKER! TODO: also this
// has a great potential for handing network, if you create a network, send
// gigabytes close it then. Also i need a Mutex, which really adds
// to cost if alot strems want to close
self.stream_finished_waiter(pid, sid, sender).await;
})
.await;
}
async fn stream_finished_waiter(&self, pid: Pid, sid: Sid, sender: oneshot::Sender<()>) {
const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(5);
//TODO: ARRRG, i need to wait for AT LEAST 1 TICK, because i am lazy i just
// wait 15mn and tick count is 10ms because recv is only done with a
// tick and not async as soon as we send....
async_std::task::sleep(TICK_TIME * 3).await;
let mut n = 0u64;
loop {
if !self.prios.lock().await.contains_pid_sid(pid, sid) {
trace!("prio is clear, go to close stream as requested from api");
sender.send(()).unwrap();
break;
}
n += 1;
async_std::task::sleep(match n {
0..=199 => TICK_TIME,
n if n.rem_euclid(100) == 0 => {
warn!(?pid, ?sid, ?n, "cant close stream, as it still queued");
TICK_TIME * (n as f32 * (n as f32).sqrt() / 100.0) as u32
},
n => TICK_TIME * (n as f32 * (n as f32).sqrt() / 100.0) as u32,
})
.await;
}
}
pub(crate) async fn shutdown_manager(&self, receiver: oneshot::Receiver<()>) {
trace!("start shutdown_manager");
receiver.await.unwrap();
async fn scheduler_shutdown_mgr(&self, a2s_scheduler_shutdown_r: oneshot::Receiver<()>) {
trace!("start scheduler_shutdown_mgr");
a2s_scheduler_shutdown_r.await.unwrap();
self.closed.store(true, Ordering::Relaxed);
debug!("shutting down all BParticipants gracefully");
let mut participants = self.participants.write().await;
for (pid, (_, _, sender)) in participants.drain() {
let mut waitings = vec![];
//close participants but don't remove them from self.participants yet
for (pid, pi) in participants.iter_mut() {
trace!(?pid, "shutting down BParticipants");
sender.send(()).unwrap();
let (finished_sender, finished_receiver) = oneshot::channel();
waitings.push((pid, finished_receiver));
pi.s2b_shutdown_bparticipant_s
.take()
.unwrap()
.send(finished_sender)
.unwrap();
}
trace!("stop shutdown_manager");
debug!("wait for partiticipants to be shut down");
for (pid, recv) in waitings {
match recv.await {
Err(e) => error!(
?pid,
?e,
"failed to finish sending all remainding messages to participant when \
shutting down"
),
_ => (),
};
}
//remove participants once everything is shut down
participants.clear();
//removing the possibility to create new participants, needed to close down
// some mgr:
self.participant_channels.lock().await.take();
trace!("stop scheduler_shutdown_mgr");
}
pub(crate) async fn channel_creator(
async fn channel_creator(
&self,
addr: Address,
end_receiver: oneshot::Receiver<()>,
result_sender: oneshot::Sender<io::Result<()>>,
s2s_stop_listening_r: oneshot::Receiver<()>,
s2a_listen_result_s: oneshot::Sender<io::Result<()>>,
) {
trace!(?addr, "start up channel creator");
match addr {
Address::Tcp(addr) => {
let listener = match net::TcpListener::bind(addr).await {
Ok(listener) => {
result_sender.send(Ok(())).unwrap();
s2a_listen_result_s.send(Ok(())).unwrap();
listener
},
Err(e) => {
@ -354,13 +369,13 @@ impl Scheduler {
?e,
"listener couldn't be started due to error on tcp bind"
);
result_sender.send(Err(e)).unwrap();
s2a_listen_result_s.send(Err(e)).unwrap();
return;
},
};
trace!(?addr, "listener bound");
let mut incoming = listener.incoming();
let mut end_receiver = end_receiver.fuse();
let mut end_receiver = s2s_stop_listening_r.fuse();
while let Some(stream) = select! {
next = incoming.next().fuse() => next,
_ = end_receiver => None,
@ -378,7 +393,7 @@ impl Scheduler {
Address::Udp(addr) => {
let socket = match net::UdpSocket::bind(addr).await {
Ok(socket) => {
result_sender.send(Ok(())).unwrap();
s2a_listen_result_s.send(Ok(())).unwrap();
Arc::new(socket)
},
Err(e) => {
@ -387,7 +402,7 @@ impl Scheduler {
?e,
"listener couldn't be started due to error on udp bind"
);
result_sender.send(Err(e)).unwrap();
s2a_listen_result_s.send(Err(e)).unwrap();
return;
},
};
@ -395,7 +410,7 @@ impl Scheduler {
// receiving is done from here and will be piped to protocol as UDP does not
// have any state
let mut listeners = HashMap::new();
let mut end_receiver = end_receiver.fuse();
let mut end_receiver = s2s_stop_listening_r.fuse();
let mut data = [0u8; 9216];
while let Ok((size, remote_addr)) = select! {
next = socket.recv_from(&mut data).fuse() => next,
@ -424,9 +439,9 @@ impl Scheduler {
trace!(?addr, "ending channel creator");
}
pub(crate) async fn udp_single_channel_connect(
async fn udp_single_channel_connect(
socket: Arc<net::UdpSocket>,
mut udp_data_sender: mpsc::UnboundedSender<Vec<u8>>,
mut w2p_udp_package_s: mpsc::UnboundedSender<Vec<u8>>,
) {
let addr = socket.local_addr();
trace!(?addr, "start udp_single_channel_connect");
@ -443,7 +458,7 @@ impl Scheduler {
} {
let mut datavec = Vec::with_capacity(size);
datavec.extend_from_slice(&data[0..size]);
udp_data_sender.send(datavec).await.unwrap();
w2p_udp_package_s.send(datavec).await.unwrap();
}
trace!(?addr, "stop udp_single_channel_connect");
}
@ -451,7 +466,7 @@ impl Scheduler {
async fn init_protocol(
&self,
protocol: Protocols,
pid_sender: Option<oneshot::Sender<io::Result<Participant>>>,
s2a_return_pid_s: Option<oneshot::Sender<io::Result<Participant>>>,
send_handshake: bool,
) {
//channels are unknown till PID is known!
@ -460,7 +475,7 @@ impl Scheduler {
Contra: - DOS posibility because we answer fist
- Speed, because otherwise the message can be send with the creation
*/
let mut participant_channels = self.participant_channels.clone();
let mut participant_channels = self.participant_channels.lock().await.clone().unwrap();
// spawn is needed here, e.g. for TCP connect it would mean that only 1
// participant can be in handshake phase ever! Someone could deadlock
// the whole server easily for new clients UDP doesnt work at all, as
@ -485,55 +500,53 @@ impl Scheduler {
debug!(?cid, "new participant connected via a channel");
let (
bparticipant,
stream_open_sender,
stream_opened_receiver,
mut create_channel_sender,
frame_send_sender,
shutdown_sender,
a2b_steam_open_s,
b2a_stream_opened_r,
mut s2b_create_channel_s,
s2b_frame_s,
s2b_shutdown_bparticipant_s,
) = BParticipant::new(
pid,
sid,
metrics.clone(),
participant_channels.prios_sender,
participant_channels.stream_finished_request_sender,
participant_channels.a2p_msg_s,
participant_channels.p2b_notify_empty_stream_s,
);
let participant = Participant::new(
local_pid,
pid,
stream_open_sender,
stream_opened_receiver,
participant_channels.disconnect_sender,
a2b_steam_open_s,
b2a_stream_opened_r,
participant_channels.a2s_disconnect_s,
);
metrics.participants_connected_total.inc();
participants.insert(
pid,
(
create_channel_sender.clone(),
frame_send_sender,
shutdown_sender,
),
);
participants.insert(pid, ParticipantInfo {
s2b_create_channel_s: s2b_create_channel_s.clone(),
s2b_frame_s,
s2b_shutdown_bparticipant_s: Some(s2b_shutdown_bparticipant_s),
});
pool.spawn_ok(
bparticipant
.run()
.instrument(tracing::info_span!("participant", ?pid)),
);
//create a new channel within BParticipant and wait for it to run
let (sync_sender, sync_receiver) = oneshot::channel();
create_channel_sender
.send((cid, sid, protocol, sync_sender))
let (b2s_create_channel_done_s, b2s_create_channel_done_r) =
oneshot::channel();
s2b_create_channel_s
.send((cid, sid, protocol, b2s_create_channel_done_s))
.await
.unwrap();
sync_receiver.await.unwrap();
if let Some(pid_oneshot) = pid_sender {
b2s_create_channel_done_r.await.unwrap();
if let Some(pid_oneshot) = s2a_return_pid_s {
// someone is waiting with connect, so give them their PID
pid_oneshot.send(Ok(participant)).unwrap();
} else {
// noone is waiting on this Participant, return in to Network
participant_channels
.connected_sender
.s2a_connected_s
.send(participant)
.await
.unwrap();

View File

@ -118,14 +118,6 @@ impl Frame {
}
}
#[derive(Debug)]
pub(crate) enum Requestor {
User,
Api,
Scheduler,
Remote,
}
impl Pid {
/// create a new Pid with a random interior value
///