From 24c1817c8df46e7d1a33fae7353cedba2bb16be8 Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 24 Jan 2022 16:27:40 +0800 Subject: [PATCH] config web socket --- frontend/Makefile.toml | 2 +- .../rust-lib/flowy-core/src/controller.rs | 7 +- .../rust-lib/flowy-document/src/controller.rs | 12 +- .../rust-lib/flowy-net/src/local_server/ws.rs | 11 +- .../rust-lib/flowy-net/src/ws/connection.rs | 15 +- frontend/rust-lib/flowy-net/src/ws/http_ws.rs | 20 +- .../src/deps_resolve/document_deps.rs | 22 ++- .../flowy-sdk/src/deps_resolve/folder_deps.rs | 42 ++++- frontend/rust-lib/flowy-sdk/src/lib.rs | 111 +++++------ .../rust-lib/flowy-sync/src/ws_manager.rs | 8 +- frontend/rust-lib/flowy-test/src/lib.rs | 2 +- .../flowy-user/src/services/user_session.rs | 6 +- shared-lib/lib-ws/src/ws.rs | 172 ++++++++++-------- 13 files changed, 244 insertions(+), 186 deletions(-) diff --git a/frontend/Makefile.toml b/frontend/Makefile.toml index 8037f6bd03..9f3d9ea57f 100644 --- a/frontend/Makefile.toml +++ b/frontend/Makefile.toml @@ -15,7 +15,7 @@ CARGO_MAKE_EXTEND_WORKSPACE_MAKEFILE = true CARGO_MAKE_CRATE_FS_NAME = "dart_ffi" CARGO_MAKE_CRATE_NAME = "dart-ffi" VERSION = "0.0.2" -FEATURES = "flutter, http_server" +FEATURES = "flutter" PRODUCT_NAME = "AppFlowy" #CRATE_TYPE: https://doc.rust-lang.org/reference/linkage.html CRATE_TYPE = "staticlib" diff --git a/frontend/rust-lib/flowy-core/src/controller.rs b/frontend/rust-lib/flowy-core/src/controller.rs index deb6785a62..16e5410d23 100644 --- a/frontend/rust-lib/flowy-core/src/controller.rs +++ b/frontend/rust-lib/flowy-core/src/controller.rs @@ -140,7 +140,7 @@ impl FolderManager { } } - pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> { + pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> { let mut write_guard = INIT_FOLDER_FLAG.write().await; if let Some(is_init) = write_guard.get(user_id) { if *is_init { @@ -150,9 +150,8 @@ impl FolderManager { let folder_id = FolderId::new(user_id); let _ = self.persistence.initialize(user_id, &folder_id).await?; - let token = self.user.token()?; let pool = self.persistence.db_pool()?; - let folder_editor = FolderEditor::new(user_id, &folder_id, &token, pool, self.web_socket.clone()).await?; + let folder_editor = FolderEditor::new(user_id, &folder_id, token, pool, self.web_socket.clone()).await?; *self.folder_editor.write().await = Some(Arc::new(folder_editor)); let _ = self.app_controller.initialize()?; @@ -163,7 +162,7 @@ impl FolderManager { pub async fn initialize_with_new_user(&self, user_id: &str, token: &str) -> FlowyResult<()> { DefaultFolderBuilder::build(token, user_id, self.persistence.clone(), self.view_controller.clone()).await?; - self.initialize(user_id).await + self.initialize(user_id, token).await } pub async fn clear(&self) { *self.folder_editor.write().await = None; } diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index 814fa8880a..b5d7d63703 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -9,7 +9,7 @@ use flowy_collaboration::entities::{ }; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; -use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket, WSStateReceiver}; +use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket}; use lib_infra::future::FutureResult; use lib_ws::WSConnectState; use std::{convert::TryInto, sync::Arc}; @@ -53,8 +53,7 @@ impl FlowyDocumentManager { } pub fn init(&self) -> FlowyResult<()> { - let notify = self.web_socket.subscribe_state_changed(); - listen_ws_state_changed(notify, self.ws_receivers.clone()); + listen_ws_state_changed(self.web_socket.clone(), self.ws_receivers.clone()); Ok(()) } @@ -234,10 +233,11 @@ impl OpenDocCache { } } -#[tracing::instrument(level = "trace", skip(state_receiver, receivers))] -fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: WebSocketDataReceivers) { +#[tracing::instrument(level = "trace", skip(web_socket, receivers))] +fn listen_ws_state_changed(web_socket: Arc, receivers: WebSocketDataReceivers) { tokio::spawn(async move { - while let Ok(state) = state_receiver.recv().await { + let mut notify = web_socket.subscribe_state_changed().await; + while let Ok(state) = notify.recv().await { for receiver in receivers.iter() { receiver.value().connect_state_changed(state.clone()); } diff --git a/frontend/rust-lib/flowy-net/src/local_server/ws.rs b/frontend/rust-lib/flowy-net/src/local_server/ws.rs index 4bae2b13e1..08733634b5 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/ws.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/ws.rs @@ -1,6 +1,7 @@ use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket}; use dashmap::DashMap; use flowy_error::FlowyError; +use futures_util::future::BoxFuture; use lib_infra::future::FutureResult; use lib_ws::{WSChannel, WSConnectState, WSMessageReceiver, WebSocketRawMessage}; use parking_lot::RwLock; @@ -56,7 +57,10 @@ impl FlowyRawWebSocket for LocalWebSocket { fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } + fn subscribe_connect_state(&self) -> BoxFuture> { + let subscribe = self.state_sender.subscribe(); + Box::pin(async move { subscribe }) + } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } @@ -66,8 +70,9 @@ impl FlowyRawWebSocket for LocalWebSocket { Ok(()) } - fn ws_msg_sender(&self) -> Result, FlowyError> { - Ok(Arc::new(LocalWebSocketAdaptor(self.server_ws_sender.clone()))) + fn ws_msg_sender(&self) -> FutureResult>, FlowyError> { + let ws: Arc = Arc::new(LocalWebSocketAdaptor(self.server_ws_sender.clone())); + FutureResult::new(async move { Ok(Some(ws)) }) } } diff --git a/frontend/rust-lib/flowy-net/src/ws/connection.rs b/frontend/rust-lib/flowy-net/src/ws/connection.rs index cc779c71fd..d94504dbc4 100644 --- a/frontend/rust-lib/flowy-net/src/ws/connection.rs +++ b/frontend/rust-lib/flowy-net/src/ws/connection.rs @@ -4,6 +4,7 @@ pub use flowy_error::FlowyError; use lib_infra::future::FutureResult; pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage}; +use futures_util::future::BoxFuture; use lib_ws::WSController; use parking_lot::RwLock; use std::sync::Arc; @@ -13,10 +14,10 @@ pub trait FlowyRawWebSocket: Send + Sync { fn initialize(&self) -> FutureResult<(), FlowyError>; fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError>; fn stop_connect(&self) -> FutureResult<(), FlowyError>; - fn subscribe_connect_state(&self) -> broadcast::Receiver; + fn subscribe_connect_state(&self) -> BoxFuture>; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; fn add_msg_receiver(&self, receiver: Arc) -> Result<(), FlowyError>; - fn ws_msg_sender(&self) -> Result, FlowyError>; + fn ws_msg_sender(&self) -> FutureResult>, FlowyError>; } pub trait FlowyWebSocket: Send + Sync { @@ -90,8 +91,8 @@ impl FlowyWebSocketConnect { } } - pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { - self.inner.subscribe_connect_state() + pub async fn subscribe_websocket_state(&self) -> broadcast::Receiver { + self.inner.subscribe_connect_state().await } pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } @@ -101,14 +102,16 @@ impl FlowyWebSocketConnect { Ok(()) } - pub fn web_socket(&self) -> Result, FlowyError> { self.inner.ws_msg_sender() } + pub async fn web_socket(&self) -> Result>, FlowyError> { + self.inner.ws_msg_sender().await + } } #[tracing::instrument(level = "debug", skip(ws_conn))] pub fn listen_on_websocket(ws_conn: Arc) { let raw_web_socket = ws_conn.inner.clone(); - let mut notify = ws_conn.inner.subscribe_connect_state(); let _ = tokio::spawn(async move { + let mut notify = ws_conn.inner.subscribe_connect_state().await; loop { match notify.recv().await { Ok(state) => { diff --git a/frontend/rust-lib/flowy-net/src/ws/http_ws.rs b/frontend/rust-lib/flowy-net/src/ws/http_ws.rs index 15c6f081ff..73e71787bb 100644 --- a/frontend/rust-lib/flowy-net/src/ws/http_ws.rs +++ b/frontend/rust-lib/flowy-net/src/ws/http_ws.rs @@ -5,6 +5,7 @@ use lib_infra::future::FutureResult; pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage}; use lib_ws::{WSController, WSSender}; +use futures_util::future::BoxFuture; use std::sync::Arc; use tokio::sync::broadcast::Receiver; @@ -27,7 +28,10 @@ impl FlowyRawWebSocket for Arc { }) } - fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } + fn subscribe_connect_state(&self) -> BoxFuture> { + let cloned_ws = self.clone(); + Box::pin(async move { cloned_ws.subscribe_state().await }) + } fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { let cloned_ws = self.clone(); @@ -42,9 +46,17 @@ impl FlowyRawWebSocket for Arc { Ok(()) } - fn ws_msg_sender(&self) -> Result, FlowyError> { - let sender = self.ws_message_sender().map_err(internal_error)?; - Ok(sender) + fn ws_msg_sender(&self) -> FutureResult>, FlowyError> { + let cloned_self = self.clone(); + FutureResult::new(async move { + match cloned_self.ws_message_sender().await.map_err(internal_error)? { + None => Ok(None), + Some(sender) => { + let sender = sender as Arc; + Ok(Some(sender)) + }, + } + }) } } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 5fe8bccb8f..5800037f14 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -15,6 +15,8 @@ use flowy_net::{ }; use flowy_sync::{RevisionWebSocket, WSStateReceiver}; use flowy_user::services::UserSession; +use futures_core::future::BoxFuture; +use lib_infra::future::BoxResultFuture; use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; @@ -62,18 +64,28 @@ impl DocumentUser for DocumentUserImpl { struct DocumentWebSocketImpl(Arc); impl RevisionWebSocket for DocumentWebSocketImpl { - fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> { + fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { channel: WSChannel::Document, data: bytes.to_vec(), }; - let sender = self.0.web_socket()?; - sender.send(msg).map_err(internal_error)?; - Ok(()) + let ws_conn = self.0.clone(); + Box::pin(async move { + match ws_conn.web_socket().await? { + None => {}, + Some(sender) => { + sender.send(msg).map_err(internal_error)?; + }, + } + Ok(()) + }) } - fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() } + fn subscribe_state_changed(&self) -> BoxFuture { + let ws_conn = self.0.clone(); + Box::pin(async move { ws_conn.subscribe_websocket_state().await }) + } } struct DocumentWSMessageReceiverImpl(Arc); diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs index 86de2b4c18..9d2fbfd6fe 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs @@ -15,6 +15,8 @@ use flowy_net::{ }; use flowy_sync::{RevisionWebSocket, WSStateReceiver}; use flowy_user::services::UserSession; +use futures_core::future::BoxFuture; +use lib_infra::future::BoxResultFuture; use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage}; use std::{convert::TryInto, sync::Arc}; @@ -35,8 +37,23 @@ impl FolderDepsResolver { Some(local_server) => local_server, }; - let folder_manager = - Arc::new(FolderManager::new(user, cloud_service, database, document_manager.clone(), web_socket).await); + let folder_manager = Arc::new( + FolderManager::new( + user.clone(), + cloud_service, + database, + document_manager.clone(), + web_socket, + ) + .await, + ); + + if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) { + match folder_manager.initialize(&user_id, &token).await { + Ok(_) => {}, + Err(e) => tracing::error!("Initialize folder manager failed: {}", e), + } + } let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone())); ws_conn.add_ws_message_receiver(receiver).unwrap(); @@ -61,18 +78,29 @@ impl WorkspaceUser for WorkspaceUserImpl { struct FolderWebSocketImpl(Arc); impl RevisionWebSocket for FolderWebSocketImpl { - fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> { + fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { channel: WSChannel::Folder, data: bytes.to_vec(), }; - let sender = self.0.web_socket()?; - sender.send(msg).map_err(internal_error)?; - Ok(()) + + let ws_conn = self.0.clone(); + Box::pin(async move { + match ws_conn.web_socket().await? { + None => {}, + Some(sender) => { + sender.send(msg).map_err(internal_error)?; + }, + } + Ok(()) + }) } - fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() } + fn subscribe_state_changed(&self) -> BoxFuture { + let ws_conn = self.0.clone(); + Box::pin(async move { ws_conn.subscribe_websocket_state().await }) + } } struct FolderWSMessageReceiverImpl(Arc); diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 21bc98a789..e8f21a1213 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -22,7 +22,7 @@ use std::{ Arc, }, }; -use tokio::{runtime::Runtime, sync::broadcast}; +use tokio::sync::broadcast; static INIT_LOG: AtomicBool = AtomicBool::new(false); @@ -97,32 +97,37 @@ impl FlowySDK { init_kv(&config.root); tracing::debug!("🔥 {:?}", config); let runtime = tokio_default_runtime().unwrap(); - let ws_addr = config.server_config.ws_addr(); - let (local_server, ws_conn) = if cfg!(feature = "http_server") { - let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr)); - (None, ws_conn) - } else { - let context = flowy_net::local_server::build_server(&config.server_config); - let local_ws = Arc::new(context.local_ws); - let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws)); - (Some(Arc::new(context.local_server)), ws_conn) - }; + let (local_server, ws_conn) = mk_local_server(&config.server_config); + let (user_session, document_manager, folder_manager, local_server) = runtime.block_on(async { + let user_session = mk_user_session(&config, &local_server, &config.server_config); + let document_manager = DocumentDepsResolver::resolve( + local_server.clone(), + ws_conn.clone(), + user_session.clone(), + &config.server_config, + ); - let user_session = mk_user_session(&config, &local_server, &config.server_config); - let document_manager = mk_document(&local_server, &ws_conn, &user_session, &config.server_config); - let folder_manager = mk_folder_manager( - &runtime, - &local_server, - &user_session, - &document_manager, - &config.server_config, - &ws_conn, - ); + let folder_manager = FolderDepsResolver::resolve( + local_server.clone(), + user_session.clone(), + &config.server_config, + &document_manager, + ws_conn.clone(), + ) + .await; + + if let Some(local_server) = local_server.as_ref() { + local_server.run(); + } + ws_conn.init().await; + (user_session, document_manager, folder_manager, local_server) + }); let dispatcher = Arc::new(EventDispatcher::construct(runtime, || { mk_modules(&ws_conn, &folder_manager, &user_session) })); - _init(&local_server, &dispatcher, &ws_conn, &user_session, &folder_manager); + + _start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager); Self { config, @@ -138,8 +143,7 @@ impl FlowySDK { pub fn dispatcher(&self) -> Arc { self.dispatcher.clone() } } -fn _init( - local_server: &Option>, +fn _start_listening( dispatch: &EventDispatcher, ws_conn: &Arc, user_session: &Arc, @@ -149,17 +153,11 @@ fn _init( let subscribe_network_type = ws_conn.subscribe_network_ty(); let folder_manager = folder_manager.clone(); let cloned_folder_manager = folder_manager.clone(); - let user_session = user_session.clone(); let ws_conn = ws_conn.clone(); - let local_server = local_server.clone(); + let user_session = user_session.clone(); dispatch.spawn(async move { - if let Some(local_server) = local_server.as_ref() { - local_server.run(); - } - user_session.init(); - ws_conn.init().await; listen_on_websocket(ws_conn.clone()); _listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await; }); @@ -169,6 +167,21 @@ fn _init( }); } +fn mk_local_server( + server_config: &ClientServerConfiguration, +) -> (Option>, Arc) { + let ws_addr = server_config.ws_addr(); + if cfg!(feature = "http_server") { + let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr)); + (None, ws_conn) + } else { + let context = flowy_net::local_server::build_server(server_config); + let local_ws = Arc::new(context.local_ws); + let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws)); + (Some(Arc::new(context.local_server)), ws_conn) + } +} + async fn _listen_user_status( ws_conn: Arc, mut subscribe: broadcast::Receiver, @@ -179,7 +192,7 @@ async fn _listen_user_status( match status { UserStatus::Login { token, user_id } => { tracing::trace!("User did login"); - let _ = folder_manager.initialize(&user_id).await?; + let _ = folder_manager.initialize(&user_id, &token).await?; let _ = ws_conn.start(token, user_id).await?; }, UserStatus::Logout { .. } => { @@ -244,37 +257,3 @@ fn mk_user_session( let cloud_service = UserDepsResolver::resolve(local_server, server_config); Arc::new(UserSession::new(user_config, cloud_service)) } - -fn mk_folder_manager( - runtime: &Runtime, - local_server: &Option>, - user_session: &Arc, - document_manager: &Arc, - server_config: &ClientServerConfiguration, - ws_conn: &Arc, -) -> Arc { - runtime.block_on(async { - FolderDepsResolver::resolve( - local_server.clone(), - user_session.clone(), - server_config, - document_manager, - ws_conn.clone(), - ) - .await - }) -} - -pub fn mk_document( - local_server: &Option>, - ws_conn: &Arc, - user_session: &Arc, - server_config: &ClientServerConfiguration, -) -> Arc { - DocumentDepsResolver::resolve( - local_server.clone(), - ws_conn.clone(), - user_session.clone(), - server_config, - ) -} diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index 292ed29863..6f66d29ea8 100644 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -6,7 +6,7 @@ use flowy_collaboration::entities::{ ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, }; use flowy_error::{FlowyError, FlowyResult}; -use futures_util::stream::StreamExt; +use futures_util::{future::BoxFuture, stream::StreamExt}; use lib_infra::future::{BoxResultFuture, FutureResult}; use lib_ws::WSConnectState; use std::{collections::VecDeque, convert::TryFrom, fmt::Formatter, sync::Arc}; @@ -36,8 +36,8 @@ pub trait RevisionWSSinkDataProvider: Send + Sync { pub type WSStateReceiver = tokio::sync::broadcast::Receiver; pub trait RevisionWebSocket: Send + Sync + 'static { - fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError>; - fn subscribe_state_changed(&self) -> WSStateReceiver; + fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError>; + fn subscribe_state_changed(&self) -> BoxFuture; } pub struct RevisionWebSocketManager { @@ -287,7 +287,7 @@ impl RevisionWSSink { }, Some(data) => { tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty); - self.ws_sender.send(data) + self.ws_sender.send(data).await }, } } diff --git a/frontend/rust-lib/flowy-test/src/lib.rs b/frontend/rust-lib/flowy-test/src/lib.rs index 2ba8a20828..af47340ba7 100644 --- a/frontend/rust-lib/flowy-test/src/lib.rs +++ b/frontend/rust-lib/flowy-test/src/lib.rs @@ -35,7 +35,7 @@ impl std::default::Default for FlowySDKTest { impl FlowySDKTest { pub fn new(server_config: ClientServerConfiguration) -> Self { let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string()).log_filter("trace"); - let sdk = FlowySDK::new(config); + let sdk = std::thread::spawn(|| FlowySDK::new(config)).join().unwrap(); std::mem::forget(sdk.dispatcher()); Self { inner: sdk } } diff --git a/frontend/rust-lib/flowy-user/src/services/user_session.rs b/frontend/rust-lib/flowy-user/src/services/user_session.rs index 2daf1c24e3..e26fc3967a 100644 --- a/frontend/rust-lib/flowy-user/src/services/user_session.rs +++ b/frontend/rust-lib/flowy-user/src/services/user_session.rs @@ -88,7 +88,7 @@ impl UserSession { #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_in(&self, params: SignInParams) -> Result { - if self.is_login(¶ms.email) { + if self.is_user_login(¶ms.email) { self.user_profile().await } else { let resp = self.cloud_service.sign_in(params).await?; @@ -103,7 +103,7 @@ impl UserSession { #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_up(&self, params: SignUpParams) -> Result { - if self.is_login(¶ms.email) { + if self.is_user_login(¶ms.email) { self.user_profile().await } else { let resp = self.cloud_service.sign_up(params).await?; @@ -263,7 +263,7 @@ impl UserSession { } } - fn is_login(&self, email: &str) -> bool { + fn is_user_login(&self, email: &str) -> bool { match self.get_session() { Ok(session) => session.email == email, Err(_) => false, diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index 3454dc2b87..5d88f29c72 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -11,7 +11,6 @@ use dashmap::DashMap; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_core::{ready, Stream}; use lib_infra::retry::{Action, FixedInterval, Retry}; -use parking_lot::RwLock; use pin_project::pin_project; use std::{ convert::TryFrom, @@ -22,7 +21,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::sync::{broadcast, oneshot}; +use tokio::sync::{broadcast, oneshot, RwLock}; use tokio_tungstenite::tungstenite::{ protocol::{frame::coding::CloseCode, CloseFrame}, Message, @@ -39,19 +38,22 @@ pub trait WSMessageReceiver: Sync + Send + 'static { pub struct WSController { handlers: Handlers, - state_notify: Arc>, - sender_ctrl: Arc>, addr: Arc>>, + sender: Arc>>>, + conn_state_notify: Arc>, +} + +impl std::fmt::Display for WSController { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("WebSocket") } } impl std::default::Default for WSController { fn default() -> Self { - let (state_notify, _) = broadcast::channel(16); Self { handlers: DashMap::new(), - sender_ctrl: Arc::new(RwLock::new(WSSenderController::default())), - state_notify: Arc::new(state_notify), addr: Arc::new(RwLock::new(None)), + sender: Arc::new(RwLock::new(None)), + conn_state_notify: Arc::new(RwLock::new(WSConnectStateNotifier::default())), } } } @@ -62,36 +64,52 @@ impl WSController { pub fn add_ws_message_receiver(&self, handler: Arc) -> Result<(), WSError> { let source = handler.source(); if self.handlers.contains_key(&source) { - log::error!("WsSource's {:?} is already registered", source); + log::error!("{:?} is already registered", source); } self.handlers.insert(source, handler); Ok(()) } pub async fn start(&self, addr: String) -> Result<(), ServerError> { - *self.addr.write() = Some(addr.clone()); + *self.addr.write().await = Some(addr.clone()); let strategy = FixedInterval::from_millis(5000).take(3); self.connect(addr, strategy).await } - pub async fn stop(&self) { self.sender_ctrl.write().set_state(WSConnectState::Disconnected); } + pub async fn stop(&self) { + if self.conn_state_notify.read().await.conn_state.is_connected() { + tracing::trace!("[{}] stop", self); + self.conn_state_notify + .write() + .await + .update_state(WSConnectState::Disconnected); + } + } async fn connect(&self, addr: String, strategy: T) -> Result<(), ServerError> where T: IntoIterator, I: Iterator + Send + 'static, { + let mut conn_state_notify = self.conn_state_notify.write().await; + let conn_state = conn_state_notify.conn_state.clone(); + if conn_state.is_connected() || conn_state.is_connecting() { + return Ok(()); + } + let (ret, rx) = oneshot::channel::>(); - *self.addr.write() = Some(addr.clone()); + *self.addr.write().await = Some(addr.clone()); let action = WSConnectAction { addr, handlers: self.handlers.clone(), }; - let retry = Retry::spawn(strategy, action); - let sender_ctrl = self.sender_ctrl.clone(); - sender_ctrl.write().set_state(WSConnectState::Connecting); + conn_state_notify.update_state(WSConnectState::Connecting); + drop(conn_state_notify); + let cloned_conn_state = self.conn_state_notify.clone(); + let cloned_sender = self.sender.clone(); + tracing::trace!("[{}] start connecting", self); tokio::spawn(async move { match retry.await { Ok(result) => { @@ -100,30 +118,36 @@ impl WSController { handlers_fut, sender, } = result; - sender_ctrl.write().set_sender(sender); - sender_ctrl.write().set_state(WSConnectState::Connected); + + cloned_conn_state.write().await.update_state(WSConnectState::Connected); + *cloned_sender.write().await = Some(Arc::new(sender)); + let _ = ret.send(Ok(())); - spawn_stream_and_handlers(stream, handlers_fut, sender_ctrl.clone()).await; + spawn_stream_and_handlers(stream, handlers_fut).await; }, Err(e) => { - sender_ctrl.write().set_error(e.clone()); + cloned_conn_state + .write() + .await + .update_state(WSConnectState::Disconnected); let _ = ret.send(Err(ServerError::internal().context(e))); }, } }); - rx.await? } pub async fn retry(&self, count: usize) -> Result<(), ServerError> { - if !self.sender_ctrl.read().is_disconnected() { + if !self.conn_state_notify.read().await.conn_state.is_disconnected() { return Ok(()); } + tracing::trace!("[WebSocket]: retry connect..."); let strategy = FixedInterval::from_millis(5000).take(count); let addr = self .addr .read() + .await .as_ref() .expect("Retry web socket connection failed, should call start_connect first") .clone(); @@ -131,25 +155,30 @@ impl WSController { self.connect(addr, strategy).await } - pub fn subscribe_state(&self) -> broadcast::Receiver { self.state_notify.subscribe() } + pub async fn subscribe_state(&self) -> broadcast::Receiver { + self.conn_state_notify.read().await.notify.subscribe() + } - pub fn ws_message_sender(&self) -> Result, WSError> { - match self.sender_ctrl.read().sender() { - None => Err(WSError::internal().context("WebSocket is not initialized, should call connect first")), - Some(sender) => Ok(sender), + pub async fn ws_message_sender(&self) -> Result>, WSError> { + let sender = self.sender.read().await.clone(); + match sender { + None => match self.conn_state_notify.read().await.conn_state { + WSConnectState::Disconnected => { + let msg = "WebSocket is disconnected"; + Err(WSError::internal().context(msg)) + }, + _ => Ok(None), + }, + Some(sender) => Ok(Some(sender)), } } } -async fn spawn_stream_and_handlers( - stream: WSStream, - handlers: WSHandlerFuture, - sender_ctrl: Arc>, -) { +async fn spawn_stream_and_handlers(stream: WSStream, handlers: WSHandlerFuture) { tokio::select! { result = stream => { if let Err(e) = result { - sender_ctrl.write().set_error(e); + tracing::error!("WSStream error: {:?}", e); } }, result = handlers => tracing::debug!("handlers completed {:?}", result), @@ -201,15 +230,13 @@ impl Future for WSHandlerFuture { } #[derive(Debug, Clone)] -pub struct WSSender { - ws_tx: MsgSender, -} +pub struct WSSender(MsgSender); impl WSSender { pub fn send_msg>(&self, msg: T) -> Result<(), WSError> { let msg = msg.into(); let _ = self - .ws_tx + .0 .unbounded_send(msg.into()) .map_err(|e| WSError::internal().context(e))?; Ok(()) @@ -237,10 +264,7 @@ impl WSSender { reason: reason.to_owned().into(), }; let msg = Message::Close(Some(frame)); - let _ = self - .ws_tx - .unbounded_send(msg) - .map_err(|e| WSError::internal().context(e))?; + let _ = self.0.unbounded_send(msg).map_err(|e| WSError::internal().context(e))?; Ok(()) } } @@ -291,7 +315,7 @@ impl WSConnectActionFut { // └───────────────┘ └──────────────┘ let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded(); let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded(); - let sender = WSSender { ws_tx }; + let sender = WSSender(ws_tx); let handlers_fut = WSHandlerFuture::new(handlers, msg_rx); let conn = WSConnectionFuture::new(msg_tx, ws_rx, addr.clone()); Self { @@ -330,12 +354,20 @@ pub enum WSConnectState { Disconnected, } +impl WSConnectState { + fn is_connected(&self) -> bool { self == &WSConnectState::Connected } + + fn is_connecting(&self) -> bool { self == &WSConnectState::Connecting } + + fn is_disconnected(&self) -> bool { self == &WSConnectState::Disconnected || self == &WSConnectState::Init } +} + impl std::fmt::Display for WSConnectState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { WSConnectState::Init => f.write_str("Init"), - WSConnectState::Connected => f.write_str("Connecting"), - WSConnectState::Connecting => f.write_str("Connected"), + WSConnectState::Connected => f.write_str("Connected"), + WSConnectState::Connecting => f.write_str("Connecting"), WSConnectState::Disconnected => f.write_str("Disconnected"), } } @@ -345,44 +377,32 @@ impl std::fmt::Debug for WSConnectState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{}", self)) } } -struct WSSenderController { - state: WSConnectState, - state_notify: Arc>, - sender: Option>, +struct WSConnectStateNotifier { + conn_state: WSConnectState, + notify: Arc>, } -impl WSSenderController { - fn set_sender(&mut self, sender: WSSender) { self.sender = Some(Arc::new(sender)); } - - fn set_state(&mut self, state: WSConnectState) { - if state != WSConnectState::Connected { - self.sender = None; - } - - self.state = state; - let _ = self.state_notify.send(self.state.clone()); - } - - fn set_error(&mut self, error: WSError) { - log::error!("{:?}", error); - self.set_state(WSConnectState::Disconnected); - } - - fn sender(&self) -> Option> { self.sender.clone() } - - #[allow(dead_code)] - fn is_connecting(&self) -> bool { self.state == WSConnectState::Connecting } - - fn is_disconnected(&self) -> bool { self.state == WSConnectState::Disconnected } -} - -impl std::default::Default for WSSenderController { +impl std::default::Default for WSConnectStateNotifier { fn default() -> Self { let (state_notify, _) = broadcast::channel(16); - WSSenderController { - state: WSConnectState::Init, - state_notify: Arc::new(state_notify), - sender: None, + Self { + conn_state: WSConnectState::Init, + notify: Arc::new(state_notify), } } } + +impl WSConnectStateNotifier { + fn update_state(&mut self, new_state: WSConnectState) { + if self.conn_state == new_state { + return; + } + tracing::debug!( + "WebSocket connect state did change: {} -> {}", + self.conn_state, + new_state + ); + self.conn_state = new_state.clone(); + let _ = self.notify.send(new_state); + } +}