From 155a526d044c386335600f12df7c6dc65be07a85 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 5 Oct 2021 17:54:11 +0800 Subject: [PATCH] retry when ws connection lost --- backend/src/application.rs | 4 +- backend/src/service/doc/edit/edit_doc.rs | 4 +- backend/src/service/doc/edit/open_handle.rs | 1 - .../src/services/doc/doc_controller.rs | 4 +- .../src/services/doc/edit/doc_actor.rs | 2 +- .../src/services/doc/edit/edit_doc.rs | 8 ++-- .../src/services/ws/ws_manager.rs | 5 +-- rust-lib/flowy-sdk/src/lib.rs | 2 +- rust-lib/flowy-sdk/src/module.rs | 4 +- .../src/services/user/user_session.rs | 35 +++++++++++++-- rust-lib/flowy-ws/src/connect.rs | 45 +++++++++++++------ rust-lib/flowy-ws/src/errors.rs | 7 +++ rust-lib/flowy-ws/src/ws.rs | 40 +++++++++++++---- 13 files changed, 116 insertions(+), 45 deletions(-) diff --git a/backend/src/application.rs b/backend/src/application.rs index 0c984681d4..713910d068 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -2,7 +2,7 @@ use std::{net::TcpListener, time::Duration}; use actix::Actor; use actix_identity::{CookieIdentityPolicy, IdentityService}; -use actix_web::{dev::Server, middleware, web, web::Data, App, HttpServer, Scope}; +use actix_web::{dev::Server, web, web::Data, App, HttpServer, Scope}; use sqlx::{postgres::PgPoolOptions, PgPool}; use tokio::time::interval; @@ -50,7 +50,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result) -> Result<(), ServerError> { // Opti: save with multiple revisions let mut params = UpdateDocParams::new(); diff --git a/backend/src/service/doc/edit/open_handle.rs b/backend/src/service/doc/edit/open_handle.rs index 34856cc4bc..4439dacade 100644 --- a/backend/src/service/doc/edit/open_handle.rs +++ b/backend/src/service/doc/edit/open_handle.rs @@ -33,7 +33,6 @@ impl DocHandle { Ok(()) } - #[tracing::instrument(level = "debug", skip(self, user, socket, revision))] pub async fn apply_revision( &self, user: Arc, diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index 33792b2768..a550bee8ff 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -15,7 +15,7 @@ use crate::{ server::Server, ws::WsDocumentManager, }, - sql_tables::doc::{DocTable, DocTableSql}, + sql_tables::doc::DocTableSql, }; use flowy_database::{ConnectionPool, SqliteConnection}; use flowy_infra::future::{wrap_future, FnFuture, ResultFuture}; @@ -51,7 +51,7 @@ impl DocController { #[tracing::instrument(skip(self, conn), err)] pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> { - let doc = Doc { + let _doc = Doc { id: params.id, data: params.data, rev_id: 0, diff --git a/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs index a2481d5ddd..e909c3cb3c 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs @@ -117,7 +117,7 @@ impl DocumentActor { let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, - DocumentMsg::SaveDocument { rev_id, ret } => { + DocumentMsg::SaveDocument { rev_id: _, ret } => { // let result = self.save_to_disk(rev_id).await; let _ = ret.send(Ok(())); }, diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 0aaa8019d7..98029f81e6 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -289,11 +289,11 @@ impl WsDocumentHandler for EditDocWsHandler { fn state_changed(&self, state: &WsState) { match state { WsState::Init => {}, - WsState::Connected(_) => { - log::debug!("ws state changed: {}", state); - self.0.notify_open_doc() + WsState::Connected(_) => self.0.notify_open_doc(), + WsState::Disconnected(e) => { + log::error!("websocket error: {:?}", e); + // }, - WsState::Disconnected(_) => {}, } } } diff --git a/rust-lib/flowy-document/src/services/ws/ws_manager.rs b/rust-lib/flowy-document/src/services/ws/ws_manager.rs index bfc39534ab..197e5e87af 100644 --- a/rust-lib/flowy-document/src/services/ws/ws_manager.rs +++ b/rust-lib/flowy-document/src/services/ws/ws_manager.rs @@ -65,10 +65,7 @@ fn listen_ws_state_changed(ws: Arc, handlers: Arc { - log::error!("Websocket state notify error: {:?}", e); - break; - }, + Err(_) => break, } } }); diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index ce3da34f01..bf6cd62913 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -2,7 +2,7 @@ mod deps_resolve; // mod flowy_server; pub mod module; -use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; +use crate::deps_resolve::WorkspaceDepsResolver; use flowy_dispatch::prelude::*; use flowy_document::prelude::FlowyDocument; use flowy_net::config::ServerConfig; diff --git a/rust-lib/flowy-sdk/src/module.rs b/rust-lib/flowy-sdk/src/module.rs index 3ebbaa762d..1f18696cbd 100644 --- a/rust-lib/flowy-sdk/src/module.rs +++ b/rust-lib/flowy-sdk/src/module.rs @@ -1,9 +1,9 @@ -use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; +use crate::deps_resolve::DocumentDepsResolver; use flowy_dispatch::prelude::Module; use flowy_document::module::FlowyDocument; use flowy_net::config::ServerConfig; use flowy_user::services::user::UserSession; -use flowy_workspace::{module::mk_workspace, prelude::WorkspaceController}; +use flowy_workspace::prelude::WorkspaceController; use std::sync::Arc; pub fn mk_modules(workspace_controller: Arc, user_session: Arc) -> Vec { diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index 623fd71a7a..9623442d84 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -19,7 +19,7 @@ use flowy_database::{ use flowy_infra::kv::KV; use flowy_net::config::ServerConfig; use flowy_sqlite::ConnectionPool; -use flowy_ws::{WsController, WsMessageHandler}; +use flowy_ws::{WsController, WsMessageHandler, WsState}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -278,12 +278,41 @@ impl UserSession { } } + #[tracing::instrument(level = "debug", skip(self, token))] pub async fn start_ws_connection(&self, token: &str) -> Result<(), UserError> { - log::debug!("start_ws_connection"); let addr = format!("{}/{}", self.server.ws_addr(), token); - let _ = self.ws_controller.connect(addr).await?; + self.listen_on_websocket(); + + let _ = self.ws_controller.start_connect(addr).await?; Ok(()) } + + #[tracing::instrument(level = "debug", skip(self))] + fn listen_on_websocket(&self) { + let mut notify = self.ws_controller.state_subscribe(); + let ws_controller = self.ws_controller.clone(); + let _ = tokio::spawn(async move { + log::debug!("listen ws state"); + loop { + match notify.recv().await { + Ok(state) => { + log::info!("Websocket state changed: {}", state); + match state { + WsState::Init => {}, + WsState::Connected(_) => {}, + WsState::Disconnected(_) => { + ws_controller.retry().await; + }, + } + }, + Err(e) => { + log::error!("Websocket state notify error: {:?}", e); + break; + }, + } + } + }); + } } pub async fn update_user( diff --git a/rust-lib/flowy-ws/src/connect.rs b/rust-lib/flowy-ws/src/connect.rs index 04471eece7..2134189b20 100644 --- a/rust-lib/flowy-ws/src/connect.rs +++ b/rust-lib/flowy-ws/src/connect.rs @@ -1,4 +1,8 @@ -use crate::{errors::WsError, MsgReceiver, MsgSender}; +use crate::{ + errors::{internal_error, WsError}, + MsgReceiver, + MsgSender, +}; use futures_core::{future::BoxFuture, ready}; use futures_util::{FutureExt, StreamExt}; use pin_project::pin_project; @@ -88,14 +92,32 @@ impl WsStream { msg_tx: msg_tx.clone(), inner: Some(( Box::pin(async move { + let (tx, mut rx) = tokio::sync::mpsc::channel(10); let _ = ws_read - .for_each(|message| async { post_message(msg_tx.clone(), message) }) + .for_each(|message| async { + match tx.send(post_message(msg_tx.clone(), message)).await { + Ok(_) => {}, + Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e), + } + }) .await; - Ok(()) + + loop { + match rx.recv().await { + None => { + return Err(WsError::internal().context("WsStream rx closed unexpectedly")); + }, + Some(result) => { + if result.is_err() { + return result; + } + }, + } + } }), Box::pin(async move { - let _ = ws_rx.map(Ok).forward(ws_write).await?; - Ok(()) + let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error); + result }), )), } @@ -127,16 +149,11 @@ impl Future for WsStream { } } -fn post_message(tx: MsgSender, message: Result) { +fn post_message(tx: MsgSender, message: Result) -> Result<(), WsError> { match message { - Ok(Message::Binary(bytes)) => match tx.unbounded_send(Message::Binary(bytes)) { - Ok(_) => {}, - Err(e) => log::error!("tx send error: {:?}", e), - }, - Ok(_) => {}, - Err(e) => { - log::error!("ws read error: {:?}", e) - }, + Ok(Message::Binary(bytes)) => tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error), + Ok(_) => Ok(()), + Err(e) => Err(WsError::internal().context(e)), } } #[allow(dead_code)] diff --git a/rust-lib/flowy-ws/src/errors.rs b/rust-lib/flowy-ws/src/errors.rs index 2ff687691c..fb5f0a2820 100644 --- a/rust-lib/flowy-ws/src/errors.rs +++ b/rust-lib/flowy-ws/src/errors.rs @@ -45,6 +45,13 @@ impl WsError { static_user_error!(unauthorized, ErrorCode::Unauthorized); } +pub fn internal_error(e: T) -> WsError +where + T: std::fmt::Debug, +{ + WsError::internal().context(e) +} + #[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)] pub enum ErrorCode { InternalError = 0, diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 89e3f5c5f0..b12073f768 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -6,9 +6,8 @@ use crate::{ }; use bytes::Bytes; use dashmap::DashMap; -use flowy_infra::retry::{Action, ExponentialBackoff, Retry}; +use flowy_infra::retry::{Action, ExponentialBackoff, FixedInterval, Retry}; use flowy_net::errors::ServerError; - use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_core::{ready, Stream}; use parking_lot::RwLock; @@ -20,6 +19,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, }; use tokio::sync::{broadcast, oneshot}; use tokio_tungstenite::tungstenite::{ @@ -61,6 +61,7 @@ pub struct WsController { handlers: Handlers, state_notify: Arc>, sender: Arc>>>, + addr: Arc>>, } impl WsController { @@ -70,6 +71,7 @@ impl WsController { handlers: DashMap::new(), sender: Arc::new(RwLock::new(None)), state_notify: Arc::new(state_notify), + addr: Arc::new(RwLock::new(None)), }; controller } @@ -83,14 +85,26 @@ impl WsController { Ok(()) } - pub async fn connect(&self, addr: String) -> Result<(), ServerError> { + pub async fn start_connect(&self, addr: String) -> Result<(), ServerError> { + *self.addr.write() = Some(addr.clone()); + + let strategy = ExponentialBackoff::from_millis(100).take(5); + self.connect(addr, strategy).await + } + + async fn connect(&self, addr: String, strategy: T) -> Result<(), ServerError> + where + T: IntoIterator, + I: Iterator + Send + 'static, + { let (ret, rx) = oneshot::channel::>(); + *self.addr.write() = Some(addr.clone()); let action = WsConnectAction { addr, handlers: self.handlers.clone(), }; - let strategy = ExponentialBackoff::from_millis(100).take(3); + let retry = Retry::spawn(strategy, action); let sender_holder = self.sender.clone(); let state_notify = self.state_notify.clone(); @@ -121,7 +135,17 @@ impl WsController { rx.await? } - #[allow(dead_code)] + pub async fn retry(&self) -> Result<(), ServerError> { + let addr = self + .addr + .read() + .as_ref() + .expect("must call start_connect first") + .clone(); + let strategy = FixedInterval::from_millis(5000); + self.connect(addr, strategy).await + } + pub fn state_subscribe(&self) -> broadcast::Receiver { self.state_notify.subscribe() } pub fn sender(&self) -> Result, WsError> { @@ -142,9 +166,8 @@ async fn spawn_stream_and_handlers( match result { Ok(_) => {}, Err(e) => { - // TODO: retry? - log::error!("ws stream error {:?}", e); - let _ = state_notify.send(WsState::Disconnected(e)); + log::error!("websocket error: {:?}", e); + let _ = state_notify.send(WsState::Disconnected(e)).unwrap(); } } }, @@ -275,7 +298,6 @@ impl WsConnectActionFut { // └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │ // │ └─────────┘ │ └────────┘ │ └────────┘ │ // └───────────────┘ └──────────────┘ - log::debug!("🐴 ws start connect: {}", &addr); let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded(); let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded(); let sender = WsSender { ws_tx };