diff --git a/network/examples/network-speed/src/main.rs b/network/examples/network-speed/src/main.rs
index 8b1fe89061..ef44609307 100644
--- a/network/examples/network-speed/src/main.rs
+++ b/network/examples/network-speed/src/main.rs
@@ -75,7 +75,14 @@ fn main() {
         .get_matches();
 
     let trace = matches.value_of("trace").unwrap();
-    let filter = EnvFilter::from_default_env().add_directive(trace.parse().unwrap())/*
+    let filter = EnvFilter::from_default_env()
+        .add_directive(trace.parse().unwrap())
+        .add_directive("network_speed=debug".parse().unwrap())
+        .add_directive("veloren_network::participant=trace".parse().unwrap())
+        .add_directive("veloren_network::protocol=trace".parse().unwrap())
+        .add_directive("veloren_network::scheduler=trace".parse().unwrap())
+        .add_directive("veloren_network::api=trace".parse().unwrap())
+        /*
         .add_directive("veloren_network::participant=debug".parse().unwrap()).add_directive("veloren_network::api=debug".parse().unwrap())*/;
     tracing_subscriber::FmtSubscriber::builder()
         .with_max_level(Level::ERROR)
@@ -165,6 +172,13 @@ fn client(address: Address) {
             std::thread::sleep(std::time::Duration::from_millis(5000));
             break;
         }
-    }
-    debug!("closing client");
+    };
+    drop(s1);
+    std::thread::sleep(std::time::Duration::from_millis(5000));
+    info!("closing participant");
+    block_on(client.disconnect(p1)).unwrap();
+    std::thread::sleep(std::time::Duration::from_millis(75000));
+    info!("DROPPING! client");
+    drop(client);
+    std::thread::sleep(std::time::Duration::from_millis(75000));
 }
diff --git a/network/examples/network-speed/src/metrics.rs b/network/examples/network-speed/src/metrics.rs
index e10eb678e0..9186c3fdc8 100644
--- a/network/examples/network-speed/src/metrics.rs
+++ b/network/examples/network-speed/src/metrics.rs
@@ -9,7 +9,6 @@ use std::{
         Arc,
     },
     thread,
-    time::Duration,
 };
 
 pub struct SimpleMetrics {
@@ -49,10 +48,10 @@ impl SimpleMetrics {
         //TODO: make this a job
         self.handle = Some(thread::spawn(move || {
             let server = tiny_http::Server::http(addr).unwrap();
-            const timeout: std::time::Duration = std::time::Duration::from_secs(1);
+            const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
             debug!("starting tiny_http server to serve metrics");
             while running2.load(Ordering::Relaxed) {
-                let request = match server.recv_timeout(timeout) {
+                let request = match server.recv_timeout(TIMEOUT) {
                     Ok(Some(rq)) => rq,
                     Ok(None) => continue,
                     Err(e) => { println!("error: {}", e); break }
@@ -62,7 +61,10 @@ impl SimpleMetrics {
                 let mut buffer = vec![];
                 encoder.encode(&mf, &mut buffer).expect("Failed to encode metrics text.");
                 let response = tiny_http::Response::from_string(String::from_utf8(buffer).expect("Failed to parse bytes as a string."));
-                request.respond(response);
+                match request.respond(response) {
+                    Err(e) => error!(?e, "The metrics HTTP server encountered an error while answering"),
+                    _ => (),
+                }
             }
             debug!("stopping tiny_http server to serve metrics");
         }));
diff --git a/network/src/api.rs b/network/src/api.rs
index 5aec33b906..40332e8efd 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -1,7 +1,7 @@
 use crate::{
     message::{self, InCommingMessage, MessageBuffer, OutGoingMessage},
     scheduler::Scheduler,
-    types::{Mid, Pid, Prio, Promises, Requestor::User, Sid},
+    types::{Mid, Pid, Prio, Promises, Sid},
 };
 use async_std::{io, sync::RwLock, task};
 use futures::{
@@ -14,7 +14,7 @@ use serde::{de::DeserializeOwned, Serialize};
 use std::{
     collections::HashMap,
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicBool, AtomicU64, Ordering},
         Arc,
     },
 };
@@ -40,10 +40,11 @@ pub enum Address {
 pub struct Participant {
     local_pid: Pid,
     remote_pid: Pid,
-    stream_open_sender: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
-    stream_opened_receiver: RwLock<mpsc::UnboundedReceiver<Stream>>,
+    a2b_steam_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
+    b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
     closed: AtomicBool,
-    disconnect_sender: Option<mpsc::UnboundedSender<Pid>>,
+    a2s_disconnect_s:
+        Option<mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>>,
 }
 
 /// `Streams` represents a channel to send `n` messages with a certain priority
@@ -70,10 +71,10 @@ pub struct Stream {
     mid: Mid,
     prio: Prio,
     promises: Promises,
-    msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
-    msg_recv_receiver: mpsc::UnboundedReceiver<InCommingMessage>,
+    a2b_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
+    b2a_msg_recv_r: mpsc::UnboundedReceiver<InCommingMessage>,
     closed: Arc<AtomicBool>,
-    shutdown_sender: Option<mpsc::UnboundedSender<Sid>>,
+    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
 }
 
 /// Error type thrown by [`Networks`](Network) methods
@@ -162,17 +163,17 @@ impl Network {
     /// [`ThreadPool`]: uvth::ThreadPool
     pub fn new(participant_id: Pid, thread_pool: &ThreadPool, registry: Option<&Registry>) -> Self {
         let p = participant_id;
-        debug!(?p, ?User, "starting Network");
+        debug!(?p, "starting Network");
         let (scheduler, listen_sender, connect_sender, connected_receiver, shutdown_sender) =
             Scheduler::new(participant_id, registry);
         thread_pool.execute(move || {
-            trace!(?p, ?User, "starting sheduler in own thread");
+            trace!(?p, "starting scheduler in own thread");
             let _handle = task::block_on(
                 scheduler
                     .run()
                     .instrument(tracing::info_span!("scheduler", ?p)),
             );
-            trace!(?p, ?User, "stopping sheduler and his own thread");
+            trace!(?p, "stopping scheduler and its own thread");
         });
         Self {
             local_pid: participant_id,
@@ -210,14 +211,14 @@ impl Network {
     ///
     /// [`connected`]: Network::connected
     pub async fn listen(&self, address: Address) -> Result<(), NetworkError> {
-        let (result_sender, result_receiver) = oneshot::channel::<async_std::io::Result<()>>();
-        debug!(?address, ?User, "listening on address");
+        let (s2a_result_s, s2a_result_r) = oneshot::channel::<async_std::io::Result<()>>();
+        debug!(?address, "listening on address");
         self.listen_sender
             .write()
             .await
-            .send((address, result_sender))
+            .send((address, s2a_result_s))
             .await?;
-        match result_receiver.await? {
+        match s2a_result_r.await? {
             //waiting guarantees that we either listened successfully or get an error like port in
             // use
             Ok(()) => Ok(()),
@@ -256,7 +257,7 @@ impl Network {
     /// [`Addresses`]: crate::api::Address
     pub async fn connect(&self, address: Address) -> Result<Arc<Participant>, NetworkError> {
         let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
-        debug!(?address, ?User, "connect to address");
+        debug!(?address, "connect to address");
         self.connect_sender
             .write()
             .await
@@ -266,7 +267,6 @@ impl Network {
         let pid = participant.remote_pid;
         debug!(
             ?pid,
-            ?User,
             "received Participant id from remote and returning it to user"
         );
         let participant = Arc::new(participant);
@@ -317,8 +317,10 @@ impl Network {
     /// are not allowed to keep others. If you do so the [`Participant`]
     /// can't be disconnected properly. If you no longer have the respective
     /// [`Participant`], try using the [`participants`] method to get it.
+    ///
     /// This function will wait for all [`Streams`] to properly close, including
-    /// all messages to be send before closing.
+    /// all messages to be sent before closing. If an error occurs with one
+    /// of the messages, the error is returned.
     ///
     /// # Examples
     /// ```rust
@@ -349,12 +351,33 @@ impl Network {
         self.participants.write().await.remove(&pid)?;
         participant.closed.store(true, Ordering::Relaxed);
 
-        if Arc::try_unwrap(participant).is_err() {
-            warn!(
-                "you are disconnecting and still keeping a reference to this participant, this is \
-                 a bad idea. Participant will only be dropped when you drop your last reference"
-            );
+        match Arc::try_unwrap(participant) {
+            Err(_) => {
+                warn!(
+                    "you are disconnecting and still keeping a reference to this participant, \
+                     this is a bad idea. Participant will only be dropped when you drop your last \
+                     reference"
+                );
+            },
+            Ok(mut participant) => {
+                trace!("waiting now for participant to close");
+                let (finished_sender, finished_receiver) = oneshot::channel();
+                // We are deleting here asynchronously before Drop is called, because this can be
+                // done natively async while Drop needs to block. Drop will recognize
+                // that the deletion already happened here and won't try a double delete.
+                participant
+                    .a2s_disconnect_s
+                    .take()
+                    .unwrap()
+                    .send((pid, finished_sender))
+                    .await
+                    .expect("something is wrong in internal scheduler coding");
+                let res = finished_receiver.await.unwrap();
+                trace!("participant is now closed");
+                res?;
+            },
         };
+
         Ok(())
     }
@@ -370,17 +393,17 @@ impl Participant {
     pub(crate) fn new(
         local_pid: Pid,
         remote_pid: Pid,
-        stream_open_sender: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
-        stream_opened_receiver: mpsc::UnboundedReceiver<Stream>,
-        disconnect_sender: mpsc::UnboundedSender<Pid>,
+        a2b_steam_open_s: mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
+        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
+        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
     ) -> Self {
         Self {
             local_pid,
             remote_pid,
-            stream_open_sender: RwLock::new(stream_open_sender),
-            stream_opened_receiver: RwLock::new(stream_opened_receiver),
+            a2b_steam_open_s: RwLock::new(a2b_steam_open_s),
+            b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
             closed: AtomicBool::new(false),
-            disconnect_sender: Some(disconnect_sender),
+            a2s_disconnect_s: Some(a2s_disconnect_s),
         }
     }
@@ -422,29 +445,29 @@ impl Participant {
     pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
         //use this lock for now to make sure that only one open at a time is made,
         // TODO: not sure if we can parallelise that, check in future
-        let mut stream_open_sender = self.stream_open_sender.write().await;
+        let mut a2b_steam_open_s = self.a2b_steam_open_s.write().await;
         if self.closed.load(Ordering::Relaxed) {
             warn!(?self.remote_pid, "participant is closed but another open is tried on it");
             return Err(ParticipantError::ParticipantClosed);
         }
-        let (sid_sender, sid_receiver) = oneshot::channel();
-        if stream_open_sender
-            .send((prio, promises, sid_sender))
+        let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
+        if a2b_steam_open_s
+            .send((prio, promises, p2a_return_stream_s))
             .await
             .is_err()
         {
-            debug!(?self.remote_pid, ?User, "stream_open_sender failed, closing participant");
+            debug!(?self.remote_pid, "a2b_steam_open_s failed, closing participant");
             self.closed.store(true, Ordering::Relaxed);
             return Err(ParticipantError::ParticipantClosed);
         }
-        match sid_receiver.await {
+        match p2a_return_stream_r.await {
             Ok(stream) => {
                 let sid = stream.sid;
-                debug!(?sid, ?self.remote_pid, ?User, "opened stream");
+                debug!(?sid, ?self.remote_pid, "opened stream");
                 Ok(stream)
             },
             Err(_) => {
-                debug!(?self.remote_pid, ?User, "sid_receiver failed, closing participant");
+                debug!(?self.remote_pid, "p2a_return_stream_r failed, closing participant");
                 self.closed.store(true, Ordering::Relaxed);
                 Err(ParticipantError::ParticipantClosed)
             },
@@ -478,7 +501,7 @@ impl Participant {
     pub async fn opened(&self) -> Result<Stream, ParticipantError> {
         //use this lock for now to make sure that only one open at a time is made,
         // TODO: not sure if we can parallelise that, check in future
-        let mut stream_opened_receiver = self.stream_opened_receiver.write().await;
+        let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
         if self.closed.load(Ordering::Relaxed) {
             warn!(?self.remote_pid, "participant is closed but another open is tried on it");
             return Err(ParticipantError::ParticipantClosed);
         }
@@ -507,10 +530,10 @@ impl Stream {
         sid: Sid,
         prio: Prio,
         promises: Promises,
-        msg_send_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
-        msg_recv_receiver: mpsc::UnboundedReceiver<InCommingMessage>,
+        a2b_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
+        b2a_msg_recv_r: mpsc::UnboundedReceiver<InCommingMessage>,
         closed: Arc<AtomicBool>,
-        shutdown_sender: mpsc::UnboundedSender<Sid>,
+        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
     ) -> Self {
         Self {
             pid,
@@ -518,10 +541,10 @@ impl Stream {
             mid: 0,
             prio,
             promises,
-            msg_send_sender,
-            msg_recv_receiver,
+            a2b_msg_s,
+            b2a_msg_recv_r,
             closed,
-            shutdown_sender: Some(shutdown_sender),
+            a2b_close_stream_s: Some(a2b_close_stream_s),
         }
     }
@@ -600,8 +623,8 @@ impl Stream {
         if self.closed.load(Ordering::Relaxed) {
             return Err(StreamError::StreamClosed);
         }
-        debug!(?messagebuffer, ?User, "sending a message");
-        self.msg_send_sender
+        //debug!(?messagebuffer, "sending a message");
+        self.a2b_msg_s
             .send((self.prio, self.pid, self.sid, OutGoingMessage {
                 buffer: messagebuffer,
                 cursor: 0,
@@ -632,8 +655,8 @@ impl Stream {
     pub async fn recv_raw(&mut self) -> Result<MessageBuffer, StreamError> {
         //no need to access self.closed here, as when this stream is closed the Channel
         // is closed which will trigger a None
-        let msg = self.msg_recv_receiver.next().await?;
-        info!(?msg, ?User, "delivering a message");
+        let msg = self.b2a_msg_recv_r.next().await?;
+        info!(?msg, "delivering a message");
         Ok(msg.buffer)
     }
 }
@@ -642,11 +665,20 @@ impl Drop for Network {
     fn drop(&mut self) {
         let pid = self.local_pid;
         debug!(?pid, "shutting down Network");
+        debug!(
+            ?pid,
+            "shutting down Participants of Network, while we still have metrics"
+        );
+        task::block_on(async {
+            self.participants.write().await.clear();
+        });
+        debug!(?pid, "shutting down Scheduler");
         self.shutdown_sender
             .take()
             .unwrap()
             .send(())
             .expect("scheduler is closed, but nobody else should be able to close it");
+        debug!(?pid, "participants have shut down!");
     }
 }
@@ -656,14 +688,41 @@ impl Drop for Participant {
         // participant from network
         let pid = self.remote_pid;
         debug!(?pid, "shutting down Participant");
-        task::block_on(async {
-            self.disconnect_sender
-                .take()
-                .unwrap()
-                .send(self.remote_pid)
-                .await
-                .expect("something is wrong in internal scheduler coding")
-        });
+        match self.a2s_disconnect_s.take() {
+            None => debug!(
+                ?pid,
+                "Participant has been shut down cleanly, no further waiting is required!"
+            ),
+            Some(mut a2s_disconnect_s) => {
+                debug!(
+                    ?pid,
+                    "unclean shutdown detected, actively waiting for client to be disconnected"
+                );
+                task::block_on(async {
+                    let (finished_sender, finished_receiver) = oneshot::channel();
+                    a2s_disconnect_s
+                        .send((self.remote_pid, finished_sender))
+                        .await
+                        .expect("something is wrong in internal scheduler coding");
+                    match finished_receiver.await {
+                        Ok(Err(e)) => error!(
+                            ?pid,
+                            ?e,
+                            "Error while dropping the participant, couldn't send all outgoing \
+                             messages, dropping remaining"
+                        ),
+                        Err(e) => warn!(
+                            ?e,
+                            "//TODO: I don't know why the finish doesn't work; I would normally \
+                             expect to have received a return message from the participant... \
+                             ignoring it to not cause a panic for now, please fix me"
+                        ),
+                        _ => (),
+                    };
+                });
+            },
+        }
+        debug!(?pid, "network dropped");
     }
 }
@@ -674,12 +733,16 @@ impl Drop for Stream {
             let sid = self.sid;
             let pid = self.pid;
             debug!(?pid, ?sid, "shutting down Stream");
-            if task::block_on(self.shutdown_sender.take().unwrap().send(self.sid)).is_err() {
+            if task::block_on(self.a2b_close_stream_s.take().unwrap().send(self.sid)).is_err() {
                 warn!(
                     "Other side got already dropped, probably due to timing, other side will \
                      handle this gracefully"
                 );
             };
+        } else {
+            let sid = self.sid;
+            let pid = self.pid;
+            debug!(?pid, ?sid, "not needed");
         }
     }
 }
diff --git a/network/src/participant.rs b/network/src/participant.rs
index d2a6d7f1be..087cd2633d 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -17,41 +17,47 @@ use futures::{
 use std::{
     collections::HashMap,
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
         Arc, Mutex,
     },
 };
 use tracing::*;
 
+#[derive(Debug)]
+struct ChannelInfo {
+    cid: Cid,
+    b2w_frame_s: mpsc::UnboundedSender<Frame>,
+    b2r_read_shutdown: oneshot::Sender<()>,
+}
+
+#[derive(Debug)]
+struct StreamInfo {
+    prio: Prio,
+    promises: Promises,
+    b2a_msg_recv_s: mpsc::UnboundedSender<InCommingMessage>,
+    closed: Arc<AtomicBool>,
+}
+
 #[derive(Debug)]
 struct ControlChannels {
-    stream_open_receiver: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
-    stream_opened_sender: mpsc::UnboundedSender<Stream>,
-    create_channel_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
-    shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>,
-    shutdown_api_sender: mpsc::UnboundedSender<Sid>,
-    send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api
-    frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler
-    shutdown_receiver: oneshot::Receiver<()>, //own
-    stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
+    a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
+    b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
+    s2b_create_channel_r: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
+    a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
+    a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
+    a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>, //api stream
+    p2b_notify_empty_stream_s: Arc<Mutex<std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>>>,
+    s2b_frame_r: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>, //scheduler
+    s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>, /* own */
 }
 
 #[derive(Debug)]
 pub struct BParticipant {
     remote_pid: Pid,
     offset_sid: Sid,
-    channels: Arc<RwLock<Vec<(Cid, mpsc::UnboundedSender<Frame>)>>>,
-    streams: RwLock<
-        HashMap<
-            Sid,
-            (
-                Prio,
-                Promises,
-                mpsc::UnboundedSender<InCommingMessage>,
-                Arc<AtomicBool>,
-            ),
-        >,
-    >,
+    channels: Arc<RwLock<Vec<ChannelInfo>>>,
+    streams: RwLock<HashMap<Sid, StreamInfo>>,
+    running_mgr: AtomicUsize,
     run_channels: Option<ControlChannels>,
     metrics: Arc<NetworkMetrics>,
 }
@@ -61,35 +67,35 @@ impl BParticipant {
         remote_pid: Pid,
         offset_sid: Sid,
         metrics: Arc<NetworkMetrics>,
-        send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
-        stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
+        a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
+        p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>,
     ) -> (
         Self,
         mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>,
         mpsc::UnboundedReceiver<Stream>,
         mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
         mpsc::UnboundedSender<(Pid, Sid, Frame)>,
-        oneshot::Sender<()>,
+        oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>,
     ) {
-        let (stream_open_sender, stream_open_receiver) =
+        let (a2b_steam_open_s, a2b_steam_open_r) =
             mpsc::unbounded::<(Prio, Promises, oneshot::Sender<Stream>)>();
-        let (stream_opened_sender, stream_opened_receiver) = mpsc::unbounded::<Stream>();
-        let (shutdown_api_sender, shutdown_api_receiver) = mpsc::unbounded();
-        let (frame_send_sender, frame_send_receiver) = mpsc::unbounded::<(Pid, Sid, Frame)>();
-        let (shutdown_sender, shutdown_receiver) = oneshot::channel();
-        let (create_channel_sender, create_channel_receiver) =
+        let (b2a_stream_opened_s, b2a_stream_opened_r) = mpsc::unbounded::<Stream>();
+        let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded();
+        let (s2b_frame_s, s2b_frame_r) = mpsc::unbounded::<(Pid, Sid, Frame)>();
+        let (s2b_shutdown_bparticipant_s, s2b_shutdown_bparticipant_r) = oneshot::channel();
+        let (s2b_create_channel_s, s2b_create_channel_r) =
             mpsc::unbounded::<(Cid, Sid, Protocols, oneshot::Sender<()>)>();
         let run_channels = Some(ControlChannels {
-            stream_open_receiver,
-            stream_opened_sender,
-            create_channel_receiver,
-            shutdown_api_receiver,
-            shutdown_api_sender,
-            send_outgoing: Arc::new(Mutex::new(send_outgoing)),
-            frame_send_receiver,
-            shutdown_receiver,
-            stream_finished_request_sender,
+            a2b_steam_open_r,
+            b2a_stream_opened_s,
+            s2b_create_channel_r,
+            a2b_close_stream_r,
+            a2b_close_stream_s,
+            a2p_msg_s: Arc::new(Mutex::new(a2p_msg_s)),
+            p2b_notify_empty_stream_s: Arc::new(Mutex::new(p2b_notify_empty_stream_s)),
+            s2b_frame_r,
+            s2b_shutdown_bparticipant_r,
         });
 
         (
@@ -98,55 +104,50 @@ impl BParticipant {
                 offset_sid,
                 channels: Arc::new(RwLock::new(vec![])),
                 streams: RwLock::new(HashMap::new()),
+                running_mgr: AtomicUsize::new(0),
                 run_channels,
                 metrics,
             },
-            stream_open_sender,
-            stream_opened_receiver,
-            create_channel_sender,
-            frame_send_sender,
-            shutdown_sender,
+            a2b_steam_open_s,
+            b2a_stream_opened_r,
+            s2b_create_channel_s,
+            s2b_frame_s,
+            s2b_shutdown_bparticipant_s,
         )
     }
 
     pub async fn run(mut self) {
         //those managers that listen on api::Participant need an additional oneshot for
         // shutdown scenario, those handled by scheduler will be closed by it.
-        let (shutdown_open_manager_sender, shutdown_open_manager_receiver) = oneshot::channel();
-        let (shutdown_stream_close_manager_sender, shutdown_stream_close_manager_receiver) =
+        let (shutdown_open_mgr_sender, shutdown_open_mgr_receiver) = oneshot::channel();
+        let (shutdown_stream_close_mgr_sender, shutdown_stream_close_mgr_receiver) =
             oneshot::channel();
-        let (frame_from_wire_sender, frame_from_wire_receiver) = mpsc::unbounded::<(Cid, Frame)>();
+        let (w2b_frames_s, w2b_frames_r) = mpsc::unbounded::<(Cid, Frame)>();
         let run_channels = self.run_channels.take().unwrap();
         futures::join!(
-            self.open_manager(
-                run_channels.stream_open_receiver,
-                run_channels.shutdown_api_sender.clone(),
-                run_channels.send_outgoing.clone(),
-                shutdown_open_manager_receiver,
+            self.open_mgr(
+                run_channels.a2b_steam_open_r,
+                run_channels.a2b_close_stream_s.clone(),
+                run_channels.a2p_msg_s.clone(),
+                shutdown_open_mgr_receiver,
             ),
-            self.handle_frames(
-                frame_from_wire_receiver,
-                run_channels.stream_opened_sender,
-                run_channels.shutdown_api_sender,
-                run_channels.send_outgoing.clone(),
+            self.handle_frames_mgr(
+                w2b_frames_r,
+                run_channels.b2a_stream_opened_s,
+                run_channels.a2b_close_stream_s,
+                run_channels.a2p_msg_s.clone(),
             ),
-            self.create_channel_manager(
-                run_channels.create_channel_receiver,
-                frame_from_wire_sender,
+            self.create_channel_mgr(run_channels.s2b_create_channel_r, w2b_frames_s,),
+            self.send_mgr(run_channels.s2b_frame_r),
+            self.stream_close_mgr(
+                run_channels.a2b_close_stream_r,
+                shutdown_stream_close_mgr_receiver,
+                run_channels.p2b_notify_empty_stream_s,
             ),
-            self.send_manager(run_channels.frame_send_receiver),
-            self.stream_close_manager(
-                run_channels.shutdown_api_receiver,
-                shutdown_stream_close_manager_receiver,
-                run_channels.stream_finished_request_sender,
-            ),
-            self.shutdown_manager(
-                run_channels.shutdown_receiver,
-                vec!(
-                    shutdown_open_manager_sender,
-                    shutdown_stream_close_manager_sender
-                )
+            self.participant_shutdown_mgr(
+                run_channels.s2b_shutdown_bparticipant_r,
+                vec!(shutdown_open_mgr_sender, shutdown_stream_close_mgr_sender)
             ),
         );
     }
@@ -154,35 +155,36 @@ impl BParticipant {
     async fn send_frame(&self, frame: Frame) {
         // find out ideal channel here
         //TODO: just take first
-        if let Some((cid, channel)) = self.channels.write().await.get_mut(0) {
+        if let Some(ci) = self.channels.write().await.get_mut(0) {
             self.metrics
                 .frames_out_total
                 .with_label_values(&[
                     &self.remote_pid.to_string(),
-                    &cid.to_string(),
+                    &ci.cid.to_string(),
                     frame.get_string(),
                 ])
                 .inc();
-            channel.send(frame).await.unwrap();
+            ci.b2w_frame_s.send(frame).await.unwrap();
         } else {
             error!("participant has no channel to communicate on");
         }
     }
 
-    async fn handle_frames(
+    async fn handle_frames_mgr(
         &self,
-        mut frame_from_wire_receiver: mpsc::UnboundedReceiver<(Cid, Frame)>,
-        mut stream_opened_sender: mpsc::UnboundedSender<Stream>,
-        shutdown_api_sender: mpsc::UnboundedSender<Sid>,
-        send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
+        mut w2b_frames_r: mpsc::UnboundedReceiver<(Cid, Frame)>,
+        mut b2a_stream_opened_s: mpsc::UnboundedSender<Stream>,
+        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
+        a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
     ) {
+        self.running_mgr.fetch_add(1, Ordering::Relaxed);
         trace!("start handle_frames");
-        let send_outgoing = { send_outgoing.lock().unwrap().clone() };
+        let a2p_msg_s = { a2p_msg_s.lock().unwrap().clone() };
         let mut messages = HashMap::new();
         let pid_string = &self.remote_pid.to_string();
-        while let Some((cid, frame)) = frame_from_wire_receiver.next().await {
+        while let Some((cid, frame)) = w2b_frames_r.next().await {
             let cid_string = cid.to_string();
-            trace!("handling frame");
+            //trace!("handling frame");
             self.metrics
                 .frames_in_total
                 .with_label_values(&[&pid_string, &cid_string, frame.get_string()])
                 .inc();
@@ -193,11 +195,11 @@ impl BParticipant {
                     prio,
                     promises,
                 } => {
-                    let send_outgoing = send_outgoing.clone();
+                    let a2p_msg_s = a2p_msg_s.clone();
                     let stream = self
-                        .create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
+                        .create_stream(sid, prio, promises, a2p_msg_s, &a2b_close_stream_s)
                        .await;
-                    stream_opened_sender.send(stream).await.unwrap();
+                    b2a_stream_opened_s.send(stream).await.unwrap();
                     trace!("opened frame from remote");
                 },
                 Frame::CloseStream { sid } => {
@@ -206,12 +208,12 @@ impl BParticipant {
                    // However Stream.send() is not async and their receiver isn't dropped if Stream
                    // is dropped, so i need a way to notify the Stream that its sent messages will
                    // be dropped... from remote, notify local
-                    if let Some((_, _, _, closed)) = self.streams.write().await.remove(&sid) {
+                    if let Some(si) = self.streams.write().await.remove(&sid) {
                         self.metrics
                             .streams_closed_total
                             .with_label_values(&[&pid_string])
                             .inc();
-                        closed.store(true, Ordering::Relaxed);
+                        si.closed.store(true, Ordering::Relaxed);
                     } else {
                         error!(
                             "couldn't find stream to close, either this is a duplicate message, \
@@ -241,12 +243,10 @@ impl BParticipant {
                         false
                     };
                     if finished {
-                        debug!(?mid, "finished receiving message");
+                        //debug!(?mid, "finished receiving message");
                         let imsg = messages.remove(&mid).unwrap();
-                        if let Some((_, _, sender, _)) =
-                            self.streams.write().await.get_mut(&imsg.sid)
-                        {
-                            sender.send(imsg).await.unwrap();
+                        if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) {
+                            si.b2a_msg_recv_s.send(imsg).await.unwrap();
                         } else {
                             error!("dropping message as stream no longer seems to exist");
                         }
@@ -256,81 +256,84 @@ impl BParticipant {
             }
         }
         trace!("stop handle_frames");
+        self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
-    async fn create_channel_manager(
+    async fn create_channel_mgr(
         &self,
-        channels_receiver: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
-        frame_from_wire_sender: mpsc::UnboundedSender<(Cid, Frame)>,
+        s2b_create_channel_r: mpsc::UnboundedReceiver<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
+        w2b_frames_s: mpsc::UnboundedSender<(Cid, Frame)>,
     ) {
-        trace!("start channel_manager");
-        channels_receiver
-            .for_each_concurrent(None, |(cid, sid, protocol, sender)| {
+        self.running_mgr.fetch_add(1, Ordering::Relaxed);
+        trace!("start create_channel_mgr");
+        s2b_create_channel_r
+            .for_each_concurrent(None, |(cid, sid, protocol, b2s_create_channel_done_s)| {
                 // This channel is now configured, and we are running it in scope of the
                 // participant.
-                let frame_from_wire_sender = frame_from_wire_sender.clone();
+                let w2b_frames_s = w2b_frames_s.clone();
                 let channels = self.channels.clone();
                 async move {
-                    let (channel, frame_to_wire_sender, shutdown_sender) =
+                    let (channel, b2w_frame_s, b2r_read_shutdown) =
                         Channel::new(cid, self.remote_pid, self.metrics.clone());
-                    channels.write().await.push((cid, frame_to_wire_sender));
-                    sender.send(()).unwrap();
+                    channels.write().await.push(ChannelInfo {
+                        cid,
+                        b2w_frame_s,
+                        b2r_read_shutdown,
+                    });
+                    b2s_create_channel_done_s.send(()).unwrap();
                     self.metrics
                         .channels_connected_total
                         .with_label_values(&[&self.remote_pid.to_string()])
                         .inc();
-                    channel.run(protocol, frame_from_wire_sender).await;
+                    trace!(?cid, "running channel in participant");
+                    channel.run(protocol, w2b_frames_s).await;
                     self.metrics
                         .channels_disconnected_total
                         .with_label_values(&[&self.remote_pid.to_string()])
                         .inc();
                     trace!(?cid, "channel got closed");
-                    shutdown_sender.send(()).unwrap();
                 }
             })
             .await;
-        trace!("stop channel_manager");
+        trace!("stop create_channel_mgr");
+        self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
-    async fn send_manager(
-        &self,
-        mut frame_send_receiver: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>,
-    ) {
-        trace!("start send_manager");
-        while let Some((_, _, frame)) = frame_send_receiver.next().await {
+    async fn send_mgr(&self, mut s2b_frame_r: mpsc::UnboundedReceiver<(Pid, Sid, Frame)>) {
+        self.running_mgr.fetch_add(1, Ordering::Relaxed);
+        trace!("start send_mgr");
+        while let Some((_, sid, frame)) = s2b_frame_r.next().await {
             self.send_frame(frame).await;
         }
-        trace!("stop send_manager");
+        trace!("stop send_mgr");
+        self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
-    async fn open_manager(
+    async fn open_mgr(
         &self,
-        mut stream_open_receiver: mpsc::UnboundedReceiver<(
-            Prio,
-            Promises,
-            oneshot::Sender<Stream>,
-        )>,
-        shutdown_api_sender: mpsc::UnboundedSender<Sid>,
-        send_outgoing: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
-        shutdown_open_manager_receiver: oneshot::Receiver<()>,
+        mut a2b_steam_open_r: mpsc::UnboundedReceiver<(Prio, Promises, oneshot::Sender<Stream>)>,
+        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
+        a2p_msg_s: Arc<Mutex<std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>>>,
+        shutdown_open_mgr_receiver: oneshot::Receiver<()>,
     ) {
-        trace!("start open_manager");
+        self.running_mgr.fetch_add(1, Ordering::Relaxed);
+        trace!("start open_mgr");
         let send_outgoing = {
             //fighting the borrow checker ;)
-            send_outgoing.lock().unwrap().clone()
+            a2p_msg_s.lock().unwrap().clone()
         };
         let mut stream_ids = self.offset_sid;
-        let mut shutdown_open_manager_receiver = shutdown_open_manager_receiver.fuse();
+        let mut shutdown_open_mgr_receiver = shutdown_open_mgr_receiver.fuse();
         //from api or shutdown signal
-        while let Some((prio, promises, sender)) = select! {
-            next = stream_open_receiver.next().fuse() => next,
-            _ = shutdown_open_manager_receiver => None,
+        while let Some((prio, promises, p2a_return_stream)) = select! {
+            next = a2b_steam_open_r.next().fuse() => next,
+            _ = shutdown_open_mgr_receiver => None,
         } {
            debug!(?prio, ?promises, "got request to open a new stream");
            let send_outgoing = send_outgoing.clone();
            let sid = stream_ids;
            let stream = self
-                .create_stream(sid, prio, promises, send_outgoing, &shutdown_api_sender)
+                .create_stream(sid, prio, promises, send_outgoing, &a2b_close_stream_s)
                .await;
            self.send_frame(Frame::OpenStream {
                sid,
@@ -338,78 +341,118 @@ impl BParticipant {
                 promises,
             })
             .await;
-            sender.send(stream).unwrap();
+            p2a_return_stream.send(stream).unwrap();
             stream_ids += Sid::from(1);
         }
-        trace!("stop open_manager");
+        trace!("stop open_mgr");
+        self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
-    async fn shutdown_manager(
+    /// When activated this function will drop the participant completely and
+    /// wait for everything to go right! Then return: 1. Shutting down
+    /// Streams for API and End user! 2. Wait for all "prio queued" messages
+    /// to be sent. 3. Send Stream
+    async fn participant_shutdown_mgr(
         &self,
-        shutdown_receiver: oneshot::Receiver<()>,
+        s2b_shutdown_bparticipant_r: oneshot::Receiver<oneshot::Sender<async_std::io::Result<()>>>,
         mut to_shutdown: Vec<oneshot::Sender<()>>,
     ) {
-        trace!("start shutdown_manager");
-        shutdown_receiver.await.unwrap();
+        self.running_mgr.fetch_add(1, Ordering::Relaxed);
+        trace!("start participant_shutdown_mgr");
+        let sender = s2b_shutdown_bparticipant_r.await.unwrap();
         debug!("closing all managers");
         for sender in to_shutdown.drain(..) {
             if sender.send(()).is_err() {
-                debug!("manager seems to be closed already, weird, maybe a bug");
+                warn!("manager seems to be closed already, weird, maybe a bug");
             };
         }
         debug!("closing all streams");
         let mut streams = self.streams.write().await;
-        for (sid, (_, _, _, closing)) in streams.drain() {
+        for (sid, si) in streams.drain() {
             trace!(?sid, "shutting down Stream");
-            closing.store(true, Ordering::Relaxed);
+            si.closed.store(true, Ordering::Relaxed);
         }
+        debug!("closing all channels");
+        for ci in self.channels.write().await.drain(..) {
+            ci.b2r_read_shutdown.send(()).unwrap();
+        }
+        //Wait for other bparticipants mgr to close via AtomicUsize
+        const SLEEP_TIME: std::time::Duration = std::time::Duration::from_millis(5);
+        async_std::task::sleep(SLEEP_TIME).await;
+        let mut i: u32 = 1;
+        while self.running_mgr.load(Ordering::Relaxed) > 1 {
+            i += 1;
+            if i.rem_euclid(10) == 1 {
+                trace!(
+                    "waiting for bparticipant mgr to shut down, remaining {}",
+                    self.running_mgr.load(Ordering::Relaxed) - 1
+                );
+            }
+            async_std::task::sleep(SLEEP_TIME * i).await;
+        }
+        trace!("all bparticipant mgr (except me) are shut down now");
         self.metrics.participants_disconnected_total.inc();
-        trace!("stop shutdown_manager");
+        sender.send(Ok(())).unwrap();
+        trace!("stop participant_shutdown_mgr");
+        self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
-    async fn stream_close_manager(
+    async fn stream_close_mgr(
         &self,
-        mut shutdown_api_receiver: mpsc::UnboundedReceiver<Sid>,
-        shutdown_stream_close_manager_receiver: oneshot::Receiver<()>,
-        mut stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
+        mut a2b_close_stream_r: mpsc::UnboundedReceiver<Sid>,
+        shutdown_stream_close_mgr_receiver: oneshot::Receiver<()>,
+        mut p2b_notify_empty_stream_s: Arc<
+            Mutex<std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>>,
+        >,
     ) {
-        trace!("start stream_close_manager");
-        let mut shutdown_stream_close_manager_receiver =
-            shutdown_stream_close_manager_receiver.fuse();
+        self.running_mgr.fetch_add(1, Ordering::Relaxed);
+        trace!("start stream_close_mgr");
+        let mut shutdown_stream_close_mgr_receiver = shutdown_stream_close_mgr_receiver.fuse();
+
+        //from api or shutdown signal
         while let Some(sid) = select! {
-            next = shutdown_api_receiver.next().fuse() => next,
-            _ = shutdown_stream_close_manager_receiver => None,
+            next = a2b_close_stream_r.next().fuse() => next,
+            _ = shutdown_stream_close_mgr_receiver => None,
        } {
+            //TODO: make this concurrent!
+            //TODO: Performance, closing is slow!
            trace!(?sid, "got request from api to close stream");
-            //TODO: wait here till the last prio was send!
-            //The error is, that the close msg as a control message is send directly, while
-            // messages are only send after a next prio tick. This means, we
-            // close it first, and then send the headers and data packages...
-            // ofc the other side then no longer finds the respective stream.
-            //however we need to find out when the last message of a stream is send. it
-            // would be usefull to get a snapshot here, like, this stream has send out to
-            // msgid n, while the prio only has send m. then sleep as long as n < m maybe...
-            debug!("IF YOU SEE THIS, FIND A PROPPER FIX FOR CLOSING STREAMS");
+            //This needs to first stop clients from sending any more.
+            //Then it will wait for all pending messages (in prio) to be sent to the
+            // protocol. After this has happened the stream is closed.
+            //Only after all messages are sent to the protocol can we send the CloseStream
+            // frame! If we sent it before, all follow-up messages couldn't
+            // be handled at the remote side.
-            let (sender, receiver) = oneshot::channel();
-            trace!(?sid, "wait for stream to be flushed");
-            stream_finished_request_sender
-                .send((self.remote_pid, sid, sender))
+            trace!(?sid, "stopping api to use this stream");
+            self.streams
+                .read()
                 .await
+                .get(&sid)
+                .unwrap()
+                .closed
+                .store(true, Ordering::Relaxed);
+
+            trace!(?sid, "wait for stream to be flushed");
+            let (s2b_stream_finished_closed_s, s2b_stream_finished_closed_r) = oneshot::channel();
+            p2b_notify_empty_stream_s
+                .lock()
+                .unwrap()
+                .send((self.remote_pid, sid, s2b_stream_finished_closed_s))
                 .unwrap();
-            receiver.await.unwrap();
+            s2b_stream_finished_closed_r.await.unwrap();
+
+            trace!(?sid, "stream was successfully flushed");
             self.metrics
                 .streams_closed_total
                 .with_label_values(&[&self.remote_pid.to_string()])
                 .inc();
-
+            //only now remove the Stream; until this point we can still recv on it.
             self.streams.write().await.remove(&sid);
-            //from local, notify remote
             self.send_frame(Frame::CloseStream { sid }).await;
         }
-        trace!("stop stream_close_manager");
+        trace!("stop stream_close_mgr");
+        self.running_mgr.fetch_sub(1, Ordering::Relaxed);
     }
 
     async fn create_stream(
@@ -417,15 +460,17 @@ impl BParticipant {
         sid: Sid,
         prio: Prio,
         promises: Promises,
-        send_outgoing: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
-        shutdown_api_sender: &mpsc::UnboundedSender<Sid>,
+        a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
+        a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
     ) -> Stream {
-        let (msg_recv_sender, msg_recv_receiver) = mpsc::unbounded::<InCommingMessage>();
+        let (b2a_msg_recv_s, b2a_msg_recv_r) = mpsc::unbounded::<InCommingMessage>();
         let closed = Arc::new(AtomicBool::new(false));
-        self.streams
-            .write()
-            .await
-            .insert(sid, (prio, promises, msg_recv_sender, closed.clone()));
+        self.streams.write().await.insert(sid, StreamInfo {
+            prio,
+            promises,
+            b2a_msg_recv_s,
+            closed: closed.clone(),
+        });
         self.metrics
             .streams_opened_total
             .with_label_values(&[&self.remote_pid.to_string()])
@@ -435,10 +480,10 @@ impl BParticipant {
             sid,
             prio,
             promises,
-            send_outgoing,
-            msg_recv_receiver,
+            a2p_msg_s,
+            b2a_msg_recv_r,
             closed.clone(),
-            shutdown_api_sender.clone(),
+            a2b_close_stream_s.clone(),
         )
     }
 }
diff --git a/network/src/prios.rs b/network/src/prios.rs
index d30f80c0e2..b225f8a277 100644
--- a/network/src/prios.rs
+++ b/network/src/prios.rs
@@ -9,6 +9,7 @@ use crate::{
     message::OutGoingMessage,
     types::{Frame, Pid, Prio, Sid},
 };
+use futures::channel::oneshot;
 use std::{
     collections::{HashMap, HashSet, VecDeque},
     sync::mpsc::{channel, Receiver, Sender},
@@ -18,14 +19,27 @@ use tracing::*;
 
 const PRIO_MAX: usize = 64;
 
+struct PidSidInfo {
+    len: u64,
+    empty_notify: Option<oneshot::Sender<()>>,
+}
+
 pub(crate) struct PrioManager {
     points: [u32; PRIO_MAX],
     messages: [VecDeque<(Pid, Sid, OutGoingMessage)>; PRIO_MAX],
     messages_rx: Receiver<(Prio, Pid, Sid, OutGoingMessage)>,
-    pid_sid_owned: HashMap<(Pid, Sid), u64>,
+    pid_sid_owned: HashMap<(Pid, Sid), PidSidInfo>,
+    //you can register to be notified if a pid_sid combination is flushed completely here
+    pid_sid_flushed_rx: Receiver<(Pid, Sid, oneshot::Sender<()>)>,
     queued: HashSet<u8>,
 }
 
+/*
+ERROR: Okay, I can't count the frames and msgs here, because the api counts on a msg basis
+while BParticipant counts on a frame basis. The PrioManager listens for killed Pid/Sid pairs
+and either returns immediately when no msg is queued, or writes the request into pid_sid_owned
+and flushes it out afterwards. Maybe we should also make the PrioManager more async, even if
+the TICK itself stays synchronous. We'll see.
+*/
+
 impl PrioManager {
     const FRAME_DATA_SIZE: u64 = 1400;
     const PRIOS: [u32; PRIO_MAX] = [
@@ -36,8 +50,14 @@ impl PrioManager {
         310419, 356578, 409600, 470507, 540470, 620838,
     ];
 
-    pub fn new() -> (Self, Sender<(Prio, Pid, Sid, OutGoingMessage)>) {
+    pub fn new() -> (
+        Self,
+        Sender<(Prio, Pid, Sid, OutGoingMessage)>,
+        Sender<(Pid, Sid, oneshot::Sender<()>)>,
+    ) {
+        // (a2p_msg_s, a2p_msg_r)
         let (messages_tx, messages_rx) = channel();
+        let (pid_sid_flushed_tx, pid_sid_flushed_rx) = channel();
         (
             Self {
                 points: [0; PRIO_MAX],
@@ -109,15 +129,18 @@ impl PrioManager {
                 ],
                 messages_rx,
                 queued: HashSet::new(), //TODO: optimize with u64 and 64 bits
+                pid_sid_flushed_rx,
                 pid_sid_owned: HashMap::new(),
             },
             messages_tx,
+            pid_sid_flushed_tx,
         )
     }
 
     fn tick(&mut self) {
         // Check Range
         let mut times = 0;
+        let mut closed = 0;
         for (prio, pid, sid, msg) in self.messages_rx.try_iter() {
             debug_assert!(prio as usize <= PRIO_MAX);
             times += 1;
@@ -125,13 +148,29 @@ impl PrioManager {
             self.queued.insert(prio);
             self.messages[prio as usize].push_back((pid, sid, msg));
             if let Some(cnt) = self.pid_sid_owned.get_mut(&(pid, sid)) {
-                *cnt += 1;
+                cnt.len += 1;
             } else {
-                self.pid_sid_owned.insert((pid, sid), 1);
+                self.pid_sid_owned.insert((pid, sid), PidSidInfo {
+                    len: 1,
+                    empty_notify: None,
+                });
             }
         }
-        if times > 0 {
-            trace!(?times, "tick");
+        //this must be AFTER messages
+        for (pid, sid, return_sender) in self.pid_sid_flushed_rx.try_iter() {
+            closed += 1;
+            if let Some(cnt) = self.pid_sid_owned.get_mut(&(pid, sid)) {
+                // register sender
+                cnt.empty_notify = Some(return_sender);
+            } else {
+                // return immediately
+                futures::executor::block_on(async {
+                    return_sender.send(());
+                });
+            }
+        }
+        if times > 0 || closed > 0 {
+            trace!(?times, ?closed, "tick");
         }
     }
 
@@ -219,9 +258,14 @@ impl PrioManager {
                             "the pid_sid_owned counter works wrong, more pid,sid removed \
                              than inserted",
                         );
-                        *cnt -= 1;
-                        if *cnt == 0 {
-                            self.pid_sid_owned.remove(&(pid, sid));
+                        cnt.len -= 1;
+                        if cnt.len == 0 {
+                            let cnt = self.pid_sid_owned.remove(&(pid, sid)).unwrap();
+                            cnt.empty_notify.map(|empty_notify| {
+                                futures::executor::block_on(async {
+                                    empty_notify.send(());
+                                })
+                            });
                         }
                     } else {
                         self.messages[prio as usize].push_back((pid, sid, msg));
diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs
index 799da284c6..d3b5e7d87f 100644
--- a/network/src/scheduler.rs
+++ b/network/src/scheduler.rs
@@ -22,7 +22,7 @@ use futures::{
 };
 use prometheus::Registry;
 use std::{
-    collections::{HashMap, VecDeque},
+    collections::{HashMap, HashSet, VecDeque},
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
         Arc,
@@ -31,27 +31,35 @@ use std::{
 use tracing::*;
 use tracing_futures::Instrument;
 
-type ParticipantInfo = (
-    mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
-    mpsc::UnboundedSender<(Pid, Sid, Frame)>,
-    oneshot::Sender<()>,
-);
+#[derive(Debug)]
+struct ParticipantInfo {
+    s2b_create_channel_s: mpsc::UnboundedSender<(Cid, Sid, Protocols, oneshot::Sender<()>)>,
+    s2b_frame_s: mpsc::UnboundedSender<(Pid, Sid, Frame)>,
+    s2b_shutdown_bparticipant_s:
+        Option<oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>>,
+}
 
+/// Naming of Channels `x2x`
+/// - a: api
+/// - s: scheduler
+/// - b: bparticipant
+/// - p: prios
+/// - r: protocol
+/// - w: wire
 #[derive(Debug)]
 struct ControlChannels {
-    listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<async_std::io::Result<()>>)>,
-    connect_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<Participant>>)>,
-    shutdown_receiver: oneshot::Receiver<()>,
-    disconnect_receiver: mpsc::UnboundedReceiver<Pid>,
-    stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>,
+    a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<async_std::io::Result<()>>)>,
+    a2s_connect_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<io::Result<Participant>>)>,
+    a2s_scheduler_shutdown_r: oneshot::Receiver<()>,
+    a2s_disconnect_r: mpsc::UnboundedReceiver<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
 }
 
 #[derive(Debug, Clone)]
 struct ParticipantChannels {
-    connected_sender: mpsc::UnboundedSender<Participant>,
-    disconnect_sender: mpsc::UnboundedSender<Pid>,
-    prios_sender: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
-    stream_finished_request_sender: mpsc::UnboundedSender<(Pid, Sid, oneshot::Sender<()>)>,
+    s2a_connected_s: mpsc::UnboundedSender<Participant>,
+    a2s_disconnect_s: mpsc::UnboundedSender<(Pid, oneshot::Sender<async_std::io::Result<()>>)>,
+    a2p_msg_s: std::sync::mpsc::Sender<(Prio, Pid, Sid, OutGoingMessage)>,
+    p2b_notify_empty_stream_s: std::sync::mpsc::Sender<(Pid, Sid, oneshot::Sender<()>)>,
 }
 
 #[derive(Debug)]
@@ -60,7 +68,7 @@ pub struct Scheduler {
     closed: AtomicBool,
     pool: Arc<ThreadPool>,
     run_channels: Option<ControlChannels>,
-    participant_channels: ParticipantChannels,
+    participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
     participants: Arc<RwLock<HashMap<Pid, ParticipantInfo>>>,
     channel_ids: Arc<AtomicU64>,
     channel_listener: RwLock<HashMap<Address, oneshot::Sender<()>>>,
@@ -79,29 +87,28 @@ impl Scheduler {
         mpsc::UnboundedReceiver<Participant>,
         oneshot::Sender<()>,
     ) {
-        let (listen_sender, listen_receiver) =
+        let (a2s_listen_s, a2s_listen_r) =
             mpsc::unbounded::<(Address, oneshot::Sender<async_std::io::Result<()>>)>();
-        let (connect_sender, connect_receiver) =
+        let (a2s_connect_s, a2s_connect_r) =
             mpsc::unbounded::<(Address, oneshot::Sender<io::Result<Participant>>)>();
-        let (connected_sender, connected_receiver) = mpsc::unbounded::<Participant>();
-        let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
-        let (prios, prios_sender) = PrioManager::new();
-        let (disconnect_sender, disconnect_receiver) = mpsc::unbounded::<Pid>();
-        let (stream_finished_request_sender, stream_finished_request_receiver) = mpsc::unbounded();
+        let (s2a_connected_s, s2a_connected_r) = mpsc::unbounded::<Participant>();
+        let (a2s_scheduler_shutdown_s, a2s_scheduler_shutdown_r) = oneshot::channel::<()>();
+        let (prios, a2p_msg_s, p2b_notify_empty_stream_s) = PrioManager::new();
+        let (a2s_disconnect_s, a2s_disconnect_r) =
+            mpsc::unbounded::<(Pid, oneshot::Sender<async_std::io::Result<()>>)>();
 
         let run_channels = Some(ControlChannels {
-            listen_receiver,
-            connect_receiver,
-            shutdown_receiver,
-            disconnect_receiver,
-            stream_finished_request_receiver,
+            a2s_listen_r,
+            a2s_connect_r,
+            a2s_scheduler_shutdown_r,
+            a2s_disconnect_r,
         });
 
         let participant_channels = ParticipantChannels {
-            disconnect_sender,
-            stream_finished_request_sender,
-            connected_sender,
-            prios_sender,
+            s2a_connected_s,
+            a2s_disconnect_s,
+            a2p_msg_s,
+            p2b_notify_empty_stream_s,
         };
 
         let metrics = Arc::new(NetworkMetrics::new(&local_pid).unwrap());
@@ -115,17 +122,17 @@ impl Scheduler {
                 closed: AtomicBool::new(false),
                 pool: Arc::new(ThreadPool::new().unwrap()),
                 run_channels,
-                participant_channels,
+                participant_channels: Arc::new(Mutex::new(Some(participant_channels))),
                 participants: Arc::new(RwLock::new(HashMap::new())),
                 channel_ids: Arc::new(AtomicU64::new(0)),
                 channel_listener: RwLock::new(HashMap::new()),
                 prios: Arc::new(Mutex::new(prios)),
                 metrics,
             },
-            listen_sender,
-            connect_sender,
-            connected_receiver,
-            shutdown_sender,
+            a2s_listen_s,
+            a2s_connect_s,
+            s2a_connected_r,
+            a2s_scheduler_shutdown_s,
         )
     }
 
     pub async fn run(mut self) {
         let run_channels = self.run_channels.take().unwrap();
 
         futures::join!(
-            self.listen_manager(run_channels.listen_receiver),
-            self.connect_manager(run_channels.connect_receiver),
-            self.disconnect_manager(run_channels.disconnect_receiver),
-            self.send_outgoing(),
-            self.stream_finished_manager(run_channels.stream_finished_request_receiver),
-            self.shutdown_manager(run_channels.shutdown_receiver),
+            self.listen_mgr(run_channels.a2s_listen_r),
+            self.connect_mgr(run_channels.a2s_connect_r),
+            self.disconnect_mgr(run_channels.a2s_disconnect_r),
+            self.send_outgoing_mgr(),
+            self.scheduler_shutdown_mgr(run_channels.a2s_scheduler_shutdown_r),
         );
     }
 
-    async fn listen_manager(
+    async fn listen_mgr(
         &self,
-        listen_receiver: mpsc::UnboundedReceiver<(Address, oneshot::Sender<async_std::io::Result<()>>)>,
+        a2s_listen_r: mpsc::UnboundedReceiver<(Address, oneshot::Sender<async_std::io::Result<()>>)>,
     ) {
-        trace!("start listen_manager");
-        listen_receiver
-            .for_each_concurrent(None, |(address, result_sender)| {
+        trace!("start listen_mgr");
+        a2s_listen_r
+            .for_each_concurrent(None, |(address, s2a_result_s)| {
                 let address = address.clone();
 
                 async move {
@@ -166,23 +172,23 @@ impl Scheduler {
                         .write()
                         .await
                         .insert(address.clone(), end_sender);
-                    self.channel_creator(address, end_receiver, result_sender)
+                    self.channel_creator(address, end_receiver, s2a_result_s)
                         .await;
                 }
             })
             .await;
-        trace!("stop listen_manager");
+        trace!("stop listen_mgr");
     }
 
-    async fn connect_manager(
+    async fn connect_mgr(
         &self,
-        mut connect_receiver: mpsc::UnboundedReceiver<(
+        mut a2s_connect_r: mpsc::UnboundedReceiver<(
             Address,
             oneshot::Sender<io::Result<Participant>>,
         )>,
     ) {
-        trace!("start connect_manager");
-        while let Some((addr, pid_sender)) = connect_receiver.next().await {
+        trace!("start connect_mgr");
+        while let Some((addr, pid_sender)) = a2s_connect_r.next().await {
             let (protocol, handshake) = match addr {
                 Address::Tcp(addr) => {
                     self.metrics
@@ -235,117 +241,126 @@ impl Scheduler {
             self.init_protocol(protocol, Some(pid_sender), handshake)
                 .await;
         }
-        trace!("stop connect_manager");
+        trace!("stop connect_mgr");
     }
 
-    async fn disconnect_manager(&self, mut disconnect_receiver: mpsc::UnboundedReceiver<Pid>) {
-        trace!("start disconnect_manager");
-        while let Some(pid) = disconnect_receiver.next().await {
+    async fn disconnect_mgr(
+        &self,
+        mut a2s_disconnect_r: mpsc::UnboundedReceiver<(
+            Pid,
+            oneshot::Sender<async_std::io::Result<()>>,
+        )>,
+    ) {
+        trace!("start disconnect_mgr");
+        while let Some((pid, return_once_successfull_shutdown)) = a2s_disconnect_r.next().await {
             //Closing Participants is done the following way:
             // 1. We drop our senders and receivers
             // 2. we need to close BParticipant, this will drop its senders and receivers
             // 3. Participant will try to access the BParticipant senders and receivers with
             //    their next api action, it will fail and be closed then.
-            if let Some((_, _, sender)) = self.participants.write().await.remove(&pid) {
-                sender.send(()).unwrap();
+            let (finished_sender, finished_receiver) = oneshot::channel();
+            if let Some(pi) = self.participants.write().await.get_mut(&pid) {
+                pi.s2b_shutdown_bparticipant_s
+                    .take()
+                    .unwrap()
+                    .send(finished_sender)
+                    .unwrap();
             }
+            let e = finished_receiver.await.unwrap();
+            //only remove after flush!
+            self.participants.write().await.remove(&pid).unwrap();
+            return_once_successfull_shutdown.send(e);
         }
-        trace!("stop disconnect_manager");
+        trace!("stop disconnect_mgr");
     }
 
-    async fn send_outgoing(&self) {
+    async fn send_outgoing_mgr(&self) {
        //This time equals the MINIMUM Latency in average, so keep it down and //Todo:
        // make it configurable or switch to await E.g. Prio 0 = await, prio 50
        // wait for more messages
        const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(10);
-        const FRAMES_PER_TICK: usize = 1000000;
-        trace!("start send_outgoing");
+        const FRAMES_PER_TICK: usize = 1000005;
+        trace!("start send_outgoing_mgr");
         while !self.closed.load(Ordering::Relaxed) {
             let mut frames = VecDeque::new();
             self.prios
                 .lock()
                 .await
                 .fill_frames(FRAMES_PER_TICK, &mut frames);
+            if frames.len() > 0 {
+                trace!("tick {}", frames.len());
+            }
+            let mut already_traced = HashSet::new();
             for (pid, sid, frame) in frames {
-                if let Some((_, sender, _)) = self.participants.write().await.get_mut(&pid) {
-                    sender.send((pid, sid, frame)).await.unwrap();
+                if let Some(pi) = self.participants.write().await.get_mut(&pid) {
+                    pi.s2b_frame_s.send((pid, sid, frame)).await.unwrap();
+                } else {
+                    if !already_traced.contains(&(pid, sid)) {
+                        error!(
+                            ?pid,
+                            ?sid,
+                            "dropping frames, as participant no longer exists!"
+                        );
+                        already_traced.insert((pid, sid));
+                    }
                 }
             }
             async_std::task::sleep(TICK_TIME).await;
         }
-        trace!("stop send_outgoing");
+        trace!("stop send_outgoing_mgr");
     }
 
-    // requested by participant when stream wants to close from api, checking if no
-    // more msg is in prio and return
-    pub(crate) async fn stream_finished_manager(
-        &self,
-        stream_finished_request_receiver: mpsc::UnboundedReceiver<(Pid, Sid, oneshot::Sender<()>)>,
-    ) {
-        trace!("start stream_finished_manager");
-        stream_finished_request_receiver
-            .for_each_concurrent(None, async move |(pid, sid, sender)| {
-                //TODO: THERE MUST BE A MORE CLEVER METHOD THAN SPIN LOCKING! LIKE REGISTERING
-                // DIRECTLY IN PRIO AS A FUTURE WERE PRIO IS WAKER! TODO: also this
-                // has a great potential for handing network, if you create a network, send
-                // gigabytes close it then. Also i need a Mutex, which really adds
-                // to cost if alot strems want to close
-                self.stream_finished_waiter(pid, sid, sender).await;
-            })
-            .await;
-    }
-
-    async fn stream_finished_waiter(&self, pid: Pid, sid: Sid, sender: oneshot::Sender<()>) {
-        const TICK_TIME: std::time::Duration = std::time::Duration::from_millis(5);
-        //TODO: ARRRG, i need to wait for AT LEAST 1 TICK, because i am lazy i just
-        // wait 15mn and tick count is 10ms because recv is only done with a
-        // tick and not async as soon as we send....
-        async_std::task::sleep(TICK_TIME * 3).await;
-        let mut n = 0u64;
-        loop {
-            if !self.prios.lock().await.contains_pid_sid(pid, sid) {
-                trace!("prio is clear, go to close stream as requested from api");
-                sender.send(()).unwrap();
-                break;
-            }
-            n += 1;
-            async_std::task::sleep(match n {
-                0..=199 => TICK_TIME,
-                n if n.rem_euclid(100) == 0 => {
-                    warn!(?pid, ?sid, ?n, "cant close stream, as it still queued");
-                    TICK_TIME * (n as f32 * (n as f32).sqrt() / 100.0) as u32
-                },
-                n => TICK_TIME * (n as f32 * (n as f32).sqrt() / 100.0) as u32,
-            })
-            .await;
-        }
-    }
-
-    pub(crate) async fn shutdown_manager(&self, receiver: oneshot::Receiver<()>) {
-        trace!("start shutdown_manager");
-        receiver.await.unwrap();
+    async fn scheduler_shutdown_mgr(&self, a2s_scheduler_shutdown_r: oneshot::Receiver<()>) {
+        trace!("start scheduler_shutdown_mgr");
+        a2s_scheduler_shutdown_r.await.unwrap();
         self.closed.store(true, Ordering::Relaxed);
         debug!("shutting down all BParticipants gracefully");
         let mut participants = self.participants.write().await;
-        for (pid, (_, _, sender)) in participants.drain() {
+        let mut waitings = vec![];
+        //close participants but don't remove them from self.participants yet
+        for (pid, pi) in participants.iter_mut() {
             trace!(?pid, "shutting down BParticipants");
-            sender.send(()).unwrap();
+            let (finished_sender, finished_receiver) = oneshot::channel();
+            waitings.push((pid, finished_receiver));
+            pi.s2b_shutdown_bparticipant_s
+                .take()
+                .unwrap()
+                .send(finished_sender)
+                .unwrap();
         }
-        trace!("stop shutdown_manager");
+        debug!("wait for participants to be shut down");
+        for (pid, recv) in waitings {
+            match recv.await {
+                Err(e) => error!(
+                    ?pid,
+                    ?e,
+                    "failed to finish sending all remaining messages to participant when \
+                     shutting down"
+                ),
+                _ => (),
+            };
+        }
+        //remove participants once everything is shut down
+        participants.clear();
+        //removing the possibility to create new participants, needed to close down
+        // some mgr:
+        self.participant_channels.lock().await.take();
+
+        trace!("stop scheduler_shutdown_mgr");
     }
 
-    pub(crate) async fn channel_creator(
+    async fn channel_creator(
         &self,
         addr: Address,
-        end_receiver: oneshot::Receiver<()>,
-        result_sender: oneshot::Sender<async_std::io::Result<()>>,
+        s2s_stop_listening_r: oneshot::Receiver<()>,
+        s2a_listen_result_s: oneshot::Sender<async_std::io::Result<()>>,
     ) {
         trace!(?addr, "start up channel creator");
         match addr {
             Address::Tcp(addr) => {
                 let listener = match net::TcpListener::bind(addr).await {
                     Ok(listener) => {
-                        result_sender.send(Ok(())).unwrap();
+                        s2a_listen_result_s.send(Ok(())).unwrap();
                         listener
                     },
                     Err(e) => {
@@ -354,13 +369,13 @@ impl Scheduler {
                             ?e,
                             "listener couldn't be started due to error on tcp bind"
                         );
-                        result_sender.send(Err(e)).unwrap();
+                        s2a_listen_result_s.send(Err(e)).unwrap();
                         return;
                     },
                 };
                 trace!(?addr, "listener bound");
                 let mut incoming = listener.incoming();
-                let mut end_receiver = end_receiver.fuse();
+                let mut end_receiver = s2s_stop_listening_r.fuse();
                 while let Some(stream) = select! {
                     next = incoming.next().fuse() => next,
                     _ = end_receiver => None,
                 } {
@@ -378,7 +393,7 @@ impl Scheduler {
             Address::Udp(addr) => {
                 let socket = match net::UdpSocket::bind(addr).await {
                     Ok(socket) => {
-                        result_sender.send(Ok(())).unwrap();
+                        s2a_listen_result_s.send(Ok(())).unwrap();
                         Arc::new(socket)
                     },
                     Err(e) => {
@@ -387,7 +402,7 @@ impl Scheduler {
                             ?e,
                             "listener couldn't be started due to error on udp bind"
                         );
-                        result_sender.send(Err(e)).unwrap();
+                        s2a_listen_result_s.send(Err(e)).unwrap();
                         return;
                     },
                 };
@@ -395,7 +410,7 @@ impl Scheduler {
                 // receiving is done from here and will be piped to protocol as UDP does not
                 // have any state
                 let mut listeners = HashMap::new();
-                let mut end_receiver = end_receiver.fuse();
+                let mut end_receiver = s2s_stop_listening_r.fuse();
                 let mut data = [0u8; 9216];
                 while let Ok((size, remote_addr)) = select! {
                     next = socket.recv_from(&mut data).fuse() => next,
@@ -424,9 +439,9 @@ impl Scheduler {
         trace!(?addr, "ending channel creator");
     }
 
-    pub(crate) async fn udp_single_channel_connect(
+    async fn udp_single_channel_connect(
         socket: Arc<net::UdpSocket>,
-        mut udp_data_sender: mpsc::UnboundedSender<Vec<u8>>,
+        mut w2p_udp_package_s: mpsc::UnboundedSender<Vec<u8>>,
     ) {
         let addr = socket.local_addr();
         trace!(?addr, "start udp_single_channel_connect");
@@ -443,7 +458,7 @@ impl Scheduler {
         } {
             let mut datavec = Vec::with_capacity(size);
             datavec.extend_from_slice(&data[0..size]);
-            udp_data_sender.send(datavec).await.unwrap();
+            w2p_udp_package_s.send(datavec).await.unwrap();
         }
         trace!(?addr, "stop udp_single_channel_connect");
     }
 
     async fn init_protocol(
         &self,
         protocol: Protocols,
-        pid_sender: Option<oneshot::Sender<io::Result<Participant>>>,
+        s2a_return_pid_s: Option<oneshot::Sender<io::Result<Participant>>>,
         send_handshake: bool,
     ) {
         //channels are unknown till PID is known!
         /* Pro: - Do not send messages to wrong connection.
            Contra: - DOS possibility because we answer first
                    - Speed, because otherwise the message can be sent with the creation
         */
-        let mut participant_channels = self.participant_channels.clone();
+        let mut participant_channels = self.participant_channels.lock().await.clone().unwrap();
         // spawn is needed here, e.g. for TCP connect it would mean that only 1
         // participant can be in handshake phase ever! Someone could deadlock
         // the whole server easily for new clients. UDP doesn't work at all, as
@@ -485,55 +500,53 @@ impl Scheduler {
                 debug!(?cid, "new participant connected via a channel");
                 let (
                     bparticipant,
-                    stream_open_sender,
-                    stream_opened_receiver,
-                    mut create_channel_sender,
-                    frame_send_sender,
-                    shutdown_sender,
+                    a2b_steam_open_s,
+                    b2a_stream_opened_r,
+                    mut s2b_create_channel_s,
+                    s2b_frame_s,
+                    s2b_shutdown_bparticipant_s,
                 ) = BParticipant::new(
                     pid,
                     sid,
                     metrics.clone(),
-                    participant_channels.prios_sender,
-                    participant_channels.stream_finished_request_sender,
+                    participant_channels.a2p_msg_s,
+                    participant_channels.p2b_notify_empty_stream_s,
                 );
 
                 let participant = Participant::new(
                     local_pid,
                     pid,
-                    stream_open_sender,
-                    stream_opened_receiver,
-                    participant_channels.disconnect_sender,
+                    a2b_steam_open_s,
+                    b2a_stream_opened_r,
+                    participant_channels.a2s_disconnect_s,
                 );
 
                 metrics.participants_connected_total.inc();
-                participants.insert(
-                    pid,
-                    (
-                        create_channel_sender.clone(),
-                        frame_send_sender,
-                        shutdown_sender,
-                    ),
-                );
+                participants.insert(pid, ParticipantInfo {
+                    s2b_create_channel_s: s2b_create_channel_s.clone(),
+                    s2b_frame_s,
+                    s2b_shutdown_bparticipant_s: Some(s2b_shutdown_bparticipant_s),
+                });
                 pool.spawn_ok(
                     bparticipant
                         .run()
                         .instrument(tracing::info_span!("participant", ?pid)),
                 );
                 //create a new channel within BParticipant and wait for it to run
-                let (sync_sender, sync_receiver) = oneshot::channel();
-                create_channel_sender
-                    .send((cid, sid, protocol, sync_sender))
+                let (b2s_create_channel_done_s, b2s_create_channel_done_r) =
+                    oneshot::channel();
+                s2b_create_channel_s
+                    .send((cid, sid, protocol, b2s_create_channel_done_s))
                     .await
                     .unwrap();
-                sync_receiver.await.unwrap();
-                if let Some(pid_oneshot) = pid_sender {
+                b2s_create_channel_done_r.await.unwrap();
+                if let Some(pid_oneshot) = s2a_return_pid_s {
                     // someone is waiting with connect, so give them their PID
                     pid_oneshot.send(Ok(participant)).unwrap();
                 } else {
                     // no one is waiting on this Participant, return it to Network
                     participant_channels
-                        .connected_sender
+                        .s2a_connected_s
                         .send(participant)
                         .await
                         .unwrap();
diff --git a/network/src/types.rs b/network/src/types.rs
index dcda4e29a6..b98de3ba71 100644
--- a/network/src/types.rs
+++ b/network/src/types.rs
@@ -118,14 +118,6 @@ impl Frame {
         }
     }
 }
 
-#[derive(Debug)]
-pub(crate) enum Requestor {
-    User,
-    Api,
-    Scheduler,
-    Remote,
-}
-
 impl Pid {
     /// create a new Pid with a random interior value
     ///
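
Reviewer note: the core mechanism of this change is the drain notification in `PrioManager` that replaces the scheduler's old spin-locking `stream_finished_waiter`. The closer registers a `oneshot::Sender` for a `(Pid, Sid)` pair; the manager either fires it immediately (nothing queued) or stores it in `pid_sid_owned` and fires it when the last queued message is turned into frames. The following is a minimal, self-contained sketch of that pattern under simplified assumptions — `DrainTracker` and `Key` are illustrative names, not Veloren's types, and the counting is synchronous rather than channel-fed:

```rust
use futures::channel::oneshot;
use std::collections::HashMap;

type Key = (u64, u64); // stands in for (Pid, Sid)

#[derive(Default)]
struct DrainTracker {
    queued: HashMap<Key, u64>,                 // outstanding messages per key
    notify: HashMap<Key, oneshot::Sender<()>>, // registered flush listeners
}

impl DrainTracker {
    fn enqueue(&mut self, key: Key) {
        *self.queued.entry(key).or_insert(0) += 1;
    }

    // Register interest; fire immediately if nothing is queued, mirroring the
    // `pid_sid_flushed_rx` branch in `PrioManager::tick`.
    fn notify_when_empty(&mut self, key: Key, tx: oneshot::Sender<()>) {
        if self.queued.get(&key).copied().unwrap_or(0) == 0 {
            let _ = tx.send(());
        } else {
            self.notify.insert(key, tx);
        }
    }

    // Called when a message for `key` has been fully turned into frames,
    // like the bookkeeping in `fill_frames`.
    fn dequeue(&mut self, key: Key) {
        if let Some(cnt) = self.queued.get_mut(&key) {
            *cnt -= 1;
            if *cnt == 0 {
                self.queued.remove(&key);
                if let Some(tx) = self.notify.remove(&key) {
                    let _ = tx.send(()); // wake the waiting stream_close_mgr
                }
            }
        }
    }
}

fn main() {
    let mut t = DrainTracker::default();
    let (tx, mut rx) = oneshot::channel();
    t.enqueue((1, 1));
    t.notify_when_empty((1, 1), tx);
    assert!(rx.try_recv().unwrap().is_none()); // still queued
    t.dequeue((1, 1));
    assert!(rx.try_recv().unwrap().is_some()); // flushed -> notified
}
```

This is why `stream_close_mgr` can now send `Frame::CloseStream` only after the flush confirmation, instead of sleeping in a polling loop.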
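
The second recurring pattern is the two-stage shutdown handshake behind the new `oneshot::Sender<oneshot::Sender<async_std::io::Result<()>>>` types: the requester sends a fresh confirmation sender into the participant task and awaits the answer, which is what lets both `Network::disconnect` and `Drop for Participant` block until everything is flushed. A minimal sketch under simplified assumptions — `Confirm`, `participant_task`, and the `String` error type are illustrative, not the crate's actual API:

```rust
use futures::channel::oneshot;

// The inner sender is created per shutdown request; the outer one lives in
// something like `ParticipantInfo`.
type Confirm = oneshot::Sender<Result<(), String>>;

async fn participant_task(shutdown: oneshot::Receiver<Confirm>) {
    // ...managers would run here until the shutdown request arrives...
    let confirm = shutdown.await.expect("scheduler dropped the handle");
    // flush streams, close channels, then report the outcome
    let _ = confirm.send(Ok(()));
}

fn main() {
    let (trigger_s, trigger_r) = oneshot::channel::<Confirm>();
    futures::executor::block_on(async {
        let (confirm_s, confirm_r) = oneshot::channel();
        // requester side: ask for shutdown, then wait for the confirmation
        trigger_s.send(confirm_s).ok().expect("task already gone");
        let (_, res) = futures::join!(participant_task(trigger_r), confirm_r);
        assert!(matches!(res, Ok(Ok(())))); // clean shutdown confirmed
    });
}
```

Taking the outer sender out of an `Option` (as `s2b_shutdown_bparticipant_s.take()` and `a2s_disconnect_s.take()` do) is what lets `Drop` distinguish a clean, already-performed disconnect from an unclean one.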
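
Finally, the `running_mgr: AtomicUsize` accounting added to `BParticipant` is a simple way for `participant_shutdown_mgr` to wait until it is the only manager still alive before confirming shutdown. A sketch of just that mechanism, assuming `async-std` for sleeping as the diff does (the backoff here is fixed rather than the growing `SLEEP_TIME * i` used above):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

async fn mgr(running: Arc<AtomicUsize>) {
    running.fetch_add(1, Ordering::Relaxed);
    // ...manager main loop would run here...
    running.fetch_sub(1, Ordering::Relaxed);
}

async fn shutdown_mgr(running: Arc<AtomicUsize>) {
    running.fetch_add(1, Ordering::Relaxed);
    // wait until only this manager is still accounted for
    while running.load(Ordering::Relaxed) > 1 {
        async_std::task::sleep(Duration::from_millis(5)).await;
    }
    running.fetch_sub(1, Ordering::Relaxed);
}

fn main() {
    let running = Arc::new(AtomicUsize::new(0));
    futures::executor::block_on(async {
        futures::join!(mgr(running.clone()), shutdown_mgr(running.clone()));
    });
}
```

It is a pragmatic polling design; a future refinement could replace the sleep loop with a notification primitive, as the removed `stream_finished_waiter` TODO already suggested for the prio case.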