retry when ws connection lost

This commit is contained in:
appflowy 2021-10-05 17:54:11 +08:00
parent 32d9331b35
commit 155a526d04
13 changed files with 116 additions and 45 deletions

View File

@ -2,7 +2,7 @@ use std::{net::TcpListener, time::Duration};
use actix::Actor;
use actix_identity::{CookieIdentityPolicy, IdentityService};
use actix_web::{dev::Server, middleware, web, web::Data, App, HttpServer, Scope};
use actix_web::{dev::Server, web, web::Data, App, HttpServer, Scope};
use sqlx::{postgres::PgPoolOptions, PgPool};
use tokio::time::interval;
@ -50,7 +50,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result<Server, std::io
let server = HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
// .wrap(middleware::Logger::default())
.wrap(identify_service(&domain, &secret))
.wrap(crate::middleware::default_cors())
.wrap(crate::middleware::AuthenticationService)

View File

@ -82,7 +82,7 @@ impl ServerEditDoc {
#[tracing::instrument(
level = "debug",
skip(self, user, pg_pool),
skip(self, user, pg_pool, revision),
fields(
rev_id = %self.rev_id.load(SeqCst),
revision_rev_id = %revision.rev_id,
@ -186,7 +186,7 @@ impl ServerEditDoc {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, pg_pool), err)]
#[tracing::instrument(level = "debug", skip(self, revision, pg_pool), err)]
async fn save_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
// Opti: save with multiple revisions
let mut params = UpdateDocParams::new();

View File

@ -33,7 +33,6 @@ impl DocHandle {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, user, socket, revision))]
pub async fn apply_revision(
&self,
user: Arc<WsUser>,

View File

@ -15,7 +15,7 @@ use crate::{
server::Server,
ws::WsDocumentManager,
},
sql_tables::doc::{DocTable, DocTableSql},
sql_tables::doc::DocTableSql,
};
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_infra::future::{wrap_future, FnFuture, ResultFuture};
@ -51,7 +51,7 @@ impl DocController {
#[tracing::instrument(skip(self, conn), err)]
pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let doc = Doc {
let _doc = Doc {
id: params.id,
data: params.data,
rev_id: 0,

View File

@ -117,7 +117,7 @@ impl DocumentActor {
let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data));
},
DocumentMsg::SaveDocument { rev_id, ret } => {
DocumentMsg::SaveDocument { rev_id: _, ret } => {
// let result = self.save_to_disk(rev_id).await;
let _ = ret.send(Ok(()));
},

View File

@ -289,11 +289,11 @@ impl WsDocumentHandler for EditDocWsHandler {
fn state_changed(&self, state: &WsState) {
match state {
WsState::Init => {},
WsState::Connected(_) => {
log::debug!("ws state changed: {}", state);
self.0.notify_open_doc()
WsState::Connected(_) => self.0.notify_open_doc(),
WsState::Disconnected(e) => {
log::error!("websocket error: {:?}", e);
//
},
WsState::Disconnected(_) => {},
}
}
}

View File

@ -65,10 +65,7 @@ fn listen_ws_state_changed(ws: Arc<dyn DocumentWebSocket>, handlers: Arc<DashMap
handle.value().state_changed(&state);
});
},
Err(e) => {
log::error!("Websocket state notify error: {:?}", e);
break;
},
Err(_) => break,
}
}
});

View File

@ -2,7 +2,7 @@ mod deps_resolve;
// mod flowy_server;
pub mod module;
use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use crate::deps_resolve::WorkspaceDepsResolver;
use flowy_dispatch::prelude::*;
use flowy_document::prelude::FlowyDocument;
use flowy_net::config::ServerConfig;

View File

@ -1,9 +1,9 @@
use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use crate::deps_resolve::DocumentDepsResolver;
use flowy_dispatch::prelude::Module;
use flowy_document::module::FlowyDocument;
use flowy_net::config::ServerConfig;
use flowy_user::services::user::UserSession;
use flowy_workspace::{module::mk_workspace, prelude::WorkspaceController};
use flowy_workspace::prelude::WorkspaceController;
use std::sync::Arc;
pub fn mk_modules(workspace_controller: Arc<WorkspaceController>, user_session: Arc<UserSession>) -> Vec<Module> {

View File

@ -19,7 +19,7 @@ use flowy_database::{
use flowy_infra::kv::KV;
use flowy_net::config::ServerConfig;
use flowy_sqlite::ConnectionPool;
use flowy_ws::{WsController, WsMessageHandler};
use flowy_ws::{WsController, WsMessageHandler, WsState};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
@ -278,12 +278,41 @@ impl UserSession {
}
}
#[tracing::instrument(level = "debug", skip(self, token))]
pub async fn start_ws_connection(&self, token: &str) -> Result<(), UserError> {
log::debug!("start_ws_connection");
let addr = format!("{}/{}", self.server.ws_addr(), token);
let _ = self.ws_controller.connect(addr).await?;
self.listen_on_websocket();
let _ = self.ws_controller.start_connect(addr).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
fn listen_on_websocket(&self) {
let mut notify = self.ws_controller.state_subscribe();
let ws_controller = self.ws_controller.clone();
let _ = tokio::spawn(async move {
log::debug!("listen ws state");
loop {
match notify.recv().await {
Ok(state) => {
log::info!("Websocket state changed: {}", state);
match state {
WsState::Init => {},
WsState::Connected(_) => {},
WsState::Disconnected(_) => {
ws_controller.retry().await;
},
}
},
Err(e) => {
log::error!("Websocket state notify error: {:?}", e);
break;
},
}
}
});
}
}
pub async fn update_user(

View File

@ -1,4 +1,8 @@
use crate::{errors::WsError, MsgReceiver, MsgSender};
use crate::{
errors::{internal_error, WsError},
MsgReceiver,
MsgSender,
};
use futures_core::{future::BoxFuture, ready};
use futures_util::{FutureExt, StreamExt};
use pin_project::pin_project;
@ -88,14 +92,32 @@ impl WsStream {
msg_tx: msg_tx.clone(),
inner: Some((
Box::pin(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let _ = ws_read
.for_each(|message| async { post_message(msg_tx.clone(), message) })
.for_each(|message| async {
match tx.send(post_message(msg_tx.clone(), message)).await {
Ok(_) => {},
Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e),
}
})
.await;
Ok(())
loop {
match rx.recv().await {
None => {
return Err(WsError::internal().context("WsStream rx closed unexpectedly"));
},
Some(result) => {
if result.is_err() {
return result;
}
},
}
}
}),
Box::pin(async move {
let _ = ws_rx.map(Ok).forward(ws_write).await?;
Ok(())
let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error);
result
}),
)),
}
@ -127,16 +149,11 @@ impl Future for WsStream {
}
}
fn post_message(tx: MsgSender, message: Result<Message, Error>) {
fn post_message(tx: MsgSender, message: Result<Message, Error>) -> Result<(), WsError> {
match message {
Ok(Message::Binary(bytes)) => match tx.unbounded_send(Message::Binary(bytes)) {
Ok(_) => {},
Err(e) => log::error!("tx send error: {:?}", e),
},
Ok(_) => {},
Err(e) => {
log::error!("ws read error: {:?}", e)
},
Ok(Message::Binary(bytes)) => tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error),
Ok(_) => Ok(()),
Err(e) => Err(WsError::internal().context(e)),
}
}
#[allow(dead_code)]

View File

@ -45,6 +45,13 @@ impl WsError {
static_user_error!(unauthorized, ErrorCode::Unauthorized);
}
pub fn internal_error<T>(e: T) -> WsError
where
T: std::fmt::Debug,
{
WsError::internal().context(e)
}
#[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)]
pub enum ErrorCode {
InternalError = 0,

View File

@ -6,9 +6,8 @@ use crate::{
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_infra::retry::{Action, ExponentialBackoff, Retry};
use flowy_infra::retry::{Action, ExponentialBackoff, FixedInterval, Retry};
use flowy_net::errors::ServerError;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_core::{ready, Stream};
use parking_lot::RwLock;
@ -20,6 +19,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{broadcast, oneshot};
use tokio_tungstenite::tungstenite::{
@ -61,6 +61,7 @@ pub struct WsController {
handlers: Handlers,
state_notify: Arc<broadcast::Sender<WsState>>,
sender: Arc<RwLock<Option<Arc<WsSender>>>>,
addr: Arc<RwLock<Option<String>>>,
}
impl WsController {
@ -70,6 +71,7 @@ impl WsController {
handlers: DashMap::new(),
sender: Arc::new(RwLock::new(None)),
state_notify: Arc::new(state_notify),
addr: Arc::new(RwLock::new(None)),
};
controller
}
@ -83,14 +85,26 @@ impl WsController {
Ok(())
}
pub async fn connect(&self, addr: String) -> Result<(), ServerError> {
pub async fn start_connect(&self, addr: String) -> Result<(), ServerError> {
*self.addr.write() = Some(addr.clone());
let strategy = ExponentialBackoff::from_millis(100).take(5);
self.connect(addr, strategy).await
}
async fn connect<T, I>(&self, addr: String, strategy: T) -> Result<(), ServerError>
where
T: IntoIterator<IntoIter = I, Item = Duration>,
I: Iterator<Item = Duration> + Send + 'static,
{
let (ret, rx) = oneshot::channel::<Result<(), ServerError>>();
*self.addr.write() = Some(addr.clone());
let action = WsConnectAction {
addr,
handlers: self.handlers.clone(),
};
let strategy = ExponentialBackoff::from_millis(100).take(3);
let retry = Retry::spawn(strategy, action);
let sender_holder = self.sender.clone();
let state_notify = self.state_notify.clone();
@ -121,7 +135,17 @@ impl WsController {
rx.await?
}
#[allow(dead_code)]
pub async fn retry(&self) -> Result<(), ServerError> {
let addr = self
.addr
.read()
.as_ref()
.expect("must call start_connect first")
.clone();
let strategy = FixedInterval::from_millis(5000);
self.connect(addr, strategy).await
}
pub fn state_subscribe(&self) -> broadcast::Receiver<WsState> { self.state_notify.subscribe() }
pub fn sender(&self) -> Result<Arc<WsSender>, WsError> {
@ -142,9 +166,8 @@ async fn spawn_stream_and_handlers(
match result {
Ok(_) => {},
Err(e) => {
// TODO: retry?
log::error!("ws stream error {:?}", e);
let _ = state_notify.send(WsState::Disconnected(e));
log::error!("websocket error: {:?}", e);
let _ = state_notify.send(WsState::Disconnected(e)).unwrap();
}
}
},
@ -275,7 +298,6 @@ impl WsConnectActionFut {
// └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │
// │ └─────────┘ │ └────────┘ │ └────────┘ │
// └───────────────┘ └──────────────┘
log::debug!("🐴 ws start connect: {}", &addr);
let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
let sender = WsSender { ws_tx };