From b307312a71cab9e09ba3a046aa143431a2ccdf55 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sat, 23 Mar 2024 09:18:47 +0800 Subject: [PATCH] chore: fix database row sync (#4964) * chore: fix database row sync * ci: fix test * ci: fix web build * chore: bump collab --- frontend/appflowy_tauri/src-tauri/Cargo.lock | 15 +-- frontend/appflowy_tauri/src-tauri/Cargo.toml | 14 +-- frontend/appflowy_web/wasm-libs/Cargo.lock | 15 +-- frontend/appflowy_web/wasm-libs/Cargo.toml | 14 +-- .../wasm-libs/af-user/src/manager.rs | 6 +- .../wasm-libs/af-wasm/src/integrate/server.rs | 4 +- frontend/rust-lib/Cargo.lock | 15 +-- frontend/rust-lib/Cargo.toml | 14 +-- frontend/rust-lib/collab-integrate/Cargo.toml | 1 + .../collab-integrate/src/collab_builder.rs | 76 +++++++-------- .../src/native/plugin_provider.rs | 5 +- .../src/wasm/plugin_provider.rs | 5 +- .../rust-lib/event-integration/src/lib.rs | 23 +++-- .../flowy-core/src/integrate/trait_impls.rs | 86 ++++++++--------- .../rust-lib/flowy-database-pub/src/cloud.rs | 14 ++- .../rust-lib/flowy-database2/src/manager.rs | 42 +++++---- .../src/services/database/database_editor.rs | 1 + .../rust-lib/flowy-document-pub/src/cloud.rs | 3 +- .../rust-lib/flowy-document/src/manager.rs | 51 +++++----- .../flowy-document/tests/document/util.rs | 9 +- .../rust-lib/flowy-folder-pub/src/cloud.rs | 3 +- frontend/rust-lib/flowy-folder/src/manager.rs | 94 +++++++++---------- .../rust-lib/flowy-folder/src/manager_init.rs | 21 ++++- .../src/af_cloud/impls/database.rs | 9 +- .../src/af_cloud/impls/document.rs | 12 ++- .../flowy-server/src/af_cloud/impls/folder.rs | 13 ++- .../af_cloud/impls/user/cloud_service_impl.rs | 3 +- .../flowy-server/src/af_cloud/server.rs | 23 +---- .../src/local_server/impls/database.rs | 3 +- .../src/local_server/impls/document.rs | 3 +- .../src/local_server/impls/folder.rs | 3 +- .../src/local_server/impls/user.rs | 3 +- frontend/rust-lib/flowy-server/src/server.rs | 5 +- .../src/supabase/api/collab_storage.rs | 6 +- .../flowy-server/src/supabase/api/database.rs | 3 +- .../flowy-server/src/supabase/api/document.rs | 12 ++- .../flowy-server/src/supabase/api/folder.rs | 13 ++- .../flowy-server/src/supabase/api/request.rs | 8 +- .../flowy-server/src/supabase/api/user.rs | 4 +- .../tests/supabase_test/database_test.rs | 10 +- .../flowy-server/tests/supabase_test/util.rs | 12 ++- frontend/rust-lib/flowy-user-pub/src/cloud.rs | 3 +- .../src/anon_user/migrate_anon_user_collab.rs | 13 ++- .../data_import/appflowy_data_import.rs | 13 ++- .../user_manager/manager_user_awareness.rs | 6 +- 45 files changed, 364 insertions(+), 347 deletions(-) diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index 55704b05f5..0d8588cbec 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -838,7 +838,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-trait", @@ -862,7 +862,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-trait", @@ -892,7 +892,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "collab", @@ -911,7 +911,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "bytes", @@ -926,7 +926,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "chrono", @@ -952,6 +952,7 @@ dependencies = [ "collab", "collab-entity", "collab-plugins", + "futures", "lib-infra", "parking_lot 0.12.1", "serde", @@ -963,7 +964,7 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-stream", @@ -1002,7 +1003,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "collab", diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.toml b/frontend/appflowy_tauri/src-tauri/Cargo.toml index 6145a9c065..f393cc77c0 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.toml +++ b/frontend/appflowy_tauri/src-tauri/Cargo.toml @@ -96,10 +96,10 @@ client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "ab9 # To switch to the local path, run: # scripts/tool/update_collab_source.sh # ⚠️⚠️⚠️️ -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } diff --git a/frontend/appflowy_web/wasm-libs/Cargo.lock b/frontend/appflowy_web/wasm-libs/Cargo.lock index 820a6e5274..bc93a7fbc0 100644 --- a/frontend/appflowy_web/wasm-libs/Cargo.lock +++ b/frontend/appflowy_web/wasm-libs/Cargo.lock @@ -636,7 +636,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-trait", @@ -660,7 +660,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "collab", @@ -679,7 +679,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "bytes", @@ -694,7 +694,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "chrono", @@ -720,6 +720,7 @@ dependencies = [ "collab", "collab-entity", "collab-plugins", + "futures", "lib-infra", "parking_lot 0.12.1", "serde", @@ -731,7 +732,7 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-stream", @@ -769,7 +770,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "collab", @@ -5025,4 +5026,4 @@ dependencies = [ [[patch.unused]] name = "collab-database" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" diff --git a/frontend/appflowy_web/wasm-libs/Cargo.toml b/frontend/appflowy_web/wasm-libs/Cargo.toml index 3bf0651211..fd574f9e8b 100644 --- a/frontend/appflowy_web/wasm-libs/Cargo.toml +++ b/frontend/appflowy_web/wasm-libs/Cargo.toml @@ -65,10 +65,10 @@ client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "ab9 # To switch to the local path, run: # scripts/tool/update_collab_source.sh # ⚠️⚠️⚠️️ -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } diff --git a/frontend/appflowy_web/wasm-libs/af-user/src/manager.rs b/frontend/appflowy_web/wasm-libs/af-user/src/manager.rs index 4d64464d42..98931395ae 100644 --- a/frontend/appflowy_web/wasm-libs/af-user/src/manager.rs +++ b/frontend/appflowy_web/wasm-libs/af-user/src/manager.rs @@ -2,7 +2,7 @@ use crate::authenticate_user::AuthenticateUser; use crate::define::{user_profile_key, user_workspace_key, AF_USER_SESSION_KEY}; use af_persistence::store::{AppFlowyWASMStore, IndexddbStore}; use anyhow::Context; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab_entity::CollabType; use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig}; use collab_integrate::{CollabKVDB, MutexCollab}; @@ -200,7 +200,7 @@ impl UserManager { &self, session: &Session, collab_db: Weak, - raw_data: CollabDocState, + raw_data: Vec, ) -> Result, FlowyError> { let collab_builder = self.collab_builder.upgrade().ok_or(FlowyError::new( ErrorCode::Internal, @@ -212,7 +212,7 @@ impl UserManager { session.user_id, &user_awareness_id.to_string(), CollabType::UserAwareness, - raw_data, + DocStateSource::FromDocState(raw_data), collab_db, CollabBuilderConfig::default().sync_enable(true), ) diff --git a/frontend/appflowy_web/wasm-libs/af-wasm/src/integrate/server.rs b/frontend/appflowy_web/wasm-libs/af-wasm/src/integrate/server.rs index 335c16381f..6f3c71025a 100644 --- a/frontend/appflowy_web/wasm-libs/af-wasm/src/integrate/server.rs +++ b/frontend/appflowy_web/wasm-libs/af-wasm/src/integrate/server.rs @@ -55,8 +55,8 @@ impl CollabCloudPluginProvider for ServerProviderWASM { CollabPluginProviderType::AppFlowyCloud } - fn get_plugins(&self, _context: CollabPluginProviderContext) -> Fut>> { - to_fut(async move { vec![] }) + fn get_plugins(&self, _context: CollabPluginProviderContext) -> Vec> { + vec![] } fn is_sync_enabled(&self) -> bool { diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index 12dc05bf02..cab5329f93 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -764,7 +764,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-trait", @@ -788,7 +788,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-trait", @@ -818,7 +818,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "collab", @@ -837,7 +837,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "bytes", @@ -852,7 +852,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "chrono", @@ -878,6 +878,7 @@ dependencies = [ "collab", "collab-entity", "collab-plugins", + "futures", "lib-infra", "parking_lot 0.12.1", "serde", @@ -889,7 +890,7 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "async-stream", @@ -928,7 +929,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=25c4be5#25c4be5d60fa67f0d2de7f69cc8292a4506e07de" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79be7f4c6e8e672b6f08ffda866876c01fc28e62#79be7f4c6e8e672b6f08ffda866876c01fc28e62" dependencies = [ "anyhow", "collab", diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index a24f99c133..016af82cc6 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -120,10 +120,10 @@ client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "ab9 # To switch to the local path, run: # scripts/tool/update_collab_source.sh # ⚠️⚠️⚠️️ -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "25c4be5" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79be7f4c6e8e672b6f08ffda866876c01fc28e62" } diff --git a/frontend/rust-lib/collab-integrate/Cargo.toml b/frontend/rust-lib/collab-integrate/Cargo.toml index 19f5e879ab..048eecabf5 100644 --- a/frontend/rust-lib/collab-integrate/Cargo.toml +++ b/frontend/rust-lib/collab-integrate/Cargo.toml @@ -19,6 +19,7 @@ parking_lot.workspace = true async-trait.workspace = true tokio = { workspace = true, features = ["sync"]} lib-infra = { workspace = true } +futures = "0.3" [features] default = [] \ No newline at end of file diff --git a/frontend/rust-lib/collab-integrate/src/collab_builder.rs b/frontend/rust-lib/collab-integrate/src/collab_builder.rs index 6efb41a01f..38c8f9d64a 100644 --- a/frontend/rust-lib/collab-integrate/src/collab_builder.rs +++ b/frontend/rust-lib/collab-integrate/src/collab_builder.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Weak}; use crate::CollabKVDB; use anyhow::Error; -use collab::core::collab::{CollabDocState, MutexCollab}; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::preclude::CollabBuilder; use collab_entity::{CollabObject, CollabType}; use collab_plugins::connect_state::{CollabConnectReachability, CollabConnectState}; @@ -68,7 +68,7 @@ impl Display for CollabPluginProviderContext { pub struct AppFlowyCollabBuilder { network_reachability: CollabConnectReachability, workspace_id: RwLock>, - plugin_provider: tokio::sync::RwLock>, + plugin_provider: RwLock>, snapshot_persistence: Mutex>>, #[cfg(not(target_arch = "wasm32"))] rocksdb_backup: Mutex>>, @@ -97,7 +97,7 @@ impl AppFlowyCollabBuilder { Self { network_reachability: CollabConnectReachability::new(), workspace_id: Default::default(), - plugin_provider: tokio::sync::RwLock::new(Arc::new(storage_provider)), + plugin_provider: RwLock::new(Arc::new(storage_provider)), snapshot_persistence: Default::default(), #[cfg(not(target_arch = "wasm32"))] rocksdb_backup: Default::default(), @@ -167,22 +167,20 @@ impl AppFlowyCollabBuilder { uid: i64, object_id: &str, object_type: CollabType, - collab_doc_state: CollabDocState, + collab_doc_state: DocStateSource, collab_db: Weak, build_config: CollabBuilderConfig, ) -> Result, Error> { let persistence_config = CollabPersistenceConfig::default(); - self - .build_with_config( - uid, - object_id, - object_type, - collab_db, - collab_doc_state, - persistence_config, - build_config, - ) - .await + self.build_with_config( + uid, + object_id, + object_type, + collab_db, + collab_doc_state, + persistence_config, + build_config, + ) } /// Creates a new collaboration builder with the custom configuration. @@ -200,13 +198,13 @@ impl AppFlowyCollabBuilder { /// - `collab_db`: A weak reference to the [CollabKVDB]. /// #[allow(clippy::too_many_arguments)] - pub async fn build_with_config( + pub fn build_with_config( &self, uid: i64, object_id: &str, object_type: CollabType, collab_db: Weak, - collab_doc_state: CollabDocState, + collab_doc_state: DocStateSource, #[allow(unused_variables)] persistence_config: CollabPersistenceConfig, build_config: CollabBuilderConfig, ) -> Result, Error> { @@ -240,23 +238,22 @@ impl AppFlowyCollabBuilder { { let collab_object = self.collab_object(uid, object_id, object_type)?; if build_config.sync_enable { - let provider_type = self.plugin_provider.read().await.provider_type(); + let provider_type = self.plugin_provider.read().provider_type(); let span = tracing::span!(tracing::Level::TRACE, "collab_builder", object_id = %object_id); let _enter = span.enter(); match provider_type { CollabPluginProviderType::AppFlowyCloud => { trace!("init appflowy cloud collab plugins"); let local_collab = Arc::downgrade(&collab); - let plugins = self - .plugin_provider - .read() - .await - .get_plugins(CollabPluginProviderContext::AppFlowyCloud { - uid, - collab_object, - local_collab, - }) - .await; + let plugins = + self + .plugin_provider + .read() + .get_plugins(CollabPluginProviderContext::AppFlowyCloud { + uid, + collab_object, + local_collab, + }); trace!("add appflowy cloud collab plugins: {}", plugins.len()); for plugin in plugins { @@ -269,17 +266,16 @@ impl AppFlowyCollabBuilder { trace!("init supabase collab plugins"); let local_collab = Arc::downgrade(&collab); let local_collab_db = collab_db.clone(); - let plugins = self - .plugin_provider - .read() - .await - .get_plugins(CollabPluginProviderContext::Supabase { - uid, - collab_object, - local_collab, - local_collab_db, - }) - .await; + let plugins = + self + .plugin_provider + .read() + .get_plugins(CollabPluginProviderContext::Supabase { + uid, + collab_object, + local_collab, + local_collab_db, + }); for plugin in plugins { collab.lock().add_plugin(plugin); } @@ -291,7 +287,7 @@ impl AppFlowyCollabBuilder { } #[cfg(target_arch = "wasm32")] - collab.lock().initialize().await; + futures::executor::block_on(collab.lock().initialize()); #[cfg(not(target_arch = "wasm32"))] collab.lock().initialize(); diff --git a/frontend/rust-lib/collab-integrate/src/native/plugin_provider.rs b/frontend/rust-lib/collab-integrate/src/native/plugin_provider.rs index f5b6ce5225..94256bb439 100644 --- a/frontend/rust-lib/collab-integrate/src/native/plugin_provider.rs +++ b/frontend/rust-lib/collab-integrate/src/native/plugin_provider.rs @@ -1,12 +1,11 @@ use crate::collab_builder::{CollabPluginProviderContext, CollabPluginProviderType}; use collab::preclude::CollabPlugin; -use lib_infra::future::Fut; use std::sync::Arc; pub trait CollabCloudPluginProvider: Send + Sync + 'static { fn provider_type(&self) -> CollabPluginProviderType; - fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut>>; + fn get_plugins(&self, context: CollabPluginProviderContext) -> Vec>; fn is_sync_enabled(&self) -> bool; } @@ -19,7 +18,7 @@ where (**self).provider_type() } - fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut>> { + fn get_plugins(&self, context: CollabPluginProviderContext) -> Vec> { (**self).get_plugins(context) } diff --git a/frontend/rust-lib/collab-integrate/src/wasm/plugin_provider.rs b/frontend/rust-lib/collab-integrate/src/wasm/plugin_provider.rs index 86c4a26a63..545e6c461c 100644 --- a/frontend/rust-lib/collab-integrate/src/wasm/plugin_provider.rs +++ b/frontend/rust-lib/collab-integrate/src/wasm/plugin_provider.rs @@ -2,12 +2,11 @@ use crate::collab_builder::{CollabPluginProviderContext, CollabPluginProviderTyp use collab::preclude::CollabPlugin; use lib_infra::future::Fut; use std::rc::Rc; -use std::sync::Arc; pub trait CollabCloudPluginProvider: 'static { fn provider_type(&self) -> CollabPluginProviderType; - fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut>>; + fn get_plugins(&self, context: CollabPluginProviderContext) -> Vec>; fn is_sync_enabled(&self) -> bool; } @@ -20,7 +19,7 @@ where (**self).provider_type() } - fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut>> { + fn get_plugins(&self, context: CollabPluginProviderContext) -> Vec> { (**self).get_plugins(context) } diff --git a/frontend/rust-lib/event-integration/src/lib.rs b/frontend/rust-lib/event-integration/src/lib.rs index a91125ca54..f335e290e8 100644 --- a/frontend/rust-lib/event-integration/src/lib.rs +++ b/frontend/rust-lib/event-integration/src/lib.rs @@ -1,4 +1,4 @@ -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::core::origin::CollabOrigin; use collab_document::blocks::DocumentData; use collab_document::document::Document; @@ -108,31 +108,34 @@ impl EventIntegrationTest { pub async fn get_collab_doc_state( &self, oid: &str, - collay_type: CollabType, - ) -> Result { + collab_type: CollabType, + ) -> Result, FlowyError> { let server = self.server_provider.get_server().unwrap(); let workspace_id = self.get_current_workspace().await.id; let uid = self.get_user_profile().await?.id; let doc_state = server .folder_service() - .get_folder_doc_state(&workspace_id, uid, collay_type, oid) + .get_folder_doc_state(&workspace_id, uid, collab_type, oid) .await?; Ok(doc_state) } } -pub fn document_data_from_document_doc_state( - doc_id: &str, - doc_state: CollabDocState, -) -> DocumentData { +pub fn document_data_from_document_doc_state(doc_id: &str, doc_state: Vec) -> DocumentData { document_from_document_doc_state(doc_id, doc_state) .get_document_data() .unwrap() } -pub fn document_from_document_doc_state(doc_id: &str, doc_state: CollabDocState) -> Document { - Document::from_doc_state(CollabOrigin::Empty, doc_state, doc_id, vec![]).unwrap() +pub fn document_from_document_doc_state(doc_id: &str, doc_state: Vec) -> Document { + Document::from_doc_state( + CollabOrigin::Empty, + DocStateSource::FromDocState(doc_state), + doc_id, + vec![], + ) + .unwrap() } async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { diff --git a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs index 9e7a4c0dfa..b920e4116d 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Error; use client_api::collab_sync::{SinkConfig, SyncObject, SyncPlugin}; -use collab::core::collab::CollabDocState; + use collab::core::origin::{CollabClient, CollabOrigin}; use collab::preclude::CollabPlugin; use collab_entity::CollabType; @@ -26,7 +26,7 @@ use flowy_server_pub::supabase_config::SupabaseConfiguration; use flowy_storage::ObjectValue; use flowy_user_pub::cloud::{UserCloudService, UserCloudServiceProvider}; use flowy_user_pub::entities::{Authenticator, UserTokenState}; -use lib_infra::future::{to_fut, Fut, FutureResult}; +use lib_infra::future::FutureResult; use crate::integrate::server::{Server, ServerProvider}; @@ -184,7 +184,7 @@ impl FolderCloudService for ServerProvider { uid: i64, collab_type: CollabType, object_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { let object_id = object_id.to_string(); let workspace_id = workspace_id.to_string(); let server = self.get_server(); @@ -225,7 +225,7 @@ impl DatabaseCloudService for ServerProvider { object_id: &str, collab_type: CollabType, workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { let workspace_id = workspace_id.to_string(); let server = self.get_server(); let database_id = object_id.to_string(); @@ -274,7 +274,7 @@ impl DocumentCloudService for ServerProvider { &self, document_id: &str, workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, FlowyError> { let workspace_id = workspace_id.to_string(); let document_id = document_id.to_string(); let server = self.get_server(); @@ -326,61 +326,58 @@ impl CollabCloudPluginProvider for ServerProvider { } #[instrument(level = "debug", skip(self, context), fields(server_type = %self.get_server_type()))] - fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut>> { + fn get_plugins(&self, context: CollabPluginProviderContext) -> Vec> { // If the user is local, we don't need to create a sync plugin. if self.get_server_type().is_local() { debug!( "User authenticator is local, skip create sync plugin for: {}", context ); - return to_fut(async move { vec![] }); + return vec![]; } match context { - CollabPluginProviderContext::Local => to_fut(async move { vec![] }), + CollabPluginProviderContext::Local => vec![], CollabPluginProviderContext::AppFlowyCloud { uid: _, collab_object, local_collab, } => { if let Ok(server) = self.get_server() { - to_fut(async move { - let mut plugins: Vec> = vec![]; + // to_fut(async move { + let mut plugins: Vec> = vec![]; + // If the user is local, we don't need to create a sync plugin. - // If the user is local, we don't need to create a sync plugin. - - match server.collab_ws_channel(&collab_object.object_id).await { - Ok(Some((channel, ws_connect_state, is_connected))) => { - let origin = CollabOrigin::Client(CollabClient::new( - collab_object.uid, - collab_object.device_id.clone(), - )); - let sync_object = SyncObject::from(collab_object); - let (sink, stream) = (channel.sink(), channel.stream()); - let sink_config = SinkConfig::new().send_timeout(8); - let sync_plugin = SyncPlugin::new( - origin, - sync_object, - local_collab, - sink, - sink_config, - stream, - Some(channel), - !is_connected, - ws_connect_state, - ); - plugins.push(Box::new(sync_plugin)); - }, - Ok(None) => { - tracing::error!("🔴Failed to get collab ws channel: channel is none"); - }, - Err(err) => tracing::error!("🔴Failed to get collab ws channel: {:?}", err), - } - - plugins - }) + match server.collab_ws_channel(&collab_object.object_id) { + Ok(Some((channel, ws_connect_state, is_connected))) => { + let origin = CollabOrigin::Client(CollabClient::new( + collab_object.uid, + collab_object.device_id.clone(), + )); + let sync_object = SyncObject::from(collab_object); + let (sink, stream) = (channel.sink(), channel.stream()); + let sink_config = SinkConfig::new().send_timeout(8); + let sync_plugin = SyncPlugin::new( + origin, + sync_object, + local_collab, + sink, + sink_config, + stream, + Some(channel), + !is_connected, + ws_connect_state, + ); + plugins.push(Box::new(sync_plugin)); + }, + Ok(None) => { + tracing::error!("🔴Failed to get collab ws channel: channel is none"); + }, + Err(err) => tracing::error!("🔴Failed to get collab ws channel: {:?}", err), + } + plugins } else { - to_fut(async move { vec![] }) + vec![] } }, CollabPluginProviderContext::Supabase { @@ -404,8 +401,7 @@ impl CollabCloudPluginProvider for ServerProvider { local_collab_db, ))); } - - to_fut(async move { plugins }) + plugins }, } } diff --git a/frontend/rust-lib/flowy-database-pub/src/cloud.rs b/frontend/rust-lib/flowy-database-pub/src/cloud.rs index 5e1bb5e1c9..b92beb4fd1 100644 --- a/frontend/rust-lib/flowy-database-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-database-pub/src/cloud.rs @@ -1,12 +1,10 @@ +use anyhow::Error; +use collab::core::collab::DocStateSource; +use collab_entity::CollabType; +use lib_infra::future::FutureResult; use std::collections::HashMap; -use anyhow::Error; -use collab::core::collab::CollabDocState; -use collab_entity::CollabType; - -use lib_infra::future::FutureResult; - -pub type CollabDocStateByOid = HashMap; +pub type CollabDocStateByOid = HashMap; /// A trait for database cloud service. /// Each kind of server should implement this trait. Check out the [AppFlowyServerProvider] of @@ -17,7 +15,7 @@ pub trait DatabaseCloudService: Send + Sync { object_id: &str, collab_type: CollabType, workspace_id: &str, - ) -> FutureResult; + ) -> FutureResult, Error>; fn batch_get_database_object_doc_state( &self, diff --git a/frontend/rust-lib/flowy-database2/src/manager.rs b/frontend/rust-lib/flowy-database2/src/manager.rs index bf5f1505f2..2d61efb89c 100644 --- a/frontend/rust-lib/flowy-database2/src/manager.rs +++ b/frontend/rust-lib/flowy-database2/src/manager.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::{Arc, Weak}; -use collab::core::collab::{CollabDocState, MutexCollab}; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab_database::blocks::BlockEvent; use collab_database::database::{get_inline_view_id, DatabaseData, MutexDatabase}; use collab_database::error::DatabaseError; @@ -12,7 +12,7 @@ use collab_database::user::{ use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout}; use collab_entity::CollabType; use collab_plugins::local_storage::kv::KVTransactionDB; -use futures::executor::block_on; + use lru::LruCache; use tokio::sync::{Mutex, RwLock}; use tracing::{event, instrument, trace}; @@ -98,7 +98,7 @@ impl DatabaseManager { }; let config = CollabPersistenceConfig::new().snapshot_per_update(100); - let mut workspace_database_doc_state = CollabDocState::default(); + let mut workspace_database_doc_state = DocStateSource::FromDisk; // If the workspace database not exist in disk, try to fetch from remote. if !self.is_collab_exist(uid, &collab_db, &workspace_database_object_id) { trace!("workspace database not exist, try to fetch from remote"); @@ -111,8 +111,8 @@ impl DatabaseManager { ) .await { - Ok(remote_doc_state) => { - workspace_database_doc_state = remote_doc_state; + Ok(doc_state) => { + workspace_database_doc_state = DocStateSource::FromDocState(doc_state); }, Err(err) => { return Err(FlowyError::record_not_found().with_context(format!( @@ -423,7 +423,7 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl { &self, object_id: &str, object_ty: CollabType, - ) -> CollabFuture> { + ) -> CollabFuture> { let workspace_id = self.workspace_id.clone(); let object_id = object_id.to_string(); let weak_cloud_service = Arc::downgrade(&self.cloud_service); @@ -431,13 +431,13 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl { match weak_cloud_service.upgrade() { None => { tracing::warn!("Cloud service is dropped"); - Ok(vec![]) + Ok(DocStateSource::FromDocState(vec![])) }, Some(cloud_service) => { - let updates = cloud_service + let doc_state = cloud_service .get_database_object_doc_state(&object_id, object_ty, &workspace_id) .await?; - Ok(updates) + Ok(DocStateSource::FromDocState(doc_state)) }, } }) @@ -472,18 +472,20 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl { object_id: &str, object_type: CollabType, collab_db: Weak, - collab_raw_data: CollabDocState, + collab_raw_data: DocStateSource, persistence_config: CollabPersistenceConfig, ) -> Arc { - block_on(self.collab_builder.build_with_config( - uid, - object_id, - object_type, - collab_db, - collab_raw_data, - persistence_config, - CollabBuilderConfig::default().sync_enable(true), - )) - .unwrap() + self + .collab_builder + .build_with_config( + uid, + object_id, + object_type, + collab_db, + collab_raw_data, + persistence_config, + CollabBuilderConfig::default().sync_enable(true), + ) + .unwrap() } } diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs index d9f7ec2541..dc11653262 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs @@ -805,6 +805,7 @@ impl DatabaseEditor { }?; (field, database.get_cell(field_id, &row_id).cell) }; + let new_cell = apply_cell_changeset(cell_changeset, cell, &field, Some(self.cell_cache.clone()))?; self.update_cell(view_id, row_id, field_id, new_cell).await diff --git a/frontend/rust-lib/flowy-document-pub/src/cloud.rs b/frontend/rust-lib/flowy-document-pub/src/cloud.rs index 7ff9cd6a36..2f4da1bd37 100644 --- a/frontend/rust-lib/flowy-document-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-document-pub/src/cloud.rs @@ -1,5 +1,4 @@ use anyhow::Error; -use collab::core::collab::CollabDocState; pub use collab_document::blocks::DocumentData; use flowy_error::FlowyError; @@ -13,7 +12,7 @@ pub trait DocumentCloudService: Send + Sync + 'static { &self, document_id: &str, workspace_id: &str, - ) -> FutureResult; + ) -> FutureResult, FlowyError>; fn get_document_snapshots( &self, diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index 2251b9d260..309d65e03e 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -2,7 +2,7 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::sync::Weak; -use collab::core::collab::{CollabDocState, MutexCollab}; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::core::collab_plugin::EncodedCollab; use collab::core::origin::CollabOrigin; use collab::preclude::Collab; @@ -16,8 +16,6 @@ use lru::LruCache; use parking_lot::Mutex; use tokio::io::AsyncWriteExt; use tracing::error; -use tracing::info; -use tracing::warn; use tracing::{event, instrument}; use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig}; @@ -122,7 +120,7 @@ impl DocumentManager { .doc_state .to_vec(); let collab = self - .collab_for_document(uid, doc_id, doc_state, false) + .collab_for_document(uid, doc_id, DocStateSource::FromDocState(doc_state), false) .await?; collab.lock().flush(); Ok(()) @@ -138,14 +136,16 @@ impl DocumentManager { return Ok(doc); } - let mut doc_state = CollabDocState::default(); + let mut doc_state = DocStateSource::FromDisk; // If the document does not exist in local disk, try get the doc state from the cloud. This happens // When user_device_a create a document and user_device_b open the document. if !self.is_doc_exist(doc_id).await? { - doc_state = self - .cloud_service - .get_document_doc_state(doc_id, &self.user_service.workspace_id()?) - .await?; + doc_state = DocStateSource::FromDocState( + self + .cloud_service + .get_document_doc_state(doc_id, &self.user_service.workspace_id()?) + .await?, + ); // the doc_state should not be empty if remote return the doc state without error. if doc_state.is_empty() { @@ -183,16 +183,16 @@ impl DocumentManager { } pub async fn get_document_data(&self, doc_id: &str) -> FlowyResult { - let mut updates = vec![]; + let mut doc_state = vec![]; if !self.is_doc_exist(doc_id).await? { - updates = self + doc_state = self .cloud_service .get_document_doc_state(doc_id, &self.user_service.workspace_id()?) .await?; } let uid = self.user_service.user_id()?; let collab = self - .collab_for_document(uid, doc_id, updates, false) + .collab_for_document(uid, doc_id, DocStateSource::FromDocState(doc_state), false) .await?; Document::open(collab)? .get_document_data() @@ -284,7 +284,7 @@ impl DocumentManager { #[cfg(not(target_arch = "wasm32"))] { if tokio::fs::metadata(&local_file_path).await.is_ok() { - warn!("file already exist in user local disk: {}", local_file_path); + tracing::warn!("file already exist in user local disk: {}", local_file_path); return Ok(()); } @@ -298,7 +298,7 @@ impl DocumentManager { .await?; let n = file.write(&object_value.raw).await?; - info!("downloaded {} bytes to file: {}", n, local_file_path); + tracing::info!("downloaded {} bytes to file: {}", n, local_file_path); } Ok(()) } @@ -326,22 +326,19 @@ impl DocumentManager { &self, uid: i64, doc_id: &str, - doc_state: CollabDocState, + doc_state: DocStateSource, sync_enable: bool, ) -> FlowyResult> { let db = self.user_service.collab_db(uid)?; - let collab = self - .collab_builder - .build_with_config( - uid, - doc_id, - CollabType::Document, - db, - doc_state, - CollabPersistenceConfig::default().snapshot_per_update(1000), - CollabBuilderConfig::default().sync_enable(sync_enable), - ) - .await?; + let collab = self.collab_builder.build_with_config( + uid, + doc_id, + CollabType::Document, + db, + doc_state, + CollabPersistenceConfig::default().snapshot_per_update(1000), + CollabBuilderConfig::default().sync_enable(sync_enable), + )?; Ok(collab) } diff --git a/frontend/rust-lib/flowy-document/tests/document/util.rs b/frontend/rust-lib/flowy-document/tests/document/util.rs index 860d3b7a40..7958418772 100644 --- a/frontend/rust-lib/flowy-document/tests/document/util.rs +++ b/frontend/rust-lib/flowy-document/tests/document/util.rs @@ -2,7 +2,6 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Error; -use collab::core::collab::CollabDocState; use collab::preclude::CollabPlugin; use collab_document::blocks::DocumentData; use collab_document::document_data::default_document_data; @@ -24,7 +23,7 @@ use flowy_document_pub::cloud::*; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_storage::ObjectStorageService; use lib_infra::async_trait::async_trait; -use lib_infra::future::{to_fut, Fut, FutureResult}; +use lib_infra::future::FutureResult; pub struct DocumentTest { inner: DocumentManager, @@ -135,7 +134,7 @@ impl DocumentCloudService for LocalTestDocumentCloudServiceImpl { &self, document_id: &str, _workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, FlowyError> { let document_id = document_id.to_string(); FutureResult::new(async move { Err(FlowyError::new( @@ -197,8 +196,8 @@ impl CollabCloudPluginProvider for DefaultCollabStorageProvider { CollabPluginProviderType::Local } - fn get_plugins(&self, _context: CollabPluginProviderContext) -> Fut>> { - to_fut(async move { vec![] }) + fn get_plugins(&self, _context: CollabPluginProviderContext) -> Vec> { + vec![] } fn is_sync_enabled(&self) -> bool { diff --git a/frontend/rust-lib/flowy-folder-pub/src/cloud.rs b/frontend/rust-lib/flowy-folder-pub/src/cloud.rs index 316795ca59..cee216a217 100644 --- a/frontend/rust-lib/flowy-folder-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-folder-pub/src/cloud.rs @@ -1,5 +1,4 @@ pub use anyhow::Error; -use collab::core::collab::CollabDocState; use collab_entity::CollabType; pub use collab_folder::{Folder, FolderData, Workspace}; use uuid::Uuid; @@ -36,7 +35,7 @@ pub trait FolderCloudService: Send + Sync + 'static { uid: i64, collab_type: CollabType, object_id: &str, - ) -> FutureResult; + ) -> FutureResult, Error>; fn batch_create_folder_collab_objects( &self, diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 03bf15002e..310bcf582e 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -1,25 +1,3 @@ -use std::fmt::{Display, Formatter}; -use std::ops::Deref; -use std::sync::{Arc, Weak}; - -use collab::core::collab::{CollabDocState, MutexCollab}; -use collab_entity::CollabType; -use collab_folder::error::FolderError; -use collab_folder::{ - Folder, FolderData, FolderNotify, Section, SectionItem, TrashInfo, UserId, View, ViewLayout, - ViewUpdate, Workspace, -}; -use parking_lot::{Mutex, RwLock}; -use tracing::{error, info, instrument}; - -use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig}; -use collab_integrate::{CollabKVDB, CollabPersistenceConfig}; -use flowy_error::{ErrorCode, FlowyError, FlowyResult}; -use flowy_folder_pub::cloud::{gen_view_id, FolderCloudService}; -use flowy_folder_pub::folder_builder::ParentChildViews; - -use lib_infra::conditional_send_sync_trait; - use crate::entities::icon::UpdateViewIconParams; use crate::entities::{ view_pb_with_child_views, view_pb_without_child_views, CreateViewParams, CreateWorkspaceParams, @@ -38,6 +16,24 @@ use crate::util::{ folder_not_init_error, insert_parent_child_views, workspace_data_not_sync_error, }; use crate::view_operation::{create_view, FolderOperationHandler, FolderOperationHandlers}; +use collab::core::collab::{DocStateSource, MutexCollab}; +use collab_entity::CollabType; +use collab_folder::error::FolderError; +use collab_folder::{ + Folder, FolderData, FolderNotify, Section, SectionItem, TrashInfo, UserId, View, ViewLayout, + ViewUpdate, Workspace, +}; +use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig}; +use collab_integrate::{CollabKVDB, CollabPersistenceConfig}; +use flowy_error::{ErrorCode, FlowyError, FlowyResult}; +use flowy_folder_pub::cloud::{gen_view_id, FolderCloudService}; +use flowy_folder_pub::folder_builder::ParentChildViews; +use lib_infra::conditional_send_sync_trait; +use parking_lot::{Mutex, RwLock}; +use std::fmt::{Display, Formatter}; +use std::ops::Deref; +use std::sync::{Arc, Weak}; +use tracing::{error, info, instrument}; conditional_send_sync_trait! { "[crate::manager::FolderUser] represents the user for folder."; @@ -164,24 +160,21 @@ impl FolderManager { uid: i64, workspace_id: &str, collab_db: Weak, - collab_doc_state: CollabDocState, + doc_state: DocStateSource, folder_notifier: T, ) -> Result { let folder_notifier = folder_notifier.into(); - let collab = self - .collab_builder - .build_with_config( - uid, - workspace_id, - CollabType::Folder, - collab_db, - collab_doc_state, - CollabPersistenceConfig::new() - .enable_snapshot(true) - .snapshot_per_update(50), - CollabBuilderConfig::default().sync_enable(true), - ) - .await?; + let collab = self.collab_builder.build_with_config( + uid, + workspace_id, + CollabType::Folder, + collab_db, + doc_state, + CollabPersistenceConfig::new() + .enable_snapshot(true) + .snapshot_per_update(50), + CollabBuilderConfig::default().sync_enable(true), + )?; let (should_clear, err) = match Folder::open(UserId::from(uid), collab, folder_notifier) { Ok(folder) => { return Ok(folder); @@ -207,20 +200,17 @@ impl FolderManager { workspace_id: &str, collab_db: Weak, ) -> Result, FlowyError> { - let collab = self - .collab_builder - .build_with_config( - uid, - workspace_id, - CollabType::Folder, - collab_db, - vec![], - CollabPersistenceConfig::new() - .enable_snapshot(true) - .snapshot_per_update(50), - CollabBuilderConfig::default().sync_enable(true), - ) - .await?; + let collab = self.collab_builder.build_with_config( + uid, + workspace_id, + CollabType::Folder, + collab_db, + DocStateSource::FromDocState(vec![]), + CollabPersistenceConfig::new() + .enable_snapshot(true) + .snapshot_per_update(50), + CollabBuilderConfig::default().sync_enable(true), + )?; Ok(collab) } @@ -1229,7 +1219,7 @@ pub enum FolderInitDataSource { /// It means using the data stored on local disk to initialize the folder LocalDisk { create_if_not_exist: bool }, /// If there is no data stored on local disk, we will use the data from the server to initialize the folder - Cloud(CollabDocState), + Cloud(Vec), /// If the user is new, we use the [DefaultFolderBuilder] to create the default folder. FolderData(FolderData), } diff --git a/frontend/rust-lib/flowy-folder/src/manager_init.rs b/frontend/rust-lib/flowy-folder/src/manager_init.rs index f73ea35953..b3dbf98364 100644 --- a/frontend/rust-lib/flowy-folder/src/manager_init.rs +++ b/frontend/rust-lib/flowy-folder/src/manager_init.rs @@ -6,6 +6,7 @@ use collab_integrate::CollabKVDB; use flowy_error::{FlowyError, FlowyResult}; +use collab::core::collab::DocStateSource; use std::sync::{Arc, Weak}; use tracing::{event, Level}; @@ -54,7 +55,13 @@ impl FolderManager { if is_exist { event!(Level::INFO, "Init folder from local disk"); self - .make_folder(uid, &workspace_id, collab_db, vec![], folder_notifier) + .make_folder( + uid, + &workspace_id, + collab_db, + DocStateSource::FromDisk, + folder_notifier, + ) .await? } else if create_if_not_exist { // 2. if the folder doesn't exist and create_if_not_exist is true, create a default folder @@ -76,7 +83,7 @@ impl FolderManager { uid, &workspace_id, collab_db.clone(), - doc_state, + DocStateSource::FromDocState(doc_state), folder_notifier.clone(), ) .await? @@ -86,7 +93,13 @@ impl FolderManager { if doc_state.is_empty() { event!(Level::ERROR, "remote folder data is empty, open from local"); self - .make_folder(uid, &workspace_id, collab_db, vec![], folder_notifier) + .make_folder( + uid, + &workspace_id, + collab_db, + DocStateSource::FromDisk, + folder_notifier, + ) .await? } else { event!(Level::INFO, "Restore folder from remote data"); @@ -95,7 +108,7 @@ impl FolderManager { uid, &workspace_id, collab_db.clone(), - doc_state, + DocStateSource::FromDocState(doc_state), folder_notifier.clone(), ) .await? diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/database.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/database.rs index ad5c7ce5cf..c369a260ea 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/database.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/database.rs @@ -2,7 +2,7 @@ use anyhow::Error; use client_api::entity::QueryCollabResult::{Failed, Success}; use client_api::entity::{QueryCollab, QueryCollabParams}; use client_api::error::ErrorCode::RecordNotFound; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::core::collab_plugin::EncodedCollab; use collab_entity::CollabType; use tracing::error; @@ -23,7 +23,7 @@ where object_id: &str, collab_type: CollabType, workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { let workspace_id = workspace_id.to_string(); let object_id = object_id.to_string(); let try_get_client = self.0.try_get_client(); @@ -73,7 +73,10 @@ where .flat_map(|(object_id, result)| match result { Success { encode_collab_v1 } => { match EncodedCollab::decode_from_bytes(&encode_collab_v1) { - Ok(encode) => Some((object_id, encode.doc_state.to_vec())), + Ok(encode) => Some(( + object_id, + DocStateSource::FromDocState(encode.doc_state.to_vec()), + )), Err(err) => { error!("Failed to decode collab: {}", err); None diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/document.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/document.rs index 2712d272d3..7c5904ab1d 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/document.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/document.rs @@ -1,6 +1,6 @@ use anyhow::Error; use client_api::entity::{QueryCollab, QueryCollabParams}; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::core::origin::CollabOrigin; use collab_document::document::Document; use collab_entity::CollabType; @@ -21,7 +21,7 @@ where &self, document_id: &str, workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, FlowyError> { let workspace_id = workspace_id.to_string(); let try_get_client = self.0.try_get_client(); let document_id = document_id.to_string(); @@ -74,8 +74,12 @@ where .map_err(FlowyError::from)? .doc_state .to_vec(); - let document = - Document::from_doc_state(CollabOrigin::Empty, doc_state, &document_id, vec![])?; + let document = Document::from_doc_state( + CollabOrigin::Empty, + DocStateSource::FromDocState(doc_state), + &document_id, + vec![], + )?; Ok(document.get_document_data().ok()) }) } diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/folder.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/folder.rs index dcc1b8aa3a..4706babfb2 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/folder.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/folder.rs @@ -2,7 +2,7 @@ use anyhow::Error; use client_api::entity::{ workspace_dto::CreateWorkspaceParam, CollabParams, QueryCollab, QueryCollabParams, }; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::core::origin::CollabOrigin; use collab_entity::CollabType; use collab_folder::RepeatedViewIdentifier; @@ -96,8 +96,13 @@ where .map_err(FlowyError::from)? .doc_state .to_vec(); - let folder = - Folder::from_collab_doc_state(uid, CollabOrigin::Empty, doc_state, &workspace_id, vec![])?; + let folder = Folder::from_collab_doc_state( + uid, + CollabOrigin::Empty, + DocStateSource::FromDocState(doc_state), + &workspace_id, + vec![], + )?; Ok(folder.get_folder_data()) }) } @@ -116,7 +121,7 @@ where _uid: i64, collab_type: CollabType, object_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { let object_id = object_id.to_string(); let workspace_id = workspace_id.to_string(); let try_get_client = self.0.try_get_client(); diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/user/cloud_service_impl.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/user/cloud_service_impl.rs index f035301cdc..4e900385cb 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/user/cloud_service_impl.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/user/cloud_service_impl.rs @@ -7,7 +7,6 @@ use client_api::entity::workspace_dto::{ }; use client_api::entity::{AFRole, AFWorkspace, AuthProvider, CollabParams, CreateCollabParams}; use client_api::{Client, ClientConfiguration}; -use collab::core::collab::CollabDocState; use collab_entity::CollabObject; use parking_lot::RwLock; @@ -239,7 +238,7 @@ where }) } - fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult { + fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult, FlowyError> { FutureResult::new(async { Ok(vec![]) }) } diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/server.rs b/frontend/rust-lib/flowy-server/src/af_cloud/server.rs index 18e7d7ad3f..6cb8d8697c 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/server.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/server.rs @@ -25,7 +25,6 @@ use flowy_server_pub::af_cloud_config::AFCloudConfiguration; use flowy_user_pub::cloud::{UserCloudService, UserUpdate}; use flowy_user_pub::entities::UserTokenState; use lib_dispatch::prelude::af_spawn; -use lib_infra::future::FutureResult; use crate::af_cloud::impls::{ AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFileStorageServiceImpl, @@ -196,7 +195,7 @@ impl AppFlowyServer for AppFlowyCloudServer { fn collab_ws_channel( &self, _object_id: &str, - ) -> FutureResult< + ) -> Result< Option<( Arc>, WSConnectStateReceiver, @@ -204,22 +203,10 @@ impl AppFlowyServer for AppFlowyCloudServer { )>, Error, > { - if self.enable_sync.load(Ordering::SeqCst) { - let object_id = _object_id.to_string(); - let weak_ws_client = Arc::downgrade(&self.ws_client); - FutureResult::new(async move { - match weak_ws_client.upgrade() { - None => Ok(None), - Some(ws_client) => { - let channel = ws_client.subscribe_collab(object_id).ok(); - let connect_state_recv = ws_client.subscribe_connect_state(); - Ok(channel.map(|c| (c, connect_state_recv, ws_client.is_connected()))) - }, - } - }) - } else { - FutureResult::new(async { Ok(None) }) - } + let object_id = _object_id.to_string(); + let channel = self.ws_client.subscribe_collab(object_id).ok(); + let connect_state_recv = self.ws_client.subscribe_connect_state(); + Ok(channel.map(|c| (c, connect_state_recv, self.ws_client.is_connected()))) } fn file_storage(&self) -> Option> { diff --git a/frontend/rust-lib/flowy-server/src/local_server/impls/database.rs b/frontend/rust-lib/flowy-server/src/local_server/impls/database.rs index 9092c967a9..14b2c32aba 100644 --- a/frontend/rust-lib/flowy-server/src/local_server/impls/database.rs +++ b/frontend/rust-lib/flowy-server/src/local_server/impls/database.rs @@ -1,5 +1,4 @@ use anyhow::Error; -use collab::core::collab::CollabDocState; use collab_entity::CollabType; use flowy_database_pub::cloud::{CollabDocStateByOid, DatabaseCloudService, DatabaseSnapshot}; @@ -13,7 +12,7 @@ impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl { _object_id: &str, _collab_type: CollabType, _workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { FutureResult::new(async move { Ok(vec![]) }) } diff --git a/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs b/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs index e22d36bc04..bc712d03d0 100644 --- a/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs +++ b/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs @@ -1,5 +1,4 @@ use anyhow::Error; -use collab::core::collab::CollabDocState; use flowy_document_pub::cloud::*; use flowy_error::{ErrorCode, FlowyError}; @@ -12,7 +11,7 @@ impl DocumentCloudService for LocalServerDocumentCloudServiceImpl { &self, document_id: &str, _workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, FlowyError> { let document_id = document_id.to_string(); FutureResult::new(async move { Err(FlowyError::new( diff --git a/frontend/rust-lib/flowy-server/src/local_server/impls/folder.rs b/frontend/rust-lib/flowy-server/src/local_server/impls/folder.rs index 4920df3c51..ea0ee027b9 100644 --- a/frontend/rust-lib/flowy-server/src/local_server/impls/folder.rs +++ b/frontend/rust-lib/flowy-server/src/local_server/impls/folder.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::{anyhow, Error}; -use collab::core::collab::CollabDocState; use collab_entity::CollabType; use flowy_folder_pub::cloud::{ @@ -59,7 +58,7 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl { _uid: i64, _collab_type: CollabType, _object_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { FutureResult::new(async { Err(anyhow!( "Local server doesn't support get collab doc state from remote" diff --git a/frontend/rust-lib/flowy-server/src/local_server/impls/user.rs b/frontend/rust-lib/flowy-server/src/local_server/impls/user.rs index 94e55bd4f0..62bd938c1d 100644 --- a/frontend/rust-lib/flowy-server/src/local_server/impls/user.rs +++ b/frontend/rust-lib/flowy-server/src/local_server/impls/user.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use collab::core::collab::CollabDocState; use collab_entity::CollabObject; use lazy_static::lazy_static; use parking_lot::Mutex; @@ -149,7 +148,7 @@ impl UserCloudService for LocalServerUserAuthServiceImpl { FutureResult::new(async { Ok(vec![]) }) } - fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult { + fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult, FlowyError> { FutureResult::new(async { Ok(vec![]) }) } diff --git a/frontend/rust-lib/flowy-server/src/server.rs b/frontend/rust-lib/flowy-server/src/server.rs index a20fe8465a..5459d8735b 100644 --- a/frontend/rust-lib/flowy-server/src/server.rs +++ b/frontend/rust-lib/flowy-server/src/server.rs @@ -16,7 +16,6 @@ use flowy_document_pub::cloud::DocumentCloudService; use flowy_folder_pub::cloud::FolderCloudService; use flowy_user_pub::cloud::UserCloudService; use flowy_user_pub::entities::UserTokenState; -use lib_infra::future::FutureResult; pub trait AppFlowyEncryption: Send + Sync + 'static { fn get_secret(&self) -> Option; @@ -123,7 +122,7 @@ pub trait AppFlowyServer: Send + Sync + 'static { fn collab_ws_channel( &self, _object_id: &str, - ) -> FutureResult< + ) -> Result< Option<( Arc>, WSConnectStateReceiver, @@ -131,7 +130,7 @@ pub trait AppFlowyServer: Send + Sync + 'static { )>, anyhow::Error, > { - FutureResult::new(async { Ok(None) }) + Ok(None) } fn file_storage(&self) -> Option>; diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/collab_storage.rs b/frontend/rust-lib/flowy-server/src/supabase/api/collab_storage.rs index 3138e72f86..a27a6221f1 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/collab_storage.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/collab_storage.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Weak}; use anyhow::Error; use chrono::{DateTime, Utc}; use client_api::collab_sync::collab_msg::MsgId; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::preclude::merge_updates_v1; use collab_entity::CollabObject; use collab_plugins::cloud_storage::{ @@ -62,7 +62,7 @@ where true } - async fn get_doc_state(&self, object: &CollabObject) -> Result { + async fn get_doc_state(&self, object: &CollabObject) -> Result { let postgrest = self.server.try_get_weak_postgrest()?; let action = FetchObjectUpdateAction::new( object.object_id.clone(), @@ -70,7 +70,7 @@ where postgrest, ); let doc_state = action.run().await?; - Ok(doc_state) + Ok(DocStateSource::FromDocState(doc_state)) } async fn get_snapshots(&self, object_id: &str, limit: usize) -> Vec { diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/database.rs b/frontend/rust-lib/flowy-server/src/supabase/api/database.rs index afd6a2cac8..b5e3689e19 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/database.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/database.rs @@ -1,5 +1,4 @@ use anyhow::Error; -use collab::core::collab::CollabDocState; use collab_entity::CollabType; use tokio::sync::oneshot::channel; @@ -31,7 +30,7 @@ where object_id: &str, collab_type: CollabType, _workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { let try_get_postgrest = self.server.try_get_weak_postgrest(); let object_id = object_id.to_string(); let (tx, rx) = channel(); diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/document.rs b/frontend/rust-lib/flowy-server/src/supabase/api/document.rs index 869421ea75..2d2738f391 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/document.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/document.rs @@ -1,5 +1,5 @@ use anyhow::Error; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::core::origin::CollabOrigin; use collab_document::blocks::DocumentData; use collab_document::document::Document; @@ -33,7 +33,7 @@ where &self, document_id: &str, workspace_id: &str, - ) -> FutureResult { + ) -> FutureResult, FlowyError> { let try_get_postgrest = self.server.try_get_weak_postgrest(); let document_id = document_id.to_string(); let (tx, rx) = channel(); @@ -94,8 +94,12 @@ where let action = FetchObjectUpdateAction::new(document_id.clone(), CollabType::Document, postgrest); let doc_state = action.run_with_fix_interval(5, 10).await?; - let document = - Document::from_doc_state(CollabOrigin::Empty, doc_state, &document_id, vec![])?; + let document = Document::from_doc_state( + CollabOrigin::Empty, + DocStateSource::FromDocState(doc_state), + &document_id, + vec![], + )?; Ok(document.get_document_data().ok()) } .await, diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs b/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs index 81c19a015d..04b20fc7ed 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/folder.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use anyhow::{anyhow, Error}; use chrono::{DateTime, Utc}; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab::core::origin::CollabOrigin; use collab_entity::CollabType; use serde_json::Value; @@ -102,8 +102,13 @@ where let doc_state = merge_updates_v1(&updates) .map_err(|err| anyhow::anyhow!("merge updates failed: {:?}", err))?; - let folder = - Folder::from_collab_doc_state(uid, CollabOrigin::Empty, doc_state, &workspace_id, vec![])?; + let folder = Folder::from_collab_doc_state( + uid, + CollabOrigin::Empty, + DocStateSource::FromDocState(doc_state), + &workspace_id, + vec![], + )?; Ok(folder.get_folder_data()) }) } @@ -137,7 +142,7 @@ where _uid: i64, collab_type: CollabType, object_id: &str, - ) -> FutureResult { + ) -> FutureResult, Error> { let try_get_postgrest = self.server.try_get_weak_postgrest(); let object_id = object_id.to_string(); let (tx, rx) = channel(); diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/request.rs b/frontend/rust-lib/flowy-server/src/supabase/api/request.rs index 4dab453ddd..5601b4a20f 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/request.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/request.rs @@ -7,7 +7,7 @@ use std::time::Duration; use anyhow::Error; use chrono::{DateTime, Utc}; -use collab::core::collab::CollabDocState; +use collab::core::collab::DocStateSource; use collab_entity::{CollabObject, CollabType}; use collab_plugins::cloud_storage::RemoteCollabSnapshot; use serde_json::Value; @@ -60,7 +60,7 @@ impl FetchObjectUpdateAction { impl Action for FetchObjectUpdateAction { type Future = Pin> + Send>>; - type Item = CollabDocState; + type Item = Vec; type Error = anyhow::Error; fn run(&mut self) -> Self::Future { @@ -284,7 +284,7 @@ pub async fn batch_get_updates_from_server( match parser_updates_form_json(record.clone(), &postgrest.secret()) { Ok(items) => { if items.is_empty() { - updates_by_oid.insert(oid.to_string(), vec![]); + updates_by_oid.insert(oid.to_string(), DocStateSource::FromDocState(vec![])); } else { let updates = items .iter() @@ -293,7 +293,7 @@ pub async fn batch_get_updates_from_server( let doc_state = merge_updates_v1(&updates) .map_err(|err| anyhow::anyhow!("merge updates failed: {:?}", err))?; - updates_by_oid.insert(oid.to_string(), doc_state); + updates_by_oid.insert(oid.to_string(), DocStateSource::FromDocState(doc_state)); } }, Err(e) => { diff --git a/frontend/rust-lib/flowy-server/src/supabase/api/user.rs b/frontend/rust-lib/flowy-server/src/supabase/api/user.rs index 5e58b7b677..34490e3f89 100644 --- a/frontend/rust-lib/flowy-server/src/supabase/api/user.rs +++ b/frontend/rust-lib/flowy-server/src/supabase/api/user.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::Error; -use collab::core::collab::{CollabDocState, MutexCollab}; +use collab::core::collab::MutexCollab; use collab::core::origin::CollabOrigin; use collab_entity::{CollabObject, CollabType}; use parking_lot::RwLock; @@ -249,7 +249,7 @@ where }) } - fn get_user_awareness_doc_state(&self, uid: i64) -> FutureResult { + fn get_user_awareness_doc_state(&self, uid: i64) -> FutureResult, FlowyError> { let try_get_postgrest = self.server.try_get_weak_postgrest(); let awareness_id = uid.to_string(); let (tx, rx) = channel(); diff --git a/frontend/rust-lib/flowy-server/tests/supabase_test/database_test.rs b/frontend/rust-lib/flowy-server/tests/supabase_test/database_test.rs index da17de9c65..4eabe8c5c0 100644 --- a/frontend/rust-lib/flowy-server/tests/supabase_test/database_test.rs +++ b/frontend/rust-lib/flowy-server/tests/supabase_test/database_test.rs @@ -1,3 +1,4 @@ +use collab::core::collab::DocStateSource; use collab_entity::{CollabObject, CollabType}; use uuid::Uuid; @@ -50,7 +51,12 @@ async fn supabase_create_database_test() { .unwrap(); assert_eq!(updates_by_oid.len(), 3); - for (_, update) in updates_by_oid { - assert_eq!(update.len(), 2); + for (_, source) in updates_by_oid { + match source { + DocStateSource::FromDisk => panic!("should not be from disk"), + DocStateSource::FromDocState(doc_state) => { + assert_eq!(doc_state.len(), 2); + }, + } } } diff --git a/frontend/rust-lib/flowy-server/tests/supabase_test/util.rs b/frontend/rust-lib/flowy-server/tests/supabase_test/util.rs index e413346d1b..466b728359 100644 --- a/frontend/rust-lib/flowy-server/tests/supabase_test/util.rs +++ b/frontend/rust-lib/flowy-server/tests/supabase_test/util.rs @@ -2,7 +2,7 @@ use flowy_storage::ObjectStorageService; use std::collections::HashMap; use std::sync::Arc; -use collab::core::collab::MutexCollab; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::core::origin::CollabOrigin; use collab_plugins::cloud_storage::RemoteCollabStorage; use uuid::Uuid; @@ -122,8 +122,14 @@ pub async fn print_encryption_folder_snapshot( .pop() .unwrap(); let collab = Arc::new( - MutexCollab::new_with_doc_state(CollabOrigin::Empty, folder_id, snapshot.blob, vec![], false) - .unwrap(), + MutexCollab::new_with_doc_state( + CollabOrigin::Empty, + folder_id, + DocStateSource::FromDocState(snapshot.blob), + vec![], + false, + ) + .unwrap(), ); let folder_data = Folder::open(uid, collab, None) .unwrap() diff --git a/frontend/rust-lib/flowy-user-pub/src/cloud.rs b/frontend/rust-lib/flowy-user-pub/src/cloud.rs index 1301cd3dec..928e1ce7f0 100644 --- a/frontend/rust-lib/flowy-user-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-user-pub/src/cloud.rs @@ -1,4 +1,3 @@ -use collab::core::collab::CollabDocState; use collab_entity::{CollabObject, CollabType}; use flowy_error::{internal_error, ErrorCode, FlowyError}; use lib_infra::box_any::BoxAny; @@ -213,7 +212,7 @@ pub trait UserCloudService: Send + Sync + 'static { FutureResult::new(async { Ok(vec![]) }) } - fn get_user_awareness_doc_state(&self, uid: i64) -> FutureResult; + fn get_user_awareness_doc_state(&self, uid: i64) -> FutureResult, FlowyError>; fn receive_realtime_event(&self, _json: Value) {} diff --git a/frontend/rust-lib/flowy-user/src/anon_user/migrate_anon_user_collab.rs b/frontend/rust-lib/flowy-user/src/anon_user/migrate_anon_user_collab.rs index 3d736b874d..4e5fc0cb81 100644 --- a/frontend/rust-lib/flowy-user/src/anon_user/migrate_anon_user_collab.rs +++ b/frontend/rust-lib/flowy-user/src/anon_user/migrate_anon_user_collab.rs @@ -3,7 +3,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use anyhow::anyhow; -use collab::core::collab::MutexCollab; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::core::origin::{CollabClient, CollabOrigin}; use collab::preclude::Collab; use collab_database::database::{ @@ -305,9 +305,14 @@ where } let origin = CollabOrigin::Client(CollabClient::new(new_uid, "phantom")); - let new_folder_collab = - Collab::new_with_doc_state(origin, new_workspace_id, vec![], vec![], false) - .map_err(|err| PersistenceError::Internal(err.into()))?; + let new_folder_collab = Collab::new_with_doc_state( + origin, + new_workspace_id, + DocStateSource::FromDisk, + vec![], + false, + ) + .map_err(|err| PersistenceError::Internal(err.into()))?; let mutex_collab = Arc::new(MutexCollab::from_collab(new_folder_collab)); let new_user_id = UserId::from(new_uid); info!("migrated folder: {:?}", folder_data); diff --git a/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs b/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs index 9c4d370367..35eda7c58a 100644 --- a/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs +++ b/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs @@ -5,7 +5,7 @@ use crate::services::entities::UserPaths; use crate::services::sqlite_sql::user_sql::select_user_profile; use crate::user_manager::run_collab_data_migration; use anyhow::anyhow; -use collab::core::collab::{CollabDocState, MutexCollab}; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::core::origin::CollabOrigin; use collab::core::transaction::DocTransactionExtension; use collab::preclude::updates::decoder::Decode; @@ -447,7 +447,7 @@ where } fn import_collab_object_with_doc_state<'a, W>( - doc_state: CollabDocState, + doc_state: Vec, new_uid: i64, new_object_id: &str, w_txn: &'a W, @@ -456,8 +456,13 @@ where W: CollabKVAction<'a>, PersistenceError: From, { - let collab = - Collab::new_with_doc_state(CollabOrigin::Empty, new_object_id, doc_state, vec![], false)?; + let collab = Collab::new_with_doc_state( + CollabOrigin::Empty, + new_object_id, + DocStateSource::FromDocState(doc_state), + vec![], + false, + )?; write_collab_object(&collab, new_uid, new_object_id, w_txn); Ok(()) } diff --git a/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs b/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs index f7fc49803e..3c1249304b 100644 --- a/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs +++ b/frontend/rust-lib/flowy-user/src/user_manager/manager_user_awareness.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Weak}; use anyhow::Context; -use collab::core::collab::{CollabDocState, MutexCollab}; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab_entity::reminder::Reminder; use collab_entity::CollabType; use collab_integrate::collab_builder::CollabBuilderConfig; @@ -164,7 +164,7 @@ impl UserManager { &self, session: &Session, collab_db: Weak, - raw_data: CollabDocState, + doc_state: Vec, ) -> Result, FlowyError> { let collab_builder = self.collab_builder.upgrade().ok_or(FlowyError::new( ErrorCode::Internal, @@ -176,7 +176,7 @@ impl UserManager { session.user_id, &user_awareness_id.to_string(), CollabType::UserAwareness, - raw_data, + DocStateSource::FromDocState(doc_state), collab_db, CollabBuilderConfig::default().sync_enable(true), )