retry ws connection using WsConnectAction

This commit is contained in:
appflowy 2021-10-03 15:59:07 +08:00
parent d7300dd7e2
commit d70072ae9f
4 changed files with 129 additions and 76 deletions

View File

@ -164,7 +164,7 @@ where
} }
/// An action can be run multiple times and produces a future. /// An action can be run multiple times and produces a future.
pub trait Action { pub trait Action: Send + Sync {
type Future: Future<Output = Result<Self::Item, Self::Error>>; type Future: Future<Output = Result<Self::Item, Self::Error>>;
type Item; type Item;
type Error; type Error;
@ -172,13 +172,13 @@ pub trait Action {
fn run(&mut self) -> Self::Future; fn run(&mut self) -> Self::Future;
} }
impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T> Action for F { // impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T> Action for F {
type Future = T; // type Future = T;
type Item = R; // type Item = R;
type Error = E; // type Error = E;
//
fn run(&mut self) -> Self::Future { self() } // fn run(&mut self) -> Self::Future { self() }
} // }
pub trait Condition<E> { pub trait Condition<E> {
fn should_retry(&mut self, error: &E) -> bool; fn should_retry(&mut self, error: &E) -> bool;

View File

@ -16,7 +16,7 @@ use flowy_database::{
ExpressionMethods, ExpressionMethods,
UserDatabaseConnection, UserDatabaseConnection,
}; };
use flowy_infra::kv::KV; use flowy_infra::{future::wrap_future, kv::KV};
use flowy_net::config::ServerConfig; use flowy_net::config::ServerConfig;
use flowy_sqlite::ConnectionPool; use flowy_sqlite::ConnectionPool;
use flowy_ws::{WsController, WsMessage, WsMessageHandler}; use flowy_ws::{WsController, WsMessage, WsMessageHandler};

View File

@ -21,7 +21,9 @@ pub struct WsConnectionFuture {
msg_tx: Option<MsgSender>, msg_tx: Option<MsgSender>,
ws_rx: Option<MsgReceiver>, ws_rx: Option<MsgReceiver>,
#[pin] #[pin]
fut: BoxFuture<'static, Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>>, fut: Pin<
Box<dyn Future<Output = Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>> + Send + Sync>,
>,
} }
impl WsConnectionFuture { impl WsConnectionFuture {

View File

@ -6,7 +6,12 @@ use crate::{
}; };
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use flowy_infra::{
future::{wrap_future, FnFuture},
retry::{Action, ExponentialBackoff, Retry},
};
use flowy_net::errors::ServerError; use flowy_net::errors::ServerError;
use futures::future::BoxFuture;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use parking_lot::RwLock; use parking_lot::RwLock;
@ -43,7 +48,7 @@ pub enum WsState {
pub struct WsController { pub struct WsController {
handlers: Handlers, handlers: Handlers,
state_notify: Arc<broadcast::Sender<WsState>>, state_notify: Arc<broadcast::Sender<WsState>>,
sender: RwLock<Option<Arc<WsSender>>>, sender: Arc<RwLock<Option<Arc<WsSender>>>>,
} }
impl WsController { impl WsController {
@ -51,7 +56,7 @@ impl WsController {
let (state_notify, _) = broadcast::channel(16); let (state_notify, _) = broadcast::channel(16);
let controller = Self { let controller = Self {
handlers: DashMap::new(), handlers: DashMap::new(),
sender: RwLock::new(None), sender: Arc::new(RwLock::new(None)),
state_notify: Arc::new(state_notify), state_notify: Arc::new(state_notify),
}; };
controller controller
@ -68,7 +73,39 @@ impl WsController {
pub async fn connect(&self, addr: String) -> Result<(), ServerError> { pub async fn connect(&self, addr: String) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel::<Result<(), ServerError>>(); let (ret, rx) = oneshot::channel::<Result<(), ServerError>>();
self._connect(addr.clone(), ret);
let action = WsConnectAction {
addr,
handlers: self.handlers.clone(),
};
let strategy = ExponentialBackoff::from_millis(100).take(3);
let retry = Retry::spawn(strategy, action);
let sender_holder = self.sender.clone();
let state_notify = self.state_notify.clone();
tokio::spawn(async move {
match retry.await {
Ok(result) => {
let WsConnectResult {
stream,
handlers_fut,
sender,
} = result;
let sender = Arc::new(sender);
*sender_holder.write() = Some(sender.clone());
let _ = state_notify.send(WsState::Connected(sender));
let _ = ret.send(Ok(()));
spawn_stream_and_handlers(stream, handlers_fut, state_notify).await;
},
Err(e) => {
//
let _ = state_notify.send(WsState::Disconnected(e.clone()));
let _ = ret.send(Err(ServerError::internal().context(e)));
},
}
});
rx.await? rx.await?
} }
@ -81,51 +118,6 @@ impl WsController {
Some(sender) => Ok(sender.clone()), Some(sender) => Ok(sender.clone()),
} }
} }
fn _connect(&self, addr: String, ret: oneshot::Sender<Result<(), ServerError>>) {
log::debug!("🐴 ws connect: {}", &addr);
let (connection, handlers) = self.make_connect(addr.clone());
let state_notify = self.state_notify.clone();
let sender = self
.sender
.read()
.clone()
.expect("Sender should be not empty after calling make_connect");
tokio::spawn(async move {
match connection.await {
Ok(stream) => {
let _ = state_notify.send(WsState::Connected(sender));
let _ = ret.send(Ok(()));
spawn_stream_and_handlers(stream, handlers, state_notify).await;
},
Err(e) => {
let _ = state_notify.send(WsState::Disconnected(e.clone()));
let _ = ret.send(Err(ServerError::internal().context(e)));
},
}
});
}
fn make_connect(&self, addr: String) -> (WsConnectionFuture, WsHandlerFuture) {
// Stream User
// ┌───────────────┐ ┌──────────────┐
// ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
// │Server│──────┼─▶│ ws_read │──┼───▶│ msg_tx │───┼─▶│ msg_rx │ │
// └──────┘ │ └─────────┘ │ └────────┘ │ └────────┘ │
// ▲ │ │ │ │
// │ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
// └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │
// │ └─────────┘ │ └────────┘ │ └────────┘ │
// └───────────────┘ └──────────────┘
let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
let handlers = self.handlers.clone();
*self.sender.write() = Some(Arc::new(WsSender { ws_tx }));
(
WsConnectionFuture::new(msg_tx, ws_rx, addr),
WsHandlerFuture::new(handlers, msg_rx),
)
}
} }
async fn spawn_stream_and_handlers( async fn spawn_stream_and_handlers(
@ -239,21 +231,80 @@ impl WsSender {
} }
} }
// #[cfg(test)] struct WsConnectAction {
// mod tests { addr: String,
// use super::WsController; handlers: Handlers,
// }
// #[tokio::test]
// async fn connect() { struct WsConnectResult {
// std::env::set_var("RUST_LOG", "Debug"); stream: WsStream,
// env_logger::init(); handlers_fut: WsHandlerFuture,
// sender: WsSender,
// let mut controller = WsController::new(); }
// let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str());
// let (a, b) = controller.make_connect(addr); #[pin_project]
// tokio::select! { struct WsConnectActionFut {
// r = a => println!("write completed {:?}", r), addr: String,
// _ = b => println!("read completed"), #[pin]
// }; conn: WsConnectionFuture,
// } handlers_fut: Option<WsHandlerFuture>,
// } sender: Option<WsSender>,
}
impl WsConnectActionFut {
fn new(addr: String, handlers: Handlers) -> Self {
// Stream User
// ┌───────────────┐ ┌──────────────┐
// ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
// │Server│──────┼─▶│ ws_read │──┼───▶│ msg_tx │───┼─▶│ msg_rx │ │
// └──────┘ │ └─────────┘ │ └────────┘ │ └────────┘ │
// ▲ │ │ │ │
// │ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
// └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │
// │ └─────────┘ │ └────────┘ │ └────────┘ │
// └───────────────┘ └──────────────┘
log::debug!("🐴 ws start connect: {}", &addr);
let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
let sender = WsSender { ws_tx };
let handlers_fut = WsHandlerFuture::new(handlers, msg_rx);
let conn = WsConnectionFuture::new(msg_tx, ws_rx, addr.clone());
Self {
addr,
conn,
handlers_fut: Some(handlers_fut),
sender: Some(sender),
}
}
}
impl Future for WsConnectActionFut {
type Output = Result<WsConnectResult, WsError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match ready!(this.conn.as_mut().poll(cx)) {
Ok(stream) => {
let handlers_fut = this.handlers_fut.take().expect("Only take once");
let sender = this.sender.take().expect("Only take once");
Poll::Ready(Ok(WsConnectResult {
stream,
handlers_fut,
sender,
}))
},
Err(e) => Poll::Ready(Err(WsError::internal().context(e))),
}
}
}
impl Action for WsConnectAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync>>;
type Item = WsConnectResult;
type Error = WsError;
fn run(&mut self) -> Self::Future {
let addr = self.addr.clone();
let handlers = self.handlers.clone();
Box::pin(WsConnectActionFut::new(addr, handlers))
}
}