fix listen on websocket error

This commit is contained in:
appflowy 2021-12-20 12:06:17 +08:00
parent 936e133624
commit 16c702b1e1
2 changed files with 33 additions and 25 deletions

View File

@ -24,7 +24,6 @@ impl WsManager {
local_web_socket() local_web_socket()
}; };
let (status_notifier, _) = broadcast::channel(10); let (status_notifier, _) = broadcast::channel(10);
listen_on_websocket(ws.clone());
WsManager { WsManager {
inner: ws, inner: ws,
connect_type: RwLock::new(NetworkType::default()), connect_type: RwLock::new(NetworkType::default()),
@ -36,7 +35,6 @@ impl WsManager {
pub async fn start(&self, token: String) -> Result<(), FlowyError> { pub async fn start(&self, token: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, token); let addr = format!("{}/{}", self.addr, token);
self.inner.stop_connect().await?; self.inner.stop_connect().await?;
let _ = self.inner.start_connect(addr).await?; let _ = self.inner.start_connect(addr).await?;
Ok(()) Ok(())
} }
@ -79,28 +77,33 @@ impl WsManager {
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { self.inner.ws_sender() } pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { self.inner.ws_sender() }
} }
#[tracing::instrument(level = "debug", skip(ws))] #[tracing::instrument(level = "debug", skip(manager))]
fn listen_on_websocket(ws: Arc<dyn FlowyWebSocket>) { pub fn listen_on_websocket(manager: Arc<WsManager>) {
let mut notify = ws.subscribe_connect_state(); if cfg!(feature = "http_server") {
let _ = tokio::spawn(async move { let ws = manager.inner.clone();
loop { let mut notify = manager.inner.subscribe_connect_state();
match notify.recv().await { let _ = tokio::spawn(async move {
Ok(state) => { loop {
tracing::info!("Websocket state changed: {}", state); match notify.recv().await {
match state { Ok(state) => {
WSConnectState::Init => {}, tracing::info!("Websocket state changed: {}", state);
WSConnectState::Connected => {}, match state {
WSConnectState::Connecting => {}, WSConnectState::Init => {},
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, WSConnectState::Connected => {},
} WSConnectState::Connecting => {},
}, WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
Err(e) => { }
tracing::error!("Websocket state notify error: {:?}", e); },
break; Err(e) => {
}, tracing::error!("Websocket state notify error: {:?}", e);
break;
},
}
} }
} });
}); } else {
// do nothing
};
} }
async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) { async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {

View File

@ -5,7 +5,10 @@ use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use backend_service::configuration::ClientServerConfiguration; use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext}; use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext};
use flowy_document::module::FlowyDocument; use flowy_document::module::FlowyDocument;
use flowy_net::{entities::NetworkType, services::ws::WsManager}; use flowy_net::{
entities::NetworkType,
services::ws::{listen_on_websocket, WsManager},
};
use flowy_user::{ use flowy_user::{
prelude::UserStatus, prelude::UserStatus,
services::user::{UserSession, UserSessionConfig}, services::user::{UserSession, UserSessionConfig},
@ -113,8 +116,10 @@ fn _init(
dispatch.spawn(async move { dispatch.spawn(async move {
user_session.init(); user_session.init();
_listen_user_status(ws_manager, subscribe_user_status, core.clone()).await; listen_on_websocket(ws_manager.clone());
_listen_user_status(ws_manager.clone(), subscribe_user_status, core.clone()).await;
}); });
dispatch.spawn(async move { dispatch.spawn(async move {
_listen_network_status(subscribe_network_type, cloned_core).await; _listen_network_status(subscribe_network_type, cloned_core).await;
}); });