Merge branch 'xMAC94x/quick_net_fix' into 'master'

fix main thread blocking which was a bad combination of

See merge request veloren/veloren!1793
This commit is contained in:
Marcel 2021-02-18 19:26:43 +00:00
commit 3f571b3b4e

View File

@ -57,6 +57,12 @@ struct ShutdownInfo {
error: Option<ParticipantError>, error: Option<ParticipantError>,
} }
#[derive(Debug)]
struct OpenStreamInfo {
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
}
#[derive(Debug)] #[derive(Debug)]
pub struct BParticipant { pub struct BParticipant {
local_pid: Pid, //tracing local_pid: Pid, //tracing
@ -69,6 +75,7 @@ pub struct BParticipant {
shutdown_barrier: AtomicI32, shutdown_barrier: AtomicI32,
metrics: Arc<NetworkMetrics>, metrics: Arc<NetworkMetrics>,
no_channel_error_info: RwLock<(Instant, u64)>, no_channel_error_info: RwLock<(Instant, u64)>,
open_stream_channels: Arc<Mutex<Option<OpenStreamInfo>>>,
} }
impl BParticipant { impl BParticipant {
@ -118,6 +125,7 @@ impl BParticipant {
run_channels, run_channels,
metrics, metrics,
no_channel_error_info: RwLock::new((Instant::now(), 0)), no_channel_error_info: RwLock::new((Instant::now(), 0)),
open_stream_channels: Arc::new(Mutex::new(None)),
}, },
a2b_open_stream_s, a2b_open_stream_s,
b2a_stream_opened_r, b2a_stream_opened_r,
@ -139,9 +147,12 @@ impl BParticipant {
crossbeam_channel::unbounded::<ProtocolEvent>(); crossbeam_channel::unbounded::<ProtocolEvent>();
let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>(); let (a2b_close_stream_s, a2b_close_stream_r) = mpsc::unbounded_channel::<Sid>();
const STREAM_BOUND: usize = 10_000; let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::unbounded::<(Sid, Bytes)>();
let (a2b_msg_s, a2b_msg_r) = crossbeam_channel::bounded::<(Sid, Bytes)>(STREAM_BOUND);
*self.open_stream_channels.lock().await = Some(OpenStreamInfo {
a2b_msg_s,
a2b_close_stream_s,
});
let run_channels = self.run_channels.take().unwrap(); let run_channels = self.run_channels.take().unwrap();
trace!("start all managers"); trace!("start all managers");
tokio::join!( tokio::join!(
@ -153,8 +164,6 @@ impl BParticipant {
b2b_close_send_protocol_r, b2b_close_send_protocol_r,
b2b_notify_send_of_recv_r, b2b_notify_send_of_recv_r,
b2s_prio_statistic_s, b2s_prio_statistic_s,
a2b_msg_s.clone(), //self
a2b_close_stream_s.clone(), //self
) )
.instrument(tracing::info_span!("send")), .instrument(tracing::info_span!("send")),
self.recv_mgr( self.recv_mgr(
@ -163,8 +172,6 @@ impl BParticipant {
b2b_force_close_recv_protocol_r, b2b_force_close_recv_protocol_r,
b2b_close_send_protocol_s.clone(), b2b_close_send_protocol_s.clone(),
b2b_notify_send_of_recv_s, b2b_notify_send_of_recv_s,
a2b_msg_s.clone(), //self
a2b_close_stream_s.clone(), //self
) )
.instrument(tracing::info_span!("recv")), .instrument(tracing::info_span!("recv")),
self.create_channel_mgr( self.create_channel_mgr(
@ -191,8 +198,6 @@ impl BParticipant {
b2b_close_send_protocol_r: async_channel::Receiver<Cid>, b2b_close_send_protocol_r: async_channel::Receiver<Cid>,
b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<ProtocolEvent>, b2b_notify_send_of_recv_r: crossbeam_channel::Receiver<ProtocolEvent>,
_b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>, _b2s_prio_statistic_s: mpsc::UnboundedSender<B2sPrioStatistic>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) { ) {
let mut send_protocols: HashMap<Cid, SendProtocols> = HashMap::new(); let mut send_protocols: HashMap<Cid, SendProtocols> = HashMap::new();
let mut interval = tokio::time::interval(Self::TICK_TIME); let mut interval = tokio::time::interval(Self::TICK_TIME);
@ -222,6 +227,7 @@ impl BParticipant {
Some((cid, a)) => (*cid, a), Some((cid, a)) => (*cid, a),
None => { None => {
warn!("no channel"); warn!("no channel");
tokio::time::sleep(Self::TICK_TIME * 1000).await; //TODO: failover
continue; continue;
}, },
}; };
@ -232,14 +238,7 @@ impl BParticipant {
trace!(?sid, "open stream"); trace!(?sid, "open stream");
stream_ids += Sid::from(1); stream_ids += Sid::from(1);
let stream = self let stream = self
.create_stream( .create_stream(sid, prio, promises, guaranteed_bandwidth)
sid,
prio,
promises,
guaranteed_bandwidth,
&a2b_msg_s,
&a2b_close_stream_s,
)
.await; .await;
let event = ProtocolEvent::OpenStream { let event = ProtocolEvent::OpenStream {
@ -305,8 +304,12 @@ impl BParticipant {
info!(?cid, ?e, "protocol failed, shutting down channel"); info!(?cid, ?e, "protocol failed, shutting down channel");
// remote recv will now fail, which will trigger remote send which will trigger // remote recv will now fail, which will trigger remote send which will trigger
// recv // recv
trace!("TODO: for now decide to FAIL this participant and not wait for a failover");
send_protocols.remove(&cid).unwrap(); send_protocols.remove(&cid).unwrap();
self.metrics.channels_disconnected(&self.remote_pid_string); self.metrics.channels_disconnected(&self.remote_pid_string);
if send_protocols.is_empty() {
break;
}
} }
if let Some(cid) = remp { if let Some(cid) = remp {
@ -326,6 +329,8 @@ impl BParticipant {
} }
} }
} }
trace!("stop sending in api!");
self.open_stream_channels.lock().await.take();
trace!("Stop send_mgr"); trace!("Stop send_mgr");
self.shutdown_barrier self.shutdown_barrier
.fetch_sub(Self::BARR_SEND, Ordering::Relaxed); .fetch_sub(Self::BARR_SEND, Ordering::Relaxed);
@ -339,8 +344,6 @@ impl BParticipant {
b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>, b2b_force_close_recv_protocol_r: async_channel::Receiver<Cid>,
b2b_close_send_protocol_s: async_channel::Sender<Cid>, b2b_close_send_protocol_s: async_channel::Sender<Cid>,
b2b_notify_send_of_recv_s: crossbeam_channel::Sender<ProtocolEvent>, b2b_notify_send_of_recv_s: crossbeam_channel::Sender<ProtocolEvent>,
a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
) { ) {
let mut recv_protocols: HashMap<Cid, JoinHandle<()>> = HashMap::new(); let mut recv_protocols: HashMap<Cid, JoinHandle<()>> = HashMap::new();
// we should be able to directly await futures imo // we should be able to directly await futures imo
@ -402,14 +405,7 @@ impl BParticipant {
// waiting for receiving is not necessary, because the send_mgr will first // waiting for receiving is not necessary, because the send_mgr will first
// process this before process messages! // process this before process messages!
let stream = self let stream = self
.create_stream( .create_stream(sid, prio, promises, guaranteed_bandwidth)
sid,
prio,
promises,
guaranteed_bandwidth,
&a2b_msg_s,
&a2b_close_stream_s,
)
.await; .await;
b2a_stream_opened_s.send(stream).unwrap(); b2a_stream_opened_s.send(stream).unwrap();
retrigger(cid, p, &mut recv_protocols); retrigger(cid, p, &mut recv_protocols);
@ -625,8 +621,6 @@ impl BParticipant {
prio: Prio, prio: Prio,
promises: Promises, promises: Promises,
guaranteed_bandwidth: Bandwidth, guaranteed_bandwidth: Bandwidth,
a2b_msg_s: &crossbeam_channel::Sender<(Sid, Bytes)>,
a2b_close_stream_s: &mpsc::UnboundedSender<Sid>,
) -> Stream { ) -> Stream {
let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::<Bytes>(); let (b2a_msg_recv_s, b2a_msg_recv_r) = async_channel::unbounded::<Bytes>();
let send_closed = Arc::new(AtomicBool::new(false)); let send_closed = Arc::new(AtomicBool::new(false));
@ -637,6 +631,24 @@ impl BParticipant {
b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s), b2a_msg_recv_s: Mutex::new(b2a_msg_recv_s),
}); });
self.metrics.streams_opened(&self.remote_pid_string); self.metrics.streams_opened(&self.remote_pid_string);
let (a2b_msg_s, a2b_close_stream_s) = {
let lock = self.open_stream_channels.lock().await;
match &*lock {
Some(osi) => (osi.a2b_msg_s.clone(), osi.a2b_close_stream_s.clone()),
None => {
// This Stream will not be able to send. feed it some "Dummy" Channels.
debug!(
"It seems that a stream was requested to open, while the send_mgr is \
already closed"
);
let (a2b_msg_s, _) = crossbeam_channel::unbounded();
let (a2b_close_stream_s, _) = mpsc::unbounded_channel();
(a2b_msg_s, a2b_close_stream_s)
},
}
};
Stream::new( Stream::new(
self.local_pid, self.local_pid,
self.remote_pid, self.remote_pid,
@ -645,9 +657,9 @@ impl BParticipant {
promises, promises,
guaranteed_bandwidth, guaranteed_bandwidth,
send_closed, send_closed,
a2b_msg_s.clone(), a2b_msg_s,
b2a_msg_recv_r, b2a_msg_recv_r,
a2b_close_stream_s.clone(), a2b_close_stream_s,
) )
} }
} }