From dc8e0f5bff1a96122ce4384e7f305f89969a3477 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marcel=20M=C3=A4rtens?=
Date: Sun, 23 Aug 2020 21:43:17 +0200
Subject: [PATCH] using locks in a more sensible way.

- replace RwLock with Mutex where it's only accessed for insert/delete
- use the RwLock<Mutex<T>> pattern otherwise, in order to allow concurrent `.read()`
- fix a deadlock O.o

---
 network/src/api.rs         | 55 ++++++++++++--------------
 network/src/participant.rs | 80 +++++++++++++++++++++-----------------
 network/src/scheduler.rs   | 23 +++++------
 3 files changed, 79 insertions(+), 79 deletions(-)

diff --git a/network/src/api.rs b/network/src/api.rs
index 54de28cf35..07fbcd1375 100644
--- a/network/src/api.rs
+++ b/network/src/api.rs
@@ -8,11 +8,7 @@ use crate::{
     scheduler::Scheduler,
     types::{Mid, Pid, Prio, Promises, Sid},
 };
-use async_std::{
-    io,
-    sync::{Mutex, RwLock},
-    task,
-};
+use async_std::{io, sync::Mutex, task};
 use futures::{
     channel::{mpsc, oneshot},
     sink::SinkExt,
@@ -52,8 +48,8 @@ pub enum ProtocolAddr {
 pub struct Participant {
     local_pid: Pid,
     remote_pid: Pid,
-    a2b_stream_open_s: RwLock<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
-    b2a_stream_opened_r: RwLock<mpsc::UnboundedReceiver<Stream>>,
+    a2b_stream_open_s: Mutex<mpsc::UnboundedSender<(Prio, Promises, oneshot::Sender<Stream>)>>,
+    b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
     a2s_disconnect_s: A2sDisconnect,
 }
 
@@ -147,12 +143,12 @@ pub enum StreamError {
 /// [`connected`]: Network::connected
 pub struct Network {
     local_pid: Pid,
-    participant_disconnect_sender: RwLock<HashMap<Pid, A2sDisconnect>>,
-    listen_sender:
-        RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>>,
-    connect_sender:
-        RwLock<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
-    connected_receiver: RwLock<mpsc::UnboundedReceiver<Participant>>,
+    participant_disconnect_sender: Mutex<HashMap<Pid, A2sDisconnect>>,
+    listen_sender:
+        Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<()>>)>>,
+    connect_sender:
+        Mutex<mpsc::UnboundedSender<(ProtocolAddr, oneshot::Sender<io::Result<Participant>>)>>,
+    connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
     shutdown_sender: Option<oneshot::Sender<()>>,
 }
 
@@ -249,10 +245,10 @@ impl Network {
         (
             Self {
                 local_pid: participant_id,
-                participant_disconnect_sender: RwLock::new(HashMap::new()),
-                listen_sender: RwLock::new(listen_sender),
-                connect_sender: RwLock::new(connect_sender),
-                connected_receiver: RwLock::new(connected_receiver),
+                participant_disconnect_sender: Mutex::new(HashMap::new()),
+                listen_sender: Mutex::new(listen_sender),
+                connect_sender: Mutex::new(connect_sender),
+                connected_receiver: Mutex::new(connected_receiver),
                 shutdown_sender: Some(shutdown_sender),
             },
             move || {
@@ -300,7 +296,7 @@ impl Network {
         let (s2a_result_s, s2a_result_r) = oneshot::channel::<io::Result<()>>();
         debug!(?address, "listening on address");
         self.listen_sender
-            .write()
+            .lock()
             .await
             .send((address, s2a_result_s))
             .await?;
@@ -356,7 +352,7 @@ impl Network {
         let (pid_sender, pid_receiver) = oneshot::channel::<io::Result<Participant>>();
         debug!(?address, "Connect to address");
         self.connect_sender
-            .write()
+            .lock()
             .await
             .send((address, pid_sender))
             .await?;
@@ -370,7 +366,7 @@
             "Received Participant id from remote and return to user"
         );
         self.participant_disconnect_sender
-            .write()
+            .lock()
             .await
             .insert(pid, participant.a2s_disconnect_s.clone());
         Ok(participant)
@@ -410,9 +406,9 @@
     /// [`Streams`]: crate::api::Stream
     /// [`listen`]: crate::api::Network::listen
     pub async fn connected(&self) -> Result<Participant, NetworkError> {
-        let participant = self.connected_receiver.write().await.next().await?;
+        let participant = self.connected_receiver.lock().await.next().await?;
         self.participant_disconnect_sender
-            .write()
+            .lock()
             .await
             .insert(participant.remote_pid, participant.a2s_disconnect_s.clone());
         Ok(participant)
@@ -430,8 +426,8 @@ impl Participant {
         Self {
             local_pid,
            remote_pid,
-            a2b_stream_open_s: RwLock::new(a2b_stream_open_s),
-            b2a_stream_opened_r: RwLock::new(b2a_stream_opened_r),
+            a2b_stream_open_s: Mutex::new(a2b_stream_open_s),
+            b2a_stream_opened_r: Mutex::new(b2a_stream_opened_r),
             a2s_disconnect_s: Arc::new(Mutex::new(Some(a2s_disconnect_s))),
         }
     }
@@ -477,11 +473,11 @@ impl Participant {
     ///
     /// [`Streams`]: crate::api::Stream
     pub async fn open(&self, prio: u8, promises: Promises) -> Result<Stream, ParticipantError> {
-        //use this lock for now to make sure that only one open at a time is made,
-        // TODO: not sure if we can paralise that, check in future
-        let mut a2b_stream_open_s = self.a2b_stream_open_s.write().await;
         let (p2a_return_stream_s, p2a_return_stream_r) = oneshot::channel();
-        if let Err(e) = a2b_stream_open_s
+        if let Err(e) = self
+            .a2b_stream_open_s
+            .lock()
+            .await
             .send((prio, promises, p2a_return_stream_s))
             .await
         {
@@ -535,10 +531,7 @@ impl Participant {
     /// [`connected`]: Network::connected
     /// [`open`]: Participant::open
     pub async fn opened(&self) -> Result<Stream, ParticipantError> {
-        //use this lock for now to make sure that only one open at a time is made,
-        // TODO: not sure if we can paralise that, check in future
-        let mut stream_opened_receiver = self.b2a_stream_opened_r.write().await;
-        match stream_opened_receiver.next().await {
+        match self.b2a_stream_opened_r.lock().await.next().await {
             Some(stream) => {
                 let sid = stream.sid;
                 debug!(?sid, ?self.remote_pid, "Receive opened stream");
@@ -861,7 +854,7 @@ impl Drop for Network {
             // we MUST avoid nested block_on, good that Network::Drop no longer triggers
             // Participant::Drop directly but just the BParticipant
             for (remote_pid, a2s_disconnect_s) in
-                self.participant_disconnect_sender.write().await.drain()
+                self.participant_disconnect_sender.lock().await.drain()
             {
                 match a2s_disconnect_s.lock().await.take() {
                     Some(mut a2s_disconnect_s) => {
diff --git a/network/src/participant.rs b/network/src/participant.rs
index 741d85fa2b..3882a2f5e7 100644
--- a/network/src/participant.rs
+++ b/network/src/participant.rs
@@ -8,7 +8,7 @@ use crate::{
     protocols::Protocols,
     types::{Cid, Frame, Pid, Prio, Promises, Sid},
 };
-use async_std::sync::RwLock;
+use async_std::sync::{Mutex, RwLock};
 use futures::{
     channel::{mpsc, oneshot},
     future::FutureExt,
@@ -46,7 +46,7 @@ struct StreamInfo {
     prio: Prio,
     promises: Promises,
     send_closed: Arc<AtomicBool>,
-    b2a_msg_recv_s: mpsc::UnboundedSender<IncomingMessage>,
+    b2a_msg_recv_s: Mutex<mpsc::UnboundedSender<IncomingMessage>>,
 }
 
 #[derive(Debug)]
@@ -71,7 +71,7 @@ pub struct BParticipant {
     remote_pid: Pid,
     remote_pid_string: String, //optimisation
     offset_sid: Sid,
-    channels: Arc<RwLock<HashMap<Cid, ChannelInfo>>>,
+    channels: Arc<RwLock<HashMap<Cid, Mutex<ChannelInfo>>>>,
     streams: RwLock<HashMap<Sid, StreamInfo>>,
     running_mgr: AtomicUsize,
     run_channels: Option<ControlChannels>,
@@ -252,10 +252,10 @@ impl BParticipant {
         frame: Frame,
         #[cfg(feature = "metrics")] frames_out_total_cache: &mut PidCidFrameCache,
     ) -> bool {
-        // find out ideal channel here
-        //TODO: just take first
-        let mut lock = self.channels.write().await;
-        if let Some(ci) = lock.values_mut().next() {
+        let mut drop_cid = None;
+        // TODO: find out ideal channel here
+        let res = if let Some(ci) = self.channels.read().await.values().next() {
+            let mut ci = ci.lock().await;
             //we are increasing metrics without checking the result to please
             // borrow_checker. otherwise we would need to close `frame` what we
             // dont want!
@@ -266,20 +266,7 @@
             if let Err(e) = ci.b2w_frame_s.send(frame).await {
                 let cid = ci.cid;
                 info!(?e, ?cid, "channel no longer available");
-                if let Some(ci) = self.channels.write().await.remove(&cid) {
-                    trace!(?cid, "stopping read protocol");
-                    if let Err(e) = ci.b2r_read_shutdown.send(()) {
-                        trace!(?cid, ?e, "seems like was already shut down");
-                    }
-                }
-                //TODO FIXME tags: takeover channel multiple
-                info!(
-                    "FIXME: the frame is actually drop. which is fine for now as the participant \
-                     will be closed, but not if we do channel-takeover"
-                );
-                //TEMP FIX: as we dont have channel takeover yet drop the whole bParticipant
-                self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
-                    .await;
+                drop_cid = Some(cid);
                 false
             } else {
                 true
@@ -301,7 +288,25 @@
                 guard.1 += 1;
             }
             false
-        }
+        };
+        if let Some(cid) = drop_cid {
+            if let Some(ci) = self.channels.write().await.remove(&cid) {
+                let ci = ci.into_inner();
+                trace!(?cid, "stopping read protocol");
+                if let Err(e) = ci.b2r_read_shutdown.send(()) {
+                    trace!(?cid, ?e, "seems like it was already shut down");
+                }
+            }
+            //TODO FIXME tags: takeover channel multiple
+            info!(
+                "FIXME: the frame is actually dropped, which is fine for now as the participant \
+                 will be closed, but not if we do channel-takeover"
+            );
+            //TEMP FIX: as we don't have channel takeover yet, drop the whole bParticipant
+            self.close_write_api(Some(ParticipantError::ProtocolFailedUnrecoverable))
+                .await;
+        };
+        res
     }
 
     async fn handle_frames_mgr(
@@ -325,7 +330,8 @@
             Err(()) => {
                 // The read protocol stopped, i need to make sure that write gets stopped
                 debug!("read protocol was closed. Stopping write protocol");
-                if let Some(ci) = self.channels.write().await.get_mut(&cid) {
+                if let Some(ci) = self.channels.read().await.get(&cid) {
+                    let mut ci = ci.lock().await;
                     ci.b2w_frame_s
                         .close()
                         .await
@@ -381,7 +387,7 @@
                     .with_label_values(&[&self.remote_pid_string])
                    .inc();
                 si.send_closed.store(true, Ordering::Relaxed);
-                si.b2a_msg_recv_s.close_channel();
+                si.b2a_msg_recv_s.into_inner().close_channel();
                 trace!(?sid, "Closed stream from remote");
             } else {
                 warn!(
@@ -414,8 +420,8 @@
             if finished {
                 //trace!(?mid, "finished receiving message");
                 let imsg = messages.remove(&mid).unwrap();
-                if let Some(si) = self.streams.write().await.get_mut(&imsg.sid) {
-                    if let Err(e) = si.b2a_msg_recv_s.send(imsg).await {
+                if let Some(si) = self.streams.read().await.get(&imsg.sid) {
+                    if let Err(e) = si.b2a_msg_recv_s.lock().await.send(imsg).await {
                         warn!(
                             ?e,
                             ?mid,
@@ -482,12 +488,15 @@
             let channels = self.channels.clone();
             async move {
                 let (channel, b2w_frame_s, b2r_read_shutdown) = Channel::new(cid);
-                channels.write().await.insert(cid, ChannelInfo {
-                    cid,
-                    cid_string: cid.to_string(),
-                    b2w_frame_s,
-                    b2r_read_shutdown,
-                });
+                channels.write().await.insert(
+                    cid,
+                    Mutex::new(ChannelInfo {
+                        cid,
+                        cid_string: cid.to_string(),
+                        b2w_frame_s,
+                        b2r_read_shutdown,
+                    }),
+                );
                 b2s_create_channel_done_s.send(()).unwrap();
                 #[cfg(feature = "metrics")]
                 self.metrics
@@ -619,6 +628,7 @@
 
         debug!("Closing all channels, after flushed prios");
         for (cid, ci) in self.channels.write().await.drain() {
+            let ci = ci.into_inner();
             if let Err(e) = ci.b2r_read_shutdown.send(()) {
                 debug!(
                     ?e,
@@ -695,7 +705,7 @@
         match self.streams.read().await.get(&sid) {
             Some(si) => {
                 si.send_closed.store(true, Ordering::Relaxed);
-                si.b2a_msg_recv_s.close_channel();
+                si.b2a_msg_recv_s.lock().await.close_channel();
             },
             None => warn!("Couldn't find the stream, might be simultaneous close from remote"),
         }
@@ -742,7 +752,7 @@
             prio,
             promises,
             send_closed: send_closed.clone(),
-            b2a_msg_recv_s,
+            b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s),
         });
         #[cfg(feature = "metrics")]
         self.metrics
@@ -770,7 +780,7 @@
         lock.b2a_stream_opened_s.close_channel();
 
         debug!("Closing all streams for write");
-        for (sid, si) in self.streams.write().await.iter() {
+        for (sid, si) in self.streams.read().await.iter() {
             trace!(?sid, "Shutting down Stream for write");
             si.send_closed.store(true, Ordering::Relaxed);
         }
@@ -783,7 +793,7 @@
         debug!("Closing all streams");
         for (sid, si) in self.streams.write().await.drain() {
             trace!(?sid, "Shutting down Stream");
-            si.b2a_msg_recv_s.close_channel();
+            si.b2a_msg_recv_s.lock().await.close_channel();
         }
     }
 }
diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs
index ae53f5d4e7..b352463166 100644
--- a/network/src/scheduler.rs
+++ b/network/src/scheduler.rs
@@ -7,10 +7,7 @@ use crate::{
     protocols::{Protocols, TcpProtocol, UdpProtocol},
     types::Pid,
 };
-use async_std::{
-    io, net,
-    sync::{Mutex, RwLock},
-};
+use async_std::{io, net, sync::Mutex};
 use futures::{
     channel::{mpsc, oneshot},
     executor::ThreadPool,
@@ -76,9 +73,9 @@ pub struct Scheduler {
     pool: Arc<ThreadPool>,
     run_channels: Option<ControlChannels>,
     participant_channels: Arc<Mutex<Option<ParticipantChannels>>>,
-    participants: Arc<RwLock<HashMap<Pid, ParticipantInfo>>>,
+    participants: Arc<Mutex<HashMap<Pid, ParticipantInfo>>>,
     channel_ids: Arc<AtomicU64>,
-    channel_listener: RwLock<HashMap<ProtocolAddr, oneshot::Sender<()>>>,
+    channel_listener: Mutex<HashMap<ProtocolAddr, oneshot::Sender<()>>>,
     #[cfg(feature = "metrics")]
     metrics: Arc<NetworkMetrics>,
 }
@@ -136,9 +133,9 @@ impl Scheduler {
                 pool: Arc::new(ThreadPool::new().unwrap()),
                 run_channels,
                 participant_channels: Arc::new(Mutex::new(Some(participant_channels))),
-                participants: Arc::new(RwLock::new(HashMap::new())),
+                participants: Arc::new(Mutex::new(HashMap::new())),
                 channel_ids: Arc::new(AtomicU64::new(0)),
-                channel_listener: RwLock::new(HashMap::new()),
+                channel_listener: Mutex::new(HashMap::new()),
                 #[cfg(feature = "metrics")]
                 metrics,
             },
@@ -180,7 +177,7 @@ impl Scheduler {
             .inc();
         let (end_sender, end_receiver) = oneshot::channel::<()>();
         self.channel_listener
-            .write()
+            .lock()
             .await
             .insert(address.clone(), end_sender);
         self.channel_creator(address, end_receiver, s2a_listen_result_s)
@@ -273,7 +270,7 @@
         // 3. Participant will try to access the BParticipant senders and receivers with
         //    their next api action, it will fail and be closed then.
         trace!(?pid, "Got request to close participant");
-        if let Some(mut pi) = self.participants.write().await.remove(&pid) {
+        if let Some(mut pi) = self.participants.lock().await.remove(&pid) {
             let (finished_sender, finished_receiver) = oneshot::channel();
             pi.s2b_shutdown_bparticipant_s
                 .take()
                 .unwrap()
@@ -310,7 +307,7 @@
         a2s_scheduler_shutdown_r.await.unwrap();
         self.closed.store(true, Ordering::Relaxed);
         debug!("Shutting down all BParticipants gracefully");
-        let mut participants = self.participants.write().await;
+        let mut participants = self.participants.lock().await;
         let waitings = participants
             .drain()
             .map(|(pid, mut pi)| {
@@ -336,7 +333,7 @@
             };
         }
         debug!("shutting down protocol listeners");
-        for (addr, end_channel_sender) in self.channel_listener.write().await.drain() {
+        for (addr, end_channel_sender) in self.channel_listener.lock().await.drain() {
             trace!(?addr, "stopping listen on protocol");
             if let Err(e) = end_channel_sender.send(()) {
                 warn!(?addr, ?e, "listener crashed/disconnected already");
             }
@@ -531,7 +528,7 @@
                         ?pid,
                         "Detected that my channel is ready!, activating it :)"
                     );
-                    let mut participants = participants.write().await;
+                    let mut participants = participants.lock().await;
                     if !participants.contains_key(&pid) {
                         debug!(?cid, "New participant connected via a channel");
                         let (
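
Reviewer note, appended after the diff and not part of the patch: the heart of
this change is the RwLock<Mutex<T>> shape, e.g.
RwLock<HashMap<Cid, Mutex<ChannelInfo>>> for `channels`. Below is a minimal,
self-contained sketch of the pattern; `ChannelInfo`, `frames_sent` and the u64
keys are illustrative stand-ins, not the crate's real types:

use async_std::sync::{Mutex, RwLock};
use std::{collections::HashMap, sync::Arc};

struct ChannelInfo {
    frames_sent: u64,
}

fn main() {
    async_std::task::block_on(async {
        // Outer RwLock guards the map's shape, the inner Mutex guards one entry.
        let channels: Arc<RwLock<HashMap<u64, Mutex<ChannelInfo>>>> =
            Arc::new(RwLock::new(HashMap::new()));

        // Insert/delete still takes the exclusive write lock ...
        channels
            .write()
            .await
            .insert(1, Mutex::new(ChannelInfo { frames_sent: 0 }));

        // ... but mutating a single entry only needs the shared read lock plus
        // that entry's own Mutex, so tasks working on different entries no
        // longer serialize on the whole map.
        if let Some(ci) = channels.read().await.get(&1) {
            ci.lock().await.frames_sent += 1;
        }

        assert_eq!(channels.read().await[&1].lock().await.frames_sent, 1);
    });
}

This is also why the `streams.write().await.get_mut(..)` call sites could
become `streams.read().await.get(..)` plus a per-field lock.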
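Reviewer note on the "fix a deadlock" bullet: the old `send_frame` bound the
`channels.write()` guard to a local and, on a send error, called
`self.channels.write().await.remove(&cid)` on the same RwLock while that guard
was still alive. These async locks are not reentrant, so the second
acquisition waits forever. The patch records the failing cid in `drop_cid` and
performs the removal only after the first guard is gone. A reduced sketch of
both shapes, with hypothetical types (not the crate's):

use async_std::sync::RwLock;
use std::collections::HashMap;

// The bug shape: a second lock of an RwLock whose guard is still alive.
// Actually calling this would hang forever.
#[allow(dead_code)]
async fn deadlocks(channels: &RwLock<HashMap<u64, String>>) {
    let guard = channels.write().await; // first guard, held until fn end
    if guard.contains_key(&1) {
        channels.write().await.remove(&1); // waits on ourselves: deadlock
    }
}

// The patch's shape: remember the key, let the guard drop, then re-lock.
async fn fixed(channels: &RwLock<HashMap<u64, String>>) {
    let mut drop_cid = None;
    if channels.read().await.contains_key(&1) {
        // The read guard was a temporary; it is already released here.
        drop_cid = Some(1);
    }
    if let Some(cid) = drop_cid {
        channels.write().await.remove(&cid);
    }
}

fn main() {
    async_std::task::block_on(async {
        let channels = RwLock::new(HashMap::from([(1u64, String::from("tcp"))]));
        fixed(&channels).await;
        assert!(channels.read().await.is_empty());
    });
}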
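Reviewer note on wrapping `b2a_msg_recv_s` in a Mutex: `SinkExt::send` takes
`&mut self`, and once the message path holds only `streams.read()` the map
hands out shared references, so the sender itself must supply the interior
mutability. A sketch with illustrative types (a u32 payload standing in for
the crate's IncomingMessage):

use async_std::sync::{Mutex, RwLock};
use futures::{channel::mpsc, sink::SinkExt, stream::StreamExt};
use std::collections::HashMap;

struct StreamInfo {
    b2a_msg_recv_s: Mutex<mpsc::UnboundedSender<u32>>,
}

fn main() {
    async_std::task::block_on(async {
        let (tx, mut rx) = mpsc::unbounded::<u32>();
        let streams = RwLock::new(HashMap::from([(0u64, StreamInfo {
            b2a_msg_recv_s: Mutex::new(tx),
        })]));

        // Shared read access to the map, exclusive access to one sender.
        if let Some(si) = streams.read().await.get(&0) {
            si.b2a_msg_recv_s.lock().await.send(42).await.unwrap();
        }
        assert_eq!(rx.next().await, Some(42));
    });
}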