Experiments with a prometheus bug which actually worked as designed because i had client and server running at the same time

- https://github.com/tikv/rust-prometheus/issues/321
 - split up channel into a hanshake part and channel part.
   The handshake part is non endless and ends when its either done or aborted.
   If its okay i will send a request to the BParticipant which then opens a channel on the existing TCP or UDP connection.
   this streamlines the command chain alot. also the channel is almost empty now, thinking about removing it completly.
   isnt perfect, as shutdown and udp doesnt work yet
 - make PID to print as Base64
 - replace rouille with tiny_http
This commit is contained in:
Marcel Märtens 2020-05-04 15:27:58 +02:00
parent 9074de533a
commit a8f1bc178a
10 changed files with 698 additions and 664 deletions

View File

@ -17,5 +17,5 @@ tracing = "0.1"
tracing-subscriber = "0.2.3"
bincode = "1.2"
prometheus = "0.7"
rouille = "3.0.0"
tiny_http = "0.7.0"
serde = { version = "1.0", features = ["derive"] }

View File

@ -90,16 +90,15 @@ fn main() {
_ => panic!("invalid mode, run --help!"),
};
let mut m = metrics::SimpleMetrics::new();
let mut background = None;
match matches.value_of("mode") {
Some("server") => server(address),
Some("client") => client(address, &mut m),
Some("client") => client(address),
Some("both") => {
let address1 = address.clone();
background = Some(thread::spawn(|| server(address1)));
thread::sleep(Duration::from_millis(200)); //start client after server
client(address, &mut m);
client(address);
},
_ => panic!("invalid mode, run --help!"),
};
@ -110,7 +109,9 @@ fn main() {
fn server(address: Address) {
let thread_pool = ThreadPoolBuilder::new().build();
let server = Network::new(Pid::new(), &thread_pool, None);
let mut metrics = metrics::SimpleMetrics::new();
let server = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap();
block_on(server.listen(address)).unwrap();
loop {
@ -134,8 +135,9 @@ fn server(address: Address) {
}
}
fn client(address: Address, metrics: &mut metrics::SimpleMetrics) {
fn client(address: Address) {
let thread_pool = ThreadPoolBuilder::new().build();
let mut metrics = metrics::SimpleMetrics::new();
let client = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap();
@ -160,7 +162,7 @@ fn client(address: Address, metrics: &mut metrics::SimpleMetrics) {
}
if id > 2000000 {
println!("stop");
std::thread::sleep(std::time::Duration::from_millis(50));
std::thread::sleep(std::time::Duration::from_millis(5000));
break;
}
}

View File

@ -1,15 +1,15 @@
use prometheus::{Encoder, Gauge, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder};
use rouille::{router, Server};
use prometheus::{Encoder, Registry, TextEncoder};
use tiny_http;
use tracing::*;
use std::{
convert::TryInto,
error::Error,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
time::Duration,
};
pub struct SimpleMetrics {
@ -48,24 +48,23 @@ impl SimpleMetrics {
//TODO: make this a job
self.handle = Some(thread::spawn(move || {
let server = Server::new(addr, move |request| {
router!(request,
(GET) (/metrics) => {
let encoder = TextEncoder::new();
let mut buffer = vec![];
let mf = registry.gather();
encoder.encode(&mf, &mut buffer).expect("Failed to encoder metrics text.");
rouille::Response::text(String::from_utf8(buffer).expect("Failed to parse bytes as a string."))
},
_ => rouille::Response::empty_404()
)
})
.expect("Failed to start server");
let server = tiny_http::Server::http(addr).unwrap();
const timeout: std::time::Duration = std::time::Duration::from_secs(1);
debug!("starting tiny_http server to serve metrics");
while running2.load(Ordering::Relaxed) {
server.poll();
// Poll at 10Hz
thread::sleep(Duration::from_millis(100));
let request = match server.recv_timeout(timeout) {
Ok(Some(rq)) => rq,
Ok(None) => continue,
Err(e) => { println!("error: {}", e); break }
};
let mf = registry.gather();
let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder.encode(&mf, &mut buffer).expect("Failed to encoder metrics text.");
let response = tiny_http::Response::from_string(String::from_utf8(buffer).expect("Failed to parse bytes as a string."));
request.respond(response);
}
debug!("stopping tiny_http server to serve metrics");
}));
Ok(())
}

View File

@ -92,6 +92,7 @@ impl Network {
.run()
.instrument(tracing::info_span!("scheduler", ?p)),
);
trace!(?p, ?User, "stopping sheduler and his own thread");
});
Self {
local_pid: participant_id,

View File

@ -1,40 +1,86 @@
use crate::{
metrics::NetworkMetrics,
protocols::Protocols,
scheduler::ConfigureInfo,
types::{
Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
VELOREN_NETWORK_VERSION,
},
};
use async_std::sync::RwLock;
use futures::{
channel::{mpsc, oneshot},
join,
sink::SinkExt,
stream::StreamExt,
FutureExt,
};
use std::sync::Arc;
use tracing::*;
//use futures::prelude::*;
pub(crate) struct Channel {
cid: Cid,
local_pid: Pid,
metrics: Arc<NetworkMetrics>,
remote_pid: RwLock<Option<Pid>>,
send_state: RwLock<ChannelState>,
recv_state: RwLock<ChannelState>,
}
#[derive(Debug, PartialEq)]
enum ChannelState {
None,
Handshake,
Pid,
Shutdown,
remote_pid: Pid,
to_wire_receiver: Option<mpsc::UnboundedReceiver<Frame>>,
read_stop_receiver: Option<oneshot::Receiver<()>>,
}
impl Channel {
pub fn new(
cid: u64,
remote_pid: Pid,
metrics: Arc<NetworkMetrics>,
) -> (Self, mpsc::UnboundedSender<Frame>, oneshot::Sender<()>) {
let (to_wire_sender, to_wire_receiver) = mpsc::unbounded::<Frame>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
(
Self {
cid,
metrics,
remote_pid,
to_wire_receiver: Some(to_wire_receiver),
read_stop_receiver: Some(read_stop_receiver),
},
to_wire_sender,
read_stop_sender,
)
}
pub async fn run(
mut self,
protocol: Protocols,
from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
) {
let to_wire_receiver = self.to_wire_receiver.take().unwrap();
let read_stop_receiver = self.read_stop_receiver.take().unwrap();
trace!(?self.remote_pid, "start up channel");
match protocol {
Protocols::Tcp(tcp) => {
futures::join!(
tcp.read(self.cid, from_wire_sender, read_stop_receiver),
tcp.write(self.cid, to_wire_receiver),
);
},
Protocols::Udp(udp) => {
futures::join!(
udp.read(self.cid, from_wire_sender, read_stop_receiver),
udp.write(self.cid, to_wire_receiver),
);
},
}
trace!(?self.remote_pid, "shut down channel");
}
}
pub(crate) struct Handshake {
cid: Cid,
local_pid: Pid,
init_handshake: bool,
metrics: Arc<NetworkMetrics>,
}
impl Handshake {
#[cfg(debug_assertions)]
const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
veloren server.\nWe are not sure if you are a valid \
@ -45,263 +91,226 @@ impl Channel {
invalid version.\nWe don't know how to communicate with \
you.\nClosing the connection";
pub fn new(cid: u64, local_pid: Pid, metrics: Arc<NetworkMetrics>) -> Self {
pub fn new(
cid: u64,
local_pid: Pid,
metrics: Arc<NetworkMetrics>,
init_handshake: bool,
) -> Self {
Self {
cid,
local_pid,
metrics,
remote_pid: RwLock::new(None),
send_state: RwLock::new(ChannelState::None),
recv_state: RwLock::new(ChannelState::None),
init_handshake,
}
}
/// (prot|part)_(in|out)_(sender|receiver)
/// prot: TO/FROM PROTOCOL = TCP
/// part: TO/FROM PARTICIPANT
/// in: FROM
/// out: TO
/// sender: mpsc::Sender
/// receiver: mpsc::Receiver
pub async fn run(
self,
protocol: Protocols,
part_in_receiver: mpsc::UnboundedReceiver<Frame>,
configured_sender: mpsc::UnboundedSender<ConfigureInfo>,
) {
let (prot_in_sender, prot_in_receiver) = mpsc::unbounded::<Frame>();
let (prot_out_sender, prot_out_receiver) = mpsc::unbounded::<Frame>();
pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid), ()> {
let (to_wire_sender, to_wire_receiver) = mpsc::unbounded::<Frame>();
let (from_wire_sender, from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>();
let (read_stop_sender, read_stop_receiver) = oneshot::channel();
let handler_future =
self.frame_handler(prot_in_receiver, prot_out_sender, configured_sender);
self.frame_handler(from_wire_receiver, to_wire_sender, read_stop_sender);
match protocol {
Protocols::Tcp(tcp) => {
futures::join!(
tcp.read(prot_in_sender),
tcp.write(prot_out_receiver, part_in_receiver),
(join! {
tcp.read(self.cid, from_wire_sender, read_stop_receiver),
tcp.write(self.cid, to_wire_receiver).fuse(),
handler_future,
);
})
.2
},
Protocols::Udp(udp) => {
futures::join!(
udp.read(prot_in_sender),
udp.write(prot_out_receiver, part_in_receiver),
(join! {
udp.read(self.cid, from_wire_sender, read_stop_receiver),
udp.write(self.cid, to_wire_receiver),
handler_future,
);
})
.2
},
}
//return part_out_receiver;
}
pub async fn frame_handler(
async fn frame_handler(
&self,
mut frames: mpsc::UnboundedReceiver<Frame>,
mut frame_sender: mpsc::UnboundedSender<Frame>,
mut configured_sender: mpsc::UnboundedSender<(
Cid,
Pid,
Sid,
oneshot::Sender<mpsc::UnboundedSender<(Cid, Frame)>>,
)>,
) {
mut from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut to_wire_sender: mpsc::UnboundedSender<Frame>,
_read_stop_sender: oneshot::Sender<()>,
) -> Result<(Pid, Sid), ()> {
const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \
something went wrong on network layer and connection will be closed";
let mut pid_string = "".to_string();
let cid_string = self.cid.to_string();
let mut external_frame_sender: Option<mpsc::UnboundedSender<(Cid, Frame)>> = None;
while let Some(frame) = frames.next().await {
match frame {
if self.init_handshake {
self.send_handshake(&mut to_wire_sender).await;
}
match from_wire_receiver.next().await {
Some((
_,
Frame::Handshake {
magic_number,
version,
} => {
trace!(?magic_number, ?version, "recv handshake");
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, "Handshake"])
.inc();
if self
.verify_handshake(magic_number, version, &mut frame_sender)
.await
.is_ok()
},
)) => {
trace!(?magic_number, ?version, "recv handshake");
self.metrics
.frames_in_total
.with_label_values(&["", &cid_string, "Handshake"])
.inc();
if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "connection with invalid magic_number");
#[cfg(debug_assertions)]
{
debug!("handshake completed");
*self.recv_state.write().await = ChannelState::Handshake;
if *self.send_state.read().await == ChannelState::Handshake {
self.send_pid(&mut frame_sender).await;
} else {
self.send_handshake(&mut frame_sender).await;
}
};
},
Frame::ParticipantId { pid } => {
if self.remote_pid.read().await.is_some() {
error!(?pid, "invalid message, cant change participantId");
return;
self.metrics
.frames_out_total
.with_label_values(&["", &cid_string, "Raw"])
.inc();
debug!("sending client instructions before killing");
to_wire_sender
.send(Frame::Raw(Self::WRONG_NUMBER.to_vec()))
.await
.unwrap();
to_wire_sender.send(Frame::Shutdown).await.unwrap();
}
*self.remote_pid.write().await = Some(pid);
*self.recv_state.write().await = ChannelState::Pid;
debug!(?pid, "Participant send their ID");
let pid_u128: u128 = pid.into();
pid_string = pid_u128.to_string();
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "ParticipantId"])
.inc();
let stream_id_offset = if *self.send_state.read().await != ChannelState::Pid {
self.send_pid(&mut frame_sender).await;
STREAM_ID_OFFSET2
} else {
STREAM_ID_OFFSET1
};
info!(?pid, "this channel is now configured!");
let pid_u128: u128 = pid.into();
self.metrics
.channels_connected_total
.with_label_values(&[&pid_u128.to_string()])
.inc();
let (sender, receiver) = oneshot::channel();
configured_sender
.send((self.cid, pid, stream_id_offset, sender))
.await
.unwrap();
external_frame_sender = Some(receiver.await.unwrap());
//TODO: this is sync anyway, because we need to wait. so find a better way than
// there channels like direct method call... otherwise a
// frame might jump in before its officially configured yet
debug!(
"STOP, if you read this, fix this error. make this a function isntead a \
channel here"
);
},
Frame::Shutdown => {
info!("shutdown signal received");
*self.recv_state.write().await = ChannelState::Shutdown;
self.metrics
.channels_disconnected_total
.with_label_values(&[&pid_string])
.inc();
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Shutdown"])
.inc();
},
/* Sending RAW is only used for debug purposes in case someone write a
* new API against veloren Server! */
Frame::Raw(bytes) => {
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Raw"])
.inc();
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
return Err(());
}
if version != VELOREN_NETWORK_VERSION {
error!(?version, "connection with wrong network version");
#[cfg(debug_assertions)]
{
debug!("sending client instructions before killing");
self.metrics
.frames_out_total
.with_label_values(&["", &cid_string, "Raw"])
.inc();
to_wire_sender
.send(Frame::Raw(
format!(
"{} Our Version: {:?}\nYour Version: {:?}\nClosing the \
connection",
Self::WRONG_VERSION,
VELOREN_NETWORK_VERSION,
version,
)
.as_bytes()
.to_vec(),
))
.await
.unwrap();
to_wire_sender.send(Frame::Shutdown {}).await.unwrap();
}
},
_ => {
trace!("forward frame");
let pid = &pid_string;
match &mut external_frame_sender {
None => error!(
?pid,
"cannot forward frame, as channel isn't configured correctly!"
),
Some(sender) => sender.send((self.cid, frame)).await.unwrap(),
};
},
}
}
return Err(());
}
debug!("handshake completed");
if self.init_handshake {
self.send_pid(&mut to_wire_sender, &pid_string).await;
} else {
self.send_handshake(&mut to_wire_sender).await;
}
},
Some((_, Frame::Shutdown)) => {
info!("shutdown signal received");
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Shutdown"])
.inc();
return Err(());
},
Some((_, Frame::Raw(bytes))) => {
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Raw"])
.inc();
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
}
return Err(());
},
Some((_, frame)) => {
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, frame.get_string()])
.inc();
return Err(());
},
None => return Err(()),
};
match from_wire_receiver.next().await {
Some((_, Frame::ParticipantId { pid })) => {
debug!(?pid, "Participant send their ID");
pid_string = pid.to_string();
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "ParticipantId"])
.inc();
let stream_id_offset = if self.init_handshake {
STREAM_ID_OFFSET1
} else {
self.send_pid(&mut to_wire_sender, &pid_string).await;
STREAM_ID_OFFSET2
};
info!(?pid, "this Handshake is now configured!");
return Ok((pid, stream_id_offset));
},
Some((_, Frame::Shutdown)) => {
info!("shutdown signal received");
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Shutdown"])
.inc();
return Err(());
},
Some((_, Frame::Raw(bytes))) => {
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, "Raw"])
.inc();
match std::str::from_utf8(bytes.as_slice()) {
Ok(string) => error!(?string, ERR_S),
_ => error!(?bytes, ERR_S),
}
return Err(());
},
Some((_, frame)) => {
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, frame.get_string()])
.inc();
return Err(());
},
None => return Err(()),
};
}
async fn verify_handshake(
&self,
magic_number: [u8; 7],
version: [u32; 3],
#[cfg(debug_assertions)] frame_sender: &mut mpsc::UnboundedSender<Frame>,
#[cfg(not(debug_assertions))] _: &mut mpsc::UnboundedSender<Frame>,
) -> Result<(), ()> {
if magic_number != VELOREN_MAGIC_NUMBER {
error!(?magic_number, "connection with invalid magic_number");
#[cfg(debug_assertions)]
{
debug!("sending client instructions before killing");
frame_sender
.send(Frame::Raw(Self::WRONG_NUMBER.to_vec()))
.await
.unwrap();
frame_sender.send(Frame::Shutdown).await.unwrap();
*self.send_state.write().await = ChannelState::Shutdown;
}
return Err(());
}
if version != VELOREN_NETWORK_VERSION {
error!(?version, "connection with wrong network version");
#[cfg(debug_assertions)]
{
debug!("sending client instructions before killing");
frame_sender
.send(Frame::Raw(
format!(
"{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection",
Self::WRONG_VERSION,
VELOREN_NETWORK_VERSION,
version,
)
.as_bytes()
.to_vec(),
))
.await
.unwrap();
frame_sender.send(Frame::Shutdown {}).await.unwrap();
*self.send_state.write().await = ChannelState::Shutdown;
}
return Err(());
}
Ok(())
}
pub(crate) async fn send_handshake(&self, part_in_sender: &mut mpsc::UnboundedSender<Frame>) {
part_in_sender
async fn send_handshake(&self, to_wire_sender: &mut mpsc::UnboundedSender<Frame>) {
self.metrics
.frames_out_total
.with_label_values(&["", &self.cid.to_string(), "Handshake"])
.inc();
to_wire_sender
.send(Frame::Handshake {
magic_number: VELOREN_MAGIC_NUMBER,
version: VELOREN_NETWORK_VERSION,
})
.await
.unwrap();
*self.send_state.write().await = ChannelState::Handshake;
}
pub(crate) async fn send_pid(&self, part_in_sender: &mut mpsc::UnboundedSender<Frame>) {
part_in_sender
async fn send_pid(&self, to_wire_sender: &mut mpsc::UnboundedSender<Frame>, pid_string: &str) {
self.metrics
.frames_out_total
.with_label_values(&[pid_string, &self.cid.to_string(), "ParticipantId"])
.inc();
to_wire_sender
.send(Frame::ParticipantId {
pid: self.local_pid,
})
.await
.unwrap();
*self.send_state.write().await = ChannelState::Pid;
}
/*
pub async fn run(&mut self) {
//let (incomming_sender, incomming_receiver) = mpsc::unbounded();
futures::join!(self.listen_manager(), self.send_outgoing());
}
pub async fn listen_manager(&self) {
let (mut listen_sender, mut listen_receiver) = mpsc::unbounded::<Address>();
while self.closed.load(Ordering::Relaxed) {
while let Some(address) = listen_receiver.next().await {
let (end_sender, end_receiver) = oneshot::channel::<()>();
task::spawn(channel_creator(address, end_receiver));
}
}
}
pub async fn send_outgoing(&self) {
//let prios = prios::PrioManager;
while self.closed.load(Ordering::Relaxed) {
}
}*/
}

View File

@ -1,3 +1,4 @@
use crate::types::Pid;
use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
use std::error::Error;
@ -16,9 +17,12 @@ pub struct NetworkMetrics {
pub streams_opened_total: IntCounterVec,
pub streams_closed_total: IntCounterVec,
pub network_info: IntGauge,
// Frames, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE,
// Frames counted a channel level, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE,
pub frames_out_total: IntCounterVec,
pub frames_in_total: IntCounterVec,
// Frames counted at protocol level, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE,
pub frames_wire_out_total: IntCounterVec,
pub frames_wire_in_total: IntCounterVec,
pub frames_count: IntGaugeVec,
// send Messages, seperated by STREAM (and PARTICIPANT, CHANNEL),
pub message_count: IntGaugeVec,
@ -38,7 +42,7 @@ pub struct NetworkMetrics {
impl NetworkMetrics {
#[allow(dead_code)]
pub fn new() -> Result<Self, Box<dyn Error>> {
pub fn new(local_pid: &Pid) -> Result<Self, Box<dyn Error>> {
let listen_requests_total = IntCounterVec::new(
Opts::new(
"listen_requests_total",
@ -89,27 +93,46 @@ impl NetworkMetrics {
),
&["participant"],
)?;
let opts = Opts::new("network_info", "Static Network information").const_label(
"version",
&format!(
"{}.{}.{}",
&crate::types::VELOREN_NETWORK_VERSION[0],
&crate::types::VELOREN_NETWORK_VERSION[1],
&crate::types::VELOREN_NETWORK_VERSION[2]
),
);
let opts = Opts::new("network_info", "Static Network information")
.const_label(
"version",
&format!(
"{}.{}.{}",
&crate::types::VELOREN_NETWORK_VERSION[0],
&crate::types::VELOREN_NETWORK_VERSION[1],
&crate::types::VELOREN_NETWORK_VERSION[2]
),
)
.const_label("local_pid", &format!("{}", &local_pid));
let network_info = IntGauge::with_opts(opts)?;
let frames_out_total = IntCounterVec::new(
Opts::new("frames_out_total", "number of all frames send per channel"),
Opts::new(
"frames_out_total",
"number of all frames send per channel, at the channel level",
),
&["participant", "channel", "frametype"],
)?;
let frames_in_total = IntCounterVec::new(
Opts::new(
"frames_in_total",
"number of all frames received per channel",
"number of all frames received per channel, at the channel level",
),
&["participant", "channel", "frametype"],
)?;
let frames_wire_out_total = IntCounterVec::new(
Opts::new(
"frames_wire_out_total",
"number of all frames send per channel, at the protocol level",
),
&["channel", "frametype"],
)?;
let frames_wire_in_total = IntCounterVec::new(
Opts::new(
"frames_wire_in_total",
"number of all frames received per channel, at the protocol level",
),
&["channel", "frametype"],
)?;
let frames_count = IntGaugeVec::new(
Opts::new(
@ -170,6 +193,8 @@ impl NetworkMetrics {
network_info,
frames_out_total,
frames_in_total,
frames_wire_out_total,
frames_wire_in_total,
frames_count,
message_count,
bytes_send,
@ -189,9 +214,11 @@ impl NetworkMetrics {
registry.register(Box::new(self.channels_disconnected_total.clone()))?;
registry.register(Box::new(self.streams_opened_total.clone()))?;
registry.register(Box::new(self.streams_closed_total.clone()))?;
registry.register(Box::new(self.network_info.clone()))?;
registry.register(Box::new(self.frames_out_total.clone()))?;
registry.register(Box::new(self.frames_in_total.clone()))?;
registry.register(Box::new(self.frames_wire_out_total.clone()))?;
registry.register(Box::new(self.frames_wire_in_total.clone()))?;
registry.register(Box::new(self.network_info.clone()))?;
registry.register(Box::new(self.frames_count.clone()))?;
registry.register(Box::new(self.message_count.clone()))?;
registry.register(Box::new(self.bytes_send.clone()))?;

View File

@ -1,7 +1,9 @@
use crate::{
api::Stream,
channel::Channel,
message::{InCommingMessage, MessageBuffer, OutGoingMessage},
metrics::NetworkMetrics,
protocols::Protocols,
types::{Cid, Frame, Pid, Prio, Promises, Sid},
};
use async_std::sync::RwLock;
@ -25,8 +27,7 @@ use tracing::*;
struct ControlChannels {
stream_open_receiver: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
stream_opened_sender: mpsc::UnboundedSender<Stream>,
transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender<Frame>)>,
frame_recv_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
create_channel_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api
@ -39,7 +40,7 @@ struct ControlChannels {
pub struct BParticipant {
remote_pid: Pid,
offset_sid: Sid,
channels: RwLock<Vec<(Cid, mpsc::UnboundedSender<Frame>)>>,
channels: Arc<RwLock<Vec<(Cid, mpsc::UnboundedSender<Frame>)>>>,
streams: RwLock<
HashMap<
Sid,
@ -66,26 +67,23 @@ impl BParticipant {
Self,
mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
mpsc::UnboundedReceiver<Stream>,
mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender<Frame>)>,
mpsc::UnboundedSender<(Cid, Frame)>,
mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
oneshot::Sender<()>,
) {
let (stream_open_sender, stream_open_receiver) =
mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
let (stream_opened_sender, stream_opened_receiver) = mpsc::unbounded::<Stream>();
let (transfer_channel_sender, transfer_channel_receiver) =
mpsc::unbounded::<(Cid, mpsc::UnboundedSender<Frame>)>();
let (frame_recv_sender, frame_recv_receiver) = mpsc::unbounded::<(Cid, Frame)>();
let (shutdown_api_sender, shutdown_api_receiver) = mpsc::unbounded();
let (frame_send_sender, frame_send_receiver) = mpsc::unbounded::<(Pid, Sid, Frame)>();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (create_channel_sender, create_channel_receiver) =
mpsc::unbounded::<(Cid, Sid, Protocols, oneshot::Sender<()>)>();
let run_channels = Some(ControlChannels {
stream_open_receiver,
stream_opened_sender,
transfer_channel_receiver,
frame_recv_receiver,
create_channel_receiver,
shutdown_api_receiver,
shutdown_api_sender,
send_outgoing: Arc::new(Mutex::new(send_outgoing)),
@ -98,15 +96,14 @@ impl BParticipant {
Self {
remote_pid,
offset_sid,
channels: RwLock::new(vec![]),
channels: Arc::new(RwLock::new(vec![])),
streams: RwLock::new(HashMap::new()),
run_channels,
metrics,
},
stream_open_sender,
stream_opened_receiver,
transfer_channel_sender,
frame_recv_sender,
create_channel_sender,
frame_send_sender,
shutdown_sender,
)
@ -118,10 +115,10 @@ impl BParticipant {
let (shutdown_open_manager_sender, shutdown_open_manager_receiver) = oneshot::channel();
let (shutdown_stream_close_manager_sender, shutdown_stream_close_manager_receiver) =
oneshot::channel();
let (frame_from_wire_sender, frame_from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>();
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.transfer_channel_manager(run_channels.transfer_channel_receiver),
self.open_manager(
run_channels.stream_open_receiver,
run_channels.shutdown_api_sender.clone(),
@ -129,11 +126,15 @@ impl BParticipant {
shutdown_open_manager_receiver,
),
self.handle_frames(
run_channels.frame_recv_receiver,
frame_from_wire_receiver,
run_channels.stream_opened_sender,
run_channels.shutdown_api_sender,
run_channels.send_outgoing.clone(),
),
self.create_channel_manager(
run_channels.create_channel_receiver,
frame_from_wire_sender,
),
self.send_manager(run_channels.frame_send_receiver),
self.stream_close_manager(
run_channels.shutdown_api_receiver,
@ -153,7 +154,15 @@ impl BParticipant {
async fn send_frame(&self, frame: Frame) {
// find out ideal channel here
//TODO: just take first
if let Some((_cid, channel)) = self.channels.write().await.get_mut(0) {
if let Some((cid, channel)) = self.channels.write().await.get_mut(0) {
self.metrics
.frames_out_total
.with_label_values(&[
&self.remote_pid.to_string(),
&cid.to_string(),
frame.get_string(),
])
.inc();
channel.send(frame).await.unwrap();
} else {
error!("participant has no channel to communicate on");
@ -162,7 +171,7 @@ impl BParticipant {
async fn handle_frames(
&self,
mut frame_recv_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut frame_from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
mut stream_opened_sender: mpsc::UnboundedSender<Stream>,
shutdown_api_sender: mpsc::UnboundedSender<Sid>,
send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
@ -170,10 +179,14 @@ impl BParticipant {
trace!("start handle_frames");
let send_outgoing = { send_outgoing.lock().unwrap().clone() };
let mut messages = HashMap::new();
let pid_u128: u128 = self.remote_pid.into();
let pid_string = pid_u128.to_string();
while let Some((cid, frame)) = frame_recv_receiver.next().await {
debug!("handling frame");
let pid_string = &self.remote_pid.to_string();
while let Some((cid, frame)) = frame_from_wire_receiver.next().await {
let cid_string = cid.to_string();
trace!("handling frame");
self.metrics
.frames_in_total
.with_label_values(&[&pid_string, &cid_string, frame.get_string()])
.inc();
match frame {
Frame::OpenStream {
sid,
@ -185,9 +198,6 @@ impl BParticipant {
.create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
.await;
stream_opened_sender.send(stream).await.unwrap();
//TODO: Metrics
//self.metrics.frames_in_total.with_label_values(&[&pid_string, &cid_string,
// "Raw"]).inc();
trace!("opened frame from remote");
},
Frame::CloseStream { sid } => {
@ -197,10 +207,9 @@ impl BParticipant {
// is dropped, so i need a way to notify the Stream that it's send messages will
// be dropped... from remote, notify local
if let Some((_, _, _, closed)) = self.streams.write().await.remove(&sid) {
let pid_u128: u128 = self.remote_pid.into();
self.metrics
.streams_closed_total
.with_label_values(&[&pid_u128.to_string()])
.with_label_values(&[&pid_string])
.inc();
closed.store(true, Ordering::Relaxed);
} else {
@ -249,6 +258,40 @@ impl BParticipant {
trace!("stop handle_frames");
}
async fn create_channel_manager(
&self,
channels_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
frame_from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
) {
trace!("start channel_manager");
channels_receiver
.for_each_concurrent(None, |(cid, sid, protocol, sender)| {
// This channel is now configured, and we are running it in scope of the
// participant.
let frame_from_wire_sender = frame_from_wire_sender.clone();
let channels = self.channels.clone();
async move {
let (channel, frame_to_wire_sender, shutdown_sender) =
Channel::new(cid, self.remote_pid, self.metrics.clone());
channels.write().await.push((cid, frame_to_wire_sender));
sender.send(()).unwrap();
self.metrics
.channels_connected_total
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
channel.run(protocol, frame_from_wire_sender).await;
self.metrics
.channels_disconnected_total
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
trace!(?cid, "channel got closed");
shutdown_sender.send(()).unwrap();
}
})
.await;
trace!("stop channel_manager");
}
async fn send_manager(
&self,
mut frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>,
@ -260,18 +303,6 @@ impl BParticipant {
trace!("stop send_manager");
}
async fn transfer_channel_manager(
&self,
mut transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender<Frame>)>,
) {
trace!("start transfer_channel_manager");
while let Some((cid, sender)) = transfer_channel_receiver.next().await {
debug!(?cid, "got a new channel to listen on");
self.channels.write().await.push((cid, sender));
}
trace!("stop transfer_channel_manager");
}
async fn open_manager(
&self,
mut stream_open_receiver: mpsc::UnboundedReceiver<(
@ -369,10 +400,9 @@ impl BParticipant {
.unwrap();
receiver.await.unwrap();
trace!(?sid, "stream was successfully flushed");
let pid_u128: u128 = self.remote_pid.into();
self.metrics
.streams_closed_total
.with_label_values(&[&pid_u128.to_string()])
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
self.streams.write().await.remove(&sid);
@ -396,10 +426,9 @@ impl BParticipant {
.write()
.await
.insert(sid, (prio, promises, msg_recv_sender, closed.clone()));
let pid_u128: u128 = self.remote_pid.into();
self.metrics
.streams_opened_total
.with_label_values(&[&pid_u128.to_string()])
.with_label_values(&[&self.remote_pid.to_string()])
.inc();
Stream::new(
self.remote_pid,

View File

@ -1,13 +1,19 @@
use crate::{
metrics::NetworkMetrics,
types::{Frame, Mid, Pid, Sid},
types::{Cid, Frame, Mid, Pid, Sid},
};
use async_std::{
net::{TcpStream, UdpSocket},
prelude::*,
sync::RwLock,
};
use futures::{channel::mpsc, future::FutureExt, select, sink::SinkExt, stream::StreamExt};
use futures::{
channel::{mpsc, oneshot},
future::FutureExt,
select,
sink::SinkExt,
stream::StreamExt,
};
use std::{net::SocketAddr, sync::Arc};
use tracing::*;
@ -51,12 +57,23 @@ impl TcpProtocol {
Self { stream, metrics }
}
pub async fn read(&self, mut frame_handler: mpsc::UnboundedSender<Frame>) {
pub async fn read(
&self,
cid: Cid,
mut from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
end_receiver: oneshot::Receiver<()>,
) {
trace!("starting up tcp write()");
let mut stream = self.stream.clone();
let mut end_receiver = end_receiver.fuse();
loop {
let mut bytes = [0u8; 1];
if stream.read_exact(&mut bytes).await.is_err() {
info!("tcp channel closed, shutting down read");
let r = select! {
r = stream.read_exact(&mut bytes).fuse() => r,
_ = end_receiver => break,
};
if r.is_err() {
info!("tcp stream closed, shutting down read");
break;
}
let frame_no = bytes[0];
@ -156,7 +173,11 @@ impl TcpProtocol {
Frame::Raw(data)
},
};
frame_handler.send(frame).await.unwrap();
self.metrics
.frames_wire_in_total
.with_label_values(&[&cid.to_string(), frame.get_string()])
.inc();
from_wire_sender.send((cid, frame)).await.unwrap();
}
trace!("shutting down tcp read()");
}
@ -164,16 +185,15 @@ impl TcpProtocol {
//dezerialize here as this is executed in a seperate thread PER channel.
// Limites Throughput per single Receiver but stays in same thread (maybe as its
// in a threadpool) for TCP, UDP and MPSC
pub async fn write(
&self,
mut internal_frame_receiver: mpsc::UnboundedReceiver<Frame>,
mut external_frame_receiver: mpsc::UnboundedReceiver<Frame>,
) {
pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver<Frame>) {
trace!("starting up tcp write()");
let mut stream = self.stream.clone();
while let Some(frame) = select! {
next = internal_frame_receiver.next().fuse() => next,
next = external_frame_receiver.next().fuse() => next,
} {
let cid_string = cid.to_string();
while let Some(frame) = to_wire_receiver.next().await {
self.metrics
.frames_wire_out_total
.with_label_values(&[&cid_string, frame.get_string()])
.inc();
match frame {
Frame::Handshake {
magic_number,
@ -269,9 +289,19 @@ impl UdpProtocol {
}
}
pub async fn read(&self, mut frame_handler: mpsc::UnboundedSender<Frame>) {
pub async fn read(
&self,
cid: Cid,
mut from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
end_receiver: oneshot::Receiver<()>,
) {
trace!("starting up udp read()");
let mut data_in = self.data_in.write().await;
while let Some(bytes) = data_in.next().await {
let mut end_receiver = end_receiver.fuse();
while let Some(bytes) = select! {
r = data_in.next().fuse() => r,
_ = end_receiver => None,
} {
trace!("got raw UDP message with len: {}", bytes.len());
let frame_no = bytes[0];
let frame = match frame_no {
@ -351,7 +381,6 @@ impl UdpProtocol {
Frame::Data { mid, start, data }
},
FRAME_RAW => {
error!("Uffff");
let length = u16::from_le_bytes([bytes[1], bytes[2]]);
let mut data = vec![0; length as usize];
data.copy_from_slice(&bytes[3..]);
@ -359,60 +388,24 @@ impl UdpProtocol {
},
_ => Frame::Raw(bytes),
};
frame_handler.send(frame).await.unwrap();
self.metrics
.frames_wire_in_total
.with_label_values(&[&cid.to_string(), frame.get_string()])
.inc();
from_wire_sender.send((cid, frame)).await.unwrap();
}
/*
let mut data_in = self.data_in.write().await;
let mut buffer = NetworkBuffer::new();
while let Some(data) = data_in.next().await {
let n = data.len();
let slice = &mut buffer.get_write_slice(n)[0..n]; //get_write_slice can return more then n!
slice.clone_from_slice(data.as_slice());
buffer.actually_written(n);
trace!("incomming message with len: {}", n);
let slice = buffer.get_read_slice();
let mut cur = std::io::Cursor::new(slice);
let mut read_ok = 0;
while cur.position() < n as u64 {
let round_start = cur.position() as usize;
let r: Result<Frame, _> = bincode::deserialize_from(&mut cur);
match r {
Ok(frame) => {
frame_handler.send(frame).await.unwrap();
read_ok = cur.position() as usize;
},
Err(e) => {
// Probably we have to wait for moare data!
let first_bytes_of_msg =
&slice[round_start..std::cmp::min(n, round_start + 16)];
debug!(
?buffer,
?e,
?n,
?round_start,
?first_bytes_of_msg,
"message cant be parsed, probably because we need to wait for more \
data"
);
break;
},
}
}
buffer.actually_read(read_ok);
}*/
trace!("shutting down udp read()");
}
pub async fn write(
&self,
mut internal_frame_receiver: mpsc::UnboundedReceiver<Frame>,
mut external_frame_receiver: mpsc::UnboundedReceiver<Frame>,
) {
pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver<Frame>) {
trace!("starting up udp write()");
let mut buffer = [0u8; 2000];
while let Some(frame) = select! {
next = internal_frame_receiver.next().fuse() => next,
next = external_frame_receiver.next().fuse() => next,
} {
let cid_string = cid.to_string();
while let Some(frame) = to_wire_receiver.next().await {
self.metrics
.frames_wire_out_total
.with_label_values(&[&cid_string, frame.get_string()])
.inc();
let len = match frame {
Frame::Handshake {
magic_number,
@ -602,116 +595,5 @@ impl UdpProtocol {
}
}
trace!("shutting down udp write()");
/*
let mut buffer = NetworkBuffer::new();
while let Some(frame) = select! {
next = internal_frame_receiver.next().fuse() => next,
next = external_frame_receiver.next().fuse() => next,
} {
let len = bincode::serialized_size(&frame).unwrap() as usize;
match bincode::serialize_into(buffer.get_write_slice(len), &frame) {
Ok(_) => buffer.actually_written(len),
Err(e) => error!("Oh nooo {}", e),
};
trace!(?len, "going to send frame via Udp");
let mut to_send = buffer.get_read_slice();
while to_send.len() > 0 {
match self.socket.send_to(to_send, self.remote_addr).await {
Ok(n) => buffer.actually_read(n),
Err(e) => error!(?e, "need to handle that error!"),
}
to_send = buffer.get_read_slice();
}
}
*/
}
}
// INTERNAL NetworkBuffer
/*
struct NetworkBuffer {
pub(crate) data: Vec<u8>,
pub(crate) read_idx: usize,
pub(crate) write_idx: usize,
}
/// NetworkBuffer to use for streamed access
/// valid data is between read_idx and write_idx!
/// everything before read_idx is already processed and no longer important
/// everything after write_idx is either 0 or random data buffered
impl NetworkBuffer {
fn new() -> Self {
NetworkBuffer {
data: vec![0; 2048],
read_idx: 0,
write_idx: 0,
}
}
fn get_write_slice(&mut self, min_size: usize) -> &mut [u8] {
if self.data.len() < self.write_idx + min_size {
trace!(
?self,
?min_size,
"need to resize because buffer is to small"
);
self.data.resize(self.write_idx + min_size, 0);
}
&mut self.data[self.write_idx..]
}
fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; }
fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] }
fn actually_read(&mut self, cnt: usize) {
self.read_idx += cnt;
if self.read_idx == self.write_idx {
if self.read_idx > 10485760 {
trace!(?self, "buffer empty, resetting indices");
}
self.read_idx = 0;
self.write_idx = 0;
}
if self.write_idx > 10485760 {
if self.write_idx - self.read_idx < 65536 {
debug!(
?self,
"This buffer is filled over 10 MB, but the actual data diff is less then \
65kB, which is a sign of stressing this connection much as always new data \
comes in - nevertheless, in order to handle this we will remove some data \
now so that this buffer doesn't grow endlessly"
);
let mut i2 = 0;
for i in self.read_idx..self.write_idx {
self.data[i2] = self.data[i];
i2 += 1;
}
self.read_idx = 0;
self.write_idx = i2;
}
if self.data.len() > 67108864 {
warn!(
?self,
"over 64Mbyte used, something seems fishy, len: {}",
self.data.len()
);
}
}
}
}
impl std::fmt::Debug for NetworkBuffer {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"NetworkBuffer(len: {}, read: {}, write: {})",
self.data.len(),
self.read_idx,
self.write_idx
)
}
}
*/

View File

@ -1,6 +1,6 @@
use crate::{
api::{Address, Participant},
channel::Channel,
channel::Handshake,
message::OutGoingMessage,
metrics::NetworkMetrics,
participant::BParticipant,
@ -30,31 +30,28 @@ use std::{
};
use tracing::*;
use tracing_futures::Instrument;
//use futures::prelude::*;
type ParticipantInfo = (
mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender<Frame>)>,
mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
mpsc::UnboundedSender<(Pid, Sid, Frame)>,
oneshot::Sender<()>,
);
type UnknownChannelInfo = (
mpsc::UnboundedSender<Frame>,
Option<oneshot::Sender<io::Result<Participant>>>,
);
pub(crate) type ConfigureInfo = (
Cid,
Pid,
Sid,
oneshot::Sender<mpsc::UnboundedSender<(Cid, Frame)>>,
);
#[derive(Debug)]
struct ControlChannels {
listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<Participant>>)>,
connected_sender: mpsc::UnboundedSender<Participant>,
shutdown_receiver: oneshot::Receiver<()>,
disconnect_receiver: mpsc::UnboundedReceiver<Pid>,
stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>,
}
#[derive(Debug, Clone)]
struct ParticipantChannels {
connected_sender: mpsc::UnboundedSender<Participant>,
disconnect_sender: mpsc::UnboundedSender<Pid>,
prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
}
#[derive(Debug)]
@ -63,11 +60,10 @@ pub struct Scheduler {
closed: AtomicBool,
pool: Arc<ThreadPool>,
run_channels: Option<ControlChannels>,
participant_channels: ParticipantChannels,
participants: Arc<RwLock<HashMap<Pid, ParticipantInfo>>>,
participant_from_channel: Arc<RwLock<HashMap<Cid, Pid>>>,
channel_ids: Arc<AtomicU64>,
channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
unknown_channels: Arc<RwLock<HashMap<Cid, UnknownChannelInfo>>>,
prios: Arc<Mutex<PrioManager>>,
metrics: Arc<NetworkMetrics>,
}
@ -90,16 +86,25 @@ impl Scheduler {
let (connected_sender, connected_receiver) = mpsc::unbounded::<Participant>();
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let (prios, prios_sender) = PrioManager::new();
let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::<Pid>();
let (stream_finished_request_sender, stream_finished_request_receiver) = mpsc::unbounded();
let run_channels = Some(ControlChannels {
listen_receiver,
connect_receiver,
connected_sender,
shutdown_receiver,
prios_sender,
disconnect_receiver,
stream_finished_request_receiver,
});
let metrics = Arc::new(NetworkMetrics::new().unwrap());
let participant_channels = ParticipantChannels {
disconnect_sender,
stream_finished_request_sender,
connected_sender,
prios_sender,
};
let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap());
if let Some(registry) = registry {
metrics.register(registry).unwrap();
}
@ -110,11 +115,10 @@ impl Scheduler {
closed: AtomicBool::new(false),
pool: Arc::new(ThreadPool::new().unwrap()),
run_channels,
participant_channels,
participants: Arc::new(RwLock::new(HashMap::new())),
participant_from_channel: Arc::new(RwLock::new(HashMap::new())),
channel_ids: Arc::new(AtomicU64::new(0)),
channel_listener: RwLock::new(HashMap::new()),
unknown_channels: Arc::new(RwLock::new(HashMap::new())),
prios: Arc::new(Mutex::new(prios)),
metrics,
},
@ -126,38 +130,26 @@ impl Scheduler {
}
pub async fn run(mut self) {
let (configured_sender, configured_receiver) = mpsc::unbounded::<ConfigureInfo>();
let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::<Pid>();
let (stream_finished_request_sender, stream_finished_request_receiver) = mpsc::unbounded();
let run_channels = self.run_channels.take().unwrap();
futures::join!(
self.listen_manager(run_channels.listen_receiver, configured_sender.clone(),),
self.connect_manager(run_channels.connect_receiver, configured_sender,),
self.disconnect_manager(disconnect_receiver,),
self.listen_manager(run_channels.listen_receiver),
self.connect_manager(run_channels.connect_receiver),
self.disconnect_manager(run_channels.disconnect_receiver),
self.send_outgoing(),
self.stream_finished_manager(stream_finished_request_receiver),
self.stream_finished_manager(run_channels.stream_finished_request_receiver),
self.shutdown_manager(run_channels.shutdown_receiver),
self.channel_configurer(
run_channels.connected_sender,
configured_receiver,
disconnect_sender,
run_channels.prios_sender.clone(),
stream_finished_request_sender.clone(),
),
);
}
async fn listen_manager(
&self,
listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<()>>)>,
configured_sender: mpsc::UnboundedSender<ConfigureInfo>,
) {
trace!("start listen_manager");
listen_receiver
.for_each_concurrent(None, |(address, result_sender)| {
let address = address.clone();
let configured_sender = configured_sender.clone();
async move {
debug!(?address, "got request to open a channel_creator");
@ -174,13 +166,8 @@ impl Scheduler {
.write()
.await
.insert(address.clone(), end_sender);
self.channel_creator(
address,
end_receiver,
configured_sender.clone(),
result_sender,
)
.await;
self.channel_creator(address, end_receiver, result_sender)
.await;
}
})
.await;
@ -193,11 +180,10 @@ impl Scheduler {
Address,
oneshot::Sender<io::Result<Participant>>,
)>,
configured_sender: mpsc::UnboundedSender<ConfigureInfo>,
) {
trace!("start connect_manager");
while let Some((addr, pid_sender)) = connect_receiver.next().await {
let (addr, protocol, handshake) = match addr {
let (protocol, handshake) = match addr {
Address::Tcp(addr) => {
self.metrics
.connect_requests_total
@ -212,7 +198,7 @@ impl Scheduler {
};
info!("Connecting Tcp to: {}", stream.peer_addr().unwrap());
let protocol = Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone()));
(addr, protocol, false)
(protocol, false)
},
Address::Udp(addr) => {
self.metrics
@ -242,18 +228,12 @@ impl Scheduler {
Self::udp_single_channel_connect(socket.clone(), udp_data_sender)
.instrument(tracing::info_span!("udp", ?addr)),
);
(addr, protocol, true)
(protocol, true)
},
_ => unimplemented!(),
};
self.init_protocol(
addr,
&configured_sender,
protocol,
Some(pid_sender),
handshake,
)
.await;
self.init_protocol(protocol, Some(pid_sender), handshake)
.await;
}
trace!("stop connect_manager");
}
@ -296,95 +276,6 @@ impl Scheduler {
trace!("stop send_outgoing");
}
//TODO: //ERROR CHECK IF THIS SHOULD BE PUT IN A ASYNC FUNC WHICH IS SEND OVER
// TO CHANNEL OR NOT FOR RETURN VALUE!
async fn channel_configurer(
&self,
mut connected_sender: mpsc::UnboundedSender<Participant>,
mut receiver: mpsc::UnboundedReceiver<ConfigureInfo>,
disconnect_sender: mpsc::UnboundedSender<Pid>,
prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
) {
trace!("start channel_activator");
while let Some((cid, pid, offset_sid, sender)) = receiver.next().await {
if let Some((frame_sender, pid_oneshot)) =
self.unknown_channels.write().await.remove(&cid)
{
trace!(
?cid,
?pid,
"detected that my channel is ready!, activating it :)"
);
let mut participants = self.participants.write().await;
if !participants.contains_key(&pid) {
debug!(?cid, "new participant connected via a channel");
let (
bparticipant,
stream_open_sender,
stream_opened_receiver,
mut transfer_channel_receiver,
frame_recv_sender,
frame_send_sender,
shutdown_sender,
) = BParticipant::new(
pid,
offset_sid,
self.metrics.clone(),
prios_sender.clone(),
stream_finished_request_sender.clone(),
);
let participant = Participant::new(
self.local_pid,
pid,
stream_open_sender,
stream_opened_receiver,
disconnect_sender.clone(),
);
if let Some(pid_oneshot) = pid_oneshot {
// someone is waiting with connect, so give them their PID
pid_oneshot.send(Ok(participant)).unwrap();
} else {
// noone is waiting on this Participant, return in to Network
connected_sender.send(participant).await.unwrap();
}
self.metrics.participants_connected_total.inc();
transfer_channel_receiver
.send((cid, frame_sender))
.await
.unwrap();
participants.insert(
pid,
(
transfer_channel_receiver,
frame_send_sender,
shutdown_sender,
),
);
self.participant_from_channel.write().await.insert(cid, pid);
self.pool.spawn_ok(
bparticipant
.run()
.instrument(tracing::info_span!("participant", ?pid)),
);
sender.send(frame_recv_sender).unwrap();
} else {
error!(
"2ND channel of participants opens, but we cannot verify that this is not \
a attack to "
);
//ERROR DEADLOCK AS NO SENDER HERE!
//sender.send(frame_recv_sender).unwrap();
}
//From now on this CHANNEL can receiver other frames! move
// directly to participant!
}
}
trace!("stop channel_activator");
}
// requested by participant when stream wants to close from api, checking if no
// more msg is in prio and return
pub(crate) async fn stream_finished_manager(
@ -447,10 +338,9 @@ impl Scheduler {
&self,
addr: Address,
end_receiver: oneshot::Receiver<()>,
configured_sender: mpsc::UnboundedSender<ConfigureInfo>,
result_sender: oneshot::Sender<io::Result<()>>,
) {
info!(?addr, "start up channel creator");
trace!(?addr, "start up channel creator");
match addr {
Address::Tcp(addr) => {
let listener = match net::TcpListener::bind(addr).await {
@ -478,8 +368,6 @@ impl Scheduler {
let stream = stream.unwrap();
info!("Accepting Tcp from: {}", stream.peer_addr().unwrap());
self.init_protocol(
addr,
&configured_sender,
Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone())),
None,
true,
@ -521,12 +409,11 @@ impl Scheduler {
listeners.insert(remote_addr.clone(), udp_data_sender);
let protocol = Protocols::Udp(UdpProtocol::new(
socket.clone(),
remote_addr,
remote_addr.clone(),
self.metrics.clone(),
udp_data_receiver,
));
self.init_protocol(addr, &configured_sender, protocol, None, true)
.await;
self.init_protocol(protocol, None, false).await;
}
let udp_data_sender = listeners.get_mut(&remote_addr).unwrap();
udp_data_sender.send(datavec).await.unwrap();
@ -534,7 +421,7 @@ impl Scheduler {
},
_ => unimplemented!(),
}
info!(?addr, "ending channel creator");
trace!(?addr, "ending channel creator");
}
pub(crate) async fn udp_single_channel_connect(
@ -542,7 +429,7 @@ impl Scheduler {
mut udp_data_sender: mpsc::UnboundedSender<Vec<u8>>,
) {
let addr = socket.local_addr();
info!(?addr, "start udp_single_channel_connect");
trace!(?addr, "start udp_single_channel_connect");
//TODO: implement real closing
let (_end_sender, end_receiver) = oneshot::channel::<()>();
@ -558,37 +445,112 @@ impl Scheduler {
datavec.extend_from_slice(&data[0..size]);
udp_data_sender.send(datavec).await.unwrap();
}
info!(?addr, "stop udp_single_channel_connect");
trace!(?addr, "stop udp_single_channel_connect");
}
async fn init_protocol(
&self,
addr: std::net::SocketAddr,
configured_sender: &mpsc::UnboundedSender<ConfigureInfo>,
protocol: Protocols,
pid_sender: Option<oneshot::Sender<io::Result<Participant>>>,
send_handshake: bool,
) {
let (mut part_in_sender, part_in_receiver) = mpsc::unbounded::<Frame>();
//channels are unknown till PID is known!
/* When A connects to a NETWORK, we, the listener answers with a Handshake.
Pro: - Its easier to debug, as someone who opens a port gets a magic number back!
Contra: - DOS posibility because we answer fist
- Speed, because otherwise the message can be send with the creation
*/
let mut participant_channels = self.participant_channels.clone();
// spawn is needed here, e.g. for TCP connect it would mean that only 1
// participant can be in handshake phase ever! Someone could deadlock
// the whole server easily for new clients UDP doesnt work at all, as
// the UDP listening is done in another place.
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed);
let channel = Channel::new(cid, self.local_pid, self.metrics.clone());
if send_handshake {
channel.send_handshake(&mut part_in_sender).await;
}
self.pool.spawn_ok(
channel
.run(protocol, part_in_receiver, configured_sender.clone())
.instrument(tracing::info_span!("channel", ?addr)),
);
self.unknown_channels
.write()
.await
.insert(cid, (part_in_sender, pid_sender));
let participants = self.participants.clone();
let metrics = self.metrics.clone();
let pool = self.pool.clone();
let local_pid = self.local_pid;
self.pool.spawn_ok(async move {
trace!(?cid, "open channel and be ready for Handshake");
let handshake = Handshake::new(cid, local_pid, metrics.clone(), send_handshake);
match handshake.setup(&protocol).await {
Ok((pid, sid)) => {
trace!(
?cid,
?pid,
"detected that my channel is ready!, activating it :)"
);
let mut participants = participants.write().await;
if !participants.contains_key(&pid) {
debug!(?cid, "new participant connected via a channel");
let (
bparticipant,
stream_open_sender,
stream_opened_receiver,
mut create_channel_sender,
frame_send_sender,
shutdown_sender,
) = BParticipant::new(
pid,
sid,
metrics.clone(),
participant_channels.prios_sender,
participant_channels.stream_finished_request_sender,
);
let participant = Participant::new(
local_pid,
pid,
stream_open_sender,
stream_opened_receiver,
participant_channels.disconnect_sender,
);
metrics.participants_connected_total.inc();
participants.insert(
pid,
(
create_channel_sender.clone(),
frame_send_sender,
shutdown_sender,
),
);
pool.spawn_ok(
bparticipant
.run()
.instrument(tracing::info_span!("participant", ?pid)),
);
//create a new channel within BParticipant and wait for it to run
let (sync_sender, sync_receiver) = oneshot::channel();
create_channel_sender
.send((cid, sid, protocol, sync_sender))
.await
.unwrap();
sync_receiver.await.unwrap();
if let Some(pid_oneshot) = pid_sender {
// someone is waiting with connect, so give them their PID
pid_oneshot.send(Ok(participant)).unwrap();
} else {
// noone is waiting on this Participant, return in to Network
participant_channels
.connected_sender
.send(participant)
.await
.unwrap();
}
} else {
error!(
"2ND channel of participants opens, but we cannot verify that this is \
not a attack to "
);
//ERROR DEADLOCK AS NO SENDER HERE!
//sender.send(frame_recv_sender).unwrap();
}
//From now on this CHANNEL can receiver other frames! move
// directly to participant!
},
Err(()) => {},
}
});
}
}

View File

@ -62,6 +62,36 @@ pub(crate) enum Frame {
Raw(Vec<u8>),
}
impl Frame {
pub fn get_string(&self) -> &str {
match self {
Frame::Handshake {
magic_number: _,
version: _,
} => "Handshake",
Frame::ParticipantId { pid: _ } => "ParticipantId",
Frame::Shutdown => "Shutdown",
Frame::OpenStream {
sid: _,
prio: _,
promises: _,
} => "OpenStream",
Frame::CloseStream { sid: _ } => "CloseStream",
Frame::DataHeader {
mid: _,
sid: _,
length: _,
} => "DataHeader",
Frame::Data {
mid: _,
start: _,
data: _,
} => "Data",
Frame::Raw(_) => "Raw",
}
}
}
#[derive(Debug)]
pub(crate) enum Requestor {
User,
@ -111,13 +141,35 @@ impl Sid {
impl std::fmt::Debug for Pid {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const BITS_PER_SIXLET: usize = 6;
//only print last 6 chars of number as full u128 logs are unreadable
write!(f, "{}", self.internal.rem_euclid(100000))
const CHAR_COUNT: usize = 6;
for i in 0..CHAR_COUNT {
write!(
f,
"{}",
sixlet_to_str((self.internal >> i * BITS_PER_SIXLET) & 0x3F)
)?;
}
Ok(())
}
}
impl From<Pid> for u128 {
fn from(pid: Pid) -> Self { pid.internal }
impl std::fmt::Display for Pid {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const BITS_PER_SIXLET: usize = 6;
//only print last 6 chars of number as full u128 logs are unreadable
const CHAR_COUNT: usize = 6;
for i in 0..CHAR_COUNT {
write!(
f,
"{}",
sixlet_to_str((self.internal >> i * BITS_PER_SIXLET) & 0x3F)
)?;
}
Ok(())
}
}
impl std::ops::AddAssign for Sid {
@ -139,3 +191,74 @@ impl std::fmt::Debug for Sid {
impl From<u64> for Sid {
fn from(internal: u64) -> Self { Sid { internal } }
}
#[inline]
fn sixlet_to_str(sixlet: u128) -> char {
match sixlet {
0 => 'A',
1 => 'B',
2 => 'C',
3 => 'D',
4 => 'E',
5 => 'F',
6 => 'G',
7 => 'H',
8 => 'I',
9 => 'J',
10 => 'K',
11 => 'L',
12 => 'M',
13 => 'N',
14 => 'O',
15 => 'P',
16 => 'Q',
17 => 'R',
18 => 'S',
19 => 'T',
20 => 'U',
21 => 'V',
22 => 'W',
23 => 'X',
24 => 'Y',
25 => 'Z',
26 => 'a',
27 => 'b',
28 => 'c',
29 => 'd',
30 => 'e',
31 => 'f',
32 => 'g',
33 => 'h',
34 => 'i',
35 => 'j',
36 => 'k',
37 => 'l',
38 => 'm',
39 => 'n',
40 => 'o',
41 => 'p',
42 => 'q',
43 => 'r',
44 => 's',
45 => 't',
46 => 'u',
47 => 'v',
48 => 'w',
49 => 'x',
50 => 'y',
51 => 'z',
52 => '0',
53 => '1',
54 => '2',
55 => '3',
56 => '4',
57 => '5',
58 => '6',
59 => '7',
60 => '8',
61 => '9',
62 => '+',
63 => '/',
_ => '-',
}
}