From f9b552395b4b209ad971757dafa52c9f9f2a6514 Mon Sep 17 00:00:00 2001 From: appflowy Date: Fri, 14 Jan 2022 09:09:25 +0800 Subject: [PATCH] trait for flowy-core persistence --- .../infrastructure/repos/app_repo.dart | 2 +- .../infrastructure/repos/helper.dart | 2 +- frontend/rust-lib/flowy-core/src/context.rs | 16 +- frontend/rust-lib/flowy-core/src/module.rs | 13 +- .../flowy-core/src/services/app/controller.rs | 194 ++++++-------- .../src/services/app/event_handler.rs | 8 +- .../flowy-core/src/services/app/mod.rs | 1 - .../rust-lib/flowy-core/src/services/mod.rs | 1 + .../src/services/persistence/mod.rs | 76 ++++++ .../version_1/app_sql.rs} | 18 +- .../src/services/persistence/version_1/mod.rs | 5 + .../version_1/trash_sql.rs} | 3 +- .../services/persistence/version_1/v1_impl.rs | 163 ++++++++++++ .../version_1/view_sql.rs} | 7 +- .../version_1/workspace_sql.rs} | 18 +- .../src/services/trash/controller.rs | 129 +++++---- .../src/services/trash/event_handler.rs | 3 +- .../flowy-core/src/services/trash/mod.rs | 1 - .../src/services/view/controller.rs | 250 ++++++++---------- .../src/services/view/event_handler.rs | 4 +- .../flowy-core/src/services/view/mod.rs | 1 - .../src/services/workspace/controller.rs | 118 +++------ .../src/services/workspace/event_handler.rs | 42 ++- .../flowy-core/src/services/workspace/mod.rs | 1 - .../src/entities/trash.rs | 14 +- 25 files changed, 618 insertions(+), 472 deletions(-) create mode 100644 frontend/rust-lib/flowy-core/src/services/persistence/mod.rs rename frontend/rust-lib/flowy-core/src/services/{app/sql.rs => persistence/version_1/app_sql.rs} (95%) create mode 100644 frontend/rust-lib/flowy-core/src/services/persistence/version_1/mod.rs rename frontend/rust-lib/flowy-core/src/services/{trash/sql.rs => persistence/version_1/trash_sql.rs} (99%) create mode 100644 frontend/rust-lib/flowy-core/src/services/persistence/version_1/v1_impl.rs rename frontend/rust-lib/flowy-core/src/services/{view/sql.rs => persistence/version_1/view_sql.rs} (98%) rename frontend/rust-lib/flowy-core/src/services/{workspace/sql.rs => persistence/version_1/workspace_sql.rs} (89%) diff --git a/frontend/app_flowy/lib/workspace/infrastructure/repos/app_repo.dart b/frontend/app_flowy/lib/workspace/infrastructure/repos/app_repo.dart index d7c344674d..8c647baad1 100644 --- a/frontend/app_flowy/lib/workspace/infrastructure/repos/app_repo.dart +++ b/frontend/app_flowy/lib/workspace/infrastructure/repos/app_repo.dart @@ -8,7 +8,7 @@ import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-core-data-model/app.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-core-data-model/view.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-core/observable.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-core/dart_notification.pb.dart'; import 'package:flowy_sdk/rust_stream.dart'; import 'helper.dart'; diff --git a/frontend/app_flowy/lib/workspace/infrastructure/repos/helper.dart b/frontend/app_flowy/lib/workspace/infrastructure/repos/helper.dart index 2894bdd771..30c278e49d 100644 --- a/frontend/app_flowy/lib/workspace/infrastructure/repos/helper.dart +++ b/frontend/app_flowy/lib/workspace/infrastructure/repos/helper.dart @@ -3,7 +3,7 @@ import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart'; import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-core/observable.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-core/dart_notification.pb.dart'; typedef UserNotificationCallback = void Function(UserNotification, Either); diff --git a/frontend/rust-lib/flowy-core/src/context.rs b/frontend/rust-lib/flowy-core/src/context.rs index 4222f60bae..6760d23380 100644 --- a/frontend/rust-lib/flowy-core/src/context.rs +++ b/frontend/rust-lib/flowy-core/src/context.rs @@ -9,8 +9,14 @@ use crate::{ dart_notification::{send_dart_notification, WorkspaceNotification}, entities::workspace::RepeatedWorkspace, errors::{FlowyError, FlowyResult}, - module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, - services::{AppController, TrashController, ViewController, WorkspaceController}, + module::{WorkspaceCloudService, WorkspaceUser}, + services::{ + persistence::FlowyCorePersistence, + AppController, + TrashController, + ViewController, + WorkspaceController, + }, }; lazy_static! { @@ -20,7 +26,7 @@ lazy_static! { pub struct CoreContext { pub user: Arc, pub(crate) cloud_service: Arc, - pub(crate) database: Arc, + pub(crate) persistence: Arc, pub workspace_controller: Arc, pub(crate) app_controller: Arc, pub(crate) view_controller: Arc, @@ -31,7 +37,7 @@ impl CoreContext { pub(crate) fn new( user: Arc, cloud_service: Arc, - database: Arc, + persistence: Arc, workspace_controller: Arc, app_controller: Arc, view_controller: Arc, @@ -44,7 +50,7 @@ impl CoreContext { Self { user, cloud_service, - database, + persistence, workspace_controller, app_controller, view_controller, diff --git a/frontend/rust-lib/flowy-core/src/module.rs b/frontend/rust-lib/flowy-core/src/module.rs index 06f3dc94e4..29943df707 100644 --- a/frontend/rust-lib/flowy-core/src/module.rs +++ b/frontend/rust-lib/flowy-core/src/module.rs @@ -10,6 +10,7 @@ use crate::{ event::WorkspaceEvent, services::{ app::event_handler::*, + persistence::FlowyCorePersistence, trash::event_handler::*, view::event_handler::*, workspace::event_handler::*, @@ -49,15 +50,17 @@ pub fn init_core( flowy_document: Arc, cloud_service: Arc, ) -> Arc { + let persistence = Arc::new(FlowyCorePersistence::new(database.clone())); + let trash_controller = Arc::new(TrashController::new( - database.clone(), + persistence.clone(), cloud_service.clone(), user.clone(), )); let view_controller = Arc::new(ViewController::new( user.clone(), - database.clone(), + persistence.clone(), cloud_service.clone(), trash_controller.clone(), flowy_document, @@ -65,14 +68,14 @@ pub fn init_core( let app_controller = Arc::new(AppController::new( user.clone(), - database.clone(), + persistence.clone(), trash_controller.clone(), cloud_service.clone(), )); let workspace_controller = Arc::new(WorkspaceController::new( user.clone(), - database.clone(), + persistence.clone(), trash_controller.clone(), cloud_service.clone(), )); @@ -80,7 +83,7 @@ pub fn init_core( Arc::new(CoreContext::new( user, cloud_service, - database, + persistence, workspace_controller, app_controller, view_controller, diff --git a/frontend/rust-lib/flowy-core/src/services/app/controller.rs b/frontend/rust-lib/flowy-core/src/services/app/controller.rs index 34846e0cb0..5f0a5cfecf 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/app/controller.rs @@ -5,35 +5,35 @@ use crate::{ trash::TrashType, }, errors::*, - module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, + module::{WorkspaceCloudService, WorkspaceUser}, services::{ - app::sql::{AppChangeset, AppTable, AppTableSql}, + persistence::{AppChangeset, FlowyCorePersistence, FlowyCorePersistenceTransaction}, TrashController, TrashEvent, }, }; -use flowy_database::SqliteConnection; + use futures::{FutureExt, StreamExt}; use std::{collections::HashSet, sync::Arc}; pub(crate) struct AppController { user: Arc, - database: Arc, - trash_can: Arc, + persistence: Arc, + trash_controller: Arc, cloud_service: Arc, } impl AppController { pub(crate) fn new( user: Arc, - database: Arc, + persistence: Arc, trash_can: Arc, cloud_service: Arc, ) -> Self { Self { user, - database, - trash_can, + persistence, + trash_controller: trash_can, cloud_service, } } @@ -50,62 +50,52 @@ impl AppController { } pub(crate) async fn create_app_on_local(&self, app: App) -> Result { - let conn = &*self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = self.save_app(app.clone(), &*conn)?; - let _ = notify_apps_changed(&app.workspace_id, self.trash_can.clone(), conn)?; + let _ = self.persistence.begin_transaction(|transaction| { + let _ = transaction.create_app(app.clone())?; + let _ = notify_apps_changed(&app.workspace_id, self.trash_controller.clone(), &transaction)?; Ok(()) })?; - Ok(app) } - pub(crate) fn save_app(&self, app: App, conn: &SqliteConnection) -> Result<(), FlowyError> { - let _ = AppTableSql::create_app(app, &*conn)?; - Ok(()) - } - pub(crate) async fn read_app(&self, params: AppId) -> Result { - let conn = self.database.db_connection()?; - let app_table = AppTableSql::read_app(¶ms.app_id, &*conn)?; - - let trash_ids = self.trash_can.read_trash_ids(&conn)?; - if trash_ids.contains(&app_table.id) { - return Err(FlowyError::record_not_found()); - } - + let app = self.persistence.begin_transaction(|transaction| { + let app = transaction.read_app(¶ms.app_id)?; + let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; + if trash_ids.contains(&app.id) { + return Err(FlowyError::record_not_found()); + } + Ok(app) + })?; let _ = self.read_app_on_server(params)?; - Ok(app_table.into()) + Ok(app) } pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), FlowyError> { let changeset = AppChangeset::new(params.clone()); let app_id = changeset.id.clone(); - let conn = &*self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = AppTableSql::update_app(changeset, conn)?; - let app: App = AppTableSql::read_app(&app_id, conn)?.into(); - send_dart_notification(&app_id, WorkspaceNotification::AppUpdated) - .payload(app) - .send(); - Ok(()) - })?; + let app = self.persistence.begin_transaction(|transaction| { + let _ = transaction.update_app(changeset)?; + let app = transaction.read_app(&app_id)?; + Ok(app) + })?; + send_dart_notification(&app_id, WorkspaceNotification::AppUpdated) + .payload(app) + .send(); let _ = self.update_app_on_server(params)?; Ok(()) } - pub(crate) fn read_app_tables(&self, ids: Vec) -> Result, FlowyError> { - let conn = &*self.database.db_connection()?; - let mut app_tables = vec![]; - conn.immediate_transaction::<_, FlowyError, _>(|| { - for app_id in ids { - app_tables.push(AppTableSql::read_app(&app_id, conn)?); + pub(crate) fn read_local_apps(&self, ids: Vec) -> Result, FlowyError> { + let apps = self.persistence.begin_transaction(|transaction| { + let mut apps = vec![]; + for id in ids { + apps.push(transaction.read_app(&id)?); } - Ok(()) + Ok(apps) })?; - - Ok(app_tables) + Ok(apps) } } @@ -137,23 +127,18 @@ impl AppController { fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> { let token = self.user.token()?; let server = self.cloud_service.clone(); - let pool = self.database.db_pool()?; + let persistence = self.persistence.clone(); tokio::spawn(async move { - // Opti: retry? match server.read_app(&token, params).await { - Ok(Some(app)) => match pool.get() { - Ok(conn) => { - let result = AppTableSql::create_app(app.clone(), &*conn); - match result { - Ok(_) => { - send_dart_notification(&app.id, WorkspaceNotification::AppUpdated) - .payload(app) - .send(); - }, - Err(e) => log::error!("Save app failed: {:?}", e), - } - }, - Err(e) => log::error!("Require db connection failed: {:?}", e), + Ok(Some(app)) => { + match persistence.begin_transaction(|transaction| transaction.create_app(app.clone())) { + Ok(_) => { + send_dart_notification(&app.id, WorkspaceNotification::AppUpdated) + .payload(app) + .send(); + }, + Err(e) => log::error!("Save app failed: {:?}", e), + } }, Ok(None) => {}, Err(e) => log::error!("Read app failed: {:?}", e), @@ -163,9 +148,9 @@ impl AppController { } fn listen_trash_controller_event(&self) { - let mut rx = self.trash_can.subscribe(); - let database = self.database.clone(); - let trash_can = self.trash_can.clone(); + let mut rx = self.trash_controller.subscribe(); + let persistence = self.persistence.clone(); + let trash_controller = self.trash_controller.clone(); let _ = tokio::spawn(async move { loop { let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move { @@ -175,76 +160,69 @@ impl AppController { } })); if let Some(event) = stream.next().await { - handle_trash_event(database.clone(), trash_can.clone(), event).await + handle_trash_event(persistence.clone(), trash_controller.clone(), event).await } } }); } } -#[tracing::instrument(level = "trace", skip(database, trash_can))] -async fn handle_trash_event(database: Arc, trash_can: Arc, event: TrashEvent) { - let db_result = database.db_connection(); +#[tracing::instrument(level = "trace", skip(persistence, trash_controller))] +async fn handle_trash_event( + persistence: Arc, + trash_controller: Arc, + event: TrashEvent, +) { match event { TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => { - let result = || { - let conn = &*db_result?; - let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { - for identifier in identifiers.items { - let app_table = AppTableSql::read_app(&identifier.id, conn)?; - let _ = notify_apps_changed(&app_table.workspace_id, trash_can.clone(), conn)?; - } - Ok(()) - })?; - Ok::<(), FlowyError>(()) - }; - let _ = ret.send(result()).await; + let result = persistence.begin_transaction(|transaction| { + for identifier in identifiers.items { + let app = transaction.read_app(&identifier.id)?; + let _ = notify_apps_changed(&app.workspace_id, trash_controller.clone(), &transaction)?; + } + Ok(()) + }); + let _ = ret.send(result).await; }, TrashEvent::Delete(identifiers, ret) => { - let result = || { - let conn = &*db_result?; - let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { - let mut notify_ids = HashSet::new(); - for identifier in identifiers.items { - let app_table = AppTableSql::read_app(&identifier.id, conn)?; - let _ = AppTableSql::delete_app(&identifier.id, conn)?; - notify_ids.insert(app_table.workspace_id); - } + let result = persistence.begin_transaction(|transaction| { + let mut notify_ids = HashSet::new(); + for identifier in identifiers.items { + let app = transaction.read_app(&identifier.id)?; + let _ = transaction.delete_app(&identifier.id)?; + notify_ids.insert(app.workspace_id); + } - for notify_id in notify_ids { - let _ = notify_apps_changed(¬ify_id, trash_can.clone(), conn)?; - } - Ok(()) - })?; - Ok::<(), FlowyError>(()) - }; - let _ = ret.send(result()).await; + for notify_id in notify_ids { + let _ = notify_apps_changed(¬ify_id, trash_controller.clone(), &transaction)?; + } + Ok(()) + }); + let _ = ret.send(result).await; }, } } -#[tracing::instrument(skip(workspace_id, trash_can, conn), err)] -fn notify_apps_changed( +#[tracing::instrument(skip(workspace_id, trash_controller, transaction), err)] +fn notify_apps_changed<'a>( workspace_id: &str, - trash_can: Arc, - conn: &SqliteConnection, + trash_controller: Arc, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), ) -> FlowyResult<()> { - let repeated_app = read_local_workspace_apps(workspace_id, trash_can, conn)?; + let repeated_app = read_local_workspace_apps(workspace_id, trash_controller, transaction)?; send_dart_notification(workspace_id, WorkspaceNotification::WorkspaceAppsChanged) .payload(repeated_app) .send(); Ok(()) } -pub fn read_local_workspace_apps( +pub fn read_local_workspace_apps<'a>( workspace_id: &str, trash_controller: Arc, - conn: &SqliteConnection, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), ) -> Result { - let mut app_tables = AppTableSql::read_workspace_apps(workspace_id, false, conn)?; - let trash_ids = trash_controller.read_trash_ids(conn)?; - app_tables.retain(|app_table| !trash_ids.contains(&app_table.id)); - - let apps = app_tables.into_iter().map(|table| table.into()).collect::>(); + let mut apps = transaction.read_workspace_apps(workspace_id)?; + let trash_ids = trash_controller.read_trash_ids(transaction)?; + apps.retain(|app| !trash_ids.contains(&app.id)); Ok(RepeatedApp { items: apps }) } diff --git a/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs index 4cf1a6dd42..94e503a035 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs @@ -21,14 +21,14 @@ pub(crate) async fn create_app_handler( pub(crate) async fn delete_app_handler( data: Data, - view_controller: Unit>, + app_controller: Unit>, trash_controller: Unit>, ) -> Result<(), FlowyError> { let params: AppId = data.into_inner().try_into()?; - let trash = view_controller - .read_app_tables(vec![params.app_id])? + let trash = app_controller + .read_local_apps(vec![params.app_id])? .into_iter() - .map(|view_table| view_table.into()) + .map(|app| app.into()) .collect::>(); let _ = trash_controller.add(trash).await?; diff --git a/frontend/rust-lib/flowy-core/src/services/app/mod.rs b/frontend/rust-lib/flowy-core/src/services/app/mod.rs index 8bf3cc04ff..e854517f17 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/app/mod.rs @@ -1,3 +1,2 @@ pub mod controller; pub mod event_handler; -pub(crate) mod sql; diff --git a/frontend/rust-lib/flowy-core/src/services/mod.rs b/frontend/rust-lib/flowy-core/src/services/mod.rs index 20e280ae63..9481a853ad 100644 --- a/frontend/rust-lib/flowy-core/src/services/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/mod.rs @@ -4,6 +4,7 @@ pub(crate) use view::controller::*; pub(crate) use workspace::controller::*; pub(crate) mod app; +pub(crate) mod persistence; pub(crate) mod trash; pub(crate) mod view; pub(crate) mod workspace; diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs new file mode 100644 index 0000000000..964ea2b1c1 --- /dev/null +++ b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs @@ -0,0 +1,76 @@ +mod version_1; + +use std::sync::Arc; +pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*}; + +use crate::module::WorkspaceDatabase; +use flowy_core_data_model::entities::{ + app::App, + prelude::RepeatedTrash, + trash::Trash, + view::View, + workspace::Workspace, +}; +use flowy_error::{FlowyError, FlowyResult}; + +pub trait FlowyCorePersistenceTransaction { + fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()>; + fn read_workspaces(&self, user_id: &str, workspace_id: Option) -> FlowyResult>; + fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()>; + fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()>; + + fn create_app(&self, app: App) -> FlowyResult<()>; + fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()>; + fn read_app(&self, app_id: &str) -> FlowyResult; + fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult>; + fn delete_app(&self, app_id: &str) -> FlowyResult; + + fn create_view(&self, view: View) -> FlowyResult<()>; + fn read_view(&self, view_id: &str) -> FlowyResult; + fn read_views(&self, belong_to_id: &str) -> FlowyResult>; + fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()>; + fn delete_view(&self, view_id: &str) -> FlowyResult<()>; + + fn create_trash(&self, trashes: Vec) -> FlowyResult<()>; + fn read_all_trash(&self) -> FlowyResult; + fn delete_all_trash(&self) -> FlowyResult<()>; + fn read_trash(&self, trash_id: &str) -> FlowyResult; + fn delete_trash(&self, trash_ids: Vec) -> FlowyResult<()>; +} + +pub struct FlowyCorePersistence { + database: Arc, +} + +impl FlowyCorePersistence { + pub fn new(database: Arc) -> Self { Self { database } } + + pub fn begin_transaction(&self, f: F) -> FlowyResult + where + F: for<'a> FnOnce(Box) -> FlowyResult, + { + //[[immediate_transaction]] + // https://sqlite.org/lang_transaction.html + // IMMEDIATE cause the database connection to start a new write immediately, + // without waiting for a write statement. The BEGIN IMMEDIATE might fail + // with SQLITE_BUSY if another write transaction is already active on another + // database connection. + // + // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started + // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in + // other journaling modes, EXCLUSIVE prevents other database connections from + // reading the database while the transaction is underway. + let conn = self.database.db_connection()?; + conn.immediate_transaction::<_, FlowyError, _>(|| f(Box::new(V1Transaction(&conn)))) + } + + // pub fn scope_transaction(&self, f: F) -> FlowyResult + // where + // F: for<'a> FnOnce(Box) -> + // FlowyResult, { + // match thread::scope(|_s| self.begin_transaction(f)) { + // Ok(result) => result, + // Err(e) => Err(FlowyError::internal().context(e)), + // } + // } +} diff --git a/frontend/rust-lib/flowy-core/src/services/app/sql.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/app_sql.rs similarity index 95% rename from frontend/rust-lib/flowy-core/src/services/app/sql.rs rename to frontend/rust-lib/flowy-core/src/services/persistence/version_1/app_sql.rs index d1083b4fce..ad51dc2109 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/sql.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/app_sql.rs @@ -1,10 +1,7 @@ -use crate::{ - entities::{ - app::{App, ColorStyle, UpdateAppParams}, - trash::{Trash, TrashType}, - view::RepeatedView, - }, - services::workspace::sql::WorkspaceTable, +use crate::entities::{ + app::{App, ColorStyle, UpdateAppParams}, + trash::{Trash, TrashType}, + view::RepeatedView, }; use diesel::sql_types::Binary; use flowy_database::{ @@ -15,10 +12,9 @@ use flowy_database::{ use serde::{Deserialize, Serialize, __private::TryFrom}; use std::convert::TryInto; -use crate::errors::FlowyError; - -pub struct AppTableSql {} +use crate::{errors::FlowyError, services::persistence::version_1::workspace_sql::WorkspaceTable}; +pub struct AppTableSql(); impl AppTableSql { pub(crate) fn create_app(app: App, conn: &SqliteConnection) -> Result<(), FlowyError> { let app_table = AppTable::new(app); @@ -161,7 +157,7 @@ impl_sql_binary_expression!(ColorStyleCol); #[derive(AsChangeset, Identifiable, Default, Debug)] #[table_name = "app_table"] -pub(crate) struct AppChangeset { +pub struct AppChangeset { pub id: String, pub name: Option, pub desc: Option, diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/version_1/mod.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/mod.rs new file mode 100644 index 0000000000..e6992a4a04 --- /dev/null +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/mod.rs @@ -0,0 +1,5 @@ +pub mod app_sql; +pub mod trash_sql; +pub mod v1_impl; +pub mod view_sql; +pub mod workspace_sql; diff --git a/frontend/rust-lib/flowy-core/src/services/trash/sql.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/trash_sql.rs similarity index 99% rename from frontend/rust-lib/flowy-core/src/services/trash/sql.rs rename to frontend/rust-lib/flowy-core/src/services/persistence/version_1/trash_sql.rs index fc90c20305..7511b998f9 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/sql.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/trash_sql.rs @@ -9,8 +9,7 @@ use flowy_database::{ SqliteConnection, }; -pub struct TrashTableSql {} - +pub struct TrashTableSql(); impl TrashTableSql { pub(crate) fn create_trash(trashes: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { for trash in trashes { diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/version_1/v1_impl.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/v1_impl.rs new file mode 100644 index 0000000000..93bad64d2c --- /dev/null +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/v1_impl.rs @@ -0,0 +1,163 @@ +use crate::services::persistence::{ + version_1::{ + app_sql::{AppChangeset, AppTableSql}, + view_sql::{ViewChangeset, ViewTableSql}, + workspace_sql::{WorkspaceChangeset, WorkspaceTableSql}, + }, + FlowyCorePersistenceTransaction, + TrashTableSql, +}; +use flowy_core_data_model::entities::{ + app::App, + prelude::{RepeatedTrash, Trash, View, Workspace}, +}; +use flowy_error::FlowyResult; +use lib_sqlite::DBConnection; + +pub struct V1Transaction<'a>(pub &'a DBConnection); + +impl<'a> FlowyCorePersistenceTransaction for V1Transaction<'a> { + fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()> { + let _ = WorkspaceTableSql::create_workspace(user_id, workspace, &*self.0)?; + Ok(()) + } + + fn read_workspaces(&self, user_id: &str, workspace_id: Option) -> FlowyResult> { + let tables = WorkspaceTableSql::read_workspaces(workspace_id, user_id, &*self.0)?; + let workspaces = tables.into_iter().map(Workspace::from).collect::>(); + Ok(workspaces) + } + + fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()> { + WorkspaceTableSql::update_workspace(changeset, &*self.0) + } + + fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()> { + WorkspaceTableSql::delete_workspace(workspace_id, &*self.0) + } + + fn create_app(&self, app: App) -> FlowyResult<()> { + let _ = AppTableSql::create_app(app, &*self.0)?; + Ok(()) + } + + fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()> { + let _ = AppTableSql::update_app(changeset, &*self.0)?; + Ok(()) + } + + fn read_app(&self, app_id: &str) -> FlowyResult { + let table = AppTableSql::read_app(app_id, &*self.0)?; + Ok(App::from(table)) + } + + fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult> { + let tables = AppTableSql::read_workspace_apps(workspace_id, false, &*self.0)?; + let apps = tables.into_iter().map(App::from).collect::>(); + Ok(apps) + } + + fn delete_app(&self, app_id: &str) -> FlowyResult { + let table = AppTableSql::delete_app(app_id, &*self.0)?; + Ok(App::from(table)) + } + + fn create_view(&self, view: View) -> FlowyResult<()> { + let _ = ViewTableSql::create_view(view, &*self.0)?; + Ok(()) + } + + fn read_view(&self, view_id: &str) -> FlowyResult { + let table = ViewTableSql::read_view(view_id, &*self.0)?; + Ok(View::from(table)) + } + + fn read_views(&self, belong_to_id: &str) -> FlowyResult> { + let tables = ViewTableSql::read_views(belong_to_id, &*self.0)?; + let views = tables.into_iter().map(View::from).collect::>(); + Ok(views) + } + + fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()> { + let _ = ViewTableSql::update_view(changeset, &*self.0)?; + Ok(()) + } + + fn delete_view(&self, view_id: &str) -> FlowyResult<()> { + let _ = ViewTableSql::delete_view(view_id, &*self.0)?; + Ok(()) + } + + fn create_trash(&self, trashes: Vec) -> FlowyResult<()> { + let _ = TrashTableSql::create_trash(trashes, &*self.0)?; + Ok(()) + } + + fn read_all_trash(&self) -> FlowyResult { TrashTableSql::read_all(&*self.0) } + + fn delete_all_trash(&self) -> FlowyResult<()> { TrashTableSql::delete_all(&*self.0) } + + fn read_trash(&self, trash_id: &str) -> FlowyResult { + let table = TrashTableSql::read(trash_id, &*self.0)?; + Ok(Trash::from(table)) + } + + fn delete_trash(&self, trash_ids: Vec) -> FlowyResult<()> { + for trash_id in &trash_ids { + let _ = TrashTableSql::delete_trash(&trash_id, &*self.0)?; + } + Ok(()) + } +} + +// https://www.reddit.com/r/rust/comments/droxdg/why_arent_traits_impld_for_boxdyn_trait/ +impl FlowyCorePersistenceTransaction for Box +where + T: FlowyCorePersistenceTransaction + ?Sized, +{ + fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()> { + (**self).create_workspace(user_id, workspace) + } + + fn read_workspaces(&self, user_id: &str, workspace_id: Option) -> FlowyResult> { + (**self).read_workspaces(user_id, workspace_id) + } + + fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()> { + (**self).update_workspace(changeset) + } + + fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()> { (**self).delete_workspace(workspace_id) } + + fn create_app(&self, app: App) -> FlowyResult<()> { (**self).create_app(app) } + + fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()> { (**self).update_app(changeset) } + + fn read_app(&self, app_id: &str) -> FlowyResult { (**self).read_app(app_id) } + + fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult> { + (**self).read_workspace_apps(workspace_id) + } + + fn delete_app(&self, app_id: &str) -> FlowyResult { (**self).delete_app(app_id) } + + fn create_view(&self, view: View) -> FlowyResult<()> { (**self).create_view(view) } + + fn read_view(&self, view_id: &str) -> FlowyResult { (**self).read_view(view_id) } + + fn read_views(&self, belong_to_id: &str) -> FlowyResult> { (**self).read_views(belong_to_id) } + + fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()> { (**self).update_view(changeset) } + + fn delete_view(&self, view_id: &str) -> FlowyResult<()> { (**self).delete_view(view_id) } + + fn create_trash(&self, trashes: Vec) -> FlowyResult<()> { (**self).create_trash(trashes) } + + fn read_all_trash(&self) -> FlowyResult { (**self).read_all_trash() } + + fn delete_all_trash(&self) -> FlowyResult<()> { (**self).delete_all_trash() } + + fn read_trash(&self, trash_id: &str) -> FlowyResult { (**self).read_trash(trash_id) } + + fn delete_trash(&self, trash_ids: Vec) -> FlowyResult<()> { (**self).delete_trash(trash_ids) } +} diff --git a/frontend/rust-lib/flowy-core/src/services/view/sql.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/view_sql.rs similarity index 98% rename from frontend/rust-lib/flowy-core/src/services/view/sql.rs rename to frontend/rust-lib/flowy-core/src/services/persistence/version_1/view_sql.rs index 231a756717..badddf724b 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/sql.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/view_sql.rs @@ -4,7 +4,7 @@ use crate::{ view::{RepeatedView, UpdateViewParams, View, ViewType}, }, errors::FlowyError, - services::app::sql::AppTable, + services::persistence::version_1::app_sql::AppTable, }; use diesel::sql_types::Integer; use flowy_database::{ @@ -14,8 +14,7 @@ use flowy_database::{ }; use lib_infra::timestamp; -pub struct ViewTableSql {} - +pub struct ViewTableSql(); impl ViewTableSql { pub(crate) fn create_view(view: View, conn: &SqliteConnection) -> Result<(), FlowyError> { let view_table = ViewTable::new(view); @@ -182,7 +181,7 @@ impl std::convert::From for Trash { #[derive(AsChangeset, Identifiable, Clone, Default, Debug)] #[table_name = "view_table"] -pub(crate) struct ViewChangeset { +pub struct ViewChangeset { pub id: String, pub name: Option, pub desc: Option, diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/sql.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/workspace_sql.rs similarity index 89% rename from frontend/rust-lib/flowy-core/src/services/workspace/sql.rs rename to frontend/rust-lib/flowy-core/src/services/persistence/version_1/workspace_sql.rs index 19649123ec..dab057526a 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/sql.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_1/workspace_sql.rs @@ -10,8 +10,7 @@ use flowy_database::{ prelude::*, schema::{workspace_table, workspace_table::dsl}, }; -pub(crate) struct WorkspaceTableSql {} - +pub(crate) struct WorkspaceTableSql(); impl WorkspaceTableSql { pub(crate) fn create_workspace( user_id: &str, @@ -22,7 +21,7 @@ impl WorkspaceTableSql { match diesel_record_count!(workspace_table, &table.id, conn) { 0 => diesel_insert_table!(workspace_table, &table, conn), _ => { - let changeset = WorkspaceTableChangeset::from_table(table); + let changeset = WorkspaceChangeset::from_table(table); diesel_update_table!(workspace_table, changeset, conn); }, } @@ -49,10 +48,7 @@ impl WorkspaceTableSql { } #[allow(dead_code)] - pub(crate) fn update_workspace( - changeset: WorkspaceTableChangeset, - conn: &SqliteConnection, - ) -> Result<(), FlowyError> { + pub(crate) fn update_workspace(changeset: WorkspaceChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { diesel_update_table!(workspace_table, changeset, conn); Ok(()) } @@ -106,15 +102,15 @@ impl std::convert::From for Workspace { #[derive(AsChangeset, Identifiable, Clone, Default, Debug)] #[table_name = "workspace_table"] -pub struct WorkspaceTableChangeset { +pub struct WorkspaceChangeset { pub id: String, pub name: Option, pub desc: Option, } -impl WorkspaceTableChangeset { +impl WorkspaceChangeset { pub fn new(params: UpdateWorkspaceParams) -> Self { - WorkspaceTableChangeset { + WorkspaceChangeset { id: params.id, name: params.name, desc: params.desc, @@ -122,7 +118,7 @@ impl WorkspaceTableChangeset { } pub(crate) fn from_table(table: WorkspaceTable) -> Self { - WorkspaceTableChangeset { + WorkspaceChangeset { id: table.id, name: Some(table.name), desc: Some(table.desc), diff --git a/frontend/rust-lib/flowy-core/src/services/trash/controller.rs b/frontend/rust-lib/flowy-core/src/services/trash/controller.rs index e2efab3040..d0fe088b95 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/trash/controller.rs @@ -2,16 +2,15 @@ use crate::{ dart_notification::{send_anonymous_dart_notification, WorkspaceNotification}, entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType}, errors::{FlowyError, FlowyResult}, - module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, - services::trash::sql::TrashTableSql, + module::{WorkspaceCloudService, WorkspaceUser}, + services::persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction}, }; -use crossbeam_utils::thread; -use flowy_database::SqliteConnection; + use std::{fmt::Formatter, sync::Arc}; use tokio::sync::{broadcast, mpsc}; pub struct TrashController { - pub database: Arc, + persistence: Arc, notify: broadcast::Sender, cloud_service: Arc, user: Arc, @@ -19,14 +18,13 @@ pub struct TrashController { impl TrashController { pub fn new( - database: Arc, + persistence: Arc, cloud_service: Arc, user: Arc, ) -> Self { let (tx, _) = broadcast::channel(10); - Self { - database, + persistence, notify: tx, cloud_service, user, @@ -38,22 +36,16 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self), fields(putback) err)] pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> { let (tx, mut rx) = mpsc::channel::>(1); - let trash_table = TrashTableSql::read(trash_id, &*self.database.db_connection()?)?; - let _ = thread::scope(|_s| { - let conn = self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = TrashTableSql::delete_trash(trash_id, &*conn)?; - notify_trash_changed(TrashTableSql::read_all(&conn)?); - Ok(()) - })?; - - Ok::<(), FlowyError>(()) - }) - .unwrap()?; + let trash = self.persistence.begin_transaction(|transaction| { + let trash = transaction.read_trash(trash_id); + let _ = transaction.delete_trash(vec![trash_id.to_owned()])?; + notify_trash_changed(transaction.read_all_trash()?); + trash + })?; let identifier = TrashId { - id: trash_table.id, - ty: trash_table.ty.into(), + id: trash.id, + ty: trash.ty, }; let _ = self.delete_trash_on_server(RepeatedTrashId { @@ -69,15 +61,11 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self) err)] pub async fn restore_all(&self) -> FlowyResult<()> { - let repeated_trash = thread::scope(|_s| { - let conn = self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let repeated_trash = TrashTableSql::read_all(&*conn)?; - let _ = TrashTableSql::delete_all(&*conn)?; - Ok(repeated_trash) - }) - }) - .unwrap()?; + let repeated_trash = self.persistence.begin_transaction(|transaction| { + let trash = transaction.read_all_trash(); + let _ = transaction.delete_all_trash(); + trash + })?; let identifiers: RepeatedTrashId = repeated_trash.items.clone().into(); let (tx, mut rx) = mpsc::channel::>(1); @@ -91,7 +79,9 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self), err)] pub async fn delete_all(&self) -> FlowyResult<()> { - let repeated_trash = TrashTableSql::read_all(&*(self.database.db_connection()?))?; + let repeated_trash = self + .persistence + .begin_transaction(|transaction| transaction.read_all_trash())?; let trash_identifiers: RepeatedTrashId = repeated_trash.items.clone().into(); let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?; @@ -103,7 +93,10 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self), err)] pub async fn delete(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> { let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?; - notify_trash_changed(TrashTableSql::read_all(&*(self.database.db_connection()?))?); + let repeated_trash = self + .persistence + .begin_transaction(|transaction| transaction.read_all_trash())?; + notify_trash_changed(repeated_trash); let _ = self.delete_trash_on_server(trash_identifiers)?; Ok(()) @@ -122,14 +115,15 @@ impl TrashController { Err(e) => log::error!("{}", e), }, } - - let conn = self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - for trash_identifier in &trash_identifiers.items { - let _ = TrashTableSql::delete_trash(&trash_identifier.id, &conn)?; - } - Ok(()) + let _ = self.persistence.begin_transaction(|transaction| { + let ids = trash_identifiers + .items + .into_iter() + .map(|item| item.id) + .collect::>(); + transaction.delete_trash(ids) })?; + Ok(()) } @@ -156,19 +150,13 @@ impl TrashController { ) .as_str(), ); - let _ = thread::scope(|_s| { - let conn = self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = TrashTableSql::create_trash(repeated_trash.clone(), &*conn)?; - let _ = self.create_trash_on_server(repeated_trash); - - notify_trash_changed(TrashTableSql::read_all(&conn)?); - Ok(()) - })?; - Ok::<(), FlowyError>(()) - }) - .unwrap()?; + let _ = self.persistence.begin_transaction(|transaction| { + let _ = transaction.create_trash(repeated_trash.clone())?; + let _ = self.create_trash_on_server(repeated_trash); + notify_trash_changed(transaction.read_all_trash()?); + Ok(()) + })?; let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx)); let _ = rx.recv().await.unwrap()?; @@ -177,14 +165,20 @@ impl TrashController { pub fn subscribe(&self) -> broadcast::Receiver { self.notify.subscribe() } - pub fn read_trash(&self, conn: &SqliteConnection) -> Result { - let repeated_trash = TrashTableSql::read_all(&*conn)?; + pub fn read_trash(&self) -> Result { + let repeated_trash = self + .persistence + .begin_transaction(|transaction| transaction.read_all_trash())?; let _ = self.read_trash_on_server()?; Ok(repeated_trash) } - pub fn read_trash_ids(&self, conn: &SqliteConnection) -> Result, FlowyError> { - let ids = TrashTableSql::read_all(&*conn)? + pub fn read_trash_ids<'a>( + &self, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), + ) -> Result, FlowyError> { + let ids = transaction + .read_all_trash()? .into_inner() .into_iter() .map(|item| item.id) @@ -227,27 +221,22 @@ impl TrashController { fn read_trash_on_server(&self) -> FlowyResult<()> { let token = self.user.token()?; let server = self.cloud_service.clone(); - let pool = self.database.db_pool()?; + let persistence = self.persistence.clone(); tokio::spawn(async move { match server.read_trash(&token).await { Ok(repeated_trash) => { tracing::debug!("Remote trash count: {}", repeated_trash.items.len()); - match pool.get() { - Ok(conn) => { - let result = conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = TrashTableSql::create_trash(repeated_trash.items.clone(), &*conn)?; - TrashTableSql::read_all(&conn) - }); + let result = persistence.begin_transaction(|transaction| { + let _ = transaction.create_trash(repeated_trash.items.clone())?; + transaction.read_all_trash() + }); - match result { - Ok(repeated_trash) => { - notify_trash_changed(repeated_trash); - }, - Err(e) => log::error!("Save trash failed: {:?}", e), - } + match result { + Ok(repeated_trash) => { + notify_trash_changed(repeated_trash); }, - Err(e) => log::error!("Require db connection failed: {:?}", e), + Err(e) => log::error!("Save trash failed: {:?}", e), } }, Err(e) => log::error!("Read trash failed: {:?}", e), diff --git a/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs index 1aaa3ceeda..4794d521e8 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs @@ -10,8 +10,7 @@ use std::sync::Arc; pub(crate) async fn read_trash_handler( controller: Unit>, ) -> DataResult { - let conn = controller.database.db_connection()?; - let repeated_trash = controller.read_trash(&conn)?; + let repeated_trash = controller.read_trash()?; data_result(repeated_trash) } diff --git a/frontend/rust-lib/flowy-core/src/services/trash/mod.rs b/frontend/rust-lib/flowy-core/src/services/trash/mod.rs index c08fd82307..e854517f17 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/trash/mod.rs @@ -1,3 +1,2 @@ pub mod controller; pub mod event_handler; -mod sql; diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index c161968875..c58e0818b6 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -3,7 +3,7 @@ use flowy_collaboration::entities::{ doc::{DocumentDelta, DocumentId}, revision::{RepeatedRevision, Revision}, }; -use flowy_database::SqliteConnection; + use futures::{FutureExt, StreamExt}; use std::{collections::HashSet, sync::Arc}; @@ -14,9 +14,9 @@ use crate::{ view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId}, }, errors::{FlowyError, FlowyResult}, - module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, + module::{WorkspaceCloudService, WorkspaceUser}, services::{ - view::sql::{ViewChangeset, ViewTable, ViewTableSql}, + persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction, ViewChangeset}, TrashController, TrashEvent, }, @@ -31,7 +31,7 @@ const LATEST_VIEW_ID: &str = "latest_view_id"; pub(crate) struct ViewController { user: Arc, cloud_service: Arc, - database: Arc, + persistence: Arc, trash_controller: Arc, document_ctx: Arc, } @@ -39,7 +39,7 @@ pub(crate) struct ViewController { impl ViewController { pub(crate) fn new( user: Arc, - database: Arc, + persistence: Arc, cloud_service: Arc, trash_can: Arc, document_ctx: Arc, @@ -47,7 +47,7 @@ impl ViewController { Self { user, cloud_service, - database, + persistence, trash_controller: trash_can, document_ctx, } @@ -77,51 +77,37 @@ impl ViewController { } pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> { - let conn = &*self.database.db_connection()?; - let trash_can = self.trash_controller.clone(); - - conn.immediate_transaction::<_, FlowyError, _>(|| { + let trash_controller = self.trash_controller.clone(); + self.persistence.begin_transaction(|transaction| { let belong_to_id = view.belong_to_id.clone(); - let _ = self.save_view(view, conn)?; - let _ = notify_views_changed(&belong_to_id, trash_can, &conn)?; - + let _ = transaction.create_view(view)?; + let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?; Ok(()) - })?; - - Ok(()) - } - - pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), FlowyError> { - let _ = ViewTableSql::create_view(view, conn)?; - Ok(()) + }) } #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)] pub(crate) async fn read_view(&self, params: ViewId) -> Result { - let conn = self.database.db_connection()?; - let view_table = ViewTableSql::read_view(¶ms.view_id, &*conn)?; - - let trash_ids = self.trash_controller.read_trash_ids(&conn)?; - if trash_ids.contains(&view_table.id) { - return Err(FlowyError::record_not_found()); - } - - let view: View = view_table.into(); + let view = self.persistence.begin_transaction(|transaction| { + let view = transaction.read_view(¶ms.view_id)?; + let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; + if trash_ids.contains(&view.id) { + return Err(FlowyError::record_not_found()); + } + Ok(view) + })?; let _ = self.read_view_on_server(params); Ok(view) } - pub(crate) fn read_view_tables(&self, ids: Vec) -> Result, FlowyError> { - let conn = &*self.database.db_connection()?; - let mut view_tables = vec![]; - conn.immediate_transaction::<_, FlowyError, _>(|| { + pub(crate) fn read_local_views(&self, ids: Vec) -> Result, FlowyError> { + self.persistence.begin_transaction(|transaction| { + let mut views = vec![]; for view_id in ids { - view_tables.push(ViewTableSql::read_view(&view_id, conn)?); + views.push(transaction.read_view(&view_id)?); } - Ok(()) - })?; - - Ok(view_tables) + Ok(views) + }) } #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] @@ -156,7 +142,10 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> { - let view: View = ViewTableSql::read_view(¶ms.doc_id, &*self.database.db_connection()?)?.into(); + let view = self + .persistence + .begin_transaction(|transaction| transaction.read_view(¶ms.doc_id))?; + let editor = self.document_ctx.controller.open_document(¶ms.doc_id).await?; let document_json = editor.document_json().await?; let duplicate_params = CreateViewParams { @@ -186,31 +175,27 @@ impl ViewController { // belong_to_id will be the app_id or view_id. #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result { - // TODO: read from server - let conn = self.database.db_connection()?; - let repeated_view = read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &conn)?; - Ok(repeated_view) + self.persistence.begin_transaction(|transaction| { + read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction) + }) } #[tracing::instrument(level = "debug", skip(self, params), err)] pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result { - let conn = &*self.database.db_connection()?; let changeset = ViewChangeset::new(params.clone()); let view_id = changeset.id.clone(); - - let updated_view = conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = ViewTableSql::update_view(changeset, conn)?; - let view: View = ViewTableSql::read_view(&view_id, conn)?.into(); + let view = self.persistence.begin_transaction(|transaction| { + let _ = transaction.update_view(changeset)?; + let view = transaction.read_view(&view_id)?; + send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated) + .payload(view.clone()) + .send(); + let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?; Ok(view) })?; - send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated) - .payload(updated_view.clone()) - .send(); - // - let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_controller.clone(), conn)?; let _ = self.update_view_on_server(params); - Ok(updated_view) + Ok(view) } pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result { @@ -222,9 +207,10 @@ impl ViewController { match KV::get_str(LATEST_VIEW_ID) { None => Ok(None), Some(view_id) => { - let conn = self.database.db_connection()?; - let view_table = ViewTableSql::read_view(&view_id, &*conn)?; - Ok(Some(view_table.into())) + let view = self + .persistence + .begin_transaction(|transaction| transaction.read_view(&view_id))?; + Ok(Some(view)) }, } } @@ -260,23 +246,19 @@ impl ViewController { fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> { let token = self.user.token()?; let server = self.cloud_service.clone(); - let pool = self.database.db_pool()?; + let persistence = self.persistence.clone(); // TODO: Retry with RetryAction? tokio::spawn(async move { match server.read_view(&token, params).await { - Ok(Some(view)) => match pool.get() { - Ok(conn) => { - let result = ViewTableSql::create_view(view.clone(), &conn); - match result { - Ok(_) => { - send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated) - .payload(view.clone()) - .send(); - }, - Err(e) => log::error!("Save view failed: {:?}", e), - } - }, - Err(e) => log::error!("Require db connection failed: {:?}", e), + Ok(Some(view)) => { + match persistence.begin_transaction(|transaction| transaction.create_view(view.clone())) { + Ok(_) => { + send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated) + .payload(view.clone()) + .send(); + }, + Err(e) => log::error!("Save view failed: {:?}", e), + } }, Ok(None) => {}, Err(e) => log::error!("Read view failed: {:?}", e), @@ -287,9 +269,9 @@ impl ViewController { fn listen_trash_can_event(&self) { let mut rx = self.trash_controller.subscribe(); - let database = self.database.clone(); + let persistence = self.persistence.clone(); let document = self.document_ctx.clone(); - let trash_can = self.trash_controller.clone(); + let trash_controller = self.trash_controller.clone(); let _ = tokio::spawn(async move { loop { let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move { @@ -300,96 +282,87 @@ impl ViewController { })); if let Some(event) = stream.next().await { - handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await + handle_trash_event(persistence.clone(), document.clone(), trash_controller.clone(), event).await } } }); } } -#[tracing::instrument(level = "trace", skip(database, context, trash_can))] +#[tracing::instrument(level = "trace", skip(persistence, context, trash_can))] async fn handle_trash_event( - database: Arc, + persistence: Arc, context: Arc, trash_can: Arc, event: TrashEvent, ) { - let db_result = database.db_connection(); - match event { TrashEvent::NewTrash(identifiers, ret) => { - let result = || { - let conn = &*db_result?; - let view_tables = read_view_tables(identifiers, conn)?; - for view_table in view_tables { - let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?; - notify_dart(view_table, WorkspaceNotification::ViewDeleted); + let result = persistence.begin_transaction(|transaction| { + let views = read_local_views_with_transaction(identifiers, &transaction)?; + for view in views { + let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; + notify_dart(view, WorkspaceNotification::ViewDeleted); } - Ok::<(), FlowyError>(()) - }; - let _ = ret.send(result()).await; + Ok(()) + }); + let _ = ret.send(result).await; }, TrashEvent::Putback(identifiers, ret) => { - let result = || { - let conn = &*db_result?; - let view_tables = read_view_tables(identifiers, conn)?; - for view_table in view_tables { - let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?; - notify_dart(view_table, WorkspaceNotification::ViewRestored); + let result = persistence.begin_transaction(|transaction| { + let views = read_local_views_with_transaction(identifiers, &transaction)?; + for view in views { + let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; + notify_dart(view, WorkspaceNotification::ViewRestored); } - Ok::<(), FlowyError>(()) - }; - let _ = ret.send(result()).await; + Ok(()) + }); + let _ = ret.send(result).await; }, TrashEvent::Delete(identifiers, ret) => { - let result = || { - let conn = &*db_result?; - let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { - let mut notify_ids = HashSet::new(); - for identifier in identifiers.items { - let view_table = ViewTableSql::read_view(&identifier.id, conn)?; - let _ = ViewTableSql::delete_view(&identifier.id, conn)?; - let _ = context.controller.delete(&identifier.id)?; - notify_ids.insert(view_table.belong_to_id); - } + let result = persistence.begin_transaction(|transaction| { + let mut notify_ids = HashSet::new(); + for identifier in identifiers.items { + let view = transaction.read_view(&identifier.id)?; + let _ = transaction.delete_view(&identifier.id)?; + let _ = context.controller.delete(&identifier.id)?; + notify_ids.insert(view.belong_to_id); + } - for notify_id in notify_ids { - let _ = notify_views_changed(¬ify_id, trash_can.clone(), conn)?; - } + for notify_id in notify_ids { + let _ = notify_views_changed(¬ify_id, trash_can.clone(), &transaction)?; + } - Ok(()) - })?; - Ok::<(), FlowyError>(()) - }; - let _ = ret.send(result()).await; + Ok(()) + }); + let _ = ret.send(result).await; }, } } -fn read_view_tables(identifiers: RepeatedTrashId, conn: &SqliteConnection) -> Result, FlowyError> { - let mut view_tables = vec![]; - let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { - for identifier in identifiers.items { - let view_table = ViewTableSql::read_view(&identifier.id, conn)?; - view_tables.push(view_table); - } - Ok(()) - })?; - Ok(view_tables) +fn read_local_views_with_transaction<'a>( + identifiers: RepeatedTrashId, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), +) -> Result, FlowyError> { + let mut views = vec![]; + for identifier in identifiers.items { + let view = transaction.read_view(&identifier.id)?; + views.push(view); + } + Ok(views) } -fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) { - let view: View = view_table.into(); +fn notify_dart(view: View, notification: WorkspaceNotification) { send_dart_notification(&view.id, notification).payload(view).send(); } -#[tracing::instrument(skip(belong_to_id, trash_controller, conn), fields(view_count), err)] -fn notify_views_changed( +#[tracing::instrument(skip(belong_to_id, trash_controller, transaction), fields(view_count), err)] +fn notify_views_changed<'a>( belong_to_id: &str, trash_controller: Arc, - conn: &SqliteConnection, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), ) -> FlowyResult<()> { - let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), conn)?; + let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), transaction)?; tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str()); send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged) .payload(repeated_view) @@ -397,19 +370,14 @@ fn notify_views_changed( Ok(()) } -fn read_belonging_views_on_local( +fn read_belonging_views_on_local<'a>( belong_to_id: &str, trash_controller: Arc, - conn: &SqliteConnection, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), ) -> FlowyResult { - let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?; - let trash_ids = trash_controller.read_trash_ids(conn)?; - view_tables.retain(|view_table| !trash_ids.contains(&view_table.id)); - - let views = view_tables - .into_iter() - .map(|view_table| view_table.into()) - .collect::>(); + let mut views = transaction.read_views(belong_to_id)?; + let trash_ids = trash_controller.read_trash_ids(transaction)?; + views.retain(|view_table| !trash_ids.contains(&view_table.id)); Ok(RepeatedView { items: views }) } diff --git a/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs index 1357754941..fe58e14548 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs @@ -70,9 +70,9 @@ pub(crate) async fn delete_view_handler( } let trash = view_controller - .read_view_tables(params.items)? + .read_local_views(params.items)? .into_iter() - .map(|view_table| view_table.into()) + .map(|view| view.into()) .collect::>(); let _ = trash_controller.add(trash).await?; diff --git a/frontend/rust-lib/flowy-core/src/services/view/mod.rs b/frontend/rust-lib/flowy-core/src/services/view/mod.rs index c08fd82307..e854517f17 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/mod.rs @@ -1,3 +1,2 @@ pub mod controller; pub mod event_handler; -mod sql; diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs b/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs index 59fd24643e..bfaf723a88 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs @@ -1,20 +1,20 @@ use crate::{ dart_notification::*, errors::*, - module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, + module::{WorkspaceCloudService, WorkspaceUser}, services::{ + persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction, WorkspaceChangeset}, read_local_workspace_apps, - workspace::sql::{WorkspaceTableChangeset, WorkspaceTableSql}, TrashController, }, }; use flowy_core_data_model::entities::{app::RepeatedApp, workspace::*}; -use flowy_database::{kv::KV, SqliteConnection}; +use flowy_database::kv::KV; use std::sync::Arc; pub struct WorkspaceController { pub user: Arc, - pub(crate) database: Arc, + persistence: Arc, pub(crate) trash_controller: Arc, cloud_service: Arc, } @@ -22,13 +22,13 @@ pub struct WorkspaceController { impl WorkspaceController { pub(crate) fn new( user: Arc, - database: Arc, + persistence: Arc, trash_can: Arc, cloud_service: Arc, ) -> Self { Self { user, - database, + persistence, trash_controller: trash_can, cloud_service, } @@ -47,49 +47,31 @@ impl WorkspaceController { pub(crate) async fn create_workspace_on_local(&self, workspace: Workspace) -> Result { let user_id = self.user.user_id()?; let token = self.user.token()?; - let conn = &*self.database.db_connection()?; - //[[immediate_transaction]] - // https://sqlite.org/lang_transaction.html - // IMMEDIATE cause the database connection to start a new write immediately, - // without waiting for a write statement. The BEGIN IMMEDIATE might fail - // with SQLITE_BUSY if another write transaction is already active on another - // database connection. - // - // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started - // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in - // other journaling modes, EXCLUSIVE prevents other database connections from - // reading the database while the transaction is underway. - conn.immediate_transaction::<_, FlowyError, _>(|| { - WorkspaceTableSql::create_workspace(&user_id, workspace.clone(), conn)?; - let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?; - send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace) - .payload(repeated_workspace) - .send(); - - Ok(()) + let workspaces = self.persistence.begin_transaction(|transaction| { + let _ = transaction.create_workspace(&user_id, workspace.clone())?; + transaction.read_workspaces(&user_id, None) })?; - + let repeated_workspace = RepeatedWorkspace { items: workspaces }; + send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace) + .payload(repeated_workspace) + .send(); set_current_workspace(&workspace.id); - Ok(workspace) } #[allow(dead_code)] pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> { - let changeset = WorkspaceTableChangeset::new(params.clone()); + let changeset = WorkspaceChangeset::new(params.clone()); let workspace_id = changeset.id.clone(); - let conn = &*self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = WorkspaceTableSql::update_workspace(changeset, conn)?; + let workspace = self.persistence.begin_transaction(|transaction| { + let _ = transaction.update_workspace(changeset)?; let user_id = self.user.user_id()?; - let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?; - send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated) - .payload(workspace) - .send(); - - Ok(()) + self.read_local_workspace(workspace_id.clone(), &user_id, &transaction) })?; + send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated) + .payload(workspace) + .send(); let _ = self.update_workspace_on_server(params)?; Ok(()) @@ -99,26 +81,23 @@ impl WorkspaceController { pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), FlowyError> { let user_id = self.user.user_id()?; let token = self.user.token()?; - let conn = &*self.database.db_connection()?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = WorkspaceTableSql::delete_workspace(workspace_id, conn)?; - let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?; - send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace) - .payload(repeated_workspace) - .send(); - - Ok(()) + let repeated_workspace = self.persistence.begin_transaction(|transaction| { + let _ = transaction.delete_workspace(workspace_id)?; + self.read_local_workspaces(None, &user_id, &transaction) })?; - + send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace) + .payload(repeated_workspace) + .send(); let _ = self.delete_workspace_on_server(workspace_id)?; Ok(()) } pub(crate) async fn open_workspace(&self, params: WorkspaceId) -> Result { let user_id = self.user.user_id()?; - let conn = self.database.db_connection()?; if let Some(workspace_id) = params.workspace_id { - let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?; + let workspace = self + .persistence + .begin_transaction(|transaction| self.read_local_workspace(workspace_id, &user_id, &transaction))?; set_current_workspace(&workspace.id); Ok(workspace) } else { @@ -128,52 +107,39 @@ impl WorkspaceController { pub(crate) async fn read_current_workspace_apps(&self) -> Result { let workspace_id = get_current_workspace()?; - let conn = self.database.db_connection()?; - let repeated_app = self.read_local_apps(&workspace_id, &*conn)?; + let repeated_app = self.persistence.begin_transaction(|transaction| { + read_local_workspace_apps(&workspace_id, self.trash_controller.clone(), &transaction) + })?; // TODO: read from server Ok(repeated_app) } - #[tracing::instrument(level = "debug", skip(self, conn), err)] - pub(crate) fn read_local_workspaces( + #[tracing::instrument(level = "debug", skip(self, transaction), err)] + pub(crate) fn read_local_workspaces<'a>( &self, workspace_id: Option, user_id: &str, - conn: &SqliteConnection, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), ) -> Result { let workspace_id = workspace_id.to_owned(); - let workspace_tables = WorkspaceTableSql::read_workspaces(workspace_id, user_id, conn)?; - - let mut workspaces = vec![]; - for table in workspace_tables { - let workspace: Workspace = table.into(); - workspaces.push(workspace); - } + let workspaces = transaction.read_workspaces(user_id, workspace_id)?; Ok(RepeatedWorkspace { items: workspaces }) } - pub(crate) fn read_local_workspace( + pub(crate) fn read_local_workspace<'a>( &self, workspace_id: String, user_id: &str, - conn: &SqliteConnection, + transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a), ) -> Result { - // Opti: fetch single workspace from local db - let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id.clone()), user_id, conn)?; - if repeated_workspace.is_empty() { + let mut workspaces = transaction.read_workspaces(user_id, Some(workspace_id.clone()))?; + if workspaces.is_empty() { return Err(FlowyError::record_not_found().context(format!("{} workspace not found", workspace_id))); } - - debug_assert_eq!(repeated_workspace.len(), 1); - let workspace = repeated_workspace.drain(..1).collect::>().pop().unwrap(); + debug_assert_eq!(workspaces.len(), 1); + let workspace = workspaces.drain(..1).collect::>().pop().unwrap(); Ok(workspace) } - - #[tracing::instrument(level = "debug", skip(self, conn), err)] - fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result { - let repeated_app = read_local_workspace_apps(workspace_id, self.trash_controller.clone(), conn)?; - Ok(repeated_app) - } } impl WorkspaceController { diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs index 2086c64ea9..2d75049f21 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs @@ -2,18 +2,14 @@ use crate::{ context::CoreContext, dart_notification::{send_dart_notification, WorkspaceNotification}, errors::FlowyError, - services::{ - get_current_workspace, - read_local_workspace_apps, - workspace::sql::WorkspaceTableSql, - WorkspaceController, - }, + services::{get_current_workspace, read_local_workspace_apps, WorkspaceController}, }; use flowy_core_data_model::entities::{ app::RepeatedApp, view::View, workspace::{CurrentWorkspaceSetting, QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceId, *}, }; +use flowy_error::FlowyResult; use lib_dispatch::prelude::{data_result, Data, DataResult, Unit}; use std::{convert::TryInto, sync::Arc}; @@ -53,21 +49,19 @@ pub(crate) async fn read_workspaces_handler( ) -> DataResult { let params: WorkspaceId = data.into_inner().try_into()?; let user_id = core.user.user_id()?; - let conn = &*core.database.db_connection()?; let workspace_controller = core.workspace_controller.clone(); let trash_controller = core.trash_controller.clone(); - let workspaces = conn.immediate_transaction::<_, FlowyError, _>(|| { - let mut workspaces = workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, conn)?; + let workspaces = core.persistence.begin_transaction(|transaction| { + let mut workspaces = + workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, &transaction)?; for workspace in workspaces.iter_mut() { - let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), conn)?.into_inner(); + let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), &transaction)?.into_inner(); workspace.apps.items = apps; } Ok(workspaces) })?; - let _ = read_workspaces_on_server(core, user_id, params); - data_result(workspaces) } @@ -80,10 +74,11 @@ pub async fn read_cur_workspace_handler( let params = WorkspaceId { workspace_id: Some(workspace_id.clone()), }; - let conn = &*core.database.db_connection()?; - let workspace = core - .workspace_controller - .read_local_workspace(workspace_id, &user_id, conn)?; + + let workspace = core.persistence.begin_transaction(|transaction| { + core.workspace_controller + .read_local_workspace(workspace_id, &user_id, &transaction) + })?; let latest_view: Option = core.view_controller.latest_visit_view().unwrap_or(None); let setting = CurrentWorkspaceSetting { workspace, latest_view }; @@ -98,30 +93,29 @@ fn read_workspaces_on_server( params: WorkspaceId, ) -> Result<(), FlowyError> { let (token, server) = (core.user.token()?, core.cloud_service.clone()); - let app_ctrl = core.app_controller.clone(); - let view_ctrl = core.view_controller.clone(); - let conn = core.database.db_connection()?; + let _app_ctrl = core.app_controller.clone(); + let _view_ctrl = core.view_controller.clone(); + let persistence = core.persistence.clone(); tokio::spawn(async move { - // Opti: handle the error and retry? let workspaces = server.read_workspace(&token, params).await?; - let _ = (&*conn).immediate_transaction::<_, FlowyError, _>(|| { + let _ = persistence.begin_transaction(|transaction| { tracing::debug!("Save {} workspace", workspaces.len()); for workspace in &workspaces.items { let m_workspace = workspace.clone(); let apps = m_workspace.apps.clone().into_inner(); - let _ = WorkspaceTableSql::create_workspace(&user_id, m_workspace, &*conn)?; + let _ = transaction.create_workspace(&user_id, m_workspace)?; tracing::debug!("Save {} apps", apps.len()); for app in apps { let views = app.belongings.clone().into_inner(); - match app_ctrl.save_app(app, &*conn) { + match transaction.create_app(app) { Ok(_) => {}, Err(e) => log::error!("create app failed: {:?}", e), } tracing::debug!("Save {} views", views.len()); for view in views { - match view_ctrl.save_view(view, &*conn) { + match transaction.create_view(view) { Ok(_) => {}, Err(e) => log::error!("create view failed: {:?}", e), } diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/mod.rs b/frontend/rust-lib/flowy-core/src/services/workspace/mod.rs index 8bf3cc04ff..e854517f17 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/mod.rs @@ -1,3 +1,2 @@ pub mod controller; pub mod event_handler; -pub(crate) mod sql; diff --git a/shared-lib/flowy-core-data-model/src/entities/trash.rs b/shared-lib/flowy-core-data-model/src/entities/trash.rs index c0c8e2d208..2321464348 100644 --- a/shared-lib/flowy-core-data-model/src/entities/trash.rs +++ b/shared-lib/flowy-core-data-model/src/entities/trash.rs @@ -1,4 +1,4 @@ -use crate::impl_def_and_def_mut; +use crate::{entities::app::App, impl_def_and_def_mut}; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use std::fmt::Formatter; @@ -123,3 +123,15 @@ pub struct RepeatedTrash { } impl_def_and_def_mut!(RepeatedTrash, Trash); + +impl std::convert::From for Trash { + fn from(app: App) -> Self { + Trash { + id: app.id, + name: app.name, + modified_time: app.modified_time, + create_time: app.create_time, + ty: TrashType::App, + } + } +}