From 84d5d2c2f19e929b91606bdde9f2feadd0bead39 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 14 Dec 2021 20:50:07 +0800 Subject: [PATCH] start ws connect after sign up --- .../flowy-net/src/services/mock/ws_mock.rs | 4 +- .../flowy-net/src/services/ws/conn.rs | 3 +- .../flowy-net/src/services/ws/manager.rs | 68 +++++++++++-------- .../flowy-net/src/services/ws/ws_local.rs | 4 +- frontend/rust-lib/flowy-sdk/src/lib.rs | 3 + shared-lib/lib-ws/src/ws.rs | 9 +-- 6 files changed, 56 insertions(+), 35 deletions(-) diff --git a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs index b79b2c8f0b..db1981654a 100644 --- a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs +++ b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs @@ -61,7 +61,9 @@ impl FlowyWebSocket for Arc { FutureResult::new(async { Ok(()) }) } - fn conn_state_subscribe(&self) -> Receiver { self.state_sender.subscribe() } + fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs index fc16da07fa..b8faaa273a 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs @@ -7,7 +7,8 @@ pub use lib_ws::{WsConnectState, WsMessage, WsMessageHandler}; pub trait FlowyWebSocket: Send + Sync { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>; - fn conn_state_subscribe(&self) -> broadcast::Receiver; + fn stop_connect(&self) -> FutureResult<(), FlowyError>; + fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; fn add_handler(&self, handler: Arc) -> Result<(), FlowyError>; fn ws_sender(&self) -> Result, FlowyError>; diff --git a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs index 928f3464e9..d8b1d00e01 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs @@ -23,8 +23,8 @@ impl WsManager { } else { local_web_socket() }; - let (status_notifier, _) = broadcast::channel(10); + listen_on_websocket(ws.clone()); WsManager { inner: ws, connect_type: RwLock::new(NetworkType::default()), @@ -35,11 +35,14 @@ impl WsManager { pub async fn start(&self, token: String) -> Result<(), FlowyError> { let addr = format!("{}/{}", self.addr, token); - self.listen_on_websocket(); + self.inner.stop_connect().await; + let _ = self.inner.start_connect(addr).await?; Ok(()) } + pub async fn stop(&self) { self.inner.stop_connect().await; } + pub fn update_network_type(&self, new_type: &NetworkType) { tracing::debug!("Network new state: {:?}", new_type); let old_type = self.connect_type.read().clone(); @@ -62,33 +65,10 @@ impl WsManager { } } - #[tracing::instrument(level = "debug", skip(self))] - fn listen_on_websocket(&self) { - let mut notify = self.inner.conn_state_subscribe(); - let ws = self.inner.clone(); - let _ = tokio::spawn(async move { - loop { - match notify.recv().await { - Ok(state) => { - tracing::info!("Websocket state changed: {}", state); - match state { - WsConnectState::Init => {}, - WsConnectState::Connected => {}, - WsConnectState::Connecting => {}, - WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await, - } - }, - Err(e) => { - tracing::error!("Websocket state notify error: {:?}", e); - break; - }, - } - } - }); + pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { + self.inner.subscribe_connect_state() } - pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { self.inner.conn_state_subscribe() } - pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } pub fn add_handler(&self, handler: Arc) -> Result<(), FlowyError> { @@ -99,6 +79,30 @@ impl WsManager { pub fn ws_sender(&self) -> Result, FlowyError> { self.inner.ws_sender() } } +#[tracing::instrument(level = "debug", skip(ws))] +fn listen_on_websocket(ws: Arc) { + let mut notify = ws.subscribe_connect_state(); + let _ = tokio::spawn(async move { + loop { + match notify.recv().await { + Ok(state) => { + tracing::info!("Websocket state changed: {}", state); + match state { + WsConnectState::Init => {}, + WsConnectState::Connected => {}, + WsConnectState::Connecting => {}, + WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await, + } + }, + Err(e) => { + tracing::error!("Websocket state notify error: {:?}", e); + break; + }, + } + } + }); +} + async fn retry_connect(ws: Arc, count: usize) { match ws.reconnect(count).await { Ok(_) => {}, @@ -117,7 +121,15 @@ impl FlowyWebSocket for Arc { }) } - fn conn_state_subscribe(&self) -> Receiver { self.state_subscribe() } + fn stop_connect(&self) -> FutureResult<(), FlowyError> { + let controller = self.clone(); + FutureResult::new(async move { + controller.stop().await; + Ok(()) + }) + } + + fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { let cloned_ws = self.clone(); diff --git a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs b/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs index 984dee7837..fdace175b8 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs @@ -22,7 +22,9 @@ impl std::default::Default for LocalWebSocket { impl FlowyWebSocket for Arc { fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn conn_state_subscribe(&self) -> Receiver { self.state_sender.subscribe() } + fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 55efc14fa8..68c15436aa 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -134,12 +134,15 @@ async fn _listen_user_status( }, UserStatus::Logout { .. } => { core.user_did_logout().await; + let _ = ws_manager.stop().await; }, UserStatus::Expired { .. } => { core.user_session_expired().await; + let _ = ws_manager.stop().await; }, UserStatus::SignUp { profile, ret } => { let _ = core.user_did_sign_up(&profile.token).await?; + let _ = ws_manager.start(profile.token.clone()).await?; let _ = ret.send(()); }, } diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index 8b1459ba1c..b33870dca9 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -70,11 +70,12 @@ impl WsController { pub async fn start(&self, addr: String) -> Result<(), ServerError> { *self.addr.write() = Some(addr.clone()); - let strategy = FixedInterval::from_millis(5000).take(3); self.connect(addr, strategy).await } + pub async fn stop(&self) { self.sender_ctrl.write().set_state(WsConnectState::Disconnected); } + async fn connect(&self, addr: String, strategy: T) -> Result<(), ServerError> where T: IntoIterator, @@ -130,7 +131,7 @@ impl WsController { self.connect(addr, strategy).await } - pub fn state_subscribe(&self) -> broadcast::Receiver { self.state_notify.subscribe() } + pub fn subscribe_state(&self) -> broadcast::Receiver { self.state_notify.subscribe() } pub fn sender(&self) -> Result, WsError> { match self.sender_ctrl.read().sender() { @@ -359,8 +360,8 @@ impl WsSenderController { self.sender = None; } - self.state = state.clone(); - let _ = self.state_notify.send(state); + self.state = state; + let _ = self.state_notify.send(self.state.clone()); } fn set_error(&mut self, error: WsError) {