From d1c5df4b8818e9bdeaa9df0018a07a06dc98bb51 Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 20 Jan 2022 23:51:11 +0800 Subject: [PATCH] add conflict resolver --- .../src/services/folder/view/controller.rs | 9 +- .../rust-lib/flowy-core/src/controller.rs | 49 ++- frontend/rust-lib/flowy-core/src/module.rs | 28 +- .../flowy-core/src/services/app/controller.rs | 111 ++++--- .../src/services/app/event_handler.rs | 3 +- .../flowy-core/src/services/folder_editor.rs | 99 ++++++ .../rust-lib/flowy-core/src/services/mod.rs | 2 + .../src/services/persistence/migration.rs | 8 +- .../src/services/persistence/mod.rs | 37 +-- .../services/persistence/version_2/v2_impl.rs | 127 ++------ .../src/services/trash/controller.rs | 89 +++--- .../src/services/trash/event_handler.rs | 2 +- .../src/services/view/controller.rs | 157 ++++++---- .../src/services/view/event_handler.rs | 3 +- .../flowy-core/src/services/web_socket.rs | 85 +++++ .../src/services/workspace/controller.rs | 47 ++- .../src/services/workspace/event_handler.rs | 81 ++--- .../rust-lib/flowy-document/src/controller.rs | 17 +- .../rust-lib/flowy-document/src/core/queue.rs | 72 ++--- .../flowy-document/src/core/web_socket.rs | 291 +++++------------- .../{core_deps.rs => folder_deps.rs} | 17 +- .../flowy-sdk/src/deps_resolve/mod.rs | 4 +- frontend/rust-lib/flowy-sdk/src/lib.rs | 2 +- .../flowy-sync/src/conflict_resolve.rs | 182 +++++++++++ frontend/rust-lib/flowy-sync/src/lib.rs | 2 + .../rust-lib/flowy-sync/src/ws_manager.rs | 130 +++++++- frontend/rust-lib/flowy-test/src/lib.rs | 15 +- .../flowy-collaboration/src/folder/builder.rs | 69 +++++ .../src/folder/folder_pad.rs | 41 +-- .../flowy-collaboration/src/folder/mod.rs | 1 + shared-lib/flowy-collaboration/src/util.rs | 19 +- 31 files changed, 1078 insertions(+), 721 deletions(-) create mode 100644 frontend/rust-lib/flowy-core/src/services/folder_editor.rs create mode 100644 frontend/rust-lib/flowy-core/src/services/web_socket.rs rename frontend/rust-lib/flowy-sdk/src/deps_resolve/{core_deps.rs => folder_deps.rs} (88%) create mode 100644 frontend/rust-lib/flowy-sync/src/conflict_resolve.rs create mode 100644 shared-lib/flowy-collaboration/src/folder/builder.rs diff --git a/backend/src/services/folder/view/controller.rs b/backend/src/services/folder/view/controller.rs index 01cd6bb1eb..9bd18c874f 100644 --- a/backend/src/services/folder/view/controller.rs +++ b/backend/src/services/folder/view/controller.rs @@ -7,9 +7,10 @@ use crate::{ util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; use backend_service::errors::{invalid_params, ServerError}; -use bytes::Bytes; + use chrono::Utc; use flowy_collaboration::{ + client_document::default::initial_delta, entities::revision::{RepeatedRevision, Revision}, protobuf::CreateDocParams as CreateDocParamsPB, }; @@ -90,9 +91,9 @@ pub(crate) async fn create_view( .await .map_err(map_sqlx_error)?; - let delta_data = Bytes::from(params.view_data); - let md5 = format!("{:x}", md5::compute(&delta_data)); - let revision = Revision::new(&view.id, 0, 0, delta_data, user_id, md5); + let initial_delta_data = initial_delta().to_bytes(); + let md5 = format!("{:x}", md5::compute(&initial_delta_data)); + let revision = Revision::new(&view.id, 0, 0, initial_delta_data, user_id, md5); let repeated_revision = RepeatedRevision::new(vec![revision]); let mut create_doc_params = CreateDocParamsPB::new(); create_doc_params.set_revisions(repeated_revision.try_into().unwrap()); diff --git a/frontend/rust-lib/flowy-core/src/controller.rs b/frontend/rust-lib/flowy-core/src/controller.rs index e53a2ee2cf..0d882ee5a5 100644 --- a/frontend/rust-lib/flowy-core/src/controller.rs +++ b/frontend/rust-lib/flowy-core/src/controller.rs @@ -6,16 +6,18 @@ use flowy_document::context::DocumentContext; use flowy_sync::RevisionWebSocket; use lazy_static::lazy_static; -use flowy_collaboration::folder::FolderPad; +use flowy_collaboration::{entities::ws_data::ServerRevisionWSData, folder::FolderPad}; use parking_lot::RwLock; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, convert::TryInto, sync::Arc}; +use tokio::sync::RwLock as TokioRwLock; use crate::{ dart_notification::{send_dart_notification, WorkspaceNotification}, entities::workspace::RepeatedWorkspace, errors::FlowyResult, - module::{FolderCouldServiceV1, WorkspaceUser}, + module::{FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser}, services::{ + folder_editor::FolderEditor, persistence::FolderPersistence, set_current_workspace, AppController, @@ -33,25 +35,29 @@ pub struct FolderManager { pub user: Arc, pub(crate) cloud_service: Arc, pub(crate) persistence: Arc, - pub workspace_controller: Arc, + pub(crate) workspace_controller: Arc, pub(crate) app_controller: Arc, pub(crate) view_controller: Arc, pub(crate) trash_controller: Arc, - ws_sender: Arc, + web_socket: Arc, + folder_editor: Arc>>>, } impl FolderManager { - pub(crate) fn new( + pub fn new( user: Arc, cloud_service: Arc, - persistence: Arc, + database: Arc, flowy_document: Arc, - ws_sender: Arc, + web_socket: Arc, ) -> Self { if let Ok(token) = user.token() { INIT_FOLDER_FLAG.write().insert(token, false); } + let folder_editor = Arc::new(TokioRwLock::new(None)); + let persistence = Arc::new(FolderPersistence::new(database.clone(), folder_editor.clone())); + let trash_controller = Arc::new(TrashController::new( persistence.clone(), cloud_service.clone(), @@ -88,7 +94,8 @@ impl FolderManager { app_controller, view_controller, trash_controller, - ws_sender, + web_socket, + folder_editor, } } @@ -101,7 +108,21 @@ impl FolderManager { // } // } - pub async fn did_receive_ws_data(&self, _data: Bytes) {} + pub async fn did_receive_ws_data(&self, data: Bytes) { + let result: Result = data.try_into(); + match result { + Ok(data) => match self.folder_editor.read().await.clone() { + None => {}, + Some(editor) => match editor.receive_ws_data(data).await { + Ok(_) => {}, + Err(e) => tracing::error!("Folder receive data error: {:?}", e), + }, + }, + Err(e) => { + tracing::error!("Folder ws data parser failed: {:?}", e); + }, + } + } pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> { if let Some(is_init) = INIT_FOLDER_FLAG.read().get(user_id) { @@ -110,6 +131,12 @@ impl FolderManager { } } let _ = self.persistence.initialize(user_id).await?; + + let token = self.user.token()?; + let pool = self.persistence.db_pool()?; + let folder_editor = FolderEditor::new(user_id, &token, pool, self.web_socket.clone()).await?; + *self.folder_editor.write().await = Some(Arc::new(folder_editor)); + let _ = self.app_controller.initialize()?; let _ = self.view_controller.initialize()?; INIT_FOLDER_FLAG.write().insert(user_id.to_owned(), true); @@ -121,7 +148,7 @@ impl FolderManager { self.initialize(user_id).await } - pub async fn clear(&self) { self.persistence.user_did_logout() } + pub async fn clear(&self) { *self.folder_editor.write().await = None; } } struct DefaultFolderBuilder(); diff --git a/frontend/rust-lib/flowy-core/src/module.rs b/frontend/rust-lib/flowy-core/src/module.rs index 6123fc6dd0..603f5a5ce2 100644 --- a/frontend/rust-lib/flowy-core/src/module.rs +++ b/frontend/rust-lib/flowy-core/src/module.rs @@ -8,17 +8,10 @@ use crate::{ }, errors::FlowyError, event::WorkspaceEvent, - services::{ - app::event_handler::*, - persistence::FolderPersistence, - trash::event_handler::*, - view::event_handler::*, - workspace::event_handler::*, - }, + services::{app::event_handler::*, trash::event_handler::*, view::event_handler::*, workspace::event_handler::*}, }; use flowy_database::DBConnection; -use flowy_document::context::DocumentContext; -use flowy_sync::RevisionWebSocket; + use lib_dispatch::prelude::*; use lib_infra::future::FutureResult; use lib_sqlite::ConnectionPool; @@ -41,23 +34,6 @@ pub trait WorkspaceDatabase: Send + Sync { } } -pub fn init_folder( - user: Arc, - database: Arc, - flowy_document: Arc, - cloud_service: Arc, - ws_sender: Arc, -) -> Arc { - let persistence = Arc::new(FolderPersistence::new(user.clone(), database.clone())); - Arc::new(FolderManager::new( - user, - cloud_service, - persistence, - flowy_document, - ws_sender, - )) -} - pub fn create(folder: Arc) -> Module { let mut module = Module::new() .name("Flowy-Workspace") diff --git a/frontend/rust-lib/flowy-core/src/services/app/controller.rs b/frontend/rust-lib/flowy-core/src/services/app/controller.rs index eb302cb08d..ae74e97672 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/app/controller.rs @@ -50,23 +50,29 @@ impl AppController { } pub(crate) async fn create_app_on_local(&self, app: App) -> Result { - let _ = self.persistence.begin_transaction(|transaction| { - let _ = transaction.create_app(app.clone())?; - let _ = notify_apps_changed(&app.workspace_id, self.trash_controller.clone(), &transaction)?; - Ok(()) - })?; + let _ = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.create_app(app.clone())?; + let _ = notify_apps_changed(&app.workspace_id, self.trash_controller.clone(), &transaction)?; + Ok(()) + }) + .await?; Ok(app) } pub(crate) async fn read_app(&self, params: AppId) -> Result { - let app = self.persistence.begin_transaction(|transaction| { - let app = transaction.read_app(¶ms.app_id)?; - let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; - if trash_ids.contains(&app.id) { - return Err(FlowyError::record_not_found()); - } - Ok(app) - })?; + let app = self + .persistence + .begin_transaction(|transaction| { + let app = transaction.read_app(¶ms.app_id)?; + let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; + if trash_ids.contains(&app.id) { + return Err(FlowyError::record_not_found()); + } + Ok(app) + }) + .await?; let _ = self.read_app_on_server(params)?; Ok(app) } @@ -75,11 +81,14 @@ impl AppController { let changeset = AppChangeset::new(params.clone()); let app_id = changeset.id.clone(); - let app = self.persistence.begin_transaction(|transaction| { - let _ = transaction.update_app(changeset)?; - let app = transaction.read_app(&app_id)?; - Ok(app) - })?; + let app = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.update_app(changeset)?; + let app = transaction.read_app(&app_id)?; + Ok(app) + }) + .await?; send_dart_notification(&app_id, WorkspaceNotification::AppUpdated) .payload(app) .send(); @@ -87,14 +96,17 @@ impl AppController { Ok(()) } - pub(crate) fn read_local_apps(&self, ids: Vec) -> Result, FlowyError> { - let apps = self.persistence.begin_transaction(|transaction| { - let mut apps = vec![]; - for id in ids { - apps.push(transaction.read_app(&id)?); - } - Ok(apps) - })?; + pub(crate) async fn read_local_apps(&self, ids: Vec) -> Result, FlowyError> { + let apps = self + .persistence + .begin_transaction(|transaction| { + let mut apps = vec![]; + for id in ids { + apps.push(transaction.read_app(&id)?); + } + Ok(apps) + }) + .await?; Ok(apps) } } @@ -131,7 +143,10 @@ impl AppController { tokio::spawn(async move { match server.read_app(&token, params).await { Ok(Some(app)) => { - match persistence.begin_transaction(|transaction| transaction.create_app(app.clone())) { + match persistence + .begin_transaction(|transaction| transaction.create_app(app.clone())) + .await + { Ok(_) => { send_dart_notification(&app.id, WorkspaceNotification::AppUpdated) .payload(app) @@ -175,29 +190,33 @@ async fn handle_trash_event( ) { match event { TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => { - let result = persistence.begin_transaction(|transaction| { - for identifier in identifiers.items { - let app = transaction.read_app(&identifier.id)?; - let _ = notify_apps_changed(&app.workspace_id, trash_controller.clone(), &transaction)?; - } - Ok(()) - }); + let result = persistence + .begin_transaction(|transaction| { + for identifier in identifiers.items { + let app = transaction.read_app(&identifier.id)?; + let _ = notify_apps_changed(&app.workspace_id, trash_controller.clone(), &transaction)?; + } + Ok(()) + }) + .await; let _ = ret.send(result).await; }, TrashEvent::Delete(identifiers, ret) => { - let result = persistence.begin_transaction(|transaction| { - let mut notify_ids = HashSet::new(); - for identifier in identifiers.items { - let app = transaction.read_app(&identifier.id)?; - let _ = transaction.delete_app(&identifier.id)?; - notify_ids.insert(app.workspace_id); - } + let result = persistence + .begin_transaction(|transaction| { + let mut notify_ids = HashSet::new(); + for identifier in identifiers.items { + let app = transaction.read_app(&identifier.id)?; + let _ = transaction.delete_app(&identifier.id)?; + notify_ids.insert(app.workspace_id); + } - for notify_id in notify_ids { - let _ = notify_apps_changed(¬ify_id, trash_controller.clone(), &transaction)?; - } - Ok(()) - }); + for notify_id in notify_ids { + let _ = notify_apps_changed(¬ify_id, trash_controller.clone(), &transaction)?; + } + Ok(()) + }) + .await; let _ = ret.send(result).await; }, } diff --git a/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs index 94e503a035..c8d9b58824 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/app/event_handler.rs @@ -26,7 +26,8 @@ pub(crate) async fn delete_app_handler( ) -> Result<(), FlowyError> { let params: AppId = data.into_inner().try_into()?; let trash = app_controller - .read_local_apps(vec![params.app_id])? + .read_local_apps(vec![params.app_id]) + .await? .into_iter() .map(|app| app.into()) .collect::>(); diff --git a/frontend/rust-lib/flowy-core/src/services/folder_editor.rs b/frontend/rust-lib/flowy-core/src/services/folder_editor.rs new file mode 100644 index 0000000000..f4aa1b3347 --- /dev/null +++ b/frontend/rust-lib/flowy-core/src/services/folder_editor.rs @@ -0,0 +1,99 @@ +use crate::services::{persistence::FOLDER_ID, web_socket::make_folder_ws_manager}; +use flowy_collaboration::{ + entities::{revision::Revision, ws_data::ServerRevisionWSData}, + folder::{FolderChange, FolderPad}, +}; + +use flowy_error::{FlowyError, FlowyResult}; +use flowy_sync::{ + RevisionCache, + RevisionCloudService, + RevisionManager, + RevisionObjectBuilder, + RevisionWebSocket, + RevisionWebSocketManager, +}; +use lib_infra::future::FutureResult; +use lib_sqlite::ConnectionPool; +use parking_lot::RwLock; +use std::sync::Arc; + +pub struct FolderEditor { + user_id: String, + pub(crate) folder: Arc>, + rev_manager: Arc, + ws_manager: Arc, +} + +impl FolderEditor { + pub async fn new( + user_id: &str, + token: &str, + pool: Arc, + web_socket: Arc, + ) -> FlowyResult { + let cache = Arc::new(RevisionCache::new(user_id, FOLDER_ID, pool)); + let mut rev_manager = RevisionManager::new(user_id, FOLDER_ID, cache); + let cloud = Arc::new(FolderRevisionCloudServiceImpl { + token: token.to_string(), + }); + let folder_pad = Arc::new(RwLock::new(rev_manager.load::(cloud).await?)); + let rev_manager = Arc::new(rev_manager); + let ws_manager = make_folder_ws_manager(rev_manager.clone(), web_socket, folder_pad.clone()).await; + + let user_id = user_id.to_owned(); + Ok(Self { + user_id, + folder: folder_pad, + rev_manager, + ws_manager, + }) + } + + pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> { + let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| { + let err_msg = format!("{} passthrough error: {}", FOLDER_ID, e); + FlowyError::internal().context(err_msg) + })?; + Ok(()) + } + + pub(crate) fn apply_change(&self, change: FolderChange) -> FlowyResult<()> { + let FolderChange { delta, md5 } = change; + let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); + let delta_data = delta.to_bytes(); + let revision = Revision::new( + &self.rev_manager.object_id, + base_rev_id, + rev_id, + delta_data, + &self.user_id, + md5, + ); + let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?; + Ok(()) + } +} + +struct FolderPadBuilder(); +impl RevisionObjectBuilder for FolderPadBuilder { + type Output = FolderPad; + + fn build_with_revisions(_object_id: &str, revisions: Vec) -> FlowyResult { + let pad = FolderPad::from_revisions(revisions)?; + Ok(pad) + } +} + +struct FolderRevisionCloudServiceImpl { + #[allow(dead_code)] + token: String, + // server: Arc, +} + +impl RevisionCloudService for FolderRevisionCloudServiceImpl { + #[tracing::instrument(level = "debug", skip(self))] + fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult, FlowyError> { + FutureResult::new(async move { Ok(vec![]) }) + } +} diff --git a/frontend/rust-lib/flowy-core/src/services/mod.rs b/frontend/rust-lib/flowy-core/src/services/mod.rs index 9481a853ad..606d4bdb70 100644 --- a/frontend/rust-lib/flowy-core/src/services/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/mod.rs @@ -4,7 +4,9 @@ pub(crate) use view::controller::*; pub(crate) use workspace::controller::*; pub(crate) mod app; +pub(crate) mod folder_editor; pub(crate) mod persistence; pub(crate) mod trash; pub(crate) mod view; +mod web_socket; pub(crate) mod workspace; diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs index f32021e565..27c6229d36 100644 --- a/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs @@ -1,11 +1,8 @@ use crate::{ module::WorkspaceDatabase, - services::persistence::{AppTableSql, TrashTableSql, ViewTableSql, WorkspaceTableSql, FOLDER_ID}, -}; -use flowy_collaboration::{ - entities::revision::{md5, Revision}, - folder::FolderPad, + services::persistence::{AppTableSql, TrashTableSql, ViewTableSql, WorkspaceTableSql}, }; +use flowy_collaboration::{entities::revision::md5, folder::FolderPad}; use flowy_core_data_model::entities::{ app::{App, RepeatedApp}, view::{RepeatedView, View}, @@ -13,7 +10,6 @@ use flowy_core_data_model::entities::{ }; use flowy_database::kv::KV; use flowy_error::{FlowyError, FlowyResult}; -use flowy_sync::{RevisionCache, RevisionManager}; use std::sync::Arc; pub(crate) const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION"; diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs index e2011bdf7a..b659544481 100644 --- a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs @@ -6,13 +6,13 @@ use flowy_collaboration::{ entities::revision::{Revision, RevisionState}, folder::FolderPad, }; -use parking_lot::RwLock; use std::sync::Arc; +use tokio::sync::RwLock; pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*}; use crate::{ module::{WorkspaceDatabase, WorkspaceUser}, - services::persistence::{migration::FolderMigration, version_2::v2_impl::FolderEditor}, + services::{folder_editor::FolderEditor, persistence::migration::FolderMigration}, }; use flowy_core_data_model::entities::{ app::App, @@ -22,7 +22,8 @@ use flowy_core_data_model::entities::{ workspace::Workspace, }; use flowy_error::{FlowyError, FlowyResult}; -use flowy_sync::{mk_revision_disk_cache, RevisionCache, RevisionManager, RevisionRecord}; +use flowy_sync::{mk_revision_disk_cache, RevisionRecord}; +use lib_sqlite::ConnectionPool; pub const FOLDER_ID: &str = "flowy_folder"; @@ -50,16 +51,13 @@ pub trait FolderPersistenceTransaction { } pub struct FolderPersistence { - user: Arc, database: Arc, - folder_editor: RwLock>>, + folder_editor: Arc>>>, } impl FolderPersistence { - pub fn new(user: Arc, database: Arc) -> Self { - let folder_editor = RwLock::new(None); + pub fn new(database: Arc, folder_editor: Arc>>>) -> Self { Self { - user, database, folder_editor, } @@ -89,21 +87,17 @@ impl FolderPersistence { conn.immediate_transaction::<_, FlowyError, _>(|| f(Box::new(V1Transaction(&conn)))) } - pub fn begin_transaction(&self, f: F) -> FlowyResult + pub async fn begin_transaction(&self, f: F) -> FlowyResult where F: FnOnce(Arc) -> FlowyResult, { - match self.folder_editor.read().clone() { - None => { - tracing::error!("FolderEditor should be initialized after user login in."); - let editor = futures::executor::block_on(async { self.init_folder_editor().await })?; - f(editor) - }, + match self.folder_editor.read().await.clone() { + None => Err(FlowyError::internal().context("FolderEditor should be initialized after user login in.")), Some(editor) => f(editor), } } - pub fn user_did_logout(&self) { *self.folder_editor.write() = None; } + pub fn db_pool(&self) -> FlowyResult> { self.database.db_pool() } pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> { let migrations = FolderMigration::new(user_id, self.database.clone()); @@ -112,20 +106,9 @@ impl FolderPersistence { self.save_folder(user_id, migrated_folder).await?; } - let _ = self.init_folder_editor().await?; Ok(()) } - async fn init_folder_editor(&self) -> FlowyResult> { - let user_id = self.user.user_id()?; - let token = self.user.token()?; - let pool = self.database.db_pool()?; - let folder_editor = FolderEditor::new(&user_id, &token, pool).await?; - let editor = Arc::new(folder_editor); - *self.folder_editor.write() = Some(editor.clone()); - Ok(editor) - } - pub async fn save_folder(&self, user_id: &str, folder: FolderPad) -> FlowyResult<()> { let pool = self.database.db_pool()?; let delta_data = folder.delta().to_bytes(); diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/version_2/v2_impl.rs b/frontend/rust-lib/flowy-core/src/services/persistence/version_2/v2_impl.rs index 4537e8668c..60fda2fd4f 100644 --- a/frontend/rust-lib/flowy-core/src/services/persistence/version_2/v2_impl.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/version_2/v2_impl.rs @@ -1,81 +1,32 @@ -use crate::services::persistence::{ - AppChangeset, - FolderPersistenceTransaction, - ViewChangeset, - WorkspaceChangeset, - FOLDER_ID, -}; -use flowy_collaboration::{ - entities::revision::Revision, - folder::{FolderChange, FolderPad}, +use crate::services::{ + folder_editor::FolderEditor, + persistence::{AppChangeset, FolderPersistenceTransaction, ViewChangeset, WorkspaceChangeset}, }; + use flowy_core_data_model::entities::{ app::App, prelude::{RepeatedTrash, Trash, View, Workspace}, }; use flowy_error::{FlowyError, FlowyResult}; -use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionObjectBuilder}; -use lib_infra::future::FutureResult; -use lib_sqlite::ConnectionPool; -use parking_lot::RwLock; + use std::sync::Arc; -pub struct FolderEditor { - user_id: String, - folder_pad: Arc>, - rev_manager: Arc, -} - -impl FolderEditor { - pub async fn new(user_id: &str, token: &str, pool: Arc) -> FlowyResult { - let cache = Arc::new(RevisionCache::new(user_id, FOLDER_ID, pool)); - let mut rev_manager = RevisionManager::new(user_id, FOLDER_ID, cache); - let cloud = Arc::new(FolderRevisionCloudServiceImpl { - token: token.to_string(), - }); - let folder_pad = Arc::new(RwLock::new(rev_manager.load::(cloud).await?)); - let rev_manager = Arc::new(rev_manager); - let user_id = user_id.to_owned(); - Ok(Self { - user_id, - folder_pad, - rev_manager, - }) - } - - fn apply_change(&self, change: FolderChange) -> FlowyResult<()> { - let FolderChange { delta, md5 } = change; - let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); - let delta_data = delta.to_bytes(); - let revision = Revision::new( - &self.rev_manager.object_id, - base_rev_id, - rev_id, - delta_data, - &self.user_id, - md5, - ); - let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?; - Ok(()) - } -} - impl FolderPersistenceTransaction for FolderEditor { fn create_workspace(&self, _user_id: &str, workspace: Workspace) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().create_workspace(workspace)? { + if let Some(change) = self.folder.write().create_workspace(workspace)? { let _ = self.apply_change(change)?; } Ok(()) } fn read_workspaces(&self, _user_id: &str, workspace_id: Option) -> FlowyResult> { - let workspaces = self.folder_pad.read().read_workspaces(workspace_id)?; + let workspaces = self.folder.read().read_workspaces(workspace_id)?; Ok(workspaces) } fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()> { if let Some(change) = self - .folder_pad + .folder .write() .update_workspace(&changeset.id, changeset.name, changeset.desc)? { @@ -85,14 +36,14 @@ impl FolderPersistenceTransaction for FolderEditor { } fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().delete_workspace(workspace_id)? { + if let Some(change) = self.folder.write().delete_workspace(workspace_id)? { let _ = self.apply_change(change)?; } Ok(()) } fn create_app(&self, app: App) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().create_app(app)? { + if let Some(change) = self.folder.write().create_app(app)? { let _ = self.apply_change(change)?; } Ok(()) @@ -100,7 +51,7 @@ impl FolderPersistenceTransaction for FolderEditor { fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()> { if let Some(change) = self - .folder_pad + .folder .write() .update_app(&changeset.id, changeset.name, changeset.desc)? { @@ -110,12 +61,12 @@ impl FolderPersistenceTransaction for FolderEditor { } fn read_app(&self, app_id: &str) -> FlowyResult { - let app = self.folder_pad.read().read_app(app_id)?; + let app = self.folder.read().read_app(app_id)?; Ok(app) } fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult> { - let workspaces = self.folder_pad.read().read_workspaces(Some(workspace_id.to_owned()))?; + let workspaces = self.folder.read().read_workspaces(Some(workspace_id.to_owned()))?; match workspaces.first() { None => { Err(FlowyError::record_not_found().context(format!("can't find workspace with id {}", workspace_id))) @@ -125,92 +76,68 @@ impl FolderPersistenceTransaction for FolderEditor { } fn delete_app(&self, app_id: &str) -> FlowyResult { - let app = self.folder_pad.read().read_app(app_id)?; - if let Some(change) = self.folder_pad.write().delete_app(app_id)? { + let app = self.folder.read().read_app(app_id)?; + if let Some(change) = self.folder.write().delete_app(app_id)? { let _ = self.apply_change(change)?; } Ok(app) } fn create_view(&self, view: View) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().create_view(view)? { + if let Some(change) = self.folder.write().create_view(view)? { let _ = self.apply_change(change)?; } Ok(()) } fn read_view(&self, view_id: &str) -> FlowyResult { - let view = self.folder_pad.read().read_view(view_id)?; + let view = self.folder.read().read_view(view_id)?; Ok(view) } fn read_views(&self, belong_to_id: &str) -> FlowyResult> { - let views = self.folder_pad.read().read_views(belong_to_id)?; + let views = self.folder.read().read_views(belong_to_id)?; Ok(views) } fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().update_view( - &changeset.id, - changeset.name, - changeset.desc, - changeset.modified_time, - )? { + if let Some(change) = + self.folder + .write() + .update_view(&changeset.id, changeset.name, changeset.desc, changeset.modified_time)? + { let _ = self.apply_change(change)?; } Ok(()) } fn delete_view(&self, view_id: &str) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().delete_view(view_id)? { + if let Some(change) = self.folder.write().delete_view(view_id)? { let _ = self.apply_change(change)?; } Ok(()) } fn create_trash(&self, trashes: Vec) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().create_trash(trashes)? { + if let Some(change) = self.folder.write().create_trash(trashes)? { let _ = self.apply_change(change)?; } Ok(()) } fn read_trash(&self, trash_id: Option) -> FlowyResult { - let trash = self.folder_pad.read().read_trash(trash_id)?; + let trash = self.folder.read().read_trash(trash_id)?; Ok(RepeatedTrash { items: trash }) } fn delete_trash(&self, trash_ids: Option>) -> FlowyResult<()> { - if let Some(change) = self.folder_pad.write().delete_trash(trash_ids)? { + if let Some(change) = self.folder.write().delete_trash(trash_ids)? { let _ = self.apply_change(change)?; } Ok(()) } } -struct FolderPadBuilder(); -impl RevisionObjectBuilder for FolderPadBuilder { - type Output = FolderPad; - - fn build_with_revisions(_object_id: &str, revisions: Vec) -> FlowyResult { - let pad = FolderPad::from_revisions(revisions)?; - Ok(pad) - } -} - -struct FolderRevisionCloudServiceImpl { - #[allow(dead_code)] - token: String, - // server: Arc, -} - -impl RevisionCloudService for FolderRevisionCloudServiceImpl { - #[tracing::instrument(level = "debug", skip(self))] - fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult, FlowyError> { - FutureResult::new(async move { Ok(vec![]) }) - } -} - impl FolderPersistenceTransaction for Arc where T: FolderPersistenceTransaction + ?Sized, diff --git a/frontend/rust-lib/flowy-core/src/services/trash/controller.rs b/frontend/rust-lib/flowy-core/src/services/trash/controller.rs index 8d276b5223..f83a00c7f5 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/trash/controller.rs @@ -34,16 +34,19 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self), fields(putback) err)] pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> { let (tx, mut rx) = mpsc::channel::>(1); - let trash = self.persistence.begin_transaction(|transaction| { - let mut repeated_trash = transaction.read_trash(Some(trash_id.to_owned()))?; - let _ = transaction.delete_trash(Some(vec![trash_id.to_owned()]))?; - notify_trash_changed(transaction.read_trash(None)?); + let trash = self + .persistence + .begin_transaction(|transaction| { + let mut repeated_trash = transaction.read_trash(Some(trash_id.to_owned()))?; + let _ = transaction.delete_trash(Some(vec![trash_id.to_owned()]))?; + notify_trash_changed(transaction.read_trash(None)?); - if repeated_trash.is_empty() { - return Err(FlowyError::internal().context("Try to put back trash is not exists")); - } - Ok(repeated_trash.pop().unwrap()) - })?; + if repeated_trash.is_empty() { + return Err(FlowyError::internal().context("Try to put back trash is not exists")); + } + Ok(repeated_trash.pop().unwrap()) + }) + .await?; let identifier = TrashId { id: trash.id, @@ -63,11 +66,14 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self) err)] pub async fn restore_all(&self) -> FlowyResult<()> { - let repeated_trash = self.persistence.begin_transaction(|transaction| { - let trash = transaction.read_trash(None); - let _ = transaction.delete_trash(None); - trash - })?; + let repeated_trash = self + .persistence + .begin_transaction(|transaction| { + let trash = transaction.read_trash(None); + let _ = transaction.delete_trash(None); + trash + }) + .await?; let identifiers: RepeatedTrashId = repeated_trash.items.clone().into(); let (tx, mut rx) = mpsc::channel::>(1); @@ -83,7 +89,8 @@ impl TrashController { pub async fn delete_all(&self) -> FlowyResult<()> { let repeated_trash = self .persistence - .begin_transaction(|transaction| transaction.read_trash(None))?; + .begin_transaction(|transaction| transaction.read_trash(None)) + .await?; let trash_identifiers: RepeatedTrashId = repeated_trash.items.clone().into(); let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?; @@ -97,7 +104,8 @@ impl TrashController { let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?; let repeated_trash = self .persistence - .begin_transaction(|transaction| transaction.read_trash(None))?; + .begin_transaction(|transaction| transaction.read_trash(None)) + .await?; notify_trash_changed(repeated_trash); let _ = self.delete_trash_on_server(trash_identifiers)?; @@ -117,14 +125,17 @@ impl TrashController { Err(e) => log::error!("{}", e), }, } - let _ = self.persistence.begin_transaction(|transaction| { - let ids = trash_identifiers - .items - .into_iter() - .map(|item| item.id) - .collect::>(); - transaction.delete_trash(Some(ids)) - })?; + let _ = self + .persistence + .begin_transaction(|transaction| { + let ids = trash_identifiers + .items + .into_iter() + .map(|item| item.id) + .collect::>(); + transaction.delete_trash(Some(ids)) + }) + .await?; Ok(()) } @@ -153,12 +164,15 @@ impl TrashController { .as_str(), ); - let _ = self.persistence.begin_transaction(|transaction| { - let _ = transaction.create_trash(repeated_trash.clone())?; - let _ = self.create_trash_on_server(repeated_trash); - notify_trash_changed(transaction.read_trash(None)?); - Ok(()) - })?; + let _ = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.create_trash(repeated_trash.clone())?; + let _ = self.create_trash_on_server(repeated_trash); + notify_trash_changed(transaction.read_trash(None)?); + Ok(()) + }) + .await?; let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx)); let _ = rx.recv().await.unwrap()?; @@ -167,10 +181,11 @@ impl TrashController { pub fn subscribe(&self) -> broadcast::Receiver { self.notify.subscribe() } - pub fn read_trash(&self) -> Result { + pub async fn read_trash(&self) -> Result { let repeated_trash = self .persistence - .begin_transaction(|transaction| transaction.read_trash(None))?; + .begin_transaction(|transaction| transaction.read_trash(None)) + .await?; let _ = self.read_trash_on_server()?; Ok(repeated_trash) } @@ -229,10 +244,12 @@ impl TrashController { match server.read_trash(&token).await { Ok(repeated_trash) => { tracing::debug!("Remote trash count: {}", repeated_trash.items.len()); - let result = persistence.begin_transaction(|transaction| { - let _ = transaction.create_trash(repeated_trash.items.clone())?; - transaction.read_trash(None) - }); + let result = persistence + .begin_transaction(|transaction| { + let _ = transaction.create_trash(repeated_trash.items.clone())?; + transaction.read_trash(None) + }) + .await; match result { Ok(repeated_trash) => { diff --git a/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs index 4794d521e8..4b7fc27c3d 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/trash/event_handler.rs @@ -10,7 +10,7 @@ use std::sync::Arc; pub(crate) async fn read_trash_handler( controller: Unit>, ) -> DataResult { - let repeated_trash = controller.read_trash()?; + let repeated_trash = controller.read_trash().await?; data_result(repeated_trash) } diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index a433cd9f9e..e10b306be6 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -106,36 +106,43 @@ impl ViewController { pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> { let trash_controller = self.trash_controller.clone(); - self.persistence.begin_transaction(|transaction| { - let belong_to_id = view.belong_to_id.clone(); - let _ = transaction.create_view(view)?; - let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?; - Ok(()) - }) + self.persistence + .begin_transaction(|transaction| { + let belong_to_id = view.belong_to_id.clone(); + let _ = transaction.create_view(view)?; + let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?; + Ok(()) + }) + .await } #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)] pub(crate) async fn read_view(&self, params: ViewId) -> Result { - let view = self.persistence.begin_transaction(|transaction| { - let view = transaction.read_view(¶ms.view_id)?; - let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; - if trash_ids.contains(&view.id) { - return Err(FlowyError::record_not_found()); - } - Ok(view) - })?; + let view = self + .persistence + .begin_transaction(|transaction| { + let view = transaction.read_view(¶ms.view_id)?; + let trash_ids = self.trash_controller.read_trash_ids(&transaction)?; + if trash_ids.contains(&view.id) { + return Err(FlowyError::record_not_found()); + } + Ok(view) + }) + .await?; let _ = self.read_view_on_server(params); Ok(view) } - pub(crate) fn read_local_views(&self, ids: Vec) -> Result, FlowyError> { - self.persistence.begin_transaction(|transaction| { - let mut views = vec![]; - for view_id in ids { - views.push(transaction.read_view(&view_id)?); - } - Ok(views) - }) + pub(crate) async fn read_local_views(&self, ids: Vec) -> Result, FlowyError> { + self.persistence + .begin_transaction(|transaction| { + let mut views = vec![]; + for view_id in ids { + views.push(transaction.read_view(&view_id)?); + } + Ok(views) + }) + .await } #[tracing::instrument(level = "debug", skip(self), err)] @@ -170,7 +177,8 @@ impl ViewController { pub(crate) async fn duplicate_view(&self, doc_id: &str) -> Result<(), FlowyError> { let view = self .persistence - .begin_transaction(|transaction| transaction.read_view(doc_id))?; + .begin_transaction(|transaction| transaction.read_view(doc_id)) + .await?; let editor = self.document_ctx.controller.open_document(doc_id).await?; let document_json = editor.document_json().await?; @@ -201,24 +209,29 @@ impl ViewController { // belong_to_id will be the app_id or view_id. #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result { - self.persistence.begin_transaction(|transaction| { - read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction) - }) + self.persistence + .begin_transaction(|transaction| { + read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction) + }) + .await } #[tracing::instrument(level = "debug", skip(self, params), err)] pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result { let changeset = ViewChangeset::new(params.clone()); let view_id = changeset.id.clone(); - let view = self.persistence.begin_transaction(|transaction| { - let _ = transaction.update_view(changeset)?; - let view = transaction.read_view(&view_id)?; - send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated) - .payload(view.clone()) - .send(); - let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?; - Ok(view) - })?; + let view = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.update_view(changeset)?; + let view = transaction.read_view(&view_id)?; + send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated) + .payload(view.clone()) + .send(); + let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?; + Ok(view) + }) + .await?; let _ = self.update_view_on_server(params); Ok(view) @@ -229,13 +242,14 @@ impl ViewController { Ok(doc) } - pub(crate) fn latest_visit_view(&self) -> FlowyResult> { + pub(crate) async fn latest_visit_view(&self) -> FlowyResult> { match KV::get_str(LATEST_VIEW_ID) { None => Ok(None), Some(view_id) => { let view = self .persistence - .begin_transaction(|transaction| transaction.read_view(&view_id))?; + .begin_transaction(|transaction| transaction.read_view(&view_id)) + .await?; Ok(Some(view)) }, } @@ -277,7 +291,10 @@ impl ViewController { tokio::spawn(async move { match server.read_view(&token, params).await { Ok(Some(view)) => { - match persistence.begin_transaction(|transaction| transaction.create_view(view.clone())) { + match persistence + .begin_transaction(|transaction| transaction.create_view(view.clone())) + .await + { Ok(_) => { send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated) .payload(view.clone()) @@ -324,43 +341,49 @@ async fn handle_trash_event( ) { match event { TrashEvent::NewTrash(identifiers, ret) => { - let result = persistence.begin_transaction(|transaction| { - let views = read_local_views_with_transaction(identifiers, &transaction)?; - for view in views { - let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; - notify_dart(view, WorkspaceNotification::ViewDeleted); - } - Ok(()) - }); + let result = persistence + .begin_transaction(|transaction| { + let views = read_local_views_with_transaction(identifiers, &transaction)?; + for view in views { + let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; + notify_dart(view, WorkspaceNotification::ViewDeleted); + } + Ok(()) + }) + .await; let _ = ret.send(result).await; }, TrashEvent::Putback(identifiers, ret) => { - let result = persistence.begin_transaction(|transaction| { - let views = read_local_views_with_transaction(identifiers, &transaction)?; - for view in views { - let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; - notify_dart(view, WorkspaceNotification::ViewRestored); - } - Ok(()) - }); + let result = persistence + .begin_transaction(|transaction| { + let views = read_local_views_with_transaction(identifiers, &transaction)?; + for view in views { + let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?; + notify_dart(view, WorkspaceNotification::ViewRestored); + } + Ok(()) + }) + .await; let _ = ret.send(result).await; }, TrashEvent::Delete(identifiers, ret) => { - let result = persistence.begin_transaction(|transaction| { - let mut notify_ids = HashSet::new(); - for identifier in identifiers.items { - let view = transaction.read_view(&identifier.id)?; - let _ = transaction.delete_view(&identifier.id)?; - let _ = context.controller.delete(&identifier.id)?; - notify_ids.insert(view.belong_to_id); - } + let result = persistence + .begin_transaction(|transaction| { + let mut notify_ids = HashSet::new(); + for identifier in identifiers.items { + let view = transaction.read_view(&identifier.id)?; + let _ = transaction.delete_view(&identifier.id)?; + let _ = context.controller.delete(&identifier.id)?; + notify_ids.insert(view.belong_to_id); + } - for notify_id in notify_ids { - let _ = notify_views_changed(¬ify_id, trash_can.clone(), &transaction)?; - } + for notify_id in notify_ids { + let _ = notify_views_changed(¬ify_id, trash_can.clone(), &transaction)?; + } - Ok(()) - }); + Ok(()) + }) + .await; let _ = ret.send(result).await; }, } diff --git a/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs index 8ad82d11db..c93aa7f81c 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/event_handler.rs @@ -72,7 +72,8 @@ pub(crate) async fn delete_view_handler( } let trash = view_controller - .read_local_views(params.items)? + .read_local_views(params.items) + .await? .into_iter() .map(|view| view.into()) .collect::>(); diff --git a/frontend/rust-lib/flowy-core/src/services/web_socket.rs b/frontend/rust-lib/flowy-core/src/services/web_socket.rs new file mode 100644 index 0000000000..078c6a9e46 --- /dev/null +++ b/frontend/rust-lib/flowy-core/src/services/web_socket.rs @@ -0,0 +1,85 @@ +use crate::services::persistence::FOLDER_ID; +use bytes::Bytes; +use flowy_collaboration::{ + entities::{ + revision::RevisionRange, + ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType}, + }, + folder::FolderPad, +}; +use flowy_error::FlowyError; +use flowy_sync::{ + CompositeWSSinkDataProvider, + RevisionManager, + RevisionWSSinkDataProvider, + RevisionWSSteamConsumer, + RevisionWebSocket, + RevisionWebSocketManager, +}; +use lib_infra::future::FutureResult; +use parking_lot::RwLock; +use std::{sync::Arc, time::Duration}; + +pub(crate) async fn make_folder_ws_manager( + rev_manager: Arc, + web_socket: Arc, + folder_pad: Arc>, +) -> Arc { + let object_id = FOLDER_ID; + let composite_sink_provider = Arc::new(CompositeWSSinkDataProvider::new(object_id, rev_manager.clone())); + let ws_stream_consumer = Arc::new(FolderWSStreamConsumerAdapter { + object_id: object_id.to_string(), + folder_pad, + rev_manager, + sink_provider: composite_sink_provider.clone(), + }); + let sink_provider = Arc::new(FolderWSSinkDataProviderAdapter(composite_sink_provider)); + let ping_duration = Duration::from_millis(2000); + Arc::new(RevisionWebSocketManager::new( + object_id, + web_socket, + sink_provider, + ws_stream_consumer, + ping_duration, + )) +} + +pub(crate) struct FolderWSSinkDataProviderAdapter(Arc); +impl RevisionWSSinkDataProvider for FolderWSSinkDataProviderAdapter { + fn next(&self) -> FutureResult, FlowyError> { + let sink_provider = self.0.clone(); + FutureResult::new(async move { sink_provider.next().await }) + } +} + +struct FolderWSStreamConsumerAdapter { + object_id: String, + folder_pad: Arc>, + rev_manager: Arc, + sink_provider: Arc, +} + +impl RevisionWSSteamConsumer for FolderWSStreamConsumerAdapter { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { todo!() } + + fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> FutureResult<(), FlowyError> { + let sink_provider = self.sink_provider.clone(); + FutureResult::new(async move { sink_provider.ack_data(id, ty).await }) + } + + fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { + FutureResult::new(async move { Ok(()) }) + } + + fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + let sink_provider = self.sink_provider.clone(); + let object_id = self.object_id.clone(); + FutureResult::new(async move { + let revisions = rev_manager.get_revisions_in_range(range).await?; + let data = ClientRevisionWSData::from_revisions(&object_id, revisions); + sink_provider.push_data(data).await; + Ok(()) + }) + } +} diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs b/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs index 6a821e5a36..3abccc6602 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs @@ -41,10 +41,13 @@ impl WorkspaceController { let workspace = self.create_workspace_on_server(params.clone()).await?; let user_id = self.user.user_id()?; let token = self.user.token()?; - let workspaces = self.persistence.begin_transaction(|transaction| { - let _ = transaction.create_workspace(&user_id, workspace.clone())?; - transaction.read_workspaces(&user_id, None) - })?; + let workspaces = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.create_workspace(&user_id, workspace.clone())?; + transaction.read_workspaces(&user_id, None) + }) + .await?; let repeated_workspace = RepeatedWorkspace { items: workspaces }; send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace) .payload(repeated_workspace) @@ -57,11 +60,14 @@ impl WorkspaceController { pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> { let changeset = WorkspaceChangeset::new(params.clone()); let workspace_id = changeset.id.clone(); - let workspace = self.persistence.begin_transaction(|transaction| { - let _ = transaction.update_workspace(changeset)?; - let user_id = self.user.user_id()?; - self.read_local_workspace(workspace_id.clone(), &user_id, &transaction) - })?; + let workspace = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.update_workspace(changeset)?; + let user_id = self.user.user_id()?; + self.read_local_workspace(workspace_id.clone(), &user_id, &transaction) + }) + .await?; send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated) .payload(workspace) @@ -75,10 +81,13 @@ impl WorkspaceController { pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), FlowyError> { let user_id = self.user.user_id()?; let token = self.user.token()?; - let repeated_workspace = self.persistence.begin_transaction(|transaction| { - let _ = transaction.delete_workspace(workspace_id)?; - self.read_local_workspaces(None, &user_id, &transaction) - })?; + let repeated_workspace = self + .persistence + .begin_transaction(|transaction| { + let _ = transaction.delete_workspace(workspace_id)?; + self.read_local_workspaces(None, &user_id, &transaction) + }) + .await?; send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace) .payload(repeated_workspace) .send(); @@ -91,7 +100,8 @@ impl WorkspaceController { if let Some(workspace_id) = params.workspace_id { let workspace = self .persistence - .begin_transaction(|transaction| self.read_local_workspace(workspace_id, &user_id, &transaction))?; + .begin_transaction(|transaction| self.read_local_workspace(workspace_id, &user_id, &transaction)) + .await?; set_current_workspace(&workspace.id); Ok(workspace) } else { @@ -101,9 +111,12 @@ impl WorkspaceController { pub(crate) async fn read_current_workspace_apps(&self) -> Result { let workspace_id = get_current_workspace()?; - let repeated_app = self.persistence.begin_transaction(|transaction| { - read_local_workspace_apps(&workspace_id, self.trash_controller.clone(), &transaction) - })?; + let repeated_app = self + .persistence + .begin_transaction(|transaction| { + read_local_workspace_apps(&workspace_id, self.trash_controller.clone(), &transaction) + }) + .await?; // TODO: read from server Ok(repeated_app) } diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs index 5a03256fe0..811db3bacf 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs @@ -52,15 +52,19 @@ pub(crate) async fn read_workspaces_handler( let workspace_controller = folder.workspace_controller.clone(); let trash_controller = folder.trash_controller.clone(); - let workspaces = folder.persistence.begin_transaction(|transaction| { - let mut workspaces = - workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, &transaction)?; - for workspace in workspaces.iter_mut() { - let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), &transaction)?.into_inner(); - workspace.apps.items = apps; - } - Ok(workspaces) - })?; + let workspaces = folder + .persistence + .begin_transaction(|transaction| { + let mut workspaces = + workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, &transaction)?; + for workspace in workspaces.iter_mut() { + let apps = + read_local_workspace_apps(&workspace.id, trash_controller.clone(), &transaction)?.into_inner(); + workspace.apps.items = apps; + } + Ok(workspaces) + }) + .await?; let _ = read_workspaces_on_server(folder, user_id, params); data_result(workspaces) } @@ -75,13 +79,16 @@ pub async fn read_cur_workspace_handler( workspace_id: Some(workspace_id.clone()), }; - let workspace = folder.persistence.begin_transaction(|transaction| { - folder - .workspace_controller - .read_local_workspace(workspace_id, &user_id, &transaction) - })?; + let workspace = folder + .persistence + .begin_transaction(|transaction| { + folder + .workspace_controller + .read_local_workspace(workspace_id, &user_id, &transaction) + }) + .await?; - let latest_view: Option = folder.view_controller.latest_visit_view().unwrap_or(None); + let latest_view: Option = folder.view_controller.latest_visit_view().await.unwrap_or(None); let setting = CurrentWorkspaceSetting { workspace, latest_view }; let _ = read_workspaces_on_server(folder, user_id, params); data_result(setting) @@ -98,31 +105,33 @@ fn read_workspaces_on_server( tokio::spawn(async move { let workspaces = server.read_workspace(&token, params).await?; - let _ = persistence.begin_transaction(|transaction| { - tracing::debug!("Save {} workspace", workspaces.len()); - for workspace in &workspaces.items { - let m_workspace = workspace.clone(); - let apps = m_workspace.apps.clone().into_inner(); - let _ = transaction.create_workspace(&user_id, m_workspace)?; - tracing::debug!("Save {} apps", apps.len()); - for app in apps { - let views = app.belongings.clone().into_inner(); - match transaction.create_app(app) { - Ok(_) => {}, - Err(e) => log::error!("create app failed: {:?}", e), - } - - tracing::debug!("Save {} views", views.len()); - for view in views { - match transaction.create_view(view) { + let _ = persistence + .begin_transaction(|transaction| { + tracing::debug!("Save {} workspace", workspaces.len()); + for workspace in &workspaces.items { + let m_workspace = workspace.clone(); + let apps = m_workspace.apps.clone().into_inner(); + let _ = transaction.create_workspace(&user_id, m_workspace)?; + tracing::debug!("Save {} apps", apps.len()); + for app in apps { + let views = app.belongings.clone().into_inner(); + match transaction.create_app(app) { Ok(_) => {}, - Err(e) => log::error!("create view failed: {:?}", e), + Err(e) => log::error!("create app failed: {:?}", e), + } + + tracing::debug!("Save {} views", views.len()); + for view in views { + match transaction.create_view(view) { + Ok(_) => {}, + Err(e) => log::error!("create view failed: {:?}", e), + } } } } - } - Ok(()) - })?; + Ok(()) + }) + .await?; send_dart_notification(&token, WorkspaceNotification::WorkspaceListUpdated) .payload(workspaces) diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index 980de70cbc..812858c03b 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -97,12 +97,17 @@ impl DocumentController { } pub async fn did_receive_ws_data(&self, data: Bytes) { - let data: ServerRevisionWSData = data.try_into().unwrap(); - match self.ws_receivers.get(&data.object_id) { - None => tracing::error!("Can't find any source handler for {:?}", data.object_id), - Some(handler) => match handler.receive_ws_data(data).await { - Ok(_) => {}, - Err(e) => tracing::error!("{}", e), + let result: Result = data.try_into(); + match result { + Ok(data) => match self.ws_receivers.get(&data.object_id) { + None => tracing::error!("Can't find any source handler for {:?}", data.object_id), + Some(handler) => match handler.receive_ws_data(data).await { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), + }, + }, + Err(e) => { + tracing::error!("Document ws data parser failed: {:?}", e); }, } } diff --git a/frontend/rust-lib/flowy-document/src/core/queue.rs b/frontend/rust-lib/flowy-document/src/core/queue.rs index 3c11b16ade..dcab03322b 100644 --- a/frontend/rust-lib/flowy-document/src/core/queue.rs +++ b/frontend/rust-lib/flowy-document/src/core/queue.rs @@ -7,11 +7,11 @@ use flowy_collaboration::{ util::make_delta_from_revisions, }; use flowy_error::FlowyError; -use flowy_sync::RevisionManager; +use flowy_sync::{DeltaMD5, RevisionManager, TransformDeltas}; use futures::stream::StreamExt; use lib_ot::{ core::{Interval, OperationTransformable}, - rich_text::{RichTextAttribute, RichTextDelta}, + rich_text::{RichTextAttribute, RichTextAttributes, RichTextDelta}, }; use std::sync::Arc; use tokio::sync::{oneshot, RwLock}; @@ -72,62 +72,36 @@ impl EditorCommandQueue { let _ = self.save_local_delta(delta, md5).await?; let _ = ret.send(Ok(())); }, - EditorCommand::ComposeRemoteDelta { - revisions, - client_delta, - server_delta, - ret, - } => { + EditorCommand::ComposeRemoteDelta { client_delta, ret } => { let mut document = self.document.write().await; let _ = document.compose_delta(client_delta.clone())?; let md5 = document.md5(); - for revision in &revisions { - let _ = self.rev_manager.add_remote_revision(revision).await?; - } - - let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); - let doc_id = self.rev_manager.object_id.clone(); - let user_id = self.user.user_id()?; - let (client_revision, server_revision) = make_client_and_server_revision( - &doc_id, - &user_id, - base_rev_id, - rev_id, - client_delta, - Some(server_delta), - md5, - ); - let _ = self.rev_manager.add_remote_revision(&client_revision).await?; - let _ = ret.send(Ok(server_revision)); + drop(document); + let _ = ret.send(Ok(md5)); }, - EditorCommand::OverrideDelta { revisions, delta, ret } => { + EditorCommand::ResetDelta { delta, ret } => { let mut document = self.document.write().await; let _ = document.set_delta(delta); let md5 = document.md5(); drop(document); - - let repeated_revision = RepeatedRevision::new(revisions); - assert_eq!(repeated_revision.last().unwrap().md5, md5); - let _ = self.rev_manager.reset_object(repeated_revision).await?; - let _ = ret.send(Ok(())); + let _ = ret.send(Ok(md5)); }, - EditorCommand::TransformRevision { revisions, ret } => { + EditorCommand::TransformDelta { delta, ret } => { let f = || async { - let new_delta = make_delta_from_revisions(revisions)?; let read_guard = self.document.read().await; let mut server_prime: Option = None; let client_prime: RichTextDelta; // The document is empty if its text is equal to the initial text. if read_guard.is_empty::() { // Do nothing - client_prime = new_delta; + client_prime = delta; } else { - let (s_prime, c_prime) = read_guard.delta().transform(&new_delta)?; + let (s_prime, c_prime) = read_guard.delta().transform(&delta)?; client_prime = c_prime; server_prime = Some(s_prime); } drop(read_guard); - Ok::(TransformDeltas { + Ok::, CollaborateError>(TransformDeltas { client_prime, server_prime, }) @@ -251,19 +225,16 @@ pub(crate) enum EditorCommand { ret: Ret<()>, }, ComposeRemoteDelta { - revisions: Vec, client_delta: RichTextDelta, - server_delta: RichTextDelta, - ret: Ret>, + ret: Ret, }, - OverrideDelta { - revisions: Vec, + ResetDelta { delta: RichTextDelta, - ret: Ret<()>, + ret: Ret, }, - TransformRevision { - revisions: Vec, - ret: Ret, + TransformDelta { + delta: RichTextDelta, + ret: Ret>, }, Insert { index: usize, @@ -310,8 +281,8 @@ impl std::fmt::Debug for EditorCommand { let s = match self { EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta", EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta", - EditorCommand::OverrideDelta { .. } => "OverrideDelta", - EditorCommand::TransformRevision { .. } => "TransformRevision", + EditorCommand::ResetDelta { .. } => "ResetDelta", + EditorCommand::TransformDelta { .. } => "TransformDelta", EditorCommand::Insert { .. } => "Insert", EditorCommand::Delete { .. } => "Delete", EditorCommand::Format { .. } => "Format", @@ -326,8 +297,3 @@ impl std::fmt::Debug for EditorCommand { f.write_str(s) } } - -pub(crate) struct TransformDeltas { - pub client_prime: RichTextDelta, - pub server_prime: Option, -} diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket.rs index e8317b19cb..0ac9232455 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket.rs @@ -1,32 +1,37 @@ use crate::{ - core::{EditorCommand, TransformDeltas, SYNC_INTERVAL_IN_MILLIS}, + core::{EditorCommand, SYNC_INTERVAL_IN_MILLIS}, DocumentWSReceiver, }; use async_trait::async_trait; use bytes::Bytes; use flowy_collaboration::{ entities::{ - revision::{RepeatedRevision, Revision, RevisionRange}, + revision::RevisionRange, ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, }, errors::CollaborateResult, }; -use flowy_error::{internal_error, FlowyError, FlowyResult}; +use flowy_error::{internal_error, FlowyError}; use flowy_sync::{ + CompositeWSSinkDataProvider, + DeltaMD5, + ResolverTarget, + RevisionConflictResolver, RevisionManager, RevisionWSSinkDataProvider, RevisionWSSteamConsumer, RevisionWebSocket, RevisionWebSocketManager, + TransformDeltas, }; -use lib_infra::future::FutureResult; +use lib_infra::future::{BoxResultFuture, FutureResult}; +use lib_ot::{core::Delta, rich_text::RichTextAttributes}; use lib_ws::WSConnectState; -use std::{collections::VecDeque, convert::TryFrom, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use tokio::sync::{ broadcast, mpsc::{Receiver, Sender}, oneshot, - RwLock, }; pub(crate) type EditorCommandSender = Sender; @@ -39,19 +44,25 @@ pub(crate) async fn make_document_ws_manager( rev_manager: Arc, web_socket: Arc, ) -> Arc { - let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); - let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + let composite_sink_provider = Arc::new(CompositeWSSinkDataProvider::new(&doc_id, rev_manager.clone())); + let resolver_target = Arc::new(DocumentRevisionResolver { edit_cmd_tx }); + let resolver = RevisionConflictResolver::::new( + &user_id, + resolver_target, + Arc::new(composite_sink_provider.clone()), + rev_manager.clone(), + ); + let ws_stream_consumer = Arc::new(DocumentWSSteamConsumerAdapter { object_id: doc_id.clone(), - edit_cmd_tx, - rev_manager: rev_manager.clone(), - shared_sink: shared_sink.clone(), + resolver: Arc::new(resolver), }); - let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink)); + + let sink_provider = Arc::new(DocumentWSSinkDataProviderAdapter(composite_sink_provider)); let ping_duration = Duration::from_millis(SYNC_INTERVAL_IN_MILLIS); let ws_manager = Arc::new(RevisionWebSocketManager::new( &doc_id, web_socket, - data_provider, + sink_provider, ws_stream_consumer, ping_duration, )); @@ -77,31 +88,20 @@ fn listen_document_ws_state( }); } -pub(crate) struct DocumentWebSocketSteamConsumerAdapter { - pub(crate) object_id: String, - pub(crate) edit_cmd_tx: EditorCommandSender, - pub(crate) rev_manager: Arc, - pub(crate) shared_sink: Arc, +pub(crate) struct DocumentWSSteamConsumerAdapter { + object_id: String, + resolver: Arc>, } -impl RevisionWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { +impl RevisionWSSteamConsumer for DocumentWSSteamConsumerAdapter { fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - let edit_cmd_tx = self.edit_cmd_tx.clone(); - let shared_sink = self.shared_sink.clone(); - let object_id = self.object_id.clone(); - FutureResult::new(async move { - if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? { - let data = ClientRevisionWSData::from_revisions(&object_id, vec![server_composed_revision]); - shared_sink.push_back(data).await; - } - Ok(()) - }) + let resolver = self.resolver.clone(); + FutureResult::new(async move { resolver.receive_bytes(bytes).await }) } fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> FutureResult<(), FlowyError> { - let shared_sink = self.shared_sink.clone(); - FutureResult::new(async move { shared_sink.ack(id, ty).await }) + let resolver = self.resolver.clone(); + FutureResult::new(async move { resolver.ack_revision(id, ty).await }) } fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { @@ -110,202 +110,71 @@ impl RevisionWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { } fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - let shared_sink = self.shared_sink.clone(); - let object_id = self.object_id.clone(); - FutureResult::new(async move { - let revisions = rev_manager.get_revisions_in_range(range).await?; - let data = ClientRevisionWSData::from_revisions(&object_id, revisions); - shared_sink.push_back(data).await; - Ok(()) - }) + let resolver = self.resolver.clone(); + FutureResult::new(async move { resolver.send_revisions(range).await }) } } -pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc); +pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc); impl RevisionWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { fn next(&self) -> FutureResult, FlowyError> { - let shared_sink = self.0.clone(); - FutureResult::new(async move { shared_sink.next().await }) + let sink_provider = self.0.clone(); + FutureResult::new(async move { sink_provider.next().await }) } } -async fn transform_pushed_revisions( - revisions: Vec, - edit_cmd_tx: &EditorCommandSender, -) -> FlowyResult { - let (ret, rx) = oneshot::channel::>(); - edit_cmd_tx - .send(EditorCommand::TransformRevision { revisions, ret }) - .await - .map_err(internal_error)?; - let transform_delta = rx - .await - .map_err(|e| FlowyError::internal().context(format!("transform_pushed_revisions failed: {}", e)))??; - Ok(transform_delta) -} - -#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes), err)] -pub(crate) async fn handle_remote_revision( +struct DocumentRevisionResolver { edit_cmd_tx: EditorCommandSender, - rev_manager: Arc, - bytes: Bytes, -) -> FlowyResult> { - let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner(); - if revisions.is_empty() { - return Ok(None); - } +} - let first_revision = revisions.first().unwrap(); - if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await { - if local_revision.md5 == first_revision.md5 { - // The local revision is equal to the pushed revision. Just ignore it. - revisions = revisions.split_off(1); - if revisions.is_empty() { - return Ok(None); - } - } else { - return Ok(None); - } - } - - let TransformDeltas { - client_prime, - server_prime, - } = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?; - - match server_prime { - None => { - // The server_prime is None means the client local revisions conflict with the - // server, and it needs to override the client delta. +impl ResolverTarget for DocumentRevisionResolver { + fn compose_delta(&self, delta: Delta) -> BoxResultFuture { + let tx = self.edit_cmd_tx.clone(); + Box::pin(async move { let (ret, rx) = oneshot::channel(); - let _ = edit_cmd_tx - .send(EditorCommand::OverrideDelta { - revisions, - delta: client_prime, - ret, - }) - .await; - let _ = rx.await.map_err(|e| { - FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e)) - })??; - Ok(None) - }, - Some(server_prime) => { - let (ret, rx) = oneshot::channel(); - edit_cmd_tx - .send(EditorCommand::ComposeRemoteDelta { - revisions, - client_delta: client_prime, - server_delta: server_prime, - ret, - }) - .await - .map_err(internal_error)?; - let result = rx.await.map_err(|e| { + tx.send(EditorCommand::ComposeRemoteDelta { + client_delta: delta, + ret, + }) + .await + .map_err(internal_error)?; + let md5 = rx.await.map_err(|e| { FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e)) })??; - Ok(result) - }, - } -} - -#[derive(Clone)] -enum SourceType { - Shared, - Revision, -} - -#[derive(Clone)] -pub(crate) struct SharedWSSinkDataProvider { - shared: Arc>>, - rev_manager: Arc, - source_ty: Arc>, -} - -impl SharedWSSinkDataProvider { - pub(crate) fn new(rev_manager: Arc) -> Self { - SharedWSSinkDataProvider { - shared: Arc::new(RwLock::new(VecDeque::new())), - rev_manager, - source_ty: Arc::new(RwLock::new(SourceType::Shared)), - } + Ok(md5) + }) } - #[allow(dead_code)] - pub(crate) async fn push_front(&self, data: ClientRevisionWSData) { self.shared.write().await.push_front(data); } - - async fn push_back(&self, data: ClientRevisionWSData) { self.shared.write().await.push_back(data); } - - async fn next(&self) -> FlowyResult> { - let source_ty = self.source_ty.read().await.clone(); - match source_ty { - SourceType::Shared => match self.shared.read().await.front() { - None => { - *self.source_ty.write().await = SourceType::Revision; - Ok(None) - }, - Some(data) => { - tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.object_id, data.ty); - Ok(Some(data.clone())) - }, - }, - SourceType::Revision => { - if !self.shared.read().await.is_empty() { - *self.source_ty.write().await = SourceType::Shared; - return Ok(None); - } - - match self.rev_manager.next_sync_revision().await? { - Some(rev) => { - let doc_id = rev.object_id.clone(); - Ok(Some(ClientRevisionWSData::from_revisions(&doc_id, vec![rev]))) - }, - None => { - // - let doc_id = self.rev_manager.object_id.clone(); - let latest_rev_id = self.rev_manager.rev_id(); - Ok(Some(ClientRevisionWSData::ping(&doc_id, latest_rev_id))) - }, - } - }, - } + fn transform_delta( + &self, + delta: Delta, + ) -> BoxResultFuture, FlowyError> { + let tx = self.edit_cmd_tx.clone(); + Box::pin(async move { + let (ret, rx) = oneshot::channel::>>(); + tx.send(EditorCommand::TransformDelta { delta, ret }) + .await + .map_err(internal_error)?; + let transform_delta = rx + .await + .map_err(|e| FlowyError::internal().context(format!("TransformDelta failed: {}", e)))??; + Ok(transform_delta) + }) } - async fn ack(&self, id: String, _ty: ServerRevisionWSDataType) -> FlowyResult<()> { - // let _ = self.rev_manager.ack_revision(id).await?; - let source_ty = self.source_ty.read().await.clone(); - match source_ty { - SourceType::Shared => { - let should_pop = match self.shared.read().await.front() { - None => false, - Some(val) => { - let expected_id = val.id(); - if expected_id == id { - true - } else { - tracing::error!("The front element's {} is not equal to the {}", expected_id, id); - false - } - }, - }; - if should_pop { - let _ = self.shared.write().await.pop_front(); - } - }, - SourceType::Revision => { - match id.parse::() { - Ok(rev_id) => { - let _ = self.rev_manager.ack_revision(rev_id).await?; - }, - Err(e) => { - tracing::error!("Parse rev_id from {} failed. {}", id, e); - }, - }; - }, - } - - Ok(()) + fn reset_delta(&self, delta: Delta) -> BoxResultFuture { + let tx = self.edit_cmd_tx.clone(); + Box::pin(async move { + let (ret, rx) = oneshot::channel(); + let _ = tx + .send(EditorCommand::ResetDelta { delta, ret }) + .await + .map_err(internal_error)?; + let md5 = rx.await.map_err(|e| { + FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e)) + })??; + Ok(md5) + }) } } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs similarity index 88% rename from frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs rename to frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs index d728e466cb..7ee5c821da 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs @@ -4,7 +4,7 @@ use flowy_collaboration::entities::ws_data::ClientRevisionWSData; use flowy_core::{ controller::FolderManager, errors::{internal_error, FlowyError}, - module::{init_folder, FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser}, + module::{FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser}, }; use flowy_database::ConnectionPool; use flowy_document::context::DocumentContext; @@ -18,8 +18,8 @@ use flowy_user::services::UserSession; use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, sync::Arc}; -pub struct CoreDepsResolver(); -impl CoreDepsResolver { +pub struct FolderDepsResolver(); +impl FolderDepsResolver { pub fn resolve( local_server: Option>, user_session: Arc, @@ -29,13 +29,20 @@ impl CoreDepsResolver { ) -> Arc { let user: Arc = Arc::new(WorkspaceUserImpl(user_session.clone())); let database: Arc = Arc::new(WorkspaceDatabaseImpl(user_session)); - let ws_sender = Arc::new(FolderWebSocketImpl(ws_conn.clone())); + let web_socket = Arc::new(FolderWebSocketImpl(ws_conn.clone())); let cloud_service: Arc = match local_server { None => Arc::new(CoreHttpCloudService::new(server_config.clone())), Some(local_server) => local_server, }; - let folder_manager = init_folder(user, database, flowy_document.clone(), cloud_service, ws_sender); + let folder_manager = Arc::new(FolderManager::new( + user, + cloud_service, + database, + flowy_document.clone(), + web_socket, + )); + let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone())); ws_conn.add_ws_message_receiver(receiver).unwrap(); diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs index 6b391852a1..e46a3b263c 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs @@ -1,7 +1,7 @@ -mod core_deps; mod document_deps; +mod folder_deps; mod user_deps; -pub use core_deps::*; pub use document_deps::*; +pub use folder_deps::*; pub use user_deps::*; diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 1c6f694c2e..e9837387b8 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -250,7 +250,7 @@ fn mk_core_context( server_config: &ClientServerConfiguration, ws_conn: &Arc, ) -> Arc { - CoreDepsResolver::resolve( + FolderDepsResolver::resolve( local_server.clone(), user_session.clone(), server_config, diff --git a/frontend/rust-lib/flowy-sync/src/conflict_resolve.rs b/frontend/rust-lib/flowy-sync/src/conflict_resolve.rs new file mode 100644 index 0000000000..bec3862009 --- /dev/null +++ b/frontend/rust-lib/flowy-sync/src/conflict_resolve.rs @@ -0,0 +1,182 @@ +use crate::RevisionManager; +use bytes::Bytes; +use flowy_collaboration::{ + entities::{ + revision::{RepeatedRevision, Revision, RevisionRange}, + ws_data::{ClientRevisionWSData, ServerRevisionWSDataType}, + }, + util::make_delta_from_revisions, +}; +use flowy_error::{FlowyError, FlowyResult}; +use lib_infra::future::BoxResultFuture; +use lib_ot::core::{Attributes, Delta}; +use serde::de::DeserializeOwned; +use std::{convert::TryFrom, sync::Arc}; +use tokio::sync::oneshot; + +pub type DeltaMD5 = String; + +pub trait ResolverTarget +where + T: Attributes + Send + Sync, +{ + fn compose_delta(&self, delta: Delta) -> BoxResultFuture; + fn transform_delta(&self, delta: Delta) -> BoxResultFuture, FlowyError>; + fn reset_delta(&self, delta: Delta) -> BoxResultFuture; +} + +pub trait ResolverRevisionSink: Send + Sync { + fn send(&self, revisions: Vec) -> BoxResultFuture<(), FlowyError>; + fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>; +} + +pub struct RevisionConflictResolver +where + T: Attributes + Send + Sync, +{ + user_id: String, + target: Arc>, + rev_sink: Arc, + rev_manager: Arc, +} + +impl RevisionConflictResolver +where + T: Attributes + Send + Sync, + // DeserializeOwned + serde::Serialize, +{ + pub fn new( + user_id: &str, + target: Arc>, + rev_sink: Arc, + rev_manager: Arc, + ) -> Self { + let user_id = user_id.to_owned(); + Self { + user_id, + target, + rev_sink, + rev_manager, + } + } + + pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> { + let repeated_revision = RepeatedRevision::try_from(bytes)?; + if repeated_revision.is_empty() { + return Ok(()); + } + + // match self.handle_revision(repeated_revision).await? { + // None => {}, + // Some(server_revision) => { + // self.rev_sink.send(vec![server_revision]); + // }, + // } + Ok(()) + } + + pub async fn ack_revision(&self, rev_id: String, ty: ServerRevisionWSDataType) -> FlowyResult<()> { + self.rev_sink.ack(rev_id, ty).await + } + + pub async fn send_revisions(&self, range: RevisionRange) -> FlowyResult<()> { + let revisions = self.rev_manager.get_revisions_in_range(range).await?; + self.rev_sink.send(revisions).await; + Ok(()) + } + + // async fn handle_revision(&self, repeated_revision: RepeatedRevision) -> + // FlowyResult> { let mut revisions = + // repeated_revision.into_inner(); let first_revision = + // revisions.first().unwrap(); if let Some(local_revision) = + // self.rev_manager.get_revision(first_revision.rev_id).await { if + // local_revision.md5 == first_revision.md5 { // The local + // revision is equal to the pushed revision. Just ignore it. + // revisions = revisions.split_off(1); if revisions.is_empty() { + // return Ok(None); + // } + // } else { + // return Ok(None); + // } + // } + // + // let new_delta = make_delta_from_revisions(revisions.clone())?; + // + // let TransformDeltas { + // client_prime, + // server_prime, + // } = self.target.transform_delta(new_delta).await?; + // + // match server_prime { + // None => { + // // The server_prime is None means the client local revisions + // conflict with the // server, and it needs to override the + // client delta. let md5 = + // self.target.reset_delta(client_prime).await?; let + // repeated_revision = RepeatedRevision::new(revisions); + // assert_eq!(repeated_revision.last().unwrap().md5, md5); let _ + // = self.rev_manager.reset_object(repeated_revision).await?; + // Ok(None) }, + // Some(server_prime) => { + // let md5 = self.target.compose_delta(client_prime.clone()).await?; + // for revision in &revisions { + // let _ = + // self.rev_manager.add_remote_revision(revision).await?; } + // let (client_revision, server_revision) = + // make_client_and_server_revision( &self.user_id, + // &self.rev_manager, + // client_prime, + // Some(server_prime), + // md5, + // ); + // let _ = + // self.rev_manager.add_remote_revision(&client_revision).await?; + // Ok(server_revision) + // }, + // } + // } +} + +fn make_client_and_server_revision( + user_id: &str, + rev_manager: &Arc, + client_delta: Delta, + server_delta: Option>, + md5: String, +) -> (Revision, Option) +where + T: Attributes + serde::Serialize, +{ + let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair(); + let client_revision = Revision::new( + &rev_manager.object_id, + base_rev_id, + rev_id, + client_delta.to_bytes(), + &user_id, + md5.clone(), + ); + + match server_delta { + None => (client_revision, None), + Some(server_delta) => { + let server_revision = Revision::new( + &rev_manager.object_id, + base_rev_id, + rev_id, + server_delta.to_bytes(), + &user_id, + md5, + ); + (client_revision, Some(server_revision)) + }, + } +} + +pub struct TransformDeltas +where + T: Attributes, +{ + pub client_prime: Delta, + pub server_prime: Option>, +} diff --git a/frontend/rust-lib/flowy-sync/src/lib.rs b/frontend/rust-lib/flowy-sync/src/lib.rs index 052c04f113..16cc05d2c7 100644 --- a/frontend/rust-lib/flowy-sync/src/lib.rs +++ b/frontend/rust-lib/flowy-sync/src/lib.rs @@ -1,8 +1,10 @@ mod cache; +mod conflict_resolve; mod rev_manager; mod ws_manager; pub use cache::*; +pub use conflict_resolve::*; pub use rev_manager::*; pub use ws_manager::*; diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index 48f2af4a9a..00fbf384bf 100644 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -1,19 +1,21 @@ +use crate::{ResolverRevisionSink, RevisionManager}; use async_stream::stream; use bytes::Bytes; use flowy_collaboration::entities::{ - revision::{RevId, RevisionRange}, + revision::{RevId, Revision, RevisionRange}, ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; use futures_util::stream::StreamExt; -use lib_infra::future::FutureResult; +use lib_infra::future::{BoxResultFuture, FutureResult}; use lib_ws::WSConnectState; -use std::{convert::TryFrom, sync::Arc}; +use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; use tokio::{ sync::{ broadcast, mpsc, mpsc::{Receiver, Sender}, + RwLock, }, task::spawn_blocking, time::{interval, Duration}, @@ -34,14 +36,14 @@ pub trait RevisionWSSinkDataProvider: Send + Sync { } pub type WSStateReceiver = tokio::sync::broadcast::Receiver; -pub trait RevisionWebSocket: Send + Sync { +pub trait RevisionWebSocket: Send + Sync + 'static { fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError>; fn subscribe_state_changed(&self) -> WSStateReceiver; } pub struct RevisionWebSocketManager { pub object_id: String, - data_provider: Arc, + sink_provider: Arc, stream_consumer: Arc, web_socket: Arc, pub ws_passthrough_tx: Sender, @@ -54,7 +56,7 @@ impl RevisionWebSocketManager { pub fn new( object_id: &str, web_socket: Arc, - data_provider: Arc, + sink_provider: Arc, stream_consumer: Arc, ping_duration: Duration, ) -> Self { @@ -64,7 +66,7 @@ impl RevisionWebSocketManager { let (state_passthrough_tx, _) = broadcast::channel(2); let mut manager = RevisionWebSocketManager { object_id, - data_provider, + sink_provider, stream_consumer, web_socket, ws_passthrough_tx, @@ -80,7 +82,7 @@ impl RevisionWebSocketManager { let ws_msg_rx = self.ws_passthrough_rx.take().expect("Only take once"); let sink = RevisionWSSink::new( &self.object_id, - self.data_provider.clone(), + self.sink_provider.clone(), self.web_socket.clone(), self.stop_sync_tx.subscribe(), ping_duration, @@ -172,10 +174,7 @@ impl RevisionWSStream { async fn handle_message(&self, msg: ServerRevisionWSData) -> FlowyResult<()> { let ServerRevisionWSData { object_id: _, ty, data } = msg; - let bytes = spawn_blocking(move || Bytes::from(data)) - .await - .map_err(internal_error)?; - + let bytes = Bytes::from(data); tracing::trace!("[RevisionWSStream]: new message: {:?}", ty); match ty { ServerRevisionWSDataType::ServerPushRev => { @@ -280,3 +279,110 @@ async fn tick(sender: mpsc::Sender<()>, duration: Duration) { interval.tick().await; } } + +#[derive(Clone)] +enum Source { + Custom, + Revision, +} + +#[derive(Clone)] +pub struct CompositeWSSinkDataProvider { + object_id: String, + container: Arc>>, + rev_manager: Arc, + source: Arc>, +} + +impl CompositeWSSinkDataProvider { + pub fn new(object_id: &str, rev_manager: Arc) -> Self { + CompositeWSSinkDataProvider { + object_id: object_id.to_owned(), + container: Arc::new(RwLock::new(VecDeque::new())), + rev_manager, + source: Arc::new(RwLock::new(Source::Custom)), + } + } + + pub async fn push_data(&self, data: ClientRevisionWSData) { self.container.write().await.push_back(data); } + + pub async fn next(&self) -> FlowyResult> { + let source = self.source.read().await.clone(); + let data = match source { + Source::Custom => match self.container.read().await.front() { + None => { + *self.source.write().await = Source::Revision; + Ok(None) + }, + Some(data) => Ok(Some(data.clone())), + }, + Source::Revision => { + if !self.container.read().await.is_empty() { + *self.source.write().await = Source::Custom; + return Ok(None); + } + + match self.rev_manager.next_sync_revision().await? { + Some(rev) => Ok(Some(ClientRevisionWSData::from_revisions(&self.object_id, vec![rev]))), + None => Ok(Some(ClientRevisionWSData::ping( + &self.object_id, + self.rev_manager.rev_id(), + ))), + } + }, + }; + + if let Ok(Some(data)) = &data { + tracing::trace!("[CompositeWSSinkDataProvider]: {}:{:?}", data.object_id, data.ty); + } + + data + } + + pub async fn ack_data(&self, id: String, _ty: ServerRevisionWSDataType) -> FlowyResult<()> { + let source = self.source.read().await.clone(); + match source { + Source::Custom => { + let should_pop = match self.container.read().await.front() { + None => false, + Some(val) => { + let expected_id = val.id(); + if expected_id == id { + true + } else { + tracing::error!("The front element's {} is not equal to the {}", expected_id, id); + false + } + }, + }; + if should_pop { + let _ = self.container.write().await.pop_front(); + } + Ok(()) + }, + Source::Revision => { + let rev_id = id.parse::().map_err(|e| { + FlowyError::internal().context(format!("Parse {} rev_id from {} failed. {}", self.object_id, id, e)) + })?; + let _ = self.rev_manager.ack_revision(rev_id).await?; + Ok::<(), FlowyError>(()) + }, + } + } +} + +impl ResolverRevisionSink for Arc { + fn send(&self, revisions: Vec) -> BoxResultFuture<(), FlowyError> { + let sink = self.clone(); + Box::pin(async move { + sink.push_data(ClientRevisionWSData::from_revisions(&sink.object_id, revisions)) + .await; + Ok(()) + }) + } + + fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError> { + let sink = self.clone(); + Box::pin(async move { sink.ack_data(rev_id, ty).await }) + } +} diff --git a/frontend/rust-lib/flowy-test/src/lib.rs b/frontend/rust-lib/flowy-test/src/lib.rs index d3cd6c5cf2..2ba8a20828 100644 --- a/frontend/rust-lib/flowy-test/src/lib.rs +++ b/frontend/rust-lib/flowy-test/src/lib.rs @@ -4,9 +4,8 @@ pub mod helper; use crate::helper::*; use backend_service::configuration::{get_client_server_configuration, ClientServerConfiguration}; use flowy_sdk::{FlowySDK, FlowySDKConfig}; -use flowy_user::{entities::UserProfile, services::database::UserDB}; +use flowy_user::entities::UserProfile; use lib_infra::uuid_string; -use std::sync::Arc; pub mod prelude { pub use crate::{event_builder::*, helper::*, *}; @@ -52,15 +51,3 @@ impl FlowySDKTest { context.user_profile } } - -pub struct MigrationTest { - pub db: UserDB, -} - -impl MigrationTest { - pub fn new() -> Self { - let dir = root_dir(); - let db = UserDB::new(&dir); - Self { db } - } -} diff --git a/shared-lib/flowy-collaboration/src/folder/builder.rs b/shared-lib/flowy-collaboration/src/folder/builder.rs new file mode 100644 index 0000000000..3da8a65215 --- /dev/null +++ b/shared-lib/flowy-collaboration/src/folder/builder.rs @@ -0,0 +1,69 @@ +use crate::{ + entities::revision::Revision, + errors::{CollaborateError, CollaborateResult}, + folder::{default_folder_delta, FolderPad}, +}; +use flowy_core_data_model::entities::{trash::Trash, workspace::Workspace}; +use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Serialize, Deserialize)] +pub(crate) struct FolderPadBuilder { + workspaces: Vec>, + trash: Vec>, +} + +impl FolderPadBuilder { + pub(crate) fn new() -> Self { + Self { + workspaces: vec![], + trash: vec![], + } + } + + pub(crate) fn with_workspace(mut self, workspaces: Vec) -> Self { + self.workspaces = workspaces.into_iter().map(Arc::new).collect::>(); + self + } + + pub(crate) fn with_trash(mut self, trash: Vec) -> Self { + self.trash = trash.into_iter().map(Arc::new).collect::>(); + self + } + + pub(crate) fn build_with_delta(self, mut delta: PlainDelta) -> CollaborateResult { + if delta.is_empty() { + delta = default_folder_delta(); + } + let folder_json = delta.apply("").unwrap(); + let mut folder: FolderPad = serde_json::from_str(&folder_json).map_err(|e| { + CollaborateError::internal().context(format!("Deserialize json to root folder failed: {}", e)) + })?; + folder.root = delta; + Ok(folder) + } + + pub(crate) fn build_with_revisions(self, revisions: Vec) -> CollaborateResult { + let mut folder_delta = PlainDelta::new(); + for revision in revisions { + if revision.delta_data.is_empty() { + tracing::warn!("revision delta_data is empty"); + } + + let delta = PlainDelta::from_bytes(revision.delta_data)?; + folder_delta = folder_delta.compose(&delta)?; + } + self.build_with_delta(folder_delta) + } + + pub(crate) fn build(self) -> CollaborateResult { + let json = serde_json::to_string(&self) + .map_err(|e| CollaborateError::internal().context(format!("serial trash to json failed: {}", e)))?; + Ok(FolderPad { + workspaces: self.workspaces, + trash: self.trash, + root: PlainDeltaBuilder::new().insert(&json).build(), + }) + } +} diff --git a/shared-lib/flowy-collaboration/src/folder/folder_pad.rs b/shared-lib/flowy-collaboration/src/folder/folder_pad.rs index c555ef652e..b37e5e5aa1 100644 --- a/shared-lib/flowy-collaboration/src/folder/folder_pad.rs +++ b/shared-lib/flowy-collaboration/src/folder/folder_pad.rs @@ -1,6 +1,7 @@ use crate::{ entities::revision::{md5, Revision}, errors::{CollaborateError, CollaborateResult}, + folder::builder::FolderPadBuilder, }; use dissimilar::*; use flowy_core_data_model::entities::{app::App, trash::Trash, view::View, workspace::Workspace}; @@ -10,10 +11,10 @@ use std::sync::Arc; #[derive(Debug, Deserialize, Serialize, Clone, Eq, PartialEq)] pub struct FolderPad { - workspaces: Vec>, - trash: Vec>, + pub(crate) workspaces: Vec>, + pub(crate) trash: Vec>, #[serde(skip)] - root: PlainDelta, + pub(crate) root: PlainDelta, } pub fn default_folder_delta() -> PlainDelta { @@ -40,39 +41,17 @@ pub struct FolderChange { impl FolderPad { pub fn new(workspaces: Vec, trash: Vec) -> CollaborateResult { - let mut pad = FolderPad::default(); - pad.workspaces = workspaces.into_iter().map(Arc::new).collect::>(); - pad.trash = trash.into_iter().map(Arc::new).collect::>(); - let json = pad.to_json()?; - pad.root = PlainDeltaBuilder::new().insert(&json).build(); - Ok(pad) + FolderPadBuilder::new() + .with_workspace(workspaces) + .with_trash(trash) + .build() } pub fn from_revisions(revisions: Vec) -> CollaborateResult { - let mut folder_delta = PlainDelta::new(); - for revision in revisions { - if revision.delta_data.is_empty() { - tracing::warn!("revision delta_data is empty"); - } - - let delta = PlainDelta::from_bytes(revision.delta_data)?; - folder_delta = folder_delta.compose(&delta)?; - } - - Self::from_delta(folder_delta) + FolderPadBuilder::new().build_with_revisions(revisions) } - pub fn from_delta(mut delta: PlainDelta) -> CollaborateResult { - if delta.is_empty() { - delta = default_folder_delta(); - } - let folder_json = delta.apply("").unwrap(); - let mut folder: FolderPad = serde_json::from_str(&folder_json).map_err(|e| { - CollaborateError::internal().context(format!("Deserialize json to root folder failed: {}", e)) - })?; - folder.root = delta; - Ok(folder) - } + pub fn from_delta(delta: PlainDelta) -> CollaborateResult { FolderPadBuilder::new().build_with_delta(delta) } pub fn delta(&self) -> &PlainDelta { &self.root } diff --git a/shared-lib/flowy-collaboration/src/folder/mod.rs b/shared-lib/flowy-collaboration/src/folder/mod.rs index c669aa6bbb..5f08d39be2 100644 --- a/shared-lib/flowy-collaboration/src/folder/mod.rs +++ b/shared-lib/flowy-collaboration/src/folder/mod.rs @@ -1,3 +1,4 @@ +mod builder; mod folder_pad; pub use folder_pad::*; diff --git a/shared-lib/flowy-collaboration/src/util.rs b/shared-lib/flowy-collaboration/src/util.rs index c18d045cf0..82fbc21b0d 100644 --- a/shared-lib/flowy-collaboration/src/util.rs +++ b/shared-lib/flowy-collaboration/src/util.rs @@ -4,16 +4,15 @@ use crate::{ protobuf::{DocumentInfo as DocumentInfoPB, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, }; use lib_ot::{ - core::{OperationTransformable, NEW_LINE, WHITESPACE}, + core::{Attributes, Delta, OperationTransformable, NEW_LINE, WHITESPACE}, errors::OTError, rich_text::RichTextDelta, }; +use serde::de::DeserializeOwned; use std::{ convert::TryInto, sync::atomic::{AtomicI64, Ordering::SeqCst}, }; -use serde::de::DeserializeOwned; -use lib_ot::core::{Attributes, Delta}; #[inline] pub fn find_newline(s: &str) -> Option { s.find(NEW_LINE) } @@ -47,10 +46,13 @@ impl RevIdCounter { pub fn set(&self, n: i64) { let _ = self.0.fetch_update(SeqCst, SeqCst, |_| Some(n)); } } -pub fn make_delta_from_revisions(revisions: Vec) -> CollaborateResult { - let mut delta = RichTextDelta::new(); +pub fn make_delta_from_revisions(revisions: Vec) -> CollaborateResult> +where + T: Attributes + DeserializeOwned, +{ + let mut delta = Delta::::new(); for revision in revisions { - let revision_delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| { + let revision_delta = Delta::::from_bytes(revision.delta_data).map_err(|e| { let err_msg = format!("Deserialize remote revision failed: {:?}", e); CollaborateError::internal().context(err_msg) })?; @@ -59,7 +61,10 @@ pub fn make_delta_from_revisions(revisions: Vec) -> CollaborateResult< Ok(delta) } -pub fn make_delta_from_revision_pb(revisions: Vec) -> CollaborateResult> where T: Attributes + DeserializeOwned { +pub fn make_delta_from_revision_pb(revisions: Vec) -> CollaborateResult> +where + T: Attributes + DeserializeOwned, +{ let mut new_delta = Delta::::new(); for revision in revisions { let delta = Delta::::from_bytes(revision.delta_data).map_err(|e| {