diff --git a/network/examples/network-speed/Cargo.toml b/network/examples/network-speed/Cargo.toml
index 73977d5523..40d7c22395 100644
--- a/network/examples/network-speed/Cargo.toml
+++ b/network/examples/network-speed/Cargo.toml
@@ -17,5 +17,5 @@ tracing = "0.1"
 tracing-subscriber = "0.2.3"
 bincode = "1.2"
 prometheus = "0.7"
-rouille = "3.0.0"
+tiny_http = "0.7.0"
 serde = { version = "1.0", features = ["derive"] }
\ No newline at end of file
diff --git a/network/examples/network-speed/src/main.rs b/network/examples/network-speed/src/main.rs
index 4c09d9029b..7f1f4ce9b8 100644
--- a/network/examples/network-speed/src/main.rs
+++ b/network/examples/network-speed/src/main.rs
@@ -90,16 +90,15 @@ fn main() {
         _ => panic!("invalid mode, run --help!"),
     };

-    let mut m = metrics::SimpleMetrics::new();
     let mut background = None;
     match matches.value_of("mode") {
         Some("server") => server(address),
-        Some("client") => client(address, &mut m),
+        Some("client") => client(address),
         Some("both") => {
             let address1 = address.clone();
             background = Some(thread::spawn(|| server(address1)));
             thread::sleep(Duration::from_millis(200)); //start client after server
-            client(address, &mut m);
+            client(address);
         },
         _ => panic!("invalid mode, run --help!"),
     };
@@ -110,7 +109,9 @@ fn main() {

 fn server(address: Address) {
     let thread_pool = ThreadPoolBuilder::new().build();
-    let server = Network::new(Pid::new(), &thread_pool, None);
+    let mut metrics = metrics::SimpleMetrics::new();
+    let server = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
+    metrics.run("0.0.0.0:59112".parse().unwrap()).unwrap();
     block_on(server.listen(address)).unwrap();

     loop {
@@ -134,8 +135,9 @@ fn server(address: Address) {
     }
 }

-fn client(address: Address, metrics: &mut metrics::SimpleMetrics) {
+fn client(address: Address) {
     let thread_pool = ThreadPoolBuilder::new().build();
+    let mut metrics = metrics::SimpleMetrics::new();
     let client = Network::new(Pid::new(), &thread_pool, Some(metrics.registry()));
     metrics.run("0.0.0.0:59111".parse().unwrap()).unwrap();

@@ -160,7 +162,7 @@ fn client(address: Address, metrics: &mut metrics::SimpleMetrics) {
         }
         if id > 2000000 {
             println!("stop");
-            std::thread::sleep(std::time::Duration::from_millis(50));
+            std::thread::sleep(std::time::Duration::from_millis(5000));
             break;
         }
     }
diff --git a/network/examples/network-speed/src/metrics.rs b/network/examples/network-speed/src/metrics.rs
index e043c751db..e10eb678e0 100644
--- a/network/examples/network-speed/src/metrics.rs
+++ b/network/examples/network-speed/src/metrics.rs
@@ -1,15 +1,15 @@
-use prometheus::{Encoder, Gauge, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder};
-use rouille::{router, Server};
+use prometheus::{Encoder, Registry, TextEncoder};
+use tiny_http;
+use tracing::*;
 use std::{
-    convert::TryInto,
     error::Error,
     net::SocketAddr,
     sync::{
-        atomic::{AtomicBool, AtomicU64, Ordering},
+        atomic::{AtomicBool, Ordering},
         Arc,
     },
     thread,
-    time::{Duration, SystemTime, UNIX_EPOCH},
+    time::Duration,
 };

 pub struct SimpleMetrics {
@@ -48,24 +48,23 @@ impl SimpleMetrics {

         //TODO: make this a job
         self.handle = Some(thread::spawn(move || {
-            let server = Server::new(addr, move |request| {
-                router!(request,
-                    (GET) (/metrics) => {
-                        let encoder = TextEncoder::new();
-                        let mut buffer = vec![];
-                        let mf = registry.gather();
-                        encoder.encode(&mf, &mut buffer).expect("Failed to encoder metrics text.");
-                        rouille::Response::text(String::from_utf8(buffer).expect("Failed to parse bytes as a string."))
-                    },
-                    _ => rouille::Response::empty_404()
-                )
-            })
-            .expect("Failed to start server");
+            let server = tiny_http::Server::http(addr).unwrap();
+            const timeout: std::time::Duration = std::time::Duration::from_secs(1);
+            debug!("starting tiny_http server to serve metrics");
             while running2.load(Ordering::Relaxed) {
-                server.poll();
-                // Poll at 10Hz
-                thread::sleep(Duration::from_millis(100));
+                let request = match server.recv_timeout(timeout) {
+                    Ok(Some(rq)) => rq,
+                    Ok(None) => continue,
+                    Err(e) => { println!("error: {}", e); break }
+                };
+                let mf = registry.gather();
+                let encoder = TextEncoder::new();
+                let mut buffer = vec![];
+                encoder.encode(&mf, &mut buffer).expect("Failed to encode metrics text.");
+                let response = tiny_http::Response::from_string(String::from_utf8(buffer).expect("Failed to parse bytes as a string."));
+                request.respond(response);
             }
+            debug!("stopping tiny_http server to serve metrics");
         }));
         Ok(())
     }
diff --git a/network/src/api.rs b/network/src/api.rs
index b57e947a08..137cb1b047 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -92,6 +92,7 @@ impl Network {
                 .run()
                 .instrument(tracing::info_span!("scheduler", ?p)),
             );
+            trace!(?p, ?User, "stopping scheduler and its own thread");
         });
         Self {
             local_pid: participant_id,
diff --git a/network/src/channel.rs b/network/src/channel.rs
index c8869a9352..fdbc06a613 100644
--- a/network/src/channel.rs
+++ b/network/src/channel.rs
@@ -1,40 +1,86 @@
 use crate::{
     metrics::NetworkMetrics,
     protocols::Protocols,
-    scheduler::ConfigureInfo,
     types::{
         Cid, Frame, Pid, Sid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2, VELOREN_MAGIC_NUMBER,
         VELOREN_NETWORK_VERSION,
     },
 };
-use async_std::sync::RwLock;
 use futures::{
     channel::{mpsc, oneshot},
+    join,
     sink::SinkExt,
     stream::StreamExt,
+    FutureExt,
 };
 use std::sync::Arc;
 use tracing::*;
-//use futures::prelude::*;

 pub(crate) struct Channel {
     cid: Cid,
-    local_pid: Pid,
     metrics: Arc<NetworkMetrics>,
-    remote_pid: RwLock<Option<Pid>>,
-    send_state: RwLock<ChannelState>,
-    recv_state: RwLock<ChannelState>,
-}
-
-#[derive(Debug, PartialEq)]
-enum ChannelState {
-    None,
-    Handshake,
-    Pid,
-    Shutdown,
+    remote_pid: Pid,
+    to_wire_receiver: Option<mpsc::UnboundedReceiver<Frame>>,
+    read_stop_receiver: Option<oneshot::Receiver<()>>,
 }

 impl Channel {
+    pub fn new(
+        cid: u64,
+        remote_pid: Pid,
+        metrics: Arc<NetworkMetrics>,
+    ) -> (Self, mpsc::UnboundedSender<Frame>, oneshot::Sender<()>) {
+        let (to_wire_sender, to_wire_receiver) = mpsc::unbounded::<Frame>();
+        let (read_stop_sender, read_stop_receiver) = oneshot::channel();
+        (
+            Self {
+                cid,
+                metrics,
+                remote_pid,
+                to_wire_receiver: Some(to_wire_receiver),
+                read_stop_receiver: Some(read_stop_receiver),
+            },
+            to_wire_sender,
+            read_stop_sender,
+        )
+    }
+
+    pub async fn run(
+        mut self,
+        protocol: Protocols,
+        from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
+    ) {
+        let to_wire_receiver = self.to_wire_receiver.take().unwrap();
+        let read_stop_receiver = self.read_stop_receiver.take().unwrap();
+
+        trace!(?self.remote_pid, "start up channel");
+        match protocol {
+            Protocols::Tcp(tcp) => {
+                futures::join!(
+                    tcp.read(self.cid, from_wire_sender, read_stop_receiver),
+                    tcp.write(self.cid, to_wire_receiver),
+                );
+            },
+            Protocols::Udp(udp) => {
+                futures::join!(
+                    udp.read(self.cid, from_wire_sender, read_stop_receiver),
+                    udp.write(self.cid, to_wire_receiver),
+                );
+            },
+        }
+
+        trace!(?self.remote_pid, "shut down channel");
+    }
+}
+
+pub(crate) struct Handshake {
+    cid: Cid,
+    local_pid: Pid,
+    init_handshake: bool,
+    metrics: Arc<NetworkMetrics>,
+}
+
+impl Handshake {
     #[cfg(debug_assertions)]
     const WRONG_NUMBER: &'static [u8] = "Handshake does not contain the magic number requiered by \
veloren server.\nWe are not sure if you are a valid \ @@ -45,263 +91,226 @@ impl Channel { invalid version.\nWe don't know how to communicate with \ you.\nClosing the connection"; - pub fn new(cid: u64, local_pid: Pid, metrics: Arc) -> Self { + pub fn new( + cid: u64, + local_pid: Pid, + metrics: Arc, + init_handshake: bool, + ) -> Self { Self { cid, local_pid, metrics, - remote_pid: RwLock::new(None), - send_state: RwLock::new(ChannelState::None), - recv_state: RwLock::new(ChannelState::None), + init_handshake, } } - /// (prot|part)_(in|out)_(sender|receiver) - /// prot: TO/FROM PROTOCOL = TCP - /// part: TO/FROM PARTICIPANT - /// in: FROM - /// out: TO - /// sender: mpsc::Sender - /// receiver: mpsc::Receiver - pub async fn run( - self, - protocol: Protocols, - part_in_receiver: mpsc::UnboundedReceiver, - configured_sender: mpsc::UnboundedSender, - ) { - let (prot_in_sender, prot_in_receiver) = mpsc::unbounded::(); - let (prot_out_sender, prot_out_receiver) = mpsc::unbounded::(); + pub async fn setup(self, protocol: &Protocols) -> Result<(Pid, Sid), ()> { + let (to_wire_sender, to_wire_receiver) = mpsc::unbounded::(); + let (from_wire_sender, from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>(); + let (read_stop_sender, read_stop_receiver) = oneshot::channel(); let handler_future = - self.frame_handler(prot_in_receiver, prot_out_sender, configured_sender); + self.frame_handler(from_wire_receiver, to_wire_sender, read_stop_sender); match protocol { Protocols::Tcp(tcp) => { - futures::join!( - tcp.read(prot_in_sender), - tcp.write(prot_out_receiver, part_in_receiver), + (join! { + tcp.read(self.cid, from_wire_sender, read_stop_receiver), + tcp.write(self.cid, to_wire_receiver).fuse(), handler_future, - ); + }) + .2 }, Protocols::Udp(udp) => { - futures::join!( - udp.read(prot_in_sender), - udp.write(prot_out_receiver, part_in_receiver), + (join! 
{ + udp.read(self.cid, from_wire_sender, read_stop_receiver), + udp.write(self.cid, to_wire_receiver), handler_future, - ); + }) + .2 }, } - - //return part_out_receiver; } - pub async fn frame_handler( + async fn frame_handler( &self, - mut frames: mpsc::UnboundedReceiver, - mut frame_sender: mpsc::UnboundedSender, - mut configured_sender: mpsc::UnboundedSender<( - Cid, - Pid, - Sid, - oneshot::Sender>, - )>, - ) { + mut from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>, + mut to_wire_sender: mpsc::UnboundedSender, + _read_stop_sender: oneshot::Sender<()>, + ) -> Result<(Pid, Sid), ()> { const ERR_S: &str = "Got A Raw Message, these are usually Debug Messages indicating that \ something went wrong on network layer and connection will be closed"; let mut pid_string = "".to_string(); let cid_string = self.cid.to_string(); - let mut external_frame_sender: Option> = None; - while let Some(frame) = frames.next().await { - match frame { + + if self.init_handshake { + self.send_handshake(&mut to_wire_sender).await; + } + + match from_wire_receiver.next().await { + Some(( + _, Frame::Handshake { magic_number, version, - } => { - trace!(?magic_number, ?version, "recv handshake"); - self.metrics - .frames_in_total - .with_label_values(&["", &cid_string, "Handshake"]) - .inc(); - if self - .verify_handshake(magic_number, version, &mut frame_sender) - .await - .is_ok() + }, + )) => { + trace!(?magic_number, ?version, "recv handshake"); + self.metrics + .frames_in_total + .with_label_values(&["", &cid_string, "Handshake"]) + .inc(); + if magic_number != VELOREN_MAGIC_NUMBER { + error!(?magic_number, "connection with invalid magic_number"); + #[cfg(debug_assertions)] { - debug!("handshake completed"); - *self.recv_state.write().await = ChannelState::Handshake; - if *self.send_state.read().await == ChannelState::Handshake { - self.send_pid(&mut frame_sender).await; - } else { - self.send_handshake(&mut frame_sender).await; - } - }; - }, - Frame::ParticipantId { pid } => { - if self.remote_pid.read().await.is_some() { - error!(?pid, "invalid message, cant change participantId"); - return; + self.metrics + .frames_out_total + .with_label_values(&["", &cid_string, "Raw"]) + .inc(); + debug!("sending client instructions before killing"); + to_wire_sender + .send(Frame::Raw(Self::WRONG_NUMBER.to_vec())) + .await + .unwrap(); + to_wire_sender.send(Frame::Shutdown).await.unwrap(); } - *self.remote_pid.write().await = Some(pid); - *self.recv_state.write().await = ChannelState::Pid; - debug!(?pid, "Participant send their ID"); - let pid_u128: u128 = pid.into(); - pid_string = pid_u128.to_string(); - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "ParticipantId"]) - .inc(); - let stream_id_offset = if *self.send_state.read().await != ChannelState::Pid { - self.send_pid(&mut frame_sender).await; - STREAM_ID_OFFSET2 - } else { - STREAM_ID_OFFSET1 - }; - info!(?pid, "this channel is now configured!"); - let pid_u128: u128 = pid.into(); - self.metrics - .channels_connected_total - .with_label_values(&[&pid_u128.to_string()]) - .inc(); - let (sender, receiver) = oneshot::channel(); - configured_sender - .send((self.cid, pid, stream_id_offset, sender)) - .await - .unwrap(); - external_frame_sender = Some(receiver.await.unwrap()); - //TODO: this is sync anyway, because we need to wait. so find a better way than - // there channels like direct method call... 
otherwise a - // frame might jump in before its officially configured yet - debug!( - "STOP, if you read this, fix this error. make this a function isntead a \ - channel here" - ); - }, - Frame::Shutdown => { - info!("shutdown signal received"); - *self.recv_state.write().await = ChannelState::Shutdown; - self.metrics - .channels_disconnected_total - .with_label_values(&[&pid_string]) - .inc(); - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "Shutdown"]) - .inc(); - }, - /* Sending RAW is only used for debug purposes in case someone write a - * new API against veloren Server! */ - Frame::Raw(bytes) => { - self.metrics - .frames_in_total - .with_label_values(&[&pid_string, &cid_string, "Raw"]) - .inc(); - match std::str::from_utf8(bytes.as_slice()) { - Ok(string) => error!(?string, ERR_S), - _ => error!(?bytes, ERR_S), + return Err(()); + } + if version != VELOREN_NETWORK_VERSION { + error!(?version, "connection with wrong network version"); + #[cfg(debug_assertions)] + { + debug!("sending client instructions before killing"); + self.metrics + .frames_out_total + .with_label_values(&["", &cid_string, "Raw"]) + .inc(); + to_wire_sender + .send(Frame::Raw( + format!( + "{} Our Version: {:?}\nYour Version: {:?}\nClosing the \ + connection", + Self::WRONG_VERSION, + VELOREN_NETWORK_VERSION, + version, + ) + .as_bytes() + .to_vec(), + )) + .await + .unwrap(); + to_wire_sender.send(Frame::Shutdown {}).await.unwrap(); } - }, - _ => { - trace!("forward frame"); - let pid = &pid_string; - match &mut external_frame_sender { - None => error!( - ?pid, - "cannot forward frame, as channel isn't configured correctly!" - ), - Some(sender) => sender.send((self.cid, frame)).await.unwrap(), - }; - }, - } - } + return Err(()); + } + debug!("handshake completed"); + if self.init_handshake { + self.send_pid(&mut to_wire_sender, &pid_string).await; + } else { + self.send_handshake(&mut to_wire_sender).await; + } + }, + Some((_, Frame::Shutdown)) => { + info!("shutdown signal received"); + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, "Shutdown"]) + .inc(); + return Err(()); + }, + Some((_, Frame::Raw(bytes))) => { + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, "Raw"]) + .inc(); + match std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + } + return Err(()); + }, + Some((_, frame)) => { + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) + .inc(); + return Err(()); + }, + None => return Err(()), + }; + + match from_wire_receiver.next().await { + Some((_, Frame::ParticipantId { pid })) => { + debug!(?pid, "Participant send their ID"); + pid_string = pid.to_string(); + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, "ParticipantId"]) + .inc(); + let stream_id_offset = if self.init_handshake { + STREAM_ID_OFFSET1 + } else { + self.send_pid(&mut to_wire_sender, &pid_string).await; + STREAM_ID_OFFSET2 + }; + info!(?pid, "this Handshake is now configured!"); + return Ok((pid, stream_id_offset)); + }, + Some((_, Frame::Shutdown)) => { + info!("shutdown signal received"); + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, "Shutdown"]) + .inc(); + return Err(()); + }, + Some((_, Frame::Raw(bytes))) => { + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, "Raw"]) + .inc(); + match 
std::str::from_utf8(bytes.as_slice()) { + Ok(string) => error!(?string, ERR_S), + _ => error!(?bytes, ERR_S), + } + return Err(()); + }, + Some((_, frame)) => { + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) + .inc(); + return Err(()); + }, + None => return Err(()), + }; } - async fn verify_handshake( - &self, - magic_number: [u8; 7], - version: [u32; 3], - #[cfg(debug_assertions)] frame_sender: &mut mpsc::UnboundedSender, - #[cfg(not(debug_assertions))] _: &mut mpsc::UnboundedSender, - ) -> Result<(), ()> { - if magic_number != VELOREN_MAGIC_NUMBER { - error!(?magic_number, "connection with invalid magic_number"); - #[cfg(debug_assertions)] - { - debug!("sending client instructions before killing"); - frame_sender - .send(Frame::Raw(Self::WRONG_NUMBER.to_vec())) - .await - .unwrap(); - frame_sender.send(Frame::Shutdown).await.unwrap(); - *self.send_state.write().await = ChannelState::Shutdown; - } - return Err(()); - } - if version != VELOREN_NETWORK_VERSION { - error!(?version, "connection with wrong network version"); - #[cfg(debug_assertions)] - { - debug!("sending client instructions before killing"); - frame_sender - .send(Frame::Raw( - format!( - "{} Our Version: {:?}\nYour Version: {:?}\nClosing the connection", - Self::WRONG_VERSION, - VELOREN_NETWORK_VERSION, - version, - ) - .as_bytes() - .to_vec(), - )) - .await - .unwrap(); - frame_sender.send(Frame::Shutdown {}).await.unwrap(); - *self.send_state.write().await = ChannelState::Shutdown; - } - return Err(()); - } - Ok(()) - } - - pub(crate) async fn send_handshake(&self, part_in_sender: &mut mpsc::UnboundedSender) { - part_in_sender + async fn send_handshake(&self, to_wire_sender: &mut mpsc::UnboundedSender) { + self.metrics + .frames_out_total + .with_label_values(&["", &self.cid.to_string(), "Handshake"]) + .inc(); + to_wire_sender .send(Frame::Handshake { magic_number: VELOREN_MAGIC_NUMBER, version: VELOREN_NETWORK_VERSION, }) .await .unwrap(); - *self.send_state.write().await = ChannelState::Handshake; } - pub(crate) async fn send_pid(&self, part_in_sender: &mut mpsc::UnboundedSender) { - part_in_sender + async fn send_pid(&self, to_wire_sender: &mut mpsc::UnboundedSender, pid_string: &str) { + self.metrics + .frames_out_total + .with_label_values(&[pid_string, &self.cid.to_string(), "ParticipantId"]) + .inc(); + to_wire_sender .send(Frame::ParticipantId { pid: self.local_pid, }) .await .unwrap(); - *self.send_state.write().await = ChannelState::Pid; } - /* - pub async fn run(&mut self) { - //let (incomming_sender, incomming_receiver) = mpsc::unbounded(); - futures::join!(self.listen_manager(), self.send_outgoing()); - } - - pub async fn listen_manager(&self) { - let (mut listen_sender, mut listen_receiver) = mpsc::unbounded::
(); - - while self.closed.load(Ordering::Relaxed) { - while let Some(address) = listen_receiver.next().await { - let (end_sender, end_receiver) = oneshot::channel::<()>(); - task::spawn(channel_creator(address, end_receiver)); - } - } - } - - pub async fn send_outgoing(&self) { - //let prios = prios::PrioManager; - while self.closed.load(Ordering::Relaxed) { - - } - }*/ } diff --git a/network/src/metrics.rs b/network/src/metrics.rs index e18eb50121..79e951151a 100644 --- a/network/src/metrics.rs +++ b/network/src/metrics.rs @@ -1,3 +1,4 @@ +use crate::types::Pid; use prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry}; use std::error::Error; @@ -16,9 +17,12 @@ pub struct NetworkMetrics { pub streams_opened_total: IntCounterVec, pub streams_closed_total: IntCounterVec, pub network_info: IntGauge, - // Frames, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE, + // Frames counted a channel level, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE, pub frames_out_total: IntCounterVec, pub frames_in_total: IntCounterVec, + // Frames counted at protocol level, seperated by CHANNEL (and PARTICIPANT) AND FRAME TYPE, + pub frames_wire_out_total: IntCounterVec, + pub frames_wire_in_total: IntCounterVec, pub frames_count: IntGaugeVec, // send Messages, seperated by STREAM (and PARTICIPANT, CHANNEL), pub message_count: IntGaugeVec, @@ -38,7 +42,7 @@ pub struct NetworkMetrics { impl NetworkMetrics { #[allow(dead_code)] - pub fn new() -> Result> { + pub fn new(local_pid: &Pid) -> Result> { let listen_requests_total = IntCounterVec::new( Opts::new( "listen_requests_total", @@ -89,27 +93,46 @@ impl NetworkMetrics { ), &["participant"], )?; - let opts = Opts::new("network_info", "Static Network information").const_label( - "version", - &format!( - "{}.{}.{}", - &crate::types::VELOREN_NETWORK_VERSION[0], - &crate::types::VELOREN_NETWORK_VERSION[1], - &crate::types::VELOREN_NETWORK_VERSION[2] - ), - ); + let opts = Opts::new("network_info", "Static Network information") + .const_label( + "version", + &format!( + "{}.{}.{}", + &crate::types::VELOREN_NETWORK_VERSION[0], + &crate::types::VELOREN_NETWORK_VERSION[1], + &crate::types::VELOREN_NETWORK_VERSION[2] + ), + ) + .const_label("local_pid", &format!("{}", &local_pid)); let network_info = IntGauge::with_opts(opts)?; let frames_out_total = IntCounterVec::new( - Opts::new("frames_out_total", "number of all frames send per channel"), + Opts::new( + "frames_out_total", + "number of all frames send per channel, at the channel level", + ), &["participant", "channel", "frametype"], )?; let frames_in_total = IntCounterVec::new( Opts::new( "frames_in_total", - "number of all frames received per channel", + "number of all frames received per channel, at the channel level", ), &["participant", "channel", "frametype"], )?; + let frames_wire_out_total = IntCounterVec::new( + Opts::new( + "frames_wire_out_total", + "number of all frames send per channel, at the protocol level", + ), + &["channel", "frametype"], + )?; + let frames_wire_in_total = IntCounterVec::new( + Opts::new( + "frames_wire_in_total", + "number of all frames received per channel, at the protocol level", + ), + &["channel", "frametype"], + )?; let frames_count = IntGaugeVec::new( Opts::new( @@ -170,6 +193,8 @@ impl NetworkMetrics { network_info, frames_out_total, frames_in_total, + frames_wire_out_total, + frames_wire_in_total, frames_count, message_count, bytes_send, @@ -189,9 +214,11 @@ impl NetworkMetrics { 
registry.register(Box::new(self.channels_disconnected_total.clone()))?; registry.register(Box::new(self.streams_opened_total.clone()))?; registry.register(Box::new(self.streams_closed_total.clone()))?; - registry.register(Box::new(self.network_info.clone()))?; registry.register(Box::new(self.frames_out_total.clone()))?; registry.register(Box::new(self.frames_in_total.clone()))?; + registry.register(Box::new(self.frames_wire_out_total.clone()))?; + registry.register(Box::new(self.frames_wire_in_total.clone()))?; + registry.register(Box::new(self.network_info.clone()))?; registry.register(Box::new(self.frames_count.clone()))?; registry.register(Box::new(self.message_count.clone()))?; registry.register(Box::new(self.bytes_send.clone()))?; diff --git a/network/src/participant.rs b/network/src/participant.rs index 84716abc02..d2a6d7f1be 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -1,7 +1,9 @@ use crate::{ api::Stream, + channel::Channel, message::{InCommingMessage, MessageBuffer, OutGoingMessage}, metrics::NetworkMetrics, + protocols::Protocols, types::{Cid, Frame, Pid, Prio, Promises, Sid}, }; use async_std::sync::RwLock; @@ -25,8 +27,7 @@ use tracing::*; struct ControlChannels { stream_open_receiver: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender)>, stream_opened_sender: mpsc::UnboundedSender, - transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender)>, - frame_recv_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>, + create_channel_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>, shutdown_api_receiver: mpsc::UnboundedReceiver, shutdown_api_sender: mpsc::UnboundedSender, send_outgoing: Arc>>, //api @@ -39,7 +40,7 @@ struct ControlChannels { pub struct BParticipant { remote_pid: Pid, offset_sid: Sid, - channels: RwLock)>>, + channels: Arc)>>>, streams: RwLock< HashMap< Sid, @@ -66,26 +67,23 @@ impl BParticipant { Self, mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender)>, mpsc::UnboundedReceiver, - mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender)>, - mpsc::UnboundedSender<(Cid, Frame)>, + mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>, mpsc::UnboundedSender<(Pid, Sid, Frame)>, oneshot::Sender<()>, ) { let (stream_open_sender, stream_open_receiver) = mpsc::unbounded::<(Prio, Promises, oneshot::Sender)>(); let (stream_opened_sender, stream_opened_receiver) = mpsc::unbounded::(); - let (transfer_channel_sender, transfer_channel_receiver) = - mpsc::unbounded::<(Cid, mpsc::UnboundedSender)>(); - let (frame_recv_sender, frame_recv_receiver) = mpsc::unbounded::<(Cid, Frame)>(); let (shutdown_api_sender, shutdown_api_receiver) = mpsc::unbounded(); let (frame_send_sender, frame_send_receiver) = mpsc::unbounded::<(Pid, Sid, Frame)>(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let (create_channel_sender, create_channel_receiver) = + mpsc::unbounded::<(Cid, Sid, Protocols, oneshot::Sender<()>)>(); let run_channels = Some(ControlChannels { stream_open_receiver, stream_opened_sender, - transfer_channel_receiver, - frame_recv_receiver, + create_channel_receiver, shutdown_api_receiver, shutdown_api_sender, send_outgoing: Arc::new(Mutex::new(send_outgoing)), @@ -98,15 +96,14 @@ impl BParticipant { Self { remote_pid, offset_sid, - channels: RwLock::new(vec![]), + channels: Arc::new(RwLock::new(vec![])), streams: RwLock::new(HashMap::new()), run_channels, metrics, }, stream_open_sender, stream_opened_receiver, - transfer_channel_sender, - frame_recv_sender, + 
create_channel_sender, frame_send_sender, shutdown_sender, ) @@ -118,10 +115,10 @@ impl BParticipant { let (shutdown_open_manager_sender, shutdown_open_manager_receiver) = oneshot::channel(); let (shutdown_stream_close_manager_sender, shutdown_stream_close_manager_receiver) = oneshot::channel(); + let (frame_from_wire_sender, frame_from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>(); let run_channels = self.run_channels.take().unwrap(); futures::join!( - self.transfer_channel_manager(run_channels.transfer_channel_receiver), self.open_manager( run_channels.stream_open_receiver, run_channels.shutdown_api_sender.clone(), @@ -129,11 +126,15 @@ impl BParticipant { shutdown_open_manager_receiver, ), self.handle_frames( - run_channels.frame_recv_receiver, + frame_from_wire_receiver, run_channels.stream_opened_sender, run_channels.shutdown_api_sender, run_channels.send_outgoing.clone(), ), + self.create_channel_manager( + run_channels.create_channel_receiver, + frame_from_wire_sender, + ), self.send_manager(run_channels.frame_send_receiver), self.stream_close_manager( run_channels.shutdown_api_receiver, @@ -153,7 +154,15 @@ impl BParticipant { async fn send_frame(&self, frame: Frame) { // find out ideal channel here //TODO: just take first - if let Some((_cid, channel)) = self.channels.write().await.get_mut(0) { + if let Some((cid, channel)) = self.channels.write().await.get_mut(0) { + self.metrics + .frames_out_total + .with_label_values(&[ + &self.remote_pid.to_string(), + &cid.to_string(), + frame.get_string(), + ]) + .inc(); channel.send(frame).await.unwrap(); } else { error!("participant has no channel to communicate on"); @@ -162,7 +171,7 @@ impl BParticipant { async fn handle_frames( &self, - mut frame_recv_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>, + mut frame_from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>, mut stream_opened_sender: mpsc::UnboundedSender, shutdown_api_sender: mpsc::UnboundedSender, send_outgoing: Arc>>, @@ -170,10 +179,14 @@ impl BParticipant { trace!("start handle_frames"); let send_outgoing = { send_outgoing.lock().unwrap().clone() }; let mut messages = HashMap::new(); - let pid_u128: u128 = self.remote_pid.into(); - let pid_string = pid_u128.to_string(); - while let Some((cid, frame)) = frame_recv_receiver.next().await { - debug!("handling frame"); + let pid_string = &self.remote_pid.to_string(); + while let Some((cid, frame)) = frame_from_wire_receiver.next().await { + let cid_string = cid.to_string(); + trace!("handling frame"); + self.metrics + .frames_in_total + .with_label_values(&[&pid_string, &cid_string, frame.get_string()]) + .inc(); match frame { Frame::OpenStream { sid, @@ -185,9 +198,6 @@ impl BParticipant { .create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender) .await; stream_opened_sender.send(stream).await.unwrap(); - //TODO: Metrics - //self.metrics.frames_in_total.with_label_values(&[&pid_string, &cid_string, - // "Raw"]).inc(); trace!("opened frame from remote"); }, Frame::CloseStream { sid } => { @@ -197,10 +207,9 @@ impl BParticipant { // is dropped, so i need a way to notify the Stream that it's send messages will // be dropped... 
from remote, notify local if let Some((_, _, _, closed)) = self.streams.write().await.remove(&sid) { - let pid_u128: u128 = self.remote_pid.into(); self.metrics .streams_closed_total - .with_label_values(&[&pid_u128.to_string()]) + .with_label_values(&[&pid_string]) .inc(); closed.store(true, Ordering::Relaxed); } else { @@ -249,6 +258,40 @@ impl BParticipant { trace!("stop handle_frames"); } + async fn create_channel_manager( + &self, + channels_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>, + frame_from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>, + ) { + trace!("start channel_manager"); + channels_receiver + .for_each_concurrent(None, |(cid, sid, protocol, sender)| { + // This channel is now configured, and we are running it in scope of the + // participant. + let frame_from_wire_sender = frame_from_wire_sender.clone(); + let channels = self.channels.clone(); + async move { + let (channel, frame_to_wire_sender, shutdown_sender) = + Channel::new(cid, self.remote_pid, self.metrics.clone()); + channels.write().await.push((cid, frame_to_wire_sender)); + sender.send(()).unwrap(); + self.metrics + .channels_connected_total + .with_label_values(&[&self.remote_pid.to_string()]) + .inc(); + channel.run(protocol, frame_from_wire_sender).await; + self.metrics + .channels_disconnected_total + .with_label_values(&[&self.remote_pid.to_string()]) + .inc(); + trace!(?cid, "channel got closed"); + shutdown_sender.send(()).unwrap(); + } + }) + .await; + trace!("stop channel_manager"); + } + async fn send_manager( &self, mut frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, @@ -260,18 +303,6 @@ impl BParticipant { trace!("stop send_manager"); } - async fn transfer_channel_manager( - &self, - mut transfer_channel_receiver: mpsc::UnboundedReceiver<(Cid, mpsc::UnboundedSender)>, - ) { - trace!("start transfer_channel_manager"); - while let Some((cid, sender)) = transfer_channel_receiver.next().await { - debug!(?cid, "got a new channel to listen on"); - self.channels.write().await.push((cid, sender)); - } - trace!("stop transfer_channel_manager"); - } - async fn open_manager( &self, mut stream_open_receiver: mpsc::UnboundedReceiver<( @@ -369,10 +400,9 @@ impl BParticipant { .unwrap(); receiver.await.unwrap(); trace!(?sid, "stream was successfully flushed"); - let pid_u128: u128 = self.remote_pid.into(); self.metrics .streams_closed_total - .with_label_values(&[&pid_u128.to_string()]) + .with_label_values(&[&self.remote_pid.to_string()]) .inc(); self.streams.write().await.remove(&sid); @@ -396,10 +426,9 @@ impl BParticipant { .write() .await .insert(sid, (prio, promises, msg_recv_sender, closed.clone())); - let pid_u128: u128 = self.remote_pid.into(); self.metrics .streams_opened_total - .with_label_values(&[&pid_u128.to_string()]) + .with_label_values(&[&self.remote_pid.to_string()]) .inc(); Stream::new( self.remote_pid, diff --git a/network/src/protocols.rs b/network/src/protocols.rs index 92fcd5cf4a..d70890db7d 100644 --- a/network/src/protocols.rs +++ b/network/src/protocols.rs @@ -1,13 +1,19 @@ use crate::{ metrics::NetworkMetrics, - types::{Frame, Mid, Pid, Sid}, + types::{Cid, Frame, Mid, Pid, Sid}, }; use async_std::{ net::{TcpStream, UdpSocket}, prelude::*, sync::RwLock, }; -use futures::{channel::mpsc, future::FutureExt, select, sink::SinkExt, stream::StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + future::FutureExt, + select, + sink::SinkExt, + stream::StreamExt, +}; use std::{net::SocketAddr, sync::Arc}; use tracing::*; @@ -51,12 +57,23 
@@ impl TcpProtocol { Self { stream, metrics } } - pub async fn read(&self, mut frame_handler: mpsc::UnboundedSender) { + pub async fn read( + &self, + cid: Cid, + mut from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>, + end_receiver: oneshot::Receiver<()>, + ) { + trace!("starting up tcp write()"); let mut stream = self.stream.clone(); + let mut end_receiver = end_receiver.fuse(); loop { let mut bytes = [0u8; 1]; - if stream.read_exact(&mut bytes).await.is_err() { - info!("tcp channel closed, shutting down read"); + let r = select! { + r = stream.read_exact(&mut bytes).fuse() => r, + _ = end_receiver => break, + }; + if r.is_err() { + info!("tcp stream closed, shutting down read"); break; } let frame_no = bytes[0]; @@ -156,7 +173,11 @@ impl TcpProtocol { Frame::Raw(data) }, }; - frame_handler.send(frame).await.unwrap(); + self.metrics + .frames_wire_in_total + .with_label_values(&[&cid.to_string(), frame.get_string()]) + .inc(); + from_wire_sender.send((cid, frame)).await.unwrap(); } trace!("shutting down tcp read()"); } @@ -164,16 +185,15 @@ impl TcpProtocol { //dezerialize here as this is executed in a seperate thread PER channel. // Limites Throughput per single Receiver but stays in same thread (maybe as its // in a threadpool) for TCP, UDP and MPSC - pub async fn write( - &self, - mut internal_frame_receiver: mpsc::UnboundedReceiver, - mut external_frame_receiver: mpsc::UnboundedReceiver, - ) { + pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver) { + trace!("starting up tcp write()"); let mut stream = self.stream.clone(); - while let Some(frame) = select! { - next = internal_frame_receiver.next().fuse() => next, - next = external_frame_receiver.next().fuse() => next, - } { + let cid_string = cid.to_string(); + while let Some(frame) = to_wire_receiver.next().await { + self.metrics + .frames_wire_out_total + .with_label_values(&[&cid_string, frame.get_string()]) + .inc(); match frame { Frame::Handshake { magic_number, @@ -269,9 +289,19 @@ impl UdpProtocol { } } - pub async fn read(&self, mut frame_handler: mpsc::UnboundedSender) { + pub async fn read( + &self, + cid: Cid, + mut from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>, + end_receiver: oneshot::Receiver<()>, + ) { + trace!("starting up udp read()"); let mut data_in = self.data_in.write().await; - while let Some(bytes) = data_in.next().await { + let mut end_receiver = end_receiver.fuse(); + while let Some(bytes) = select! { + r = data_in.next().fuse() => r, + _ = end_receiver => None, + } { trace!("got raw UDP message with len: {}", bytes.len()); let frame_no = bytes[0]; let frame = match frame_no { @@ -351,7 +381,6 @@ impl UdpProtocol { Frame::Data { mid, start, data } }, FRAME_RAW => { - error!("Uffff"); let length = u16::from_le_bytes([bytes[1], bytes[2]]); let mut data = vec![0; length as usize]; data.copy_from_slice(&bytes[3..]); @@ -359,60 +388,24 @@ impl UdpProtocol { }, _ => Frame::Raw(bytes), }; - frame_handler.send(frame).await.unwrap(); + self.metrics + .frames_wire_in_total + .with_label_values(&[&cid.to_string(), frame.get_string()]) + .inc(); + from_wire_sender.send((cid, frame)).await.unwrap(); } - /* - let mut data_in = self.data_in.write().await; - let mut buffer = NetworkBuffer::new(); - while let Some(data) = data_in.next().await { - let n = data.len(); - let slice = &mut buffer.get_write_slice(n)[0..n]; //get_write_slice can return more then n! 
- slice.clone_from_slice(data.as_slice()); - buffer.actually_written(n); - trace!("incomming message with len: {}", n); - let slice = buffer.get_read_slice(); - let mut cur = std::io::Cursor::new(slice); - let mut read_ok = 0; - while cur.position() < n as u64 { - let round_start = cur.position() as usize; - let r: Result = bincode::deserialize_from(&mut cur); - match r { - Ok(frame) => { - frame_handler.send(frame).await.unwrap(); - read_ok = cur.position() as usize; - }, - Err(e) => { - // Probably we have to wait for moare data! - let first_bytes_of_msg = - &slice[round_start..std::cmp::min(n, round_start + 16)]; - debug!( - ?buffer, - ?e, - ?n, - ?round_start, - ?first_bytes_of_msg, - "message cant be parsed, probably because we need to wait for more \ - data" - ); - break; - }, - } - } - buffer.actually_read(read_ok); - }*/ trace!("shutting down udp read()"); } - pub async fn write( - &self, - mut internal_frame_receiver: mpsc::UnboundedReceiver, - mut external_frame_receiver: mpsc::UnboundedReceiver, - ) { + pub async fn write(&self, cid: Cid, mut to_wire_receiver: mpsc::UnboundedReceiver) { + trace!("starting up udp write()"); let mut buffer = [0u8; 2000]; - while let Some(frame) = select! { - next = internal_frame_receiver.next().fuse() => next, - next = external_frame_receiver.next().fuse() => next, - } { + let cid_string = cid.to_string(); + while let Some(frame) = to_wire_receiver.next().await { + self.metrics + .frames_wire_out_total + .with_label_values(&[&cid_string, frame.get_string()]) + .inc(); let len = match frame { Frame::Handshake { magic_number, @@ -602,116 +595,5 @@ impl UdpProtocol { } } trace!("shutting down udp write()"); - /* - let mut buffer = NetworkBuffer::new(); - while let Some(frame) = select! { - next = internal_frame_receiver.next().fuse() => next, - next = external_frame_receiver.next().fuse() => next, - } { - let len = bincode::serialized_size(&frame).unwrap() as usize; - match bincode::serialize_into(buffer.get_write_slice(len), &frame) { - Ok(_) => buffer.actually_written(len), - Err(e) => error!("Oh nooo {}", e), - }; - trace!(?len, "going to send frame via Udp"); - let mut to_send = buffer.get_read_slice(); - while to_send.len() > 0 { - match self.socket.send_to(to_send, self.remote_addr).await { - Ok(n) => buffer.actually_read(n), - Err(e) => error!(?e, "need to handle that error!"), - } - to_send = buffer.get_read_slice(); - } - } - */ } } - -// INTERNAL NetworkBuffer -/* -struct NetworkBuffer { - pub(crate) data: Vec, - pub(crate) read_idx: usize, - pub(crate) write_idx: usize, -} - -/// NetworkBuffer to use for streamed access -/// valid data is between read_idx and write_idx! -/// everything before read_idx is already processed and no longer important -/// everything after write_idx is either 0 or random data buffered -impl NetworkBuffer { - fn new() -> Self { - NetworkBuffer { - data: vec![0; 2048], - read_idx: 0, - write_idx: 0, - } - } - - fn get_write_slice(&mut self, min_size: usize) -> &mut [u8] { - if self.data.len() < self.write_idx + min_size { - trace!( - ?self, - ?min_size, - "need to resize because buffer is to small" - ); - self.data.resize(self.write_idx + min_size, 0); - } - &mut self.data[self.write_idx..] 
- } - - fn actually_written(&mut self, cnt: usize) { self.write_idx += cnt; } - - fn get_read_slice(&self) -> &[u8] { &self.data[self.read_idx..self.write_idx] } - - fn actually_read(&mut self, cnt: usize) { - self.read_idx += cnt; - if self.read_idx == self.write_idx { - if self.read_idx > 10485760 { - trace!(?self, "buffer empty, resetting indices"); - } - self.read_idx = 0; - self.write_idx = 0; - } - if self.write_idx > 10485760 { - if self.write_idx - self.read_idx < 65536 { - debug!( - ?self, - "This buffer is filled over 10 MB, but the actual data diff is less then \ - 65kB, which is a sign of stressing this connection much as always new data \ - comes in - nevertheless, in order to handle this we will remove some data \ - now so that this buffer doesn't grow endlessly" - ); - let mut i2 = 0; - for i in self.read_idx..self.write_idx { - self.data[i2] = self.data[i]; - i2 += 1; - } - self.read_idx = 0; - self.write_idx = i2; - } - if self.data.len() > 67108864 { - warn!( - ?self, - "over 64Mbyte used, something seems fishy, len: {}", - self.data.len() - ); - } - } - } -} - -impl std::fmt::Debug for NetworkBuffer { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "NetworkBuffer(len: {}, read: {}, write: {})", - self.data.len(), - self.read_idx, - self.write_idx - ) - } -} - -*/ diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index e53acf2ac3..799da284c6 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -1,6 +1,6 @@ use crate::{ api::{Address, Participant}, - channel::Channel, + channel::Handshake, message::OutGoingMessage, metrics::NetworkMetrics, participant::BParticipant, @@ -30,31 +30,28 @@ use std::{ }; use tracing::*; use tracing_futures::Instrument; -//use futures::prelude::*; type ParticipantInfo = ( - mpsc::UnboundedSender<(Cid, mpsc::UnboundedSender)>, + mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>, mpsc::UnboundedSender<(Pid, Sid, Frame)>, oneshot::Sender<()>, ); -type UnknownChannelInfo = ( - mpsc::UnboundedSender, - Option>>, -); -pub(crate) type ConfigureInfo = ( - Cid, - Pid, - Sid, - oneshot::Sender>, -); #[derive(Debug)] struct ControlChannels { listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender>)>, connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender>)>, - connected_sender: mpsc::UnboundedSender, shutdown_receiver: oneshot::Receiver<()>, + disconnect_receiver: mpsc::UnboundedReceiver, + stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>, +} + +#[derive(Debug, Clone)] +struct ParticipantChannels { + connected_sender: mpsc::UnboundedSender, + disconnect_sender: mpsc::UnboundedSender, prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, + stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>, } #[derive(Debug)] @@ -63,11 +60,10 @@ pub struct Scheduler { closed: AtomicBool, pool: Arc, run_channels: Option, + participant_channels: ParticipantChannels, participants: Arc>>, - participant_from_channel: Arc>>, channel_ids: Arc, channel_listener: RwLock>>, - unknown_channels: Arc>>, prios: Arc>, metrics: Arc, } @@ -90,16 +86,25 @@ impl Scheduler { let (connected_sender, connected_receiver) = mpsc::unbounded::(); let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); let (prios, prios_sender) = PrioManager::new(); + let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::(); + let (stream_finished_request_sender, 
stream_finished_request_receiver) = mpsc::unbounded(); let run_channels = Some(ControlChannels { listen_receiver, connect_receiver, - connected_sender, shutdown_receiver, - prios_sender, + disconnect_receiver, + stream_finished_request_receiver, }); - let metrics = Arc::new(NetworkMetrics::new().unwrap()); + let participant_channels = ParticipantChannels { + disconnect_sender, + stream_finished_request_sender, + connected_sender, + prios_sender, + }; + + let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap()); if let Some(registry) = registry { metrics.register(registry).unwrap(); } @@ -110,11 +115,10 @@ impl Scheduler { closed: AtomicBool::new(false), pool: Arc::new(ThreadPool::new().unwrap()), run_channels, + participant_channels, participants: Arc::new(RwLock::new(HashMap::new())), - participant_from_channel: Arc::new(RwLock::new(HashMap::new())), channel_ids: Arc::new(AtomicU64::new(0)), channel_listener: RwLock::new(HashMap::new()), - unknown_channels: Arc::new(RwLock::new(HashMap::new())), prios: Arc::new(Mutex::new(prios)), metrics, }, @@ -126,38 +130,26 @@ impl Scheduler { } pub async fn run(mut self) { - let (configured_sender, configured_receiver) = mpsc::unbounded::(); - let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::(); - let (stream_finished_request_sender, stream_finished_request_receiver) = mpsc::unbounded(); let run_channels = self.run_channels.take().unwrap(); futures::join!( - self.listen_manager(run_channels.listen_receiver, configured_sender.clone(),), - self.connect_manager(run_channels.connect_receiver, configured_sender,), - self.disconnect_manager(disconnect_receiver,), + self.listen_manager(run_channels.listen_receiver), + self.connect_manager(run_channels.connect_receiver), + self.disconnect_manager(run_channels.disconnect_receiver), self.send_outgoing(), - self.stream_finished_manager(stream_finished_request_receiver), + self.stream_finished_manager(run_channels.stream_finished_request_receiver), self.shutdown_manager(run_channels.shutdown_receiver), - self.channel_configurer( - run_channels.connected_sender, - configured_receiver, - disconnect_sender, - run_channels.prios_sender.clone(), - stream_finished_request_sender.clone(), - ), ); } async fn listen_manager( &self, listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender>)>, - configured_sender: mpsc::UnboundedSender, ) { trace!("start listen_manager"); listen_receiver .for_each_concurrent(None, |(address, result_sender)| { let address = address.clone(); - let configured_sender = configured_sender.clone(); async move { debug!(?address, "got request to open a channel_creator"); @@ -174,13 +166,8 @@ impl Scheduler { .write() .await .insert(address.clone(), end_sender); - self.channel_creator( - address, - end_receiver, - configured_sender.clone(), - result_sender, - ) - .await; + self.channel_creator(address, end_receiver, result_sender) + .await; } }) .await; @@ -193,11 +180,10 @@ impl Scheduler { Address, oneshot::Sender>, )>, - configured_sender: mpsc::UnboundedSender, ) { trace!("start connect_manager"); while let Some((addr, pid_sender)) = connect_receiver.next().await { - let (addr, protocol, handshake) = match addr { + let (protocol, handshake) = match addr { Address::Tcp(addr) => { self.metrics .connect_requests_total @@ -212,7 +198,7 @@ impl Scheduler { }; info!("Connecting Tcp to: {}", stream.peer_addr().unwrap()); let protocol = Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone())); - (addr, protocol, false) + (protocol, false) }, Address::Udp(addr) => { 
self.metrics @@ -242,18 +228,12 @@ impl Scheduler { Self::udp_single_channel_connect(socket.clone(), udp_data_sender) .instrument(tracing::info_span!("udp", ?addr)), ); - (addr, protocol, true) + (protocol, true) }, _ => unimplemented!(), }; - self.init_protocol( - addr, - &configured_sender, - protocol, - Some(pid_sender), - handshake, - ) - .await; + self.init_protocol(protocol, Some(pid_sender), handshake) + .await; } trace!("stop connect_manager"); } @@ -296,95 +276,6 @@ impl Scheduler { trace!("stop send_outgoing"); } - //TODO: //ERROR CHECK IF THIS SHOULD BE PUT IN A ASYNC FUNC WHICH IS SEND OVER - // TO CHANNEL OR NOT FOR RETURN VALUE! - - async fn channel_configurer( - &self, - mut connected_sender: mpsc::UnboundedSender, - mut receiver: mpsc::UnboundedReceiver, - disconnect_sender: mpsc::UnboundedSender, - prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>, - stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>, - ) { - trace!("start channel_activator"); - while let Some((cid, pid, offset_sid, sender)) = receiver.next().await { - if let Some((frame_sender, pid_oneshot)) = - self.unknown_channels.write().await.remove(&cid) - { - trace!( - ?cid, - ?pid, - "detected that my channel is ready!, activating it :)" - ); - let mut participants = self.participants.write().await; - if !participants.contains_key(&pid) { - debug!(?cid, "new participant connected via a channel"); - let ( - bparticipant, - stream_open_sender, - stream_opened_receiver, - mut transfer_channel_receiver, - frame_recv_sender, - frame_send_sender, - shutdown_sender, - ) = BParticipant::new( - pid, - offset_sid, - self.metrics.clone(), - prios_sender.clone(), - stream_finished_request_sender.clone(), - ); - - let participant = Participant::new( - self.local_pid, - pid, - stream_open_sender, - stream_opened_receiver, - disconnect_sender.clone(), - ); - if let Some(pid_oneshot) = pid_oneshot { - // someone is waiting with connect, so give them their PID - pid_oneshot.send(Ok(participant)).unwrap(); - } else { - // noone is waiting on this Participant, return in to Network - connected_sender.send(participant).await.unwrap(); - } - self.metrics.participants_connected_total.inc(); - transfer_channel_receiver - .send((cid, frame_sender)) - .await - .unwrap(); - participants.insert( - pid, - ( - transfer_channel_receiver, - frame_send_sender, - shutdown_sender, - ), - ); - self.participant_from_channel.write().await.insert(cid, pid); - self.pool.spawn_ok( - bparticipant - .run() - .instrument(tracing::info_span!("participant", ?pid)), - ); - sender.send(frame_recv_sender).unwrap(); - } else { - error!( - "2ND channel of participants opens, but we cannot verify that this is not \ - a attack to " - ); - //ERROR DEADLOCK AS NO SENDER HERE! - //sender.send(frame_recv_sender).unwrap(); - } - //From now on this CHANNEL can receiver other frames! move - // directly to participant! 
- } - } - trace!("stop channel_activator"); - } - // requested by participant when stream wants to close from api, checking if no // more msg is in prio and return pub(crate) async fn stream_finished_manager( @@ -447,10 +338,9 @@ impl Scheduler { &self, addr: Address, end_receiver: oneshot::Receiver<()>, - configured_sender: mpsc::UnboundedSender, result_sender: oneshot::Sender>, ) { - info!(?addr, "start up channel creator"); + trace!(?addr, "start up channel creator"); match addr { Address::Tcp(addr) => { let listener = match net::TcpListener::bind(addr).await { @@ -478,8 +368,6 @@ impl Scheduler { let stream = stream.unwrap(); info!("Accepting Tcp from: {}", stream.peer_addr().unwrap()); self.init_protocol( - addr, - &configured_sender, Protocols::Tcp(TcpProtocol::new(stream, self.metrics.clone())), None, true, @@ -521,12 +409,11 @@ impl Scheduler { listeners.insert(remote_addr.clone(), udp_data_sender); let protocol = Protocols::Udp(UdpProtocol::new( socket.clone(), - remote_addr, + remote_addr.clone(), self.metrics.clone(), udp_data_receiver, )); - self.init_protocol(addr, &configured_sender, protocol, None, true) - .await; + self.init_protocol(protocol, None, false).await; } let udp_data_sender = listeners.get_mut(&remote_addr).unwrap(); udp_data_sender.send(datavec).await.unwrap(); @@ -534,7 +421,7 @@ impl Scheduler { }, _ => unimplemented!(), } - info!(?addr, "ending channel creator"); + trace!(?addr, "ending channel creator"); } pub(crate) async fn udp_single_channel_connect( @@ -542,7 +429,7 @@ impl Scheduler { mut udp_data_sender: mpsc::UnboundedSender>, ) { let addr = socket.local_addr(); - info!(?addr, "start udp_single_channel_connect"); + trace!(?addr, "start udp_single_channel_connect"); //TODO: implement real closing let (_end_sender, end_receiver) = oneshot::channel::<()>(); @@ -558,37 +445,112 @@ impl Scheduler { datavec.extend_from_slice(&data[0..size]); udp_data_sender.send(datavec).await.unwrap(); } - info!(?addr, "stop udp_single_channel_connect"); + trace!(?addr, "stop udp_single_channel_connect"); } async fn init_protocol( &self, - addr: std::net::SocketAddr, - configured_sender: &mpsc::UnboundedSender, protocol: Protocols, pid_sender: Option>>, send_handshake: bool, ) { - let (mut part_in_sender, part_in_receiver) = mpsc::unbounded::(); //channels are unknown till PID is known! /* When A connects to a NETWORK, we, the listener answers with a Handshake. Pro: - Its easier to debug, as someone who opens a port gets a magic number back! Contra: - DOS posibility because we answer fist - Speed, because otherwise the message can be send with the creation */ + let mut participant_channels = self.participant_channels.clone(); + // spawn is needed here, e.g. for TCP connect it would mean that only 1 + // participant can be in handshake phase ever! Someone could deadlock + // the whole server easily for new clients UDP doesnt work at all, as + // the UDP listening is done in another place. 
let cid = self.channel_ids.fetch_add(1, Ordering::Relaxed); - let channel = Channel::new(cid, self.local_pid, self.metrics.clone()); - if send_handshake { - channel.send_handshake(&mut part_in_sender).await; - } - self.pool.spawn_ok( - channel - .run(protocol, part_in_receiver, configured_sender.clone()) - .instrument(tracing::info_span!("channel", ?addr)), - ); - self.unknown_channels - .write() - .await - .insert(cid, (part_in_sender, pid_sender)); + let participants = self.participants.clone(); + let metrics = self.metrics.clone(); + let pool = self.pool.clone(); + let local_pid = self.local_pid; + self.pool.spawn_ok(async move { + trace!(?cid, "open channel and be ready for Handshake"); + let handshake = Handshake::new(cid, local_pid, metrics.clone(), send_handshake); + match handshake.setup(&protocol).await { + Ok((pid, sid)) => { + trace!( + ?cid, + ?pid, + "detected that my channel is ready!, activating it :)" + ); + let mut participants = participants.write().await; + if !participants.contains_key(&pid) { + debug!(?cid, "new participant connected via a channel"); + let ( + bparticipant, + stream_open_sender, + stream_opened_receiver, + mut create_channel_sender, + frame_send_sender, + shutdown_sender, + ) = BParticipant::new( + pid, + sid, + metrics.clone(), + participant_channels.prios_sender, + participant_channels.stream_finished_request_sender, + ); + + let participant = Participant::new( + local_pid, + pid, + stream_open_sender, + stream_opened_receiver, + participant_channels.disconnect_sender, + ); + + metrics.participants_connected_total.inc(); + participants.insert( + pid, + ( + create_channel_sender.clone(), + frame_send_sender, + shutdown_sender, + ), + ); + pool.spawn_ok( + bparticipant + .run() + .instrument(tracing::info_span!("participant", ?pid)), + ); + //create a new channel within BParticipant and wait for it to run + let (sync_sender, sync_receiver) = oneshot::channel(); + create_channel_sender + .send((cid, sid, protocol, sync_sender)) + .await + .unwrap(); + sync_receiver.await.unwrap(); + if let Some(pid_oneshot) = pid_sender { + // someone is waiting with connect, so give them their PID + pid_oneshot.send(Ok(participant)).unwrap(); + } else { + // noone is waiting on this Participant, return in to Network + participant_channels + .connected_sender + .send(participant) + .await + .unwrap(); + } + } else { + error!( + "2ND channel of participants opens, but we cannot verify that this is \ + not a attack to " + ); + //ERROR DEADLOCK AS NO SENDER HERE! + //sender.send(frame_recv_sender).unwrap(); + } + //From now on this CHANNEL can receiver other frames! move + // directly to participant! 
+                },
+                Err(()) => {},
+            }
+        });
+    }
 }
diff --git a/network/src/types.rs b/network/src/types.rs
index d80d0839e5..541c9b534a 100644
--- a/network/src/types.rs
+++ b/network/src/types.rs
@@ -62,6 +62,36 @@ pub(crate) enum Frame {
     Raw(Vec<u8>),
 }

+impl Frame {
+    pub fn get_string(&self) -> &str {
+        match self {
+            Frame::Handshake {
+                magic_number: _,
+                version: _,
+            } => "Handshake",
+            Frame::ParticipantId { pid: _ } => "ParticipantId",
+            Frame::Shutdown => "Shutdown",
+            Frame::OpenStream {
+                sid: _,
+                prio: _,
+                promises: _,
+            } => "OpenStream",
+            Frame::CloseStream { sid: _ } => "CloseStream",
+            Frame::DataHeader {
+                mid: _,
+                sid: _,
+                length: _,
+            } => "DataHeader",
+            Frame::Data {
+                mid: _,
+                start: _,
+                data: _,
+            } => "Data",
+            Frame::Raw(_) => "Raw",
+        }
+    }
+}
+
 #[derive(Debug)]
 pub(crate) enum Requestor {
     User,
@@ -111,13 +141,35 @@ impl Sid {
 impl std::fmt::Debug for Pid {
     #[inline]
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        const BITS_PER_SIXLET: usize = 6;
         //only print last 6 chars of number as full u128 logs are unreadable
-        write!(f, "{}", self.internal.rem_euclid(100000))
+        const CHAR_COUNT: usize = 6;
+        for i in 0..CHAR_COUNT {
+            write!(
+                f,
+                "{}",
+                sixlet_to_str((self.internal >> i * BITS_PER_SIXLET) & 0x3F)
+            )?;
+        }
+        Ok(())
     }
 }

-impl From<Pid> for u128 {
-    fn from(pid: Pid) -> Self { pid.internal }
+impl std::fmt::Display for Pid {
+    #[inline]
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        const BITS_PER_SIXLET: usize = 6;
+        //only print last 6 chars of number as full u128 logs are unreadable
+        const CHAR_COUNT: usize = 6;
+        for i in 0..CHAR_COUNT {
+            write!(
+                f,
+                "{}",
+                sixlet_to_str((self.internal >> i * BITS_PER_SIXLET) & 0x3F)
+            )?;
+        }
+        Ok(())
+    }
 }

 impl std::ops::AddAssign for Sid {
@@ -139,3 +191,74 @@ impl std::fmt::Debug for Sid {
 impl From<u64> for Sid {
     fn from(internal: u64) -> Self { Sid { internal } }
 }
+
+#[inline]
+fn sixlet_to_str(sixlet: u128) -> char {
+    match sixlet {
+        0 => 'A',
+        1 => 'B',
+        2 => 'C',
+        3 => 'D',
+        4 => 'E',
+        5 => 'F',
+        6 => 'G',
+        7 => 'H',
+        8 => 'I',
+        9 => 'J',
+        10 => 'K',
+        11 => 'L',
+        12 => 'M',
+        13 => 'N',
+        14 => 'O',
+        15 => 'P',
+        16 => 'Q',
+        17 => 'R',
+        18 => 'S',
+        19 => 'T',
+        20 => 'U',
+        21 => 'V',
+        22 => 'W',
+        23 => 'X',
+        24 => 'Y',
+        25 => 'Z',
+        26 => 'a',
+        27 => 'b',
+        28 => 'c',
+        29 => 'd',
+        30 => 'e',
+        31 => 'f',
+        32 => 'g',
+        33 => 'h',
+        34 => 'i',
+        35 => 'j',
+        36 => 'k',
+        37 => 'l',
+        38 => 'm',
+        39 => 'n',
+        40 => 'o',
+        41 => 'p',
+        42 => 'q',
+        43 => 'r',
+        44 => 's',
+        45 => 't',
+        46 => 'u',
+        47 => 'v',
+        48 => 'w',
+        49 => 'x',
+        50 => 'y',
+        51 => 'z',
+        52 => '0',
+        53 => '1',
+        54 => '2',
+        55 => '3',
+        56 => '4',
+        57 => '5',
+        58 => '6',
+        59 => '7',
+        60 => '8',
+        61 => '9',
+        62 => '+',
+        63 => '/',
+        _ => '-',
+    }
+}