2021-02-14 17:45:12 +00:00
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
use crate::metrics::RemoveReason;
|
2021-01-22 16:09:20 +00:00
|
|
|
use crate::{
|
|
|
|
event::ProtocolEvent,
|
|
|
|
frame::InitFrame,
|
|
|
|
handshake::{ReliableDrain, ReliableSink},
|
2021-02-14 17:45:12 +00:00
|
|
|
metrics::ProtocolMetricCache,
|
2021-01-22 16:09:20 +00:00
|
|
|
types::Bandwidth,
|
2021-02-14 17:45:12 +00:00
|
|
|
ProtocolError, RecvProtocol, SendProtocol, UnreliableDrain, UnreliableSink,
|
2021-01-22 16:09:20 +00:00
|
|
|
};
|
|
|
|
use async_trait::async_trait;
|
|
|
|
use std::time::{Duration, Instant};
|
2021-02-10 10:37:42 +00:00
|
|
|
#[cfg(feature = "trace_pedantic")]
|
|
|
|
use tracing::trace;
|
2021-01-22 16:09:20 +00:00
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
/// used for implementing your own MPSC `Sink` and `Drain`
|
2021-02-10 10:37:42 +00:00
|
|
|
#[derive(Debug)]
|
2021-02-14 17:45:12 +00:00
|
|
|
pub enum MpscMsg {
|
2021-01-22 16:09:20 +00:00
|
|
|
Event(ProtocolEvent),
|
|
|
|
InitFrame(InitFrame),
|
|
|
|
}
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
/// MPSC implementation of [`SendProtocol`]
|
|
|
|
///
|
|
|
|
/// [`SendProtocol`]: crate::SendProtocol
|
2021-01-22 16:09:20 +00:00
|
|
|
#[derive(Debug)]
|
2021-02-14 17:45:12 +00:00
|
|
|
pub struct MpscSendProtocol<D>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
D: UnreliableDrain<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
drain: D,
|
|
|
|
last: Instant,
|
|
|
|
metrics: ProtocolMetricCache,
|
|
|
|
}
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
/// MPSC implementation of [`RecvProtocol`]
|
|
|
|
///
|
|
|
|
/// [`RecvProtocol`]: crate::RecvProtocol
|
2021-01-22 16:09:20 +00:00
|
|
|
#[derive(Debug)]
|
2021-02-14 17:45:12 +00:00
|
|
|
pub struct MpscRecvProtocol<S>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
S: UnreliableSink<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
sink: S,
|
|
|
|
metrics: ProtocolMetricCache,
|
|
|
|
}
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
impl<D> MpscSendProtocol<D>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
D: UnreliableDrain<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
pub fn new(drain: D, metrics: ProtocolMetricCache) -> Self {
|
|
|
|
Self {
|
|
|
|
drain,
|
|
|
|
last: Instant::now(),
|
|
|
|
metrics,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-14 17:45:12 +00:00
|
|
|
impl<S> MpscRecvProtocol<S>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
S: UnreliableSink<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
pub fn new(sink: S, metrics: ProtocolMetricCache) -> Self { Self { sink, metrics } }
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
2021-02-14 17:45:12 +00:00
|
|
|
impl<D> SendProtocol for MpscSendProtocol<D>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
D: UnreliableDrain<DataFormat = MpscMsg>,
|
|
|
|
{
|
2021-02-10 10:37:42 +00:00
|
|
|
fn notify_from_recv(&mut self, _event: ProtocolEvent) {}
|
|
|
|
|
2021-01-22 16:09:20 +00:00
|
|
|
async fn send(&mut self, event: ProtocolEvent) -> Result<(), ProtocolError> {
|
2021-02-10 10:37:42 +00:00
|
|
|
#[cfg(feature = "trace_pedantic")]
|
|
|
|
trace!(?event, "send");
|
2021-01-22 16:09:20 +00:00
|
|
|
match &event {
|
|
|
|
ProtocolEvent::Message {
|
2021-02-14 17:45:12 +00:00
|
|
|
data: _data,
|
2021-01-22 16:09:20 +00:00
|
|
|
mid: _,
|
2021-02-14 17:45:12 +00:00
|
|
|
sid: _sid,
|
2021-01-22 16:09:20 +00:00
|
|
|
} => {
|
2021-02-14 17:45:12 +00:00
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
let (bytes, line) = {
|
|
|
|
let sid = *_sid;
|
|
|
|
let bytes = _data.len() as u64;
|
|
|
|
let line = self.metrics.init_sid(sid);
|
|
|
|
line.smsg_it.inc();
|
|
|
|
line.smsg_ib.inc_by(bytes);
|
|
|
|
(bytes, line)
|
|
|
|
};
|
2021-01-22 16:09:20 +00:00
|
|
|
let r = self.drain.send(MpscMsg::Event(event)).await;
|
2021-02-14 17:45:12 +00:00
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
{
|
|
|
|
line.smsg_ot[RemoveReason::Finished.i()].inc();
|
|
|
|
line.smsg_ob[RemoveReason::Finished.i()].inc_by(bytes);
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
r
|
|
|
|
},
|
|
|
|
_ => self.drain.send(MpscMsg::Event(event)).await,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn flush(&mut self, _: Bandwidth, _: Duration) -> Result<(), ProtocolError> { Ok(()) }
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
2021-02-14 17:45:12 +00:00
|
|
|
impl<S> RecvProtocol for MpscRecvProtocol<S>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
S: UnreliableSink<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
async fn recv(&mut self) -> Result<ProtocolEvent, ProtocolError> {
|
2021-02-10 10:37:42 +00:00
|
|
|
let event = self.sink.recv().await?;
|
|
|
|
#[cfg(feature = "trace_pedantic")]
|
|
|
|
trace!(?event, "recv");
|
|
|
|
match event {
|
2021-01-22 16:09:20 +00:00
|
|
|
MpscMsg::Event(e) => {
|
2021-02-14 17:45:12 +00:00
|
|
|
#[cfg(feature = "metrics")]
|
2021-01-22 16:09:20 +00:00
|
|
|
{
|
2021-02-14 17:45:12 +00:00
|
|
|
if let ProtocolEvent::Message { data, mid: _, sid } = &e {
|
|
|
|
let sid = *sid;
|
|
|
|
let bytes = data.len() as u64;
|
|
|
|
let line = self.metrics.init_sid(sid);
|
|
|
|
line.rmsg_it.inc();
|
|
|
|
line.rmsg_ib.inc_by(bytes);
|
|
|
|
line.rmsg_ot[RemoveReason::Finished.i()].inc();
|
|
|
|
line.rmsg_ob[RemoveReason::Finished.i()].inc_by(bytes);
|
|
|
|
}
|
2021-01-22 16:09:20 +00:00
|
|
|
}
|
|
|
|
Ok(e)
|
|
|
|
},
|
|
|
|
MpscMsg::InitFrame(_) => Err(ProtocolError::Closed),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
2021-02-14 17:45:12 +00:00
|
|
|
impl<D> ReliableDrain for MpscSendProtocol<D>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
D: UnreliableDrain<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
async fn send(&mut self, frame: InitFrame) -> Result<(), ProtocolError> {
|
|
|
|
self.drain.send(MpscMsg::InitFrame(frame)).await
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
2021-02-14 17:45:12 +00:00
|
|
|
impl<S> ReliableSink for MpscRecvProtocol<S>
|
2021-01-22 16:09:20 +00:00
|
|
|
where
|
|
|
|
S: UnreliableSink<DataFormat = MpscMsg>,
|
|
|
|
{
|
|
|
|
async fn recv(&mut self) -> Result<InitFrame, ProtocolError> {
|
|
|
|
match self.sink.recv().await? {
|
|
|
|
MpscMsg::Event(_) => Err(ProtocolError::Closed),
|
|
|
|
MpscMsg::InitFrame(f) => Ok(f),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
pub mod test_utils {
|
|
|
|
use super::*;
|
2021-02-14 17:45:12 +00:00
|
|
|
use crate::metrics::{ProtocolMetricCache, ProtocolMetrics};
|
2021-01-22 16:09:20 +00:00
|
|
|
use async_channel::*;
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
pub struct ACDrain {
|
|
|
|
sender: Sender<MpscMsg>,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct ACSink {
|
|
|
|
receiver: Receiver<MpscMsg>,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn ac_bound(
|
|
|
|
cap: usize,
|
|
|
|
metrics: Option<ProtocolMetricCache>,
|
2021-02-14 17:45:12 +00:00
|
|
|
) -> [(MpscSendProtocol<ACDrain>, MpscRecvProtocol<ACSink>); 2] {
|
2021-01-22 16:09:20 +00:00
|
|
|
let (s1, r1) = async_channel::bounded(cap);
|
|
|
|
let (s2, r2) = async_channel::bounded(cap);
|
|
|
|
let m = metrics.unwrap_or_else(|| {
|
|
|
|
ProtocolMetricCache::new("mpsc", Arc::new(ProtocolMetrics::new().unwrap()))
|
|
|
|
});
|
|
|
|
[
|
|
|
|
(
|
2021-02-14 17:45:12 +00:00
|
|
|
MpscSendProtocol::new(ACDrain { sender: s1 }, m.clone()),
|
|
|
|
MpscRecvProtocol::new(ACSink { receiver: r2 }, m.clone()),
|
2021-01-22 16:09:20 +00:00
|
|
|
),
|
|
|
|
(
|
2021-02-14 17:45:12 +00:00
|
|
|
MpscSendProtocol::new(ACDrain { sender: s2 }, m.clone()),
|
|
|
|
MpscRecvProtocol::new(ACSink { receiver: r1 }, m),
|
2021-01-22 16:09:20 +00:00
|
|
|
),
|
|
|
|
]
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
impl UnreliableDrain for ACDrain {
|
|
|
|
type DataFormat = MpscMsg;
|
|
|
|
|
|
|
|
async fn send(&mut self, data: Self::DataFormat) -> Result<(), ProtocolError> {
|
|
|
|
self.sender
|
|
|
|
.send(data)
|
|
|
|
.await
|
|
|
|
.map_err(|_| ProtocolError::Closed)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
impl UnreliableSink for ACSink {
|
|
|
|
type DataFormat = MpscMsg;
|
|
|
|
|
|
|
|
async fn recv(&mut self) -> Result<Self::DataFormat, ProtocolError> {
|
|
|
|
self.receiver
|
|
|
|
.recv()
|
|
|
|
.await
|
|
|
|
.map_err(|_| ProtocolError::Closed)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use crate::{
|
|
|
|
mpsc::test_utils::*,
|
|
|
|
types::{Pid, STREAM_ID_OFFSET1, STREAM_ID_OFFSET2},
|
|
|
|
InitProtocol,
|
|
|
|
};
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn handshake_all_good() {
|
|
|
|
let [mut p1, mut p2] = ac_bound(10, None);
|
|
|
|
let r1 = tokio::spawn(async move { p1.initialize(true, Pid::fake(2), 1337).await });
|
|
|
|
let r2 = tokio::spawn(async move { p2.initialize(false, Pid::fake(3), 42).await });
|
|
|
|
let (r1, r2) = tokio::join!(r1, r2);
|
|
|
|
assert_eq!(r1.unwrap(), Ok((Pid::fake(3), STREAM_ID_OFFSET1, 42)));
|
|
|
|
assert_eq!(r2.unwrap(), Ok((Pid::fake(2), STREAM_ID_OFFSET2, 1337)));
|
|
|
|
}
|
|
|
|
}
|