AppFlowy/shared-lib/lib-ws/src/connect.rs

204 lines
6.3 KiB
Rust
Raw Normal View History

2021-10-05 09:54:11 +00:00
use crate::{
errors::{internal_error, WsError},
MsgReceiver,
MsgSender,
};
use futures_core::{future::BoxFuture, ready};
2021-09-19 15:21:10 +00:00
use futures_util::{FutureExt, StreamExt};
2021-09-18 14:32:00 +00:00
use pin_project::pin_project;
use std::{
fmt,
2021-09-18 14:32:00 +00:00
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::net::TcpStream;
2021-09-18 14:32:00 +00:00
use tokio_tungstenite::{
connect_async,
2021-09-19 15:21:10 +00:00
tungstenite::{handshake::client::Response, Error, Message},
2021-09-18 14:32:00 +00:00
MaybeTlsStream,
WebSocketStream,
};
#[pin_project]
2021-09-20 07:38:55 +00:00
pub struct WsConnectionFuture {
2021-09-18 14:32:00 +00:00
msg_tx: Option<MsgSender>,
ws_rx: Option<MsgReceiver>,
#[pin]
fut: Pin<
Box<dyn Future<Output = Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>> + Send + Sync>,
>,
2021-09-18 14:32:00 +00:00
}
2021-09-20 07:38:55 +00:00
impl WsConnectionFuture {
2021-09-18 14:32:00 +00:00
pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self {
2021-09-20 07:38:55 +00:00
WsConnectionFuture {
2021-09-18 14:32:00 +00:00
msg_tx: Some(msg_tx),
ws_rx: Some(ws_rx),
fut: Box::pin(async move { connect_async(&addr).await }),
}
}
}
2021-09-20 07:38:55 +00:00
impl Future for WsConnectionFuture {
2021-09-19 10:39:56 +00:00
type Output = Result<WsStream, WsError>;
2021-09-18 14:32:00 +00:00
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// [[pin]]
// poll async function. The following methods not work.
// 1.
// let f = connect_async("");
// pin_mut!(f);
// ready!(Pin::new(&mut a).poll(cx))
//
// 2.ready!(Pin::new(&mut Box::pin(connect_async(""))).poll(cx))
//
// An async method calls poll multiple times and might return to the executor. A
// single poll call can only return to the executor once and will get
// resumed through another poll invocation. the connect_async call multiple time
// from the beginning. So I use fut to hold the future and continue to
// poll it. (Fix me if i was wrong)
loop {
return match ready!(self.as_mut().project().fut.poll(cx)) {
Ok((stream, _)) => {
2021-11-03 07:37:38 +00:00
tracing::debug!("🐴 ws connect success");
2021-09-19 08:11:02 +00:00
let (msg_tx, ws_rx) = (
self.msg_tx.take().expect("WsConnection should be call once "),
self.ws_rx.take().expect("WsConnection should be call once "),
);
2021-09-18 14:32:00 +00:00
Poll::Ready(Ok(WsStream::new(msg_tx, ws_rx, stream)))
},
Err(error) => {
2021-11-03 07:37:38 +00:00
tracing::debug!("🐴 ws connect failed: {:?}", error);
2021-09-19 10:39:56 +00:00
Poll::Ready(Err(error.into()))
},
2021-09-18 14:32:00 +00:00
};
}
}
}
type Fut = BoxFuture<'static, Result<(), WsError>>;
2021-09-18 14:32:00 +00:00
#[pin_project]
pub struct WsStream {
2021-09-22 06:42:14 +00:00
#[allow(dead_code)]
2021-09-19 08:11:02 +00:00
msg_tx: MsgSender,
2021-09-18 14:32:00 +00:00
#[pin]
inner: Option<(Fut, Fut)>,
2021-09-18 14:32:00 +00:00
}
impl WsStream {
pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
let (ws_write, ws_read) = stream.split();
Self {
2021-09-19 08:11:02 +00:00
msg_tx: msg_tx.clone(),
inner: Some((
2021-09-18 14:32:00 +00:00
Box::pin(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let read = async {
ws_read
.for_each(|message| async {
match tx.send(post_message(msg_tx.clone(), message)).await {
Ok(_) => {},
Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e),
2021-10-05 09:54:11 +00:00
}
})
.await;
Ok(())
};
let ret = async {
loop {
match rx.recv().await {
None => {
return Err(WsError::internal().context("WsStream rx closed unexpectedly"));
},
Some(result) => {
if result.is_err() {
return result;
}
},
}
2021-10-05 09:54:11 +00:00
}
};
futures::pin_mut!(ret);
futures::pin_mut!(read);
tokio::select! {
result = read => {return result},
result = ret => {return result},
};
2021-09-18 14:32:00 +00:00
}),
Box::pin(async move {
2021-10-05 09:54:11 +00:00
let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error);
result
2021-09-18 14:32:00 +00:00
}),
)),
}
}
}
impl fmt::Debug for WsStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WsStream").finish() }
}
2021-09-18 14:32:00 +00:00
impl Future for WsStream {
type Output = Result<(), WsError>;
2021-09-18 14:32:00 +00:00
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2021-09-19 08:11:02 +00:00
let (mut ws_read, mut ws_write) = self.inner.take().unwrap();
match ws_read.poll_unpin(cx) {
Poll::Ready(l) => Poll::Ready(l),
Poll::Pending => {
//
2021-09-19 08:11:02 +00:00
match ws_write.poll_unpin(cx) {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => {
2021-09-19 08:11:02 +00:00
self.inner = Some((ws_read, ws_write));
Poll::Pending
},
}
2021-09-18 14:32:00 +00:00
},
}
}
}
2021-10-05 09:54:11 +00:00
fn post_message(tx: MsgSender, message: Result<Message, Error>) -> Result<(), WsError> {
2021-09-18 14:32:00 +00:00
match message {
2021-10-05 09:54:11 +00:00
Ok(Message::Binary(bytes)) => tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error),
Ok(_) => Ok(()),
Err(e) => Err(WsError::internal().context(e)),
2021-09-18 14:32:00 +00:00
}
}
2021-09-30 09:24:02 +00:00
#[allow(dead_code)]
2021-09-19 08:11:02 +00:00
pub struct Retry<F> {
f: F,
2021-09-22 06:42:14 +00:00
#[allow(dead_code)]
2021-09-19 08:11:02 +00:00
retry_time: usize,
addr: String,
}
impl<F> Retry<F>
where
F: Fn(&str),
{
2021-09-30 09:24:02 +00:00
#[allow(dead_code)]
2021-09-19 08:11:02 +00:00
pub fn new(addr: &str, f: F) -> Self {
Self {
f,
retry_time: 3,
addr: addr.to_owned(),
}
}
}
impl<F> Future for Retry<F>
where
F: Fn(&str),
{
type Output = ();
2021-09-19 15:21:10 +00:00
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
2021-09-19 08:11:02 +00:00
(self.f)(&self.addr);
Poll::Ready(())
}
}