diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index bba444b294..1c83fa3c34 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -1215,6 +1215,7 @@ dependencies = [ "revision-model", "tokio", "tracing", + "user-model", "ws-model", ] diff --git a/frontend/appflowy_tauri/src/services/backend/index.ts b/frontend/appflowy_tauri/src/services/backend/index.ts index 77180e0736..0dd1c40eac 100644 --- a/frontend/appflowy_tauri/src/services/backend/index.ts +++ b/frontend/appflowy_tauri/src/services/backend/index.ts @@ -1,6 +1,6 @@ -export * from './classes/flowy-user'; -export * from './classes/flowy-document'; -export * from './classes/flowy-grid'; -export * from './classes/flowy-folder'; -export * from './classes/flowy-net'; -export * from './classes/flowy-error'; +export * from "./classes/flowy-user"; +export * from "./classes/flowy-document"; +export * from "./classes/flowy-database"; +export * from "./classes/flowy-folder"; +export * from "./classes/flowy-net"; +export * from "./classes/flowy-error"; diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index 6ad813a906..7c8e078e56 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -1003,6 +1003,7 @@ dependencies = [ "revision-model", "tokio", "tracing", + "user-model", "ws-model", ] diff --git a/frontend/rust-lib/flowy-core/Cargo.toml b/frontend/rust-lib/flowy-core/Cargo.toml index a675ab2b89..653447e75f 100644 --- a/frontend/rust-lib/flowy-core/Cargo.toml +++ b/frontend/rust-lib/flowy-core/Cargo.toml @@ -10,12 +10,13 @@ lib-dispatch = { path = "../lib-dispatch" } lib-log = { path = "../lib-log" } flowy-user = { path = "../flowy-user" } flowy-net = { path = "../flowy-net" } -flowy-folder = { path = "../flowy-folder", default-features = false } -flowy-database = { path = "../flowy-database", default-features = false } +flowy-folder = { path = "../flowy-folder" } +flowy-database = { path = "../flowy-database" } grid-model = { path = "../../../shared-lib/grid-model" } +user-model = { path = "../../../shared-lib/user-model" } flowy-client-ws = { path = "../../../shared-lib/flowy-client-ws" } flowy-sqlite = { path = "../flowy-sqlite", optional = true } -flowy-document = { path = "../flowy-document", default-features = false } +flowy-document = { path = "../flowy-document" } flowy-revision = { path = "../flowy-revision" } flowy-error = { path = "../flowy-error", features = ["adaptor_ws"] } flowy-task = { path = "../flowy-task" } @@ -32,6 +33,7 @@ lib-ws = { path = "../../../shared-lib/lib-ws" } lib-infra = { path = "../../../shared-lib/lib-infra" } [features] +default = ["rev-sqlite"] http_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"] native_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"] use_bunyan = ["lib-log/use_bunyan"] diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index 2f31306f7f..eab047a5fa 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -1,21 +1,23 @@ mod deps_resolve; pub mod module; -pub use flowy_net::get_client_server_configuration; - use crate::deps_resolve::*; - use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType}; use flowy_database::manager::DatabaseManager; use flowy_document::entities::DocumentVersionPB; use flowy_document::{DocumentConfig, DocumentManager}; +use flowy_error::FlowyResult; use flowy_folder::entities::ViewDataFormatPB; use flowy_folder::{errors::FlowyError, manager::FolderManager}; +pub use flowy_net::get_client_server_configuration; use flowy_net::local_server::LocalServer; use flowy_net::ClientServerConfiguration; use flowy_task::{TaskDispatcher, TaskRunner}; -use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig}; +use flowy_user::event_map::UserStatusCallback; +use flowy_user::services::{UserSession, UserSessionConfig}; use lib_dispatch::prelude::*; use lib_dispatch::runtime::tokio_default_runtime; + +use lib_infra::future::{to_fut, Fut}; use module::make_plugins; pub use module::*; use std::time::Duration; @@ -27,6 +29,7 @@ use std::{ }, }; use tokio::sync::{broadcast, RwLock}; +use user_model::UserProfile; static INIT_LOG: AtomicBool = AtomicBool::new(false); @@ -83,9 +86,10 @@ fn create_log_filter(level: String, with_crates: Vec) -> String { filters.push(format!("flowy_folder={}", level)); filters.push(format!("flowy_user={}", level)); filters.push(format!("flowy_document={}", level)); - filters.push(format!("flowy_grid={}", level)); - filters.push(format!("flowy_collaboration={}", "info")); - filters.push(format!("flowy_notification={}", level)); + filters.push(format!("flowy_database={}", level)); + filters.push(format!("flowy_sync={}", "info")); + filters.push(format!("flowy_client_sync={}", "info")); + filters.push(format!("flowy_notification={}", "info")); filters.push(format!("lib_ot={}", level)); filters.push(format!("lib_ws={}", level)); filters.push(format!("lib_infra={}", level)); @@ -162,6 +166,21 @@ impl FlowySDK { ) }); + let user_status_listener = UserStatusListener { + document_manager: document_manager.clone(), + folder_manager: folder_manager.clone(), + grid_manager: grid_manager.clone(), + ws_conn: ws_conn.clone(), + config: config.clone(), + }; + let user_status_callback = UserStatusCallbackImpl { + listener: Arc::new(user_status_listener), + }; + let cloned_user_session = user_session.clone(); + runtime.block_on(async move { + cloned_user_session.clone().init(user_status_callback).await; + }); + let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || { make_plugins( &ws_conn, @@ -171,16 +190,7 @@ impl FlowySDK { &document_manager, ) })); - - _start_listening( - &config, - &event_dispatcher, - &ws_conn, - &user_session, - &document_manager, - &folder_manager, - &grid_manager, - ); + _start_listening(&event_dispatcher, &ws_conn, &folder_manager); Self { config, @@ -201,36 +211,17 @@ impl FlowySDK { } fn _start_listening( - config: &FlowySDKConfig, event_dispatcher: &AFPluginDispatcher, ws_conn: &Arc, - user_session: &Arc, - document_manager: &Arc, folder_manager: &Arc, - grid_manager: &Arc, ) { - let subscribe_user_status = user_session.notifier.subscribe_user_status(); let subscribe_network_type = ws_conn.subscribe_network_ty(); let folder_manager = folder_manager.clone(); - let grid_manager = grid_manager.clone(); - let cloned_folder_manager = folder_manager.clone(); + let cloned_folder_manager = folder_manager; let ws_conn = ws_conn.clone(); - let user_session = user_session.clone(); - let document_manager = document_manager.clone(); - let config = config.clone(); event_dispatcher.spawn(async move { - user_session.init(); listen_on_websocket(ws_conn.clone()); - _listen_user_status( - config, - ws_conn.clone(), - subscribe_user_status, - document_manager, - folder_manager, - grid_manager, - ) - .await; }); event_dispatcher.spawn(async move { @@ -253,66 +244,6 @@ fn mk_local_server( } } -async fn _listen_user_status( - config: FlowySDKConfig, - ws_conn: Arc, - mut subscribe: broadcast::Receiver, - document_manager: Arc, - folder_manager: Arc, - grid_manager: Arc, -) { - while let Ok(status) = subscribe.recv().await { - let result = || async { - match status { - UserStatus::Login { token, user_id } => { - tracing::trace!("User did login"); - folder_manager.initialize(&user_id, &token).await?; - document_manager.initialize(&user_id).await?; - grid_manager.initialize(&user_id, &token).await?; - ws_conn.start(token, user_id).await?; - } - UserStatus::Logout { token: _, user_id } => { - tracing::trace!("User did logout"); - folder_manager.clear(&user_id).await; - ws_conn.stop().await; - } - UserStatus::Expired { token: _, user_id } => { - tracing::trace!("User session has been expired"); - folder_manager.clear(&user_id).await; - ws_conn.stop().await; - } - UserStatus::SignUp { profile, ret } => { - tracing::trace!("User did sign up"); - - let view_data_type = match config.document.version { - DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat, - DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat, - }; - folder_manager - .initialize_with_new_user(&profile.id, &profile.token, view_data_type) - .await?; - document_manager - .initialize_with_new_user(&profile.id, &profile.token) - .await?; - - grid_manager - .initialize_with_new_user(&profile.id, &profile.token) - .await?; - - ws_conn.start(profile.token.clone(), profile.id.clone()).await?; - let _ = ret.send(()); - } - } - Ok::<(), FlowyError>(()) - }; - - match result().await { - Ok(_) => {} - Err(e) => tracing::error!("{}", e), - } - } -} - async fn _listen_network_status(mut subscribe: broadcast::Receiver, _core: Arc) { while let Ok(_new_type) = subscribe.recv().await { // core.network_state_changed(new_type); @@ -345,3 +276,75 @@ fn mk_user_session( let cloud_service = UserDepsResolver::resolve(local_server, server_config); Arc::new(UserSession::new(user_config, cloud_service)) } + +struct UserStatusListener { + document_manager: Arc, + folder_manager: Arc, + grid_manager: Arc, + ws_conn: Arc, + config: FlowySDKConfig, +} + +impl UserStatusListener { + async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> { + self.folder_manager.initialize(user_id, token).await?; + self.document_manager.initialize(user_id).await?; + self.grid_manager.initialize(user_id, token).await?; + self.ws_conn.start(token.to_owned(), user_id.to_owned()).await?; + Ok(()) + } + + async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> { + let view_data_type = match self.config.document.version { + DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat, + DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat, + }; + self.folder_manager + .initialize_with_new_user(&user_profile.id, &user_profile.token, view_data_type) + .await?; + self.document_manager + .initialize_with_new_user(&user_profile.id, &user_profile.token) + .await?; + + self.grid_manager + .initialize_with_new_user(&user_profile.id, &user_profile.token) + .await?; + + self.ws_conn + .start(user_profile.token.clone(), user_profile.id.clone()) + .await?; + Ok(()) + } + + async fn did_expired(&self, _token: &str, user_id: &str) -> FlowyResult<()> { + self.folder_manager.clear(user_id).await; + self.ws_conn.stop().await; + Ok(()) + } +} + +struct UserStatusCallbackImpl { + listener: Arc, +} + +impl UserStatusCallback for UserStatusCallbackImpl { + fn did_sign_in(&self, token: &str, user_id: &str) -> Fut> { + let listener = self.listener.clone(); + let token = token.to_owned(); + let user_id = user_id.to_owned(); + to_fut(async move { listener.did_sign_in(&token, &user_id).await }) + } + + fn did_sign_up(&self, user_profile: &UserProfile) -> Fut> { + let listener = self.listener.clone(); + let user_profile = user_profile.clone(); + to_fut(async move { listener.did_sign_up(&user_profile).await }) + } + + fn did_expired(&self, token: &str, user_id: &str) -> Fut> { + let listener = self.listener.clone(); + let token = token.to_owned(); + let user_id = user_id.to_owned(); + to_fut(async move { listener.did_expired(&token, &user_id).await }) + } +} diff --git a/frontend/rust-lib/flowy-database/Cargo.toml b/frontend/rust-lib/flowy-database/Cargo.toml index 9c68ea0318..bd5759b880 100644 --- a/frontend/rust-lib/flowy-database/Cargo.toml +++ b/frontend/rust-lib/flowy-database/Cargo.toml @@ -56,7 +56,7 @@ flowy-database = { path = "", features = ["flowy_unit_test"]} flowy-codegen = { path = "../flowy-codegen"} [features] -default = [] +default = ["rev-sqlite"] rev-sqlite = ["flowy-sqlite"] dart = ["flowy-codegen/dart", "flowy-notification/dart"] ts = ["flowy-codegen/ts", "flowy-notification/ts"] diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index 47c2e3cdb2..9f20d7cea4 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -58,6 +58,7 @@ flowy-codegen = { path = "../flowy-codegen"} [features] +default = ["rev-sqlite"] sync = [] cloud_sync = ["sync"] rev-sqlite = ["flowy-sqlite"] diff --git a/frontend/rust-lib/flowy-document/src/services/migration.rs b/frontend/rust-lib/flowy-document/src/services/migration.rs index 7070387080..2e86f67710 100644 --- a/frontend/rust-lib/flowy-document/src/services/migration.rs +++ b/frontend/rust-lib/flowy-document/src/services/migration.rs @@ -65,7 +65,6 @@ impl DocumentMigration { // KV::set_bool(&key, true); - tracing::debug!("Run document v1 migration"); Ok(()) } } diff --git a/frontend/rust-lib/flowy-folder/Cargo.toml b/frontend/rust-lib/flowy-folder/Cargo.toml index 20ea2e4270..241c732e7f 100644 --- a/frontend/rust-lib/flowy-folder/Cargo.toml +++ b/frontend/rust-lib/flowy-folder/Cargo.toml @@ -48,7 +48,7 @@ flowy-codegen = { path = "../flowy-codegen"} [features] -default = [] +default = ["rev-sqlite"] sync = [] cloud_sync = ["sync"] rev-sqlite = ["flowy-sqlite", "flowy-folder/rev-sqlite"] diff --git a/frontend/rust-lib/flowy-folder/src/services/view/controller.rs b/frontend/rust-lib/flowy-folder/src/services/view/controller.rs index b6ff01446a..f184da6ae1 100644 --- a/frontend/rust-lib/flowy-folder/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-folder/src/services/view/controller.rs @@ -209,7 +209,7 @@ impl ViewController { let deleted_view = self .persistence .begin_transaction(|transaction| { - let view = transaction.read_view(&view_id)?; + let view = transaction.read_view(view_id)?; let views = read_belonging_views_on_local(&view.app_id, self.trash_controller.clone(), &transaction)?; let index = views @@ -223,12 +223,12 @@ impl ViewController { }) .await?; - send_notification(&view_id, FolderNotification::ViewMoveToTrash) + send_notification(view_id, FolderNotification::ViewMoveToTrash) .payload(deleted_view) .send(); - let processor = self.get_data_processor_from_view_id(&view_id).await?; - processor.close_view(&view_id).await?; + let processor = self.get_data_processor_from_view_id(view_id).await?; + processor.close_view(view_id).await?; Ok(()) } diff --git a/frontend/rust-lib/flowy-folder/src/services/view/event_handler.rs b/frontend/rust-lib/flowy-folder/src/services/view/event_handler.rs index a1dabf6cb1..a0e16c204e 100644 --- a/frontend/rust-lib/flowy-folder/src/services/view/event_handler.rs +++ b/frontend/rust-lib/flowy-folder/src/services/view/event_handler.rs @@ -62,7 +62,7 @@ pub(crate) async fn delete_view_handler( ) -> Result<(), FlowyError> { let params: RepeatedViewIdPB = data.into_inner(); for view_id in ¶ms.items { - let _ = view_controller.move_view_to_trash(&view_id).await; + let _ = view_controller.move_view_to_trash(view_id).await; } let trash = view_controller diff --git a/frontend/rust-lib/flowy-net/src/request.rs b/frontend/rust-lib/flowy-net/src/request.rs index 95e98c9428..f0617619ec 100644 --- a/frontend/rust-lib/flowy-net/src/request.rs +++ b/frontend/rust-lib/flowy-net/src/request.rs @@ -208,7 +208,7 @@ impl HttpRequestBuilder { self.response = Some(flowy_response.data); Ok(self) } - Some(error) => Err(FlowyError::new(error.code.into(), &error.msg)), + Some(error) => Err(FlowyError::new(error.code, &error.msg)), } } } @@ -231,7 +231,7 @@ async fn get_response_data(original: Response) -> Result { let response: HttpResponse = serde_json::from_slice(&bytes)?; match response.error { None => Ok(response.data), - Some(error) => Err(FlowyError::new(error.code.into(), &error.msg)), + Some(error) => Err(FlowyError::new(error.code, &error.msg)), } } else { Err(FlowyError::http().context(original)) diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 465b6ada61..f4bf70f61b 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -127,7 +127,7 @@ impl RevisionManager { } } - #[tracing::instrument(name = "revision_manager_initialize", level = "debug", skip_all, fields(deserializer, object_id, deserialize_revisions) err)] + #[tracing::instrument(name = "revision_manager_initialize", level = "trace", skip_all, fields(deserializer, object_id, deserialize_revisions) err)] pub async fn initialize(&mut self, _cloud: Option>) -> FlowyResult where B: RevisionObjectDeserializer, diff --git a/frontend/rust-lib/flowy-test/Cargo.toml b/frontend/rust-lib/flowy-test/Cargo.toml index 6cbe07f4e5..7038d031c5 100644 --- a/frontend/rust-lib/flowy-test/Cargo.toml +++ b/frontend/rust-lib/flowy-test/Cargo.toml @@ -6,11 +6,11 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -flowy-core = { path = "../flowy-core", default-features = false } +flowy-core = { path = "../flowy-core" } flowy-user = { path = "../flowy-user"} flowy-net = { path = "../flowy-net"} -flowy-folder = { path = "../flowy-folder", default-features = false} -flowy-document= { path = "../flowy-document", default-features = false} +flowy-folder = { path = "../flowy-folder" } +flowy-document= { path = "../flowy-document" } lib-dispatch = { path = "../lib-dispatch" } flowy-client-sync = { path = "../flowy-client-sync"} diff --git a/frontend/rust-lib/flowy-user/Cargo.toml b/frontend/rust-lib/flowy-user/Cargo.toml index 049832d7f3..2a2bced168 100644 --- a/frontend/rust-lib/flowy-user/Cargo.toml +++ b/frontend/rust-lib/flowy-user/Cargo.toml @@ -34,6 +34,7 @@ flowy-test = { path = "../flowy-test" } nanoid = "0.4.0" [features] +default = ["rev-sqlite"] rev-sqlite = ["flowy-sqlite"] dart = ["flowy-codegen/dart", "flowy-notification/dart"] ts = ["flowy-codegen/ts", "flowy-notification/ts"] diff --git a/frontend/rust-lib/flowy-user/src/entities/user_profile.rs b/frontend/rust-lib/flowy-user/src/entities/user_profile.rs index 30a3933642..ba5acbf4a4 100644 --- a/frontend/rust-lib/flowy-user/src/entities/user_profile.rs +++ b/frontend/rust-lib/flowy-user/src/entities/user_profile.rs @@ -1,7 +1,7 @@ use crate::errors::ErrorCode; use flowy_derive::ProtoBuf; use std::convert::TryInto; -use user_model::{UpdateUserProfileParams, UserEmail, UserIcon, UserId, UserName, UserPassword}; +use user_model::{UpdateUserProfileParams, UserEmail, UserIcon, UserId, UserName, UserPassword, UserProfile}; #[derive(Default, ProtoBuf)] pub struct UserTokenPB { @@ -33,6 +33,18 @@ pub struct UserProfilePB { pub icon_url: String, } +impl std::convert::From for UserProfilePB { + fn from(user_profile: UserProfile) -> Self { + Self { + id: user_profile.id, + email: user_profile.email, + name: user_profile.name, + token: user_profile.token, + icon_url: user_profile.icon_url, + } + } +} + #[derive(ProtoBuf, Default)] pub struct UpdateUserProfilePayloadPB { #[pb(index = 1)] diff --git a/frontend/rust-lib/flowy-user/src/event_map.rs b/frontend/rust-lib/flowy-user/src/event_map.rs index 58f3d29f0c..6280638064 100644 --- a/frontend/rust-lib/flowy-user/src/event_map.rs +++ b/frontend/rust-lib/flowy-user/src/event_map.rs @@ -1,9 +1,10 @@ use crate::entities::UserProfilePB; use crate::{errors::FlowyError, handlers::*, services::UserSession}; use lib_dispatch::prelude::*; -use lib_infra::future::FutureResult; + +use lib_infra::future::{Fut, FutureResult}; use std::sync::Arc; -use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams}; +use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfile}; pub fn init(user_session: Arc) -> AFPlugin { AFPlugin::new() @@ -21,6 +22,12 @@ pub fn init(user_session: Arc) -> AFPlugin { .event(UserEvent::GetUserSetting, get_user_setting) } +pub trait UserStatusCallback: Send + Sync + 'static { + fn did_sign_in(&self, token: &str, user_id: &str) -> Fut>; + fn did_sign_up(&self, user_profile: &UserProfile) -> Fut>; + fn did_expired(&self, token: &str, user_id: &str) -> Fut>; +} + pub trait UserCloudService: Send + Sync { fn sign_up(&self, params: SignUpParams) -> FutureResult; fn sign_in(&self, params: SignInParams) -> FutureResult; @@ -31,6 +38,7 @@ pub trait UserCloudService: Send + Sync { } use flowy_derive::{Flowy_Event, ProtoBuf_Enum}; +use flowy_error::FlowyResult; use strum_macros::Display; #[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)] diff --git a/frontend/rust-lib/flowy-user/src/handlers/auth_handler.rs b/frontend/rust-lib/flowy-user/src/handlers/auth_handler.rs index baa1f62c6c..c9ee47db31 100644 --- a/frontend/rust-lib/flowy-user/src/handlers/auth_handler.rs +++ b/frontend/rust-lib/flowy-user/src/handlers/auth_handler.rs @@ -12,7 +12,7 @@ pub async fn sign_in( session: AFPluginState>, ) -> DataResult { let params: SignInParams = data.into_inner().try_into()?; - let user_profile = session.sign_in(params).await?; + let user_profile: UserProfilePB = session.sign_in(params).await?.into(); data_result(user_profile) } @@ -31,7 +31,7 @@ pub async fn sign_up( session: AFPluginState>, ) -> DataResult { let params: SignUpParams = data.into_inner().try_into()?; - let user_profile = session.sign_up(params).await?; + let user_profile: UserProfilePB = session.sign_up(params).await?.into(); data_result(user_profile) } diff --git a/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs b/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs index c1304e0563..fb26d3015b 100644 --- a/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs +++ b/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs @@ -15,7 +15,7 @@ pub async fn init_user_handler(session: AFPluginState>) -> Resu #[tracing::instrument(level = "debug", skip(session))] pub async fn check_user_handler(session: AFPluginState>) -> DataResult { - let user_profile = session.check_user().await?; + let user_profile: UserProfilePB = session.check_user().await?.into(); data_result(user_profile) } @@ -23,7 +23,7 @@ pub async fn check_user_handler(session: AFPluginState>) -> Dat pub async fn get_user_profile_handler( session: AFPluginState>, ) -> DataResult { - let user_profile = session.get_user_profile().await?; + let user_profile: UserProfilePB = session.get_user_profile().await?.into(); data_result(user_profile) } diff --git a/frontend/rust-lib/flowy-user/src/services/database.rs b/frontend/rust-lib/flowy-user/src/services/database.rs index cc13b61e41..5c621e87b3 100644 --- a/frontend/rust-lib/flowy-user/src/services/database.rs +++ b/frontend/rust-lib/flowy-user/src/services/database.rs @@ -1,4 +1,3 @@ -use crate::entities::UserProfilePB; use flowy_error::{ErrorCode, FlowyError}; use flowy_sqlite::ConnectionPool; use flowy_sqlite::{schema::user_table, DBConnection, Database}; @@ -6,7 +5,7 @@ use lazy_static::lazy_static; use parking_lot::RwLock; use std::path::PathBuf; use std::{collections::HashMap, sync::Arc, time::Duration}; -use user_model::{SignInResponse, SignUpResponse, UpdateUserProfileParams}; +use user_model::{SignInResponse, SignUpResponse, UpdateUserProfileParams, UserProfile}; pub struct UserDB { db_dir: String, @@ -117,9 +116,9 @@ impl std::convert::From for UserTable { } } -impl std::convert::From for UserProfilePB { +impl std::convert::From for UserProfile { fn from(table: UserTable) -> Self { - UserProfilePB { + UserProfile { id: table.id, email: table.email, name: table.name, diff --git a/frontend/rust-lib/flowy-user/src/services/mod.rs b/frontend/rust-lib/flowy-user/src/services/mod.rs index 7ad18349c9..11b6128fd7 100644 --- a/frontend/rust-lib/flowy-user/src/services/mod.rs +++ b/frontend/rust-lib/flowy-user/src/services/mod.rs @@ -1,4 +1,3 @@ pub mod database; -pub mod notifier; mod user_session; pub use user_session::*; diff --git a/frontend/rust-lib/flowy-user/src/services/notifier.rs b/frontend/rust-lib/flowy-user/src/services/notifier.rs deleted file mode 100644 index f52ad56422..0000000000 --- a/frontend/rust-lib/flowy-user/src/services/notifier.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::entities::UserProfilePB; -use tokio::sync::{broadcast, mpsc}; - -#[derive(Clone)] -pub enum UserStatus { - Login { - token: String, - user_id: String, - }, - Logout { - token: String, - user_id: String, - }, - Expired { - token: String, - user_id: String, - }, - SignUp { - profile: UserProfilePB, - ret: mpsc::Sender<()>, - }, -} - -pub struct UserNotifier { - user_status_notifier: broadcast::Sender, -} - -impl std::default::Default for UserNotifier { - fn default() -> Self { - let (user_status_notifier, _) = broadcast::channel(10); - UserNotifier { user_status_notifier } - } -} - -impl UserNotifier { - pub(crate) fn new() -> Self { - UserNotifier::default() - } - - pub(crate) fn notify_login(&self, token: &str, user_id: &str) { - let _ = self.user_status_notifier.send(UserStatus::Login { - token: token.to_owned(), - user_id: user_id.to_owned(), - }); - } - - pub(crate) fn notify_sign_up(&self, ret: mpsc::Sender<()>, user_profile: &UserProfilePB) { - let _ = self.user_status_notifier.send(UserStatus::SignUp { - profile: user_profile.clone(), - ret, - }); - } - - pub(crate) fn notify_logout(&self, token: &str, user_id: &str) { - let _ = self.user_status_notifier.send(UserStatus::Logout { - token: token.to_owned(), - user_id: user_id.to_owned(), - }); - } - - pub fn subscribe_user_status(&self) -> broadcast::Receiver { - self.user_status_notifier.subscribe() - } -} diff --git a/frontend/rust-lib/flowy-user/src/services/user_session.rs b/frontend/rust-lib/flowy-user/src/services/user_session.rs index 567f96006b..9d060df3c9 100644 --- a/frontend/rust-lib/flowy-user/src/services/user_session.rs +++ b/frontend/rust-lib/flowy-user/src/services/user_session.rs @@ -1,12 +1,10 @@ use crate::entities::{UserProfilePB, UserSettingPB}; +use crate::event_map::UserStatusCallback; use crate::{ errors::{ErrorCode, FlowyError}, event_map::UserCloudService, notification::*, - services::{ - database::{UserDB, UserTable, UserTableChangeset}, - notifier::UserNotifier, - }, + services::database::{UserDB, UserTable, UserTableChangeset}, }; use flowy_sqlite::ConnectionPool; use flowy_sqlite::{ @@ -17,8 +15,8 @@ use flowy_sqlite::{ }; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::mpsc; -use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams}; +use tokio::sync::RwLock; +use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfile}; pub struct UserSessionConfig { root_dir: String, @@ -43,25 +41,26 @@ pub struct UserSession { database: UserDB, config: UserSessionConfig, cloud_service: Arc, - pub notifier: UserNotifier, + user_status_callback: RwLock>>, } impl UserSession { pub fn new(config: UserSessionConfig, cloud_service: Arc) -> Self { let db = UserDB::new(&config.root_dir); - let notifier = UserNotifier::new(); + let user_status_callback = RwLock::new(None); Self { database: db, config, cloud_service, - notifier, + user_status_callback, } } - pub fn init(&self) { + pub async fn init(&self, user_status_callback: C) { if let Ok(session) = self.get_session() { - self.notifier.notify_login(&session.token, &session.user_id); + let _ = user_status_callback.did_sign_in(&session.token, &session.user_id).await; } + *self.user_status_callback.write().await = Some(Arc::new(user_status_callback)); } pub fn db_connection(&self) -> Result { @@ -81,11 +80,13 @@ impl UserSession { } #[tracing::instrument(level = "debug", skip(self))] - pub async fn sign_in(&self, params: SignInParams) -> Result { + pub async fn sign_in(&self, params: SignInParams) -> Result { if self.is_user_login(¶ms.email) { match self.get_user_profile().await { Ok(profile) => { - send_sign_in_notification().payload(profile.clone()).send(); + send_sign_in_notification() + .payload::(profile.clone().into()) + .send(); Ok(profile) } Err(err) => Err(err), @@ -94,16 +95,24 @@ impl UserSession { let resp = self.cloud_service.sign_in(params).await?; let session: Session = resp.clone().into(); self.set_session(Some(session))?; - let user_table = self.save_user(resp.into()).await?; - let user_profile: UserProfilePB = user_table.into(); - self.notifier.notify_login(&user_profile.token, &user_profile.id); - send_sign_in_notification().payload(user_profile.clone()).send(); + let user_profile: UserProfile = self.save_user(resp.into()).await?.into(); + let _ = self + .user_status_callback + .read() + .await + .as_ref() + .unwrap() + .did_sign_in(&user_profile.token, &user_profile.id) + .await; + send_sign_in_notification() + .payload::(user_profile.clone().into()) + .send(); Ok(user_profile) } } #[tracing::instrument(level = "debug", skip(self))] - pub async fn sign_up(&self, params: SignUpParams) -> Result { + pub async fn sign_up(&self, params: SignUpParams) -> Result { if self.is_user_login(¶ms.email) { self.get_user_profile().await } else { @@ -111,11 +120,15 @@ impl UserSession { let session: Session = resp.clone().into(); self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; - let user_profile: UserProfilePB = user_table.into(); - let (ret, mut tx) = mpsc::channel(1); - self.notifier.notify_sign_up(ret, &user_profile); - - let _ = tx.recv().await; + let user_profile: UserProfile = user_table.into(); + let _ = self + .user_status_callback + .read() + .await + .as_ref() + .unwrap() + .did_sign_up(&user_profile) + .await; Ok(user_profile) } } @@ -127,7 +140,14 @@ impl UserSession { diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?; self.database.close_user_db(&session.user_id)?; self.set_session(None)?; - self.notifier.notify_logout(&session.token, &session.user_id); + let _ = self + .user_status_callback + .read() + .await + .as_ref() + .unwrap() + .did_expired(&session.token, &session.user_id) + .await; self.sign_out_on_server(&session.token).await?; Ok(()) @@ -140,8 +160,9 @@ impl UserSession { diesel_update_table!(user_table, changeset, &*self.db_connection()?); let user_profile = self.get_user_profile().await?; + let profile_pb: UserProfilePB = user_profile.into(); send_notification(&session.token, UserNotification::UserProfileUpdated) - .payload(user_profile) + .payload(profile_pb) .send(); self.update_user_on_server(&session.token, params).await?; Ok(()) @@ -151,7 +172,7 @@ impl UserSession { Ok(()) } - pub async fn check_user(&self) -> Result { + pub async fn check_user(&self) -> Result { let (user_id, token) = self.get_session()?.into_part(); let user = dsl::user_table @@ -162,7 +183,7 @@ impl UserSession { Ok(user.into()) } - pub async fn get_user_profile(&self) -> Result { + pub async fn get_user_profile(&self) -> Result { let (user_id, token) = self.get_session()?.into_part(); let user = dsl::user_table .filter(user_table::id.eq(&user_id))