diff --git a/backend/src/middleware/cors_middleware.rs b/backend/src/middleware/cors_middleware.rs index 4b2484c617..516c16c9df 100644 --- a/backend/src/middleware/cors_middleware.rs +++ b/backend/src/middleware/cors_middleware.rs @@ -1,6 +1,5 @@ use actix_cors::Cors; use actix_web::http; -use flowy_net::config::HEADER_TOKEN; // https://javascript.info/fetch-crossorigin#cors-for-safe-requests // https://docs.rs/actix-cors/0.5.4/actix_cors/index.html diff --git a/rust-lib/flowy-database/src/lib.rs b/rust-lib/flowy-database/src/lib.rs index d3049f89d5..d0467df862 100644 --- a/rust-lib/flowy-database/src/lib.rs +++ b/rust-lib/flowy-database/src/lib.rs @@ -15,9 +15,10 @@ pub use diesel_derives::*; extern crate diesel_migrations; pub use flowy_sqlite::{DBConnection, Database}; +pub type Error = diesel::result::Error; use diesel_migrations::*; -use flowy_sqlite::{Error, PoolConfig}; +use flowy_sqlite::PoolConfig; use std::{fmt::Debug, io, path::Path}; pub mod prelude { @@ -41,7 +42,7 @@ pub fn init(storage_path: &str) -> Result { fn as_io_error(e: E) -> io::Error where - E: Into + Debug, + E: Into + Debug, { let msg = format!("{:?}", e); io::Error::new(io::ErrorKind::NotConnected, msg) diff --git a/rust-lib/flowy-dispatch/src/errors/errors.rs b/rust-lib/flowy-dispatch/src/errors/errors.rs index 7be1d77356..5cf674a281 100644 --- a/rust-lib/flowy-dispatch/src/errors/errors.rs +++ b/rust-lib/flowy-dispatch/src/errors/errors.rs @@ -17,11 +17,7 @@ pub trait Error: fmt::Debug + DynClone + Send + Sync { dyn_clone::clone_trait_object!(Error); impl From for DispatchError { - fn from(err: T) -> DispatchError { - DispatchError { - inner: Box::new(err), - } - } + fn from(err: T) -> DispatchError { DispatchError { inner: Box::new(err) } } } #[derive(Clone)] @@ -48,9 +44,7 @@ impl std::error::Error for DispatchError { } impl From> for DispatchError { - fn from(err: SendError) -> Self { - InternalError::Other(format!("{}", err)).into() - } + fn from(err: SendError) -> Self { InternalError::Other(format!("{}", err)).into() } } impl From for DispatchError { @@ -59,9 +53,7 @@ impl From for DispatchError { #[cfg(feature = "use_protobuf")] impl From for DispatchError { - fn from(e: protobuf::ProtobufError) -> Self { - InternalError::ProtobufError(format!("{:?}", e)).into() - } + fn from(e: protobuf::ProtobufError) -> Self { InternalError::ProtobufError(format!("{:?}", e)).into() } } impl FromBytes for DispatchError { @@ -90,7 +82,6 @@ pub(crate) enum InternalError { UnexpectedNone(String), DeserializeFromBytes(String), JoinError(String), - Lock(String), ServiceNotFound(String), HandleNotFound(String), Other(String), @@ -103,7 +94,6 @@ impl fmt::Display for InternalError { InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f), InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f), InternalError::JoinError(s) => fmt::Display::fmt(&s, f), - InternalError::Lock(s) => fmt::Display::fmt(&s, f), InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f), InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f), InternalError::Other(s) => fmt::Display::fmt(&s, f), diff --git a/rust-lib/flowy-dispatch/src/response/response.rs b/rust-lib/flowy-dispatch/src/response/response.rs index 2f3f2e7a61..6892ced68f 100644 --- a/rust-lib/flowy-dispatch/src/response/response.rs +++ b/rust-lib/flowy-dispatch/src/response/response.rs @@ -47,8 +47,6 @@ impl EventResponse { }, } } - - pub(crate) fn is_success(&self) -> bool { self.status_code == StatusCode::Ok } } impl std::fmt::Display for EventResponse { diff --git a/rust-lib/flowy-document/src/errors.rs b/rust-lib/flowy-document/src/errors.rs index 6f682c0f8a..08f374bace 100644 --- a/rust-lib/flowy-document/src/errors.rs +++ b/rust-lib/flowy-document/src/errors.rs @@ -15,12 +15,7 @@ pub struct DocError { } impl DocError { - fn new(code: DocErrorCode, msg: &str) -> Self { - Self { - code, - msg: msg.to_owned(), - } - } + fn new(code: DocErrorCode, msg: &str) -> Self { Self { code, msg: msg.to_owned() } } } #[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)] @@ -57,20 +52,12 @@ impl std::default::Default for DocErrorCode { fn default() -> Self { DocErrorCode::Unknown } } -impl std::convert::From for DocError { - fn from(error: flowy_database::result::Error) -> Self { - ErrorBuilder::new(DocErrorCode::EditorDBInternalError) - .error(error) - .build() - } +impl std::convert::From for DocError { + fn from(error: flowy_database::Error) -> Self { ErrorBuilder::new(DocErrorCode::EditorDBInternalError).error(error).build() } } impl std::convert::From for DocError { - fn from(error: FileError) -> Self { - ErrorBuilder::new(DocErrorCode::DocOpenFileError) - .error(error) - .build() - } + fn from(error: FileError) -> Self { ErrorBuilder::new(DocErrorCode::DocOpenFileError).error(error).build() } } impl flowy_dispatch::Error for DocError { diff --git a/rust-lib/flowy-user/src/errors.rs b/rust-lib/flowy-user/src/errors.rs index db149b7ed8..d3c54f9a0e 100644 --- a/rust-lib/flowy-user/src/errors.rs +++ b/rust-lib/flowy-user/src/errors.rs @@ -98,8 +98,8 @@ impl std::default::Default for ErrorCode { fn default() -> Self { ErrorCode::Unknown } } -impl std::convert::From for UserError { - fn from(error: flowy_database::result::Error) -> Self { ErrorBuilder::new(ErrorCode::UserDatabaseInternalError).error(error).build() } +impl std::convert::From for UserError { + fn from(error: flowy_database::Error) -> Self { ErrorBuilder::new(ErrorCode::UserDatabaseInternalError).error(error).build() } } impl std::convert::From<::r2d2::Error> for UserError { diff --git a/rust-lib/flowy-workspace/src/errors.rs b/rust-lib/flowy-workspace/src/errors.rs index 52e8a192c0..c565294eee 100644 --- a/rust-lib/flowy-workspace/src/errors.rs +++ b/rust-lib/flowy-workspace/src/errors.rs @@ -91,8 +91,8 @@ impl std::convert::From for WorkspaceError { } } -impl std::convert::From for WorkspaceError { - fn from(error: flowy_database::result::Error) -> Self { ErrorBuilder::new(ErrorCode::WorkspaceDatabaseError).error(error).build() } +impl std::convert::From for WorkspaceError { + fn from(error: flowy_database::Error) -> Self { ErrorBuilder::new(ErrorCode::WorkspaceDatabaseError).error(error).build() } } impl flowy_dispatch::Error for WorkspaceError { diff --git a/rust-lib/flowy-workspace/src/handlers/app_handler.rs b/rust-lib/flowy-workspace/src/handlers/app_handler.rs index c6bfa601fb..54251ddc71 100644 --- a/rust-lib/flowy-workspace/src/handlers/app_handler.rs +++ b/rust-lib/flowy-workspace/src/handlers/app_handler.rs @@ -1,17 +1,14 @@ use crate::{ - entities::{ - app::{ - App, - CreateAppParams, - CreateAppRequest, - DeleteAppParams, - DeleteAppRequest, - QueryAppParams, - QueryAppRequest, - UpdateAppParams, - UpdateAppRequest, - }, - view::RepeatedView, + entities::app::{ + App, + CreateAppParams, + CreateAppRequest, + DeleteAppParams, + DeleteAppRequest, + QueryAppParams, + QueryAppRequest, + UpdateAppParams, + UpdateAppRequest, }, errors::WorkspaceError, services::{AppController, ViewController}, diff --git a/rust-lib/flowy-workspace/src/handlers/view_handler.rs b/rust-lib/flowy-workspace/src/handlers/view_handler.rs index 20547c91d5..cb434198b2 100644 --- a/rust-lib/flowy-workspace/src/handlers/view_handler.rs +++ b/rust-lib/flowy-workspace/src/handlers/view_handler.rs @@ -6,7 +6,6 @@ use crate::{ DeleteViewRequest, QueryViewParams, QueryViewRequest, - RepeatedView, UpdateViewParams, UpdateViewRequest, View, diff --git a/rust-lib/flowy-workspace/src/module.rs b/rust-lib/flowy-workspace/src/module.rs index f731581697..b30e3b75ce 100644 --- a/rust-lib/flowy-workspace/src/module.rs +++ b/rust-lib/flowy-workspace/src/module.rs @@ -28,12 +28,7 @@ pub fn create(user: Arc, database: Arc let server = construct_workspace_server(); let view_controller = Arc::new(ViewController::new(user.clone(), database.clone(), server.clone())); - let app_controller = Arc::new(AppController::new( - user.clone(), - database.clone(), - view_controller.clone(), - server.clone(), - )); + let app_controller = Arc::new(AppController::new(user.clone(), database.clone(), server.clone())); let workspace_controller = Arc::new(WorkspaceController::new( user.clone(), diff --git a/rust-lib/flowy-workspace/src/services/app_controller.rs b/rust-lib/flowy-workspace/src/services/app_controller.rs index e057992a36..d475537567 100644 --- a/rust-lib/flowy-workspace/src/services/app_controller.rs +++ b/rust-lib/flowy-workspace/src/services/app_controller.rs @@ -3,33 +3,27 @@ use crate::{ errors::*, module::{WorkspaceDatabase, WorkspaceUser}, observable::*, - services::{helper::spawn, server::Server, ViewController}, + services::{helper::spawn, server::Server}, sql_tables::app::{AppTable, AppTableChangeset, AppTableSql}, }; -use crate::entities::view::RepeatedView; +use flowy_database::SqliteConnection; use std::sync::Arc; pub(crate) struct AppController { user: Arc, sql: Arc, - #[allow(dead_code)] - view_controller: Arc, + database: Arc, server: Server, } impl AppController { - pub(crate) fn new( - user: Arc, - database: Arc, - view_controller: Arc, - server: Server, - ) -> Self { - let sql = Arc::new(AppTableSql::new(database)); + pub(crate) fn new(user: Arc, database: Arc, server: Server) -> Self { + let sql = Arc::new(AppTableSql {}); Self { user, sql, - view_controller, + database, server, } } @@ -38,10 +32,11 @@ impl AppController { pub(crate) async fn create_app(&self, params: CreateAppParams) -> Result { let app = self.create_app_on_server(params).await?; let app_table = AppTable::new(app.clone()); - let _ = self.sql.create_app(app_table)?; + let conn = self.database.db_connection()?; + let _ = self.sql.create_app(app_table, &*conn)?; // Opti: transaction - let apps = self.read_local_apps(&app.workspace_id)?; + let apps = self.read_local_apps(&app.workspace_id, &*conn)?; ObservableBuilder::new(&app.workspace_id, WorkspaceObservable::WorkspaceCreateApp) .payload(apps) .build(); @@ -49,36 +44,40 @@ impl AppController { } pub(crate) async fn read_app(&self, params: QueryAppParams) -> Result { - let app_table = self.sql.read_app(¶ms.app_id, params.is_trash)?; + let app_table = self + .sql + .read_app(¶ms.app_id, params.is_trash, &*self.database.db_connection()?)?; let _ = self.read_app_on_server(params).await?; Ok(app_table.into()) } pub(crate) async fn delete_app(&self, app_id: &str) -> Result<(), WorkspaceError> { - let app = self.sql.delete_app(app_id)?; - let _ = self.delete_app_on_server(app_id).await?; + let _ = self.delete_app_on_server(app_id); + let conn = self.database.db_connection()?; + let app = self.sql.delete_app(app_id, &*conn)?; // Opti: transaction - let apps = self.read_local_apps(&app.workspace_id)?; + let apps = self.read_local_apps(&app.workspace_id, &*conn)?; ObservableBuilder::new(&app.workspace_id, WorkspaceObservable::WorkspaceDeleteApp) .payload(apps) .build(); Ok(()) } - fn read_local_apps(&self, workspace_id: &str) -> Result { - let app_tables = self.sql.read_apps(workspace_id, false)?; + fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result { + let app_tables = self.sql.read_apps(workspace_id, false, conn)?; let apps = app_tables.into_iter().map(|table| table.into()).collect::>(); Ok(RepeatedApp { items: apps }) } pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), WorkspaceError> { - let changeset = AppTableChangeset::new(params.clone()); - let app_id = changeset.id.clone(); - let _ = self.sql.update_app(changeset)?; - let _ = self.update_app_on_server(params).await?; + let _ = self.update_app_on_server(params.clone()).await?; - let app: App = self.sql.read_app(&app_id, false)?.into(); + let changeset = AppTableChangeset::new(params); + let app_id = changeset.id.clone(); + let conn = self.database.db_connection()?; + let _ = self.sql.update_app(changeset, &*conn)?; + let app: App = self.sql.read_app(&app_id, false, &*conn)?.into(); ObservableBuilder::new(&app_id, WorkspaceObservable::AppUpdated) .payload(app) .build(); diff --git a/rust-lib/flowy-workspace/src/services/view_controller.rs b/rust-lib/flowy-workspace/src/services/view_controller.rs index a7d1243477..b58e0bdf2b 100644 --- a/rust-lib/flowy-workspace/src/services/view_controller.rs +++ b/rust-lib/flowy-workspace/src/services/view_controller.rs @@ -12,73 +12,93 @@ use crate::{ module::WorkspaceUser, observable::ObservableBuilder, }; +use flowy_database::SqliteConnection; use std::sync::Arc; pub(crate) struct ViewController { user: Arc, sql: Arc, server: Server, + database: Arc, } impl ViewController { pub(crate) fn new(user: Arc, database: Arc, server: Server) -> Self { - let sql = Arc::new(ViewTableSql { database }); - Self { user, sql, server } + let sql = Arc::new(ViewTableSql {}); + Self { + user, + sql, + server, + database, + } } pub(crate) async fn create_view(&self, params: CreateViewParams) -> Result { let view = self.create_view_on_server(params).await?; + let conn = &*self.database.db_connection()?; let view_table = ViewTable::new(view.clone()); - let _ = self.sql.create_view(view_table)?; - let repeated_view = self.read_local_views_belong_to(&view.belong_to_id)?; - ObservableBuilder::new(&view.belong_to_id, WorkspaceObservable::AppCreateView) - .payload(repeated_view) - .build(); + (conn).immediate_transaction::<_, WorkspaceError, _>(|| { + let _ = self.sql.create_view(view_table, conn)?; + let repeated_view = self.read_local_views_belong_to(&view.belong_to_id, conn)?; + ObservableBuilder::new(&view.belong_to_id, WorkspaceObservable::AppCreateView) + .payload(repeated_view) + .build(); + Ok(()) + })?; + Ok(view) } pub(crate) async fn read_view(&self, params: QueryViewParams) -> Result { - let view_table = self.sql.read_view(¶ms.view_id, params.is_trash)?; + let conn = self.database.db_connection()?; + let view_table = self.sql.read_view(¶ms.view_id, Some(params.is_trash), &*conn)?; let view: View = view_table.into(); - let _ = self.read_view_on_server(params).await?; + let _ = self.read_view_on_server(params); Ok(view) } pub(crate) async fn delete_view(&self, view_id: &str) -> Result<(), WorkspaceError> { - let view_table = self.sql.delete_view(view_id)?; - let _ = self.delete_view_on_server(view_id).await?; + let conn = &*self.database.db_connection()?; + + (conn).immediate_transaction::<_, WorkspaceError, _>(|| { + let view_table = self.sql.delete_view(view_id, conn)?; + let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id, conn)?; + ObservableBuilder::new(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView) + .payload(repeated_view) + .build(); + Ok(()) + })?; + + let _ = self.delete_view_on_server(view_id); - let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id)?; - ObservableBuilder::new(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView) - .payload(repeated_view) - .build(); Ok(()) } // belong_to_id will be the app_id or view_id. + #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result { // TODO: read from server - let views = self - .sql - .read_views_belong_to(belong_to_id)? - .into_iter() - .map(|view_table| view_table.into()) - .collect::>(); - - Ok(RepeatedView { items: views }) + let conn = self.database.db_connection()?; + let repeated_view = self.read_local_views_belong_to(belong_to_id, &*conn)?; + Ok(repeated_view) } pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> { + let conn = &*self.database.db_connection()?; let changeset = ViewTableChangeset::new(params.clone()); let view_id = changeset.id.clone(); - let _ = self.sql.update_view(changeset)?; - let _ = self.update_view_on_server(params).await?; - let view: View = self.sql.read_view(&view_id, false)?.into(); - ObservableBuilder::new(&view_id, WorkspaceObservable::ViewUpdated) - .payload(view) - .build(); + (conn).immediate_transaction::<_, WorkspaceError, _>(|| { + let _ = self.sql.update_view(changeset, conn)?; + let view: View = self.sql.read_view(&view_id, None, conn)?.into(); + ObservableBuilder::new(&view_id, WorkspaceObservable::ViewUpdated) + .payload(view) + .build(); + Ok(()) + })?; + + let _ = self.update_view_on_server(params); Ok(()) } } @@ -143,10 +163,10 @@ impl ViewController { } // belong_to_id will be the app_id or view_id. - fn read_local_views_belong_to(&self, belong_to_id: &str) -> Result { + fn read_local_views_belong_to(&self, belong_to_id: &str, conn: &SqliteConnection) -> Result { let views = self .sql - .read_views_belong_to(belong_to_id)? + .read_views_belong_to(belong_to_id, conn)? .into_iter() .map(|view_table| view_table.into()) .collect::>(); diff --git a/rust-lib/flowy-workspace/src/services/workspace_controller.rs b/rust-lib/flowy-workspace/src/services/workspace_controller.rs index 74373e73a4..e7e08b59b2 100644 --- a/rust-lib/flowy-workspace/src/services/workspace_controller.rs +++ b/rust-lib/flowy-workspace/src/services/workspace_controller.rs @@ -4,17 +4,20 @@ use crate::{ module::{WorkspaceDatabase, WorkspaceUser}, observable::WorkspaceObservable, services::{helper::spawn, server::Server, AppController}, - sql_tables::workspace::{WorkspaceSql, WorkspaceTable, WorkspaceTableChangeset}, + sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql}, }; use flowy_infra::kv::KV; use crate::{entities::app::RepeatedApp, observable::ObservableBuilder}; +use flowy_database::SqliteConnection; use std::sync::Arc; pub(crate) struct WorkspaceController { pub user: Arc, - pub sql: Arc, + pub workspace_sql: Arc, + // pub app_sql: Arc, + pub database: Arc, pub app_controller: Arc, server: Server, } @@ -26,10 +29,11 @@ impl WorkspaceController { app_controller: Arc, server: Server, ) -> Self { - let sql = Arc::new(WorkspaceSql::new(database)); + let sql = Arc::new(WorkspaceTableSql {}); Self { user, - sql, + workspace_sql: sql, + database, app_controller, server, } @@ -39,48 +43,73 @@ impl WorkspaceController { let workspace = self.create_workspace_on_server(params.clone()).await?; let user_id = self.user.user_id()?; let workspace_table = WorkspaceTable::new(workspace.clone(), &user_id); - let _ = self.sql.create_workspace(workspace_table)?; + let conn = &*self.database.db_connection()?; + //[[immediate_transaction]] + // https://sqlite.org/lang_transaction.html + // IMMEDIATE cause the database connection to start a new write immediately, + // without waiting for a write statement. The BEGIN IMMEDIATE might fail + // with SQLITE_BUSY if another write transaction is already active on another + // database connection. + // + // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started + // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in + // other journaling modes, EXCLUSIVE prevents other database connections from + // reading the database while the transaction is underway. + (conn).immediate_transaction::<_, WorkspaceError, _>(|| { + self.workspace_sql.create_workspace(workspace_table, conn)?; + let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?; + ObservableBuilder::new(&user_id, WorkspaceObservable::UserCreateWorkspace) + .payload(repeated_workspace) + .build(); + + Ok(()) + })?; - // Opti: read all local workspaces may cause performance issues - let repeated_workspace = self.read_local_workspaces(None, &user_id)?; - ObservableBuilder::new(&user_id, WorkspaceObservable::UserCreateWorkspace) - .payload(repeated_workspace) - .build(); Ok(workspace) } pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> { - let changeset = WorkspaceTableChangeset::new(params.clone()); - let workspace_id = changeset.id.clone(); - let _ = self.sql.update_workspace(changeset)?; - let _ = self.update_workspace_on_server(params).await?; + let _ = self.update_workspace_on_server(params.clone()).await?; + + let changeset = WorkspaceTableChangeset::new(params); + let workspace_id = changeset.id.clone(); + let conn = &*self.database.db_connection()?; + (conn).immediate_transaction::<_, WorkspaceError, _>(|| { + let _ = self.workspace_sql.update_workspace(changeset, conn)?; + let user_id = self.user.user_id()?; + let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?; + ObservableBuilder::new(&workspace_id, WorkspaceObservable::WorkspaceUpdated) + .payload(workspace) + .build(); + + Ok(()) + })?; - // Opti: transaction - let user_id = self.user.user_id()?; - let workspace = self.read_local_workspace(workspace_id.clone(), &user_id)?; - ObservableBuilder::new(&workspace_id, WorkspaceObservable::WorkspaceUpdated) - .payload(workspace) - .build(); Ok(()) } pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> { let user_id = self.user.user_id()?; - let _ = self.sql.delete_workspace(workspace_id)?; let _ = self.delete_workspace_on_server(workspace_id).await?; + let conn = &*self.database.db_connection()?; + (conn).immediate_transaction::<_, WorkspaceError, _>(|| { + let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?; + let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?; + ObservableBuilder::new(&user_id, WorkspaceObservable::UserDeleteWorkspace) + .payload(repeated_workspace) + .build(); + + Ok(()) + })?; - // Opti: read all local workspaces may cause performance issues - let repeated_workspace = self.read_local_workspaces(None, &user_id)?; - ObservableBuilder::new(&user_id, WorkspaceObservable::UserDeleteWorkspace) - .payload(repeated_workspace) - .build(); Ok(()) } pub(crate) async fn open_workspace(&self, params: QueryWorkspaceParams) -> Result { let user_id = self.user.user_id()?; + let conn = self.database.db_connection()?; if let Some(workspace_id) = params.workspace_id.clone() { - let workspace = self.read_local_workspace(workspace_id, &user_id)?; + let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?; set_current_workspace(&workspace.id); Ok(workspace) } else { @@ -92,34 +121,47 @@ impl WorkspaceController { pub(crate) async fn read_workspaces(&self, params: QueryWorkspaceParams) -> Result { let user_id = self.user.user_id()?; - let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id)?; - let _ = self.read_workspaces_on_server(user_id, params).await?; + let _ = self.read_workspaces_on_server(user_id.clone(), params.clone()).await; + + let conn = self.database.db_connection()?; + let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*conn)?; Ok(workspaces) } pub(crate) async fn read_cur_workspace(&self) -> Result { let workspace_id = get_current_workspace()?; let user_id = self.user.user_id()?; - let workspace = self.read_local_workspace(workspace_id, &user_id)?; + let params = QueryWorkspaceParams { + workspace_id: Some(workspace_id.clone()), + }; + let _ = self.read_workspaces_on_server(user_id.clone(), params).await?; + + let conn = self.database.db_connection()?; + let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?; Ok(workspace) } pub(crate) async fn read_workspace_apps(&self) -> Result { let workspace_id = get_current_workspace()?; - let apps = self.read_local_apps(&workspace_id)?; + let conn = self.database.db_connection()?; + let apps = self.read_local_apps(&workspace_id, &*conn)?; // TODO: read from server Ok(RepeatedApp { items: apps }) } - #[tracing::instrument(level = "debug", skip(self), err)] - fn read_local_workspaces(&self, workspace_id: Option, user_id: &str) -> Result { - let sql = self.sql.clone(); + #[tracing::instrument(level = "debug", skip(self, conn), err)] + fn read_local_workspaces( + &self, + workspace_id: Option, + user_id: &str, + conn: &SqliteConnection, + ) -> Result { let workspace_id = workspace_id.to_owned(); - let workspace_tables = sql.read_workspaces(workspace_id, user_id)?; + let workspace_tables = self.workspace_sql.read_workspaces(workspace_id, user_id, conn)?; let mut workspaces = vec![]; for table in workspace_tables { - let apps = self.read_local_apps(&table.id)?; + let apps = self.read_local_apps(&table.id, conn)?; let mut workspace: Workspace = table.into(); workspace.apps.items = apps; workspaces.push(workspace); @@ -127,9 +169,9 @@ impl WorkspaceController { Ok(RepeatedWorkspace { items: workspaces }) } - fn read_local_workspace(&self, workspace_id: String, user_id: &str) -> Result { + fn read_local_workspace(&self, workspace_id: String, user_id: &str, conn: &SqliteConnection) -> Result { // Opti: fetch single workspace from local db - let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id), user_id)?; + let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id), user_id, conn)?; if repeated_workspace.is_empty() { return Err(ErrorBuilder::new(ErrorCode::RecordNotFound).build()); } @@ -139,11 +181,11 @@ impl WorkspaceController { Ok(workspace) } - #[tracing::instrument(level = "debug", skip(self), err)] - fn read_local_apps(&self, workspace_id: &str) -> Result, WorkspaceError> { + #[tracing::instrument(level = "debug", skip(self, conn), err)] + fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result, WorkspaceError> { let apps = self - .sql - .read_apps_belong_to_workspace(workspace_id)? + .workspace_sql + .read_apps_belong_to_workspace(workspace_id, conn)? .into_iter() .map(|app_table| app_table.into()) .collect::>(); @@ -202,20 +244,36 @@ impl WorkspaceController { #[tracing::instrument(skip(self), err)] async fn read_workspaces_on_server(&self, user_id: String, params: QueryWorkspaceParams) -> Result<(), WorkspaceError> { let (token, server) = self.token_with_server()?; - let sql = self.sql.clone(); - let conn = self.sql.get_db_conn()?; + let sql = self.workspace_sql.clone(); + let conn = self.database.db_connection()?; spawn(async move { // Opti: retry? let workspaces = server.read_workspace(&token, params).await?; let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| { for workspace in &workspaces.items { let mut m_workspace = workspace.clone(); - let repeated_app = m_workspace.apps.take_items(); + let apps = m_workspace.apps.take_items(); let workspace_table = WorkspaceTable::new(m_workspace, &user_id); - log::debug!("Save workspace: {} to disk", &workspace.id); - let _ = sql.create_workspace_with(workspace_table, &*conn)?; - log::debug!("Save workspace: {} apps to disk", &workspace.id); - let _ = sql.create_apps(repeated_app, &*conn)?; + log::debug!("Save workspace"); + let _ = sql.create_workspace(workspace_table, &*conn)?; + log::debug!("Save apps"); + + for mut app in apps { + let views = app.belongings.take_items(); + // let _ = sql.create_apps(vec![app], &*conn)?; + + // pub(crate) fn create_apps(&self, apps: Vec, conn: &SqliteConnection) -> + // Result<(), WorkspaceError> { for app in apps { + // let _ = self.app_sql.create_app_with(AppTable::new(app), conn)?; + // } + // Ok(()) + // } + + log::debug!("Save views"); + for _view in views { + // + } + } } Ok(()) })?; diff --git a/rust-lib/flowy-workspace/src/sql_tables/app/app_sql.rs b/rust-lib/flowy-workspace/src/sql_tables/app/app_sql.rs index 37d2346495..d477a86a2f 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/app/app_sql.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/app/app_sql.rs @@ -1,6 +1,5 @@ use crate::{ errors::WorkspaceError, - module::WorkspaceDatabase, sql_tables::app::{AppTable, AppTableChangeset}, }; use flowy_database::{ @@ -8,24 +7,11 @@ use flowy_database::{ schema::{app_table, app_table::dsl}, SqliteConnection, }; -use std::sync::Arc; -pub struct AppTableSql { - database: Arc, -} +pub struct AppTableSql {} impl AppTableSql { - pub fn new(database: Arc) -> Self { Self { database } } -} - -impl AppTableSql { - pub(crate) fn create_app(&self, app_table: AppTable) -> Result<(), WorkspaceError> { - let conn = self.database.db_connection()?; - let _ = self.create_app_with(app_table, &*conn)?; - Ok(()) - } - - pub(crate) fn create_app_with(&self, app_table: AppTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> { + pub(crate) fn create_app(&self, app_table: AppTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> { match diesel_record_count!(app_table, &app_table.id, conn) { 0 => diesel_insert_table!(app_table, &app_table, conn), _ => { @@ -36,36 +22,31 @@ impl AppTableSql { Ok(()) } - pub(crate) fn update_app(&self, changeset: AppTableChangeset) -> Result<(), WorkspaceError> { - let conn = self.database.db_connection()?; - diesel_update_table!(app_table, changeset, &*conn); + pub(crate) fn update_app(&self, changeset: AppTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> { + diesel_update_table!(app_table, changeset, conn); Ok(()) } - pub(crate) fn read_app(&self, app_id: &str, is_trash: bool) -> Result { + pub(crate) fn read_app(&self, app_id: &str, is_trash: bool, conn: &SqliteConnection) -> Result { let app_table = dsl::app_table .filter(app_table::id.eq(app_id)) .filter(app_table::is_trash.eq(is_trash)) - .first::(&*(self.database.db_connection()?))?; + .first::(conn)?; Ok(app_table) } - pub(crate) fn read_apps(&self, workspace_id: &str, is_trash: bool) -> Result, WorkspaceError> { + pub(crate) fn read_apps(&self, workspace_id: &str, is_trash: bool, conn: &SqliteConnection) -> Result, WorkspaceError> { let app_table = dsl::app_table .filter(app_table::workspace_id.eq(workspace_id)) .filter(app_table::is_trash.eq(is_trash)) - .load::(&*(self.database.db_connection()?))?; + .load::(conn)?; Ok(app_table) } - pub(crate) fn delete_app(&self, app_id: &str) -> Result { - let conn = self.database.db_connection()?; - // TODO: group into sql transaction - let app_table = dsl::app_table - .filter(app_table::id.eq(app_id)) - .first::(&*(self.database.db_connection()?))?; + pub(crate) fn delete_app(&self, app_id: &str, conn: &SqliteConnection) -> Result { + let app_table = dsl::app_table.filter(app_table::id.eq(app_id)).first::(conn)?; diesel_delete_table!(app_table, app_id, conn); Ok(app_table) } diff --git a/rust-lib/flowy-workspace/src/sql_tables/app/mod.rs b/rust-lib/flowy-workspace/src/sql_tables/app/mod.rs index 2ae56b4950..4e2792e3d1 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/app/mod.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/app/mod.rs @@ -1,5 +1,5 @@ mod app_sql; mod app_table; -pub use app_sql::*; -pub use app_table::*; +pub(crate) use app_sql::*; +pub(crate) use app_table::*; diff --git a/rust-lib/flowy-workspace/src/sql_tables/view/view_sql.rs b/rust-lib/flowy-workspace/src/sql_tables/view/view_sql.rs index d1d0e15232..64254f9a6a 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/view/view_sql.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/view/view_sql.rs @@ -1,56 +1,46 @@ use crate::{ errors::WorkspaceError, - module::WorkspaceDatabase, sql_tables::view::{ViewTable, ViewTableChangeset}, }; use flowy_database::{ prelude::*, schema::{view_table, view_table::dsl}, + SqliteConnection, }; -use std::sync::Arc; -pub struct ViewTableSql { - pub database: Arc, -} +pub struct ViewTableSql {} impl ViewTableSql { - pub(crate) fn create_view(&self, view_table: ViewTable) -> Result<(), WorkspaceError> { - let conn = self.database.db_connection()?; - let _ = diesel::insert_into(view_table::table).values(view_table).execute(&*conn)?; + pub(crate) fn create_view(&self, view_table: ViewTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> { + let _ = diesel::insert_into(view_table::table).values(view_table).execute(conn)?; Ok(()) } - pub(crate) fn read_view(&self, view_id: &str, is_trash: bool) -> Result { - let view_table = dsl::view_table - .filter(view_table::id.eq(view_id)) - .filter(view_table::is_trash.eq(is_trash)) - .first::(&*(self.database.db_connection()?))?; - + pub(crate) fn read_view(&self, view_id: &str, is_trash: Option, conn: &SqliteConnection) -> Result { + // https://docs.diesel.rs/diesel/query_builder/struct.UpdateStatement.html + let mut filter = dsl::view_table.filter(view_table::id.eq(view_id)).into_boxed(); + if let Some(is_trash) = is_trash { + filter = filter.filter(view_table::is_trash.eq(is_trash)); + } + let view_table = filter.first::(conn)?; Ok(view_table) } - pub(crate) fn read_views_belong_to(&self, belong_to_id: &str) -> Result, WorkspaceError> { + pub(crate) fn read_views_belong_to(&self, belong_to_id: &str, conn: &SqliteConnection) -> Result, WorkspaceError> { let view_tables = dsl::view_table .filter(view_table::belong_to_id.eq(belong_to_id)) - .load::(&*(self.database.db_connection()?))?; + .load::(conn)?; Ok(view_tables) } - pub(crate) fn update_view(&self, changeset: ViewTableChangeset) -> Result<(), WorkspaceError> { - let conn = self.database.db_connection()?; - diesel_update_table!(view_table, changeset, &*conn); + pub(crate) fn update_view(&self, changeset: ViewTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> { + diesel_update_table!(view_table, changeset, conn); Ok(()) } - pub(crate) fn delete_view(&self, view_id: &str) -> Result { - let conn = self.database.db_connection()?; - - // TODO: group into transaction - let view_table = dsl::view_table - .filter(view_table::id.eq(view_id)) - .first::(&*(self.database.db_connection()?))?; - + pub(crate) fn delete_view(&self, view_id: &str, conn: &SqliteConnection) -> Result { + let view_table = dsl::view_table.filter(view_table::id.eq(view_id)).first::(conn)?; diesel_delete_table!(view_table, view_id, conn); Ok(view_table) } diff --git a/rust-lib/flowy-workspace/src/sql_tables/workspace/mod.rs b/rust-lib/flowy-workspace/src/sql_tables/workspace/mod.rs index 2c8735b267..e9461c942f 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/workspace/mod.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/workspace/mod.rs @@ -1,5 +1,5 @@ mod workspace_sql; mod workspace_table; -pub use workspace_sql::*; -pub use workspace_table::*; +pub(crate) use workspace_sql::*; +pub(crate) use workspace_table::*; diff --git a/rust-lib/flowy-workspace/src/sql_tables/workspace/workspace_sql.rs b/rust-lib/flowy-workspace/src/sql_tables/workspace/workspace_sql.rs index 157ecc6594..a3eb9dbdf9 100644 --- a/rust-lib/flowy-workspace/src/sql_tables/workspace/workspace_sql.rs +++ b/rust-lib/flowy-workspace/src/sql_tables/workspace/workspace_sql.rs @@ -1,53 +1,20 @@ use crate::{ - entities::app::App, errors::WorkspaceError, - module::WorkspaceDatabase, sql_tables::{ - app::{AppTable, AppTableSql}, + app::AppTable, workspace::{WorkspaceTable, WorkspaceTableChangeset}, }, }; use diesel::SqliteConnection; use flowy_database::{ - macros::*, prelude::*, schema::{workspace_table, workspace_table::dsl}, - DBConnection, }; -use std::sync::Arc; -pub(crate) struct WorkspaceSql { - database: Arc, - app_sql: Arc, -} +pub(crate) struct WorkspaceTableSql {} -impl WorkspaceSql { - pub fn new(database: Arc) -> Self { - Self { - database: database.clone(), - app_sql: Arc::new(AppTableSql::new(database.clone())), - } - } -} - -impl WorkspaceSql { - pub(crate) fn create_workspace(&self, table: WorkspaceTable) -> Result<(), WorkspaceError> { - let conn = &*self.database.db_connection()?; - //[[immediate_transaction]] - // https://sqlite.org/lang_transaction.html - // IMMEDIATE cause the database connection to start a new write immediately, - // without waiting for a write statement. The BEGIN IMMEDIATE might fail - // with SQLITE_BUSY if another write transaction is already active on another - // database connection. - // - // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started - // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in - // other journaling modes, EXCLUSIVE prevents other database connections from - // reading the database while the transaction is underway. - (conn).immediate_transaction::<_, WorkspaceError, _>(|| self.create_workspace_with(table, conn)) - } - - pub(crate) fn create_workspace_with(&self, table: WorkspaceTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> { +impl WorkspaceTableSql { + pub(crate) fn create_workspace(&self, table: WorkspaceTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> { match diesel_record_count!(workspace_table, &table.id, conn) { 0 => diesel_insert_table!(workspace_table, &table, conn), _ => { @@ -58,55 +25,48 @@ impl WorkspaceSql { Ok(()) } - pub(crate) fn create_apps(&self, apps: Vec, conn: &SqliteConnection) -> Result<(), WorkspaceError> { - for app in apps { - let _ = self.app_sql.create_app_with(AppTable::new(app), conn)?; - } - Ok(()) - } - - pub(crate) fn read_workspaces(&self, workspace_id: Option, user_id: &str) -> Result, WorkspaceError> { + pub(crate) fn read_workspaces( + &self, + workspace_id: Option, + user_id: &str, + conn: &SqliteConnection, + ) -> Result, WorkspaceError> { let workspaces = match workspace_id { None => dsl::workspace_table .filter(workspace_table::user_id.eq(user_id)) - .load::(&*(self.database.db_connection()?))?, + .load::(conn)?, Some(workspace_id) => dsl::workspace_table .filter(workspace_table::user_id.eq(user_id)) .filter(workspace_table::id.eq(&workspace_id)) - .load::(&*(self.database.db_connection()?))?, + .load::(conn)?, }; Ok(workspaces) } - pub(crate) fn update_workspace(&self, changeset: WorkspaceTableChangeset) -> Result<(), WorkspaceError> { - let conn = self.database.db_connection()?; - diesel_update_table!(workspace_table, changeset, &*conn); + pub(crate) fn update_workspace(&self, changeset: WorkspaceTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> { + diesel_update_table!(workspace_table, changeset, conn); Ok(()) } - pub(crate) fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> { - let conn = self.database.db_connection()?; + pub(crate) fn delete_workspace(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<(), WorkspaceError> { diesel_delete_table!(workspace_table, workspace_id, conn); Ok(()) } - pub(crate) fn read_apps_belong_to_workspace(&self, workspace_id: &str) -> Result, WorkspaceError> { - let conn = self.database.db_connection()?; - + pub(crate) fn read_apps_belong_to_workspace( + &self, + workspace_id: &str, + conn: &SqliteConnection, + ) -> Result, WorkspaceError> { let apps = conn.immediate_transaction::<_, WorkspaceError, _>(|| { let workspace_table: WorkspaceTable = dsl::workspace_table .filter(workspace_table::id.eq(workspace_id)) - .first::(&*(conn))?; - let apps = AppTable::belonging_to(&workspace_table).load::(&*conn)?; + .first::(conn)?; + let apps = AppTable::belonging_to(&workspace_table).load::(conn)?; Ok(apps) })?; Ok(apps) } - - pub(crate) fn get_db_conn(&self) -> Result { - let db = self.database.db_connection()?; - Ok(db) - } }