diff --git a/backend/tests/document_test/edit_script.rs b/backend/tests/document_test/edit_script.rs index 83003b6097..cf900f97f0 100644 --- a/backend/tests/document_test/edit_script.rs +++ b/backend/tests/document_test/edit_script.rs @@ -2,7 +2,7 @@ #![cfg_attr(rustfmt, rustfmt::skip)] use std::convert::TryInto; use actix_web::web::Data; -use flowy_document::services::doc::edit::ClientDocumentEditor; +use flowy_document::core::edit::ClientDocumentEditor; use flowy_test::{helper::ViewTest, FlowySDKTest}; use flowy_user::services::user::UserSession; use futures_util::{stream, stream::StreamExt}; diff --git a/backend/tests/util/helper.rs b/backend/tests/util/helper.rs index 66c609c433..ee1b92b96a 100644 --- a/backend/tests/util/helper.rs +++ b/backend/tests/util/helper.rs @@ -6,15 +6,14 @@ use backend::{ use backend_service::{ configuration::{get_client_server_configuration, ClientServerConfiguration}, errors::ServerError, - user_request::*, - workspace_request::*, + http_request::*, }; use flowy_collaboration::{ document::default::initial_delta_string, entities::doc::{CreateDocParams, DocumentId, DocumentInfo}, }; use flowy_core_data_model::entities::prelude::*; -use flowy_document::services::server::{create_doc_request, read_doc_request}; +use flowy_document::server::{create_doc_request, read_doc_request}; use flowy_user_data_model::entities::*; use lib_infra::uuid_string; use sqlx::{Connection, Executor, PgConnection, PgPool}; diff --git a/frontend/rust-lib/flowy-core/src/core/core_context.rs b/frontend/rust-lib/flowy-core/src/context.rs similarity index 100% rename from frontend/rust-lib/flowy-core/src/core/core_context.rs rename to frontend/rust-lib/flowy-core/src/context.rs diff --git a/frontend/rust-lib/flowy-core/src/core/aggregate_tasks/mod.rs b/frontend/rust-lib/flowy-core/src/core/aggregate_tasks/mod.rs deleted file mode 100644 index e14803561f..0000000000 --- a/frontend/rust-lib/flowy-core/src/core/aggregate_tasks/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod workspace_task; - -pub use workspace_task::*; diff --git a/frontend/rust-lib/flowy-core/src/core/aggregate_tasks/workspace_task.rs b/frontend/rust-lib/flowy-core/src/core/aggregate_tasks/workspace_task.rs deleted file mode 100644 index ab42d92455..0000000000 --- a/frontend/rust-lib/flowy-core/src/core/aggregate_tasks/workspace_task.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::{ - core::CoreContext, - errors::FlowyError, - notify::{send_dart_notification, WorkspaceNotification}, - services::workspace::sql::{WorkspaceTable, WorkspaceTableSql}, -}; -use flowy_core_data_model::entities::workspace::WorkspaceId; -use lib_dispatch::prelude::Unit; -use std::sync::Arc; - -#[tracing::instrument(level = "debug", skip(core), err)] -pub fn read_workspaces_on_server( - core: Unit>, - user_id: String, - params: WorkspaceId, -) -> Result<(), FlowyError> { - let (token, server) = (core.user.token()?, core.server.clone()); - let app_ctrl = core.app_controller.clone(); - let view_ctrl = core.view_controller.clone(); - let conn = core.database.db_connection()?; - - tokio::spawn(async move { - // Opti: handle the error and retry? - let workspaces = server.read_workspace(&token, params).await?; - let _ = (&*conn).immediate_transaction::<_, FlowyError, _>(|| { - tracing::debug!("Save {} workspace", workspaces.len()); - for workspace in &workspaces.items { - let m_workspace = workspace.clone(); - let apps = m_workspace.apps.clone().into_inner(); - let workspace_table = WorkspaceTable::new(m_workspace, &user_id); - - let _ = WorkspaceTableSql::create_workspace(workspace_table, &*conn)?; - tracing::debug!("Save {} apps", apps.len()); - for app in apps { - let views = app.belongings.clone().into_inner(); - match app_ctrl.save_app(app, &*conn) { - Ok(_) => {}, - Err(e) => log::error!("create app failed: {:?}", e), - } - - tracing::debug!("Save {} views", views.len()); - for view in views { - match view_ctrl.save_view(view, &*conn) { - Ok(_) => {}, - Err(e) => log::error!("create view failed: {:?}", e), - } - } - } - } - Ok(()) - })?; - - send_dart_notification(&token, WorkspaceNotification::WorkspaceListUpdated) - .payload(workspaces) - .send(); - Result::<(), FlowyError>::Ok(()) - }); - - Ok(()) -} diff --git a/frontend/rust-lib/flowy-core/src/core/event_handler.rs b/frontend/rust-lib/flowy-core/src/core/event_handler.rs deleted file mode 100644 index f3e9ac3faa..0000000000 --- a/frontend/rust-lib/flowy-core/src/core/event_handler.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::{ - core::{aggregate_tasks::read_workspaces_on_server, CoreContext}, - errors::FlowyError, - services::{get_current_workspace, read_local_workspace_apps}, -}; -use flowy_core_data_model::entities::{ - view::View, - workspace::{CurrentWorkspaceSetting, QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceId}, -}; -use lib_dispatch::prelude::{data_result, Data, DataResult, Unit}; -use std::{convert::TryInto, sync::Arc}; - -#[tracing::instrument(skip(data, core), err)] -pub(crate) async fn read_workspaces_handler( - data: Data, - core: Unit>, -) -> DataResult { - let params: WorkspaceId = data.into_inner().try_into()?; - let user_id = core.user.user_id()?; - let conn = &*core.database.db_connection()?; - let workspace_controller = core.workspace_controller.clone(); - - let trash_controller = core.trash_controller.clone(); - let workspaces = conn.immediate_transaction::<_, FlowyError, _>(|| { - let mut workspaces = workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, conn)?; - for workspace in workspaces.iter_mut() { - let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), conn)?.into_inner(); - workspace.apps.items = apps; - } - Ok(workspaces) - })?; - - let _ = read_workspaces_on_server(core, user_id, params); - - data_result(workspaces) -} - -#[tracing::instrument(skip(core), err)] -pub async fn read_cur_workspace_handler( - core: Unit>, -) -> DataResult { - let workspace_id = get_current_workspace()?; - let user_id = core.user.user_id()?; - let params = WorkspaceId { - workspace_id: Some(workspace_id.clone()), - }; - let conn = &*core.database.db_connection()?; - let workspace = core - .workspace_controller - .read_local_workspace(workspace_id, &user_id, conn)?; - - let latest_view: Option = core.view_controller.latest_visit_view().unwrap_or(None); - let setting = CurrentWorkspaceSetting { workspace, latest_view }; - let _ = read_workspaces_on_server(core, user_id, params); - data_result(setting) -} diff --git a/frontend/rust-lib/flowy-core/src/core/mod.rs b/frontend/rust-lib/flowy-core/src/core/mod.rs deleted file mode 100644 index 626ce77015..0000000000 --- a/frontend/rust-lib/flowy-core/src/core/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod aggregate_tasks; -mod core_context; - -pub mod event_handler; -pub use core_context::*; diff --git a/frontend/rust-lib/flowy-core/src/event_handler.rs b/frontend/rust-lib/flowy-core/src/event_handler.rs new file mode 100644 index 0000000000..13d32013d3 --- /dev/null +++ b/frontend/rust-lib/flowy-core/src/event_handler.rs @@ -0,0 +1,112 @@ +use crate::{ + context::CoreContext, + errors::FlowyError, + notify::{send_dart_notification, WorkspaceNotification}, + services::{ + get_current_workspace, + read_local_workspace_apps, + workspace::sql::{WorkspaceTable, WorkspaceTableSql}, + }, +}; +use flowy_core_data_model::entities::{ + view::View, + workspace::{CurrentWorkspaceSetting, QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceId}, +}; +use lib_dispatch::prelude::{data_result, Data, DataResult, Unit}; +use std::{convert::TryInto, sync::Arc}; + +#[tracing::instrument(skip(data, core), err)] +pub(crate) async fn read_workspaces_handler( + data: Data, + core: Unit>, +) -> DataResult { + let params: WorkspaceId = data.into_inner().try_into()?; + let user_id = core.user.user_id()?; + let conn = &*core.database.db_connection()?; + let workspace_controller = core.workspace_controller.clone(); + + let trash_controller = core.trash_controller.clone(); + let workspaces = conn.immediate_transaction::<_, FlowyError, _>(|| { + let mut workspaces = workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, conn)?; + for workspace in workspaces.iter_mut() { + let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), conn)?.into_inner(); + workspace.apps.items = apps; + } + Ok(workspaces) + })?; + + let _ = read_workspaces_on_server(core, user_id, params); + + data_result(workspaces) +} + +#[tracing::instrument(skip(core), err)] +pub async fn read_cur_workspace_handler( + core: Unit>, +) -> DataResult { + let workspace_id = get_current_workspace()?; + let user_id = core.user.user_id()?; + let params = WorkspaceId { + workspace_id: Some(workspace_id.clone()), + }; + let conn = &*core.database.db_connection()?; + let workspace = core + .workspace_controller + .read_local_workspace(workspace_id, &user_id, conn)?; + + let latest_view: Option = core.view_controller.latest_visit_view().unwrap_or(None); + let setting = CurrentWorkspaceSetting { workspace, latest_view }; + let _ = read_workspaces_on_server(core, user_id, params); + data_result(setting) +} + +#[tracing::instrument(level = "debug", skip(core), err)] +fn read_workspaces_on_server( + core: Unit>, + user_id: String, + params: WorkspaceId, +) -> Result<(), FlowyError> { + let (token, server) = (core.user.token()?, core.server.clone()); + let app_ctrl = core.app_controller.clone(); + let view_ctrl = core.view_controller.clone(); + let conn = core.database.db_connection()?; + + tokio::spawn(async move { + // Opti: handle the error and retry? + let workspaces = server.read_workspace(&token, params).await?; + let _ = (&*conn).immediate_transaction::<_, FlowyError, _>(|| { + tracing::debug!("Save {} workspace", workspaces.len()); + for workspace in &workspaces.items { + let m_workspace = workspace.clone(); + let apps = m_workspace.apps.clone().into_inner(); + let workspace_table = WorkspaceTable::new(m_workspace, &user_id); + + let _ = WorkspaceTableSql::create_workspace(workspace_table, &*conn)?; + tracing::debug!("Save {} apps", apps.len()); + for app in apps { + let views = app.belongings.clone().into_inner(); + match app_ctrl.save_app(app, &*conn) { + Ok(_) => {}, + Err(e) => log::error!("create app failed: {:?}", e), + } + + tracing::debug!("Save {} views", views.len()); + for view in views { + match view_ctrl.save_view(view, &*conn) { + Ok(_) => {}, + Err(e) => log::error!("create view failed: {:?}", e), + } + } + } + } + Ok(()) + })?; + + send_dart_notification(&token, WorkspaceNotification::WorkspaceListUpdated) + .payload(workspaces) + .send(); + Result::<(), FlowyError>::Ok(()) + }); + + Ok(()) +} diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index 51cb51acf7..37f2a306b1 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -10,8 +10,8 @@ mod macros; #[macro_use] extern crate flowy_database; -pub mod core; - +pub mod context; +pub mod event_handler; mod notify; pub mod protobuf; mod util; @@ -19,7 +19,7 @@ mod util; pub mod prelude { pub use flowy_core_data_model::entities::{app::*, trash::*, view::*, workspace::*}; - pub use crate::{core::*, errors::*, module::*}; + pub use crate::{errors::*, module::*}; } pub mod errors { diff --git a/frontend/rust-lib/flowy-core/src/module.rs b/frontend/rust-lib/flowy-core/src/module.rs index 1d5f1a0615..e99925f3fa 100644 --- a/frontend/rust-lib/flowy-core/src/module.rs +++ b/frontend/rust-lib/flowy-core/src/module.rs @@ -5,11 +5,11 @@ use flowy_database::DBConnection; use flowy_document::context::DocumentContext; use lib_dispatch::prelude::*; use lib_sqlite::ConnectionPool; - use crate::{ - core::{event_handler::*, CoreContext}, + context::CoreContext, errors::FlowyError, event::WorkspaceEvent, + event_handler::*, services::{ app::event_handler::*, server::construct_workspace_server, diff --git a/frontend/rust-lib/flowy-core/src/services/server/server_api.rs b/frontend/rust-lib/flowy-core/src/services/server/server_api.rs index da6c1d158a..2b6b5eabf7 100644 --- a/frontend/rust-lib/flowy-core/src/services/server/server_api.rs +++ b/frontend/rust-lib/flowy-core/src/services/server/server_api.rs @@ -9,7 +9,7 @@ use crate::{ notify::{send_dart_notification, WorkspaceNotification}, services::server::WorkspaceServerAPI, }; -use backend_service::{configuration::ClientServerConfiguration, middleware::*, workspace_request::*}; +use backend_service::{configuration::ClientServerConfiguration, http_request::*, middleware::*}; use lib_infra::future::FutureResult; pub struct WorkspaceHttpServer { diff --git a/frontend/rust-lib/flowy-document/src/context.rs b/frontend/rust-lib/flowy-document/src/context.rs index 19d22e3365..7a30967bbc 100644 --- a/frontend/rust-lib/flowy-document/src/context.rs +++ b/frontend/rust-lib/flowy-document/src/context.rs @@ -1,13 +1,11 @@ -use crate::{ - errors::FlowyError, - services::{ - controller::DocumentController, - doc::{DocumentWSReceivers, DocumentWebSocket}, - server::construct_doc_server, - }, -}; +use crate::errors::FlowyError; use backend_service::configuration::ClientServerConfiguration; +use crate::{ + controller::DocumentController, + core::{DocumentWSReceivers, DocumentWebSocket}, + server::construct_doc_server, +}; use flowy_database::ConnectionPool; use std::sync::Arc; diff --git a/frontend/rust-lib/flowy-document/src/services/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs similarity index 96% rename from frontend/rust-lib/flowy-document/src/services/controller.rs rename to frontend/rust-lib/flowy-document/src/controller.rs index d38c2e35af..dca7408098 100644 --- a/frontend/rust-lib/flowy-document/src/services/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -1,16 +1,14 @@ use crate::{ context::DocumentUser, - errors::FlowyError, - services::{ - doc::{ - edit::ClientDocumentEditor, - revision::{RevisionCache, RevisionManager, RevisionServer}, - DocumentWSReceivers, - DocumentWebSocket, - WSStateReceiver, - }, - server::Server, + core::{ + edit::ClientDocumentEditor, + revision::{RevisionCache, RevisionManager, RevisionServer}, + DocumentWSReceivers, + DocumentWebSocket, + WSStateReceiver, }, + errors::FlowyError, + server::Server, }; use bytes::Bytes; use dashmap::DashMap; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs similarity index 99% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs rename to frontend/rust-lib/flowy-document/src/core/edit/editor.rs index c586315eb3..ef16e1c47c 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs @@ -1,10 +1,10 @@ use crate::{ context::DocumentUser, - errors::FlowyError, - services::doc::{ + core::{ web_socket::{make_document_ws_manager, DocumentWebSocketManager}, *, }, + errors::FlowyError, }; use bytes::Bytes; use flowy_collaboration::{ diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/frontend/rust-lib/flowy-document/src/core/edit/mod.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs rename to frontend/rust-lib/flowy-document/src/core/edit/mod.rs diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs rename to frontend/rust-lib/flowy-document/src/core/edit/queue.rs diff --git a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs b/frontend/rust-lib/flowy-document/src/core/mod.rs similarity index 76% rename from frontend/rust-lib/flowy-document/src/services/doc/mod.rs rename to frontend/rust-lib/flowy-document/src/core/mod.rs index a582ca006f..d2e604915b 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/mod.rs @@ -1,7 +1,7 @@ pub mod edit; pub mod revision; mod web_socket; -pub use crate::services::ws_receivers::*; +pub use crate::ws_receivers::*; pub use edit::*; pub use revision::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/core/revision/cache.rs similarity index 90% rename from frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs rename to frontend/rust-lib/flowy-document/src/core/revision/cache.rs index 5f8a1e7193..1e4a3827a1 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/cache.rs @@ -1,33 +1,32 @@ use crate::{ - errors::FlowyError, - services::doc::revision::cache::{ - disk::{Persistence, RevisionDiskCache}, + core::revision::{ + disk::{DocumentRevisionDiskCache, RevisionChangeset, RevisionTableState, SQLitePersistence}, memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, }, - sql_tables::{RevisionChangeset, RevisionTableState}, + errors::FlowyError, }; -use std::borrow::Cow; - use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState}; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; - -use std::sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, +use std::{ + borrow::Cow, + sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, + }, }; use tokio::task::spawn_blocking; pub struct RevisionCache { doc_id: String, - disk_cache: Arc>, + disk_cache: Arc>, memory_cache: Arc, latest_rev_id: AtomicI64, } impl RevisionCache { pub fn new(user_id: &str, doc_id: &str, pool: Arc) -> RevisionCache { - let disk_cache = Arc::new(Persistence::new(user_id, pool)); + let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool)); let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone()))); let doc_id = doc_id.to_owned(); Self { @@ -121,7 +120,7 @@ impl RevisionCache { .collect::>(); let _ = self.memory_cache.reset_with_revisions(&revision_records).await?; - let _ = self.disk_cache.reset_with_revisions(doc_id, revision_records)?; + let _ = self.disk_cache.reset_document(doc_id, revision_records)?; Ok(()) } @@ -131,7 +130,7 @@ impl RevisionCache { } } -impl RevisionMemoryCacheDelegate for Arc { +impl RevisionMemoryCacheDelegate for Arc { fn checkpoint_tick(&self, mut records: Vec) -> FlowyResult<()> { let conn = &*self.pool.get().map_err(internal_error)?; records.retain(|record| record.write_to_disk); diff --git a/frontend/rust-lib/flowy-document/src/core/revision/disk/mod.rs b/frontend/rust-lib/flowy-document/src/core/revision/disk/mod.rs new file mode 100644 index 0000000000..6c95aaed7f --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/core/revision/disk/mod.rs @@ -0,0 +1,42 @@ +mod sql_impl; +use crate::core::revision::RevisionRecord; +use diesel::SqliteConnection; +use flowy_collaboration::entities::revision::RevisionRange; +pub use sql_impl::*; + +use flowy_error::FlowyResult; +use std::fmt::Debug; + +pub trait DocumentRevisionDiskCache: Sync + Send { + type Error: Debug; + fn write_revision_records( + &self, + revisions: Vec, + conn: &SqliteConnection, + ) -> Result<(), Self::Error>; + + // Read all the records if the rev_ids is None + fn read_revision_records( + &self, + doc_id: &str, + rev_ids: Option>, + ) -> Result, Self::Error>; + + fn read_revision_records_with_range( + &self, + doc_id: &str, + range: &RevisionRange, + ) -> Result, Self::Error>; + + fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()>; + + // Delete all the records if the rev_ids is None + fn delete_revision_records( + &self, + doc_id: &str, + rev_ids: Option>, + conn: &SqliteConnection, + ) -> Result<(), Self::Error>; + + fn reset_document(&self, doc_id: &str, revision_records: Vec) -> Result<(), Self::Error>; +} diff --git a/frontend/rust-lib/flowy-document/src/core/revision/disk/sql_impl.rs b/frontend/rust-lib/flowy-document/src/core/revision/disk/sql_impl.rs new file mode 100644 index 0000000000..587c9cec85 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/core/revision/disk/sql_impl.rs @@ -0,0 +1,306 @@ +use crate::core::revision::{disk::DocumentRevisionDiskCache, RevisionRecord}; +use bytes::Bytes; +use diesel::{sql_types::Integer, update, SqliteConnection}; +use flowy_collaboration::{ + entities::revision::{RevId, RevType, Revision, RevisionRange, RevisionState}, + util::md5, +}; +use flowy_database::{ + insert_or_ignore_into, + prelude::*, + schema::{rev_table, rev_table::dsl}, + ConnectionPool, +}; +use flowy_error::{internal_error, FlowyError, FlowyResult}; +use std::sync::Arc; + +pub struct SQLitePersistence { + user_id: String, + pub(crate) pool: Arc, +} + +impl DocumentRevisionDiskCache for SQLitePersistence { + type Error = FlowyError; + + fn write_revision_records( + &self, + revisions: Vec, + conn: &SqliteConnection, + ) -> Result<(), Self::Error> { + let _ = RevisionTableSql::create(revisions, conn)?; + Ok(()) + } + + fn read_revision_records( + &self, + doc_id: &str, + rev_ids: Option>, + ) -> Result, Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + let records = RevisionTableSql::read(&self.user_id, doc_id, rev_ids, &*conn)?; + Ok(records) + } + + fn read_revision_records_with_range( + &self, + doc_id: &str, + range: &RevisionRange, + ) -> Result, Self::Error> { + let conn = &*self.pool.get().map_err(internal_error)?; + let revisions = RevisionTableSql::read_with_range(&self.user_id, doc_id, range.clone(), conn)?; + Ok(revisions) + } + + fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()> { + let conn = &*self.pool.get().map_err(internal_error)?; + let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { + for changeset in changesets { + let _ = RevisionTableSql::update(changeset, conn)?; + } + Ok(()) + })?; + Ok(()) + } + + fn delete_revision_records( + &self, + doc_id: &str, + rev_ids: Option>, + conn: &SqliteConnection, + ) -> Result<(), Self::Error> { + let _ = RevisionTableSql::delete(doc_id, rev_ids, conn)?; + Ok(()) + } + + fn reset_document(&self, doc_id: &str, revision_records: Vec) -> Result<(), Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + conn.immediate_transaction::<_, FlowyError, _>(|| { + let _ = self.delete_revision_records(doc_id, None, &*conn)?; + let _ = self.write_revision_records(revision_records, &*conn)?; + Ok(()) + }) + } +} + +impl SQLitePersistence { + pub(crate) fn new(user_id: &str, pool: Arc) -> Self { + Self { + user_id: user_id.to_owned(), + pool, + } + } +} + +pub struct RevisionTableSql {} + +impl RevisionTableSql { + pub(crate) fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + // Batch insert: https://diesel.rs/guides/all-about-inserts.html + let records = revision_records + .into_iter() + .map(|record| { + let rev_state: RevisionTableState = record.state.into(); + ( + dsl::doc_id.eq(record.revision.doc_id), + dsl::base_rev_id.eq(record.revision.base_rev_id), + dsl::rev_id.eq(record.revision.rev_id), + dsl::data.eq(record.revision.delta_data), + dsl::state.eq(rev_state), + dsl::ty.eq(RevTableType::Local), + ) + }) + .collect::>(); + + let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?; + Ok(()) + } + + pub(crate) fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { + let filter = dsl::rev_table + .filter(dsl::rev_id.eq(changeset.rev_id.as_ref())) + .filter(dsl::doc_id.eq(changeset.doc_id)); + let _ = update(filter).set(dsl::state.eq(changeset.state)).execute(conn)?; + tracing::debug!("Set {} to {:?}", changeset.rev_id, changeset.state); + Ok(()) + } + + pub(crate) fn read( + user_id: &str, + doc_id: &str, + rev_ids: Option>, + conn: &SqliteConnection, + ) -> Result, FlowyError> { + let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); + if let Some(rev_ids) = rev_ids { + sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); + } + let rows = sql.order(dsl::rev_id.asc()).load::(conn)?; + let records = rows + .into_iter() + .map(|row| mk_revision_record_from_table(user_id, row)) + .collect::>(); + + Ok(records) + } + + pub(crate) fn read_with_range( + user_id: &str, + doc_id: &str, + range: RevisionRange, + conn: &SqliteConnection, + ) -> Result, FlowyError> { + let rev_tables = dsl::rev_table + .filter(dsl::rev_id.ge(range.start)) + .filter(dsl::rev_id.le(range.end)) + .filter(dsl::doc_id.eq(doc_id)) + .order(dsl::rev_id.asc()) + .load::(conn)?; + + let revisions = rev_tables + .into_iter() + .map(|table| mk_revision_record_from_table(user_id, table)) + .collect::>(); + Ok(revisions) + } + + pub(crate) fn delete(doc_id: &str, rev_ids: Option>, conn: &SqliteConnection) -> Result<(), FlowyError> { + let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); + if let Some(rev_ids) = rev_ids { + sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); + } + + let affected_row = sql.execute(conn)?; + tracing::debug!("Delete {} revision rows", affected_row); + Ok(()) + } +} + +#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] +#[table_name = "rev_table"] +pub(crate) struct RevisionTable { + id: i32, + pub(crate) doc_id: String, + pub(crate) base_rev_id: i64, + pub(crate) rev_id: i64, + pub(crate) data: Vec, + pub(crate) state: RevisionTableState, + pub(crate) ty: RevTableType, // Deprecated +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] +#[repr(i32)] +#[sql_type = "Integer"] +pub enum RevisionTableState { + Local = 0, + Ack = 1, +} + +impl std::default::Default for RevisionTableState { + fn default() -> Self { RevisionTableState::Local } +} + +impl std::convert::From for RevisionTableState { + fn from(value: i32) -> Self { + match value { + 0 => RevisionTableState::Local, + 1 => RevisionTableState::Ack, + o => { + log::error!("Unsupported rev state {}, fallback to RevState::Local", o); + RevisionTableState::Local + }, + } + } +} + +impl RevisionTableState { + pub fn value(&self) -> i32 { *self as i32 } +} +impl_sql_integer_expression!(RevisionTableState); + +impl std::convert::From for RevisionState { + fn from(s: RevisionTableState) -> Self { + match s { + RevisionTableState::Local => RevisionState::Local, + RevisionTableState::Ack => RevisionState::Ack, + } + } +} + +impl std::convert::From for RevisionTableState { + fn from(s: RevisionState) -> Self { + match s { + RevisionState::Local => RevisionTableState::Local, + RevisionState::Ack => RevisionTableState::Ack, + } + } +} + +pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord { + let md5 = md5(&table.data); + let revision = Revision::new( + &table.doc_id, + table.base_rev_id, + table.rev_id, + Bytes::from(table.data), + &user_id, + md5, + ); + RevisionRecord { + revision, + state: table.state.into(), + write_to_disk: false, + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] +#[repr(i32)] +#[sql_type = "Integer"] +pub enum RevTableType { + Local = 0, + Remote = 1, +} + +impl std::default::Default for RevTableType { + fn default() -> Self { RevTableType::Local } +} + +impl std::convert::From for RevTableType { + fn from(value: i32) -> Self { + match value { + 0 => RevTableType::Local, + 1 => RevTableType::Remote, + o => { + log::error!("Unsupported rev type {}, fallback to RevTableType::Local", o); + RevTableType::Local + }, + } + } +} +impl RevTableType { + pub fn value(&self) -> i32 { *self as i32 } +} +impl_sql_integer_expression!(RevTableType); + +impl std::convert::From for RevTableType { + fn from(ty: RevType) -> Self { + match ty { + RevType::DeprecatedLocal => RevTableType::Local, + RevType::DeprecatedRemote => RevTableType::Remote, + } + } +} + +impl std::convert::From for RevType { + fn from(ty: RevTableType) -> Self { + match ty { + RevTableType::Local => RevType::DeprecatedLocal, + RevTableType::Remote => RevType::DeprecatedRemote, + } + } +} + +pub struct RevisionChangeset { + pub(crate) doc_id: String, + pub(crate) rev_id: RevId, + pub(crate) state: RevisionTableState, +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/core/revision/manager.rs similarity index 99% rename from frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs rename to frontend/rust-lib/flowy-document/src/core/revision/manager.rs index 270d0abacf..5f096e423b 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/manager.rs @@ -1,6 +1,6 @@ use crate::{ + core::{revision::RevisionCache, RevisionRecord}, errors::FlowyError, - services::doc::{revision::RevisionCache, RevisionRecord}, }; use bytes::Bytes; use dashmap::DashMap; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs b/frontend/rust-lib/flowy-document/src/core/revision/memory.rs similarity index 99% rename from frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs rename to frontend/rust-lib/flowy-document/src/core/revision/memory.rs index 6ef86db121..35fc8f9bbf 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/memory.rs @@ -1,4 +1,4 @@ -use crate::services::doc::RevisionRecord; +use crate::core::RevisionRecord; use dashmap::DashMap; use flowy_collaboration::entities::revision::RevisionRange; use flowy_error::{FlowyError, FlowyResult}; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/frontend/rust-lib/flowy-document/src/core/revision/mod.rs similarity index 74% rename from frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs rename to frontend/rust-lib/flowy-document/src/core/revision/mod.rs index e101635339..78286f4c9e 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/mod.rs @@ -1,5 +1,7 @@ -mod cache; mod manager; +mod cache; +mod disk; +mod memory; pub use cache::*; pub use manager::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs similarity index 98% rename from frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs rename to frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs index 8604585aaf..cd5febd2cf 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs @@ -1,5 +1,5 @@ -use crate::services::{ - doc::{web_socket::web_socket::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS}, +use crate::{ + core::{web_socket::web_socket::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS}, ws_receivers::{DocumentWSReceiver, DocumentWebSocket}, }; use async_stream::stream; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/local_ws_impl.rs similarity index 85% rename from frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs rename to frontend/rust-lib/flowy-document/src/core/web_socket/local_ws_impl.rs index 0e25b9d9e1..e3be436cbd 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/local_ws_impl.rs @@ -1,4 +1,4 @@ -use crate::services::doc::{web_socket::DocumentWebSocketManager, DocumentWSReceiver}; +use crate::core::{web_socket::DocumentWebSocketManager, DocumentWSReceiver}; use flowy_collaboration::entities::ws::DocumentServerWSData; use lib_ws::WSConnectState; use std::sync::Arc; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/mod.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/doc/web_socket/mod.rs rename to frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs similarity index 99% rename from frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs rename to frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs index 54dde7b901..67d1590926 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs @@ -1,4 +1,4 @@ -use crate::services::doc::{ +use crate::core::{ web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager}, DocumentMD5, DocumentWSReceiver, @@ -18,7 +18,7 @@ use flowy_collaboration::{ use flowy_error::{internal_error, FlowyError, FlowyResult}; use lib_infra::future::FutureResult; -use crate::services::doc::web_socket::local_ws_impl::LocalWebSocketManager; +use crate::core::web_socket::local_ws_impl::LocalWebSocketManager; use flowy_collaboration::entities::ws::DocumentServerWSDataType; use lib_ot::rich_text::RichTextDelta; use lib_ws::WSConnectState; diff --git a/frontend/rust-lib/flowy-document/src/lib.rs b/frontend/rust-lib/flowy-document/src/lib.rs index da57a5a290..4079b04145 100644 --- a/frontend/rust-lib/flowy-document/src/lib.rs +++ b/frontend/rust-lib/flowy-document/src/lib.rs @@ -1,8 +1,10 @@ pub mod context; +pub(crate) mod controller; +pub mod core; mod notify; pub mod protobuf; -pub mod services; -mod sql_tables; +pub mod server; +mod ws_receivers; #[macro_use] extern crate flowy_database; diff --git a/frontend/rust-lib/flowy-document/src/services/server/middleware.rs b/frontend/rust-lib/flowy-document/src/server/middleware.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/server/middleware.rs rename to frontend/rust-lib/flowy-document/src/server/middleware.rs diff --git a/frontend/rust-lib/flowy-document/src/services/server/mod.rs b/frontend/rust-lib/flowy-document/src/server/mod.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/server/mod.rs rename to frontend/rust-lib/flowy-document/src/server/mod.rs diff --git a/frontend/rust-lib/flowy-document/src/services/server/server_api.rs b/frontend/rust-lib/flowy-document/src/server/server_api.rs similarity index 96% rename from frontend/rust-lib/flowy-document/src/services/server/server_api.rs rename to frontend/rust-lib/flowy-document/src/server/server_api.rs index 923b5c26ca..7273f96aaf 100644 --- a/frontend/rust-lib/flowy-document/src/services/server/server_api.rs +++ b/frontend/rust-lib/flowy-document/src/server/server_api.rs @@ -1,7 +1,8 @@ -use crate::{errors::FlowyError, services::server::DocumentServerAPI}; +use crate::errors::FlowyError; use backend_service::{configuration::*, request::HttpRequestBuilder}; use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}; use lib_infra::future::FutureResult; +use crate::server::DocumentServerAPI; pub struct DocServer { config: ClientServerConfiguration, diff --git a/frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs b/frontend/rust-lib/flowy-document/src/server/server_api_mock.rs similarity index 92% rename from frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs rename to frontend/rust-lib/flowy-document/src/server/server_api_mock.rs index 67872601b2..941a2a8e6d 100644 --- a/frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs +++ b/frontend/rust-lib/flowy-document/src/server/server_api_mock.rs @@ -4,7 +4,8 @@ use flowy_collaboration::{ }; use lib_infra::future::FutureResult; -use crate::{errors::FlowyError, services::server::DocumentServerAPI}; +use crate::errors::FlowyError; +use crate::server::DocumentServerAPI; pub struct DocServerMock {} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs deleted file mode 100644 index d7c09897d1..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs +++ /dev/null @@ -1,123 +0,0 @@ -use crate::services::doc::revision::RevisionRecord; - -use crate::sql_tables::{RevisionChangeset, RevisionTableSql}; -use diesel::SqliteConnection; -use flowy_collaboration::entities::revision::RevisionRange; -use flowy_database::ConnectionPool; -use flowy_error::{internal_error, FlowyError, FlowyResult}; -use std::{fmt::Debug, sync::Arc}; - -pub trait RevisionDiskCache: Sync + Send { - type Error: Debug; - fn write_revision_records( - &self, - revisions: Vec, - conn: &SqliteConnection, - ) -> Result<(), Self::Error>; - - // Read all the records if the rev_ids is None - fn read_revision_records( - &self, - doc_id: &str, - rev_ids: Option>, - ) -> Result, Self::Error>; - - fn read_revision_records_with_range( - &self, - doc_id: &str, - range: &RevisionRange, - ) -> Result, Self::Error>; - - fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()>; - - // Delete all the records if the rev_ids is None - fn delete_revision_records( - &self, - doc_id: &str, - rev_ids: Option>, - conn: &SqliteConnection, - ) -> Result<(), Self::Error>; - - fn reset_with_revisions(&self, doc_id: &str, revision_records: Vec) -> Result<(), Self::Error>; - - fn db_pool(&self) -> Arc; -} - -pub(crate) struct Persistence { - user_id: String, - pub(crate) pool: Arc, -} - -impl RevisionDiskCache for Persistence { - type Error = FlowyError; - - fn write_revision_records( - &self, - revisions: Vec, - conn: &SqliteConnection, - ) -> Result<(), Self::Error> { - let _ = RevisionTableSql::create(revisions, conn)?; - Ok(()) - } - - fn read_revision_records( - &self, - doc_id: &str, - rev_ids: Option>, - ) -> Result, Self::Error> { - let conn = self.pool.get().map_err(internal_error)?; - let records = RevisionTableSql::read(&self.user_id, doc_id, rev_ids, &*conn)?; - Ok(records) - } - - fn read_revision_records_with_range( - &self, - doc_id: &str, - range: &RevisionRange, - ) -> Result, Self::Error> { - let conn = &*self.pool.get().map_err(internal_error)?; - let revisions = RevisionTableSql::read_with_range(&self.user_id, doc_id, range.clone(), conn)?; - Ok(revisions) - } - - fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()> { - let conn = &*self.pool.get().map_err(internal_error)?; - let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { - for changeset in changesets { - let _ = RevisionTableSql::update(changeset, conn)?; - } - Ok(()) - })?; - Ok(()) - } - - fn delete_revision_records( - &self, - doc_id: &str, - rev_ids: Option>, - conn: &SqliteConnection, - ) -> Result<(), Self::Error> { - let _ = RevisionTableSql::delete(doc_id, rev_ids, conn)?; - Ok(()) - } - - fn reset_with_revisions(&self, doc_id: &str, revision_records: Vec) -> Result<(), Self::Error> { - let conn = self.db_pool().get().map_err(internal_error)?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = self.delete_revision_records(doc_id, None, &*conn)?; - let _ = self.write_revision_records(revision_records, &*conn)?; - Ok(()) - }) - } - - fn db_pool(&self) -> Arc { self.pool.clone() } -} - -impl Persistence { - pub(crate) fn new(user_id: &str, pool: Arc) -> Self { - Self { - user_id: user_id.to_owned(), - pool, - } - } -} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs deleted file mode 100644 index 431c0af4c7..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -#![allow(clippy::module_inception)] -mod cache; -mod disk; -mod memory; - -pub use cache::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs deleted file mode 100644 index b9f454e9f5..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs +++ /dev/null @@ -1,71 +0,0 @@ -use crate::services::doc::revision::RevisionRecord; -use dashmap::DashMap; -use flowy_error::{FlowyError, FlowyResult}; -use lib_ot::errors::OTError; -use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::RwLock; - -pub struct RevisionSyncSeq { - revs_map: Arc>, - local_revs: Arc>>, -} - -impl std::default::Default for RevisionSyncSeq { - fn default() -> Self { - let local_revs = Arc::new(RwLock::new(VecDeque::new())); - RevisionSyncSeq { - revs_map: Arc::new(DashMap::new()), - local_revs, - } - } -} - -impl RevisionSyncSeq { - pub fn new() -> Self { RevisionSyncSeq::default() } - - pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> { - // The last revision's rev_id must be greater than the new one. - if let Some(rev_id) = self.local_revs.read().await.back() { - if *rev_id >= record.revision.rev_id { - return Err(OTError::revision_id_conflict() - .context(format!("The new revision's id must be greater than {}", rev_id))); - } - } - self.local_revs.write().await.push_back(record.revision.rev_id); - self.revs_map.insert(record.revision.rev_id, record); - Ok(()) - } - - pub async fn ack_revision(&self, rev_id: &i64) -> FlowyResult<()> { - if let Some(pop_rev_id) = self.next_sync_rev_id().await { - if &pop_rev_id != rev_id { - let desc = format!( - "The ack rev_id:{} is not equal to the current rev_id:{}", - rev_id, pop_rev_id - ); - // tracing::error!("{}", desc); - return Err(FlowyError::internal().context(desc)); - } - - tracing::debug!("pop revision {}", pop_rev_id); - self.revs_map.remove(&pop_rev_id); - let _ = self.local_revs.write().await.pop_front(); - } - Ok(()) - } - - pub async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> { - match self.local_revs.read().await.front() { - None => None, - Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), - } - } - - pub async fn next_sync_rev_id(&self) -> Option { self.local_revs.read().await.front().copied() } -} - -#[cfg(feature = "flowy_unit_test")] -impl RevisionSyncSeq { - pub fn revs_map(&self) -> Arc> { self.revs_map.clone() } - pub fn pending_revs(&self) -> Arc>> { self.local_revs.clone() } -} diff --git a/frontend/rust-lib/flowy-document/src/services/mod.rs b/frontend/rust-lib/flowy-document/src/services/mod.rs deleted file mode 100644 index fdffae6ee3..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub(crate) mod controller; -pub mod doc; -pub mod server; -mod ws_receivers; diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs deleted file mode 100644 index f434c42273..0000000000 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/doc_sql.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::{ - errors::{internal_error, DocError}, - sql_tables::doc::{DocTable, DocTableChangeset}, -}; -use flowy_database::{ - prelude::*, - schema::{doc_table, doc_table::dsl}, - ConnectionPool, - SqliteConnection, -}; -use std::sync::Arc; - -pub struct DocTableSql {} - -impl DocTableSql { - pub(crate) fn create_doc_table(&self, doc_table: DocTable, conn: &SqliteConnection) -> Result<(), DocError> { - let _ = diesel::insert_into(doc_table::table).values(doc_table).execute(conn)?; - Ok(()) - } - - pub(crate) fn update_doc_table( - &self, - changeset: DocTableChangeset, - conn: &SqliteConnection, - ) -> Result<(), DocError> { - diesel_update_table!(doc_table, changeset, conn); - Ok(()) - } - - pub(crate) fn read_doc_table(&self, doc_id: &str, pool: Arc) -> Result { - let conn = &*pool.get().map_err(internal_error)?; - let doc_table = dsl::doc_table - .filter(doc_table::id.eq(doc_id)) - .first::(conn)?; - - Ok(doc_table) - } - - #[allow(dead_code)] - pub(crate) fn delete_doc(&self, doc_id: &str, conn: &SqliteConnection) -> Result { - let doc_table = dsl::doc_table - .filter(doc_table::id.eq(doc_id)) - .first::(conn)?; - diesel_delete_table!(doc_table, doc_id, conn); - Ok(doc_table) - } -} diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs deleted file mode 100644 index 74d3cb2283..0000000000 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::entities::doc::Doc; -use flowy_database::schema::doc_table; - -#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] -#[table_name = "doc_table"] -pub(crate) struct DocTable { - pub(crate) id: String, - pub(crate) data: String, - pub(crate) rev_id: i64, - pub(crate) base_rev_id: i64, -} - -impl DocTable { - pub fn new(doc: Doc) -> Self { - Self { - id: doc.id, - data: doc.data, - rev_id: doc.rev_id.into(), - base_rev_id: doc.base_rev_id.into(), - } - } -} - -#[derive(AsChangeset, Identifiable, Default, Debug)] -#[table_name = "doc_table"] -pub(crate) struct DocTableChangeset { - pub id: String, - pub data: String, - pub rev_id: i64, -} - -impl std::convert::Into for DocTable { - fn into(self) -> Doc { - Doc { - id: self.id, - data: self.data, - rev_id: self.rev_id.into(), - base_rev_id: self.base_rev_id.into(), - } - } -} - -impl std::convert::From for DocTable { - fn from(doc: Doc) -> Self { - Self { - id: doc.id, - data: doc.data, - rev_id: doc.rev_id.into(), - base_rev_id: doc.base_rev_id.into(), - } - } -} diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/mod.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/mod.rs deleted file mode 100644 index 1d96022349..0000000000 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod rev_sql; -mod rev_table; - -pub(crate) use rev_sql::*; -pub(crate) use rev_table::*; diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs deleted file mode 100644 index e077e225e0..0000000000 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ /dev/null @@ -1,98 +0,0 @@ -use crate::{ - errors::FlowyError, - services::doc::revision::RevisionRecord, - sql_tables::{ - doc::RevisionTable, - mk_revision_record_from_table, - RevTableType, - RevisionChangeset, - RevisionTableState, - }, -}; -use diesel::update; -use flowy_collaboration::entities::revision::RevisionRange; -use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection}; - -pub struct RevisionTableSql {} - -impl RevisionTableSql { - pub(crate) fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { - // Batch insert: https://diesel.rs/guides/all-about-inserts.html - let records = revision_records - .into_iter() - .map(|record| { - let rev_state: RevisionTableState = record.state.into(); - ( - dsl::doc_id.eq(record.revision.doc_id), - dsl::base_rev_id.eq(record.revision.base_rev_id), - dsl::rev_id.eq(record.revision.rev_id), - dsl::data.eq(record.revision.delta_data), - dsl::state.eq(rev_state), - dsl::ty.eq(RevTableType::Local), - ) - }) - .collect::>(); - - let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?; - Ok(()) - } - - pub(crate) fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { - let filter = dsl::rev_table - .filter(dsl::rev_id.eq(changeset.rev_id.as_ref())) - .filter(dsl::doc_id.eq(changeset.doc_id)); - let _ = update(filter).set(dsl::state.eq(changeset.state)).execute(conn)?; - tracing::debug!("Set {} to {:?}", changeset.rev_id, changeset.state); - Ok(()) - } - - pub(crate) fn read( - user_id: &str, - doc_id: &str, - rev_ids: Option>, - conn: &SqliteConnection, - ) -> Result, FlowyError> { - let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); - if let Some(rev_ids) = rev_ids { - sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); - } - let rows = sql.order(dsl::rev_id.asc()).load::(conn)?; - let records = rows - .into_iter() - .map(|row| mk_revision_record_from_table(user_id, row)) - .collect::>(); - - Ok(records) - } - - pub(crate) fn read_with_range( - user_id: &str, - doc_id: &str, - range: RevisionRange, - conn: &SqliteConnection, - ) -> Result, FlowyError> { - let rev_tables = dsl::rev_table - .filter(dsl::rev_id.ge(range.start)) - .filter(dsl::rev_id.le(range.end)) - .filter(dsl::doc_id.eq(doc_id)) - .order(dsl::rev_id.asc()) - .load::(conn)?; - - let revisions = rev_tables - .into_iter() - .map(|table| mk_revision_record_from_table(user_id, table)) - .collect::>(); - Ok(revisions) - } - - pub(crate) fn delete(doc_id: &str, rev_ids: Option>, conn: &SqliteConnection) -> Result<(), FlowyError> { - let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed(); - if let Some(rev_ids) = rev_ids { - sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); - } - - let affected_row = sql.execute(conn)?; - tracing::debug!("Delete {} revision rows", affected_row); - Ok(()) - } -} diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs deleted file mode 100644 index 34a0143d63..0000000000 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs +++ /dev/null @@ -1,138 +0,0 @@ -use crate::services::doc::revision::RevisionRecord; -use bytes::Bytes; -use diesel::sql_types::Integer; -use flowy_collaboration::{ - entities::revision::{RevId, RevType, Revision, RevisionState}, - util::md5, -}; -use flowy_database::schema::rev_table; - -#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] -#[table_name = "rev_table"] -pub(crate) struct RevisionTable { - id: i32, - pub(crate) doc_id: String, - pub(crate) base_rev_id: i64, - pub(crate) rev_id: i64, - pub(crate) data: Vec, - pub(crate) state: RevisionTableState, - pub(crate) ty: RevTableType, // Deprecated -} - -#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] -#[repr(i32)] -#[sql_type = "Integer"] -pub enum RevisionTableState { - Local = 0, - Ack = 1, -} - -impl std::default::Default for RevisionTableState { - fn default() -> Self { RevisionTableState::Local } -} - -impl std::convert::From for RevisionTableState { - fn from(value: i32) -> Self { - match value { - 0 => RevisionTableState::Local, - 1 => RevisionTableState::Ack, - o => { - log::error!("Unsupported rev state {}, fallback to RevState::Local", o); - RevisionTableState::Local - }, - } - } -} - -impl RevisionTableState { - pub fn value(&self) -> i32 { *self as i32 } -} -impl_sql_integer_expression!(RevisionTableState); - -impl std::convert::From for RevisionState { - fn from(s: RevisionTableState) -> Self { - match s { - RevisionTableState::Local => RevisionState::Local, - RevisionTableState::Ack => RevisionState::Ack, - } - } -} - -impl std::convert::From for RevisionTableState { - fn from(s: RevisionState) -> Self { - match s { - RevisionState::Local => RevisionTableState::Local, - RevisionState::Ack => RevisionTableState::Ack, - } - } -} - -pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord { - let md5 = md5(&table.data); - let revision = Revision::new( - &table.doc_id, - table.base_rev_id, - table.rev_id, - Bytes::from(table.data), - &user_id, - md5, - ); - RevisionRecord { - revision, - state: table.state.into(), - write_to_disk: false, - } -} - -#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] -#[repr(i32)] -#[sql_type = "Integer"] -pub enum RevTableType { - Local = 0, - Remote = 1, -} - -impl std::default::Default for RevTableType { - fn default() -> Self { RevTableType::Local } -} - -impl std::convert::From for RevTableType { - fn from(value: i32) -> Self { - match value { - 0 => RevTableType::Local, - 1 => RevTableType::Remote, - o => { - log::error!("Unsupported rev type {}, fallback to RevTableType::Local", o); - RevTableType::Local - }, - } - } -} -impl RevTableType { - pub fn value(&self) -> i32 { *self as i32 } -} -impl_sql_integer_expression!(RevTableType); - -impl std::convert::From for RevTableType { - fn from(ty: RevType) -> Self { - match ty { - RevType::DeprecatedLocal => RevTableType::Local, - RevType::DeprecatedRemote => RevTableType::Remote, - } - } -} - -impl std::convert::From for RevType { - fn from(ty: RevTableType) -> Self { - match ty { - RevTableType::Local => RevType::DeprecatedLocal, - RevTableType::Remote => RevType::DeprecatedRemote, - } - } -} - -pub struct RevisionChangeset { - pub(crate) doc_id: String, - pub(crate) rev_id: RevId, - pub(crate) state: RevisionTableState, -} diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/mod.rs b/frontend/rust-lib/flowy-document/src/sql_tables/mod.rs deleted file mode 100644 index 0bcdb06e6e..0000000000 --- a/frontend/rust-lib/flowy-document/src/sql_tables/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub(crate) mod doc; - -pub(crate) use doc::*; diff --git a/frontend/rust-lib/flowy-document/src/services/ws_receivers.rs b/frontend/rust-lib/flowy-document/src/ws_receivers.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/ws_receivers.rs rename to frontend/rust-lib/flowy-document/src/ws_receivers.rs diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index ee1a7fa244..74ccb222d4 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -3,8 +3,8 @@ use flowy_collaboration::entities::ws::DocumentClientWSData; use flowy_database::ConnectionPool; use flowy_document::{ context::DocumentUser, + core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, errors::{internal_error, FlowyError}, - services::doc::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, }; use flowy_net::services::ws::FlowyWSConnect; use flowy_user::services::user::UserSession; diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 8a7e176a7d..bca2760b16 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -2,7 +2,7 @@ mod deps_resolve; pub mod module; use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; use backend_service::configuration::ClientServerConfiguration; -use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext}; +use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; use flowy_document::context::DocumentContext; use flowy_net::{ entities::NetworkType, diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index 0175d59e2a..af6a7ce455 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,5 +1,4 @@ -use flowy_core::prelude::CoreContext; - +use flowy_core::context::CoreContext; use flowy_net::services::ws::FlowyWSConnect; use flowy_user::services::user::UserSession; use lib_dispatch::prelude::Module; diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index bf3243e294..4111b9eac6 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -1,6 +1,6 @@ use crate::{helper::ViewTest, FlowySDKTest}; use flowy_collaboration::entities::revision::RevisionState; -use flowy_document::services::doc::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS}; +use flowy_document::core::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS}; use lib_ot::{core::Interval, rich_text::RichTextDelta}; use std::sync::Arc; use tokio::time::{sleep, Duration}; diff --git a/frontend/rust-lib/flowy-user/src/services/server/server_api.rs b/frontend/rust-lib/flowy-user/src/services/server/server_api.rs index 6fc29af1bb..5161849b37 100644 --- a/frontend/rust-lib/flowy-user/src/services/server/server_api.rs +++ b/frontend/rust-lib/flowy-user/src/services/server/server_api.rs @@ -3,7 +3,7 @@ use crate::{ errors::FlowyError, services::server::UserServerAPI, }; -use backend_service::{configuration::*, user_request::*}; +use backend_service::{configuration::*, http_request::*}; use lib_infra::future::FutureResult; pub struct UserHttpServer { diff --git a/shared-lib/backend-service/src/workspace_request.rs b/shared-lib/backend-service/src/http_request.rs similarity index 77% rename from shared-lib/backend-service/src/workspace_request.rs rename to shared-lib/backend-service/src/http_request.rs index 2249ec4d26..015bd358bb 100644 --- a/shared-lib/backend-service/src/workspace_request.rs +++ b/shared-lib/backend-service/src/http_request.rs @@ -1,10 +1,57 @@ use crate::{configuration::HEADER_TOKEN, errors::ServerError, request::HttpRequestBuilder}; use flowy_core_data_model::entities::prelude::*; +use flowy_user_data_model::entities::prelude::*; pub(crate) fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(crate::middleware::BACKEND_API_MIDDLEWARE.clone()) } +pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result { + let response = request_builder() + .post(&url.to_owned()) + .protobuf(params)? + .response() + .await?; + Ok(response) +} + +pub async fn user_sign_in_request(params: SignInParams, url: &str) -> Result { + let response = request_builder() + .post(&url.to_owned()) + .protobuf(params)? + .response() + .await?; + Ok(response) +} + +pub async fn user_sign_out_request(token: &str, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .delete(&url.to_owned()) + .header(HEADER_TOKEN, token) + .send() + .await?; + Ok(()) +} + +pub async fn get_user_profile_request(token: &str, url: &str) -> Result { + let user_profile = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .response() + .await?; + Ok(user_profile) +} + +pub async fn update_user_profile_request(token: &str, params: UpdateUserParams, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .patch(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + pub async fn create_workspace_request( token: &str, params: CreateWorkspaceParams, diff --git a/shared-lib/backend-service/src/lib.rs b/shared-lib/backend-service/src/lib.rs index fddeb9cffc..6bd5aa18c4 100644 --- a/shared-lib/backend-service/src/lib.rs +++ b/shared-lib/backend-service/src/lib.rs @@ -1,7 +1,6 @@ pub mod configuration; pub mod errors; +pub mod http_request; pub mod middleware; pub mod request; pub mod response; -pub mod user_request; -pub mod workspace_request; diff --git a/shared-lib/backend-service/src/user_request.rs b/shared-lib/backend-service/src/user_request.rs deleted file mode 100644 index 92043719c7..0000000000 --- a/shared-lib/backend-service/src/user_request.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::{configuration::HEADER_TOKEN, errors::ServerError, request::HttpRequestBuilder}; -use flowy_user_data_model::entities::prelude::*; - -pub(crate) fn request_builder() -> HttpRequestBuilder { - HttpRequestBuilder::new().middleware(crate::middleware::BACKEND_API_MIDDLEWARE.clone()) -} - -pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result { - let response = request_builder() - .post(&url.to_owned()) - .protobuf(params)? - .response() - .await?; - Ok(response) -} - -pub async fn user_sign_in_request(params: SignInParams, url: &str) -> Result { - let response = request_builder() - .post(&url.to_owned()) - .protobuf(params)? - .response() - .await?; - Ok(response) -} - -pub async fn user_sign_out_request(token: &str, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .delete(&url.to_owned()) - .header(HEADER_TOKEN, token) - .send() - .await?; - Ok(()) -} - -pub async fn get_user_profile_request(token: &str, url: &str) -> Result { - let user_profile = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .response() - .await?; - Ok(user_profile) -} - -pub async fn update_user_profile_request(token: &str, params: UpdateUserParams, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .patch(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -}